分布式锁(2)- 基于Redis的实现

系列文章链接:

1. 使用Redis实现分布式锁的理由

  1. Redis具有很高的性能;
  2. Redis的命令对此支持较好,实现起来很方便;

2.Redis命令介绍

SETNX

// 当且仅当key不存在时,set一个key为val的字符串,返回1;
// 若key存在,则什么都不做,返回0。
SETNX key val;

expire

// 为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
expire key timeout;

delete

// 删除key
delete key;

我们通过Redis实现分布式锁时,主要通过上面的这三个命令。

3.分布式锁实现原理

3.1 加锁

最简单的方法是使用 setnx 命令。key 是锁的唯一标识,按业务来决定命名。比如想要给一种商品的秒杀活动加锁,可以给 key 命名为 “lock_sale_商品ID” 。而 value 设置成什么呢?我们可以姑且设置成 1。加锁的伪代码如下:

setnx(lock_sale_商品ID, 1)

当一个线程执行 setnx 返回 1,说明 key 原本不存在,该线程成功得到了锁;当一个线程执行 setnx 返回 0,说明 key 已经存在,该线程抢锁失败。

3.2 解锁

有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行 del 指令,伪代码如下:

del(lock_sale_商品ID)

释放锁之后,其他线程就可以继续执行 setnx 命令来获得锁。

3.3 锁超时

锁超时是什么意思呢?如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住(死锁),别的线程再也别想进来。所以,setnxkey 必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx 不支持超时参数,所以需要额外的指令,伪代码如下:

expire(lock_sale_商品ID, 30)

综合伪代码如下:

if(setnx(lock_sale_商品ID,1) == 1){
    expire(lock_sale_商品ID,30)
    try {
        do something ......
    } finally {
        del(lock_sale_商品ID)
    }
}

4.存在的问题

以上伪代码中存在三个致命问题

4.1 setnxexpire 的非原子性

设想一个极端场景,当某线程执行 setnx,成功得到了锁:

setnx 刚执行成功,还未来得及执行 expire 指令,节点 1 挂掉了。

这样一来,这把锁就没有设置过期时间,变成死锁,别的线程再也无法获得锁了。

怎么解决呢?setnx 指令本身是不支持传入超时时间的,set 指令增加了可选参数,伪代码如下:

set(lock_sale_商品ID,1,30,NX)

这样就可以取代 setnx 指令。

4.2 del 导致误删

又是一个极端场景,假如某线程成功得到了锁,并且设置的超时时间是 30 秒。

如果某些原因导致线程 A 执行的很慢很慢,过了 30 秒都没执行完,这时候锁过期自动释放,线程 B 得到了锁。

随后,线程 A 执行完了任务,线程 A 接着执行 del 指令来释放锁。但这时候线程 B 还没执行完,线程A实际上 删除的是线程 B 加的锁

怎么避免这种情况呢?可以在 del 释放锁之前做一个判断,验证当前的锁是不是自己加的锁。至于具体的实现,可以在加锁的时候把当前的线程 ID 当做 value,并在删除之前验证 key 对应的 value 是不是自己线程的 ID。

加锁:

String threadId = Thread.currentThread().getId()
set(key,threadId ,30,NX)

解锁:

if(threadId .equals(redisClient.get(key))){
    del(key)
}

但是,这样做又隐含了一个新的问题,判断和释放锁是两个独立操作,不是原子性。

4.3 出现并发的可能性

还是刚才第二点所描述的场景,虽然我们避免了线程 A 误删掉 key 的情况,但是同一时间有 A,B 两个线程在访问代码块,仍然是不完美的。怎么办呢?我们可以让获得锁的线程开启一个守护线程,用来给快要过期的锁“续航”。

当过去了 29 秒,线程 A 还没执行完,这时候守护线程会执行 expire 指令,为这把锁“续命”20 秒。守护线程从第 29 秒开始执行,每 20 秒执行一次。

当线程 A 执行完任务,会显式关掉守护线程。

另一种情况,如果节点 1 忽然断电,由于线程 A 和守护线程在同一个进程,守护线程也会停下。这把锁到了超时的时候,没人给它续命,也就自动释放了。

5.代码实现

/**
 * 分布式锁的redis实现
 *
 * @author moon
 */
public class DistributedLockByRedis {

    private final JedisPool jedisPool;
    /**
     * 锁的前缀
     */
    private final static String KEY_PREF = "lock:";

    public DistributedLockByRedis(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * 加锁
     *
     * @param lockName       String 锁的名称(key)
     * @param acquireTimeout long 获取超时时间
     * @param timeout        long 锁的超时时间
     * @return 锁标识
     */
    public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
        SetParams params = SetParams.setParams().nx().px(acquireTimeout);

        try (Jedis conn = jedisPool.getResource()) {
            // 随机生成一个value
            String identifier = UUID.randomUUID().toString();
            // 锁名,即 key值
            String lockKey = KEY_PREF + lockName;
            // 超时时间, 上锁后超过此时间则自动释放锁
            int lockExpire = (int) (timeout / 1000);

            // 获取锁的超时时间,超过这个时间则放弃获取锁
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                // 注意,这里设置锁和设置超时时间不是原子操作,所以可能会有问题,这里推荐使用lock()中的用法
                if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                    // 返回value值,用于释放锁时间确认
                    return identifier;
                }
                // 返回-1代表key没有设置超时时间,为key设置一个超时时间
                if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 释放锁
     *
     * @param lockName 锁key
     * @param id       释放锁的标识
     * @return boolean
     */
    public boolean releaseLock(String lockName, String id) {
        String lockKey = KEY_PREF + lockName;
        boolean retFlag = false;
        try (Jedis conn = jedisPool.getResource()) {
            while (true) {
                // 监视lock, 准备开始事务
                conn.watch(lockKey);
                // 通过前面返回的value值判断是不是该锁,若时该锁,则删除释放锁
                if (id.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return retFlag;
    }

    // ---------------------------------------------------------------------------------------------//

    /**
     * 加锁
     *
     * @param lockName       锁的名称(key)
     * @param id             锁id(可以使用UUID或者其他手段来表示)
     * @param acquireTimeout 获取超时时间
     * @param timeout        锁的超时时间
     */
    public boolean lock(String lockName, String id, long acquireTimeout, long timeout) {
        // 锁名,即 key值
        String lockKey = KEY_PREF + lockName;
        // SET命令的参数
        SetParams params = SetParams.setParams().nx().px(acquireTimeout);
        try (Jedis jedis = jedisPool.getResource()) {
            long start = System.currentTimeMillis();
            while (true) {
                //SET命令返回OK ,则证明获取锁成功
                String lock = jedis.set(lockKey, id, params);
                if ("OK".equals(lock)) {
                    return true;
                }
                //否则循环等待,在timeout时间内仍未获取到锁,则获取失败
                long l = System.currentTimeMillis() - start;
                if (l >= timeout) {
                    return false;
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 解锁
     *
     * @param id       锁id(可以使用UUID或者其他手段来表示)
     * @param lockName 锁的名称(key)
     */
    public boolean unlock(String id, String lockName) {
        // 锁名,即 key值
        String lockKey = KEY_PREF + lockName;
        try (Jedis jedis = jedisPool.getResource()) {
            String script = "if redis.call('get',KEYS[1]) == ARGV[1] then" +
                    "   return redis.call('del',KEYS[1]) " +
                    "else" +
                    "   return 0 " +
                    "end";
            Object result = jedis.eval(script, Lists.newArrayList(lockKey), Lists.newArrayList(id));
            return "1".equals(result.toString());
        }
    }
}
03-05 22:10