voucher/internal/biz/voucher.go

69 lines
1.7 KiB
Go

package biz
import (
"context"
"fmt"
"go.opentelemetry.io/otel/trace"
"voucher/internal/biz/cmb"
"voucher/internal/biz/mixrepos"
"voucher/internal/biz/repo"
"voucher/internal/conf"
"voucher/internal/data"
"voucher/internal/pkg/mq"
)
// VoucherBiz groups the dependencies used by the voucher business logic in
// this package.
type VoucherBiz struct {
	// bc is the bootstrap configuration; the MQ push methods read
	// bc.RocketMQ.EventMap from it to resolve topics.
	bc *conf.Bootstrap
	// rdb is a data-layer handle (presumably Redis — confirm against
	// data.Rdb); it is not used by the methods visible in this section.
	rdb *data.Rdb

	// Cmb is the cmb package client injected at construction time.
	Cmb *cmb.Cmb
	// OrderRepo is the order repository injected at construction time.
	OrderRepo repo.OrderRepo
	// MqSendMixRepo is the sender the push methods call SendSync on.
	MqSendMixRepo mixrepos.MQSendMixRepo
}
// NewVoucherBiz builds a VoucherBiz from the supplied dependencies.
//
// Every argument is stored as-is in the field of the same name; nothing is
// validated or copied, so nil arguments result in nil fields.
func NewVoucherBiz(
	bc *conf.Bootstrap,
	rdb *data.Rdb,
	Cmb *cmb.Cmb,
	OrderRepo repo.OrderRepo,
	MqSendMixRepo mixrepos.MQSendMixRepo,
) *VoucherBiz {
	biz := new(VoucherBiz)

	// Unexported dependencies.
	biz.bc = bc
	biz.rdb = rdb

	// Exported collaborators.
	biz.Cmb = Cmb
	biz.OrderRepo = OrderRepo
	biz.MqSendMixRepo = MqSendMixRepo

	return biz
}
// PushOrderMQ synchronously publishes a message for orderNo to the topic
// configured under the "order" key of the RocketMQ event map.
//
// orderNo is passed as the sharding key, and the trace ID of the span carried
// by ctx is attached through the OpenTelemetry send option. The message body
// is always the empty JSON object "{}".
//
// It returns an error when the "order" event is not configured or when the
// send fails. The send error is wrapped with %w, so callers can inspect the
// underlying cause with errors.Is / errors.As.
func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error {
	event, ok := v.bc.RocketMQ.EventMap["order"]
	if !ok {
		// A missing key yields the map's zero value; using it would either
		// dereference nil or target an empty topic, so fail explicitly.
		return fmt.Errorf("收单成功,消费队列事件[order]未配置")
	}

	traceID := trace.SpanFromContext(ctx).SpanContext().TraceID().String()
	sendOption := []mq.SendOption{
		// orderNo is already a string; no formatting is needed.
		mq.WithSendShardingKeysOption(orderNo),
		mq.WithOpenTelemetryOption(traceID),
	}

	// NOTE(review): the payload carries no order data; orderNo travels only
	// in the sharding key — confirm consumers read it from there.
	if err := v.MqSendMixRepo.SendSync(ctx, event.Topic, []byte("{}"), sendOption...); err != nil {
		return fmt.Errorf("收单成功,消费队列投递失败[%w]", err)
	}
	return nil
}
// PushQueryDelayMQ synchronously publishes a delayed message for orderNo to
// the topic configured under the "query" key of the RocketMQ event map.
//
// orderNo is passed as the sharding key, and the trace ID of the span carried
// by ctx is attached through the OpenTelemetry send option. The message body
// is always the empty JSON object "{}".
//
// It returns an error when the "query" event is not configured or when the
// send fails. The send error is wrapped with %w, so callers can inspect the
// underlying cause with errors.Is / errors.As.
func (v *VoucherBiz) PushQueryDelayMQ(ctx context.Context, orderNo string) error {
	event, ok := v.bc.RocketMQ.EventMap["query"]
	if !ok {
		// A missing key yields the map's zero value; using it would either
		// dereference nil or target an empty topic, so fail explicitly.
		return fmt.Errorf("查询入队事件[query]未配置")
	}

	traceID := trace.SpanFromContext(ctx).SpanContext().TraceID().String()
	sendOption := []mq.SendOption{
		mq.WithSendShardingKeysOption(orderNo),
		mq.WithOpenTelemetryOption(traceID),
		// NOTE(review): 300 is handed to the "delay level" option; whether it
		// is a level index or a duration (e.g. seconds) depends on
		// mq.WithSendDelayLevelOption — confirm.
		mq.WithSendDelayLevelOption(300),
	}

	if err := v.MqSendMixRepo.SendSync(ctx, event.Topic, []byte("{}"), sendOption...); err != nil {
		return fmt.Errorf("查询入队投递失败[%w]", err)
	}
	return nil
}