- A+
所属分类:.NET技术
闲来无事,我在 Biwen.QuickApi 中实现了一个极简的事件总线。代码其实很简单,对初学者可能有些帮助,所以贴出来分享;有什么不足的地方,也欢迎拍砖交流~
首先定义一个事件约定的空接口
/// <summary>
/// Marker contract for events. It declares no members; it exists so that a type
/// can be identified as an event through a generic constraint.
/// </summary>
public interface IEvent
{
}
然后定义事件订阅者接口
/// <summary>
/// Contract for a subscriber that handles events of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The event type this subscriber handles.</typeparam>
public interface IEventSubscriber<T>
    where T : IEvent
{
    /// <summary>
    /// Execution order of this subscriber relative to other subscribers of the same event.
    /// </summary>
    int Order { get; }

    /// <summary>
    /// Whether an error raised by this subscriber should be thrown to the caller.
    /// Throwing blocks the handlers that would run afterwards.
    /// </summary>
    bool ThrowIfError { get; }

    /// <summary>
    /// Handles one event instance.
    /// </summary>
    /// <param name="event">The event to handle.</param>
    /// <param name="ct">Token used to observe cancellation.</param>
    /// <returns>A task that completes when handling has finished.</returns>
    Task HandleAsync(T @event, CancellationToken ct);
}

/// <summary>
/// Convenience base class for subscribers: supplies default values for
/// <see cref="Order"/> and <see cref="ThrowIfError"/> so that derived classes
/// only have to implement <see cref="HandleAsync"/>.
/// </summary>
/// <typeparam name="T">The event type this subscriber handles.</typeparam>
public abstract class EventSubscriber<T> : IEventSubscriber<T>
    where T : IEvent
{
    /// <summary>
    /// Execution order; defaults to 0.
    /// </summary>
    public virtual int Order
    {
        get { return 0; }
    }

    /// <summary>
    /// Errors are not thrown by default.
    /// </summary>
    public virtual bool ThrowIfError
    {
        get { return false; }
    }

    /// <inheritdoc />
    public abstract Task HandleAsync(T @event, CancellationToken ct);
}
接着就是发布者
/// <summary>
/// Dispatches an event to every <see cref="IEventSubscriber{T}"/> registered in the
/// supplied service provider.
/// </summary>
internal class Publisher(IServiceProvider serviceProvider)
{
    /// <summary>
    /// Invokes the subscribers of <typeparamref name="T"/> one after another, ordered by
    /// ascending <see cref="IEventSubscriber{T}.Order"/>.
    /// </summary>
    /// <typeparam name="T">The event type.</typeparam>
    /// <param name="event">The event passed to each subscriber.</param>
    /// <param name="ct">Token forwarded to each subscriber.</param>
    /// <returns>A task that completes once all subscribers have been invoked.</returns>
    /// <exception cref="OperationCanceledException">
    /// A subscriber observed cancellation of <paramref name="ct"/>.
    /// </exception>
    /// <remarks>
    /// An exception from a subscriber is rethrown only when that subscriber's
    /// <see cref="IEventSubscriber{T}.ThrowIfError"/> is <c>true</c>; otherwise it is
    /// suppressed and the next subscriber runs.
    /// </remarks>
    public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent
    {
        // GetServices returns an empty sequence (never null) when nothing is registered.
        var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();

        foreach (var handler in handlers.OrderBy(x => x.Order))
        {
            try
            {
                await handler.HandleAsync(@event, ct);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // The caller asked to stop: this is not a handler failure, so it must not
                // be swallowed and the remaining handlers must not be started.
                throw;
            }
            catch
            {
                if (handler.ThrowIfError)
                {
                    throw;
                }

                // todo: surface the suppressed exception (e.g. log it).
            }
        }
    }
}
到此,发布订阅的基本代码也就写完了。接下来就是注册发布者和所有的订阅者了。
核心代码如下:
// Open generic interface definition used to recognise subscriber implementations.
static readonly Type InterfaceEventSubscriber = typeof(IEventSubscriber<>);

// Guards the lazy initialisation of the cached subscriber type list.
static readonly object _lock = new();

/// <summary>
/// Tells whether <paramref name="type"/> implements a constructed form of the open
/// generic interface <paramref name="baseInterface"/>.
/// </summary>
/// <param name="type">The type to inspect; <c>null</c> yields <c>false</c>.</param>
/// <param name="baseInterface">The open generic interface definition; <c>null</c> yields <c>false</c>.</param>
/// <returns><c>true</c> when at least one implemented interface matches.</returns>
static bool IsToGenericInterface(this Type type, Type baseInterface)
{
    if (type == null) return false;
    if (baseInterface == null) return false;

    return type.GetInterfaces()
        .Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == baseInterface);
}

static IEnumerable<Type> _eventHandlers = null!;

/// <summary>
/// All public, non-abstract classes from <c>ASS.InAllRequiredAssemblies</c> that
/// implement <c>IEventSubscriber&lt;&gt;</c>. The list is computed on first access and cached.
/// </summary>
static IEnumerable<Type> EventHandlers
{
    get
    {
        lock (_lock)
        {
            // Materialise with ToArray: caching the deferred query itself would repeat
            // the assembly scan on every enumeration.
            return _eventHandlers ??= ASS.InAllRequiredAssemblies
                .Where(x => !x.IsAbstract && x.IsPublic && x.IsClass && x.IsToGenericInterface(InterfaceEventSubscriber))
                .ToArray();
        }
    }
}

// Register the event subscribers.
// NOTE(review): the statements below need an IServiceCollection named `services` in scope.
foreach (var subscriberType in EventHandlers)
{
    // One subscriber class may subscribe to several events, so register it once
    // per closed IEventSubscriber<T> interface it implements.
    var baseTypes = subscriberType.GetInterfaces()
        .Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == InterfaceEventSubscriber)
        .ToArray();

    foreach (var baseType in baseTypes)
    {
        services.AddScoped(baseType, subscriberType);
    }
}

// Register the Publisher.
services.AddScoped<Publisher>();
至此发布订阅的代码也就完成了!
现在我们将发布订阅封装到QuickApi中使用:
/// <summary>
/// Contract for publishing events.
/// </summary>
internal interface IPublisher
{
    /// <summary>
    /// Publishes an event.
    /// </summary>
    /// <typeparam name="T">The event type; must implement <see cref="IEvent"/>.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <param name="cancellationToken">Token used to observe cancellation.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task PublishAsync<T>(T @event, CancellationToken cancellationToken)
        where T : IEvent;
}
然后让 IQuickApi 继承 IPublisher 接口,并在 BaseQuickApi 中实现 PublishAsync
/// <summary>
/// Quick API contract: combines the handler-builder, middleware, antiforgery and
/// publisher contracts with the request execution entry point.
/// </summary>
/// <typeparam name="Req">The request type.</typeparam>
/// <typeparam name="Rsp">The response type.</typeparam>
internal interface IQuickApi<Req, Rsp> : IHandlerBuilder, IQuickApiMiddlewareHandler, IAntiforgeryApi, IPublisher
{
    /// <summary>
    /// Executes the API for the given request.
    /// </summary>
    /// <param name="request">The incoming request.</param>
    /// <returns>The response produced for <paramref name="request"/>.</returns>
    ValueTask<Rsp> ExecuteAsync(Req request);
}

// BaseQuickApi.PublishAsync
/// <summary>
/// Publishes an event through a <c>Publisher</c> resolved from a fresh DI scope.
/// </summary>
/// <typeparam name="T">The event type.</typeparam>
/// <param name="event">The event to publish.</param>
/// <param name="cancellationToken">Token forwarded to the publisher.</param>
/// <returns>A task that completes when the publisher has finished.</returns>
public virtual async Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent
{
    // Dispose the scope asynchronously: a synchronous Dispose throws when a scoped
    // service implements only IAsyncDisposable.
    await using var scope = ServiceRegistration.ServiceProvider.CreateAsyncScope();
    var publisher = scope.ServiceProvider.GetRequiredService<Publisher>();
    await publisher.PublishAsync(@event, cancellationToken);
}
至此功能完成,接下来我们测试一下:
using Biwen.QuickApi.Events;
using Microsoft.AspNetCore.Mvc;

namespace Biwen.QuickApi.DemoWeb.Apis
{
    /// <summary>
    /// Demo type that serves both as the API request and as the published event.
    /// </summary>
    public class MyEvent : BaseRequest<MyEvent>, IEvent
    {
        /// <summary>Message bound from the query string.</summary>
        [FromQuery]
        public string? Message { get; set; }
    }

    /// <summary>
    /// Handler with the default order (0); logs the message with the "msg 2" prefix.
    /// </summary>
    public class MyEventHandler : EventSubscriber<MyEvent>
    {
        private readonly ILogger<MyEventHandler> _logger;

        public MyEventHandler(ILogger<MyEventHandler> logger)
        {
            _logger = logger;
        }

        /// <inheritdoc />
        public override Task HandleAsync(MyEvent @event, CancellationToken ct)
        {
            // Message template instead of string interpolation keeps the log structured.
            _logger.LogInformation("msg 2 : {Message}", @event.Message);
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Handler that runs earlier (Order = -1); logs the message with the "msg 1" prefix.
    /// </summary>
    public class MyEventHandler2 : EventSubscriber<MyEvent>
    {
        private readonly ILogger<MyEventHandler2> _logger;

        public MyEventHandler2(ILogger<MyEventHandler2> logger)
        {
            _logger = logger;
        }

        /// <inheritdoc />
        public override Task HandleAsync(MyEvent @event, CancellationToken ct)
        {
            _logger.LogInformation("msg 1 : {Message}", @event.Message);
            return Task.CompletedTask;
        }

        public override int Order => -1;
    }

    /// <summary>
    /// Handler that always throws (Order = -2). ThrowIfError is false, so the
    /// failure is not meant to be thrown to the caller.
    /// </summary>
    public class MyEventHandler3 : EventSubscriber<MyEvent>
    {
        private readonly ILogger<MyEventHandler3> _logger;

        public MyEventHandler3(ILogger<MyEventHandler3> logger)
        {
            _logger = logger;
        }

        /// <inheritdoc />
        /// <exception cref="InvalidOperationException">Always thrown, for demonstration.</exception>
        public override Task HandleAsync(MyEvent @event, CancellationToken ct)
        {
            throw new InvalidOperationException("error");
        }

        public override int Order => -2;

        public override bool ThrowIfError => false;
    }

    /// <summary>
    /// Demo endpoint that publishes the incoming request as an event.
    /// </summary>
    [QuickApi("event")]
    public class EventApi : BaseQuickApi<MyEvent>
    {
        /// <summary>
        /// Publishes <paramref name="request"/> and returns a fixed confirmation text.
        /// </summary>
        /// <param name="request">The request, which is also the event to publish.</param>
        /// <returns>A content response containing "send event".</returns>
        public override async ValueTask<IResultResponse> ExecuteAsync(MyEvent request)
        {
            // publish
            await PublishAsync(request);
            return IResultResponse.Content("send event");
        }
    }
}
最后我们运行项目测试一下功能:
curl --request GET --header 'accept: */*' 'http://localhost:5101/quick/event?Message=hello%20world'
源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi