111 lines
2.5 KiB
Go
111 lines
2.5 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"strings"
|
|
"voucher/internal/pkg/mq"
|
|
)
|
|
|
|
func (j *VoucherService) GetOrderConfig() *mq.ConsumerConfig {
|
|
elm, ok := j.bc.RocketMQ.EventMap["order"]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if !elm.IsOpenConsumer {
|
|
log.Warnf("order MQ is not open")
|
|
return nil
|
|
}
|
|
|
|
return &mq.ConsumerConfig{
|
|
TopicName: elm.Topic,
|
|
GroupName: elm.Group,
|
|
PerCoroutineCnt: int(elm.PerCoroutineCnt),
|
|
}
|
|
}
|
|
|
|
func (j *VoucherService) OrderConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
|
|
|
|
orderNo := msg.GetShardingKey()
|
|
if orderNo == "" {
|
|
log.Error("order 消费异常,获取 orderNo 失败")
|
|
return errors.New("order 消费异常,获取 orderNo 失败")
|
|
}
|
|
|
|
if err := j.VoucherBiz.OrderConsume(ctx, orderNo); err != nil {
|
|
log.Errorf("order 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (j *VoucherService) GetQueryConfig() *mq.ConsumerConfig {
|
|
elm, ok := j.bc.RocketMQ.EventMap["query"]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if !elm.IsOpenConsumer {
|
|
log.Warnf("query MQ is not open")
|
|
return nil
|
|
}
|
|
|
|
return &mq.ConsumerConfig{
|
|
TopicName: elm.Topic,
|
|
GroupName: elm.Group,
|
|
PerCoroutineCnt: int(elm.PerCoroutineCnt),
|
|
}
|
|
}
|
|
|
|
func (j *VoucherService) QueryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
|
|
|
|
orderNo := msg.GetShardingKey()
|
|
if orderNo == "" {
|
|
log.Error("orderQuery 消费异常,获取 orderNo 失败")
|
|
return errors.New("orderQuery 消费异常,获取 orderNo 失败")
|
|
}
|
|
|
|
if err := j.VoucherBiz.QueryConsume(ctx, orderNo); err != nil {
|
|
log.Errorf("query 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (j *VoucherService) GetNotifyConfig() *mq.ConsumerConfig {
|
|
elm, ok := j.bc.RocketMQ.EventMap["notify"]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if !elm.IsOpenConsumer {
|
|
log.Warnf("notify MQ is not open")
|
|
return nil
|
|
}
|
|
|
|
return &mq.ConsumerConfig{
|
|
TopicName: elm.Topic,
|
|
GroupName: elm.Group,
|
|
PerCoroutineCnt: int(elm.PerCoroutineCnt),
|
|
}
|
|
}
|
|
|
|
func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
|
|
|
|
shardingKey := msg.GetShardingKey()
|
|
if shardingKey == "" {
|
|
log.Error("orderNotify 消费异常,获取 shardingKey 失败")
|
|
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
|
|
}
|
|
|
|
rep := strings.Split(shardingKey, "_")
|
|
|
|
if err := j.VoucherBiz.NotifyConsume(ctx, rep[0], rep[1]); err != nil {
|
|
log.Errorf("notify 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|