上一篇文章提到使用针对不同的业务场景如何合理使用Redis分布式锁,并引入了一个新的问题
若定义锁的过期时间是10s,此时A线程获取了锁然后执行业务代码,但是业务代码消耗时间花费了15s。这就会导致A线程还没有执行完业务代码,A线程却释放了锁(因为10s到了),第11s B线程发现锁已经释放,重新获取锁也开始执行业务代码。
此时多个线程同时执行业务代码,我们使用锁就是为了保证仅有一个线程执行这一块业务代码,说明这个锁是失效的!
本文将尝试探讨如何处理这个问题!
下面这个图解释了重置超时时间是什么意思,写一个定时任务,并单独使用一个线程每3s去检查一下是否到终点(任务是否执行完毕),第3s时发现没到终点,重置时间。 假设任务执行完毕需要花费11s。那么锁一共会延期3次,第11s之后,锁被手动释放,如果没释放。等到第19s时,会被自动释放。
如何实现锁的延期
伪代码: 定义锁的结构 key:uuid value:订单服务 if key(锁的唯一标识)是否存在 存在,if 锁是否被修改 未修改,重置超时时间
这部分有一点需要解释:
- 为什么判断锁是否被修改?
A线程获取了锁之后,B线程修改锁的value为 “文件下载服务”,不加一层校验,A线程就会对修改后的锁操作,而不是原始的锁。
此时你会直接写一个定时任务去实现,会有什么问题吗?
锁延期分为2步(第一步:判断锁;第二步:重置锁),这2步之间是存在间隙的,完全可以在判断锁后,重置锁前发生一些事情(例如恰巧在重置时间前锁被其他线程修改了)。如何才能避免这个间隙不发生意外?
使用lua脚本:使用lua语法实现锁的延期,然后执行这个脚本。lua语法将这两个步骤绑定成一个操作。这也就是为什么提到锁延期的实现,基本都是采用lua实现的根本原因。redis分布式锁自身是有局限性的,不能满足我们的需求,所以我们提出了锁延期。
巧在Redis很支持lua语法,我们只需要按照lua语法要求写好命令,调用Redis提供的方法入口传进去,Redis会自动解析这些命令。更巧在lua语法实现锁延期解决了上面的隐患。。。
/** * 锁续期 */ if (redis.call('exists', KEYS[1]) == 1) then // 锁还存在 if (redis.call('get', KEYS[1]) == ARGV[1]) then redis.call('pexpire', KEYS[1], ARGV[2]) ");// 重置超时时间 return 1 end end return 0
接下来完整的看一下如何使用Redis锁延期
/** * redis分布式锁 * 为了文件拉取加的,可能存在拉取任务耗时很久的情况,增加锁延时操作 * @author lixinyu */ public class RedisDistributeLock { private static final Logger log = LoggerFactory.getLogger(RedisDistributeLock.class); // 默认30秒后自动释放锁 private static long defaultExpireTime = 10 * 60 * 1000; // 默认10分钟 // 用于锁延时任务的执行 private static ScheduledThreadPoolExecutor renewExpirationExecutor; // 加锁和解锁的lua脚本 重入和不可重入两种 private static String lockScript; private static String unlockScript; private static String renewScript;// 锁延时脚本 private static String lockScript_reentrant; private static String unlockScript_reentrant; private static String renewScript_reentrant;// 锁延时脚本 static { /** * 如果指定的锁键(KEYS[1])不存在,则通过set命令设置锁的值(ARGV[1])和超时时间(ARGV[2])。 * 如果锁键已存在,则通过pttl命令返回锁的剩余超时时间。 */ StringBuilder sb = new StringBuilder(); sb.setLength(0); sb.append(" if (redis.call('exists', KEYS[1]) == 0) then ");// 如果不存在这个lockKey锁 sb.append(" redis.call('set', KEYS[1], ARGV[1]) ");// 设置锁 ,key-value结构 sb.append(" redis.call('pexpire', KEYS[1], ARGV[2]) ");// 设置锁超时时间 sb.append(" return nil "); sb.append(" end "); sb.append(" return redis.call('pttl', KEYS[1]) ");// 如果别的线程已经加锁,返回剩余时间 lockScript = sb.toString(); /** * 如果锁存在,则删除锁 */ sb.setLength(0); sb.append(" if (redis.call('get', KEYS[1]) == ARGV[1]) then "); sb.append(" return redis.call('del', KEYS[1]) "); sb.append(" else return 0 "); sb.append(" end"); unlockScript = sb.toString(); /** * 可重入锁主要解决的是同一个线程能够多次获取锁的问题,而不是防止多个线程同时获取锁 * 这通常发生在方法递归调用、嵌套调用或者同一个方法内部多次执行加锁操作的情况下 */ sb.setLength(0); sb.append(" if (redis.call('exists', KEYS[1]) == 0) then ");// 如果不存在这个lockKey锁 sb.append(" redis.call('hset', KEYS[1], ARGV[1], 1) ");// 设置锁 ,hash结构,hashkey为当前线程id,加锁数为1 sb.append(" redis.call('pexpire', KEYS[1], ARGV[2]) ");// 设置锁超时时间 sb.append(" return nil "); sb.append(" end "); sb.append(" if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then ");// 如果当前线程已经加锁 sb.append(" redis.call('hincrby', KEYS[1], ARGV[1], 1) ");// 可重入,增加锁计数 sb.append(" redis.call('pexpire', KEYS[1], ARGV[2]) ");// 重设置锁超时时间 sb.append(" return nil "); sb.append(" end "); sb.append(" return redis.call('pttl', KEYS[1]) ");// 如果别的线程已经加锁,返回剩余时间 lockScript_reentrant = sb.toString(); /** * 释放锁,通过判断锁的存在、当前线程是否是加锁的线程、以及锁的计数器等条件来实现解锁的操作 */ sb.setLength(0); sb.append(" if (redis.call('exists', KEYS[1]) == 0) then ");// 不存在锁,返回1表示解锁成功 sb.append(" return 1 "); sb.append(" end "); sb.append(" if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then ");// 存在锁,不是本人加的,返回0失败 sb.append(" return 0 "); sb.append(" end "); sb.append(" local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1) ");// 存在自己加的锁,锁计数减一 sb.append(" if (counter > 0) then ");// 判断是否要删除锁,或重置超时时间 sb.append(" redis.call('pexpire', KEYS[1], ARGV[2]) "); sb.append(" return 0 "); sb.append(" else "); sb.append(" redis.call('del', KEYS[1]) "); sb.append(" return 1 "); sb.append(" end "); sb.append(" return nil "); unlockScript_reentrant = sb.toString(); /** * 锁续期 */ sb.setLength(0); sb.append(" if (redis.call('exists', KEYS[1]) == 1) then ");// 锁还存在 sb.append(" if (redis.call('get', KEYS[1]) == ARGV[1]) then "); sb.append(" redis.call('pexpire', KEYS[1], ARGV[2]) ");// 重置超时时间 sb.append(" return 1"); sb.append(" end "); sb.append(" end "); sb.append(" return 0 "); renewScript = sb.toString(); /** * 可重入锁续期 */ sb.setLength(0); sb.append(" if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then ");// 锁还存在 sb.append(" redis.call('pexpire', KEYS[1], ARGV[2]) ");// 重置超时时间 sb.append(" return 1 "); sb.append(" end "); sb.append(" return 0 "); renewScript_reentrant = sb.toString(); renewExpirationExecutor = new ScheduledThreadPoolExecutor(2); } private String uuid;// 当前锁对象标识 private boolean reentrant;// 当前锁是可重入还是不可重入 private RedisUtils redisUtils; public RedisDistributeLock(boolean reentrant) { this.uuid = UUIDUtils.randomUUID8(); this.reentrant = reentrant; this.redisUtils = SpringApplicationUtils.getBean(RedisUtils.class); } /** * 尝试对lockKey加锁 * @author: lixinyu 2023/4/25 **/ public boolean tryLock(String lockKey) { String script = lockScript; if (reentrant) { script = lockScript_reentrant; } Object result = redisUtils.evalScript(script, ReturnType.INTEGER, 1, lockKey, uuid, String.valueOf(defaultExpireTime)); boolean isSuccess = result == null; if (isSuccess) { // 若成功,增加延时任务 scheduleExpirationRenew(lockKey, uuid, reentrant); } return isSuccess; } /** * 解锁 * @author: lixinyu 2023/4/25 **/ public void unlock(String lockKey){ if (reentrant) { redisUtils.evalScript(unlockScript_reentrant, ReturnType.INTEGER, 1, lockKey, uuid, String.valueOf(defaultExpireTime)); } else { redisUtils.evalScript(unlockScript, ReturnType.INTEGER, 1, lockKey, uuid); } } /** * 锁延时,定时任务队列,定时判断一次是否续期 */ private void scheduleExpirationRenew(String lockKey, String lockValue, boolean reentrant) { Runnable renewTask = new Runnable(){ @Override public void run() { try { String script = renewScript; if (reentrant) { script = renewScript_reentrant; } // 将lua语法传给redis解析 Object result = evalScript(script, ReturnType.INTEGER, 1, lockKey, lockValue, String.valueOf(defaultExpireTime)); if (result != null && !result.equals(false) && result.equals(Long.valueOf(1))) { // 延时成功,再定时执行 scheduleExpirationRenew(lockKey, lockValue, reentrant); log.info("redis锁【" + lockKey + "】延时成功!"); } } catch (Exception e) { log.error("scheduleExpirationRenew run异常", e); } } }; renewExpirationExecutor.schedule(renewTask, defaultExpireTime / 3, TimeUnit.MILLISECONDS); } }
/** * 将lua语法传给redis */ public Object evalScript(String script, ReturnType returnType, int numKeys, String... keysAndArgs) { Object value = false; try { value = redisTemplate.execute((RedisCallback
使用锁
private void demo() { RedisDistributeLock lock = new RedisDistributeLock(false); String lockKey = redisSeqPrefix + "lock:" + seqName; try { if (lock.tryLock(lockKey)) { String redisValue = redisUtils.get(redisSeqPrefix + seqName); // 加锁之后再次判断是否超出规定长度,防止并发时重置多次 if (redisValue != null && redisValue.length() > seqLength) { redisUtils.set(redisSeqPrefix + seqName, "1"); } } } catch (Exception e) { logger.error("resetSeqValue异常", e); } finally { lock.unlock(lockKey); } }