如果同时设置队列和队列中消息的TTL
,则TTL
值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL
过期时间则成为Dead Letter
(死信)。
Dead Letter Exchanges
(DLX
)
DLX
即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ
的 Queue
(队列)可以配置两个参数x-dead-letter-exchange
和 x-dead-letter-routing-key
(可选),一旦队列内出现了Dead Letter
(死信),则按照这两个参数可以将消息重新路由到另一个Exchange
(交换机),让消息重新被消费。
x-dead-letter-exchange
:队列中出现Dead Letter
后将Dead Letter
重新路由转发到指定 exchange
(交换机)。
x-dead-letter-routing-key
:指定routing-key
发送,一般为要指定转发的队列。
队列出现Dead Letter
的情况有:
下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息A0001发送到延迟队列order.delay.queue
,并设置x-message-tt
消息存活时间为30分钟,当到达30分钟后订单消息A0001成为了Dead Letter
(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange
,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。
发送消息时指定消息延迟的时间
public void send(String delayTimes) {
amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {
// 设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
}
}
设置延迟队列出现死信后的转发规则
/**
* 延时队列
*/
@Bean(name = "order.delay.queue")
public Queue getMessageQueue() {
return QueueBuilder
.durable(RabbitConstant.DEAD_LETTER_QUEUE)
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", "order.close.exchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "order.close.queue")
.build();
}
6、时间轮
前边几种延时队列的实现方法相对简单,比较容易理解,时间轮算法就稍微有点抽象了。kafka
、netty
都有基于时间轮算法实现延时队列,下边主要实践Netty
的延时队列讲一下时间轮是什么原理。
先来看一张时间轮的原理图,解读一下时间轮的几个基本概念wheel
:时间轮,图中的圆盘可以看作是钟表的刻度。比如一圈round
长度为24秒
,刻度数为 8
,那么每一个刻度表示 3秒
。那么时间精度就是 3秒
。时间长度 / 刻度数值越大,精度越大。
当添加一个定时、延时任务A
,假如会延迟25秒
后才会执行,可时间轮一圈round
的长度才24秒
,那么此时会根据时间轮长度和刻度得到一个圈数 round
和对应的指针位置 index
,也是就任务A
会绕一圈指向0格子
上,此时时间轮会记录该任务的round
和 index
信息。当round=0,index=0 ,指针指向0格子
任务A
并不会执行,因为 round=0不满足要求。
所以每一个格子代表的是一些时间,比如1秒
和25秒
都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap
的数据有些类似。
Netty
构建延时队列主要用HashedWheelTimer
,HashedWheelTimer
底层数据结构依然是使用DelayedQueue
,只是采用时间轮的算法来实现。
下面我们用Netty
简单实现延时队列,HashedWheelTimer
构造函数比较多,解释一下各参数的含义。
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
public class NettyDelayQueue {
public static void main(String[] args) {
final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
//定时任务
TimerTask task1 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order1 5s 后执行 ");
timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
}
};
timer.newTimeout(task1, 5, TimeUnit.SECONDS);
TimerTask task2 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order2 10s 后执行");
timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
}
};
timer.newTimeout(task2, 10, TimeUnit.SECONDS);
//延迟任务
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order3 15s 后执行一次");
}
}, 15, TimeUnit.SECONDS);
}
}
从执行的结果看,order3
、order3
延时任务只执行了一次,而order2
、order1
为定时任务,按照不同的周期重复执行。
order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行
总结
为了让大家更容易理解,上边的代码写的都比较简单粗糙,几种实现方式的demo
已经都提交到github
地址:https://github.com/chengxy-nds/delayqueue
,感兴趣的小伙伴可以下载跑一跑。
这篇文章肝了挺长时间,写作一点也不比上班干活轻松,查证资料反复验证demo的可行性,搭建各种RabbitMQ
、Redis
环境,只想说我太难了!
可能写的有不够完善的地方,如哪里有错误或者不明了的,欢迎大家踊跃指正!!!
最后
原创不易,码字不易,点个再看吧~
redis 分布式锁的 5个坑,真是又大又深
一口气说出 6种 @Transactional 注解失效场景
一口气说出 4种 “附近的人” 实现方式,面试官笑了
本文分享自微信公众号 - 程序员内点事(chengxy-nds)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。