代码调整
This commit is contained in:
parent
41c1be4763
commit
86b8fb30d8
|
|
@ -1,48 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"strconv"
|
||||
"voucher/internal/pkg/mq"
|
||||
)
|
||||
|
||||
func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig {
|
||||
elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !elm.IsOpenConsumer {
|
||||
log.Warnf("notify MQ is not open")
|
||||
return nil
|
||||
}
|
||||
|
||||
return &mq.ConsumerConfig{
|
||||
TopicName: elm.Topic,
|
||||
GroupName: elm.Group,
|
||||
PerCoroutineCnt: int(elm.PerCoroutineCnt),
|
||||
}
|
||||
}
|
||||
|
||||
func (j *VoucherService) NotifyRetryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
|
||||
|
||||
shardingKey := msg.GetShardingKey()
|
||||
if len(shardingKey) == 0 {
|
||||
log.Error("notify retry 消费异常,获取 shardingKey 失败")
|
||||
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
|
||||
}
|
||||
|
||||
orderNotifyId, err := strconv.ParseUint(shardingKey, 10, 64)
|
||||
if err != nil {
|
||||
log.Error("notify retry 消费异常,orderNotifyId转换失败,shardingKey=%s", shardingKey)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = j.VoucherBiz.NotifyRetryConsume(ctx, orderNotifyId); err != nil {
|
||||
log.Errorf("notify retry 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1,48 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (s *VoucherService) CronOrderNotice(ctx context.Context) error {
|
||||
|
||||
c, ok := s.bc.Cron.CommandMap["orderNotice"]
|
||||
|
||||
if !ok {
|
||||
log.Warn("orderNotice定时任务未找到")
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.IsOpen {
|
||||
log.Warn("orderNotice定时任务未开启")
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(c.Command) == 0 {
|
||||
log.Error("orderNotice定时任务 command is empty")
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.cron.AddFunc(c.Command, func() {
|
||||
|
||||
s.OrderNotice(ctx)
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (s *VoucherService) OrderNotice(ctx context.Context) {
|
||||
start := time.Now()
|
||||
|
||||
if err := s.VoucherBiz.Notice(ctx); err != nil {
|
||||
log.Error("订单定时通知,执行失败,err: %v", err)
|
||||
}
|
||||
|
||||
end := time.Now()
|
||||
elapsed := end.Sub(start)
|
||||
log.Warnf("订单定时通知,开始执行时间%s,执行结束时间%s,代码块执行耗时: %s", start.Format(time.DateTime), end.Format(time.DateTime), elapsed)
|
||||
|
||||
return
|
||||
}
|
||||
|
|
@ -1,9 +1,17 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"github.com/robfig/cron"
|
||||
"strconv"
|
||||
"time"
|
||||
"voucher/internal/biz"
|
||||
"voucher/internal/biz/bo"
|
||||
"voucher/internal/conf"
|
||||
"voucher/internal/pkg/mq"
|
||||
)
|
||||
|
||||
type VoucherService struct {
|
||||
|
|
@ -23,3 +31,94 @@ func NewVoucherService(
|
|||
VoucherBiz: VoucherBiz,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *VoucherService) CronOrderNotice(ctx context.Context) error {
|
||||
|
||||
c, ok := s.bc.Cron.CommandMap["orderNotice"]
|
||||
|
||||
if !ok {
|
||||
log.Warn("orderNotice定时任务未找到")
|
||||
return nil
|
||||
}
|
||||
|
||||
if !c.IsOpen {
|
||||
log.Warn("orderNotice定时任务未开启")
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(c.Command) == 0 {
|
||||
log.Error("orderNotice定时任务 command is empty")
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.cron.AddFunc(c.Command, func() {
|
||||
|
||||
s.OrderNotice(ctx)
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (s *VoucherService) OrderNotice(ctx context.Context) {
|
||||
start := time.Now()
|
||||
|
||||
if err := s.VoucherBiz.Notice(ctx); err != nil {
|
||||
log.Error("订单定时通知,执行失败,err: %v", err)
|
||||
}
|
||||
|
||||
end := time.Now()
|
||||
elapsed := end.Sub(start)
|
||||
log.Warnf("订单定时通知,开始执行时间%s,执行结束时间%s,代码块执行耗时: %s", start.Format(time.DateTime), end.Format(time.DateTime), elapsed)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig {
|
||||
elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !elm.IsOpenConsumer {
|
||||
log.Warnf("notify MQ is not open")
|
||||
return nil
|
||||
}
|
||||
|
||||
return &mq.ConsumerConfig{
|
||||
TopicName: elm.Topic,
|
||||
GroupName: elm.Group,
|
||||
PerCoroutineCnt: int(elm.PerCoroutineCnt),
|
||||
}
|
||||
}
|
||||
|
||||
func (j *VoucherService) NotifyRetryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
|
||||
|
||||
shardingKey := msg.GetShardingKey()
|
||||
if len(shardingKey) == 0 {
|
||||
log.Error("notify retry 消费异常,获取 shardingKey 失败")
|
||||
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
|
||||
}
|
||||
|
||||
orderNotifyId, err := strconv.ParseUint(shardingKey, 10, 64)
|
||||
if err != nil {
|
||||
log.Error("notify retry 消费异常,orderNotifyId转换失败,shardingKey=%s", shardingKey)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = j.VoucherBiz.NotifyRetryConsume(ctx, orderNotifyId); err != nil {
|
||||
log.Errorf("notify retry 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *VoucherService) WechatUseNotifyConsumer(ctx context.Context, tag, msg string) error {
|
||||
|
||||
var req *bo.WechatVoucherNotifyBo
|
||||
|
||||
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return v.VoucherBiz.WechatNotifyConsumer(ctx, tag, req)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,18 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"voucher/internal/biz/bo"
|
||||
)
|
||||
|
||||
func (v *VoucherService) WechatUseNotifyConsumer(ctx context.Context, tag, msg string) error {
|
||||
|
||||
var req *bo.WechatVoucherNotifyBo
|
||||
|
||||
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return v.VoucherBiz.WechatNotifyConsumer(ctx, tag, req)
|
||||
}
|
||||
Loading…
Reference in New Issue