本文介绍了使用 FromEventPattern 在订阅之前捕获事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Rx 框架编写消息侦听器.

I'm writing a listener for messages using the Rx framework.

我面临的问题是我使用的库使用了一个消费者,只要有消息到达,它就会发布事件.

The problem I'm facing is that the library I'm using uses a consumer that publishes events whenever a message has arrived.

我已经设法通过 Observable.FromEventPattern 使用传入的消息,但我对服务器中已有的消息有问题.

I've managed to consume the incoming messages via Observable.FromEventPattern but I have a problem with the messages that are already in the server.

目前我有以下命令链

  1. 创建消费者
  2. 使用 FromEventPattern 创建一个可观察序列并应用所需的转换
  3. 告诉消费者开始
  4. 订阅序列
  1. Create a consumer
  2. Create an observable sequence with FromEventPattern and apply needed transformations
  3. Tell the consumer to start
  4. Subscribe to the sequence

最简单的解决方案是交换第 3 步和第 4 步,但由于它们发生在系统的不同组件中,因此我很难这样做.

The easiest solution would be to swap steps 3. and 4. but since they happen in different components of the system, it's very hard for me to do so.

理想情况下,我希望在第 4 步发生时执行第 3 步(如 OnSubscribe 方法).

Ideally I would like to execute step 3 when step 4 happens (like a OnSubscribe method).

感谢您的帮助:)

PS:要添加更多详细信息,事件来自 RabbitMQ 队列,我使用的是 RabbitMQ.Client 包中的 EventingBasicConsumer 类.

PS: to add more details, the events are coming from a RabbitMQ queue and I am using the EventingBasicConsumer class found in the RabbitMQ.Client package.

在这里您可以找到我正在开发的库.具体来说,这个是类/method 给我带来了问题.

Here you can find the library I am working on. Specifically, this is the class/method giving me problems.

编辑

这是有问题的代码的剥离版本

Here is a stripped version of the problematic code

void Main()
{
    var engine = new Engine();

    var messages = engine.Start();

    messages.Subscribe(m => m.Dump());

    Console.ReadLine();

    engine.Stop();
}

public class Engine
{
    IConnection _connection;
    IModel _channel;

    public IObservable<Message> Start()
    {
        var connectionFactory = new ConnectionFactory();

        _connection = connectionFactory.CreateConnection();
        _channel = _connection.CreateModel();

        EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);

        var observable = Observable.FromEventPattern<BasicDeliverEventArgs>(
                                        a => consumer.Received += a, 
                                        a => consumer.Received -= a)
                                    .Select(e => e.EventArgs);

        _channel.BasicConsume("a_queue", false, consumer);

        return observable.Select(Transform);
    }

    private Message Transform(BasicDeliverEventArgs args) => new Message();

    public void Stop()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}

public class Message { }

我遇到的症状是,由于我在订阅序列之前调用了 BasicConsume,因此 RabbitMQ 队列中的任何消息都会被获取,但不会传递到管道中.

The symptom I experience is that since I invoke BasicConsume before subscribing to the sequence, any message that is in the RabbitMQ queue is fetched but not passed down the pipeline.

由于我没有打开autoack",程序一停止,消息就会返回到队列中.

Since I don't have "autoack" on, the messages are returned to the queue as soon as the program stops.

推荐答案

正如某些人在评论中指出的那样,正如您在问题中所指出的,问题是由于您使用 RabbitMQ 客户端的方式造成的.

As some have noted in the comments, and as you note in the question, the issue is due to the way you're using the RabbitMQ client.

>

为了解决其中一些问题,我实际上是创建了一个 ObservableConsumer 类.这是当前使用的 EventingBasicConsumer 的替代方案.我这样做的一个原因是为了解决问题中描述的问题,但这样做的另一件事是允许您在单个连接/通道实例之外重用这个消费者对象.这样做的好处是,尽管存在瞬态连接/通道特性,但允许您的下游反应代码保持连线.

To get around some of these issues, what I actually did was create an ObservableConsumer class. This is an alternative to the EventingBasicConsumer which is in use currently. One reason I did this was to deal with the issue described in the question, but the other thing this does is allow you to re-use this consumer object beyond a single connection/channel instance. This has the benefit of allowing your downstream reactive code to remain wired in spite of transient connection/channel characteristics.

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using RabbitMQ.Client;

namespace com.rabbitmq.consumers
{
    public sealed class ObservableConsumer : IBasicConsumer
    {
        private readonly List<string> _consumerTags = new List<string>();
        private readonly object _consumerTagsLock = new object();
        private readonly Subject<Message> _subject = new Subject<Message>();

        public ushort PrefetchCount { get; set; }
        public IEnumerable<string> ConsumerTags { get { return new List<string>(_consumerTags); } }

        /// <summary>
        /// Registers this consumer on the given queue. 
        /// </summary>
        /// <returns>The consumer tag assigned.</returns>
        public string ConsumeFrom(IModel channel, string queueName)
        {
            Model = channel;
            return Model.BasicConsume(queueName, false, this);
        }

        /// <summary>
        /// Contains an observable of the incoming messages where messages are processed on a thread pool thread.
        /// </summary>
        public IObservable<Message> IncomingMessages
        {
            get { return _subject.ObserveOn(Scheduler.ThreadPool); }
        }

        ///<summary>Retrieve the IModel instance this consumer is
        ///registered with.</summary>
        public IModel Model { get; private set; }

        ///<summary>Returns true while the consumer is registered and
        ///expecting deliveries from the broker.</summary>
        public bool IsRunning
        {
            get { return _consumerTags.Count > 0; }
        }

        /// <summary>
        /// Run after a consumer is cancelled.
        /// </summary>
        /// <param name="consumerTag"></param>
        private void OnConsumerCanceled(string consumerTag)
        {

        }

        /// <summary>
        /// Run after a consumer is added.
        /// </summary>
        /// <param name="consumerTag"></param>
        private void OnConsumerAdded(string consumerTag)
        {

        }

        public void HandleBasicConsumeOk(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (!_consumerTags.Contains(consumerTag))
                    _consumerTags.Add(consumerTag);
            }
        }

        public void HandleBasicCancelOk(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (_consumerTags.Contains(consumerTag)) {
                    _consumerTags.Remove(consumerTag);
                    OnConsumerCanceled(consumerTag);
                }
            }
        }

        public void HandleBasicCancel(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (_consumerTags.Contains(consumerTag)) {
                    _consumerTags.Remove(consumerTag);
                    OnConsumerCanceled(consumerTag);
                }
            }
        }

        public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
        {
            //Don't need to do anything.
        }

        public void HandleBasicDeliver(string consumerTag,
                                       ulong deliveryTag,
                                       bool redelivered,
                                       string exchange,
                                       string routingKey,
                                       IBasicProperties properties,
                                       byte[] body)
        {
            //Hack - prevents the broker from sending too many messages.
            //if (PrefetchCount > 0 && _unackedMessages.Count > PrefetchCount) {
            //    Model.BasicReject(deliveryTag, true);
            //    return;
            //}

            var message = new Message(properties.HeaderFromBasicProperties()) { Content = body };
            var deliveryData = new MessageDeliveryData()
            {
                ConsumerTag = consumerTag,
                DeliveryTag = deliveryTag,
                Redelivered = redelivered,
            };

            message.Tag = deliveryData;

            if (AckMode != AcknowledgeMode.AckWhenReceived) {
                message.Acknowledged += messageAcknowledged;
                message.Failed += messageFailed;
            }

            _subject.OnNext(message);
        }

        void messageFailed(Message message, Exception ex, bool requeue)
        {
            try {
                message.Acknowledged -= messageAcknowledged;
                message.Failed -= messageFailed;

                if (message.Tag is MessageDeliveryData) {
                    Model.BasicNack((message.Tag as MessageDeliveryData).DeliveryTag, false, requeue);
                }
            }
            catch {}
        }

        void messageAcknowledged(Message message)
        {
            try {
                message.Acknowledged -= messageAcknowledged;
                message.Failed -= messageFailed;

                if (message.Tag is MessageDeliveryData) {
                    var ackMultiple = AckMode == AcknowledgeMode.AckAfterAny;
                    Model.BasicAck((message.Tag as MessageDeliveryData).DeliveryTag, ackMultiple);
                }
            }
            catch {}
        }
    }
}

这篇关于使用 FromEventPattern 在订阅之前捕获事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-17 00:56