How to implement an efficient WhenEach that streams an IAsyncEnumerable of task results?

Problem description

I am trying to update my toolset with the new tools offered by C# 8, and one method that seems particularly useful is a version of Task.WhenAll that returns an IAsyncEnumerable. This method should stream the task results as soon as they become available, so naming it WhenAll doesn't make much sense. WhenEach sounds more appropriate. The signature of the method is:

public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);

This method could be used like this:

var tasks = new Task<int>[]
{
    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
};

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

static async Task<int> ProcessAsync(int result, int delay)
{
    await Task.Delay(delay);
    return result;
}

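Expected output (results arrive in order of increasing delay):

Processed: 5
Processed: 4
Processed: 1
Processed: 3
Processed: 2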

I managed to write a basic implementation that calls Task.WhenAny in a loop, but there is a problem with this approach:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
{
    var hashSet = new HashSet<Task<TResult>>(tasks);
    while (hashSet.Count > 0)
    {
        var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        hashSet.Remove(task);
    }
}

The problem is performance. The implementation of Task.WhenAny creates a defensive copy of the supplied list of tasks, so calling it repeatedly in a loop results in O(n²) computational complexity. My naive implementation struggles to process 10,000 tasks: the overhead is nearly 10 seconds on my machine. I would like the method to be nearly as performant as the built-in Task.WhenAll, which can handle hundreds of thousands of tasks with ease. How could I improve the WhenEach method so that it performs decently?
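To make the quadratic cost concrete, here is a minimal sketch that only counts how many task references the loop hands to Task.WhenAny in total. It does not call Task.WhenAny itself. The class and method names are hypothetical, and the count assumes that each call touches every task passed to it exactly once, so treat it as a rough lower bound rather than a measurement.

```csharp
using System;

public static class WhenAnyCostSketch
{
    // Hypothetical helper: total number of task references passed to Task.WhenAny
    // by a loop that starts with n tasks and removes one completed task per iteration.
    public static long CountTasksPassed(int n)
    {
        long total = 0;
        for (int remaining = n; remaining > 0; remaining--)
            total += remaining; // one pass over every task that is still pending
        return total;           // equals n * (n + 1) / 2
    }

    public static void Main()
    {
        Console.WriteLine(CountTasksPassed(10_000)); // prints 50005000
    }
}
```

For 10,000 tasks that is about 50 million task references copied, compared with 10,000 for a single pass over the array.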

Recommended answer

By using code from this article, you can implement the following:

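// Requires: using System; using System.Collections.Generic; using System.Linq;
// using System.Threading; using System.Threading.Tasks;

// Returns one "bucket" task per input task. The buckets complete in the order
// in which the input tasks complete, not in the order they were supplied.
public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
   var inputTasks = tasks.ToList();

   // One TaskCompletionSource per input task; results[i] exposes buckets[i].Task.
   var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
   var results = new Task<Task<T>>[buckets.Length];
   for (int i = 0; i < buckets.Length; i++)
   {
       buckets[i] = new TaskCompletionSource<Task<T>>();
       results[i] = buckets[i].Task;
   }

   // Each completed input task claims the next free bucket. Interlocked.Increment
   // keeps the claim thread-safe when several tasks complete concurrently.
   int nextTaskIndex = -1;
   Action<Task<T>> continuation = completed =>
   {
       var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
       bucket.TrySetResult(completed);
   };

   // Attach the continuation once to every input task, so the total work is O(n).
   foreach (var inputTask in inputTasks)
   {
       inputTask.ContinueWith(
           continuation,
           CancellationToken.None,
           TaskContinuationOptions.ExecuteSynchronously,
           TaskScheduler.Default);
   }

   return results;
}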
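As an illustration of what Interleaved returns, the following sketch completes three hand-made tasks in the order 2, 0, 1 and reads the buckets in array order. The demo class, the RunAsync method, and the result values are hypothetical; the Interleaved copy inside is condensed from the method above so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class InterleavedOrderDemo
{
    // Same technique as the Interleaved method above, condensed for the demo.
    public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
    {
        var inputTasks = tasks.ToList();
        var buckets = inputTasks.Select(_ => new TaskCompletionSource<Task<T>>()).ToArray();
        int nextIndex = -1;
        foreach (var inputTask in inputTasks)
        {
            inputTask.ContinueWith(
                completed => { buckets[Interlocked.Increment(ref nextIndex)].TrySetResult(completed); },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }
        return buckets.Select(b => b.Task).ToArray();
    }

    public static async Task<List<int>> RunAsync()
    {
        // Three sources completed by hand, so the completion order is under our control.
        var sources = new[]
        {
            new TaskCompletionSource<int>(),
            new TaskCompletionSource<int>(),
            new TaskCompletionSource<int>(),
        };
        var buckets = Interleaved(sources.Select(s => s.Task));
        var observed = new List<int>();

        // Complete the inputs in the order 2, 0, 1, draining one bucket after each.
        int bucketIndex = 0;
        foreach (int i in new[] { 2, 0, 1 })
        {
            sources[i].SetResult((i + 1) * 10);
            Task<int> completed = await buckets[bucketIndex++];
            observed.Add(await completed);
        }
        return observed;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync())); // prints 30, 10, 20
    }
}
```

The buckets are read in array order, yet the values come back in completion order, which is the property WhenEach relies on.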

Then change your WhenEach to call the Interleaved code:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
    foreach (var bucket in Interleaved(tasks))
    {
        var t = await bucket;
        yield return await t;
    }
}

Then you can call WhenEach as usual:

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

I did some rudimentary benchmarking with 10k tasks, and this version performed about 5 times better in terms of speed.
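A rough way to reproduce such a comparison is sketched below. This is not the benchmark used for the figure above: the MeasureAsync harness, the gate-based task setup, and the task count are assumptions made for illustration. The naive Task.WhenAny loop from the question is included as the implementation under test so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class WhenEachTimingSketch
{
    // Hypothetical harness: consumes every result of a WhenEach implementation and
    // reports how many results were seen and how long the enumeration took.
    public static async Task<(int Count, TimeSpan Elapsed)> MeasureAsync(
        Func<Task<int>[], IAsyncEnumerable<int>> whenEach, int taskCount)
    {
        // All tasks wait on the same gate, so they complete at roughly the same time
        // and the measurement is dominated by the WhenEach overhead.
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        Task<int>[] tasks = Enumerable.Range(0, taskCount)
            .Select(async i => { await gate.Task; return i; })
            .ToArray();

        var stopwatch = Stopwatch.StartNew();
        gate.SetResult(true);
        int count = 0;
        await foreach (int _ in whenEach(tasks))
            count++;
        stopwatch.Stop();
        return (count, stopwatch.Elapsed);
    }

    // The naive Task.WhenAny loop from the question, used as the implementation under test.
    public static async IAsyncEnumerable<int> NaiveWhenEach(Task<int>[] tasks)
    {
        var pending = new HashSet<Task<int>>(tasks);
        while (pending.Count > 0)
        {
            Task<int> task = await Task.WhenAny(pending).ConfigureAwait(false);
            pending.Remove(task);
            yield return await task.ConfigureAwait(false);
        }
    }

    public static async Task Main()
    {
        var (count, elapsed) = await MeasureAsync(NaiveWhenEach, 1_000);
        Console.WriteLine($"{count} results in {elapsed.TotalMilliseconds:F0} ms");
    }
}
```

Passing the Interleaved-based WhenEach to MeasureAsync with the same task count gives the other side of the comparison. Timings vary by machine and runtime, so a single Stopwatch run like this is only a sanity check.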
