- A+
所属分类:.NET技术
本地事件总线和事务
通过重写Ef Core
的SaveChanges/SaveChangesAsync
来实现事务。当然,如果您愿意实现仓储层,可以在仓储层实现展开对应实体包含的事件,并且调整事件的处理顺序。
Github仓库地址:soda-event-bus
实现AggregateRoot
类
AggregateRoot
类主要通过一个集合来记录本次事务的所有事件,到保存前再展开读取,在Abp
中采用的ICollection
记录的本地事件,通过实现一个排序器来保证顺序问题,我这里直接采用了ConcurrentQueue
,保证原子操作的同时保证了顺序性,实现更简单一些。
public abstract class AggregateRoot { public ConcurrentQueue<object> LocalEvents { get; } = new(); public void AddLocalEvent<TEvent>(TEvent eventData) where TEvent : IEvent { LocalEvents.Enqueue(eventData); } public bool GetLocalEvent(out object? @event) { LocalEvents.TryDequeue(out var eventData); @event = eventData; return @event is not null; } public void ClearLocalEvents() { LocalEvents.Clear(); } }
重写DbContext
主要是从ServiceProvider
中获取对应实体类包含的事件,并且找到对应的Handler
进行处理,然后再当作一个事务提交。
public class EventBusDbContext<TDbContext> : DbContext where TDbContext : DbContext { private readonly IServiceProvider _serviceProvider; public EventBusDbContext(DbContextOptions<TDbContext> options, IServiceProvider serviceProvider) : base(options) { _serviceProvider = serviceProvider; } public override int SaveChanges() { return base.SaveChanges(); } public override int SaveChanges(bool acceptAllChangesOnSuccess) { return base.SaveChanges(acceptAllChangesOnSuccess); } public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default) { await HandleEventsAsync(); return await base.SaveChangesAsync(cancellationToken); } public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default) { await HandleEventsAsync(); return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken); } private async Task HandleEventsAsync() { foreach (var entityEntry in ChangeTracker.Entries<AggregateRoot>()) { while (entityEntry.Entity.GetLocalEvent(out var @event)) { if (@event is null) break; await HandleEventAsync(@event); } entityEntry.Entity.ClearLocalEvents(); } } private async Task HandleEventAsync(object @event) { var eventHandlerType = typeof(IAsyncEventHandler<>).MakeGenericType(@event.GetType()); var eventHandler = _serviceProvider.GetRequiredService(eventHandlerType); var method = eventHandler.GetType().GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleAsync)); var exceptionHandleMethod = eventHandlerType.GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleException)); try { await (Task)method!.Invoke(eventHandler, new[] { @event })!; } catch (Exception ex) { exceptionHandleMethod!.Invoke(eventHandler, new[] { @event, ex }); } } }
分布式事件总线和事务
根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。