定义 subpub 组件
我们使用 Dapr 初始化时安装的 redis 作为 pubsub 的实现
创建文件 ~/.dapr/components/pubsub.yaml
(Windows 用户为 %USERPROFILE%\.dapr\components\pubsub.yaml
),内容如下
Dapr 初始化后
~/.dapr/components
文件夹会自动创建,里面有一个statestore.yaml
的组件定义。如果没有该文件夹也不用担心,手动创建即可
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
创建项目
创建 ASP.NET Core WebAPI 项目
$ dotnet new webapi --no-openapi --no-https
安装 Dapr SDK
dotnet CLI
$ dotnet add package Dapr.AspNetCore
程序包管理器控制台
Install-Package Dapr.AspNetCore
也可以在 Visual Studio 的 Nuget 包管理器中搜索安装
添加 Dapr 支持到 ASP.NET Core 框架
Program.cs
- builder.Services.AddControllers(); + builder.Services.AddControllers().AddDapr(); app.MapControllers(); + app.MapSubscribeHandler();
添加消息处理程序
PubSubController.cs
using Dapr; using Microsoft.AspNetCore.Mvc; namespace pubsub { public class PubSubController : ControllerBase { private readonly ILogger<PubSubController> _logger; public PubSubController(ILogger<PubSubController> logger) { _logger = logger; } [Topic("pubsub", "Test")] [HttpPost("test")] public async Task<IActionResult> SubscribeTest() { var sr = new StreamReader(HttpContext.Request.Body); var body = await sr.ReadToEndAsync(); _logger.LogInformation("received cloud event from Test: {message}", body); return Ok(); } } }
使用 Dapr.AspNetCore 提供的
TopicAttribute
可以方便的订阅主题并关联到 API 接口,必须在管道中配置app.MapSubscribeHandler()
, 否则即使标注了Topic
特性程序也不会订阅订阅主题的接口必须返回 HTTP 200 OK 状态码,否则 dapr 可能会视为消息推送不成功而进行重新推送,dapr 的消息推送规则为 AtLeastOnce,即至少一次
测试订阅
使用 dapr 启动程序
$ dapr run --app-id pubsubtest --app-port 5265 -- dotnet run
--app-port
为应用程序的 http 端口,对于 ASP.NET Core 开发环境来说,一般在Properties/launchSettings.json
中可以找到--app-id
为应用程序 id,同一组应用程序类似在发布订阅中类似消息队列中的消费者组,如果有多个程序的 app-id 相同,则只有一个程序实例能够收到消息使用 dapr 模拟发布
$ dapr publish -i pubsubtest -p pubsub -t Test -d '{"data":"this is a test message"}' Event published successfully
观察第 1 步中程序的输出
== APP == info: pubsub.PubSubController[0] == APP == received cloud event from Test: {"data":{"data":"this is a test message"},"datacontenttype":"application/json","id":"5d485363-1658-4771-8c25-a84615359dde","pubsubname":"pubsub","source":"pubsubtest","specversion":"1.0","time":"2022-12-15T01:17:10+08:00","topic":"Test","traceid":"00-363f742582302ed141396ce408dbacc0-4b6f5751d4039d75-01","traceparent":"00-363f742582302ed141396ce408dbacc0-4b6f5751d4039d75-01","tracestate":"","type":"com.dapr.event.sent"}
可以发现收到的数据是一个 JSON 格式数据, 其中
data
字段正是我们实际发送的内容{ "data": { "data": "this is a test message" }, "datacontenttype": "application/json", "id": "5d485363-1658-4771-8c25-a84615359dde", "pubsubname": "pubsub", "source": "pubsubtest", "specversion": "1.0", "time": "2022-12-15T01:17:10+08:00", "topic": "Test", "traceid": "00-363f742582302ed141396ce408dbacc0-4b6f5751d4039d75-01", "traceparent": "00-363f742582302ed141396ce408dbacc0-4b6f5751d4039d75-01", "tracestate": "", "type": "com.dapr.event.sent" }
这种数据格式在 dapr 中称为 CloudEvent,它记录了发送的内容,类型,主题和其他元信息,大部分情况下我们只需要
data
字段的内容,可以通过在 ASP.NET Core 管道中添加如下代码来对 CloudEvent 消息解包:+ app.UseCloudEvents(); app.MapControllers(); app.MapSubscribeHandler();
修改代码后重新启动程序并发布测试消息,可以观察到以下输出
== APP == info: pubsub.PubSubController[0] == APP == received cloud event from Test: {"data":"this is a test message"}
实际情况中不需要自己从
HttpContext.Request.Body
中读取数据,可以通过 ASP.NET Core 的模型绑定来做
测试发布
修改现有代码
private readonly ILogger<PubSubController> _logger; + private readonly DaprClient _daprClient; - public PubSubController(ILogger<PubSubController> logger) + public PubSubController(ILogger<PubSubController> logger, DaprClient daprClient) { _logger = logger; + _daprClient = daprClient; } + [HttpPost("pub")] + public async Task<IActionResult> PublishTest() + { + await _daprClient.PublishEventAsync("pubsub", "Test", new { data = "this is a test publish message" }); + return Ok(); + }
调用发布消息的 API 接口
$ curl -XPOST http://localhost:5265/pub
观察程序输出
== APP == info: pubsub.PubSubController[0] == APP == received cloud event from Test: {"data":"this is a test publish message"}
调用发布 API 后,程序发布了消息到 pubsub 组件的 Test 组件,而我们的程序又订阅了这个主题,所以会打印出我们发布出去的消息