package server

import (
	"context"
	"fmt"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-kratos/kratos/v2/transport"

	"voucher/internal/conf"
	"voucher/internal/pkg/mq"
	"voucher/internal/service"
)

// Compile-time assertion that *Consumer implements transport.Server.
var _ transport.Server = (*Consumer)(nil)

// Consumer exposes an mq.ConsumerManager through the transport.Server
// interface (Start/Stop) so it can be managed like any other server.
type Consumer struct {
	hLog    *log.Helper
	conf    *conf.Bootstrap // stored at construction; not read by the methods in this file
	manager *mq.ConsumerManager
}

// NewConsumer builds a Consumer and registers the voucher service's message
// handlers with a freshly created consumer manager.
//
// Connection settings (name servers, access key, secret key) are taken from
// conf.RocketMQ. The order and notify handlers are each subscribed only when
// the corresponding config getter on voucherService returns a non-nil value.
//
// NewConsumer panics if any subscription fails; the panic message carries the
// topic name, group name, and the underlying error.
func NewConsumer(
	hLog *log.Helper,
	conf *conf.Bootstrap,
	voucherService *service.VoucherService,
) *Consumer {
	manager := mq.NewConsumerManager(hLog)

	// Both subscriptions share the same connection settings.
	connCfg := &mq.ConsumerConnConfig{
		NameServers: conf.RocketMQ.Addr,
		AccessKey:   conf.RocketMQ.AccessKey,
		SecretKey:   conf.RocketMQ.SecretKey,
	}

	// No caller context is available at construction time.
	ctx := context.Background()

	if orderCfg := voucherService.GetOrderConfig(); orderCfg != nil {
		if err := manager.Subscribe(ctx, connCfg, orderCfg, voucherService.OrderConsumer); err != nil {
			panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", orderCfg.TopicName, orderCfg.GroupName, err))
		}
	}

	if notifyCfg := voucherService.GetNotifyConfig(); notifyCfg != nil {
		if err := manager.Subscribe(ctx, connCfg, notifyCfg, voucherService.NotifyConsumer); err != nil {
			panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", notifyCfg.TopicName, notifyCfg.GroupName, err))
		}
	}

	return &Consumer{
		hLog:    hLog,
		conf:    conf,
		manager: manager,
	}
}

// Start starts the underlying consumer manager and returns its error, if any.
func (c *Consumer) Start(ctx context.Context) error {
	// When an error is returned, the framework calls Stop uniformly, so there
	// is no need to worry about partially started consumers being left running.
	return c.manager.Start(ctx)
}

// Stop logs that the consumer is shutting down and then stops the underlying
// consumer manager, returning its error, if any.
func (c *Consumer) Stop(ctx context.Context) error {
	// Use the injected logger rather than writing straight to stdout so the
	// shutdown notice goes through the same logging pipeline as everything else.
	c.hLog.Info("关闭 consumer 中...")
	return c.manager.Stop(ctx)
}