分布式锁
在分布式系统中,如何保证多个进程或线程之间对共享资源的访问互斥性和并发性?
在分布式锁的情况下,还是需要单机锁的,可以减少对同时加锁的并发量,减轻中间件的压力。
没有抢到锁怎么处理?自旋,性能差。事件通知回调最好。
如何保证多个进程或线程之间的互斥性和并发性?
在一个分布式系统中,多个进程或线程需要对共享资源进行读写操作,如果不加限制,可能会导致数据不一致或者出现竞态条件。因此,需要引入分布式锁来控制对共享资源的访问。
- MySQL实现分布式锁的一种常见方式是使用InnoDB的行级锁和SELECT … FOR UPDATE语句。简单,使用方便,不需要引入Redis、zookeeper等中间件。但不适合高并发的场景,db操作性能较差,没有设置超时时间,有锁表的风险。具体步骤如下:
创建一个名为mutex的表,该表只有一个名为name的列,用于存储锁的名称。例如:1
2
3
4
5CREATE TABLE mutex (
name varchar(128) NOT NULL,
PRIMARY KEY (name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `mutex`(`name`) VALUES('my_lock_name');
当需要获取锁时,使用SELECT … FOR UPDATE语句查询mutex表中对应的行,并在事务中执行该语句。例如:1
2
3
4
5START TRANSACTION; -- 一定要使用事务
SELECT name FROM mutex WHERE name = 'my_lock_name' FOR UPDATE;
-- 如果返回结果为空,则说明该锁没有被其他事务持有,可以继续执行下面的业务逻辑
-- 如果返回结果不为空,则说明该锁已经被其他事务持有,请稍后重试
COMMIT;
当不再需要锁时,使用DELETE语句将mutex表中对应的行删除。例如:1
DELETE FROM mutex WHERE name = 'my_lock_name';
另外一种方式,0或者没有这行数据,代表没有持有锁,非0代表存储的锁具体持有锁的对象,需要实现一个定时器,定时检测锁超时,需要自己主动定时检测,锁是否释放。
缺点:
- 速度慢
- 锁超时不方便
- 锁释放通知不方便
- 高可用的方案比较少(分布式关系型数据库tidb)
1
2
3
4
5
6
7
8
9
10
11
12
13
14DROP TABLE IF EXISTS `dislock`;
CREATE TABLE `dislock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`lock_type` varchar(64) NOT NULL COMMENT '锁类型',
`owner_id` varchar(255) NOT NULL COMMENT '持锁对象',
`count` int(11) NOT NULL COMMENT '计数器',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE `idx_lock_type` (`lock_type`) -- 唯一性约束
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='分布式锁表';
INSERT INTO dislock(`lock_type`, `owner_id`, `count`) VALUES ('activity_test', '3245jk', 0);
delete from dislock where `lock_type` = 'activity_test' and `owner_id` ='3245jk';
update dislock set count = count + 1 where `lock_type` = 'activity_test' and `owner_id` ='3245jk'; -- 实现可重入
- MySQL的乐观锁可以使用版本号或时间戳来实现。具体步骤如下:
- 在需要应用乐观锁的表中新增一个版本号或时间戳字段。例如,新增一个名为version的整型字段。
当需要更新一条记录时,在UPDATE语句中将版本号加1(或将时间戳更新为当前时间),并且WHERE条件中加入版本号等于原始值的判断。例如:
1
2
3
4
5
6
7
8
9CREATE TABLE `mylock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
`value` varchar(64) NOT NULL COMMENT '',
`version` int(11) NOT NULL COMMENT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='';
INSERT INTO `mylock` (`id`, `value`, `version`) VALUES (1, 'test', 456);
UPDATE `mylock` SET `value` = 'new_value', `version` = `version` + 1 WHERE `id` = 1 AND `version` = 456;
<!-- 其中,123是要更新的记录的ID,456是该记录的原始版本号。如果更新成功,则说明没有其他事务修改过该记录;否则,说明有其他事务已经修改了该记录,此时需要重新获取最新的记录并重试。 -->如果使用时间戳来实现乐观锁,则需要在更新时将时间戳更新为当前时间,而不是加1。例如:
1
2UPDATE `table_name` SET `column1` = 'new_value', `timestamp_column` = CURRENT_TIMESTAMP() WHERE `id` = 123 AND `timestamp_column` = 'original_timestamp';
<!-- 其中,timestamp_column是存储时间戳的字段,original_timestamp是要更新的记录的原始时间戳值。 -->
需要注意的是,使用乐观锁时需要避免长事务和慢查询等问题,以确保锁的释放和事务的快速执行。
- 可以使用ZooKeeper等分布式协调服务来实现分布式锁。如果有很多的客户端频繁的申请加锁、释放锁,对于Zookeeper集群的压力会比较大。性能不如redis实现的分布式锁。
为了确保公平,可以简单的规定:编号最小的那个节点,表示获得了锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。
每个线程抢占锁之前,先尝试创建自己的ZNode。同样,释放锁的时候,就需要删除创建的Znode。创建成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode的通知就可以了。前一个Znode删除的时候,会触发Znode事件,当前节点能监听到删除事件,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。
ZooKeeper的内部优越的机制,能保证由于网络异常或者其他原因,集群中占用锁的客户端失联时,锁能够被有效释放。一旦占用Znode锁的客户端与ZooKeeper集群服务器失去联系,这个临时Znode也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。正是由于这个原因,在创建取号节点的时候,尽量创建临时znode节点。
ZooKeeper这种首尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是一个节点挂掉,所有节点都去监听,然后做出反应,这样会给服务器带来巨大压力,所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反应。
Zookeeper的节点Znode有四种类型:
- 持久节点:默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在。
- 持久节点顺序节点:所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号,持久节点顺序节点就是有顺序的持久节点。
- 临时节点:和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。
- 临时顺序节点:有顺序的临时节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56class DistributedLock {
public:
DistributedLock(const std::string& lock_path, const std::string& node_path);
~DistributedLock();
bool lock();
bool unlock();
private:
std::shared_ptr<zookeeper_handle_t> zk_handle_;
std::string lock_path_;
std::string node_path_;
};
// 需要创建一个父节点,尽量是持久节点(PERSISTENT类型)
DistributedLock::DistributedLock(const std::string& lock_path, const std::string& node_path)
: lock_path_(lock_path), node_path_(node_path) {
// 创建 ZooKeeper 的句柄
zk_handle_ = std::make_shared<zookeeper_handle_t>();
zookeeper_init(zk_handle_.get(), NULL, 0, NULL, NULL, 0);
zoo_create(zk_handle_.get(), lock_path_.c_str(), nullptr, -1, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);
}
DistributedLock::~DistributedLock() {
// 关闭 ZooKeeper 句柄
zookeeper_close(zk_handle_.get());
}
bool DistributedLock::lock() {
// 在 ZooKeeper 上创建临时有序节点
// 在每一个节点下面创建临时顺序节点(EPHEMERAL_SEQUENTIAL)类型,新的子节点后面,会加上一个次序编号,而这个生成的次序编号,是上一个生成的次序编号加一。
std::string node_name = zoo_create(zk_handle_.get(), (lock_path_ + "/").c_str(), NULL, -1,
&ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE,
NULL, 0);
if (node_name.empty()) {
return false;
}
// 获取所有子节点并排序
std::vector<std::string> children;
get_children(lock_path_, children);
std::sort(children.begin(), children.end());
// 如果创建的节点是当前所有节点的最小节点,则获取到了锁,返回 true
if (node_name == (lock_path_ + "/" + children.front())) {
return true;
}
// 否则监听前一个节点并等待通知
int index = std::distance(children.begin(), std::lower_bound(children.begin(), children.end(), node_name.substr(lock_path_.size()+1)));
std::string prev_node_name = lock_path_ + "/" + children[index-1];
// 当多个进程或线程同时尝试获取锁时,可能会出现死锁或活锁问题。死锁指的是多个进程或线程相互等待对方释放锁,导致无法继续执行;活锁则是所有进程或线程都在重试获取锁,但始终无法成功。
return wait_for_event(prev_node_name, 50); // 这里只有一个节点监听,所以没有活锁问题
}
bool DistributedLock::unlock() {
return delete_node(node_path_);
}
通过 ZooKeeper 实现可重入的分布式锁,可以使用两种方式来实现:
- 在客户端实现可重入分布式锁,需要在客户端维护一个计数器,记录当前进程持有锁的数量,并且在释放锁时减少计数器。这样可以保证同一进程多次获取同一个锁时,不会因为重复获取而导致死锁。
- 在服务端实现可重入分布式锁,则需要在节点数据中保存当前持有该锁的客户端ID和计数器。当一个客户端再次请求同一个锁时,服务端判断是否是同一客户端,并且增加该客户端持有该锁的计数器即可。
- Redis则提供了一种基于SETNX命令的分布式锁实现,可以通过在Redis中保存一个标志位来表示锁的占有状态。性能好,适合高并发场景,但数据不是强一致,数据可能丢失,以广播的方式进行锁释放通知,引起惊群。
redis挂了怎么办?不要去做HA、主从、主备(同步的是全量数据,不是强一致性),分片集群即使有多个分片,也不一定同步及时,所以也不行。直接用单机redis,要做隔离,最多出现不可用。
伪代码:1
2
3
4
5
6
7-- 加锁流程
set lock uuid ex 30 nx
-- 释放锁流程
if redis.call("get", "lock") == uuid then
redis.call("del", "lock")
publish channel 1 -- 通知其他的监听者
end
1 | try { |
1 | -- keys[1]:自定义锁的key RLock lock = redissonClient.getLock(lockKey); |
1 |
|
使用expire来实现过期,但是setnx和expire分开了,不是原子操作,setnx后进程崩溃了就无法解锁了。1
2
3
4
5
6
7
8
9
10
11
12function acquire_lock(lock_name, expiration_time):
setnx_result = redis.setnx(lock_name, 1)
if setnx_result == 1:
# Lock acquired successfully
redis.expire(lock_name, expiration_time)
return True
else:
# Lock is already held by another client
return False
function release_lock(lock_name):
redis.delete(lock_name)
将value值设置为当前时间加上过期时间,并在获取锁时检查是否超时。通过将锁的value值设置为当前时间加上过期时间来实现锁的超时机制。当获取锁失败时,检查锁的当前值是否小于当前时间,如果小于,则说明锁已经超时,此时通过getset命令获取当前值并设置新值,如果设置成功则表示获取锁成功;否则,说明锁已经被其他进程持有,获取锁失败。
- 过期时间是客户端自己生成的,必须要求分布式环境下,每个客户端的时间必须同步。
- 如果锁过期的时候,并发多个客户端同时请求过来,都执行getset,最终只能有一个客户端加锁成功,但是该客户端锁的过期时间,可能被别的客户端覆盖。
- 该锁没有保存持有者的唯一标识,可能被别的客户端释放/解锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37// 获取锁并设置过期时间
$lock_name = 'my_lock_name'; // 锁的名称
$expire_time = 5000; // 过期时间,单位为毫秒
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$lock_value = time() + $expire_time;
$success = $redis->setnx($lock_name, $lock_value);
if ($success) {
// 成功获取锁,设置锁的过期时间并返回
$redis->pexpire($lock_name, $expire_time);
return true;
} else {
// 检查锁是否已经超时
$current_value = $redis->get($lock_name);
if ($current_value && $current_value < time()) {
// 锁已经超时,尝试获取锁并设置过期时间
$old_value = $redis->getset($lock_name, $lock_value); // 这里可以多个客户端操作
if ($old_value && $old_value == $current_value) {
// 成功获取锁,设置锁的过期时间并返回
$redis->pexpire($lock_name, $expire_time);
return true;
}
}
// 获取锁失败或者锁已经被其他进程持有
return false;
}
// 释放锁
$current_value = $redis->get($lock_name);
if ($current_value && $current_value >= time()) {
// 当前持有锁且未超时,删除锁并返回true
$redis->del($lock_name);
return true;
} else {
// 锁已经超时或者已经被其他进程持有,不需要进行删除操作
return false;
}
Lua脚本来保证原子性(包含setnx和expire两条指令),不推荐用redis事务机制。因为我们的生产环境,基本都是redis集群环境,做了数据分片操作。你一个事务中有涉及到多个key操作的时候,这多个key不一定都存储在同一个redis-server上。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52// 优化版
class RedisLock {
public:
RedisLock(const char* name, const char* address, int port)
: m_name(name), m_address(address), m_port(port) {}
~RedisLock() {
if (m_redis) {
redisFree(m_redis);
}
}
bool lock(int timeout) {
using namespace std::chrono;
m_redis = redisConnect(m_address.c_str(), m_port);
if (!m_redis || m_redis->err) {
std::cerr << "Failed to connect to Redis server: " << m_redis ? m_redis->errstr : "unknown error" << std::endl;
return false;
}
milliseconds start = duration_cast<milliseconds>(system_clock::now().time_since_epoch());
while (true) {
std::string value = std::to_string(start.count() + timeout * 1000 + 1);
// 如果持有锁的线程在过期时间内没有执行完任务,锁会被提前释放
// 再等任务执行完后,却删除了别人的锁
redisReply* reply = static_cast<redisReply*>(redisCommand(m_redis, "SET %s %s NX EX %d", m_name.c_str(), value.c_str(), timeout));
if (!reply || reply->type != REDIS_REPLY_STATUS || strcmp(reply->str, "OK") != 0) {
freeReplyObject(reply);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
freeReplyObject(reply);
return true;
}
}
void unlock() {
if (m_redis) {
redisReply* reply = static_cast<redisReply*>(redisCommand(m_redis, "DEL %s", m_name.c_str()));
freeReplyObject(reply);
}
}
private:
std::string m_name;
std::string m_address;
int m_port;
redisContext* m_redis = nullptr;
};
首先使用SET命令尝试向Redis服务器设置键值对,并且带有NX和PX选项。如果返回值为“OK”,则表示成功获取到锁,此时可以执行业务逻辑。接着使用GET命令获取当前锁的值,并判断其是否与之前设置的随机值相等,如果相等说明获取到的锁依然是当前客户端持有的,此时可以释放锁;否则说明锁已经被其他客户端获取,在这种情况下,当前客户端不应该主动释放锁。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27def acquire_redis_lock(redis_client, key, value, expire_time):
lua_script = '''
if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("DEL", KEYS[1])
return true
else
return false
end
'''
set_result = redis_client.execute_command('SET', key, value, 'NX', 'PX', expire_time)
if set_result == b'OK':
# 成功获取到锁
# 这里可以写入需要执行的业务逻辑
# TODO:锁过期释放,业务没执行完
# 校验随机值
lock_value = redis_client.get(key)
if lock_value == value.encode('utf-8'):
# 在校验成功后释放锁
redis_client.eval(lua_script, 1, key, value.encode('utf-8')) # 判断加删除这里不是非原子的,为了更严谨,一般也是用lua脚本代替
return True
else:
# 锁已经被其他客户端获取
return False
else:
# 获取锁失败
return False
是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。参考java的Redisson。1
2
3
4
5
6
7
8
9
10
11
12
13
14redis_lock = RedissonLock(key, ttl)
if redis_lock.acquire():
# 成功获取到锁,开启续期线程
def renew_thread():
while redis_lock.locked:
redis_lock.renew()
time.sleep(redis_lock.ttl * 0.8)
t = threading.Thread(target=renew_thread)
t.start()
# 在这里执行需要保护的代码块
redis_lock.release()
else:
print("Failed to acquire lock")
如果线程一在Redis的master节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,master节点发生故障,一个slave节点就会升级为master节点。线程二就可以获取同个key的锁啦,但线程一也已经拿到锁了,锁的安全性就没了。
RedLock是一种基于Redis的分布式锁实现方案,其实现步骤如下:
- 初始化Redis集群:至少需要3个独立的Redis节点组成伪集群,redis之间不需要通信。
- 获取当前时间戳和唯一标识符:所有客户端获取的时间戳必须相同且精确到毫秒级别,同时每个客户端都要生成一个唯一的标识符,可以是UUID或其他随机数。
- 计算加锁过期时间并尝试获取锁:通过向Redis集群的每个节点顺序(预防都没有抢到过半的锁,如果由于分区隔离确实抢不到,则重新抢锁)发送SET命令实现加锁,同时指定过期时间为固定值。如果客户端在超时之前成功地从大多数Redis节点上获取了锁,则认为该客户端已经获得了锁,全量抢锁存在可用性问题,只要抢到过半就行。
- 释放锁:如果需要释放锁,客户端必须向所有Redis节点发送DEL命令,以确保所有节点上的锁都被正确地释放。
1 | import redis |
为了实现可重入锁,我们可以将锁的value由一个随机字符串改为一个计数器,并使用Lua脚本来保证原子性地进行加减操作。1
2
3
4
5
6
7
8
9
10
11
12
13
14-- Lua脚本
if redis.call("exists", KEYS[1]) == 0 then
redis.call("hset", KEYS[1], ARGV[2], 1)
redis.call("pexpire", KEYS[1], ARGV[1])
return 1
end
if redis.call("hexists", KEYS[1], ARGV[2]) == 1 then
redis.call("hincrby", KEYS[1], ARGV[2], 1)
redis.call("pexpire", KEYS[1], ARGV[1])
return 1
end
return 0
该脚本接受三个参数:第一个参数是锁的名称,第二个参数是当前线程的标识符,第三个参数是锁的过期时间(单位为毫秒)。
该脚本的作用是,如果缓存中不存在该锁,则创建一个新的锁,并将当前线程的计数器设置为1;如果缓存中存在该锁并且当前线程已经持有该锁,则将当前线程的计数器加1;否则无法获取锁,返回0。
在尝试释放锁时,我们需要使用以下方式:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15-- Lua脚本
if redis.call("exists", KEYS[1]) == 0 then
return 0
end
local counter = redis.call("hget", KEYS[1], ARGV[2])
if not counter then
return 0
elseif tonumber(counter) > 1 then
redis.call("hincrby", KEYS[1], ARGV[2], -1)
return 1
else
redis.call("del", KEYS[1])
return 1
end
该脚本也接受三个参数,第一个参数是锁的名称,第二个参数是当前线程的标识符。该脚本的作用是,如果当前线程未持有该锁,则直接返回0;如果当前线程持有该锁但是计数器大于1,则将计数器减1;否则,将该锁删除。
- 使用etcd实现分布式锁通常需要以下步骤:
监听前一个,来实现公平锁。./etcdctl get lock –prefix –sort-by=”CREATE”,以创建的时间排序。
在etcdv3版本的客户端库中已经有了分布式锁的实现,让我们看一下实现逻辑。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import (
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
)
func main() {
// 建立etcd客户端 首先,你需要建立一个etcd的client对象。可以使用 etcd-client-go 库来建立连接。
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
defer cli.Close()
// 创建互斥锁 然后,在etcd中创建一个互斥锁。互斥锁的实现基于etcd的租约机制。它会在etcd中创建一个临时节点,并为这个节点绑定一个租约。只有持有这个租约的客户端才能修改这个节点的值。其他客户端将会等待这个租约过期后再次尝试获取锁。
session, err := concurrency.NewSession(cli)
if err != nil {
panic(err)
}
mutex := concurrency.NewMutex(session, "/my-lock")
// 获取锁 最后,使用 mutex.Lock() 方法来获取锁。如果当前没有任何其他客户端持有这个锁,则当前客户端将成为这个锁的持有者。否则,当前客户端将等待其他客户端释放锁后再次尝试获取锁。
if err := mutex.Lock(context.Background()); err != nil {
panic(err)
}
defer mutex.Unlock(context.Background())
// do something here...
}
实现高可用性和容错性
当出现网络异常或者宕机等故障时,需要保证分布式锁服务的高可用性和容错性。
可以使用多个 ZooKeeper 集群来提高可用性和容错性。具体实现方式如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61class DistributedLock {
public:
DistributedLock(const std::vector<std::string>& zk_hosts, const std::string& lock_path, const std::string& node_path);
~DistributedLock();
bool lock();
bool unlock();
private:
std::vector<std::shared_ptr<zookeeper_handle_t>> zk_handles_;
std::string lock_path_;
std::string node_path_;
};
DistributedLock::DistributedLock(const std::vector<std::string>& zk_hosts, const std::string& lock_path, const std::string& node_path)
: lock_path_(lock_path), node_path_(node_path) {
// 创建多个 ZooKeeper 的句柄
for (const auto& host : zk_hosts) {
std::shared_ptr<zookeeper_handle_t> zk_handle = std::make_shared<zookeeper_handle_t>();
zookeeper_init(zk_handle.get(), host.c_str(), 5000, NULL, NULL, 0);
zk_handles_.push_back(zk_handle);
}
}
DistributedLock::~DistributedLock() {
//关闭所有 ZooKeeper 句柄
for (const auto& zk_handle : zk_handles_) {
zookeeper_close(zk_handle.get());
}
}
bool DistributedLock::lock() {
// 在多个 ZooKeeper 上创建临时有序节点
std::vectorstd::string node_names;
for (const auto& zk_handle : zk_handles_) {
std::string node_name = zoo_create(zk_handle.get(), (lock_path_ + "/").c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE,
NULL, 0);
if (node_name.empty()) {
return false;
}
node_names.push_back(node_name);
}
// 获取所有节点并排序
std::vector<std::string> children;
for (const auto& zk_handle : zk_handles_) {
get_children(zk_handle.get(), lock_path_, children);
}
std::sort(children.begin(), children.end());
// 如果创建的节点是当前所有节点的最小节点,则获取到了锁,返回 true
std::string min_node_name = lock_path_ + "/" + children.front();
for (const auto& node_name : node_names) {
if (node_name == min_node_name) {
continue;
}
delete_node(node_name); // 删除其它集群没有匹配到的节点
}
return true;
}
bool DistributedLock::unlock() {
return delete_node(node_path_);
}
在代码示例中,涉及到了一些函数的调用,下面给出这些函数的实现参考:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42// 获取指定路径下所有子节点
void get_children(zookeeper_handle_t* zk_handle, const std::string& path, std::vector<std::string>& children) {
struct String_vector str_vec;
int ret = zoo_get_children(zk_handle, path.c_str(), 0, &str_vec);
if (ret == ZOK) {
for (int i = 0; i < str_vec.count; ++i) {
children.push_back(str_vec.data[i]);
}
deallocate_String_vector(&str_vec);
}
}
// 删除指定路径上的节点
bool delete_node(const std::string& node_path) {
int ret = zoo_delete(zk_handle_.get(), node_path.c_str(), -1);
return (ret == ZOK);
}
void lock_watcher(zhandle_t *zh, int type, int state, const char *path, void *watcher_ctx) {
if (type == ZOO_DELETED_EVENT && path != nullptr && node_path.compare(path) == 0) {
cout << "Lock node has been deleted!" << endl;
cv.notify_all();
}
}
// 等待指定节点上发生事件,redis可以用发布订阅来实现
bool wait_for_event(const std::string& node_name, int timeout_ms) {
zoo_exists(zk_handle_.get(), node_name.c_str(), lock_watcher, nullptr, nullptr);
unique_lock<mutex> lk(m);
if (cv.wait_for(lk, timeout_ms + get_timestamp_ms()) == cv_status::timeout) {
return false;
} else {
return true;
}
}
// 获取当前时间戳(毫秒级)
int64_t get_timestamp_ms() {
struct timeval tv;
gettimeofday(&tv, NULL);
return (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
分布式锁不仅可以使用 ZooKeeper 实现,还可以使用 Redis、etcd 等其他分布式协调服务来实现。
并发控制
- 数据同步问题:如何确保不同节点上的数据副本保持同步?
不同节点之间的网络通信可能出现延迟、丢包等问题,导致数据同步失败。
可以采用基于版本号的同步机制,在每个节点维护一个版本号,当节点更新数据时,先比较版本号,如果版本号不一致,则说明该数据已经被其他节点更新过了,需要进行合并操作。此外,可以利用快照复制、增量更新等方式来提高同步效率。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16struct Node {
uint32_t version;
// 其他属性
};
void sync(Node& a, Node& b) {
if (a.version == b.version) {
return; // 版本号相同,无需同步
}
if (a.version > b.version) {
merge(b, a); // 将节点 b 的数据合并到节点 a 中
} else {
merge(a, b); // 将节点 a 的数据合并到节点 b 中
}
}
在分布式系统中,保证多个操作的原子性是非常重要的。因此需要实现分布式事务,以保证多个操作在所有节点上都成功或者都失败。常用的分布式事务实现包括Two-Phase Commit(2PC)和Saga等。其中,2PC是一种经典的分布式事务实现,它通过协调器协调各个节点的事务提交过程,以确保所有节点的事务要么全部提交要么全部回滚。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class TwoPhaseCommit {
public:
TwoPhaseCommit(std::string host, int port) {
context = redisConnect(host.c_str(), port);
}
~TwoPhaseCommit() {
redisFree(context);
}
bool prepare(std::string key, std::string value) {
// Step 1: Phase one - prepare
// Send prepare message to all participants
std::vector<std::string> participants = getParticipants(key);
std::unordered_map<std::string, bool> votes;
for (auto participant : participants) {
redisReply* reply = (redisReply*)redisCommand(context, "MULTI\nSET %s %s\nEXEC", key.c_str(), value.c_str());
if (reply == NULL || reply->type != REDIS_REPLY_ARRAY || reply->elements != 2) {
freeReplyObject(reply);
return false;
}
bool success = (reply->element[1]->type == REDIS_REPLY_STATUS && strcmp(reply->element[1]->str, "OK") == 0);
freeReplyObject(reply);
votes[participant] = success;
}
// Step 2: Phase two - commit or abort
// Send commit/abort message to all participants
bool commit = true;
for (auto vote : votes) {
if (!vote.second) {
commit = false;
break;
}
}
std::string cmd = commit ? "EXEC" : "DISCARD";
bool ret = true;
for (auto participant : participants) {
redisReply* reply = (redisReply*)redisCommand(context, "MULTI\n%s\nDEL __%s\nEXEC", cmd.c_str(), participant.c_str());
if (reply == NULL || reply->type != REDIS_REPLY_ARRAY || reply->elements != 2) {
freeReplyObject(reply);
ret = false;
continue;
}
if (commit) {
ret = (reply->element[0]->type == REDIS_REPLY_STATUS && strcmp(reply->element[0]->str, "OK") == 0);
}
freeReplyObject(reply);
}
return ret;
}
private:
redisContext* context;
std::vector<std::string> getParticipants(std::string key) {
// Use a simple hash function to assign participants based on the key
std::vector<std::string> participants;
participants.push_back("node1");
participants.push_back("node2");
participants.push_back("node3");
return participants;
}
};
- 冲突检测和解决问题:如何检测到不同节点对同一数据对象的操作冲突,并解决这些冲突?
不同节点对同一数据对象进行操作时,可能会出现冲突,需要进行检测和解决。
采用基于版本控制的并发控制算法:基于版本控制的并发控制算法,例如 MVCC(多版本并发控制)等,可以有效地避免锁竞争和死锁问题,提高并发控制的效率。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36// Implement multi-version concurrency control based on timestamp ordering
class MVCC {
public:
MVCC() : current_ts(0) {}
void start_transaction() {
// Get a new transaction timestamp
current_ts++;
}
bool read(int key, std::string& value, long long& ts) {
// Check if the key is visible to the current transaction
auto iter = data.find(key);
if (iter == data.end()) {
return false;
}
if (iter->second.ts > current_ts) {
return false;
}
// Read the latest version of the value
value = iter->second.value.back();
ts = iter->second.ts;
return true;
}
void write(int key, const std::string& value) {
// Append the new version to the value history
auto& entry = data[key];
entry.value.push_back(value);
entry.ts = current_ts;
}
private:
struct Entry {
std::vector<std::string> value; // value history
long long ts; // timestamp of the latest version
};
std::unordered_map<int, Entry> data; // data store
long long current_ts; // current transaction timestamp
};
利用分布式锁来避免数据更新冲突:分布式锁可以用来保证只有一个节点能够修改数据,从而避免数据更新冲突问题。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39// Implement distributed lock with ZooKeeper
class DistributedLock {
public:
DistributedLock(const std::string& zk_servers, const std::string& lock_path) {
zk_handle = zookeeper_init(zk_servers.c_str(), NULL, 30000, NULL, NULL, 0);
path = lock_path;
}
~DistributedLock() {
zookeeper_close(zk_handle);
}
bool acquire_lock() {
char buf[1024];
int buflen = sizeof(buf);
int ret = zoo_create(zk_handle, path.c_str(), "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, buf, buflen);
if (ret == ZOK) {
// Lock acquired
return true;
} else if (ret == ZNODEEXISTS) {
// Lock already held by another client
return false;
} else {
// Error occurred
return false;
}
}
bool release_lock() {
int ret = zoo_delete(zk_handle, path.c_str(), -1);
if (ret == ZOK) {
// Lock released
return true;
} else {
// Error occurred
return false;
}
}
private:
zhandle_t* zk_handle;
std::string path;
};
分布式计算或存储
分布式计算或存储是指将计算任务和数据存储在多台计算机上,通过网络互联实现协同工作。它可以提高计算效率,减少单点故障的风险,并支持大规模数据处理和分析。作为一种重要的分布式系统应用,它已经广泛应用于云计算、物联网、大数据分析等领域。
Spark 和 Hadoop 是两个不同的分布式计算框架,虽然它们都可以用于大规模数据处理和分析,但是在某些方面有一些区别。
计算模型
Hadoop 使用的是 MapReduce 模型,它将任务分成 Map 和 Reduce 两个阶段,中间需要依赖磁盘进行数据传输。而 Spark 采用了更为通用的 Resilient Distributed Datasets(弹性分布式数据集) (RDD) 模型,支持多种计算操作和数据访问方式。这使得 Spark 在一些场景下比 Hadoop 更加高效和灵活。内存管理
Hadoop 在处理大规模数据时,需要将数据存储到磁盘上并且频繁地进行 I/O 操作。而 Spark 对内存的利用更加高效,它将数据存储在内存中,并且通过使用 RDD 模型对数据进行处理,使得 Spark 处理大规模数据时速度更快。支持的语言
Hadoop 主要使用 Java 或者 Scala 进行编程。而 Spark 支持多种编程语言,包括 Java、Scala、Python 和 R 等。这也使得 Spark 更加容易上手,并且能够适应更广泛的数据处理场景。生态系统
Hadoop 的生态系统非常庞大,包括 HDFS 分布式文件系统、Hive 数据仓库、HBase NoSQL 数据库等多个组件和工具。而 Spark 的生态系统也在不断发展和壮大,目前已经包括了 Spark SQL、GraphX、MLlib等多个子项目,逐渐形成了一个完整的数据处理和分析平台。
我们应该将 Spark 看作是 Hadoop MapReduce 的一个替代品而不是 Hadoop 的替代品。其意图并非是替代 Hadoop,而是为了提供一个管理不同的大数据用例和需求的全面且统一的解决方案。
总体来说,Spark 和 Hadoop 都是非常优秀的分布式计算框架,具有各自的优势和特点。在选择使用哪个框架时,需要根据具体的业务需求和数据处理场景进行评估和选择。
解决什么问题
分布式计算或存储的目标是解决如何快速、可靠地处理大量数据的问题。在传统的计算模型中,单个计算机的处理能力有限,而且容易出现瓶颈。当需要处理海量数据时,单个计算机已经无法胜任。因此,采用分布式计算或存储可以有效解决这些问题。
数据一致性
在分布式存储系统中,多个节点同时访问同一份数据时,需要保证数据的一致性。然而,由于网络延迟、故障恢复等因素的影响,数据的一致性很容易受到破坏。
数据一致性问题的解决方法有很多,比如基于副本的数据复制、基于版本号的数据协调、基于锁的并发控制等,但每种方法都有其局限性和挑战性。例如,基于副本的数据复制可能会导致数据过期和冲突,而基于版本号的数据协调可能会造成大量的消息通信和资源消耗。
基于Paxos算法的一致性协议
Paxos算法是一种用于实现分布式一致性的经典算法,它可以保证在任意情况下都能达成一致的决策,并且具备高可用性和容错性。具体来说,Paxos算法通过将所有节点划分成三种角色(提议者、接受者和学习者),以及两个阶段(prepare和accept)来实现一致性决策。Paxos算法的缺点是实现较为复杂,需要处理各种异常情况和细节。
基于Raft算法的一致性协议
Raft算法是近年来发展起来的一种新型一致性协议,与Paxos算法类似,也是通过多轮投票来达成一致的决策。不同之处在于,Raft算法将节点划分为三种角色(领导者、跟随者和候选人),并且采用了心跳机制和随机化等技术来优化性能和可用性。相比Paxos算法,Raft算法更加易于理解和实现,但也存在一些挑战和局限性。
1 | // 定义Paxos消息类型 |
对于基于Raft算法的实现,与Paxos算法相比,其核心逻辑和实现方式有所不同。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103// 定义Raft消息类型
struct RaftMessage {
int term; // 当前任期号
int leader_id; // 领导者节点ID
int prev_log_index; // 前一条日志条目的索引值
int prev_log_term; // 前一条日志条目的任期号
vector<LogEntry> entries; // 日志条目集合
int leader_commit; // 领导者已提交的日志索引值
};
// 定义Raft节点角色
enum RaftRole { FOLLOWER, CANDIDATE, LEADER };
// 实现Raft算法核心逻辑
class RaftAlgorithm {
public:
void request_vote(int term, int candidate_id, int last_log_index, int last_log_term) {
// 发送requestVote请求
for (int i = 0; i < peers.size(); ++i) {
send_request_vote(peers[i], term, candidate_id, last_log_index, last_log_term);
}
// 等待投票回应
int votes = 0;
for (int i = 0; i < peers.size(); ++i) {
bool vote_granted = receive_vote_response();
if (vote_granted) {
votes++;
}
}
// 判断是否获得多数投票
if (votes > peers.size() / 2) {
become_leader();
} else {
// 未获得多数投票,继续作为候选人
become_candidate();
}
}
void append_entries(int leader_term, int leader_id, int prev_log_index, int prev_log_term, vector<LogEntry> entries, int leader_commit) {
// 处理心跳信号
if (entries.empty()) {
return;
}
// 检测日志是否匹配
if (!check_log_match(prev_log_index, prev_log_term)) {
return;
}
// 添加新日志
add_new_entries(entries);
// 提交新日志
commit_new_entries(leader_commit);
// 成为跟随者
become_follower();
}
private:
vector<RaftRole> roles;
vector<int> peers;
int current_term;
int voted_for;
int commit_index;
int last_applied;
void send_request_vote(int peer, int term, int candidate_id, int last_log_index, int last_log_term) {
// 发送requestVote请求
}
bool receive_vote_response() {
// 接收投票回应
}
bool check_log_match(int prev_log_index, int prev_log_term) {
// 检测日志是否匹配
}
void add_new_entries(vector<LogEntry> entries) {
// 添加新日志
}
void commit_new_entries(int leader_commit) {
// 提交新日志
}
void become_follower() {
// 成为跟随者
}
void become_candidate() {
// 成为候选人
}
void become_leader() {
// 成为领导者
}
};
// 用例示例
int main() {
RaftAlgorithm raft;
raft.request_vote(1, 2, 3, 4);
raft.append_entries(5, 6, 7, 8, {}, 9);
return 0;
}
容错性
由于分布式计算或存储涉及到多个节点,其中任何一个节点出现故障都可能影响整个系统的可用性。因此,如何设计容错机制,保障整个系统的稳定性是非常重要的。
可以使用备份技术来实现容错性,将数据存储到多个节点上,在其中一些节点出现故障时,可以从备份节点上恢复数据。1
2
3
4
5
6
7
8
9
10// 多备份
void Put(const std::string& key, const std::string& value) {
std::vector<NodeID> nodes = hash_ring_.GetNodes(key);
for (auto& node : nodes) {
auto iter = stores_.find(node);
if (iter != stores_.end()) {
iter->second.Put(key, value);
}
}
}
1 | import socket |
数据分片和负载均衡
通常采用哈希算法对数据进行分片,并采用负载均衡算法将这些数据分配到不同的计算节点上。例如,使用一致性哈希算法将数据映射到一个虚拟环上,然后根据节点的数量、负载情况等动态调整虚拟节点与物理节点的对应关系,以实现负载均衡。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37import hashlib
class ConsistentHashing(object):
def __init__(self, nodes=[], replicas=3):
self.replicas = replicas
self.hash_ring = {}
for node in nodes:
self.add_node(node)
def add_node(self, node):
for i in range(self.replicas):
key = self.gen_key('%s:%d' % (node, i))
self.hash_ring[key] = node # 将节点映射到虚拟环上
def remove_node(self, node):
for i in range(self.replicas):
key = self.gen_key('%s:%d' % (node, i))
del self.hash_ring[key]
def get_node(self, key):
if not self.hash_ring:
return None
hash_key = self.gen_key(key)
for node_key in sorted(self.hash_ring.keys()): # 获取key的规则
if hash_key <= node_key:
return self.hash_ring[node_key]
def gen_key(self, key):
m = hashlib.md5()
m.update(key.encode('utf-8'))
return int(m.hexdigest(), 16)
# Example usage:
nodes = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
c_hash = ConsistentHashing(nodes=nodes, replicas=3)
print(c_hash.get_node('foo')) # output: 192.168.1.2