1 场景

公司是做物联网相关业务 最近在和移远对接流量卡的流量管理,具体业务涉及到卡停机,卡复机。 当用户充值之后,后台调用移远的卡复机接口复机,但是对接的时候实际测试下来,发现卡停机之后马上复机,会提示操作失败,经过沟通得知,这是运营商那边的接口返回的,卡停机之后不能马上复机。要间隔10分钟以上,上网搜索了一下,稍加思考,决定采用延迟队列的方式实现停机之后的复机操作, 当用户第一次操作复机返回失败的时候,放置到延迟队列里面,等待10分钟之后再执行 下面上代码

首先创建一个延迟队列管理类,采用spring 托管

package com.juyi.camera.config;

import com.alibaba.fastjson.JSONObject;
import com.juyi.camera.utils.task.DelayTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;

/**
 * Created by IntelliJ IDEA.
 * User: xuzhou
 * Date: 2020/3/31
 * Time: 9:18
 */
@Component
public class DelayQueueManager implements CommandLineRunner {
    private final Logger logger = LoggerFactory.getLogger(DelayQueueManager.class);
    private DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
    @Resource
    private ExecutorService consumerExecutor;

    /**
     * 加入到延时队列中
     *
     * @param task
     */
    public void put(DelayTask task) {
        delayQueue.put(task);
    }

    @Override
    public void run(String... args) {
        consumerExecutor.execute(new Thread(this::excuteThread));
    }

    /**
     * 延时任务执行线程
     */
    private void excuteThread() {
        while (true) {
            try {
                DelayTask task = delayQueue.take();
                processTask(task);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    /**
     * 内部执行延时任务
     *
     * @param task
     */
    private void processTask(DelayTask task) {
        JSONObject cardInfo = task.getData().getPayload();
        String msisdn = cardInfo.getString("msisdn");
        String iccid = cardInfo.getString("iccid");
        logger.info("msisdn:{},iccid:{}", msisdn, iccid);
    }
}












excuteThread 方法里面 delayQueue 阻塞执行,直到有数据

package com.juyi.camera.utils.task;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Created by IntelliJ IDEA.
 * User: xuzhou
 * Date: 2020/3/31
 * Time: 9:17
 * 延时任务
 */
public class DelayTask implements Delayed {
    final private TaskBase data;
    final private long expire;

    /**
     * 构造延时任务
     *
     * @param data   业务数据
     * @param expire 任务延时时间(ms)
     */
    public DelayTask(TaskBase data, long expire) {
        super();
        this.data = data;
        this.expire = expire + System.currentTimeMillis();
    }

    public TaskBase getData() {
        return data;
    }

    public long getExpire() {
        return expire;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof DelayTask) {
            String msgId = ((DelayTask) obj).getData().getMsgId();
            return this.data.getMsgId().equals(msgId);
        }
        return false;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), unit);
    }

    @Override
    public int compareTo(Delayed o) {
        long delta = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        return (int) delta;
    }
}










DelayTask 为自定义的队列元素 必须实现 Delayed

package com.juyi.camera.utils.task;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * Created by IntelliJ IDEA.
 * User: xuzhou
 * Date: 2020/3/31
 * Time: 9:17
 */
public class TaskBase {
    private String msgId;
    private JSONObject payload;


    public TaskBase(String msgId) {
        this.msgId = msgId;
    }

    public TaskBase(String msgId, JSONObject payload) {
        this.msgId = msgId;
        this.payload = payload;
    }

    public String getMsgId() {
        return msgId;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public JSONObject getPayload() {
        return payload;
    }

    public void setPayload(JSONObject payload) {
        this.payload = payload;
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}









TaskBase 是用户要处理的延时任务的数据基类,存放一些自定义数据,例如 设备号、卡号、ID、操作的用户等等

package com.juyi.camera.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by IntelliJ IDEA.
 * User: xuzhou
 * Date: 2020/4/29
 * Time: 18:54
 */
@Configuration
@Slf4j
public class ThreadPoolUtil implements DisposableBean {
    /**
     * 使用有界队列,避免OOM
     */
    private static ExecutorService consumerExecutor;

    /**
     * 获取线程池实例
     *
     * @return 线程池对象
     */
    @Bean
    public ExecutorService getThreadPool() {
        consumerExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.DiscardPolicy());
        log.info("ThreadPoolUtil 创建单例:{}", ((ThreadPoolExecutor) consumerExecutor).getActiveCount());
        return consumerExecutor;
    }

    @Override
    public void destroy() {
        log.info("销毁 consumerExecutor");
        consumerExecutor.shutdown();
    }
}






ThreadPoolUtil 是spring 托管的线程池,在初始化延迟队列的时候使用了

2 单元测试效果如下,可以很好的满足需求

记一次工作中使用延迟队列的场景-LMLPHP

3 后记

这部分代码只满足了,流量卡复机,如果小于10分钟,执行失败,然后放置的延迟队列里面,10分钟之后再执行这个逻辑,仔细思考下,如果延迟队列里面的流量卡复机操作也失败了,是否还要加上N次重试的机制?N次之后发邮件告警等等。。。

03-30 21:45