01_在NET中使用RabbitMQ

  • 01_在NET中使用RabbitMQ已关闭评论
  • 63 次浏览
  • A+
所属分类:.NET技术
摘要

1.Linux上安装Docken   删除docker-ce命令:yum remove docker-ce
  删除镜像、容器、配置文件等内容
  rm -rf /var/lib/containerd
  rm -rf /var/lib/docker

1.Linux上安装Docken

服务器系统版本以及内核版本:cat /etc/redhat-release 查看服务器内核版本:uname -r 安装依赖包:yum install -y yum-utils device-mapper-persistent-data lvm2 设置阿里云镜像源:yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo 安装Docker:yum install -y docker-ce     社区版(Community Edition,缩写为 CE)     企业版(Enterprise Edition,缩写为 EE)   启动docker并设置开机自启:     启动docker命令:systemctl start docker     设置开机自启命令:systemctl enable docker     查看docker版本命令:docker version

  删除docker-ce命令:yum remove docker-ce
  删除镜像、容器、配置文件等内容
  rm -rf /var/lib/containerd
  rm -rf /var/lib/docker



----------------------------------------通过docker help命令来查看更多的命令--------------------------------

  docker search --镜像名 搜索仓库镜像

  docker pull --镜像名 拉取镜像

  docker ps 查看目前正在运行的所有容器 (-a 显示包括已经停止的容器)

  docker rmi image_id/image_name 删除镜像

  docker build 使用Dockerfile创建镜像

  docker run 运行容器

  docker exec 进入容器中执行命令 (例如:docker exec -it container_id/container_name /bin/bash)

  docker logs container_id/container_name 查看容器日志(例如:docker logs -f -t --tail 10 container_id )

  docker start container_id/container_name 启动容器

  docker restart container_id/container_name 重启容器

  docker stop container_id/container_name 停止容器

  docker rm container_id/container_name 删除容器(只能删除已停止的容器)

 

2基于Docken安装RabbitMq

  docker启动:systemctl start docker
  docker重启:ystemctl restart docker
  docker关闭:systemctl stop docker

查看正在运行容器:docker ps 

查询Rabbitmq镜像: docker search rabbitmq
安装Rabbitmq镜像:
指定版本:docker pull rabbitmq:3.7.7-management
最新版本:docker pull rabbitmq
创建和启动容器:docker run -d --hostname myrabbitmq --name rabbitmq -p 5672:5672 -p 15673:15672 rabbitmq
-d 后台运行容器;
--hostname  主机名;
--name 指定容器名;
-p 指定服务运行的端口
5672 控制台Web端口号(服务端)
15672 应用访问端口(客户端)
-v 映射目录或文件
-e 指定环境变量(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
进入容器内部:docker exec -it 容器id /bin/bash
运行:rabbitmq-plugins enable rabbitmq_management
重启rabbitmq:docker start rabbitmq
重启容器:docker restart rabbitmq
停止容器:docker stop rabbitmq

访问:http://ip:15672/
账号密码:guest/guest

其它命令:

  列出所有用户:rabbitmqctl list_users
  添加用户:rabbitmqctl add_user username password   如:新增一个用户:rabbitmqctl add_user 名称 密码
  删除用户:rabbitmqctl delete_user username
  修改密码:rabbitmqctl change_password username newpassword
  列出用户权限:rabbitmqctl list_user_permissions username
  列出虚拟主机上的所有权限:rabbitmqctl list_permissions -p vhostpath
  设置用户权限:rabbitmqctl set_permissions -p vhostpath username “.” “.” “.*”    如:设置用户权限:rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP

 

3.添加用户和设置权限

01_在NET中使用RabbitMQ

 

 

01_在NET中使用RabbitMQ

01_在NET中使用RabbitMQ

 

 

4.NET中使用RabbitMQ

RabbitMq有7种模式:RabbitMQ Tutorials | RabbitMQ

安装包:RabbitMQ.Client

 

定义队列和交换机名称

    /// <summary>     /// 定义队列和交换机名称     /// </summary>     public class RabbitConstant     {         public const string QUEUE_HELLO_WORLD = "helloworld.queue";         public const string QUEUE_SMS = "sms.queue";         public const string EXCHANGE_WEATHER = "weather.exchange";         public const string QUEUE_BAIDU = "baidu.queue";         public const string QUEUE_SINA = "sina.queue";         public const string EXCHANGE_WEATHER_ROUTING = "weather.routing.exchange";         public const string EXCHANGE_WEATHER_TOPIC = "weather.topic.exchange";     }

 

 

第一种模式:Hello World

消费者:

using RabbitMQ.Client; using RabbitMQ.Client.Events;    public class HelloConsumer     {         public static void HelloWorldShow()         {             var factory = new ConnectionFactory();             factory.HostName = "127.0.0.1";             factory.Port = 5672;//5672是RabbitMQ默认的端口号             factory.UserName = "admin";             factory.Password = "admin";             factory.VirtualHost = "my_vhost";              using (var connection = factory.CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     /*                      * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列                      * 第一个参数:队列名称ID                      * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失                      * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用                      * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列                      * 其他额外参数为null                      */                     channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);                     Console.ForegroundColor = ConsoleColor.Cyan;                     //事件消费者类                     EventingBasicConsumer consumers = new EventingBasicConsumer(channel);                     // 触发事件                     consumers.Received += (model, ea) =>                     {                         var body = ea.Body.ToArray();                         var message = Encoding.UTF8.GetString(body);                          // false只是确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息                         channel.BasicAck(ea.DeliveryTag, false);                         Console.WriteLine($"Consumer01接收消息:{message}");                     };                     /*                      * 从MQ服务器中获取数据                      * 创建一个消息消费者                      * 第一个参数:队列名                      * 第二个参数:是否自动确认收到消息,false代表手动确认消息,这是MQ推荐的做法                      * 第三个参数:要传入的IBasicConsumer接口                      */                     channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers);                     Console.WriteLine("Press [Enter] to exit");                     Console.Read();                 }             }         }     }

生产者:

using RabbitMQ.Client;     public class HelloProducer     {         public static void HelloWorldShow()         {             var factory = new ConnectionFactory();             factory.HostName = "127.0.0.1";//IP             factory.Port = 5672;//端口             factory.UserName = "admin";//用户名             factory.Password = "admin";//密码             factory.VirtualHost = "my_vhost";//虚拟主机              // 获取TCP 长连接             using (var connection = factory.CreateConnection())             {                 // 创建通信“通道”,相当于TCP中的虚拟连接                 using (var channel = connection.CreateModel())                 {                     /*                      * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列                      * 第一个参数:队列名称ID                      * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失                      * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用                      * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列                      * 其他额外参数为null                      */                     channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);                     Console.ForegroundColor = ConsoleColor.Red;                     string message = "hello CodeMan 666";//要发送的数据                     var body = Encoding.UTF8.GetBytes(message);                      /*                      * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到                      * 第二个参数:路由key                      * 第三个参数:额外的设置属性                      * 第四个参数:最后一个参数是要传递的消息字节数组                      */                     channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body);                     Console.WriteLine($"producer消息:{message}已发送");                 }             }         }     }

 

-------------------------------------------------------------------------------漂亮的分割线--------------------------------------------------------------------------------------------------

获取ConnectionFactory 对象

/// <summary>     /// RabbitMQ连接类     /// </summary>     public class RabbitUtils     {         /// <summary>         /// 获取ConnectionFactory对象         /// </summary>         /// <returns></returns>         public static ConnectionFactory GetConnection()         {             var factory = new ConnectionFactory();             factory.HostName = "127.0.0.1";//IP地址             factory.Port = 5672;//5672是RabbitMQ默认的端口号             factory.UserName = "admin";//用户名             factory.Password = "admin";//密码             factory.VirtualHost = "my_vhost";//虚拟主机             return factory;         }     }

 

    /// <summary>     /// 发送消息内容类     /// </summary>     public class Sms     {         public string Name { get; set; }         public string Mobile { get; set; }         public string Content { get; set; }          public Sms()         {          }          public Sms(string name, string mobile, string content)         {             Name = name;             Mobile = mobile;             Content = content;         }     }

 

 

第二种模式:Work Queues

消费者1

public class SmsReceive     {         public static void Sender()         {             var connection = RabbitUtils.GetConnection().CreateConnection();              var channel = connection.CreateModel();             /*              * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列              * 第一个参数:队列名称ID              * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失              * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用              * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列              * 其他额外参数为null              */             channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);             // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者             // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的             channel.BasicQos(0, 1, false);              var consumer = new EventingBasicConsumer(channel);              consumer.Received += (model, ea) =>             {                 var body = ea.Body.ToArray();                 var message = Encoding.UTF8.GetString(body);                 Thread.Sleep(30);                 Console.WriteLine($"SmsSender-发送短信成功:{message}");                 channel.BasicAck(ea.DeliveryTag, false);             };              channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);             Console.WriteLine("Press [Enter] to exit");             Console.Read();         }     }

消费者2

public class SmsReceive     {         public static void Sender()         {             var connection = RabbitUtils.GetConnection().CreateConnection();             var channel = connection.CreateModel();              channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);             // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者             // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的             channel.BasicQos(0, 1, false);//处理完一个取一个              var consumer = new EventingBasicConsumer(channel);              consumer.Received += (model, ea) =>             {                 var body = ea.Body.ToArray();                 var message = Encoding.UTF8.GetString(body);                 Thread.Sleep(60);                 Console.WriteLine($"SmsSender-发送短信成功:{message}");                 channel.BasicAck(ea.DeliveryTag, false);             };              channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);             Console.WriteLine("Press [Enter] to exit");             Console.Read();         }     }

生产者

 public class SmsSender     {         public static void Sender()         {             using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     /*                      * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列                      * 第一个参数:队列名称ID                      * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失                      * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用                      * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列                      * 其他额外参数为null                      */                     channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);                     for (int i = 0; i < 100; i++)                     {                         Sms sms = new Sms("乘客" + i, "139000000" + i, "您的车票已预定成功");                         string jsonSms = JsonConvert.SerializeObject(sms);                         var body = Encoding.UTF8.GetBytes(jsonSms);                         /*                          * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到                          * 第二个参数:路由key                          * 第三个参数:额外的设置属性                          * 第四个参数:最后一个参数是要传递的消息字节数组                          */                         channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body);                         Console.WriteLine($"正在发送内容:{jsonSms}");                     }                     Console.WriteLine("发送数据成功");                 }             }         }     }

 

第三种模式:Publish/Subscribe

消费者1

public class WeatherFanout     {         public static void Weather()         {             using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     //交换机                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);                     // 声明队列信息                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);                     /*                      * queueBind 用于将队列与交换机绑定                      * 参数1:队列名                      * 参数2:交换机名                      * 参数3:路由Key(暂时用不到)                      */                     channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");                      channel.BasicQos(0, 1, false);                      var consumer = new EventingBasicConsumer(channel);                      consumer.Received += ((model, ea) =>                     {                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());                         Console.WriteLine($"百度收到的气象信息:{message}");                         channel.BasicAck(ea.DeliveryTag, false);                     });                      channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);                     Console.WriteLine("Press [Enter] to exit");                     Console.Read();                 }             }         }     }

消费者2

public class WeatherFanout     {         public static void Weather()         {             using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);                     // 声明队列信息                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);                     /*                      * queueBind 用于将队列与交换机绑定                      * 参数1:队列名                      * 参数2:交换机名                      * 参数3:路由Key(暂时用不到)                      */                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");                      channel.BasicQos(0, 1, false);                      var consumer = new EventingBasicConsumer(channel);                      consumer.Received += ((model, ea) =>                     {                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());                         Console.WriteLine($"百度收到的气象信息:{message}");                         channel.BasicAck(ea.DeliveryTag, false);                     });                      channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);                     Console.WriteLine("Press [Enter] to exit");                     Console.Read();                 }             }         }     }

生产者

public class WeatherFanout     {         public static void Weather()         {             using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     string message = "20度";                     var body = Encoding.UTF8.GetBytes(message);                     /*                      * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到                      * 第二个参数:路由key                      * 第三个参数:额外的设置属性                      * 第四个参数:最后一个参数是要传递的消息字节数组                      */                     channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body);                     Console.WriteLine("天气信息发送成功!");                 }             }         }     }

 

第三种模式:Routing

消费者1

public class WeatherDirect     {         public static void Weather()         {             using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     //交换机                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);                     //队列                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);                     /*                     * queueBind 用于将队列与交换机绑定                     * 参数1:队列名                     * 参数2:交换机名                     * 参数3:路由Key(暂时用不到)                     */                     channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525");                     channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");                      channel.BasicQos(0, 1, false);                      var consumer = new EventingBasicConsumer(channel);                      consumer.Received += ((model, ea) =>                     {                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());                         Console.WriteLine($"百度收到的气象信息:{message}");                         channel.BasicAck(ea.DeliveryTag, false);                     });                      channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);                     Console.WriteLine("Press [Enter] to exit");                     Console.Read();                 }             }         }     }

消费者2

public class WeatherDirect     {         public static void Weather()         {             using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     //交换机                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);                     // 声明队列信息                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);                     /*                      * queueBind 用于将队列与交换机绑定                      * 参数1:队列名                      * 参数2:交换机名                      * 参数3:路由Key                      */                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525");                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525");                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");                      channel.BasicQos(0, 1, false);                      var consumer = new EventingBasicConsumer(channel);                      consumer.Received += ((model, ea) =>                     {                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());                         Console.WriteLine($"新浪收到的气象信息:{message}");                         channel.BasicAck(ea.DeliveryTag, false);                     });                      channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);                     Console.WriteLine("Press [Enter] to exit");                     Console.Read();                 }             }         }     }

生产者

public class WeatherDirect     {         public static void Weather()         {             Dictionary<string, string> area = new Dictionary<string, string>();             area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");             area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");             area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");             area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");              using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     foreach (var item in area)                     {                         /*                          * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到                          * 第二个参数:路由key                          * 第三个参数:额外的设置属性                          * 第四个参数:最后一个参数是要传递的消息字节数组                          */                         channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,                             null, Encoding.UTF8.GetBytes(item.Value));                     }                      Console.WriteLine("气象信息发送成功!");                 }             }         }     }

 

第五章模式:Topics

消费者1

public class WeatherTopic     {         public static void Weather()         {             using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     //交换机                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);                     // 声明队列信息                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);                     /*                      * queueBind 用于将队列与交换机绑定                      * 参数1:队列名                      * 参数2:交换机名                      * 参数3:路由Key(暂时用不到)                      */                     channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");//有关china的所有信息                      channel.BasicQos(0, 1, false);                      var consumer = new EventingBasicConsumer(channel);                      consumer.Received += ((model, ea) =>                     {                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());                         Console.WriteLine($"百度收到的气象信息:{message}");                         channel.BasicAck(ea.DeliveryTag, false);                     });                      channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);                     Console.WriteLine("Press [Enter] to exit");                     Console.Read();                 }             }         }     }

消费者2

public class WeatherTopic     {         public static void Weather()         {             using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     //交换机                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);                     // 声明队列信息                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);                     /*                      * queueBind 用于将队列与交换机绑定                      * 参数1:队列名                      * 参数2:交换机名                      * 参数3:路由Key(暂时用不到)                      */                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");//有关china.hubei.的信息                      channel.BasicQos(0, 1, false);                      var consumer = new EventingBasicConsumer(channel);                      consumer.Received += ((model, ea) =>                     {                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());                         Console.WriteLine($"新浪收到的气象信息:{message}");                         channel.BasicAck(ea.DeliveryTag, false);                     });                      channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);                     Console.WriteLine("Press [Enter] to exit");                     Console.Read();                 }             }         }     }

生产者

 public class WeatherTopic     {         public static void Weather()         {             Dictionary<string, string> area = new Dictionary<string, string>();             area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");             area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");             area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");             area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");              using (var connection = RabbitUtils.GetConnection().CreateConnection())             {                 using (var channel = connection.CreateModel())                 {                     foreach (var item in area)                     {                         channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,                             null, Encoding.UTF8.GetBytes(item.Value));                     }                      Console.WriteLine("气象信息发送成功!");                 }             }         }     }

 

延时队列和死信队列

RabbitMQ 支持延时队列的实现,通常可以通过两种方式来实现延时队列:
1.利用 RabbitMQ 的插件(如 rabbitmq_delayed_message_exchange 插件)来实现延时队列,该插件可以让消息在特定的延时后再被投递到目标队列中。
2.利用消息的 TTL(Time-To-Live)属性和死信队列(Dead Letter Exchange)来实现延时队列。通过设置消息的 TTL 属性,让消息在一定时间后变成死信,然后通过死信队列将这些消息重新投递到目标队列中。

 

生产者

其实只要设置void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);

 var channel = _connection.GetConnection().CreateModel();//获取RabbitMq的链接IModel             var prop = channel.CreateBasicProperties();             var props = new Dictionary<string, object>()                 {                     {"x-delay", 1000 * 20}                 };             var delay = props["x-delay"];             prop.Expiration = delay.ToString();             channel.BasicPublish(exchange, routingKey, false, prop, Encoding.UTF8.GetBytes(content));

如果20秒内没有被消费则会变成死信消息;

 

死信队列是用于存放无法被消费者消费的消息。当消息变成死信时,会被重新发送到死信队列中,以便进行进一步处理或分析。
在 RabbitMQ 中,可以通过设置队列的 x-dead-letter-exchangex-dead-letter-routing-key 参数来指定死信队列。当消息变成死信时,会根据这些参数将消息发送到指定的死信队列中。

场景:

1.消息过期:当消息的 TTL(Time To Live)过期时,消息会变成死信并发送到死信队列。
2.消息被拒绝:当消息被消费者拒绝时,可以选择将消息发送到死信队列。
3.消息队列长度限制:当队列达到最大长度时,新消息会被发送到死信队列。

在RabbitMQ中,BasicAck和BasicNack是两种用于消息确认的方法。

BasicAck用于确认已经成功处理的消息,告诉RabbitMQ可以将该消息从队列中删除。BasicAck的语法如下:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

BasicNack用于拒绝消息,告诉RabbitMQ重新将该消息放回队列中。BasicNack的语法如下:

channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);