orderNotice定时任务

This commit is contained in:
李子铭 2025-03-19 11:19:56 +08:00
parent 4f9caac1b0
commit 457381a85c
4 changed files with 51 additions and 20 deletions

View File

@ -14,7 +14,7 @@ import (
func (v *Cmb) Notify(ctx context.Context, order *bo.OrderBo) (*bo.OrderNotifyBo, error) {
if order.Status.IsCanNotify() {
if !order.Status.IsCanNotify() {
return nil, fmt.Errorf("订单状态不允许通知,orderNo:%s,orderStatusText:%s", order.OrderNo, order.Status.GetText())
}

View File

@ -39,6 +39,35 @@ func Test_WechatNotifyProducer2(t *testing.T) {
tag := "voucher_notify_dev"
bodyStr := `{"id":"5465699d-de6a-5414-a8df-283167b577ca",
"create_time":"2025-03-07T15:57:24+08:00",
"resource_type":"encrypt-resource",
"event_type":"COUPON.USE",
"summary":"代金券核销通知",
"original_type":"coupon",
"associated_data":"coupon",
"plain_text":{
"stock_creator_mchid":"1652465541",
"stock_id":"20259610",
"coupon_id":"97225743207",
"coupon_name":"test",
"description":"","status":"USED",
"create_time":"2025-03-07T15:49:31+08:00",
"coupon_type":"NORMAL",
"no_cash":false,
"singleitem":false,
"consume_information":{"consume_time":"2025-03-07T15:57:24+08:00","consume_mchid":"1800002761","transaction_id":"4200002544202503077103159055"}}}`
if err := wechatNotifyProducer(tag, bodyStr); err != nil {
t.Errorf("入队失败 error = %v", err)
return
}
}
func Test_WechatNotifyProducer3(t *testing.T) {
tag := "voucher_notify_dev"
bodyStr := `{"id":"8804b1d0-74e8-5975-815d-e567a761a213","create_time":"2025-03-18T09:20:50+08:00","resource_type":"encrypt-resource","event_type":"COUPON.USE","summary":"代金券核销通知","original_type":"coupon","associated_data":"coupon","plain_text":{"stock_creator_mchid":"1652465541","stock_id":"20255099","coupon_id":"97046583699","coupon_name":"招行测试滚动有效期","description":"","status":"USED","create_time":"2025-03-17T19:38:52+08:00","coupon_type":"NORMAL","no_cash":false,"singleitem":false,"consume_information":{"consume_time":"2025-03-18T09:20:50+08:00","consume_mchid":"1532524971","transaction_id":"4200002702202503180978933760"}}}`
if err := wechatNotifyProducer(tag, bodyStr); err != nil {

View File

@ -90,19 +90,22 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m
// NextConsumeTime前若不确认消息消费成功则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mqhttpsdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
if len(handles) > 0 {
go func(hs []string) {
ackerr := mqConsumer.AckMessage(hs)
if ackerr != nil {
log.Errorf("消息确认失败: %+v", ackerr)
// 记录失败句柄,后续处理
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mqhttpsdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
log.Errorf("失败句柄: %s, 错误码: %s, 错误信息: %s", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
}
} else {
log.Warnf("成功确认消息: %d条", len(hs))
}
} else {
log.Errorf("ack err:%+v\n", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}(handles)
}
endChan <- 1
@ -121,6 +124,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
log.Errorf("消息处理超时,需要续期可见性")
endChan <- 1
}
}
@ -129,7 +133,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m
// 长轮询消费消息网络超时时间默认为35s。
// 长轮询表示如果Topic没有消息则客户端请求会在服务端挂起3s3s内如果有消息可以消费则立即返回响应。
mqConsumer.ConsumeMessage(respChan, errChan,
1, // 一次最多消费3条最多可设置为16条
3, // 一次最多消费3条最多可设置为16条
10, // 长轮询时间3s最多可设置为30s
)
<-endChan
@ -137,7 +141,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m
}
// Stop 停止消息消费
func (w *WechatNotifyConsumer) Stop(ctx context.Context) error {
func (w *WechatNotifyConsumer) Stop(_ context.Context) error {
fmt.Println("关闭 wechat consumer 中...")
return nil
}

View File

@ -27,15 +27,13 @@ func (s *VoucherService) CronNotice(ctx context.Context) error {
return s.cron.AddFunc(c.Command, func() {
if err := s.Notice(ctx); err != nil {
log.Errorf("orderNotice定时任务执行发生错误: %v", err)
}
s.Notice(ctx)
})
}
func (s *VoucherService) Notice(ctx context.Context) error {
func (s *VoucherService) Notice(ctx context.Context) {
start := time.Now()
if err := s.VoucherBiz.Notice(ctx); err != nil {
@ -46,5 +44,5 @@ func (s *VoucherService) Notice(ctx context.Context) error {
elapsed := end.Sub(start)
log.Warnf("订单定时通知,开始执行时间%s,执行结束时间%s,代码块执行耗时: %s", start.Format(time.DateTime), end.Format(time.DateTime), elapsed)
return nil
return
}