TaskScheduler

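As the name says, it is a task scheduler: the component that queues tasks onto threads for execution, and that can also cap how many tasks run in parallel.
It is itself an abstract class, but it exposes a few public members (mostly static), so let's look at those first.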

Public properties and methods

Current

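Gets the TaskScheduler associated with the currently executing task. When called outside any task, it returns Default.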

Default

Gets the default TaskScheduler instance provided by .NET, which schedules work onto the thread pool.

Id

Gets the unique ID of this TaskScheduler instance.

FromCurrentSynchronizationContext()

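Creates a TaskScheduler associated with the current SynchronizationContext (for example, a UI thread's context). It throws InvalidOperationException if SynchronizationContext.Current is null.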
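Here is a minimal sketch of these members, assuming a console-style host. The class name SchedulerInfo is hypothetical. The sketch checks three things:

- Current falls back to Default outside a task.
- Current is Default inside a Task.Run task.
- FromCurrentSynchronizationContext() throws InvalidOperationException when no context is installed. The context is cleared and restored explicitly so this check does not depend on the host.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerInfo
{
    // Outside any task, TaskScheduler.Current falls back to TaskScheduler.Default.
    public static bool CurrentIsDefaultOutsideTask() =>
        TaskScheduler.Current == TaskScheduler.Default;

    // Task.Run always uses the default scheduler, so inside it Current is Default too.
    public static bool CurrentIsDefaultInsideTask() =>
        Task.Run(() => TaskScheduler.Current == TaskScheduler.Default).Result;

    // With no SynchronizationContext installed, FromCurrentSynchronizationContext throws.
    public static bool FromContextThrowsWithoutContext()
    {
        SynchronizationContext previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(null);
        try
        {
            TaskScheduler.FromCurrentSynchronizationContext();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
        finally
        {
            // Restore whatever context the caller had.
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }
}
```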

event UnobservedTaskException

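Event: raised when an exception thrown inside a task was never observed. In other words, a task was started and then never tracked again, like a missile that is fired and forgotten. Put simply, the event fires only if the faulted Task was never awaited, never passed to Wait(), and its Result (or Exception) was never read. If any of those happened, the exception counts as observed and the event does not fire.
The event is also not raised immediately. It is raised when the faulted task is garbage-collected and finalized, which is why a forced collection is needed to trigger it here.
In addition, in my tests the event only took effect in Release builds. Under Debug, the exception was thrown at the throw site and ended the task.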

GC.Collect();
GC.WaitForPendingFinalizers();

Usage example:

static void ExceptionTest()
{
    TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
    Task.Factory.StartNew(() =>
    {
        throw new Exception("一个错误");
    });

    Thread.Sleep(3000);// give the task time to finish, without awaiting or waiting on it
    GC.Collect();
    GC.WaitForPendingFinalizers();
    Console.WriteLine("end");
    Console.ReadKey();
}

private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
{
    Console.WriteLine("UnobservedTaskException:" + e.Exception.InnerException.Message);
}

Output:

UnobservedTaskException:一个错误
end

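In Release mode, if this event is not handled, the program still runs normally until it ends.
If you want an unobserved exception to terminate the program instead, enable the following in the application configuration file (this is the .NET Framework app.config setting):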

<configuration>   
   <runtime>   
      <ThrowUnobservedTaskExceptions enabled="true"/>   
   </runtime>   
</configuration>  
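The sketch below contrasts observed and unobserved exceptions. The class and method names are hypothetical.

- The faulting task is created in a separate, non-inlined method, so no reference to it survives.
- Run(observe: true) reads Task.Exception. That counts as observing the exception, so the handler should not fire.
- With observe: false, the handler typically fires once after the forced collection. Finalization timing is up to the GC, so treat that count as illustrative.
- The handler also calls e.SetObserved().

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class UnobservedDemo
{
    private static int s_raised; // how many times the event fired during one Run

    private static void OnUnobserved(object sender, UnobservedTaskExceptionEventArgs e)
    {
        Interlocked.Increment(ref s_raised);
        e.SetObserved(); // mark the exception as observed inside the handler
    }

    private static void Fail() => throw new InvalidOperationException("boom");

    // NoInlining keeps the Task reference local to this frame,
    // so it is unreachable as soon as the method returns.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private static void RunFaultingTask(bool observe)
    {
        Task t = Task.Run(new Action(Fail));
        // Poll for completion; IsCompleted does not observe the exception.
        while (!t.IsCompleted) Thread.Sleep(1);
        if (observe)
        {
            // Reading Exception (like await, Wait() or Result) observes it.
            _ = t.Exception;
        }
    }

    public static int Run(bool observe)
    {
        s_raised = 0;
        TaskScheduler.UnobservedTaskException += OnUnobserved;
        try
        {
            RunFaultingTask(observe);
            // The event is raised from a finalizer, so force a collection
            // and wait for pending finalizers to run.
            GC.Collect();
            GC.WaitForPendingFinalizers();
            return Volatile.Read(ref s_raised);
        }
        finally
        {
            TaskScheduler.UnobservedTaskException -= OnUnobserved;
        }
    }
}
```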

Other properties and methods

MaximumConcurrencyLevel

public virtual int MaximumConcurrencyLevel { get; }

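On a scheduler instance, MaximumConcurrencyLevel returns the maximum concurrency level.
A derived class can override this property to report its own limit. The property only reports the limit; enforcing it is up to the scheduler's own QueueTask logic.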
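A quick way to see the values involved, using .NET's built-in schedulers. The names ConcurrencyLevels and PairLevels are hypothetical.

- The base implementation returns int.MaxValue, meaning "no limit", and the default thread-pool scheduler does not override it.
- ConcurrentExclusiveSchedulerPair does override it on both of its schedulers.

```csharp
using System;
using System.Threading.Tasks;

public static class ConcurrencyLevels
{
    // TaskScheduler's base property returns int.MaxValue; the default
    // thread-pool scheduler keeps that value.
    public static int DefaultLevel() => TaskScheduler.Default.MaximumConcurrencyLevel;

    // ConcurrentExclusiveSchedulerPair reports its own limits on both schedulers.
    public static (int concurrent, int exclusive) PairLevels(int maxConcurrency)
    {
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        return (pair.ConcurrentScheduler.MaximumConcurrencyLevel,
                pair.ExclusiveScheduler.MaximumConcurrencyLevel);
    }
}
```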

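IEnumerable<Task>? GetScheduledTasks()

Returns all tasks currently queued to the scheduler. It is abstract, so derived classes must implement it. Note that it exists only to support debuggers.
The returned IEnumerable<Task> lets a debugger enumerate the tasks currently queued to this scheduler.
Be aware that while this method is being called, all other threads in the process are frozen. It must therefore avoid synchronizing with other threads in any way that could block. If synchronization is required and the lock cannot be acquired, the method should throw an exception (NotSupportedException in the example below) so that the debugger does not hang.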

protected override IEnumerable<Task> GetScheduledTasks()  
{  
    bool lockTaken = false;  
    try  
    {  
        Monitor.TryEnter(_syncObj, ref lockTaken);  
        if (lockTaken)  
        {  
            return _tasks.ToArray();  
        }  
        else throw new NotSupportedException();  
    }  
    finally  
    {  
        if (lockTaken) Monitor.Exit(_syncObj);  
	}
}

QueueTask(Task task)

Schedules a Task for execution on a thread. Here is a brief walkthrough based on the example in the official documentation:
https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.taskscheduler?view=net-6.0

protected sealed override void QueueTask(Task task)
{
    lock (_tasks)
    {
        _tasks.AddLast(task);
        if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
        {
            ++_delegatesQueuedOrRunning;
            NotifyThreadPoolOfPendingWork();
        }
    }
}

private void NotifyThreadPoolOfPendingWork()
{
    ThreadPool.UnsafeQueueUserWorkItem(_ =>
    {
        _currentThreadIsProcessingItems = true;
        try
        {
            while (true)
            {
                Task item;
                lock (_tasks)
                {
                    if (_tasks.Count == 0)
                    {
                        --_delegatesQueuedOrRunning;
                        break;
                    }
                    item = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }
                base.TryExecuteTask(item);
            }
        }
        finally { _currentThreadIsProcessingItems = false; }
    }, null);
}

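_tasks is the scheduler's internal task list (a LinkedList<Task>).
_delegatesQueuedOrRunning and _maxDegreeOfParallelism are private fields. They hold, respectively, the number of worker delegates currently queued to or running on the thread pool, and the maximum degree of parallelism. _currentThreadIsProcessingItems marks whether the current thread is processing work items; in the official sample it is declared [ThreadStatic].
QueueTask is called once for every StartNew() on this scheduler, possibly from several threads at the same time (hence the lock). Every call appends that Task to _tasks, so every task created on this scheduler passes through _tasks.
If fewer than _maxDegreeOfParallelism workers are active, _delegatesQueuedOrRunning is incremented and NotifyThreadPoolOfPendingWork() queues a new worker to the ThreadPool. That worker repeatedly takes the first task from _tasks and runs it.
TryExecuteTask() is synchronous: it returns only after the task has finished. So the while loop simply takes a task from _tasks, runs it, and repeats.
Once _delegatesQueuedOrRunning reaches _maxDegreeOfParallelism, QueueTask only appends new tasks. As a result, at most _maxDegreeOfParallelism copies of the NotifyThreadPoolOfPendingWork work item (that is, of its inner while loop) run at any time. When a loop finds _tasks empty, it decrements _delegatesQueuedOrRunning inside the lock and exits.
Finally, the official documentation mentions ConcurrentQueue: using a ConcurrentQueue<Task> for _tasks would be more convenient and efficient. Note, though, that ConcurrentQueue<T> cannot remove an arbitrary item, so TryDequeue(task) could then no longer pull a specific task out of the queue.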
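Following the ConcurrentQueue remark, here is a sketch of the same worker-loop idea built on ConcurrentQueue<Task>. It is not the official sample; the class names and the compare-and-swap slot counting are my own assumptions.

- ConcurrentQueue<T> has no Remove, so the sketch keeps the base TryDequeue (which returns false) and only inlines tasks that were never queued.
- The re-check after the decrement closes the window where a task is enqueued just as the last worker is exiting.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the official sample: pending tasks live in a
// ConcurrentQueue<Task> instead of a LinkedList<Task> guarded by lock.
public sealed class QueueBackedScheduler : TaskScheduler
{
    private readonly ConcurrentQueue<Task> _queue = new ConcurrentQueue<Task>();
    private readonly int _maxWorkers;
    private int _workers; // worker loops queued to or running on the thread pool

    public QueueBackedScheduler(int maxWorkers) => _maxWorkers = Math.Max(1, maxWorkers);

    public override int MaximumConcurrencyLevel => _maxWorkers;

    protected override void QueueTask(Task task)
    {
        _queue.Enqueue(task);
        if (TryReserveWorkerSlot())
            ThreadPool.UnsafeQueueUserWorkItem(_ => WorkerLoop(), null);
    }

    // Atomically increments _workers if it is still below the limit.
    private bool TryReserveWorkerSlot()
    {
        while (true)
        {
            int n = Volatile.Read(ref _workers);
            if (n >= _maxWorkers) return false;
            if (Interlocked.CompareExchange(ref _workers, n + 1, n) == n) return true;
        }
    }

    private void WorkerLoop()
    {
        while (true)
        {
            while (_queue.TryDequeue(out Task task))
                TryExecuteTask(task);

            Interlocked.Decrement(ref _workers);
            // A producer may have enqueued after the drain but seen no free slot;
            // if so, take a slot back and keep draining on this thread.
            if (_queue.IsEmpty || !TryReserveWorkerSlot()) return;
        }
    }

    // Queued tasks cannot be removed from a ConcurrentQueue<T>,
    // so only tasks that were never queued are inlined.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) =>
        !taskWasPreviouslyQueued && TryExecuteTask(task);

    // Debugger support: a snapshot of the pending tasks.
    protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();
}

public static class QueueBackedSchedulerDemo
{
    // Runs `count` short tasks and returns the peak number running at the same time.
    public static int PeakConcurrency(int count, int maxWorkers)
    {
        var factory = new TaskFactory(new QueueBackedScheduler(maxWorkers));
        int active = 0, peak = 0;
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref active);
                int seen;
                // Lock-free max: retry until peak >= now.
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
                Thread.Sleep(5);
                Interlocked.Decrement(ref active);
            });
        }
        Task.WaitAll(tasks);
        return peak;
    }
}
```

With maxWorkers set to 2, PeakConcurrency should never report more than 2. Task.WaitAll cannot inline the already-queued tasks on the calling thread either.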

TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued);

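Attempts to run the task inline. What does "inline" mean here?
The official documentation describes the method as: "Determines whether the provided Task can be executed synchronously in this call, and if it can, executes it."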

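Looking at the source, this method is mainly called from Task.Wait() and Task.WaitAll(). My understanding is this: these methods turn asynchronous work into synchronous waiting, so the waiting thread blocks until the task finishes.

Say I ask Xiao Ming to move 100 bricks. While I Wait for him, I'm idle. Inlining means I try to pitch in myself: if I'm free, I move bricks too, and two of us are working at once.

The "Try" means the waiting thread is not necessarily allowed to run the task. If it isn't, the task is still run by its designated thread. In this way, inlining can effectively add one more unit of concurrency.

Following Microsoft's official example, define a custom task scheduler (MyTaskScheduler) and run the following tasks: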

static void Main()
{
    List<Task> ts = new List<Task>();
    MyTaskScheduler scheduler = new MyTaskScheduler(3);
    TaskFactory taskFactory = new TaskFactory(scheduler);
    for (int i = 0; i < 10; i++)
    {
        Task t = taskFactory.StartNew((n) =>
        {
            int s = 2000;
            Thread.Sleep(s);
            Console.WriteLine(">task threadid = {0},taskid = {1} index = {2}", Thread.CurrentThread.ManagedThreadId, Task.CurrentId,n);
           
        }, i);
        ts.Add(t);
    }
    Console.WriteLine("wait -> threadid={0}", Thread.CurrentThread.ManagedThreadId);
    Task.WaitAll(ts.ToArray());
    Console.WriteLine("end");
}

With TryExecuteTaskInline simply returning false, the output is:

wait -> threadid=1
>task threadid = 5,taskid = 2 index = 1
>task threadid = 3,taskid = 1 index = 0
>task threadid = 7,taskid = 3 index = 2
>task threadid = 3,taskid = 5 index = 4
>task threadid = 5,taskid = 4 index = 3
>task threadid = 7,taskid = 6 index = 5
>task threadid = 5,taskid = 8 index = 7
>task threadid = 3,taskid = 7 index = 6
>task threadid = 7,taskid = 9 index = 8
>task threadid = 5,taskid = 10 index = 9
end

Tasks run three at a time. The main thread's id is 1, and all tasks run on threads 3, 5 and 7.
Now replace TryExecuteTaskInline with the following:

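protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    // Decline inlining unless _currentThreadIsProcessingItems is set.
    // (If the field were [ThreadStatic] as in Microsoft's sample, the waiting main
    // thread would always return here; it can only inline if the field is shared.)
    if (!_currentThreadIsProcessingItems) return false;
    // Log the task being inlined
    Console.WriteLine("tryInLine TaskId={0},,PreviouslyQueued={1}", task.Id, taskWasPreviouslyQueued.ToString());
    if (taskWasPreviouslyQueued)
    {
        // The task was queued: remove it first (TryDequeue takes the lock and may fail),
        // and run it on the current thread only if the removal succeeded
        if (TryDequeue(task))
            return base.TryExecuteTask(task);
        return false;
    }
    // Never queued: run it on the current thread directly
    return base.TryExecuteTask(task);
}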

Output:

wait -> threadid=1
tryInLine TaskId=10,,PreviouslyQueued=True
>task threadid = 1,taskid = 10 index = 9
>task threadid = 7,taskid = 3 index = 2
tryInLine TaskId=9,,PreviouslyQueued=True
>task threadid = 3,taskid = 1 index = 0
>task threadid = 4,taskid = 2 index = 1
>task threadid = 1,taskid = 9 index = 8
>task threadid = 4,taskid = 6 index = 5
>task threadid = 3,taskid = 5 index = 4
tryInLine TaskId=8,,PreviouslyQueued=True
>task threadid = 7,taskid = 4 index = 3
>task threadid = 4,taskid = 7 index = 6
>task threadid = 3,taskid = 8 index = 7
end

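Now tasks run four at a time, and some of them run on the same thread as the main thread (id 1). Inline execution also works from the back of the task list toward the front.

The brick analogy makes this easy to understand. I hired three workers to move bricks, and while I Wait I have nothing to do, so I help out. I shouldn't get in the workers' way, though, so I start from the far end of the pile. When we meet in the middle, everyone knows the job is done, and nobody has interfered with anyone else.

Of course, you can also be a hands-off boss and simply return false.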
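The demo above only shows taskWasPreviouslyQueued=True. Task.RunSynchronously covers the other case: it asks the scheduler to inline a task that was never queued.

The sketch below uses a hypothetical InlineLoggingScheduler. It records the flag of every inlining request and only inlines unqueued tasks. The task should therefore run on the caller's thread, with a recorded flag of false.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: queues work to the thread pool and records the
// taskWasPreviouslyQueued flag of every inlining request.
public sealed class InlineLoggingScheduler : TaskScheduler
{
    public readonly ConcurrentQueue<bool> InlineRequests = new ConcurrentQueue<bool>();

    protected override void QueueTask(Task task) =>
        ThreadPool.UnsafeQueueUserWorkItem(_ => TryExecuteTask(task), null);

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        InlineRequests.Enqueue(taskWasPreviouslyQueued);
        // Decline tasks already handed to the thread pool;
        // inline only tasks that were never queued.
        return !taskWasPreviouslyQueued && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks() => Enumerable.Empty<Task>();
}

public static class InlineDemo
{
    public static (bool sameThread, bool[] requests) Run()
    {
        var scheduler = new InlineLoggingScheduler();
        int callerId = Environment.CurrentManagedThreadId;
        int taskThreadId = -1;
        // RunSynchronously asks the scheduler to run a never-queued task inline.
        var task = new Task(() => taskThreadId = Environment.CurrentManagedThreadId);
        task.RunSynchronously(scheduler);
        return (taskThreadId == callerId, scheduler.InlineRequests.ToArray());
    }
}
```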

Built-in implementations of TaskScheduler

TaskScheduler itself is an abstract class. Besides the static members covered above, let's look at its built-in implementations.
The .NET Framework reference source contains only three:
ThreadPoolTaskScheduler, SynchronizationContextTaskScheduler, ConcurrentExclusiveTaskScheduler
Source: https://referencesource.microsoft.com/#mscorlib/system/threading/Tasks/ThreadPoolTaskScheduler.cs,b76a4a6f77962f28,references

ThreadPoolTaskScheduler

namespace System.Threading.Tasks
{
    /// <summary>
    /// An implementation of TaskScheduler that uses the ThreadPool scheduler
    /// </summary>
    internal sealed class ThreadPoolTaskScheduler: TaskScheduler
    {
        /// <summary>
        /// Constructs a new ThreadPool task scheduler object
        /// </summary>
        internal ThreadPoolTaskScheduler()
        {
            int id = base.Id; // force ID creation of the default scheduler
        }
 
        // static delegate for threads allocated to handle LongRunning tasks.
        private static readonly ParameterizedThreadStart s_longRunningThreadWork = new ParameterizedThreadStart(LongRunningThreadWork);
 
        private static void LongRunningThreadWork(object obj)
        {
            Contract.Requires(obj != null, "TaskScheduler.LongRunningThreadWork: obj is null");
            Task t = obj as Task;
            Contract.Assert(t != null, "TaskScheduler.LongRunningThreadWork: t is null");
            t.ExecuteEntry(false);
        }
 
        /// <summary>
        /// Schedules a task to the ThreadPool.
        /// </summary>
        /// <param name="task">The task to schedule.</param>
        [SecurityCritical]
        protected internal override void QueueTask(Task task)
        {
            if ((task.Options & TaskCreationOptions.LongRunning) != 0)
            {
                // Run LongRunning tasks on their own dedicated thread.
                Thread thread = new Thread(s_longRunningThreadWork);
                thread.IsBackground = true; // Keep this thread from blocking process shutdown
                thread.Start(task);
            }
            else
            {
                // Normal handling for non-LongRunning tasks.
                bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
                ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
            }
        }
        
        /// <summary>
        /// This internal function will do this:
        ///   (1) If the task had previously been queued, attempt to pop it and return false if that fails.
        ///   (2) Propagate the return value from Task.ExecuteEntry() back to the caller.
        /// 
        /// IMPORTANT NOTE: TryExecuteTaskInline will NOT throw task exceptions itself. Any wait code path using this function needs
        /// to account for exceptions that need to be propagated, and throw themselves accordingly.
        /// </summary>
        [SecurityCritical]
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If the task was previously scheduled, and we can't pop it, then return false.
            if (taskWasPreviouslyQueued && !ThreadPool.TryPopCustomWorkItem(task))
                return false;
 
            // Propagate the return value of Task.ExecuteEntry()
            bool rval = false;
            try
            {
                rval = task.ExecuteEntry(false); // handles switching Task.Current etc.
            }
            finally
            {
                //   Only call NWIP() if task was previously queued
                if(taskWasPreviouslyQueued) NotifyWorkItemProgress();
            }
 
            return rval;
        }
 
        [SecurityCritical]
        protected internal override bool TryDequeue(Task task)
        {
            // just delegate to TP
            return ThreadPool.TryPopCustomWorkItem(task);
        }
 
        [SecurityCritical]
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return FilterTasksFromWorkItems(ThreadPool.GetQueuedWorkItems());
        }
 
        private IEnumerable<Task> FilterTasksFromWorkItems(IEnumerable<IThreadPoolWorkItem> tpwItems)
        {
            foreach (IThreadPoolWorkItem tpwi in tpwItems)
            {
                if (tpwi is Task)
                {
                    yield return (Task)tpwi;
                }
            }
        }
 
        /// <summary>
        /// Notifies the scheduler that work is progressing (no-op).
        /// </summary>
        internal override void NotifyWorkItemProgress()
        {
            ThreadPool.NotifyWorkItemProgress();
        }
 
        /// <summary>
        /// This is the only scheduler that returns false for this property, indicating that the task entry codepath is unsafe (CAS free)
        /// since we know that the underlying scheduler already takes care of atomic transitions from queued to non-queued.
        /// </summary>
        internal override bool RequiresAtomicStartTransition
        {
            get { return false; }
        }
    }
}

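As the source shows, this is an internal sealed class. Searching for references finds only one place that uses it: TaskScheduler itself.
All default Task multithreading is managed by this class. It schedules work through the thread pool: long-running tasks (TaskCreationOptions.LongRunning) get a new dedicated thread, and everything else is queued with ThreadPool.UnsafeQueueCustomWorkItem.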
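A quick check of the LongRunning branch in QueueTask, using the default scheduler. LongRunningDemo is a hypothetical name.

- A LongRunning task reports IsThreadPoolThread == false, because it runs on its own dedicated thread.
- A Task.Run task runs on a pool thread.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    public static (bool longRunningOnPool, bool normalOnPool) Run()
    {
        // LongRunning: the default scheduler starts a dedicated background thread.
        bool longRunningOnPool = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default).Result;

        // Ordinary task: queued to the thread pool.
        bool normalOnPool = Task.Run(() => Thread.CurrentThread.IsThreadPoolThread).Result;
        return (longRunningOnPool, normalOnPool);
    }
}
```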

SynchronizationContextTaskScheduler

internal sealed class SynchronizationContextTaskScheduler : TaskScheduler
    {
        private SynchronizationContext m_synchronizationContext;
 
        /// <summary>
        /// Constructs a SynchronizationContextTaskScheduler associated with <see cref="T:System.Threading.SynchronizationContext.Current"/> 
        /// </summary>
        /// <exception cref="T:System.InvalidOperationException">This constructor expects <see cref="T:System.Threading.SynchronizationContext.Current"/> to be set.</exception>
        internal SynchronizationContextTaskScheduler()
        {
            SynchronizationContext synContext = SynchronizationContext.Current;
 
            // make sure we have a synccontext to work with
            if (synContext == null)
            {
                throw new InvalidOperationException(Environment.GetResourceString("TaskScheduler_FromCurrentSynchronizationContext_NoCurrent"));
            }
 
            m_synchronizationContext = synContext;
 
        }
 
        /// <summary>
        /// Implemetation of <see cref="T:System.Threading.Tasks.TaskScheduler.QueueTask"/> for this scheduler class.
        /// 
        /// Simply posts the tasks to be executed on the associated <see cref="T:System.Threading.SynchronizationContext"/>.
        /// </summary>
        /// <param name="task"></param>
        [SecurityCritical]
        protected internal override void QueueTask(Task task)
        {
            m_synchronizationContext.Post(s_postCallback, (object)task);
        }
 
        /// <summary>
        /// Implementation of <see cref="T:System.Threading.Tasks.TaskScheduler.TryExecuteTaskInline"/>  for this scheduler class.
        /// 
        /// The task will be executed inline only if the call happens within 
        /// the associated <see cref="T:System.Threading.SynchronizationContext"/>.
        /// </summary>
        /// <param name="task"></param>
        /// <param name="taskWasPreviouslyQueued"></param>
        [SecurityCritical]
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            if (SynchronizationContext.Current == m_synchronizationContext)
            {
                return TryExecuteTask(task);
            }
            else
                return false;
        }
 
        // not implemented
        [SecurityCritical]
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return null;
        }
 
        /// <summary>
        /// Implementes the <see cref="T:System.Threading.Tasks.TaskScheduler.MaximumConcurrencyLevel"/> property for
        /// this scheduler class.
        /// 
        /// By default it returns 1, because a <see cref="T:System.Threading.SynchronizationContext"/> based
        /// scheduler only supports execution on a single thread.
        /// </summary>
        public override Int32 MaximumConcurrencyLevel
        {
            get
            {
                return 1;
            }
        }
 
        // preallocated SendOrPostCallback delegate
        private static SendOrPostCallback s_postCallback = new SendOrPostCallback(PostCallback);
 
        // this is where the actual task invocation occures
        private static void PostCallback(object obj)
        {
            Task task = (Task) obj;
 
            // calling ExecuteEntry with double execute check enabled because a user implemented SynchronizationContext could be buggy
            task.ExecuteEntry(true);
        }
    }

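The constructor shows that it can only be used when the current synchronization context is not null.
Because it is an internal sealed class, you cannot use it directly. The only way to create one is TaskScheduler.FromCurrentSynchronizationContext().
Tasks are actually executed by posting them asynchronously to the synchronization context via SynchronizationContext.Post().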
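A sketch that makes the Post path visible. CountingContext is a hypothetical SynchronizationContext subclass: it counts Post calls and otherwise keeps the base behavior, which queues the callback to the thread pool.

- Starting one task on the scheduler returned by FromCurrentSynchronizationContext() should produce exactly one Post.
- The scheduler reports MaximumConcurrencyLevel 1, as in the source above.
- The original context is restored in finally.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context: counts Post calls, then defers to the base implementation.
public sealed class CountingContext : SynchronizationContext
{
    public int PostCount;

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref PostCount);
        base.Post(d, state);
    }
}

public static class SyncContextDemo
{
    public static (int posts, int maxConcurrency) Run()
    {
        var context = new CountingContext();
        SynchronizationContext previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            // Must be called while the context is current, otherwise it throws.
            TaskScheduler scheduler = TaskScheduler.FromCurrentSynchronizationContext();
            Task task = Task.Factory.StartNew(() => { }, CancellationToken.None,
                TaskCreationOptions.None, scheduler);
            task.Wait();
            return (Volatile.Read(ref context.PostCount), scheduler.MaximumConcurrencyLevel);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }
}
```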

ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveSchedulerPair)

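"Concurrent" means running in parallel, and "Exclusive" means running alone. ConcurrentExclusiveTaskScheduler is the built-in concurrent/exclusive task scheduler. It is created through the constructors of ConcurrentExclusiveSchedulerPair, all of which delegate to the same constructor: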

public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
{
    // Validate arguments
    if (taskScheduler == null) throw new ArgumentNullException("taskScheduler");
    if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1) throw new ArgumentOutOfRangeException("maxConcurrencyLevel");
    if (maxItemsPerTask == 0 || maxItemsPerTask < -1) throw new ArgumentOutOfRangeException("maxItemsPerTask");
    Contract.EndContractBlock();

    // Store configuration
    m_underlyingTaskScheduler = taskScheduler;
    m_maxConcurrencyLevel = maxConcurrencyLevel;
    m_maxItemsPerTask = maxItemsPerTask;

    // Downgrade to the underlying scheduler's max degree of parallelism if it's lower than the user-supplied level
    int mcl = taskScheduler.MaximumConcurrencyLevel;
    if (mcl > 0 && mcl < m_maxConcurrencyLevel) m_maxConcurrencyLevel = mcl;

    // Treat UNLIMITED_PROCESSING/-1 for both MCL and MIPT as the biggest possible value so that we don't
    // have to special case UNLIMITED_PROCESSING later on in processing.
    if (m_maxConcurrencyLevel == UNLIMITED_PROCESSING) m_maxConcurrencyLevel = Int32.MaxValue;
    if (m_maxItemsPerTask == UNLIMITED_PROCESSING) m_maxItemsPerTask = Int32.MaxValue;

    // Create the concurrent/exclusive schedulers for this pair
    m_exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, 1, ProcessingMode.ProcessingExclusiveTask);
    m_concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, m_maxConcurrencyLevel, ProcessingMode.ProcessingConcurrentTasks);
}

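Internally, it creates two ConcurrentExclusiveTaskScheduler instances (hence the name ...Pair): the exclusive scheduler m_exclusiveTaskScheduler (maximum concurrency level 1) and the concurrent scheduler m_concurrentTaskScheduler. They are exposed through the ExclusiveScheduler and ConcurrentScheduler properties. A task on the exclusive scheduler runs only when no other task from the pair is running.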
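A sketch of how the pair behaves in practice. PairDemo is a hypothetical name. It runs the same batch of short tasks on each scheduler and tracks how many execute at once.

- On ExclusiveScheduler, the peak should be 1.
- On ConcurrentScheduler with maxConcurrencyLevel: 4, the peak should be at most 4.

A typical use is a reader/writer split: writers go on the exclusive side, readers on the concurrent side.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class PairDemo
{
    // Runs `count` short tasks on the scheduler and returns the peak number
    // executing at the same moment.
    public static int PeakConcurrency(TaskScheduler scheduler, int count)
    {
        var factory = new TaskFactory(scheduler);
        int active = 0, peak = 0;
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref active);
                int seen;
                // Lock-free max: retry until peak >= now.
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
                Thread.Sleep(10);
                Interlocked.Decrement(ref active);
            });
        }
        Task.WaitAll(tasks);
        return peak;
    }

    public static (int exclusivePeak, int concurrentPeak) Run()
    {
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 4);
        int exclusivePeak = PeakConcurrency(pair.ExclusiveScheduler, 10);
        int concurrentPeak = PeakConcurrency(pair.ConcurrentScheduler, 10);
        pair.Complete(); // no more work will be queued to this pair
        return (exclusivePeak, concurrentPeak);
    }
}
```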

Custom TaskScheduler

TaskScheduler is an abstract class, so a custom scheduler is written by deriving from it and overriding its members.

public class MyTaskScheduler : TaskScheduler
{
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        throw new NotImplementedException();
    }

    protected override void QueueTask(Task task)
    {
        throw new NotImplementedException();
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        throw new NotImplementedException();
    }
}

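These three abstract methods must be overridden.
Building on the explanation of these methods above, here is a practical example of a multithreaded scheduler with a configurable maximum concurrency.
(The example below is for reference only.)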

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class LimitedConcurrencyTaskScheduler : TaskScheduler
{
    // Task queue
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
    // Maximum concurrency
    private readonly int _maxThreadCount;
    // Number of worker threads currently alive (only modified under lock(_tasks))
    private int _aliveThreadCount = 0;
    /// <summary>
    /// Whether any worker thread is currently running
    /// </summary>
    public bool IsBusy => Volatile.Read(ref _aliveThreadCount) > 0;
    /// <summary>
    /// Maximum concurrency
    /// </summary>
    public override int MaximumConcurrencyLevel => _maxThreadCount;

    public LimitedConcurrencyTaskScheduler()
    {
        int num = Environment.ProcessorCount / 2;
        _maxThreadCount = ((num < 1) ? 1 : num);
    }

    public LimitedConcurrencyTaskScheduler(int maxThreadCount)
    {
        _maxThreadCount = ((maxThreadCount < 1) ? 1 : maxThreadCount);
    }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            // Enqueue the task
            _tasks.AddLast(task);
            if (_aliveThreadCount < _maxThreadCount)
            {
                // Fewer live workers than the limit: count the new one and start it
                _aliveThreadCount++;
                Thread thread = new Thread(ThreadWorkItem)
                {
                    IsBackground = true,
                    Priority = ThreadPriority.Normal
                };
                thread.Start();
            }
        }
    }

    /// <summary>
    /// Work loop run by each worker thread
    /// </summary>
    private void ThreadWorkItem()
    {
        // Keep taking tasks from the queue and running them
        while (true)
        {
            Task task;
            lock (_tasks)
            {
                if (_tasks.Count == 0)
                {
                    // Queue is empty: this thread is done. Decrementing inside the lock
                    // means QueueTask never sees a stale count and strands a new task.
                    _aliveThreadCount--;
                    break;
                }
                task = _tasks.First.Value;
                _tasks.RemoveFirst();
            }
            base.TryExecuteTask(task);
        }
    }

    /// <summary>
    /// Used by the debugger
    /// </summary>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        // Returns a snapshot of the tasks not yet started (never the live list);
        // throws instead of blocking if the lock is held, so the debugger does not hang
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (lockTaken) return new List<Task>(_tasks);
            throw new NotSupportedException();
        }
        finally
        {
            if (lockTaken) Monitor.Exit(_tasks);
        }
    }

    /// <summary>
    /// Try to remove a task from the queue
    /// </summary>
    protected sealed override bool TryDequeue(Task task)
    {
        lock (_tasks) return _tasks.Remove(task);
    }

    /// <summary>
    /// Try to run the task inline on the calling thread
    /// </summary>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Only inline while some worker thread is busy; otherwise decline
        if (!IsBusy) return false;
        // Log the task
        Console.WriteLine("tryInLine TaskId={0}, PreviouslyQueued={1}", task.Id, taskWasPreviouslyQueued);

        if (taskWasPreviouslyQueued)
        {
            // Queued earlier: remove it first (under the lock; fails if a worker already took it)
            if (!TryDequeue(task)) return false;
            bool inlined = base.TryExecuteTask(task);
            if (!inlined)
            {
                // Dequeued but not executed: put it back in the queue
                lock (_tasks) _tasks.AddLast(task);
            }
            return inlined;
        }
        // Never queued: run it on the current thread
        return base.TryExecuteTask(task);
    }
}
12-07 11:41