C#利用RabbitMQ实现点对点消息传输

  • A+
所属分类:.NET技术
摘要

RabbitMQ做为消息代理,负责接收和转发消息,可以将RabbitMQ比喻为一个邮筒、一个邮局和一个邮递员。本文主要以一个简单的小例子,简述RabbitMQ实现消息传输的相关内容,仅供学习分享使用,如有不足之处,还请指正。

RabbitMQ做为消息代理,负责接收和转发消息,可以将RabbitMQ比喻为一个邮筒、一个邮局和一个邮递员。本文主要以一个简单的小例子,简述RabbitMQ实现消息传输的相关内容,仅供学习分享使用,如有不足之处,还请指正。

消息队列模型

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

C#利用RabbitMQ实现点对点消息传输

RabbitMQ设置

RabbitMQ是通过交换机将消息转发到对应队列,所以队列需要和交换机进行绑定。本例将队列绑定到默认的amq.direct交换机,并设置Routing key,如下图所示:

C#利用RabbitMQ实现点对点消息传输

RabbitMQ动态库安装

通过NuGet包管理器进行安装RabbitMQ.Client,如下所示:

C#利用RabbitMQ实现点对点消息传输

RabbitMQ.Client相关知识点

  • ConnectionFactory:构造一个实例,主要创建连接。
  • IConnection:表示一个基于AMQP协议的连接。
  • IModel:表示一个RabbitMQ通道,可用于声明一个队列,然后开始消费。
  • EventingBasicConsumer:基于独立事件监听的基础消费者,可以监听并接收消息。
  • 生产者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 开始生产并发布消息
  • 消费者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 创建消费者,5. 绑定通道和消费者,并开始消费

示例效果图

本例主要有一个生产者,一个消费者,通过消息队列进行消息转发和接收。

生产者负责消息发送,如下图所示:

C#利用RabbitMQ实现点对点消息传输

消费者负责消息接收,如下图所示:

C#利用RabbitMQ实现点对点消息传输

核心代码

代码结构:主要包括生产者,消费者,公共基础代码,如下所示:

C#利用RabbitMQ实现点对点消息传输

RabbitMqHelper主要创建连接,如下所示:

 1     public class RabbitMqHelper  2     {  3           4         /// <summary>  5         /// 创建连接  6         /// </summary>  7         /// <returns></returns>  8         public IConnection GetConnection()  9         { 10             try 11             { 12                 var factory = new ConnectionFactory() 13                 { 14                     HostName = "127.0.0.1", 15                     Port = 5672, 16                     UserName = "guest", 17                     Password = "guest", 18                     VirtualHost = "/ShortMsgHost" 19                 }; 20                 var conn = factory.CreateConnection(); 21                 return conn; 22             } 23             catch (Exception ex) { 24                 throw ex; 25             } 26         } 27  28     29  30     }

RabbmitMqSendHelper用于发送消息,如下所示:

 1     public class RabbmitMqSendHelper : RabbitMqHelper  2     {  3         /// <summary>  4         /// 发送消息  5         /// </summary>  6         /// <param name="msg"></param>  7         /// <returns></returns>  8         public bool SendMsg(string msg)  9         { 10             try 11             { 12                 using (var conn = GetConnection()) 13                 { 14                     using (var channel = conn.CreateModel()) 15                     { 16                         channel.QueueDeclare(queue: "ShortMsgQueue", 17                                      durable: true, 18                                      exclusive: false, 19                                      autoDelete: false, 20                                      arguments: null); 21                         var body = Encoding.UTF8.GetBytes(msg); 22  23                         channel.BasicPublish(exchange: "amq.direct", 24                                              routingKey: "ShortMsgKey", 25                                              basicProperties: null, 26                                              body: body); 27  28                         //Console.WriteLine(" [x] Sent {0}", message); 29                     }; 30                 }; 31                 return true; 32             } 33             catch (Exception ex) 34             { 35                 throw ex; 36             } 37         } 38     }

RabbitMqReceiveHelper主要用于接收信息,如下所示:

 1     public class RabbitMqReceiveHelper : RabbitMqHelper  2     {  3         public RabbitMqReceiveEventHandler OnReceiveEvent;  4   5         private IConnection conn;  6   7         private IModel channel;  8   9         private EventingBasicConsumer consumer; 10  11         public bool StartReceiveMsg() 12         { 13             try 14             { 15                 conn = GetConnection(); 16  17                 channel = conn.CreateModel(); 18  19                 channel.QueueDeclare(queue: "ShortMsgQueue", 20                                 durable: true, 21                                 exclusive: false, 22                                 autoDelete: false, 23                                 arguments: null); 24  25                 consumer = new EventingBasicConsumer(channel); 26                 consumer.Received += (model, ea) => 27                 { 28                     var body = ea.Body.ToArray(); 29                     var message = Encoding.UTF8.GetString(body); 30                     //Console.WriteLine(" [x] Received {0}", message); 31                     if (OnReceiveEvent != null) 32                     { 33                         OnReceiveEvent(message); 34                     } 35                 }; 36                 channel.BasicConsume(queue: "ShortMsgQueue", 37                                         autoAck: true, 38                                         consumer: consumer); 39                 return true; 40             } 41             catch (Exception ex) 42             { 43                 throw ex; 44             } 45         } 46     }

关于RabbitMQ的基础知识介绍,可参考前几篇博文。

备注

浣溪沙·堤上游人逐画船

欧阳修 〔宋代〕

堤上游人逐画船,拍堤春水四垂天。绿杨楼外出秋千。
白发戴花君莫笑,六幺催拍盏频传。人生何处似尊前!