- A+
1.安装完成rabbitMQ
参考上一篇如何安装 rabbitMQ
2.安装 OTP服务
https://www.erlang.org/downloads
C#代码
需要导入RabbitMQ.Client
在NuGet中搜索自取
队列发送端
var factory = new ConnectionFactory();
//RabbitMQ服务器地址
factory.HostName = "192.168.0.22";
//端口号
//factory.Port = 15672;
factory.UserName = "admin";
factory.Password = "123456";
//消息异步转发
factory.DispatchConsumersAsync = true;
//交换机名称
string exchangeName = "exchange2";
//routingKey的值
string eventName = "key1";
//连接服务器
using var conn = factory.CreateConnection();
int MsgInt = 1;
while (true)
{
//待发送的消息
string msg = $"{MsgInt++}条消息{DateTime.Now.ToLongTimeString()}";
//创建信道
using (var channel = conn.CreateModel())
{
//消息属性
var properties = channel.CreateBasicProperties();
//传输模式,1:非持久化,2:持久化
properties.DeliveryMode = 2;
//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
byte[] body = Encoding.UTF8.GetBytes(msg);
//生产消息
channel.BasicPublish(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);
}
Console.WriteLine("发送消息:" + msg);
Thread.Sleep(1000);
}
收信方
var factory = new ConnectionFactory();
//RabbitMQ服务器地址
factory.HostName = "localhost";
//端口号
//factory.Port = 15672;
factory.UserName = "admin";
factory.Password = "123456";
//消息异步转发
factory.DispatchConsumersAsync = true;
//交换机名称
string exchangeName = "exchange2";
//routingKey的值
string eventName = "key1";
using var conn = factory.CreateConnection();
//创建信道
using var channel = conn.CreateModel();
//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
//声明队列
string queueName = "queue1";
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//将队列绑定到交换机中
channel.QueueBind(queueName, exchangeName, eventName);
//消费者拉取消息
AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
//消费消息
channel.BasicConsume(queueName, autoAck: false, consumer);
Console.WriteLine("按回车退出");
Console.ReadLine();
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
try
{
byte[] bytes = args.Body.ToArray();
string msg = Encoding.UTF8.GetString(bytes);
Console.WriteLine(DateTime.Now.ToLongTimeString() + "收到了信息" + msg);
//消息确认处理,DeliveryTag:消息的编号 执行后等于告诉队列 已经收到了消息
channel.BasicAck(args.DeliveryTag, multiple: false);
await Task.Delay(800);
}
catch (Exception ex)
{
//对没有确认处理的消息进行消息重发
channel.BasicReject(args.DeliveryTag, true);
Console.WriteLine(DateTime.Now.ToLongTimeString() + "处理收到的消息出错:" + ex);
}
}
收信方和发送方 分别是 2个项目 做 测试
ConsoleApp1是发送 2是收信