// Package mixrepoimpl contains the MQ-backed implementation of
// mixrepos.MQSendMixRepo.
package mixrepoimpl

import (
	"context"

	"github.com/go-kratos/kratos/v2/log"

	"voucher/internal/biz/mixrepos"
	"voucher/internal/data"
	"voucher/internal/pkg/mq"
)

// MQSendMixRepoImpl sends messages through the producer held by the wrapped
// data.RocketMQ handle.
type MQSendMixRepoImpl struct {
	mq *data.RocketMQ
}

// NewMQSendMixRepoImpl returns a mixrepos.MQSendMixRepo that publishes via the
// given RocketMQ handle.
func NewMQSendMixRepoImpl(mq *data.RocketMQ) mixrepos.MQSendMixRepo {
	return &MQSendMixRepoImpl{
		mq: mq,
	}
}

// SendSync forwards body to topicName using the producer's SendSync and
// returns whatever error the producer reports.
func (s *MQSendMixRepoImpl) SendSync(ctx context.Context, topicName string, body []byte, sendOptions ...mq.SendOption) error {
	return s.mq.MqProducer.SendSync(ctx, topicName, body, sendOptions...)
}

// SendASync forwards body to topicName using the producer's SendAsync.
//
// When the producer invokes the callback with a non-nil error, the failure is
// logged and then passed to errFn. errFn may be nil, in which case the failure
// is only logged. The returned error is the one returned directly by the
// producer's SendAsync call.
func (s *MQSendMixRepoImpl) SendASync(ctx context.Context, topicName string, body []byte, errFn func(error), sendOptions ...mq.SendOption) error {
	onResult := func(err error) {
		if err == nil {
			return
		}
		log.Errorf("异步发送消息出错:topName=%s,body=%s,err=%s", topicName, body, err.Error())
		// Guard against a nil callback: calling it unconditionally would panic
		// inside the producer's callback on a failed send.
		if errFn != nil {
			errFn(err)
		}
	}
	return s.mq.MqProducer.SendAsync(ctx, topicName, body, onResult, sendOptions...)
}

// SendAsyncNotFn behaves like SendASync without a caller-supplied error
// callback: asynchronous send failures are logged and nothing else is done
// with them.
func (s *MQSendMixRepoImpl) SendAsyncNotFn(ctx context.Context, topic string, body []byte, sendOptions ...mq.SendOption) error {
	return s.SendASync(ctx, topic, body, nil, sendOptions...)
}