本文介绍了Queue.Peek()的BlockingCollection的模拟听消耗的IEnumerable&LT时,T>的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用管道模式实现从分离的消息消费者生产者为了避免缓慢的消费问题。

I'm using Pipelines pattern implementation to decouple messages consumer from a producer to avoid slow-consumer issue.

在的情况下对消息处理阶段的任何异常 [1] 将丢失,不会被分派到其他服务/层 [2 ] 。如何处理在这样的问题[3] 这样的信息不会丢失,什么是重要的!订单信息将不会被搞混了,因此上层业务/层会在他们来到的顺序消息。我有涉及的其他中间队列的想法但似乎复杂?不幸的是 BlockingCollection< T> 不公开的任何模拟Queue.Peek()方法,这样我就可以读取下一个可用消息,如果全成加工做出列()

In case of any exception on a message processing stage [1] it will be lost and not dispatched to an other service/layer [2]. How can I handle such issue in [3] so message will not be lost and what is important! order of messages will not be mixed up so upper service/layer will get messages in the order they came in. I have an idea which involves an other intermediate Queue but it seems complex? Unfortunately BlockingCollection<T> does not expose any analogue of Queue.Peek() method so I can just read next available message and in case of successfull processing do Dequeue()

private BlockingCollection<IMessage> messagesQueue;    

// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // On this point a message already is removed from messagesQueue
    while (!isSent && retriesCounter++ <= maxRetries)
    {
        try
        {
           // [1] Preprocess a message
           // [2] Dispatch to an other service/layer    
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            

        if (!isSent && retriesCounter == maxRetries)
        {
           // just log, message is lost on this stage!
        }
    }
}

修改:忘了说了,这是IIS托管WCF服务,通过客户端回调契约调度信息返回到Silverlight客户端WCF代理

EDIT: Forgot to say this is IIS hosted WCF service which dispatches messages back to Silverlight client WCF Proxy via client callback contract.

EDIT2:下面是一个使用我会怎么做这个皮克(),我失去了一些东西。

Below is how I would do this using Peek(), Am I missing something?

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}

EDIT3:当然,我可以使用老式的方法使用whil​​e循环,布尔标志,队列的AutoResetEvent ,但我只是想知道是否同样可以使用 BlockingCollection GetConsumingEnumerable()我想设施如皮克将使用时一起与消费枚举,非常有帮助的,否则所有的管道模式实现的实例新的东西,如 BlockingCollection GetConsumingEnumerable()看起来不耐用,我不得不搬回老办法。

Surely I can use old style approach using while loop, bool flag, Queue and AutoResetEvent, but I just wondering whether the same is possible using BlockingCollection and GetConsumingEnumerable() I think facility like Peek would bevery helpful when using together with consuming enumerable, since otherwise all Pipeline pattern implementation examples new stuff like BlockingCollection and GetConsumingEnumerable() looks not durable and I have to move back to the old approach.

推荐答案

您应该考虑中间队列。

You should consider intermediate queue.

BlockingCollection&LT; T&GT; 无法窥视项目,因为它的本质 - 可以有多个消费者。其中之一就可以的偷看的一个项目,而另一个可能的需要的它 - 因此,第一个将尝试采取项目,已被采取

BlockingCollection<T> can't "peek" items because of its nature - there can be more than one consumer. One of them can peek an item, and another one can take it - hence, the first one will try to take item, that already has been taken.

这篇关于Queue.Peek()的BlockingCollection的模拟听消耗的IEnumerable&LT时,T&GT;的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-22 15:14