voucher/internal/service/consume.go

118 lines
2.8 KiB
Go

package service
import (
"context"
"errors"
"github.com/go-kratos/kratos/v2/log"
"strconv"
"strings"
"voucher/internal/pkg/mq"
)
func (j *VoucherService) GetOrderConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["order"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("order MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) OrderConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
orderNo := msg.GetShardingKey()
if orderNo == "" {
log.Error("order 消费异常,获取 orderNo 失败")
return errors.New("order 消费异常,获取 orderNo 失败")
}
if err := j.VoucherBiz.OrderConsume(ctx, orderNo); err != nil {
log.Errorf("order 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
}
return nil
}
func (j *VoucherService) GetNotifyConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["notify"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("notify MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
shardingKey := msg.GetShardingKey()
if shardingKey == "" {
log.Error("notify 消费异常,获取 shardingKey 失败")
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
}
rep := strings.Split(shardingKey, "_")
if err := j.VoucherBiz.NotifyConsume(ctx, rep[0], rep[1]); err != nil {
log.Errorf("notify 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
}
return nil
}
func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("notify MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) NotifyRetryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
shardingKey := msg.GetShardingKey()
if shardingKey == "" {
log.Error("notify retry 消费异常,获取 shardingKey 失败")
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
}
orderNotifyId, err := strconv.ParseUint(shardingKey, 10, 64)
if err != nil {
log.Error("notify retry 消费异常,orderNotifyId转换失败,shardingKey=%s", shardingKey)
return err
}
if err = j.VoucherBiz.NotifyRetryConsume(ctx, orderNotifyId); err != nil {
log.Errorf("notify retry 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
}
return nil
}