diff --git a/internal/biz/cmb/notify.go b/internal/biz/cmb/notify.go index 4bc0ebc..0c3c7a6 100644 --- a/internal/biz/cmb/notify.go +++ b/internal/biz/cmb/notify.go @@ -14,7 +14,7 @@ import ( func (v *Cmb) Notify(ctx context.Context, order *bo.OrderBo) (*bo.OrderNotifyBo, error) { - if order.Status.IsCanNotify() { + if !order.Status.IsCanNotify() { return nil, fmt.Errorf("订单状态不允许通知,orderNo:%s,orderStatusText:%s", order.OrderNo, order.Status.GetText()) } diff --git a/internal/pkg/mq_http/mq_http_test.go b/internal/pkg/mq_http/mq_http_test.go index 9392a70..81a903c 100644 --- a/internal/pkg/mq_http/mq_http_test.go +++ b/internal/pkg/mq_http/mq_http_test.go @@ -39,6 +39,35 @@ func Test_WechatNotifyProducer2(t *testing.T) { tag := "voucher_notify_dev" + bodyStr := `{"id":"5465699d-de6a-5414-a8df-283167b577ca", +"create_time":"2025-03-07T15:57:24+08:00", +"resource_type":"encrypt-resource", +"event_type":"COUPON.USE", +"summary":"代金券核销通知", +"original_type":"coupon", +"associated_data":"coupon", +"plain_text":{ +"stock_creator_mchid":"1652465541", +"stock_id":"20259610", +"coupon_id":"97225743207", +"coupon_name":"test", +"description":"","status":"USED", +"create_time":"2025-03-07T15:49:31+08:00", +"coupon_type":"NORMAL", +"no_cash":false, +"singleitem":false, +"consume_information":{"consume_time":"2025-03-07T15:57:24+08:00","consume_mchid":"1800002761","transaction_id":"4200002544202503077103159055"}}}` + + if err := wechatNotifyProducer(tag, bodyStr); err != nil { + t.Errorf("入队失败 error = %v", err) + return + } +} + +func Test_WechatNotifyProducer3(t *testing.T) { + + tag := "voucher_notify_dev" + bodyStr := `{"id":"8804b1d0-74e8-5975-815d-e567a761a213","create_time":"2025-03-18T09:20:50+08:00","resource_type":"encrypt-resource","event_type":"COUPON.USE","summary":"代金券核销通知","original_type":"coupon","associated_data":"coupon","plain_text":{"stock_creator_mchid":"1652465541","stock_id":"20255099","coupon_id":"97046583699","coupon_name":"招行测试滚动有效期","description":"","status":"USED","create_time":"2025-03-17T19:38:52+08:00","coupon_type":"NORMAL","no_cash":false,"singleitem":false,"consume_information":{"consume_time":"2025-03-18T09:20:50+08:00","consume_mchid":"1532524971","transaction_id":"4200002702202503180978933760"}}}` if err := wechatNotifyProducer(tag, bodyStr); err != nil { diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_notify_consumer.go index b9d0bb0..26d63c7 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_notify_consumer.go @@ -90,19 +90,22 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m // NextConsumeTime前若不确认消息消费成功,则消息会被重复消费。 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 - ackerr := mqConsumer.AckMessage(handles) - if ackerr != nil { - // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。 - if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mqhttpsdk.ErrAckItem); ok { - for _, errAckItem := range errAckItems { - log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg) + + if len(handles) > 0 { + go func(hs []string) { + ackerr := mqConsumer.AckMessage(hs) + if ackerr != nil { + log.Errorf("消息确认失败: %+v", ackerr) + // 记录失败句柄,后续处理 + if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mqhttpsdk.ErrAckItem); ok { + for _, errAckItem := range errAckItems { + log.Errorf("失败句柄: %s, 错误码: %s, 错误信息: %s", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg) + } + } + } else { + log.Warnf("成功确认消息: %d条", len(hs)) } - } else { - log.Errorf("ack err:%+v\n", ackerr) - } - time.Sleep(time.Duration(3) * time.Second) - } else { - fmt.Printf("Ack ---->\n\t%s\n", handles) + }(handles) } endChan <- 1 @@ -121,6 +124,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m case <-time.After(35 * time.Second): { fmt.Println("Timeout of consumer message ??") + log.Errorf("消息处理超时,需要续期可见性") endChan <- 1 } } @@ -129,7 +133,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m // 长轮询消费消息,网络超时时间默认为35s。 // 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回响应。 mqConsumer.ConsumeMessage(respChan, errChan, - 1, // 一次最多消费3条(最多可设置为16条)。 + 3, // 一次最多消费3条(最多可设置为16条)。 10, // 长轮询时间3s(最多可设置为30s)。 ) <-endChan @@ -137,7 +141,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m } // Stop 停止消息消费 -func (w *WechatNotifyConsumer) Stop(ctx context.Context) error { +func (w *WechatNotifyConsumer) Stop(_ context.Context) error { fmt.Println("关闭 wechat consumer 中...") return nil } diff --git a/internal/service/cron.go b/internal/service/cron.go index b139ddc..ba5d128 100644 --- a/internal/service/cron.go +++ b/internal/service/cron.go @@ -27,15 +27,13 @@ func (s *VoucherService) CronNotice(ctx context.Context) error { return s.cron.AddFunc(c.Command, func() { - if err := s.Notice(ctx); err != nil { - log.Errorf("orderNotice定时任务执行发生错误: %v", err) - } + s.Notice(ctx) }) } -func (s *VoucherService) Notice(ctx context.Context) error { +func (s *VoucherService) Notice(ctx context.Context) { start := time.Now() if err := s.VoucherBiz.Notice(ctx); err != nil { @@ -46,5 +44,5 @@ func (s *VoucherService) Notice(ctx context.Context) error { elapsed := end.Sub(start) log.Warnf("订单定时通知,开始执行时间%s,执行结束时间%s,代码块执行耗时: %s", start.Format(time.DateTime), end.Format(time.DateTime), elapsed) - return nil + return }