- A+
所属分类:.NET技术
- MassTransit是一个面向.net的免费开源分布式应用程序框架。
- MassTransit使得创建应用程序和服务变得很容易,这些应用程序和服务利用基于消息的、松散耦合的异步通信来获得更高的可用性、可靠性和可伸缩性。
- MassTransit 8.x版本。
实现简单发布订阅
- 添加Nuget包引用:
- MassTransit
- MassTransit.RabbitMQ(演示也可基于内存)
生产端
- 配置MassTransit
builder.Services.AddMassTransit(x => { // 使用内存 //x.UsingInMemory(); // 使用RabbitMq x.UsingRabbitMq((context, config) => { config.Host("rabbitmq://localhost:5672", host => { host.Username("admin"); host.Password("admin"); }); }); });
- 定义消息体
public class OrderEto { public Guid Id { get; init; } public string Name { get; set; } public DateTime CreationTime { get; set; } }
- 发布消息
[ApiController] [Route("[controller]")] public class PublishController : ControllerBase { private readonly ILogger<PublishController> _logger; private readonly IPublishEndpoint _publishEndpoint; public PublishController(ILogger<PublishController> logger, IPublishEndpoint publishEndpoint) { _logger = logger; _publishEndpoint = publishEndpoint; } [HttpGet] public async Task Get() { await _publishEndpoint.Publish<OrderEto>(new OrderEto() { Id = Guid.NewGuid(), Name = "Phone", CreationTime = DateTime.Now }); } }
消费者端
builder.Services.AddMassTransit(x => { // 通过扫描程序集注册消费者 x.AddConsumers(typeof(Program).Assembly); // 通过类型单个注册消费者 // x.AddConsumer<OrderEtoConsumer>(typeof(OrderEtoConsumerDefinition)); // x.SetKebabCaseEndpointNameFormatter(); // 通过泛型单个注册消费者 //x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>(); // 通过指定命名空间注册消费者 // x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>(); // 使用内存队列 // x.UsingInMemory(); x.UsingRabbitMq((context, config) => { config.Host("rabbitmq://localhost:5672", hostconfig => { hostconfig.Username("admin"); hostconfig.Password("admin"); }); config.ConfigureEndpoints(context); }); });
- 消费者定义
public class OrderEtoConsumer : IConsumer<OrderEto> { private readonly ILogger<OrderEtoConsumer> _logger; public OrderEtoConsumer(ILogger<OrderEtoConsumer> logger) { _logger = logger; } public Task Consume(ConsumeContext<OrderEto> context) { _logger.LogInformation($"MassTransit.Consumer.One 收到消息:{JsonSerializer.Serialize(context.Message)}"); return Task.CompletedTask; } } public class OrderEtoConsumerDefinition : ConsumerDefinition<OrderEtoConsumer> { protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<OrderEtoConsumer> consumerConfigurator) { endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000)); } }