谈谈.NET Core下如何利用 AsyncLocal 实现共享变量

  • 谈谈.NET Core下如何利用 AsyncLocal 实现共享变量已关闭评论
  • 202 次浏览
  • A+
所属分类:.NET技术
摘要

在Web 应用程序中,我们经常会遇到这样的场景,如用户信息,租户信息本次的请求过程中都是固定的,我们希望是这种信息在本次请求内,一次赋值,到处使用。本文就来探讨一下,如何在.NET Core 下去利用AsyncLocal 实现全局共享变量。


前言

在Web 应用程序中,我们经常会遇到这样的场景,如用户信息,租户信息本次的请求过程中都是固定的,我们希望是这种信息在本次请求内,一次赋值,到处使用。本文就来探讨一下,如何在.NET Core 下去利用AsyncLocal 实现全局共享变量。

简介

我们如果需要整个程序共享一个变量,我们仅需将该变量放在某个静态类的静态变量上即可(不满足我们的需求,静态变量上,整个程序都是固定值)。我们在Web 应用程序中,每个Web 请求服务器都为其分配了一个独立线程,如何实现用户,租户等信息隔离在这些独立线程中。这就是今天要说的线程本地存储。针对线程本地存储 .NET 给我们提供了两个类 ThreadLocal 和 AsyncLocal。我们可以通过查看以下例子清晰的看到两者的区别:

 [TestClass] public class TastLocal {     private static ThreadLocal<string> threadLocal = new ThreadLocal<string>();     private static AsyncLocal<string> asyncLocal = new AsyncLocal<string>();     [TestMethod]     public void Test() {         threadLocal.Value = "threadLocal";         asyncLocal.Value = "asyncLocal";         var threadId = Thread.CurrentThread.ManagedThreadId;         Task.Factory.StartNew(() => {             var threadId = Thread.CurrentThread.ManagedThreadId;             Debug.WriteLine($"StartNew:threadId:{ threadId}; threadLocal:{threadLocal.Value}");             Debug.WriteLine($"StartNew:threadId:{ threadId}; asyncLocal:{asyncLocal.Value}");         });         CurrThread();     }     public void CurrThread() {         var threadId = Thread.CurrentThread.ManagedThreadId;         Debug.WriteLine($"CurrThread:threadId:{threadId};threadLocal:{threadLocal.Value}");         Debug.WriteLine($"CurrThread:threadId:{threadId};asyncLocal:{asyncLocal.Value}");     } } 

输出结果:

CurrThread:threadId:4;threadLocal:threadLocal StartNew:threadId:11; threadLocal: CurrThread:threadId:4;asyncLocal:asyncLocal StartNew:threadId:11; asyncLocal:asyncLocal 

从上面结果中可以看出 ThreadLocal 和 AsyncLocal 都能实现基于线程的本地存储。但是当线程切换后,只有 AsyncLocal 还能够保留原来的值。在Web 开发中,我们会有很多异步场景,在这些场景下,可能会出现线程的切换。所以我们使用AsyncLocal 去实现在Web 应用程序下的共享变量。

AsyncLocal 解读

  1. 官方文档
  2. 源码地址

源码查看:

public sealed class AsyncLocal<T> : IAsyncLocal {     private readonly Action<AsyncLocalValueChangedArgs<T>>? m_valueChangedHandler;      //     // 无参构造函数     //     public AsyncLocal()     {     }      //     // 构造一个带有委托的AsyncLocal<T>,该委托在当前值更改时被调用     // 在任何线程上     //     public AsyncLocal(Action<AsyncLocalValueChangedArgs<T>>? valueChangedHandler)     {         m_valueChangedHandler = valueChangedHandler;     }      [MaybeNull]     public T Value     {         get         {             object? obj = ExecutionContext.GetLocalValue(this);             return (obj == null) ? default : (T)obj;         }         set => ExecutionContext.SetLocalValue(this, value, m_valueChangedHandler != null);     }      void IAsyncLocal.OnValueChanged(object? previousValueObj, object? currentValueObj, bool contextChanged)     {         Debug.Assert(m_valueChangedHandler != null);         T previousValue = previousValueObj == null ? default! : (T)previousValueObj;         T currentValue = currentValueObj == null ? default! : (T)currentValueObj;         m_valueChangedHandler(new AsyncLocalValueChangedArgs<T>(previousValue, currentValue, contextChanged));     } }  // // 接口,允许ExecutionContext中的非泛型代码调用泛型AsyncLocal<T>类型 // internal interface IAsyncLocal {     void OnValueChanged(object? previousValue, object? currentValue, bool contextChanged); }  public readonly struct AsyncLocalValueChangedArgs<T> {     public T? PreviousValue { get; }     public T? CurrentValue { get; }      //     // If the value changed because we changed to a different ExecutionContext, this is true.  If it changed     // because someone set the Value property, this is false.     //     public bool ThreadContextChanged { get; }      internal AsyncLocalValueChangedArgs(T? previousValue, T? currentValue, bool contextChanged)     {         PreviousValue = previousValue!;         CurrentValue = currentValue!;         ThreadContextChanged = contextChanged;     } }  // // Interface used to store an IAsyncLocal => object mapping in ExecutionContext. // Implementations are specialized based on the number of elements in the immutable // map in order to minimize memory consumption and look-up times. // internal interface IAsyncLocalValueMap {     bool TryGetValue(IAsyncLocal key, out object? value);     IAsyncLocalValueMap Set(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent); } 

我们知道在.NET 里面,每个线程都关联着执行上下文。我们可以通 Thread.CurrentThread.ExecutionContext 属性进行访问 或者通过 ExecutionContext.Capture() 获取。

从上面我们可以看出 AsyncLocal 的 Value 存取是通过 ExecutionContext.GetLocalValue 和GetLocalValue.SetLocalValue 进行操作的,我们可以继续从 ExecutionContext 里面取出部分代码查看(源码地址),为了更深入地理解 AsyncLocal 我们可以查看一下源码,看看内部实现原理。

internal static readonly ExecutionContext Default = new ExecutionContext(); private static volatile ExecutionContext? s_defaultFlowSuppressed;  private readonly IAsyncLocalValueMap? m_localValues; private readonly IAsyncLocal[]? m_localChangeNotifications; private readonly bool m_isFlowSuppressed; private readonly bool m_isDefault;  private ExecutionContext() {     m_isDefault = true; }  private ExecutionContext(     IAsyncLocalValueMap localValues,     IAsyncLocal[]? localChangeNotifications,     bool isFlowSuppressed) {     m_localValues = localValues;     m_localChangeNotifications = localChangeNotifications;     m_isFlowSuppressed = isFlowSuppressed; }  public void GetObjectData(SerializationInfo info, StreamingContext context) {     throw new PlatformNotSupportedException(); }  public static ExecutionContext? Capture() {     ExecutionContext? executionContext = Thread.CurrentThread._executionContext;     if (executionContext == null)     {         executionContext = Default;     }     else if (executionContext.m_isFlowSuppressed)     {         executionContext = null;     }      return executionContext; }   internal static object? GetLocalValue(IAsyncLocal local) { ExecutionContext? current = Thread.CurrentThread._executionContext; if (current == null) {     return null; }  Debug.Assert(!current.IsDefault); Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context"); current.m_localValues.TryGetValue(local, out object? value); return value; }  internal static void SetLocalValue(IAsyncLocal local, object? newValue, bool needChangeNotifications) { ExecutionContext? current = Thread.CurrentThread._executionContext;  object? previousValue = null; bool hadPreviousValue = false; if (current != null) {     Debug.Assert(!current.IsDefault);     Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context");      hadPreviousValue = current.m_localValues.TryGetValue(local, out previousValue); }  if (previousValue == newValue) {     return; }  // Regarding 'treatNullValueAsNonexistent: !needChangeNotifications' below: // - When change notifications are not necessary for this IAsyncLocal, there is no observable difference between //   storing a null value and removing the IAsyncLocal from 'm_localValues' // - When change notifications are necessary for this IAsyncLocal, the IAsyncLocal's absence in 'm_localValues' //   indicates that this is the first value change for the IAsyncLocal and it needs to be registered for change //   notifications. So in this case, a null value must be stored in 'm_localValues' to indicate that the IAsyncLocal //   is already registered for change notifications. IAsyncLocal[]? newChangeNotifications = null; IAsyncLocalValueMap newValues; bool isFlowSuppressed = false; if (current != null) {     Debug.Assert(!current.IsDefault);     Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context");      isFlowSuppressed = current.m_isFlowSuppressed;     newValues = current.m_localValues.Set(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);     newChangeNotifications = current.m_localChangeNotifications; } else {     // First AsyncLocal     newValues = AsyncLocalValueMap.Create(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications); }  // // Either copy the change notification array, or create a new one, depending on whether we need to add a new item. // if (needChangeNotifications) {     if (hadPreviousValue)     {         Debug.Assert(newChangeNotifications != null);         Debug.Assert(Array.IndexOf(newChangeNotifications, local) >= 0);     }     else if (newChangeNotifications == null)     {         newChangeNotifications = new IAsyncLocal[1] { local };     }     else     {         int newNotificationIndex = newChangeNotifications.Length;         Array.Resize(ref newChangeNotifications, newNotificationIndex + 1);         newChangeNotifications[newNotificationIndex] = local;     } }  Thread.CurrentThread._executionContext =     (!isFlowSuppressed && AsyncLocalValueMap.IsEmpty(newValues)) ?     null : // No values, return to Default context     new ExecutionContext(newValues, newChangeNotifications, isFlowSuppressed);  if (needChangeNotifications) {     local.OnValueChanged(previousValue, newValue, contextChanged: false); } } 

从上面可以看出,ExecutionContext.GetLocalValue 和GetLocalValue.SetLocalValue 都是通过对 m_localValues 字段进行操作的。

m_localValues 的类型是 IAsyncLocalValueMap ,IAsyncLocalValueMap 的实现 和 AsyncLocal.cs 在一起,感兴趣的可以进一步查看 IAsyncLocalValueMap 是如何创建,如何查找的。

可以看到,里面最重要的就是ExecutionContext 的流动,线程发生变化时ExecutionContext 会在前一个线程中被默认捕获,流向下一个线程,它所保存的数据也就随之流动。在所有会发生线程切换的地方,基础类库(BCL) 都为我们封装好了对执行上下文的捕获 (如开始的例子,可以看到 AsyncLocal 的数据不会随着线程的切换而丢失),这也是为什么 AsyncLocal 能实现 线程切换后,还能正常获取数据,不丢失。

总结

  1. AsyncLocal 本身不保存数据,数据保存在 ExecutionContext 实例。

  2. ExecutionContext 的实例会随着线程切换流向下一线程(也可以禁止流动和恢复流动),保证了线程切换时,数据能正常访问。

在.NET Core 中的使用示例

  1. 先创建一个上下文对象
点击查看代码
using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks;  namespace NetAsyncLocalExamples.Context {     /// <summary>     /// 请求上下文  租户ID     /// </summary>     public class RequestContext     {         /// <summary>         /// 获取请求上下文         /// </summary>         public static RequestContext Current => _asyncLocal.Value;         private readonly static AsyncLocal<RequestContext> _asyncLocal = new AsyncLocal<RequestContext>();          /// <summary>         /// 将请求上下文设置到线程全局区域         /// </summary>         /// <param name="userContext"></param>         public static IDisposable SetContext(RequestContext userContext)         {             _asyncLocal.Value = userContext;             return new RequestContextDisposable();         }          /// <summary>         /// 清除上下文         /// </summary>         public static void ClearContext()         {             _asyncLocal.Value = null;         }          /// <summary>         /// 租户ID         /// </summary>         public string TenantId { get; set; }        } }  namespace NetAsyncLocalExamples.Context {     /// <summary>     /// 用于释放对象     /// </summary>     internal class RequestContextDisposable : IDisposable     {         internal RequestContextDisposable() { }         public void Dispose()         {             RequestContext.ClearContext();         }     } } 
  1. 创建请求上下文中间件
点击查看代码
using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using NetAsyncLocalExamples.Context; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks;  namespace NetAsyncLocalExamples.Middlewares {     /// <summary>     /// 请求上下文     /// </summary>     public class RequestContextMiddleware : IMiddleware     {           protected readonly IServiceProvider ServiceProvider;         private readonly ILogger<RequestContextMiddleware> Logger;         public RequestContextMiddleware(IServiceProvider serviceProvider, ILogger<RequestContextMiddleware> logger)         {              ServiceProvider = serviceProvider;             Logger = logger;         }         public virtual async Task InvokeAsync(HttpContext context, RequestDelegate next)         {             var requestContext = new RequestContext();             using (RequestContext.SetContext(requestContext))             {                 requestContext.TenantId = $"租户ID:{DateTime.Now.ToString("yyyyMMddHHmmsss")}";                 await next(context);             }         }         } }  
  1. 注册中间件
点击查看代码
public void ConfigureServices(IServiceCollection services) { 	services.AddTransient<RequestContextMiddleware>(); 	services.AddRazorPages(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) {     if (env.IsDevelopment())     {         app.UseDeveloperExceptionPage();     }     else     {         app.UseExceptionHandler("/Error");         // The default HSTS value is 30 days. You may want to change this for production scenarios, see https://aka.ms/aspnetcore-hsts.         app.UseHsts();     }      app.UseHttpsRedirection();     app.UseStaticFiles();      app.UseRouting();      app.UseAuthorization();      //增加上下文     app.UseMiddleware<RequestContextMiddleware>();      app.UseEndpoints(endpoints =>     {         endpoints.MapRazorPages();     }); }  
  1. 一次赋值,到处使用
点击查看代码
namespace NetAsyncLocalExamples.Pages {     public class IndexModel : PageModel     {         private readonly ILogger<IndexModel> _logger;          public IndexModel(ILogger<IndexModel> logger)         {             _logger = logger;             _logger.LogInformation($"测试获取全局变量1:{RequestContext.Current.TenantId}");         }          public void OnGet()         {             _logger.LogInformation($"测试获取全局变量2:{RequestContext.Current.TenantId}");         }     } }