.Net Core 5.x Api开发笔记 — 消息队列RabbitMQ实现事件总线EventBus(二)

  • A+
所属分类:.NET技术
摘要

上一节说了事件总线本节在消息队列中实现事件处理:.Net Core 5.x Api开发笔记 — 消息队列RabbitMQ实现事件总线EventBus(一)

上一节说了事件总线

本节在消息队列中实现事件处理:.Net Core 5.x Api开发笔记 -- 消息队列RabbitMQ实现事件总线EventBus(一)

既然是消息队列,就需要有生产者和消费者(订阅)

 1 public interface IMessageQueue  2 {  3     /// <summary>  4     /// 发布消息  5     /// </summary>  6     void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class;  7   8     /// <summary>  9     /// 订阅消息 10     /// </summary> 11     void Consume<T>(Func<T, Task<bool>> func, string queueName = ""); 12 }

生产者端实现发布消息接口:

 1 public class RabbitMQPublishClient : IMessageQueue  2 {  3     private static ConcurrentDictionary<string, string> QueueDic = new ConcurrentDictionary<string, string>();  4     public RabbitMQPublishClient(){}  5       6     /// <summary>  7     /// 发布消息  8     /// </summary>  9     /// <typeparam name="T"></typeparam> 10     /// <param name="message">消息实体</param> 11     /// <param name="exchangeName">交换机名称(默认default)</param> 12     /// <param name="queueName">队列名称(默认类名)</param> 13     public void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class 14     { 15         using (var conn = GetConnection()) 16         { 17             using (var channel = conn.CreateModel()) 18             { 19                 if (!QueueDic.ContainsKey(queueName) || QueueDic[queueName] != exchangeName) 20                 { 21                     CreateQueue(channel, exchangeName, queueName, true); 22                     QueueDic.TryAdd(queueName, exchangeName); 23                 } 24                 var props = channel.CreateBasicProperties(); 25                 props.Persistent = true;  //消息持久化 26                 props.DeliveryMode = 2;   //消息持久化 27                 props.CorrelationId = Guid.NewGuid().ToString(); 28  29                 string content = JsonConvert.SerializeObject(message); 30                 var body = Encoding.UTF8.GetBytes(content); 31                 channel.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: props, body: body); 32             } 33         } 34     } 35  36     /// <summary> 37     /// 创建队列,绑定到交换机 38     /// </summary> 39     private void CreateQueue(IModel channel, string exchangeName, string queueName, bool isDurable = true) 40     { 41         channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: isDurable, autoDelete: false, arguments: null); 42         channel.QueueDeclare(queueName, durable: isDurable, exclusive: false, autoDelete: false, arguments: null); 43         channel.QueueBind(queueName, exchangeName, routingKey: queueName); 44     } 45 }

消费者(订阅者)实现订阅接口

 1 public class RabbitMQConsumerClient : IMessageQueue  2 {  3     public RabbitMQConsumerClient(){}  4   5     /// <summary>  6     /// 订阅消息  7     /// </summary>  8     public void Consume<T>(Func<T, Task<bool>> func, string queueName = "")  9     { 10         string exchangeName = exchangeName.Equals("default") ? $"{customKey}.default" : $"{customKey}.{queueName}"; 11  12         var collection = GetConnection(); 13         var channel = collection.CreateModel(); 14         channel.ExchangeDeclare(exchangeName, "direct", durable: true, autoDelete: false, arguments: null); 15         channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); 16         channel.QueueBind(queueName, exchangeName, routingKey: queueName, arguments: null); 17  18         //消费事件 19         var consumer = new EventingBasicConsumer(channel); 20         consumer.Received += async (sender, e) => 21         { 22             try 23             { 24                 string content = Encoding.UTF8.GetString(e.Body.ToArray()); 25                 T message = JsonConvert.DeserializeObject<T>(content); 26                 bool isSuccess = await func.Invoke(message);    //执行func委托  这里才是真正执行传入的事件处理程序 27                 if (isSuccess) 28                 { 29                     channel.BasicAck(e.DeliveryTag, false);     //手动确认消息消费成功 30                 } 31                 else 32                 { 33                    channel.BasicReject(e.DeliveryTag, true);    //手动打回队列,下次重新消费 34                 } 35             } 36             catch (Exception ex) 37             { 38                 channel.BasicAck(e.DeliveryTag, false); 39             } 40         }; 41  42         channel.BasicQos(0, 1, false);  //限制同时接收消息数量为1,true是将限制应用于channel级,false是将限制应用于consumer级 43         channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);   //订阅消息   autoAck: false 表示手动确认消息 44     } 45 }

上边都是平时很常用的功能逻辑,接下来是才是重点!!!

1,事件总线有了

2,生产者订阅者也有了

接下来还需要做至少3件事:

1,创建事件处理程序,就是真正干活的

2,注册消费者、注册事件处理程序、绑定事件源和事件处理程序(在订阅端)

3,订阅端启动消费者订阅监控,并触发事件处理程序

4,发布消息测试

-----------------------------------------------------------------

1,创建事件处理程序

 1 /// <summary>  2 /// 事件处理程序,真正干活的是我  3 /// 继承IEventHandler<T>接口,其中T必须用EventWithData<TData>类型,因为EventWithData<TData>继承了EventBase类  4 /// </summary>  5 public class Person_EventHandler : IEventHandler<EventWithData<Person>>  6 {  7     //最终干活的  8     public async Task HandleEvent(EventWithData<Person> eventWithData)  9     { 10         var data = eventWithData.Data; 11         if (data == null) 12         { 13             Console.WriteLine("么有数据呀!"); 14         } 15         else 16         { 17             try 18             { 19                 Console.WriteLine($"{DateTime.Now}-------" + JsonConvert.SerializeObject(data)); 20             } 21             catch (Exception ex) 22             { 23                 Console.WriteLine("异常:" + ex.Message); 24             } 25         } 26     } 27 }

2,注册消费者、注册事件处理程序、绑定事件源和事件处理程序(在订阅端),这一步在 Startup 中实现!!!

 1 public IServiceCollection ServiceCollection { get; private set; }  3 public void ConfigureServices(IServiceCollection services)  4 {  5     ServiceCollection = services;    //将services传递给ServiceCollection  6     services.AddControllersWithViews();  7   8     //注册消费者  9     services.AddSingleton<IMessageQueue, RabbitMQConsumerClient>(); 10     //注册事件处理程序,绑定事件源:IEventHandler<EventWithData<Person>>和事件处理程序:Person_EventHandler的关系 11     services.AddSingleton<Person_EventHandler>(); 12 } 13  14 public void Configure(IApplicationBuilder app, IWebHostEnvironment env) 15 { 16     //绑定事件源和事件处理程序的映射关系   17     //这里跟上边的绑定不是一回事,这里仅仅就是绑定了一个映射关系,等到后边事件触发的时候需要用到该处映射关系 18     EventBus.Default.RegisterHandler<EventWithData<Person>, Person_EventHandler>(); 19  20     //其它中间件。。 21     。。。 22  23     //启动消费者订阅监控入口 24     app.Consumer(app.ApplicationServices, ServiceCollection); 25 }

3,启动消费者订阅监控,并触发事件处理程序

 1 /// <summary>  2 /// 启动消费者订阅监控入口  3 /// </summary>  4 public static void Consumer(this IApplicationBuilder app, IServiceProvider provider, IServiceCollection services)  5 {  6     //获取实例对象   7     IMessageQueue queue = provider.GetService<IMessageQueue>();     8   9     //异步调用,这里接收的消息类型T为:EventWithData<Person>  也可以是其它自定义消息类型,没有限制 10     queue.Consume<EventWithData<Person>>(async message => 11     { 12         using (var currentContainer = services.BuildServiceProvider().CreateScope())   //使用系统内置服务容器 13         { 14             IocManager.Configure(currentContainer.ServiceProvider);                //将容器服务传递过去 15              16             var e = EventWithData<Person>.New(message.Data);      //这里要传递的消息类型必须使用 EventWithData<TData>类型初始化消息 17              18             await EventBus.Default.TriggerAsync(e);               //调用触发事件逻辑(需要提前绑定映射关系和注册容器) 19              20             return true; 21         } 22     }); 23 }

说明:

IocManager.Configure(currentContainer.ServiceProvider)  实际上就是声明了一个全局的 IServiceProvider 变量,然后将当前的ServiceProvider赋值给全局变量

await EventBus.Default.TriggerAsync(e)   执行的就是上一节事件总线中的 public async Task TriggerAsync<TEvent>(TEvent e)  方法

到这里 事件源和事件处理程序就一一对应上了。

接下来开始测试一下是否可用,先在发布的应用程序Startup中注册一下消息队列容器

1 services.AddSingleton<IMessageQueue, RabbitMQPublishClient>();

然后发布一条消息,注意:这里消息的类型EventWithData<Person>没有限制的,你可以不使用EventWithData<Person>类型的消息也行

 1 //发布消息  2 var message = EventWithData<Person>.New(  3     new Person()  4     {  5         UserId = 1,  6         UserName = "张三"  7     }  8 );  9  10 messageQueue.Publish(message);

最后测试,订阅者成功消费了刚刚生产的消息

.Net Core 5.x Api开发笔记 -- 消息队列RabbitMQ实现事件总线EventBus(二)