- A+
什么是发布-订阅
发布订阅是一种众所周知并被广泛使用的消息传送模式,常用在微服务架构的服务间通信,高并发削峰等情况。但是不同的消息中间件之间存在细微的差异,项目使用不同的产品需要实现不同的实现类,虽然是明智的决策,但必须编写和维护抽象及其基础实现。 此方法需要复杂、重复且容易出错的自定义代码。
Dapr为了解决这种问题,提供开箱即用的消息传送抽象和实现,封装在 Dapr 构建基块中。业务系统只需调用跟据Dapr的要求实现订阅发布即可。
工作原理
Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。
服务将消息发布到指定主题, 业务服务订阅主题以使用消息。
服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。
任何编程平台都可以使用 Dapr 本机 API 通过 HTTP 或 gRPC 调用构建基块。 若要发布消息,请进行以下 API 调用:
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>
上述调用中有几个特定于 Dapr 的 URL 段:
<dapr-port>
提供 Dapr sidecar 侦听的端口号。<pub-sub-name>
提供所选 Dapr pub/sub 组件的名称。<topic>
提供消息发布到的主题的名称。
设置发布订阅组件
Dapr 为 Pub/Sub 提供很多支持的组件,例如 Redis 和 Kafka 等。支持组件详见 链接
在win10上的自承载的Dapr中,默认在 %UserProfile%.daprcomponentspubsub.yaml 中使用redis作为了pub/sub组件,dapr run一个app时,使用默认组件作为pub/sub组件
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub spec: type: pubsub.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: ""
订阅主题
Dapr 允许两种方法订阅主题:
- 声明式,其中定义在外部文件中。
- 编程方式,订阅在用户代码中定义
1.声明式订阅
在默认组件目录 %UserProfile%.daprcomponentspubsub.yaml 中新建subscription.yaml文件,并写入以下内容
apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: myevent-subscription spec: topic: test_topic route: /TestPubSub pubsubname: pubsub scopes: - frontend
上面的示例显示了 test_topic
主题的事件订阅,使用组件 pubsub
。
route
告诉 Dapr 将所有主题消息发送到应用程序中的/TestPubSub
端点。scopes
为 FrontEnd启用订阅
现在需要在FrontEnd项目中定义接口TestSub,在FrontEnd中新建TestPubSubController
using Dapr.Client; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; using System.IO; using System.Text; using System.Threading.Tasks; namespace FrontEnd.Controllers { [Route("[controller]")] [ApiController] public class TestPubSubController : ControllerBase { private readonly ILogger<TestPubSubController> _logger; private readonly DaprClient _daprClient; public TestPubSubController(ILogger<TestPubSubController> logger, DaprClient daprClient) { _logger = logger; _daprClient = daprClient; } [HttpPost] public ActionResult Post() { Stream stream = Request.Body; byte[] buffer = new byte[Request.ContentLength.Value]; stream.Position = 0L; stream.ReadAsync(buffer, 0, buffer.Length); string content = Encoding.UTF8.GetString(buffer); return Ok(content); } [HttpGet("pub")] public async Task<ActionResult> PubAsync() { var data = new WeatherForecast(); await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data); return Ok(); } } }
需要在Startup的Configure中开启重复读取Body才能读取到数据
app.Use((context, next) => { context.Request.EnableBuffering(); return next(); });
启动FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .FrontEndbinDebugnet5.0FrontEnd.dll
调用 pub API发布消息
查看订阅情况,订阅消息消费成功
2.编程式订阅
为了防止声明式订阅的影响,需要先把目录<%UserProfile%.daprcomponentspubsub.yaml>中subscription.yaml文件删除
TestPubSubController新增Api Sub
[Topic("pubsub", "test_topic")] [HttpPost("sub")] public async Task<ActionResult> Sub() { Stream stream = Request.Body; byte[] buffer = new byte[Request.ContentLength.Value]; stream.Position = 0L; stream.ReadAsync(buffer, 0, buffer.Length); string content = Encoding.UTF8.GetString(buffer); _logger.LogInformation("testsub" + content); return Ok(content); }
在Startup的Configure方法中新增中间件
public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { // ... app.UseCloudEvents(); app.UseEndpoints(endpoints => { endpoints.MapSubscribeHandler(); // ... }); }
启动FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .FrontEndbinDebugnet5.0FrontEnd.dll
调用API发布消息
查看订阅情况,订阅消息消费成功
通过DapreCLI同样可以发布消息
dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'
查看订阅情况,订阅消息消费成功