From 0fe1508d9ee16a4f2203718320ce8751f1886cd3 Mon Sep 17 00:00:00 2001 From: ziming Date: Fri, 1 Aug 2025 11:50:34 +0800 Subject: [PATCH] =?UTF-8?q?=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/server/rds_consume.go | 4 +++ internal/service/retry_notify.go | 45 ++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 internal/service/retry_notify.go diff --git a/internal/server/rds_consume.go b/internal/server/rds_consume.go index 0a34025..9a888f4 100644 --- a/internal/server/rds_consume.go +++ b/internal/server/rds_consume.go @@ -38,6 +38,10 @@ func NewRdbConsumer( manager.Add(cf2) } + if cf3 := voucherService.GetRetryNotifyConfig(); cf3 != nil { + manager.Add(cf3) + } + return &RdbConsumer{hLog: hLog, conf: conf, manager: manager} } diff --git a/internal/service/retry_notify.go b/internal/service/retry_notify.go new file mode 100644 index 0000000..f059ee2 --- /dev/null +++ b/internal/service/retry_notify.go @@ -0,0 +1,45 @@ +package service + +import ( + "context" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "voucher/internal/pkg/rdsmq" +) + +func (s *VoucherService) GetRetryNotifyConfig() *rdsmq.ConsumeConfig { + + queue := s.bc.RdsMQ.GetWechatRetry() + if queue == nil { + return nil + } + + if !queue.GetIsOpen() { + log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name)) + return nil + } + + return &rdsmq.ConsumeConfig{ + Rdb: s.rdb.Rdb, + QueueName: queue.Name, + NumWorkers: queue.NumWorkers, + WaitTime: queue.GetWaitTime().AsDuration(), + RetryNum: queue.RetryNum, + Fn: s.HandleWechatRetry, + Logger: s.logHelper, + } +} + +func (s *VoucherService) HandleRetryNotify(ctx context.Context, msg string) error { + + if msg == "" { + s.logHelper.Errorf("RdsMQ keySend error: msg is empty") + return nil + } + + if err := s.VoucherBiz.RetryNotify(ctx, msg); err != nil { + s.logHelper.Error(err) + } + + return nil +}