45 lines
1.3 KiB
Go
45 lines
1.3 KiB
Go
package mixrepoimpl
|
||
|
||
import (
|
||
"context"
|
||
"github.com/go-kratos/kratos/v2/log"
|
||
"voucher/internal/biz/mixrepos"
|
||
"voucher/internal/data"
|
||
"voucher/internal/pkg/mq"
|
||
)
|
||
|
||
type MQSendMixRepoImpl struct {
|
||
mq *data.RocketMQ
|
||
}
|
||
|
||
func NewMQSendMixRepoImpl(mq *data.RocketMQ) mixrepos.MQSendMixRepo {
|
||
return &MQSendMixRepoImpl{
|
||
mq: mq,
|
||
}
|
||
}
|
||
|
||
func (s *MQSendMixRepoImpl) SendSync(ctx context.Context, topicName string, body []byte, sendOptions ...mq.SendOption) error {
|
||
return s.mq.MqProducer.SendSync(ctx, topicName, body, sendOptions...)
|
||
}
|
||
|
||
func (s *MQSendMixRepoImpl) SendASync(ctx context.Context, topicName string, body []byte, errFn func(error), sendOptions ...mq.SendOption) error {
|
||
err := s.mq.MqProducer.SendAsync(ctx, topicName, body, func(err error) {
|
||
if err == nil {
|
||
return
|
||
}
|
||
log.Errorf("异步发送消息出错:topName=%s,body=%s,err=%s", topicName, body, err.Error())
|
||
errFn(err)
|
||
}, sendOptions...)
|
||
return err
|
||
}
|
||
|
||
func (s *MQSendMixRepoImpl) SendAsyncNotFn(ctx context.Context, topic string, body []byte, sendOptions ...mq.SendOption) error {
|
||
err := s.mq.MqProducer.SendAsync(ctx, topic, body, func(err error) {
|
||
if err == nil {
|
||
return
|
||
}
|
||
log.Errorf("异步发送消息出错:topName=%s,body=%s,err=%s", topic, body, err.Error())
|
||
}, sendOptions...)
|
||
return err
|
||
}
|