本文介绍了在Task.WhenAll中执行许多任务时,C#线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果我在一个线程上执行此操作,将会发生什么情况

 await Task.WhenAll(items.select(x => SomeAsyncMethod(x)))

// Where SomeAsyncMethod is defined like this (writeAsync is pure async io)
async Task SomeAsyncMethod(Item item){
  await myDevice.writeAsync(...).ConfigureAwait(false);
  //do some cpu intensive stuff...
}
 

,并说items中有10.000个项目.当每个SomeAsyncMethod在等待后继续执行时,它将在线程池中的线程上执行此操作.因此,当许多SomeAsyncsMethod返回值时,线程池中的多个线程会同时被占用,还是在这种情况下,只有一个线程在任何给定的时刻执行SomeAsyncMethod中的"CPU密集型内容"? >

更新:好的,这是一个示例程序.当我在具有8个逻辑内核的PC上进行测试时,则minthreads为12或13,而maxthreads在35-40范围内结束.因此,似乎将为逻辑核心创建多达4个线程.创建10.000或100.000文件无关紧要-使用相同的最大线程数-也许是因为所有任务都排队等待访问文件系统吗?请注意,该程序将在c:\ tmp \ asynctest中创建许多小文件:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication4 {
    internal class Program {
        private static void Main(string[] args) {
            var myDevice = new MyDevice();
            var ints = new List<int>();
            for (var i = 0; i < 10000; i++) {
                ints.Add(i);
            }
            var task = Task.WhenAll(ints.Select(i => myDevice.WriteTextAsync(i.ToString())));
            task.Wait();
            Console.WriteLine("Max thread count = " + myDevice.MaxThreadCount);
            Console.WriteLine("Min thread count = " + myDevice.MinThreadCount);
            Console.ReadLine();
        }
    }

    public class MyDevice {
        public ConcurrentDictionary<string, string> ThreadIds;
        public int MaxThreadCount;
        public int MinThreadCount = Process.GetCurrentProcess().Threads.Count;
        public async Task WriteTextAsync(string text) {
            var filePath = @"c:\tmp\asynctest\" + text + ".txt";
            var encodedText = Encoding.Unicode.GetBytes(text);
            using (var sourceStream = new FileStream(filePath,
                FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) {
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length).ConfigureAwait(false);
                MaxThreadCount = Math.Max(MaxThreadCount, Process.GetCurrentProcess().Threads.Count);
                MinThreadCount = Math.Min(MinThreadCount, Process.GetCurrentProcess().Threads.Count);
            }
        }
    }
}

更新2.现在,如果我启动多个线程,每个线程同时执行许多aysnc io任务,那么与更新1中的单线程示例相比,似乎总共使用了更多线程.运行时,每个线程由4个线程创建10.000个文件,然后最大线程为41个线程,最小线程为12个线程-因此似乎可以集中控制多少个线程用于异步任务继续.这是一个示例,其中4个线程每个线程开始10.000个异步操作:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication4 {
    internal class Program {
        private static void Main(string[] args) {
            var myDevice = new MyDevice();
            var ints = new List<int>();
            const int limit = 10000;
            for (var i = 0; i < limit; i++) {
                ints.Add(i);
            }

            List<Task> jobs = new List<Task>();
            for (var j = 0; j < 4*limit; j+=limit) {
                var jobid = j;
                jobs.Add(Task.Run(() => Runjob(ints, myDevice, jobid)));
            }
            Task.WaitAll(jobs.ToArray());

            Console.WriteLine("Max thread count = " + myDevice.MaxThreadCount);
            Console.WriteLine("Min thread count = " + myDevice.MinThreadCount);
            Console.ReadLine();
        }

        private static void Runjob(List<int> ints, MyDevice myDevice, int jobid) {
            Console.WriteLine("Starting job " + jobid);
            var task = Task.WhenAll(ints.Select(i => myDevice.WriteTextAsync((jobid+i).ToString())));
            task.Wait();
            Console.WriteLine("Finished job " + jobid);
        }
    }

    public class MyDevice {
        public int MaxThreadCount;
        public int MinThreadCount = Process.GetCurrentProcess().Threads.Count;
        public async Task WriteTextAsync(string text) {
            var filePath = @"c:\tmp\asynctest\" + text + ".txt";
            var encodedText = Encoding.Unicode.GetBytes(text);
            using (var sourceStream = new FileStream(filePath,
                FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) {
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length).ConfigureAwait(false);
                MaxThreadCount = Math.Max(MaxThreadCount, Process.GetCurrentProcess().Threads.Count);
                MinThreadCount = Math.Min(MinThreadCount, Process.GetCurrentProcess().Threads.Count);
            }
        }
    }
}
解决方案

最可能的情况是,"CPU密集型内容"将分别在随机线程池线程上发生-如果它确实受CPU限制,则每个逻辑核心将获得大约1-2个线程.

关键点是,虽然继续执行原始任务(Task.WhenAll)将在UI线程上运行(当然,如果 是同步上下文),但继续执行单个I/O操作将发布在线程池上,因为您明确要求忽略同步上下文(ConfigureAwait(false)).

但是,如果I/O请求同步完成,那么一切也有可能在原始线程上运行.在这种情况下,将不执行异步调度,并且任务没有机会切换线程.如果需要确保并行化,则必须显式使用Task.Run.

还应注意,这主要取决于实现,而不是您可以依赖的东西.对于大量异步I/O应用程序来说,这可能也是一种不好的方法,因为您可能正在I/O线程池中的线程上运行CPU密集型程序-干扰了线程池中线程的框架平衡,并阻止新的异步响应通过,直到您完成工作.如果您正在执行的工作不是纯粹的CPU工作,则尤其如此-例如,在Web服务器之类的线程池线程上进行阻塞可能会非常痛苦.

What will happen if, on a single thread, I do this:

await Task.WhenAll(items.select(x => SomeAsyncMethod(x)))

// Where SomeAsyncMethod is defined like this (writeAsync is pure async io)
async Task SomeAsyncMethod(Item item){
  await myDevice.writeAsync(...).ConfigureAwait(false);
  //do some cpu intensive stuff...
}

and say there are 10.000 items in items. When each of the SomeAsyncMethod continue after the await, then it does so on a thread from the thread pool. So when many of the SomeAsyncsMethods return will several threads from the thread pool then be taken simultaneously, or will only a single thread execute "do some CPU intensive stuff" in SomeAsyncMethod at any given moment in this case?

UPDATE: Ok here is a sample program. When I test this on a PC with 8 logical cores, then minthreads is 12 or 13 and maxthreads ends in the 35-40 range. So it looks as if up to 4 threads will be created pr logical core. It does not matter if 10.000 or 100.000 files are created - the same max number of threads is used - maybe this is because all the tasks queue up waiting for access to the file system ?. Please note, that this program will create lots of small files in c:\tmp\asynctest:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication4 {
    internal class Program {
        private static void Main(string[] args) {
            var myDevice = new MyDevice();
            var ints = new List<int>();
            for (var i = 0; i < 10000; i++) {
                ints.Add(i);
            }
            var task = Task.WhenAll(ints.Select(i => myDevice.WriteTextAsync(i.ToString())));
            task.Wait();
            Console.WriteLine("Max thread count = " + myDevice.MaxThreadCount);
            Console.WriteLine("Min thread count = " + myDevice.MinThreadCount);
            Console.ReadLine();
        }
    }

    public class MyDevice {
        public ConcurrentDictionary<string, string> ThreadIds;
        public int MaxThreadCount;
        public int MinThreadCount = Process.GetCurrentProcess().Threads.Count;
        public async Task WriteTextAsync(string text) {
            var filePath = @"c:\tmp\asynctest\" + text + ".txt";
            var encodedText = Encoding.Unicode.GetBytes(text);
            using (var sourceStream = new FileStream(filePath,
                FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) {
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length).ConfigureAwait(false);
                MaxThreadCount = Math.Max(MaxThreadCount, Process.GetCurrentProcess().Threads.Count);
                MinThreadCount = Math.Min(MinThreadCount, Process.GetCurrentProcess().Threads.Count);
            }
        }
    }
}

Update 2. Now if I start multiple threads that each do lots of aysnc io tasks simultaneously, then it does not look as if more threads in total are used compared to the single threaded example in update 1. In the test I just ran, where 10.000 files are created each by 4 threads, then max threads was 41 and min threads 12 - so there seems to be some central control of how many threads are used for async task continuations. Here is a the example where 4 threads starts 10.000 async operations each:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication4 {
    internal class Program {
        private static void Main(string[] args) {
            var myDevice = new MyDevice();
            var ints = new List<int>();
            const int limit = 10000;
            for (var i = 0; i < limit; i++) {
                ints.Add(i);
            }

            List<Task> jobs = new List<Task>();
            for (var j = 0; j < 4*limit; j+=limit) {
                var jobid = j;
                jobs.Add(Task.Run(() => Runjob(ints, myDevice, jobid)));
            }
            Task.WaitAll(jobs.ToArray());

            Console.WriteLine("Max thread count = " + myDevice.MaxThreadCount);
            Console.WriteLine("Min thread count = " + myDevice.MinThreadCount);
            Console.ReadLine();
        }

        private static void Runjob(List<int> ints, MyDevice myDevice, int jobid) {
            Console.WriteLine("Starting job " + jobid);
            var task = Task.WhenAll(ints.Select(i => myDevice.WriteTextAsync((jobid+i).ToString())));
            task.Wait();
            Console.WriteLine("Finished job " + jobid);
        }
    }

    public class MyDevice {
        public int MaxThreadCount;
        public int MinThreadCount = Process.GetCurrentProcess().Threads.Count;
        public async Task WriteTextAsync(string text) {
            var filePath = @"c:\tmp\asynctest\" + text + ".txt";
            var encodedText = Encoding.Unicode.GetBytes(text);
            using (var sourceStream = new FileStream(filePath,
                FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) {
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length).ConfigureAwait(false);
                MaxThreadCount = Math.Max(MaxThreadCount, Process.GetCurrentProcess().Threads.Count);
                MinThreadCount = Math.Min(MinThreadCount, Process.GetCurrentProcess().Threads.Count);
            }
        }
    }
}
解决方案

The most likely scenario is that the "CPU instensive stuff" will each happen on a random thread-pool thread - and if it's truly CPU-bound, you'll get about 1-2 threads per a logical core doing the work.

The key point is that while the continuation to the original task (Task.WhenAll) will run back on the UI thread (if there is a synchronization context, of course), the continuations to the individual I/O operations will be posted on the thread-pool, since you explicitly requested the synchronization context to be ignored (ConfigureAwait(false)).

However, there is also a chance that everything will run on the original thread - if the I/O request completes synchronously. In that case, no asynchronous dispatch is done, and there is no opportunity for the tasks to switch threads. If you need to ensure parallelisation, you must use Task.Run explicitly.

It should also be noted that this is mostly implementation dependent, and not something you can rely on. It might also be a bad approach for a heavily asynchronous-I/O application, since you might be running the CPU intensive stuff on a thread from the I/O thread-pool - disturbing the framework balancing of threads in the thread-pool, and preventing new asynchronous responses from coming through until you're finished with your work. This is especially true if the work you're doing isn't pure CPU work - blocking on a thread-pool thread can be quite painful on something like a web server, for example.

这篇关于在Task.WhenAll中执行许多任务时,C#线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-24 16:48