本文介绍了c# 客户端上的 RabbitMQ 手动 ACK的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在一个非常简单的控制台应用程序上使用手动 ACK,但我无法让它工作.

I'm trying to use a manual ACK on a very simple console application, but I can't make it work.

在发件人,我有以下代码:

On the sender, I have the following code:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "task_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);

    channel.ConfirmSelect();
    channel.BasicAcks += (sender, e) =>
    {
        Console.Write("ACK received");
    };

    var properties = channel.CreateBasicProperties();

    channel.BasicPublish(exchange: "",
                         routingKey: "task_queue",
                         basicProperties: properties,
                         body: body);

    Console.WriteLine(" [x] Sent {0}", message);
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

在接收器上,我有以下代码:

On the receiver I have the following code:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "task_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    channel.ConfirmSelect();

    Console.WriteLine(" [*] Waiting for messages.");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] Received {0}", message);

        int dots = message.Split('.').Length - 1;
        Thread.Sleep(dots * 1000);

        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        Console.WriteLine(" [x] Done");
    };
    channel.BasicConsume(queue: "task_queue",
                         noAck: false,
                         consumer: consumer);

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

我期望的是,当我在接收器上调用 channel.BasicAck() 时,发送器上的事件 BasicAcks 会被触发,但是当消息在 consumer.Received 之前传递给客户端.

What I expect is that the event BasicAcks on the sender is fired when I call channel.BasicAck() on the receiver, but that event is being fired when the message is delivered to the client, before consumer.Received.

我期望的是正确的行为还是我遗漏了什么?

Is what I'm expecting the correct behavior or am I missing something?

推荐答案

您的期望不正确.BasicAcks 是关于发布者确认,而不是关于接收者的确认.因此,您向代理发布消息,broker(因此,RabbitMQ 本身)将在处理此消息时(例如 - 将其写入磁盘以获取持久消息时)确认或确认(否定确认)您, 或当 in 将其放入队列中).请注意,这里不涉及接收者 - 完全在发布者和 RabbitMQ 之间.

Your expectation is not correct. BasicAcks is about publisher confirms, not about ack from receiver. So you publish a message to broker and broker (so, RabbitMQ itself) will ack or nack (negative acknowledge) you when it handles this message (for example - when it will write it to disk for persistent messages, or when in puts it in queue). Note that no receiver is involved here - it's entirely between publisher and RabbitMQ.

现在,当您在接收方确认消息时 - 这再次仅在接收方和 RabbitMQ 之间 - 您告诉兔子该消息已被处理并且可以安全地删除.这样做是为了处理接收器在处理过程中崩溃的情况 - 然后 rabbit 将能够将此消息传递给下一个接收器(如果有).

Now when you Ack message at receiver - that's again only between receiver and RabbitMQ - you tell rabbit that message is processed and can be safely deleted. This is done to handle situations when receiver crashes during processing - then rabbit will be able to deliver this message to the next receiver (if any).

请注意,此类架构的全部目的是将发布者和接收者分开 - 他们不应相互依赖.

Note that the whole purpose of such arcitecture is to separate publishers and receivers - they should not be dependent on each other.

如果您有一个接收器(可以有多个)并且您想确保它处理您的消息 - 使用 RPC 模式:发送消息并等待来自该接收器的另一条消息.

If you have one receiver (there can be many) and you want to ensure it processed your message - use RPC pattern: send message and wait for another message back from this receiver.

这篇关于c# 客户端上的 RabbitMQ 手动 ACK的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 05:51