- A+
前言
看过不少关于 await 的原理的文章,也知道背后是编译器给转成了状态机实现的,但是具体是怎么完成的,回调又是如何衔接的,一直都没有搞清楚,这次下定决心把源码自己跑了下,终于豁然开朗了
本文的演示代码基于 VS2022 + .NET 6
示例
public class Program { static int Work() { Console.WriteLine("In Task.Run"); return 1; } static async Task TestAsync() { Console.WriteLine("Before Task.Run"); await Task.Run(Work); Console.WriteLine("After Task.Run"); } static void Main() { _ = TestAsync(); Console.WriteLine("End"); Console.ReadKey(); } }
- 很简单的异步代码,我们来看下,编译器把它变成了啥
class Program { static int Work() { Console.WriteLine("In Task.Run"); return 1; } static Task TestAsync() { var stateMachine = new StateMachine() { _builder = AsyncTaskMethodBuilder.Create(), _state = -1 }; stateMachine._builder.Start(ref stateMachine); return stateMachine._builder.Task; } static void Main() { _ = TestAsync(); Console.WriteLine("End"); Console.ReadKey(); } class StateMachine : IAsyncStateMachine { public int _state; public AsyncTaskMethodBuilder _builder; private TaskAwaiter<int> _awaiter; void IAsyncStateMachine.MoveNext() { int num = _state; try { TaskAwaiter<int> awaiter; if (num != 0) { Console.WriteLine("Before Task.Run"); awaiter = Task.Run(Work).GetAwaiter(); if (!awaiter.IsCompleted) { _state = 0; _awaiter = awaiter; StateMachine stateMachine = this; _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine); return; } } else { awaiter = _awaiter; _awaiter = default; _state = -1; } awaiter.GetResult(); Console.WriteLine("After Task.Run"); } catch (Exception exception) { _state = -2; _builder.SetException(exception); return; } _state = -2; _builder.SetResult(); } void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine) { } } }
- 编译后的代码经过我的整理,命名简化了,更容易理解
状态机实现
-
我们看到实际是生成了一个隐藏的状态机类
StateMachine
-
把状态机的初始状态
_state
设置 -1 -
stateMachine._builder.Start(ref stateMachine);
启动状态机,内部实际调用的就是状态机的MoveNext
方法 -
Task.Run
创建一个任务, 把委托放在Task.m_action
字段,丢到线程池,等待调度 -
任务在线程池内被调度完成后,是怎么回到这个状态机继续执行后续代码的呢?
_builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
就是关键了, 跟下去,到了如下的代码:if (!this.AddTaskContinuation(stateMachineBox, false)) { ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, true); } bool AddTaskContinuation(object tc, bool addBeforeOthers) { return !this.IsCompleted && ((this.m_continuationObject == null && Interlocked.CompareExchange(ref this.m_continuationObject, tc, null) == null) || this.AddTaskContinuationComplex(tc, addBeforeOthers)); }
- 这里很清楚的看到,尝试把状态机对象(实际是状态机的包装类),赋值到
Task.m_continuationObject
, 如果操作失败,则把状态机对象丢进线程池等待调度,这里为什么这么实现,看一下线程池是怎么执行的就清楚了
- 这里很清楚的看到,尝试把状态机对象(实际是状态机的包装类),赋值到
线程池实现
- .NET6 的线程池实现,实际是放到了
PortableThreadPool
, 具体调试步骤我就不放了,直接说结果就是, 线程池线程从任务队列中拿到任务后都执行了DispatchWorkItem
方法
static void DispatchWorkItem(object workItem, Thread currentThread) { Task task = workItem as Task; if (task != null) { task.ExecuteFromThreadPool(currentThread); return; } Unsafe.As<IThreadPoolWorkItem>(workItem).Execute(); } virtual void ExecuteFromThreadPool(Thread threadPoolThread) { this.ExecuteEntryUnsafe(threadPoolThread); }
-
我们看到, 线程池队列中的任务都是 object 类型的, 这里进行了类型判断, 如果是 Task , 直接执行
task.ExecuteFromThreadPool
, 更有意思的这个方法是个虚方法,后面说明 -
ExecuteFromThreadPool
继续追下去,我们来到了这里,代码做了简化private void ExecuteWithThreadLocal(ref Task currentTaskSlot, Thread threadPoolThread = null) { this.InnerInvoke(); this.Finish(true); } virtual void InnerInvoke() { Action action = this.m_action as Action; if (action != null) { action(); return; } }
-
很明显
this.InnerInvoke
就是执行了最开始Task.Run(Work)
封装的委托了, 在m_action
字段 -
this.Finish(true);
跟下去会发现会调用FinishStageTwo
设置任务的完成状态,异常等, 继续调用FinishStageThree
就来了重点:FinishContinuations
这个方法就是衔接后续回调的核心internal void FinishContinuations() { object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel); if (obj != null) { this.RunContinuations(obj); } }
-
还记得状态机实现么,
Task.m_continuationObject
字段实际存储的就是状态机的包装类,这里线程池线程也会判断这个字段有值的话,就直接使用它执行后续代码了void RunContinuations(object continuationObject) { var asyncStateMachineBox = continuationObject as IAsyncStateMachineBox; if (asyncStateMachineBox != null) { AwaitTaskContinuation.RunOrScheduleAction(asyncStateMachineBox, flag2); return; } } static void RunOrScheduleAction(IAsyncStateMachineBox box, bool allowInlining) { if (allowInlining && AwaitTaskContinuation.IsValidLocationForInlining) { box.MoveNext(); return; } }
总结
Task.Run
创建Task
, 把委托放在m_action
字段, 把Task
压入线程池队列,等待调度_builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
尝试把状态机对象放在Task.m_continuationObject
字段上,等待线程池线程调度完成任务后使用(用来执行后续),若操作失败,直接把状态机对象压入线程池队列,等待调度- 线程池线程调度任务完成后,会判断
Task.m_continuationObject
有值,直接执行它的MoveNext
备注
-
状态机实现中,尝试修改
Task.m_continuationObject
,可能会失败,
就会直接把状态机对象压入线程池, 但是线程池调度,不都是判断是不是Task
类型么, 其实状态机的包装类是Task
的子类,哈哈,是不是明白了class AsyncStateMachineBox<TStateMachine> : Task<TResult>, IAsyncStateMachineBox where TStateMachine : IAsyncStateMachine static void DispatchWorkItem(object workItem, Thread currentThread) { Task task = workItem as Task; if (task != null) { task.ExecuteFromThreadPool(currentThread); return; } Unsafe.As<IThreadPoolWorkItem>(workItem).Execute(); }
- 还有就是状态机包装类,重写了
Task.ExecuteFromThreadPool
,所以线程池调用task.ExecuteFromThreadPool
就是直接调用了状态机的MoveNext
了, Soga ^_^override void ExecuteFromThreadPool(Thread threadPoolThread) { this.MoveNext(threadPoolThread); }
参考链接
- 关于线程池和异步的更深刻的原理,大家可以参考下面的文章
概述 .NET 6 ThreadPool 实现: https://www.cnblogs.com/eventhorizon/p/15316955.html
.NET Task 揭秘(2):Task 的回调执行与 await: https://www.cnblogs.com/eventhorizon/p/15912383.html