- A+
上一节说了事件总线
本节在消息队列中实现事件处理:.Net Core 5.x Api开发笔记 -- 消息队列RabbitMQ实现事件总线EventBus(一)
既然是消息队列,就需要有生产者和消费者(订阅)
1 public interface IMessageQueue 2 { 3 /// <summary> 4 /// 发布消息 5 /// </summary> 6 void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class; 7 8 /// <summary> 9 /// 订阅消息 10 /// </summary> 11 void Consume<T>(Func<T, Task<bool>> func, string queueName = ""); 12 }
生产者端实现发布消息接口:
1 public class RabbitMQPublishClient : IMessageQueue 2 { 3 private static ConcurrentDictionary<string, string> QueueDic = new ConcurrentDictionary<string, string>(); 4 public RabbitMQPublishClient(){} 5 6 /// <summary> 7 /// 发布消息 8 /// </summary> 9 /// <typeparam name="T"></typeparam> 10 /// <param name="message">消息实体</param> 11 /// <param name="exchangeName">交换机名称(默认default)</param> 12 /// <param name="queueName">队列名称(默认类名)</param> 13 public void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class 14 { 15 using (var conn = GetConnection()) 16 { 17 using (var channel = conn.CreateModel()) 18 { 19 if (!QueueDic.ContainsKey(queueName) || QueueDic[queueName] != exchangeName) 20 { 21 CreateQueue(channel, exchangeName, queueName, true); 22 QueueDic.TryAdd(queueName, exchangeName); 23 } 24 var props = channel.CreateBasicProperties(); 25 props.Persistent = true; //消息持久化 26 props.DeliveryMode = 2; //消息持久化 27 props.CorrelationId = Guid.NewGuid().ToString(); 28 29 string content = JsonConvert.SerializeObject(message); 30 var body = Encoding.UTF8.GetBytes(content); 31 channel.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: props, body: body); 32 } 33 } 34 } 35 36 /// <summary> 37 /// 创建队列,绑定到交换机 38 /// </summary> 39 private void CreateQueue(IModel channel, string exchangeName, string queueName, bool isDurable = true) 40 { 41 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: isDurable, autoDelete: false, arguments: null); 42 channel.QueueDeclare(queueName, durable: isDurable, exclusive: false, autoDelete: false, arguments: null); 43 channel.QueueBind(queueName, exchangeName, routingKey: queueName); 44 } 45 }
消费者(订阅者)实现订阅接口
1 public class RabbitMQConsumerClient : IMessageQueue 2 { 3 public RabbitMQConsumerClient(){} 4 5 /// <summary> 6 /// 订阅消息 7 /// </summary> 8 public void Consume<T>(Func<T, Task<bool>> func, string queueName = "") 9 { 10 string exchangeName = exchangeName.Equals("default") ? $"{customKey}.default" : $"{customKey}.{queueName}"; 11 12 var collection = GetConnection(); 13 var channel = collection.CreateModel(); 14 channel.ExchangeDeclare(exchangeName, "direct", durable: true, autoDelete: false, arguments: null); 15 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); 16 channel.QueueBind(queueName, exchangeName, routingKey: queueName, arguments: null); 17 18 //消费事件 19 var consumer = new EventingBasicConsumer(channel); 20 consumer.Received += async (sender, e) => 21 { 22 try 23 { 24 string content = Encoding.UTF8.GetString(e.Body.ToArray()); 25 T message = JsonConvert.DeserializeObject<T>(content); 26 bool isSuccess = await func.Invoke(message); //执行func委托 这里才是真正执行传入的事件处理程序 27 if (isSuccess) 28 { 29 channel.BasicAck(e.DeliveryTag, false); //手动确认消息消费成功 30 } 31 else 32 { 33 channel.BasicReject(e.DeliveryTag, true); //手动打回队列,下次重新消费 34 } 35 } 36 catch (Exception ex) 37 { 38 channel.BasicAck(e.DeliveryTag, false); 39 } 40 }; 41 42 channel.BasicQos(0, 1, false); //限制同时接收消息数量为1,true是将限制应用于channel级,false是将限制应用于consumer级 43 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); //订阅消息 autoAck: false 表示手动确认消息 44 } 45 }
上边都是平时很常用的功能逻辑,接下来是才是重点!!!
1,事件总线有了
2,生产者订阅者也有了
接下来还需要做至少3件事:
1,创建事件处理程序,就是真正干活的
2,注册消费者、注册事件处理程序、绑定事件源和事件处理程序(在订阅端)
3,订阅端启动消费者订阅监控,并触发事件处理程序
4,发布消息测试
-----------------------------------------------------------------
1,创建事件处理程序
1 /// <summary> 2 /// 事件处理程序,真正干活的是我 3 /// 继承IEventHandler<T>接口,其中T必须用EventWithData<TData>类型,因为EventWithData<TData>继承了EventBase类 4 /// </summary> 5 public class Person_EventHandler : IEventHandler<EventWithData<Person>> 6 { 7 //最终干活的 8 public async Task HandleEvent(EventWithData<Person> eventWithData) 9 { 10 var data = eventWithData.Data; 11 if (data == null) 12 { 13 Console.WriteLine("么有数据呀!"); 14 } 15 else 16 { 17 try 18 { 19 Console.WriteLine($"{DateTime.Now}-------" + JsonConvert.SerializeObject(data)); 20 } 21 catch (Exception ex) 22 { 23 Console.WriteLine("异常:" + ex.Message); 24 } 25 } 26 } 27 }
2,注册消费者、注册事件处理程序、绑定事件源和事件处理程序(在订阅端),这一步在 Startup 中实现!!!
1 public IServiceCollection ServiceCollection { get; private set; } 3 public void ConfigureServices(IServiceCollection services) 4 { 5 ServiceCollection = services; //将services传递给ServiceCollection 6 services.AddControllersWithViews(); 7 8 //注册消费者 9 services.AddSingleton<IMessageQueue, RabbitMQConsumerClient>(); 10 //注册事件处理程序,绑定事件源:IEventHandler<EventWithData<Person>>和事件处理程序:Person_EventHandler的关系 11 services.AddSingleton<Person_EventHandler>(); 12 } 13 14 public void Configure(IApplicationBuilder app, IWebHostEnvironment env) 15 { 16 //绑定事件源和事件处理程序的映射关系 17 //这里跟上边的绑定不是一回事,这里仅仅就是绑定了一个映射关系,等到后边事件触发的时候需要用到该处映射关系 18 EventBus.Default.RegisterHandler<EventWithData<Person>, Person_EventHandler>(); 19 20 //其它中间件。。 21 。。。 22 23 //启动消费者订阅监控入口 24 app.Consumer(app.ApplicationServices, ServiceCollection); 25 }
3,启动消费者订阅监控,并触发事件处理程序
1 /// <summary> 2 /// 启动消费者订阅监控入口 3 /// </summary> 4 public static void Consumer(this IApplicationBuilder app, IServiceProvider provider, IServiceCollection services) 5 { 6 //获取实例对象 7 IMessageQueue queue = provider.GetService<IMessageQueue>(); 8 9 //异步调用,这里接收的消息类型T为:EventWithData<Person> 也可以是其它自定义消息类型,没有限制 10 queue.Consume<EventWithData<Person>>(async message => 11 { 12 using (var currentContainer = services.BuildServiceProvider().CreateScope()) //使用系统内置服务容器 13 { 14 IocManager.Configure(currentContainer.ServiceProvider); //将容器服务传递过去 15 16 var e = EventWithData<Person>.New(message.Data); //这里要传递的消息类型必须使用 EventWithData<TData>类型初始化消息 17 18 await EventBus.Default.TriggerAsync(e); //调用触发事件逻辑(需要提前绑定映射关系和注册容器) 19 20 return true; 21 } 22 }); 23 }
说明:
IocManager.Configure(currentContainer.ServiceProvider) 实际上就是声明了一个全局的 IServiceProvider 变量,然后将当前的ServiceProvider赋值给全局变量
await EventBus.Default.TriggerAsync(e) 执行的就是上一节事件总线中的 public async Task TriggerAsync<TEvent>(TEvent e) 方法
到这里 事件源和事件处理程序就一一对应上了。
接下来开始测试一下是否可用,先在发布的应用程序Startup中注册一下消息队列容器
1 services.AddSingleton<IMessageQueue, RabbitMQPublishClient>();
然后发布一条消息,注意:这里消息的类型EventWithData<Person>没有限制的,你可以不使用EventWithData<Person>类型的消息也行
1 //发布消息 2 var message = EventWithData<Person>.New( 3 new Person() 4 { 5 UserId = 1, 6 UserName = "张三" 7 } 8 ); 9 10 messageQueue.Publish(message);
最后测试,订阅者成功消费了刚刚生产的消息