138 lines
3.1 KiB
Go
138 lines
3.1 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"github.com/robfig/cron"
|
|
"strconv"
|
|
"time"
|
|
"voucher/internal/biz"
|
|
"voucher/internal/biz/bo"
|
|
"voucher/internal/biz/timeslicequery"
|
|
"voucher/internal/conf"
|
|
"voucher/internal/data"
|
|
"voucher/internal/pkg/mq"
|
|
)
|
|
|
|
type VoucherService struct {
|
|
bc *conf.Bootstrap
|
|
cron *cron.Cron
|
|
VoucherBiz *biz.VoucherBiz
|
|
rdb *data.Rdb
|
|
logHelper *log.Helper
|
|
|
|
timeSliceQuery *timeslicequery.Query
|
|
}
|
|
|
|
func NewVoucherService(
|
|
bc *conf.Bootstrap,
|
|
cron *cron.Cron,
|
|
VoucherBiz *biz.VoucherBiz,
|
|
rdb *data.Rdb,
|
|
logHelper *log.Helper,
|
|
timeSliceQuery *timeslicequery.Query,
|
|
) *VoucherService {
|
|
return &VoucherService{
|
|
bc: bc,
|
|
cron: cron,
|
|
VoucherBiz: VoucherBiz,
|
|
rdb: rdb,
|
|
logHelper: logHelper,
|
|
timeSliceQuery: timeSliceQuery,
|
|
}
|
|
}
|
|
|
|
func (s *VoucherService) CronOrderNotice(ctx context.Context) error {
|
|
|
|
c, ok := s.bc.Cron.CommandMap["orderNotice"]
|
|
|
|
if !ok {
|
|
log.Warn("orderNotice定时任务未找到")
|
|
return nil
|
|
}
|
|
|
|
if !c.IsOpen {
|
|
log.Warn("orderNotice定时任务未开启")
|
|
return nil
|
|
}
|
|
|
|
if len(c.Command) == 0 {
|
|
log.Error("orderNotice定时任务 command is empty")
|
|
return nil
|
|
}
|
|
|
|
return s.cron.AddFunc(c.Command, func() {
|
|
|
|
s.OrderNotice(ctx)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
func (s *VoucherService) OrderNotice(ctx context.Context) {
|
|
|
|
start := time.Now()
|
|
log.Errorf("订单定时通知,执行开始: %v", start.Format(time.DateTime))
|
|
|
|
if err := s.VoucherBiz.Notice(ctx); err != nil {
|
|
log.Errorf("订单定时通知,执行失败: %v", err)
|
|
}
|
|
|
|
end := time.Now()
|
|
elapsed := end.Sub(start)
|
|
log.Warnf("订单定时通知,开始执行时间%s,执行结束时间%s,代码块执行耗时: %s", start.Format(time.DateTime), end.Format(time.DateTime), elapsed)
|
|
}
|
|
|
|
func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig {
|
|
|
|
elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if !elm.IsOpenConsumer {
|
|
log.Warnf("notify MQ is not open")
|
|
return nil
|
|
}
|
|
|
|
return &mq.ConsumerConfig{
|
|
TopicName: elm.Topic,
|
|
GroupName: elm.Group,
|
|
PerCoroutineCnt: int(elm.PerCoroutineCnt),
|
|
}
|
|
}
|
|
|
|
func (j *VoucherService) NotifyRetryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
|
|
|
|
shardingKey := msg.GetShardingKey()
|
|
if len(shardingKey) == 0 {
|
|
log.Error("notify retry 消费异常,获取 shardingKey 失败")
|
|
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
|
|
}
|
|
|
|
orderNotifyId, err := strconv.ParseUint(shardingKey, 10, 64)
|
|
if err != nil {
|
|
log.Error("notify retry 消费异常,orderNotifyId转换失败,shardingKey=%s", shardingKey)
|
|
return err
|
|
}
|
|
|
|
if err = j.VoucherBiz.NotifyRetryConsume(ctx, orderNotifyId); err != nil {
|
|
log.Errorf("notify retry 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (v *VoucherService) WechatUseNotifyConsumer(ctx context.Context, tag, msg string) error {
|
|
|
|
var req *bo.WechatVoucherNotifyBo
|
|
|
|
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
|
return err
|
|
}
|
|
|
|
return v.VoucherBiz.WechatNotifyConsumer(ctx, tag, req)
|
|
}
|