分布式锁为解决分布式系统中多个应用同时访问同一个资源的问题。
分布式锁的使用场景
一般是在两个场景下会防止对同一个资源的重复访问
- 提升效率
比如多个节点计算同一批任务,如果某个任务已经有节点在计算了,那其他节点就不用重复计算了,以免浪费计算资源。不过重复计算也没事,不会造成其他更大的损失,允许偶尔的失败。
- 保证正确性
这种情况对锁的要求就很高了,如果重复计算,会对正确性造成影响,不允许失败。
分布式锁支持的特性
在这个技术不断更新迭代的情况下,分布式这个概念,在企业中的权重越来越高。谈及分布式时,不可避免一定会提到分布式锁,现阶段分布式锁的实现方式主流的有几种实现方式,Zookeeper、Mysql、Redis,Etcd,分布式锁需要保证锁的要互斥、防死锁、高性能、可重入。
本篇文章以 Redis 为例,从我们的角度来看,下面的三个属性是有效使用分布式锁所需的最低保证。
- 安全特性:互斥
在任何给定时刻,只有一个客户端可以持有锁。
- 活力属性:无死锁
最终,即使锁定资源的客户端崩溃或分区,也始终可以获得锁。
- 活动性:容错能力
只要大多数 Redis 节点都处于运行状态,客户端就可以获取和释放锁。
一般分布式锁需要支持以下功能,才能满足大多数场景的需求
**功能 ** | 是否必须 | 说明 |
---|---|---|
失效时间 | 是 | 防止 Java 应用忽然挂掉 或 网络动荡 而产生死锁 |
自动续租 | 是 | 支持分布式锁过期,自动续租 |
可重入性 | 是 | 单线程调用的多个函数或地方可能会锁住同一把锁 (不支持调用服务化接口) |
阻塞、公平锁 | 是 | 第三方的介质实现分布式锁,非公平的锁竞争并不会提高性能 |
接入动态配置 | 是 | 可自动配置 etcd 等锁的配置中心地址,通知 client 端进行切换 |
尝试锁 | 可选 | tryLock , 超过指定时间没有获得锁则抛出异常,由使用方捕获异常 |
Redis 实现分布式锁的几种方式
1、单机
直接单机上锁,这台机器挂了就 GG 了,整个业务系统都获取不到锁了,单点故障。
2、哨兵
既然单点故障,那我搞个哨兵,Sentinel,自动主从切换。但是会有如下新问题:
锁写到 Master 后,还没同步到 Slave 呢,Master 挂了。Slave 选举成了 Master,但是 Slave 里没有锁,其他线程再次能上锁了,不安全。
3、集群
集群只是做了 slot 分片,锁还是只写到一个 Master 上,所以和 Sentinel 哨兵模式有同样的问题。
4、红锁
也称 RedLock,非常著名,是 Redis 实现分布式锁相对最安全可靠的一种手段。
核心思路是:搞几个独立的 Master,比如 5 个。然后挨着个的加锁,只要超过一半以上(这里是 5 / 2 + 1 = 3 个)那就代表加锁成功,然后释放锁的时候也逐台释放。这样的好处在于一台 Master 挂了的话,还有其他的,所以不耽误,看起来好像完美解决了上面的问题,但是并不是 100%安全。
不管用 Redis 的哪种方式来实现分布式锁,都不是 100%安全的,那就不用 Redis 做分布式锁了吗?不然,我觉得取决于业务吧,如果你业务要求必须,100%不能出问题,那用 zk/etcd 来实现吧。但是据我了解,至少 80%的互联网公司都不这么强烈要求,大对数还是 Redis 分布式锁,即使用 zk 来实现的也可能不是业务上 100%要求不能出现问题。比如你项目就没用 zk,只用了 Redis,那完全没必要搭建一套 zk 来做分布式锁,Redis 的红锁也能保证高可用,几乎不会出现问题的。
Redis 多节点实现分布式锁带来的挑战
我们使用 Redis 锁定资源的最简单方法是:
- 在实例中创建锁。
- 锁通常使用 Redis 过期功能,在有限时间存在,最终将被释放,超过给定时间会被删除。
- 当客户端需要释放资源时,它将删除锁。
乍一看,似乎并没有什么问题。但是不妨我们深究一下,这种实现方案在 redis 单机环境下似乎并没有什么问题。但是如果节点坏了呢?好吧,那么让我们添加一个 slave 节点。如果主服务器宕机了,就使用这个节点。但是我们不妨来看看她真的能保证可用吗?在谈论这个的致命缺陷时,我们需要了解一个知识点,Redis 复制是异步的。
1、客户端 A 获取主服务器中的锁
2、在将锁复制传输到从机之前,主机崩溃
3、slave 晋升为 master
4、客户端 B 获取锁,因为从机并没有该锁的对象,获取成功
显然,这样是不对的,主节点因为没来得及同步数据就宕机了,所以从节点没有该数据,从而造成分布式锁的失效
Redlock 红锁
作者认为,我们应该使用多个 Redis,这些节点是完全独立的,不需要使用复制或者任何协调数据的系统。
RedLock 多个 redis 系统获取锁过程
- 以毫秒 ms 为单位获取当前的服务器时间
- 尝试使用相同的 key 和随机值来获取锁,对每一个机器获取锁时都应该有一个超时时间,比如锁的过期时间为 10s,那么获取单个节点锁的超时时间就应该为 5 到 50 毫秒左右,他这样做的目的是为了保证客户端与故障的机器连接,耗费多余的时间。超时间时间内未获取数据就放弃该节点,从而去下一个节点获取,直至将所有节点全部获取一遍。
- 获取完成后,获取当前时间减去步骤一获取的时间,当且仅当客户端半数以上获取成功且获取锁的时间小于锁额超时时间,则证明该锁生效。
- 获取锁之后,锁的超时时间等于设置的有效时间-获取锁花费的时间
- 如果 获取锁的机器不满足半数以上,或者锁的超时时间计算完毕后为负数 等异常操作,则系统会尝试解锁所有实例,即使有些实例没有获取锁成功,依旧会被尝试解锁。
- 释放锁,只需在所有实例中释放锁,无论客户端是否认为它能够成功锁定给定的实例。
Redlock 真能够解决问题吗
Martin Kleppmann 发表文章,Redlock 并不能保证该锁的安全性,分布式锁用途有下面 2 种:
1、提升效率,用锁来保证一个任务没有必要被执行两次,比如(很昂贵的计算)。
2、保证正确性,使用锁来保证任务按照正常的步骤执行,防止两个节点同时操作一份数据,造成文件冲突,数据丢失。
提升效率,允许偶尔的失败。对锁是有一定宽容度的,就算发生了两个节点同时工作,对系统的影响也仅仅是多付出了一些计算的成本,没什么额外的影响。使用单点的 Redis 就能很好的解决问题,没有必要使用 RedLock,维护那么多的 Redis 实例,提升系统的维护成本。
分布式锁的超时性,所带来的缺点
但是对于第二种场景保证正确性来说,就比较慎重了,因为很可能涉及到一些金钱交易,如果锁定失败,并且两个节点同时处理同一数据,则结果将导致文件损坏,数据丢失,永久性不一致,或者金钱方面的损失。
我们假设一种场景,我们有两个客户端,每一个客户端必须拿到锁之后才能去保存数据到数据库,我们使用 RedLock 算法实现会出现什么问题呢?RedLock 中,为了防止死锁,锁是具有过期时间的,但是 Martin 认为这是不安全的。该流程图类似于这样。
客户端 1 获取到锁成功后,开始执行,执行到一半系统发生 Full GC ,系统服务被挂起,过段时间锁超时了。客户端 2 等待客户端 1 的锁超时后,成功的获取到锁,开始执行入库操作,完成后,客户端 1 完成了 Full GC,又做了一次入库操作。这是不安全的。如何解决呢?
Martin 提出来一种类似乐观锁的实现机制,示例图如下:
客户端 1 长时间被挂起后,客户端 2 获取到锁,开始写库操作,同时携带令牌 34,写库完成后,客户端 1 苏醒,开始进行入库操作,但是因为携带的令牌为 33 小于最新令牌,该次提交就被拒绝。
这个想法听起来似乎时很完备的思路,这样即使系统因为某些原因被挂起,数据也能够被正确的处理。但是仔细想一下:
如果仅当您的令牌大于所有过去的令牌时,数据存储区才能始终接受写入,则它是可线性化的存储区,相当与使用数据库来实现一个 分布式锁系统,那么 RedLock 的作用就变的微乎其微。甚至不在需要使用 redis 保证分布式锁。
RedLock 对于系统时钟强依赖
回想一下 Redlock 算法获取锁的几个步骤,你会发现锁的有效性是与当前的系统时钟强依赖,我们假设,我们有,A、B、C、D、E 五个 redis 节点:
- 客户端 1 获取节点 A,B,C 的锁定。由于网络问题,无法访问 D 和 E。
- 节点 C 上的时钟向前跳,导致锁过期。
- 客户端 2 获取节点 C,D,E 的锁定。由于网络问题,无法访问 A 和 B。
- 现在,客户 1 和 2 都认为他们持有该锁。
如果 C 在将锁持久保存到磁盘之前崩溃并立即重新启动,则可能会发生类似的问题。
Martin 认为系统时间的阶跃主要来自两个方面(以及作者给出的解决方案):
- 人为修改
对于人为修改,能说啥呢?人要搞破坏没办法避免。
- 从 NTP 服务收到了一个跳跃时时钟更新
NTP 网络时间协议(Network Time Protocol)受到一个阶跃时钟更新,对于这个问题,需要通过运维来保证。需要将阶跃的时间更新到服务器的时候,应当采取小步快跑的方式。多次修改,每次更新时间尽量小。
分布式锁需要注意的问题
1、加锁成功的客户端挂掉或网络动荡,可能产生死锁
对于加的分布式锁,需要设置 expire 过期时间,锁过期后进行释放
SET key value [EX seconds] [PX milliseconds] NX
2、业务未执行完,分布式锁超时,需要进行锁续期
Redisson 实现了一种保证锁失效时间绝对大于业务程序执行时间的机制。官方叫做看门狗机制(Watchdog),主要原理是,在程序成功获取锁之后,启动一个 Watchdog,会 fork 一条子线程去不断的给该锁续期,直至该锁释放为止。原理图大概如下所示:
Redisson 使用守护线程来进行锁的续期(守护线程的作用:当主线程销毁,会和主线程一起销毁)防止程序宕机后,线程依旧不断续命,造成死锁。
RedissonLock 加锁续期
RLock lock = redisson.getLock("myLock");
lock.lock();
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试获取锁,leaseTime为-1,开启看门狗续期
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// 加锁失败,while(true)等待重试
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
// threadId,当前的线程id
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// leaseTime == -1 就 scheduleExpirationRenewal 开启看门狗续期,
// leaseTime != -1 就不续期,只是把 internalLockLeaseTime 时间变成传进来的时间。
// lockWatchdogTimeout 看门狗 默认超时时间 30 秒
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 当前线程 threadId 是否要续租
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 当前线程 threadId 是否要续租
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
// 这里是个知识点,续期线程在过期时间达到三分之一的时候工作,比如9s过期时间,那么续期会在第3秒的时候工作,也就是还剩余6s的时候进行续期
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
很简单,就是看当前线程有没有加锁 hexists, KEYS[1], ARGV[2]) == 1,有加锁的话就代表业务线程还没执行完,就给他的锁重新续期 pexpire’, KEYS[1], ARGV[1],然后返回 1,也就是 true,没加锁的话返回 0,也就是 false。那就是返回 1 就调用自己准备下一次续期 scheduleExpirationRenewal(threadId),返回 0 就调用在删除后不做处理。
这里有几个关键点:
- 要使 watchLog 机制生效,lock 时不要设置过期时间
- 续期核心 lua 脚本在 renewExpirationAsync 里
- Watchdog 通过类似 netty 的 Future 功能来实现异步延时
- 续期的开始时间是超过过期时间的三分之一,比如 9s 过期时间,那么第 3s 的时候开始续期
- 续期成功自己调用自己,也就是为下一次续期做准备,续期失败不做后续处理
因为分布式锁的续期是在客户端执行的,所以如果 client 宕机了,续期线程就不能工作了,也就不能续期了,只能等到超时时间后锁被自动删除。这时应该把分布式锁删除,让其他客户端来获取。
如果要立刻删除,需要增加额外的工作,比如增加哨兵机制,让哨兵来维护所有 redis 客户端的列表。哨兵定时监控客户端是否宕机,如果检测到宕机,立刻删除这个客户端的锁。
另外,Redisson 定时器使用的是 netty-common 包中的 HashedWheelTime 来实现的,Redisson 还实现并且优化了 RedLock 算法、公平锁、可重入锁、连锁等操作,使 Redis 分布式锁的实现方式更加简便高效。
3、对于要保证正确的分布式锁,需要注意原子性
Redisson 加锁
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
加锁 lua 脚本解释
lock入参:
keys[1]:自定义锁的key RLock lock = redissonClient.getLock(lockKey);
argv[1]=锁的租期,默认30s
argv[2]=锁的名称(UUID:threadId)
// 1.不存在key锁
if(exists keys[1]==0) then
// 赋值 key field value -->1.1 尝试获取锁
hset keys[1] argv[2] 1
// 过期 expire key time -->1.2 设置锁过期时间
pexpire keys[1] argv[1]
return 空;
end
// 存在Key name 的锁 --> 2.当前线程已获取锁
if(hexists keys[1] argv[2]==1) then
// -->2.1 原子计数器+1 锁重入!!!
hincrby keys[1] argv[2] 1
// 过期 -->2.2 重置锁过期时间
pexpire keys[1] argv[1]
return 空;
end
// -->3.返回剩余过期时间
return pttl keys[1]
注意:lua 脚本数组下标从 1 开始
Redisson 释放锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
释放 lua 脚本解释
unlock入参:
keys[1]:自定义锁的key RLock lock = redissonClient.getLock(lockKey);
keys[2]:通道名称 redisson_lock__channel:{UUID:threadId}
argv[1]= publish unlock消息=0
argv[2]=锁的租期,默认30s
argv[3]=锁的名称(UUID:threadId)
// -->1.不存在key锁,直接返回
if(hexists keys[1] argv[3]==0) then
return 空
// -->2.存在锁,原子计数器-1
counter=hincrby keys[1] argv[3] -1
if(counter>0) then
// -->2.1 计数器>0,还有锁没释放,重置锁过期时间
pexpire KEYS[1] ARGV[2]
return 0;
else
// -->2.2计数器=0,锁已经全部释放完毕。
// -->删除key
del KEYS[1]
// -->发布消息 publish channel message
publish KEYS[2] ARGV[1]
return 1;
end
return 空;
实现原理的学习可参考 https://bbs.huaweicloud.com/blogs/238821