【.NET Core】深入理解任务并行库 (TPL)

一、概述

任务并行库(TPL)英文:Task Parallel LibrarySystem.ThreadingSystem.Threading.Tasks空间中的一组公共类型和API。TPL的目的是通过简化将并行和并发添加到应用程序的过程来提高开发人员的工作效率。TPL动态缩放并发的程度以最有效地使用所有可用的处理器。此外,TPL还处理工作分区,ThreadPool上的线程调度、取消支持、状态管理以及其他低级别的细节操作。通过使用TPL,你可以在将精力集中于程序要完成的工作,同时最大程度地提高代码的性能。

在.NET Framework4中,首选TPL编写多线程代码和并行代码。但是,并不是所有代码都适合并行化。 例如,如果某个循环在每次迭代时只执行少量工作,或它在很多次迭代时都不运行,那么并行化的开销可能导致代码运行更慢。

二、数据并行(任务并行库)

数据并行指的是对源集合或数组的元素同时(既:并发)执行相同操作的场景。在数据并行操作中,对源集合进行分区,以便对个线程能够同时在不同的网段上操作。

任务并行库(TPL)支持通过System.Threading.Tasks.Parallel类实现的数据并行。此类对for循环和foreach循环提供了基于方法的并行执行。Parallel.ForParallel.ForEach循环编写的循环逻辑与编写循环的相似。无需创建线程或列工作项。在基本循环中,不需要加锁。TPL为你处理所有低级别的工作。

下面的代码示例演示了一个简单的 foreach 循环及其并行等效项。

//Sequential version
foreach(var item in sourceCollection)
{
    SetMethod(item);
}
// Parallel equivalent
Parallel.ForEach(sourceCollection, item => SetMethod(item));

并行循环运行时,TPL 将数据源进行分区,以便该循环可以同时对多个部分进行作用。 在后台,任务计划程序基于系统资源和工作负荷来划分任务。 如有可能,如果工作负荷变得不平衡了,计划程序将重新分配多个线程与处理器之间的工作。

三、Parallel.For 循环示例

static void Main(string[] args)
{
       Stopwatch stopwatch = new Stopwatch();
       stopwatch.Start();
       char[] charList = "在验证目录存在后它需要将单个目录路径作为参数,并报告该目录中文件的数量和总大小。".ToCharArray();
       Parallel.For(0, charList.Length,index =>{
                             Console.WriteLine($"index{index}");
                             Console.WriteLine($"CharList->Char:{charList[index]}");
       });
       stopwatch.Stop();
       Console.WriteLine($"Run Time  {stopwatch.ElapsedMilliseconds}");
       stopwatch.Start();
       foreach ( char c in charList )
       {
            Console.WriteLine($"CharList->Char:{c}");

       }
       stopwatch.Stop();
       Console.WriteLine($"Run Time  {stopwatch.ElapsedMilliseconds}");
       Console.WriteLine("Directory Run End");
       Console.ReadKey();
}

For的此重载的第三个参数的类型为Action<int>。不管Action委托具有零个、一个或十六个类型参数,它都始终返回void

四、Parallel.ForEach 循环示例

static void Main(string[] args)
{
     var limit = 200;
     var numbers = Enumerable.Range(0, limit).ToList();
     Parallel.ForEach(numbers, number =>
     {
         Console.WriteLine(number);
     });
     Console.ReadKey();
}

Parallel.ForEach循环的工作原理类似Parallel.For循环。该循环对源集合进行分区,并根据系统环境在多个线程上安排工作。系统上的处理器越多,并行方法的运行速度就越快。对于一些源集合,有序循环可能会更快,具体根据源大小以及该循环要执行的工作类型而定。

五、处理并行循环中的异常

Parallel.ForParallel.ForEach重载没有任何用于处理可能引发异常的特性机制。在这一方面,他们类似于常规forforeach循环;未处理的异常会导致循环在当前运行的迭代完成后立即终止。

向并行循环添加自己的异常处理逻辑时,将处理类似于在多个线程上同时引发相似异常的情况,以及一个线程上引发异常导致另一个线程上引发另一个异常的情况。你可以通过将循环中的所有异常包装到一个System.AggregateException中处理这两种情况。

static void Main(string[] args)
{
     byte[] data = new byte[20];
     Random r = Random.Shared;
     r.NextBytes(data);
     var exceptions = new ConcurrentQueue<Exception>();
     Parallel.ForEach(data, d =>
     {
        try
        {
            if (d < 3) throw new ArgumentException($"Value is {d}. Value must be greater than or equal to 3.");
            else Console.Write(d + " ");
        }
        catch (Exception e)
        {
            exceptions.Enqueue(e);
        }
      });
      Console.WriteLine();
      if (!exceptions.IsEmpty)
      {
        throw new AggregateException(exceptions);
      }
   }
}

六、数据并行总结

在许多情况下,与普通的顺序循环相比,Parallel.ForParallel.ForEach可以显著提升性能。但是,对循环进行并行的工作增加了复杂性,可能会导致在顺序代码中出现不常见或根本不会遇到的问题。

6.1 不要假定并行的速度始终更快

在某些情况下,并行循环可能比它等效的顺序循环的运行速度更慢。 基本的经验法则是具有较少迭代和快速用户委托的并行循环未必会快很多。 但是,由于性能会涉及到很多因素,因此我们建议始终衡量实际的结果。

6.2 不要假定并行的速度始终更快

在顺序代码中,从静态变量或类字段中读取或写入静态变量或类字段的情况很常见。 但是,每当多个线程同时访问此类变量时,则很有可能会出现争用条件。 即使可以使用锁来同步对变量的访问,但同步开销可能会对性能造成损害。 因此,我们建议尽可能地避免在一个并行循环中访问共享状态,或至少限制对共享状态的访问。

6.3 避免过度并行化

通过使用并行循环,将会产生对源集合进行分区和同步工作线程的开销成本。 计算机上的处理器数量进一步限制了并行化的优点。 仅在一个处理器上运行多个受计算限制的线程时,速度并不会得到提升。 因此,必须要小心,不要对循环进行过度并行化。

在嵌套的循环中,最有可能发生过度并行化的情况。 在大多数情况下,除非满足以下一个或多个条件,否则最好仅对外部循环进行并行化:

  • 已知内部循环非常长。
  • 正在对每个订单执行开销极大的计算。
  • 已知目标系统具有足够的处理器来处理通过对 cust.Orders 上的查询进行并行化所产生的线程数。

6.4 避免调用非线程安全方法

如果从并行循环中写入非线程安全实例方法,可能会导致出现程序可能检测到也可能检测不到的数据损坏。 还可能会导致异常。

6.5 避免调用非线程安全方法

.NET 中的大多数静态方法是线程安全的,并且可以同时从多个线程中调用。 但是,即使在这些情况下,所涉及到的同步也可能会导致查询速度大幅度下降。

01-26 11:33