前言

      今天跟大家分享的是一个基于redisson实现的延时队列,有个初版的封装工具,使用者只用关心延时时间到了取到的数据处理(或者之前处理,到时间只做剩下的业务),废话不多说,直接上货。


一、业务场景

      这里是对物联网设备做数据模拟上报。看下原型转化后的需求界面吧。
基于redisson实现延时队列解耦业务-LMLPHP

二、实现思路

1、实现其实有很多方案:

  1. 用timer实现
  2. 用java提供的队列实现
  3. redis实现
  4. redission实现

      最简单的直接用timer都可以做,我是想到这个延时队列以后还有其他场景使用,让其他开发小伙伴只用关心业务,所以基于redisson实现,封装延时队列工具类。

2、业务流程图

我自己画的简单流程图:
基于redisson实现延时队列解耦业务-LMLPHP

三、核心代码

1.redisson引入与配置

      这个我之前有写,这里就不重复了

2.延时队列工具

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * redisson实现的延时队列
 *
 *
 * @author zwmac
 */
@Slf4j
@Component
public class RedissonDelayQueue {
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加任务到延时队列里面
     *
     * @param queueName 队列名称
     * @param data      数据
     * @param delayTime 延时时间,单位秒
     */
    public void addTaskToDelayQueue(String queueName,JSONObject data,Long delayTime) {
        if(StringUtils.isNotBlank(queueName)){
            RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(data, delayTime, TimeUnit.SECONDS);

        }
    }

    /**
     * 删除延时队列
     * @param queueName 队列名称
     */
    public void delDelayQueue(String queueName) {
        if(StringUtils.isNotBlank(queueName)){
            RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

            blockingDeque.clear();
            blockingDeque.delete();
            delayedQueue.clear();
            delayedQueue.destroy();

        }
    }

    /**
     * 判断队列是否存在
     * @param queueName 队列名称
     * @return true 存在,false 不存在
     */
    public boolean hasQueue(String queueName) {
        RBlockingDeque<JSONObject> blockingDeque =  redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        if (blockingDeque.isExists() && delayedQueue.isExists() && !delayedQueue.isEmpty()){
            return true;
        }
        return false;
    }

    /**
     * 队列消费者
     * @param consumer 消费者
     * @param queueName 队列名称
     */
    public void queueConsumer( Consumer consumer, String queueName){
        new Thread(() -> {
            while (true){
                try {
                    JSONObject data = this.takeFromDelayQueue(queueName);
                    if (data != null){
                        //消费接口
                        consumer.accept(data);
                        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
                        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
                        if (blockingDeque.isEmpty() && delayedQueue.isEmpty()){
                            //所有数据已经轮训完毕,删除队列
                            this.delDelayQueue(queueName);
                            //结束线程
                            log.info("队列名称:{},延时元素消费完成,退出释放线程",queueName);
                            break;
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    //退出,释放线程
                    log.info("队列名称:{},退出线程释放,原因:{}",queueName,e.getMessage());
                    break;
                }

            }

        },queueName + "-Customer").start();
    }

    /**
     * 从延时队列里面取出数据
     * @param queueName 队列名称
     * @return 队列元素json对象
     * @throws Exception 异常
     */
    public JSONObject takeFromDelayQueue(String queueName) throws Exception {
        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        JSONObject jsonObject = null;
        try {
            //log.info("--队列名称:{},blockingDeque数量:{},delayedQueue数量:{}",queueName,blockingDeque.size(),delayedQueue.size());
            if (blockingDeque.isExists()){
                log.info("--出队列前--队列名称:{},当前队列大小:{}",queueName,blockingDeque.size());
                jsonObject = blockingDeque.take();
                log.info("--出队列后--队列名称:{},当前队列大小:{}",queueName,blockingDeque.size());
            }
            /** 这里处理早了,还没有消费就销毁了,会导致消费数据差一条
             if (blockingDeque.isEmpty() && delayedQueue.isEmpty()){
                //所有数据已经轮训完毕,删除队列
                this.delDelayQueue(queueName);
                //结束线程
                //Thread.currentThread().interrupt();
                throw new RuntimeException("所有数据已经轮训完毕,删除队列");
            }**/
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return jsonObject;
    }
}

      里面有关于线程销毁注释了一段,有兴趣的可以看看,为什么销毁不在那里处理,当然原因我也写在注释里了的。

2.使用

@Resource
  private RedissonDelayQueue redissonDelayQueue;

private MessageInfo historyReceive(JSONObject jsonObject, String identify) {
    //从ES查询设备的历史数据
    List<JSONObject> historyData = searchHisFromEs(nbDeviceId, startTime, endTime,logMarkId);

    //查询该设备是否有重放队列在执行
    String hisRetryQueueKey = "hisRetryQueueKey-" + nbDeviceId;
    if(redissonDelayQueue.hasQueue(hisRetryQueueKey)){
      //有重放队列在执行,删除原队列
      redissonDelayQueue.delDelayQueue(hisRetryQueueKey);
    }

    //放到延时队列
    if (CollectionUtil.isNotEmpty(historyData)) {
      queueConsumer(redissonDelayQueue,nbDeviceId,logMarkId,identify,hisRetryQueueKey);
      for (int i = 0; i < historyData.size(); i++) {
        JSONObject data = historyData.get(i);
        Long interval = 2L;
        if (i > 0){
          interval = Long.valueOf(intervalTime * i) + interval;
        }
        redissonDelayQueue.addTaskToDelayQueue(hisRetryQueueKey,data,interval);
      }
    }

    return new MessageInfo(0, "success");
  }
/**延时数据业务处理
**/
private void queueConsumer(RedissonDelayQueue redissonDelayQueue, String nbDeviceId, String logMarkId, String identify, String hisRetryQueueKey) {
    //消费延时队列数据
    redissonDelayQueue.queueConsumer(data -> {
      //重放数据做数据重新组织后,直接放到解析完成的队列
      log.info("时间:{}---重放数据:{}", DateUtil.now(),data);
      //业务处理
      

    },hisRetryQueueKey);
  }

      我这里是在从延时队列取到元素后做的一些业务操作,如果没有一些下游级联操作,其实可以在放入队列的for循环里做,真正到时间了,再做一些简单的业务也可以。
      可以看出,现在使用就只需要处理for循环放入延时队列,queueConsumer消费处理延时到期的业务。

3.效果

基于redisson实现延时队列解耦业务-LMLPHP


总结

  1. 解耦,让开发只用关注业务
  2. 基于redisson不用太关注redis底层实现,这里可以理解就是2个队列,一个未到期队列、一个到期队列,随着时间的推移redisson帮我们实现从未到期移动数据到到期,我们只用管从到期取到数据的操作
  3. 封装还很粗糙,还有进步空间
    就分享到这,希望能帮到大家,uping!
08-24 20:02