简介

  • 什么是延时队列?

    • 一种带有延迟功能的消息队列
  • 使用场景

    • 比如存在某个业务场景

      • 发起一个订单,但是处于未支付的状态?如何及时的关闭订单并退还库存?
      • 如何定期检查处于退款订单是否已经成功退款?
    • 为了解决上述的场景,就可以通过延时队列去处理
  • 简单实现
    /**
* rabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate; /**
* rabbitAdmin
*/
@Autowired
private RabbitAdmin rabbitAdmin; /**
* messageProperties
*/
@Autowired
private MessageProperties messageProperties; /**
* 发送延迟队列消息
*
* @param exchange
* @param content
* @param millseconds
*/
public void delayPublish(String exchange, String content, int millseconds) {
String delayExchangeName = exchange + "_delay";
String delayQueueName = delayExchangeName + "->queue_delay";
String delayRouteKey = "dead";
Map<String, Object> arguments = new HashMap<>();
arguments.putIfAbsent("x-dead-letter-exchange", exchange);
declareExchange(exchange);
declareExchange(delayExchangeName);
rabbitAdmin.declareQueue(new Queue(delayQueueName, true, false, false, arguments));
rabbitAdmin.declareBinding(new Binding(delayQueueName, Binding.DestinationType.QUEUE, delayExchangeName,
delayRouteKey, Collections.emptyMap()));
MessageProperties messageProperties = getMessageProperties(Collections.emptyMap());
messageProperties.setExpiration(Integer.toString(millseconds));
publish(delayExchangeName, content, messageProperties);
} /**
* sendMessage
*
* @param exchange
* @param content
* @param messageProperties
*/
private void publish(String exchange, String content, MessageProperties messageProperties) {
declareExchange(exchange);
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
try {
rabbitTemplate.send(exchange, "", message);
log.debug("推送给exchange:{},消息体:{} 成功", exchange, content);
} catch (Exception e) {
log.error("推送给exchange:{},消息体:{} 失败!!", exchange, content, e);
throw e;
}
} /**
* declareExchange
*
* @param exchange
*/
private void declareExchange(String exchange) {
rabbitAdmin.declareExchange(new FanoutExchange(exchange, true, false, null));
} /**
* getMessageProperties
*
* @param header
* @return
*/
private MessageProperties getMessageProperties(Map<String, String> header) {
if (header == null) {
return this.messageProperties;
} MessageProperties customMessageProperties = new MessageProperties();
customMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
customMessageProperties.setHeader("content_encoding", "JSON");
for (Map.Entry<String, String> item : header.entrySet()) {
customMessageProperties.setHeader(item.getKey(), item.getValue());
}
return customMessageProperties;
}
  • 通过上述代码就可以实现一个简单的延时队列消息的发布
  • 那么只需要进行对应的监听便可以进行消费,达到延时队列的发布及消费的功能
05-27 12:40