package server import ( "context" "fmt" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport" "voucher/internal/conf" "voucher/internal/pkg/rdsmq" "voucher/internal/service" ) var _ transport.Server = (*RdbConsumer)(nil) type RdbConsumer struct { hLog *log.Helper conf *conf.Bootstrap manager *rdsmq.ConsumerManager voucherService *service.VoucherService } func NewRdbConsumer( hLog *log.Helper, conf *conf.Bootstrap, voucherService *service.VoucherService, ) *RdbConsumer { manager := rdsmq.NewConsumerManager() if cf := voucherService.GetWechatQueryConfig(); cf != nil { manager.Add(cf) } if cf1 := voucherService.GetWechatTimeSliceQueryConfig(); cf1 != nil { manager.Add(cf1) } //if cf2 := voucherService.GetWechatRetryConfig(); cf2 != nil { // manager.Add(cf2) //} if cf3 := voucherService.GetRetryNotifyConfig(); cf3 != nil { manager.Add(cf3) } if cf4 := voucherService.GetOrderNotifyRetryConfig(); cf4 != nil { manager.Add(cf4) } if cf5 := voucherService.GetUsedNotifyConfig(); cf5 != nil { manager.Add(cf5) } return &RdbConsumer{hLog: hLog, conf: conf, manager: manager} } func (c *RdbConsumer) Start(ctx context.Context) error { c.manager.Start(ctx) return nil } func (c *RdbConsumer) Stop(ctx context.Context) error { fmt.Println("关闭 RdbConsumer 中...") c.manager.Stop(ctx) fmt.Println("关闭 RdbConsumer 完成...") return nil }