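I'm working on a TPL Dataflow pipeline and noticed some strange behaviour related to ordering/parallelism in TransformManyBlock (it might apply to other blocks as well).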

Here is my code to reproduce it (.NET 4.7.2, TPL Dataflow 4.9.0):

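using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program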
{
    static void Main(string[] args)
    {
        // EnsureOrdered = false: output for different inputs may leave the block in any order.
        var sourceBlock = new TransformManyBlock<int, Tuple<int, int>>(i => Source(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false });

        var targetBlock = new ActionBlock<Tuple<int, int>>(tpl =>
        {
            Console.WriteLine($"Received ({tpl.Item1}, {tpl.Item2})");
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = true });

        sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 10; i++)
        {
            sourceBlock.Post(i);
        }

        sourceBlock.Complete();
        targetBlock.Completion.Wait();
        Console.WriteLine("Finished");
        Console.Read();
    }

    static IEnumerable<Tuple<int, int>> Source(int i)
    {
        // Fixed seed: every call to Source draws the same sequence of delays.
        var rand = new Random(543543254);
        for (int j = 0; j < i; j++)
        {
            Thread.Sleep(rand.Next(100, 1500));
            Console.WriteLine($"Returning ({i}, {j})");
            yield return Tuple.Create(i, j);
        }
    }
}

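The behaviour I want is the following:
  • The source block should return tuples in parallel; the only requirement is that they are ordered by the secondary property j.
  • The target block should process messages in the order received.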

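From what I understand, the secondary ordering condition is satisfied by the nature of yield return, so EnsureOrdered can be set to false. If it is set to true, the source block withholds messages for an unacceptable amount of time, because it waits for all yield returns to complete before passing the messages along (the real application processes many GB of data, so we want to propagate data through the pipeline as quickly as possible in order to release RAM). This is a sample output when EnsureOrdered of the source block is set to true: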
    Returning (1, 0)
    Returning (2, 0)
    Returning (4, 0)
    Returning (3, 0)
    Returning (2, 1)
    Returning (4, 1)
    Returning (3, 1)
    Received (1, 0)
    Received (2, 0)
    Received (2, 1)
    Returning (4, 2)
    Returning (3, 2)
    Received (3, 0)
    Received (3, 1)
    Received (3, 2)
    Returning (5, 0)
    Returning (6, 0)
    

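We can see that the source block works in parallel, but waits to propagate messages until all messages for the next i in line have been generated (as expected).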

However, when EnsureOrdered of the source block is false (as in the code sample), I get the following output:
    Returning (2, 0)
    Received (2, 0)
    Returning (2, 1)
    Received (2, 1)
    Returning (4, 0)
    Received (4, 0)
    Returning (4, 1)
    Received (4, 1)
    Returning (4, 2)
    Received (4, 2)
    Returning (4, 3)
    Received (4, 3)
    Returning (1, 0)
    Received (1, 0)
    Returning (3, 0)
    Received (3, 0)
    Returning (3, 1)
    Received (3, 1)
    Returning (3, 2)
    Received (3, 2)
    

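The source block successfully propagates messages as soon as they are available, but parallelism seems to be lost, since it only works on one i at a time.

Why is this? How can I force it to process in parallel?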

Accepted answer

A fix for this is in progress here: https://github.com/dotnet/corefx/pull/31059

Thanks for the report!
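
For anyone who cannot pick up a build containing that fix, here is a rough sketch of one possible workaround. It is not taken from the linked pull request, and the class name WorkaroundSketch is made up for illustration. It assumes the only requirements are the two listed in the question: tuples for different i may interleave freely, and tuples for the same i must arrive ordered by j. Under that assumption the TransformManyBlock can be swapped for an ActionBlock<int> whose workers enumerate Source(i) themselves and post each tuple to the target as soon as it is produced:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class WorkaroundSketch // hypothetical name, not part of the original question
{
    static void Main()
    {
        // Same target block as in the question.
        var targetBlock = new ActionBlock<Tuple<int, int>>(tpl =>
        {
            Console.WriteLine($"Received ({tpl.Item1}, {tpl.Item2})");
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = true });

        // Assumption: an ActionBlock<int> stands in for the TransformManyBlock.
        // Each worker handles one i and posts its tuples in j order, so the
        // per-i ordering is preserved while different i values run in parallel.
        var sourceBlock = new ActionBlock<int>(i =>
        {
            foreach (var tpl in Source(i))
            {
                targetBlock.Post(tpl);
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // The blocks are not linked, so completion is forwarded by hand.
        // Note: this sketch does not propagate faults from the source block.
        sourceBlock.Completion.ContinueWith(_ => targetBlock.Complete());

        for (int i = 0; i < 10; i++)
        {
            sourceBlock.Post(i);
        }

        sourceBlock.Complete();
        targetBlock.Completion.Wait();
        Console.WriteLine("Finished");
    }

    // Same generator as in the question.
    static IEnumerable<Tuple<int, int>> Source(int i)
    {
        var rand = new Random(543543254);
        for (int j = 0; j < i; j++)
        {
            Thread.Sleep(rand.Next(100, 1500));
            Console.WriteLine($"Returning ({i}, {j})");
            yield return Tuple.Create(i, j);
        }
    }
}
```

The trade-off is that the blocks are no longer connected through LinkTo, so completion and error handling have to be wired manually, and if the target were given a BoundedCapacity, Post would need to be replaced with something that waits, such as SendAsync.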

Source: "c# - TPL Dataflow: Why does EnsureOrdered = false destroy parallelism for this TransformManyBlock?" on Stack Overflow: https://stackoverflow.com/questions/51276432/
