.NET中使用RabbitMQ延时队列和死信队列

  • .NET中使用RabbitMQ延时队列和死信队列已关闭评论
  • 116 次浏览
  • A+
所属分类:.NET技术
摘要

延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。


RabbitMQ延时队列和死信队列

延时队列和死信队列

延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。

延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果消息在过期时间内没有被消费者消费,则会被自动发送到一个预先指定的死信队列中。

在RabbitMQ中,延时队列的实现可以通过以下步骤来完成:

  1. 创建一个普通的队列作为延时队列,设置x-message-ttl参数为消息的过期时间。
  2. 创建一个死信队列,用于接收延时队列中过期的消息。
  3. 将延时队列设置为普通队列的死信交换机,并指定死信路由键。
  4. 将消费者绑定到死信队列,以消费延时队列中过期的消息。

使用场景

  1. 订单在十分钟内未支付则自动取消。
  2. 新创建的店铺,如果十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 还有很多场景就不一一例举了。

TTL设置

方式一:

创建队列时设置x-message-ttl的属性,所有被投递到该队列的消息最多都不会超过60s

var args = new Dictionary<string,object>(); args.Add("x-message-ttl",60000); //单位为毫秒 model.QueueDeclare("myqueue",false,false,false,args); 

方式二:

为每条消息设置TTL,为每条消息设置过期时间。

IBasicProperties props = model.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2; props.Expiration = "60000" model.BasicPublic(exchangeName,routingKey,props,messageBodyBytes); 

代码实践

模拟支付业务

整个项目由三部分组成

  • Web API项目:用于发送订单请求,生产者。
  • 控制台项目一:用于处理订单支付,延时队列。
  • 控制台项目二:用于处理超时未支付的订单,死信队列。

Web API项目

订单类,就简单的写一个用于演示,真实业务肯定不是这样~

public class OrderDto {     /// <summary>     /// 订单名称     /// </summary>     public string Name { get; set; }     /// <summary>     /// 订单状态     /// 0 未支付     /// 1 已支付     /// 2 超时     /// </summary>     public int Status { get; set; } } 

控制器

[ApiController] [Route("api/orders")] public class OrdersController : ControllerBase {     private readonly IOrderService _orderService;      public OrdersController(IOrderService orderService)     {         _orderService = orderService;     }      [HttpPost]     public IActionResult CreateOrder([FromBody] OrderDto orderDto)     {         // 处理订单逻辑         _orderService.ProcessOrder(orderDto);         return Ok();     } } 

订单服务

public interface IOrderService {     void ProcessOrder(OrderDto orderDto); }  public class OrderService : IOrderService {     private readonly RabbitMQConnectionFactory _connectionFactory;          public OrderService(RabbitMQConnectionFactory connectionFactory)     {         _connectionFactory = connectionFactory;     }      public void ProcessOrder(OrderDto orderDto)     {         using (var channel = _connectionFactory.CreateChannel())         {             var properties = channel.CreateBasicProperties();             properties.Headers = new Dictionary<string, object>             {                 { "x-delay", 1000 * 20  } // 设置20秒延时             };              var message = JsonConvert.SerializeObject(orderDto);             var body = Encoding.UTF8.GetBytes(message);              channel.BasicPublish("delayed_exchange", "routing_key", properties, body);         }     } } 

支付处理项目

ProcessPay类,用于接收订单消息

public class ProcessPay : IHostedService {     private readonly ConnectionFactory _factory;     private IConnection _connection;     private IModel _channel;      public ProcessPay()     {         _factory = new ConnectionFactory()         {             HostName = "ip",             Port = 5672,             UserName = "用户名",             Password = "密码"         };     }      public Task StartAsync(CancellationToken cancellationToken)     {         Console.WriteLine(" Press [enter] to exit.");         _connection = _factory.CreateConnection();         _channel = _connection.CreateModel();          _channel.ExchangeDeclare("delayed_exchange", ExchangeType.Direct, true, false, null);         //关键代码,绑定死信交换机         var arguments = new Dictionary<string, object>         {             { "x-dead-letter-exchange", "dead_letter_exchange" },             { "x-dead-letter-routing-key", "dead_letter_routing_key" }         };         _channel.QueueDeclare("delayed_queue", true, false, false, arguments);         _channel.QueueBind("delayed_queue", "delayed_exchange", "routing_key");          var consumer = new EventingBasicConsumer(_channel);         consumer.Received += (model, ea) =>         {             var body = ea.Body.ToArray();             var message = Encoding.UTF8.GetString(body);              // 处理支付逻辑             var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);             Console.WriteLine($"订单信息:{orderDto.Name}");             Console.WriteLine("请输入价格(模拟支付):");              // 超时未支付             string? many = "";             // 支付处理             Console.WriteLine("请输入:");             // 超时未支付进行处理             Task.Factory.StartNew(() =>             {                 many = Console.ReadLine();                 Console.WriteLine($"many:{many}");             }).Wait(20 * 1000);             if (string.Equals(many, "100"))             {                 orderDto.Status = 1;                 Console.WriteLine("支付完成");                 _channel.BasicAck(ea.DeliveryTag, true);             }             else             {                 //重试几次依然失败                 Console.WriteLine("等待一定时间内失效超时未支付的订单");                 _channel.BasicNack(ea.DeliveryTag, false, false);             }         };          _channel.BasicConsume("delayed_queue", false, consumer);          return Task.CompletedTask;     }      public Task StopAsync(CancellationToken cancellationToken)     {         _channel?.Close();         _connection?.Close();         _channel?.Dispose();         _connection?.Dispose();          return Task.CompletedTask;     } } 

在Main方法中使用单例模式注册该服务,当然直接将代码写在Main方法也是没有问题的,只不过这种方式方便管理。

static void Main(string[] args) {     var host = new HostBuilder()         .ConfigureServices((hostContext, services) =>                            {                                services.AddSingleton<IHostedService,ProcessPay>();                            })         .Build();      host.Run(); } 

支付超时项目

创建一个死信队列服务,用于订阅死信队列中的订单消息,这里我就直接把代码写在Main方法中了

using (var connection = factory.CreateConnection()) {     using (var channel = connection.CreateModel())     {         channel.ExchangeDeclare("dead_letter_exchange", ExchangeType.Direct, true, false, null);          channel.QueueDeclare("dead_letter_queue", true, false, false, null);          channel.QueueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");                  var consumer = new EventingBasicConsumer(channel);         consumer.Received += (model, ea) =>         {             var body = ea.Body.ToArray();             var message = Encoding.UTF8.GetString(body);              // 处理超时未支付的订单逻辑             var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);             orderDto.Status = 2;             Console.WriteLine($"订单信息:{orderDto.Name},{orderDto.Status}");             Console.WriteLine("超时未支付");              channel.BasicAck(ea.DeliveryTag, true);         };          channel.BasicConsume("dead_letter_queue", false, consumer);                  Console.WriteLine(" Press [enter] to exit.");         Console.ReadLine();     } } 

效果展示

代码看不出效果,直接上图。

首先是3个项目各自运行效果图

.NET中使用RabbitMQ延时队列和死信队列

然后演示正常消费效果

.NET中使用RabbitMQ延时队列和死信队列

接下来是超时未支付效果

.NET中使用RabbitMQ延时队列和死信队列

结尾

这就是一个简单的延时队列和死信队列的代码,模拟了支付超时的场景,这里的数据都写死了的,真实运用的时候肯定是中数据库中获取,修改数据库实体的值。然后死信队列是用于处理在一定时间内未被处理的消息,死信交换机也只是一个普通的交换机,只不过他是用于处理超时的消息的交换机。

对于RabbitMQ的文章基本就结束了,可能还会有一篇RabbitMQ集群搭建的文章,但不是很想去写,最近太懒了~

有问题欢迎指出,活到老学到老~

RabbitMQ系列文章

参考资料