在遇到与第三方系统做对接时,MQ无疑是非常好的解决方案(解耦、异步)。但是如果引入MQ组件,随之要考虑的问题就变多了,如何保证MQ消息能够正常被业务消费。所以引入MQ消费失败情况下,自动重试功能是非常重要的。这里不过细讲MQ有哪些原因会导致失败。

MQ重试,网上有方案一般采用的是,本地消息表+定时任务,不清楚的可以自行了解下。

我这里提供一种另外的思路,供大家参考。方案实现在RabbitMQ(安装延迟队列插件)+.NET CORE 3.1

设计思路为:

内置一个专门做重试的队列,这个队列是一个延迟队列,当业务队列消费失败时,将原始消息投递至重试队列,并设置延迟时间,当延迟时间到达后。重试队列消费会自动将消息重新投递会业务队列,如此便可以实现消息的重试,而且可以根据重试次数来自定义重试时间,比如像微信支付回调一样(第一次延迟3S,第二次延迟10S,第三次延迟60S),上面方案当然要保证MQ消费采用ACK机制。

那么如何让重试队列知道原来的业务队列是哪个,我们定义业务队列时,可以通过MQ的消息头内置一些信息:队列类型(业务队列也有可能是延迟队列)、重试次数(默认为 0)、交换机名称、路由键。业务队列消费失败时,将消息投递至重试队列时,则可以把业务队列的消息头传递至重试队列,那么重试队列消费,重新将消息发送给业务队列时,则可以知道业务队列所需要的所有参数(需要将重试次数+1)。

下面结合代码讲下具体实现:

我们先看看业务队列发送消息时,如何定义

IBasicProperties properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                //初始化,需要内置一些消费异常,自动重试参数 
                if (headers == null)
                {
                    headers = new Dictionary<string, object>();
                }
                //ttlSecond 有值表示消息将投递到延迟队列
                //因为可以自建延迟队列,ttlSecond是业务标识 
                if (ttlSecond.HasValue)
                {
                    if (!headers.ContainsKey("x-delay"))
                    {
                        headers.Add("x-delay", ttlSecond * 1000);
                    }
                    else
                    {
                        headers["x-delay"] = ttlSecond * 1000;
                    }
                    //queueType = 1表示延迟队列
                    //框架内部重试机制需要此参数,因为重新投递到原始队列时,需要区分普通队列还是延迟队列
                    if (!headers.ContainsKey("queueType"))
                    {
                        headers.Add("queueType", 1);
                    }
                }
                else
                {
                    //queueType = 0表示普通队列
                    if (!headers.ContainsKey("queueType"))
                    {
                        headers.Add("queueType", 0);
                    }
                }
                //重试次数
                if (!headers.ContainsKey("retryCount"))
                {
                    headers.Add("retryCount", 0);
                }
                //原始交换机名称
                if (!headers.ContainsKey("retryExchangeName"))
                {
                    headers.Add("retryExchangeName", exchangeName);
                }
                //原始路由键
                if (!headers.ContainsKey("retryRoutingKey"))
                {
                    headers.Add("retryRoutingKey", routingKey);
                }
                properties.Headers = headers;
                channel.BasicPublish(exchangeName, routingKey, properties, Encoding.UTF8.GetBytes(message));
12-30 20:50