# 事件总线 统一的事件总线,低层使用RocketMQ,进一步封装:约束命名规范、权限验证、链路追踪等 ## 发送 [参考:](/event/producer_test.go) 1. 生成发送实例 ```go producer, err := event.NewProducer("192.168.6.107:9876") // 带有验证凭证的示例 producer, err := NewProducer("192.168.6.107:9876", producer.WithProducerCredentials("accessKey", "secretKey", "securityToken")) ``` 生成时可选的配置项有: * event.WithProducerCredentials 设置生产者的凭证 2. 在Data层中注入生成的实例 3. 在biz中声明event的interface 4. 在data层中实现biz声明的interface,同repository类似 5. 使用的方法: * SendSync 同步发送 * SendAsync 异步发送 * BatchSendSync 批量同步发送 * BatchSendAsync 批量异步发送 6. 默认支持链路追踪,如要关闭,请调用DisableTraceTelemetry方法 ## 消费 [参考:](/event/producer_test.go) ### 一、随着Kratos服务一起启动 1. 在server中新增consumer.go ```go // ConsumerServer 消费者Server type ConsumerServer struct { manager *event.ConsumerManager } // NewConsumerServer 工厂方法 func NewConsumerServer( //注入一些依赖的对象(包括biz,同http、grpc的Sever类似) ) *ConsumerServer { manager := event.NewConsumerManager() // 添加一些订阅方法,示例: _ = manager.Subscribe(ctx, connConf, consumerConf, func(message *ConsumerMessage) error { // mock 业务耗时 time.Sleep(10 * time.Second) // 返回nil才会commit,否则会重试 return nil }) return &ConsumerServer{manager:manager} } func (c *ConsumerServer) Start(ctx context.Context) error { return c.manager.Start() } func (c *ConsumerServer) Stop(ctx context.Context) error { return c.manager.Stop() } ``` 2. 添加进provider_set.go 3. 打开main.go ```go // 在newApp方法中注入ConsumerServer,并添加到 serverOption := kratos.Server( //在这里面添加ConsumerServer的实例 ) ``` ### 二、使用cli模式启动 参考上面的启动方式,顺序为:subscribe -> start -> stop