C# 消息队列demo

  • C# 消息队列demo已关闭评论
  • 196 次浏览
  • A+
所属分类:.NET技术
摘要

1.安装完成rabbitMQ参考上一篇如何安装 rabbitMQ2.安装 OTP服务https://www.erlang.org/downloads 

1.安装完成rabbitMQ

参考上一篇如何安装 rabbitMQ

2.安装 OTP服务

https://www.erlang.org/downloads 

 

C#代码

需要导入RabbitMQ.Client

在NuGet中搜索自取

C# 消息队列demo

队列发送端

var factory = new ConnectionFactory();
//RabbitMQ服务器地址
factory.HostName = "192.168.0.22";
//端口号
//factory.Port = 15672;
factory.UserName = "admin";
factory.Password = "123456";
//消息异步转发
factory.DispatchConsumersAsync = true;
//交换机名称
string exchangeName = "exchange2";
//routingKey的值
string eventName = "key1";
//连接服务器
using var conn = factory.CreateConnection();
int MsgInt = 1;

while (true)
{
    //待发送的消息
    string msg = $"{MsgInt++}条消息{DateTime.Now.ToLongTimeString()}";
    //创建信道
    using (var channel = conn.CreateModel())
    {
        //消息属性
        var properties = channel.CreateBasicProperties();
        //传输模式,1:非持久化,2:持久化
        properties.DeliveryMode = 2;
        //声明交换机
        channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
        byte[] body = Encoding.UTF8.GetBytes(msg);
        //生产消息
        channel.BasicPublish(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body);
    }
    Console.WriteLine("发送消息:" + msg);
    Thread.Sleep(1000);
}

收信方

var factory = new ConnectionFactory();
//RabbitMQ服务器地址
factory.HostName = "localhost";
//端口号
//factory.Port = 15672;
factory.UserName = "admin";
factory.Password = "123456";

//消息异步转发
factory.DispatchConsumersAsync = true;
//交换机名称
string exchangeName = "exchange2";
//routingKey的值
string eventName = "key1";
using var conn = factory.CreateConnection();
//创建信道
using var channel = conn.CreateModel();

//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
//声明队列
string queueName = "queue1";
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//将队列绑定到交换机中
channel.QueueBind(queueName, exchangeName, eventName);
//消费者拉取消息
AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
//消费消息
channel.BasicConsume(queueName, autoAck: false, consumer);
Console.WriteLine("按回车退出");
Console.ReadLine();

async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
    try
    {
        byte[] bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine(DateTime.Now.ToLongTimeString() + "收到了信息" + msg);
        //消息确认处理,DeliveryTag:消息的编号 执行后等于告诉队列 已经收到了消息
        channel.BasicAck(args.DeliveryTag, multiple: false);
        await Task.Delay(800);
    }
    catch (Exception ex)
    {
        //对没有确认处理的消息进行消息重发
        channel.BasicReject(args.DeliveryTag, true);
        Console.WriteLine(DateTime.Now.ToLongTimeString() + "处理收到的消息出错:" + ex);
    }
}

收信方和发送方 分别是 2个项目 做 测试

ConsoleApp1是发送 2是收信

C# 消息队列demo