• 命令的逻辑并不复杂,但不得不说,作者的设计还是很有心的,用了redis的Hash结构存储数据,如果发现当前线程已经持有锁了,就用hincrby命令将value值加1,value的值将决定释放锁的时候调用解锁命令的次数,达到实现锁的可重入性效果。

    每一步命令对应的逻辑我都在下面的图中标注了,大家可以读一下:

    我们继续跟代码吧,根据上面的命令可以看出,如果线程拿到锁的话,tryLock方法会直接返回true,万事大吉。

    拿不到的话,就会返回锁的剩余过期时长,这个时长有什么作用呢?我们回到tryLock方法中死循环的那个地方:

    这里有一个针对waitTime和key的剩余过期时间大小的比较,取到二者中比较小的那个值,然后用Java的Semaphore信号量的tryAcquire方法来阻塞线程。

    那么Semaphore信号量又是由谁控制呢,何时才能release呢。这里又需要回到上面来看,各位看官应该还记得,我们上面贴的tryLock代码中还有这一段:

    current = System.currentTimeMillis();
    // 订阅分布式锁, 解锁时进行通知
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);

    订阅的逻辑显然是在subscribe方法里,跟着方法的调用链,它会进入到PublishSubscribe.Java中:

    这段代码的作用在于将当前线程的threadId添加到一个AsyncSemaphore中,并且设置一个redis的监听器,这个监听器是通过redis的发布、订阅功能实现的。

    一旦监听器收到redis发来的消息,就从中获取与当前thread相关的,如果是锁被释放的消息,就立马通过操作Semaphore(也就是调用release方法)来让刚才阻塞的地方释放。

    释放后线程继续执行,仍旧是判断是否已经超时。如果还没超时,就进入下一次循环再次去获取锁,拿到就返回true,没有拿到的话就继续流程。

    这里说明一下,之所以要循环,是因为锁可能会被多个客户端同时争抢,线程阻塞被释放之后的那一瞬间很可能还是拿不到锁,但是线程的等待时间又还没过,这个时候就需要重新跑循环去拿锁。

    这就是tryLock获取锁的整个过程了,画一张流程图的话表示大概是这样:

    lock

    除了tryLock,一般我们还经常直接调用lock来获取锁,lock的拿锁过程跟tryLock基本是一致的,区别在于lock没有手动设置锁过期时长的参数,该方法的调用链也是跑到tryAcquire方法来获取锁的,不同的是,它会跑到这部分的逻辑:

    这段代码做了两件事:

    1、预设30秒的过期时长,然后去获取锁

    2、开启一个监听器,如果发现拿到锁了,就开启定时任务不断去刷新该锁的过期时长

    刷新过期时长的方法是scheduleExpirationRenewal,贴一下源码吧:

    private void scheduleExpirationRenewal(final long threadId) {
     // expirationRenewalMap是一个ConcurrentMap,存储标志为"当前线程ID:key名称"的任务
            if (expirationRenewalMap.containsKey(getEntryName())) {
                return;
            }

            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    // 检测锁是否存在的lua脚本,存在的话就用pexpire命令刷新过期时长
                    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                "return 1; " +
                            "end; " +
                            "return 0;",
                              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                    
                    future.addListener(new FutureListener<Boolean>() {
                        @Override
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            expirationRenewalMap.remove(getEntryName());
                            if (!future.isSuccess()) {
                                log.error("Can't update lock " + getName() + " expiration", future.cause());
                                return;
                            }
                            
                            if (future.getNow()) {
                                // reschedule itself
                                scheduleExpirationRenewal(threadId);
                            }
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

            if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
                task.cancel();
            }
        }

    代码的流程比较简单,大概就是开启一个定时任务,每隔internalLockLeaseTime / 3的时间(这个时间是10秒)就去检测锁是否还被当前线程持有,是的话就重新设置过期时长internalLockLeaseTime,也就是30秒的时间。

    而这些定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap中,存储的key就为“线程ID:key名称”,如果发现expirationRenewalMap中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操作。

    上面这段代码就是Redisson中所谓的”看门狗“程序,用一个异步线程来定时检测并执行的,以防手动解锁之前就过期了。

    其他的逻辑就跟tryLock()基本没什么两样啦,大家看一下就知道了

    解锁

    有拿锁的方法,自然也就有解锁。Redisson分布式锁解锁的上层调用方法是unlock(),默认不用传任何参数

    @Override
        public void unlock() {
         // 发起释放锁的命令请求
            Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
            if (opStatus == null) {
                throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + Thread.currentThread().getId());
            }
            if (opStatus) {
             // 成功释放锁,取消"看门狗"的续时线程
                cancelExpirationRenewal();
            }
        }

    解锁相关的命令操作在unlockInnerAsync方法中定义,

    又是一大串的lua脚本,比起前面加锁那段脚本的命令稍微复杂了点,不过没关系,我们简单梳理一下,命令的逻辑大概是这么几步:

    1、判断锁是否存在,不存在的话用publish命令发布释放锁的消息,订阅者收到后就能做下一步的拿锁处理;

    2、锁存在但不是当前线程持有,返回空置nil;

    3、当前线程持有锁,用hincrby命令将锁的可重入次数-1,然后判断重入次数是否大于0,是的话就重新刷新锁的过期时长,返回0,否则就删除锁,并发布释放锁的消息,返回1;

    当线程完全释放锁后,就会调用cancelExpirationRenewal()方法取消"看门狗"的续时线程

    void cancelExpirationRenewal() {
     // expirationRenewalMap移除对应的key,就不会执行当前线程对应的"看门狗"程序了
        Timeout task = expirationRenewalMap.remove(getEntryName());
        if (task != null) {
            task.cancel();
        }
    }

    这就是释放锁的过程了,怎么样,是不是还是比较简单的,阅读起来比加锁那份代码舒服多了,当然啦,简单归简单,为了方便你们理清整个分布式锁的过程,我当然还是费心费力的给你们画流程图展示下啦(就冲这点,是不是该给我来个三连啊,哈哈):

    RedLock

    以上就是Redisson分布式锁的原理讲解,总的来说,就是简单的用lua脚本整合基本的set命令实现锁的功能,这也是很多Redis分布式锁工具的设计原理。除此之外,Redisson还支持用"RedLock算法"来实现锁的效果,这个工具类就是RedissonRedLock

    用法也很简单,创建多个Redisson Node, 由这些无关联的Node就可以组成一个完整的分布式锁

    RLock lock1 = Redisson.create(config1).getLock(lockKey);
    RLock lock2 = Redisson.create(config2).getLock(lockKey);
    RLock lock3 = Redisson.create(config3).getLock(lockKey);

    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    try {
       redLock.lock();
    } finally {
       redLock.unlock();
    }

    RedLock算法原理方面我就不细说了,大家有兴趣可以看我之前的文章,或者是网上搜一下,简单的说就是能一定程度上能有效防止Redis实例单点故障的问题,但并不完全可靠,不管是哪种设计,光靠Redis本身都是无法保证锁的强一致性的。

    还是那句话,鱼和熊掌不可兼得,性能和安全方面也往往如此,Redis强大的性能和使用的方便足以满足日常的分布式锁需求,如果业务场景对锁的安全隐患无法忍受的话,最保底的方式就是在业务层做幂等处理。

    总结

    看了本文的源码解析,相信各位看官对Redisson分布式锁的设计也有了足够的了解,当然啦,虽然是讲解源码,我们的主要精力还是放在分布式锁的原理上,一些无关流程的代码就没有带大家字斟酌句的解读了,大家有兴趣的话可以自己去阅读看看,源码中很多地方都展示了一些基础并发工具和网络通信的妙用之处,学习一下还是挺有收获的。

    最后我还是想吐槽一下,Redisson的注释是真的少啊。。。。。。

    如果您觉得文章有用的话,欢迎点个赞支持一下,这将是对我创作的最好鼓励!

    02-25 19:09