Redis-分布式锁

如何使用分布式锁

正常在一个java服务中使用sync锁或lock锁完全可以满足线程安全问题的,但是在部署集群的情况下,不同的jvm不能锁同一个方法,因此需要分布式锁用来保护线程安全问题。

分布式锁实现

常见的分布式锁解决方案:

  1. Mysql:自带悲观锁,但是不太好维护
  2. redis:利用setnx实现互斥,操作方便,推荐使用
  3. zookeeper:利用节点实现互斥

本章主要采用redis的方式进行实现

public interface ILock {
    /**
     * 分布式-互斥锁
     */
    boolean tryLock(String name, Long time, TimeUnit unit);

    /**
     * 分布式-释放互斥锁
     */
    void unLock(String name);
}
/**
 * 分布式锁实现
 */
@Component
public class DistributedLock implements ILock{

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /** 分布式锁key */
    private final String DISTRIBUTED_LOCK = "distributed_lock:";

    /**
     * 分布式互斥锁
     */
    @Override
    public boolean tryLock(String name, Long time, TimeUnit unit) {
        // value
        String value = Thread.currentThread().getId() + "";
        // key
        String key = DISTRIBUTED_LOCK + name;
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);
        // 防止自动拆箱空指针
        return BooleanUtil.isTrue(aBoolean);
    }

    /**
     * 分布式释放锁
     */
    @Override
    public void unLock(String name) {
        String key = DISTRIBUTED_LOCK + name;
        stringRedisTemplate.delete(key);
    }
}

分布式锁误删问题

在设置互斥锁的时候为了解决redis宕机导致互斥锁永久失效的情况下,加了一个过期时间。此时如果缓存重建的时间比过期时间更长,会导致多个线程释放不同的锁资源导致分布式锁误删问题。
解决误删问题:

  1. 需要在获取锁时存入线程表示(uuid + 线程id)的方式
  2. 在释放锁时需要先获取锁中的线程标识,判断是否与当前线程标识一致
    • 如果一致则释放锁
    • 如果不一致则不释放锁

更新代码:

@Component
public class DistributedLock implements ILock{

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /** 分布式锁key */
    private final String DISTRIBUTED_LOCK = "distributed_lock:";
    /** UUID */
    private String uuid = UUID.randomUUID(true).toString();

    /**
     * 分布式互斥锁
     */
    @Override
    public boolean tryLock(String name, Long time, TimeUnit unit) {
        // value
        String value = uuid + Thread.currentThread().getId();
        // key
        String key = DISTRIBUTED_LOCK + name;
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);
        // 防止自动拆箱空指针
        return BooleanUtil.isTrue(aBoolean);
    }

    /**
     * 分布式释放锁
     */
    @Override
    public void unLock(String name) {
        String key = DISTRIBUTED_LOCK + name;
        String value = uuid + Thread.currentThread().getId();
        // 获取互斥锁中的值
        String redisValue = stringRedisTemplate.opsForValue().get(key);
        if (StringUtils.equals(value,redisValue)){
            stringRedisTemplate.delete(key);
        }
    }

}

Redisson入门

正常使用setnx实现分布式锁存在以下几种问题

  1. 不可重入锁:同一现成无法多次获取同一把锁
  2. 不可重试:获取锁只尝试一次就返回,无法重试
  3. 超时释放:业务执行耗时较长,会导致锁释放
  4. 主从一致性:集群的情况下主节点宕机后同步数据过程种,导致锁失效

Redisson 是一个 Java 高级 Redis 客户端,提供了基于 Redis 的分布式和可扩展的 Java 数据结构,如并发集合(Concurrent Collections)、同步器(Synchronizers)、分布式服务(Distributed Services)等。Redisson 构建于 Jedis 之上,旨在简化 Redis 的使用,尤其对于分布式环境中的应用程序而言,它提供了一种易于使用的 API 来处理 Redis 中的数据,并实现了多种分布式锁和其他高级功能。Redisson底层采用的是Netty 框架

案例:每个用户对一件商品只能下一单。

配置文件

redisson:
  # redis key前缀
  keyPrefix:
  # 线程池数量
  threads: 4
  # Netty线程池数量
  nettyThreads: 8
  # 单节点配置
  singleServerConfig:
    # 客户端名称
    clientName: ${ruoyi.name}
    # 最小空闲连接数
    connectionMinimumIdleSize: 8
    # 连接池大小
    connectionPoolSize: 32
    # 连接空闲超时,单位:毫秒
    idleConnectionTimeout: 10000
    # 命令等待超时,单位:毫秒
    timeout: 3000
    # 发布和订阅连接池大小
    subscriptionConnectionPoolSize: 50
/**
 * Redisson 配置属性
 *
 */
@Data
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {

    /**
     * redis缓存key前缀
     */
    private String keyPrefix;

    /**
     * 线程池数量,默认值 = 当前处理核数量 * 2
     */
    private int threads;

    /**
     * Netty线程池数量,默认值 = 当前处理核数量 * 2
     */
    private int nettyThreads;

    /**
     * 单机服务配置
     */
    private SingleServerConfig singleServerConfig;

    /**
     * 集群服务配置
     */
    private ClusterServersConfig clusterServersConfig;

    @Data
    @NoArgsConstructor
    public static class SingleServerConfig {

        /**
         * 客户端名称
         */
        private String clientName;

        /**
         * 最小空闲连接数
         */
        private int connectionMinimumIdleSize;

        /**
         * 连接池大小
         */
        private int connectionPoolSize;

        /**
         * 连接空闲超时,单位:毫秒
         */
        private int idleConnectionTimeout;

        /**
         * 命令等待超时,单位:毫秒
         */
        private int timeout;

        /**
         * 发布和订阅连接池大小
         */
        private int subscriptionConnectionPoolSize;

    }

    @Data
    @NoArgsConstructor
    public static class ClusterServersConfig {

        /**
         * 客户端名称
         */
        private String clientName;

        /**
         * master最小空闲连接数
         */
        private int masterConnectionMinimumIdleSize;

        /**
         * master连接池大小
         */
        private int masterConnectionPoolSize;

        /**
         * slave最小空闲连接数
         */
        private int slaveConnectionMinimumIdleSize;

        /**
         * slave连接池大小
         */
        private int slaveConnectionPoolSize;

        /**
         * 连接空闲超时,单位:毫秒
         */
        private int idleConnectionTimeout;

        /**
         * 命令等待超时,单位:毫秒
         */
        private int timeout;

        /**
         * 发布和订阅连接池大小
         */
        private int subscriptionConnectionPoolSize;

        /**
         * 读取模式
         */
        private ReadMode readMode;

        /**
         * 订阅模式
         */
        private SubscriptionMode subscriptionMode;

    }

}
/**
 * redis配置
 *
 */
@Slf4j
@AutoConfiguration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {

    @Autowired
    private RedissonProperties redissonProperties;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public RedissonAutoConfigurationCustomizer redissonCustomizer() {
        return config -> {
            ObjectMapper om = objectMapper.copy();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            // 指定序列化输入的类型,类必须是非final修饰的。序列化时将对象全类名一起保存下来
            om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
            TypedJsonJacksonCodec jsonCodec = new TypedJsonJacksonCodec(Object.class, om);
            // 组合序列化 key 使用 String 内容使用通用 json 格式
            CompositeCodec codec = new CompositeCodec(StringCodec.INSTANCE, jsonCodec, jsonCodec);
            config.setThreads(redissonProperties.getThreads())
                .setNettyThreads(redissonProperties.getNettyThreads())
                // 缓存 Lua 脚本 减少网络传输(redisson 大部分的功能都是基于 Lua 脚本实现)
                .setUseScriptCache(true)
                .setCodec(codec);
            RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();
            if (ObjectUtil.isNotNull(singleServerConfig)) {
                // 使用单机模式
                config.useSingleServer()
                    //设置redis key前缀
                    .setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
                    .setTimeout(singleServerConfig.getTimeout())
                    .setClientName(singleServerConfig.getClientName())
                    .setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout())
                    .setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize())
                    .setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize())
                    .setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());
            }
            // 集群配置方式 参考下方注释
            RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();
            if (ObjectUtil.isNotNull(clusterServersConfig)) {
                config.useClusterServers()
                    //设置redis key前缀
                    .setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
                    .setTimeout(clusterServersConfig.getTimeout())
                    .setClientName(clusterServersConfig.getClientName())
                    .setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout())
                    .setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize())
                    .setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize())
                    .setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize())
                    .setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize())
                    .setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize())
                    .setReadMode(clusterServersConfig.getReadMode())
                    .setSubscriptionMode(clusterServersConfig.getSubscriptionMode());
            }
            log.info("初始化 redis 配置");
        };
    }
@RequiredArgsConstructor
@Service
public class BookOrderServiceImpl implements IBookOrderService {

    private final BookOrderMapper baseMapper;

    private final SysUserMapper sysUserMapper;

    private final BooksMapper booksMapper;
    private final BookOrderDetailMapper bookOrderDetailMapper;

    /** 自定义分布式锁 */
    private final DistributedLock distributedLock;
    /** redission  */
    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
    /**
     * 模拟库存扣减并发问题
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void inventory(String bookId,Long userId) {
        // 一人一单校验
        Long aLong = bookOrderDetailMapper.selectCount(
            Wrappers.lambdaQuery(BookOrderDetail.class).eq(BookOrderDetail::getNumber, userId)
            .eq(BookOrderDetail::getBookId,bookId)
        );
        if (aLong > 0){
            throw new ServiceException("下单失败");
        }

        // 自定义获取锁
        // boolean piker = distributedLock.tryLock("PIKER", 10L, TimeUnit.SECONDS);
        // redisClient分布式锁
        RLock lock = CLIENT.getLock("lock:order:");
        // 默认失败不等待锁时间,锁过期时间30秒
        boolean piker = lock.tryLock();
        if (piker){
            try {
                // 订单业务
                placingOrder(bookId, userId);
            }finally {
                // 自定义释放锁
                // distributedLock.unLock("PIKER");

                // redisson 释放锁
                lock.unlock();
            }
        }
    }

    /**
     * 业务操作
     */
    private void placingOrder(String bookId, Long userId) {
        // 1.减少库存
        Books books = booksMapper.selectById(bookId);
        books.setStockQuantity(books.getStockQuantity() - 1);
        booksMapper.updateById(books);

        // 2.增加订单
        BookOrderDetail bookOrder = new BookOrderDetail();
        bookOrder.setBookId(Long.parseLong(bookId));
        bookOrder.setNumber(userId.intValue());
        bookOrderDetailMapper.insert(bookOrder);
    }
}

Redisson-分布式锁实现原理

1.可重入锁
方法1{
	获取锁
	调用方法2
}
方法2{
	获取锁
}

以上这种情况下使用自定义的setnx方式就会造成死锁的情况,比较经典的重入锁。

Rdisson使用Lua脚本来实现可重入锁的。
Redis-分布式锁-LMLPHP

2.重试机制,超时释放

重试机制:在设置互斥锁时有两个线程A,B。A线程先获取锁资源,之后B在获取锁就会一直失败,因为锁的互斥性,没有重试的机制。

超时释放:给锁设置一个过期时间,防止redis宕机情况下锁一直没有办法被释放导致死锁情况,或者因为业务原因导致缓存重建时间大于锁过期时间导致数据丢失

注意:redisson不同版本的代码不同,但是整体流程是大差不大的,下面是结合黑马程序猿老师结合总结的流程图。
如果自己设置失效时间的话,锁过期时间就不是-1因此就不会触发看门狗机制了。

获取锁:

Redis-分布式锁-LMLPHP

释放锁:

Redis-分布式锁-LMLPHP

有了以上的机制可以实现:我有三个线程 A,B 设置等待时间3秒,线程A先获取到锁,由于业务原因进行阻塞,此时线程2开始获取锁。线程A业务执行了4秒,那么首先线程2获取锁失败。如果线程A执行业务在3秒内完成,那么线程2可以成功获取锁。

04-16 14:22