package service

import (
	"context"
	"errors"
	"strconv"
	"strings"

	"github.com/go-kratos/kratos/v2/log"

	"voucher/internal/pkg/mq"
)

// consumerConfigFor builds the consumer configuration for the event registered
// under the given key in the RocketMQ event map.
//
// It returns nil when the event is not configured, or when its consumer is
// switched off (in which case a warning naming the event is logged).
func (j *VoucherService) consumerConfigFor(event string) *mq.ConsumerConfig {
	elm, ok := j.bc.RocketMQ.EventMap[event]
	if !ok {
		return nil
	}
	if !elm.IsOpenConsumer {
		log.Warnf("%s MQ is not open", event)
		return nil
	}
	return &mq.ConsumerConfig{
		TopicName:       elm.Topic,
		GroupName:       elm.Group,
		PerCoroutineCnt: int(elm.PerCoroutineCnt),
	}
}

// GetOrderConfig returns the consumer configuration for the "order" event, or
// nil when the event is missing from the configuration or its consumer is
// disabled.
func (j *VoucherService) GetOrderConfig() *mq.ConsumerConfig {
	return j.consumerConfigFor("order")
}

// OrderConsumer handles one "order" message. The message's sharding key is
// used as the order number and handed to VoucherBiz.OrderConsume.
//
// An empty sharding key is reported as an error. A failure inside the business
// layer is only logged and nil is returned.
// NOTE(review): returning nil presumably acknowledges the message so it is not
// redelivered — confirm against the mq package.
func (j *VoucherService) OrderConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
	orderNo := msg.GetShardingKey()
	if orderNo == "" {
		log.Error("order 消费异常,获取 orderNo 失败")
		return errors.New("order 消费异常,获取 orderNo 失败")
	}

	if err := j.VoucherBiz.OrderConsume(ctx, orderNo); err != nil {
		log.Errorf("order 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
	}
	return nil
}

// GetNotifyConfig returns the consumer configuration for the "notify" event,
// or nil when the event is missing from the configuration or its consumer is
// disabled.
func (j *VoucherService) GetNotifyConfig() *mq.ConsumerConfig {
	return j.consumerConfigFor("notify")
}

// NotifyConsumer handles one "notify" message. The sharding key is split on
// "_" and its first two fields are passed to VoucherBiz.NotifyConsume.
//
// An empty sharding key, or one with fewer than two "_"-separated fields, is
// reported as an error. A failure inside the business layer is only logged and
// nil is returned.
// NOTE(review): the meaning of the two fields is not visible here — check
// VoucherBiz.NotifyConsume.
func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
	shardingKey := msg.GetShardingKey()
	if shardingKey == "" {
		log.Error("notify 消费异常,获取 shardingKey 失败")
		return errors.New("notify 消费异常,获取 shardingKey 失败")
	}

	// Guard the indexing below: a key without "_" yields a single field and
	// would otherwise panic with an index-out-of-range.
	rep := strings.Split(shardingKey, "_")
	if len(rep) < 2 {
		log.Errorf("notify 消费异常,shardingKey 格式错误,shardingKey:%s", shardingKey)
		return errors.New("notify 消费异常,shardingKey 格式错误")
	}

	if err := j.VoucherBiz.NotifyConsume(ctx, rep[0], rep[1]); err != nil {
		log.Errorf("notify 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
	}
	return nil
}

// GetNotifyRetryConfig returns the consumer configuration for the
// "notifyRetry" event, or nil when the event is missing from the configuration
// or its consumer is disabled.
func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig {
	return j.consumerConfigFor("notifyRetry")
}

// NotifyRetryConsumer handles one "notifyRetry" message. The sharding key is
// parsed as a base-10 unsigned 64-bit integer and passed to
// VoucherBiz.NotifyRetryConsume.
//
// An empty sharding key is reported as an error, and a key that is not a valid
// unsigned integer returns the strconv parse error. A failure inside the
// business layer is only logged and nil is returned.
func (j *VoucherService) NotifyRetryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
	shardingKey := msg.GetShardingKey()
	if shardingKey == "" {
		log.Error("notify retry 消费异常,获取 shardingKey 失败")
		return errors.New("notify retry 消费异常,获取 shardingKey 失败")
	}

	orderNotifyID, err := strconv.ParseUint(shardingKey, 10, 64)
	if err != nil {
		// Errorf (not Error) so that the format verbs are actually expanded.
		log.Errorf("notify retry 消费异常,orderNotifyId转换失败,shardingKey=%s,error: %s", shardingKey, err.Error())
		return err
	}

	if err = j.VoucherBiz.NotifyRetryConsume(ctx, orderNotifyID); err != nil {
		log.Errorf("notify retry 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
	}
	return nil
}