This commit is contained in:
parent
40b5563852
commit
0fe1508d9e
|
|
@ -38,6 +38,10 @@ func NewRdbConsumer(
|
|||
manager.Add(cf2)
|
||||
}
|
||||
|
||||
if cf3 := voucherService.GetRetryNotifyConfig(); cf3 != nil {
|
||||
manager.Add(cf3)
|
||||
}
|
||||
|
||||
return &RdbConsumer{hLog: hLog, conf: conf, manager: manager}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,45 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"voucher/internal/pkg/rdsmq"
|
||||
)
|
||||
|
||||
func (s *VoucherService) GetRetryNotifyConfig() *rdsmq.ConsumeConfig {
|
||||
|
||||
queue := s.bc.RdsMQ.GetWechatRetry()
|
||||
if queue == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !queue.GetIsOpen() {
|
||||
log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
return &rdsmq.ConsumeConfig{
|
||||
Rdb: s.rdb.Rdb,
|
||||
QueueName: queue.Name,
|
||||
NumWorkers: queue.NumWorkers,
|
||||
WaitTime: queue.GetWaitTime().AsDuration(),
|
||||
RetryNum: queue.RetryNum,
|
||||
Fn: s.HandleWechatRetry,
|
||||
Logger: s.logHelper,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *VoucherService) HandleRetryNotify(ctx context.Context, msg string) error {
|
||||
|
||||
if msg == "" {
|
||||
s.logHelper.Errorf("RdsMQ keySend error: msg is empty")
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.VoucherBiz.RetryNotify(ctx, msg); err != nil {
|
||||
s.logHelper.Error(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Loading…
Reference in New Issue