transfer_middleware/cmd/rpc/internal/queue/mq/mqServer/server.go

63 lines
1.4 KiB
Go
Raw Normal View History

2024-06-12 13:46:14 +08:00
package mqServer
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"sync"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/cmd/rpc/internal/queue/mq"
"trasfer_middleware/until/sysLog"
)
// RocketMq wraps the RocketMQ settings needed to run a push consumer.
type RocketMq struct {
// mqConfig holds the GroupName, Host and Topic values that Consume reads
// when it builds the consumer and registers its subscriptions.
mqConfig *etc.RockerMqConfig
}
// NewRocketmq returns a RocketMq that keeps a reference to config.
// The configuration is stored as-is; it is not copied or validated here.
func NewRocketmq(config *etc.RockerMqConfig) *RocketMq {
	r := new(RocketMq)
	r.mqConfig = config
	return r
}
// Consume builds a push consumer from n.mqConfig (GroupName and Host),
// subscribes to every topic listed in n.mqConfig.Topic, starts the consumer
// and then blocks forever.
//
// handler maps a topic name to the mq.Message whose MessageHandler is called
// with the body of each message delivered on that topic. A configured topic
// with no entry in handler is logged and not subscribed. When MessageHandler
// returns an error, the batch is reported as consumer.ConsumeRetryLater.
//
// An error is returned only when the consumer cannot be created or started;
// after a successful start the call never returns.
func (n RocketMq) Consume(handler map[string]mq.Message) error {
	c, err := consumer.NewPushConsumer(
		consumer.WithGroupName(n.mqConfig.GroupName),
		consumer.WithNsResolver(primitive.NewPassthroughResolver(n.mqConfig.Host)),
	)
	if err != nil {
		return fmt.Errorf("rockmq error:%w", err)
	}
	// Registered only after the error check so Shutdown is never invoked on
	// a consumer that failed to construct.
	defer func() {
		if shutdownErr := c.Shutdown(); shutdownErr != nil {
			sysLog.ErrLog(context.Background(), shutdownErr)
		}
	}()

	var (
		wg    sync.WaitGroup
		subMu sync.Mutex // serializes Subscribe calls on the shared consumer
	)
	for _, topic := range n.mqConfig.Topic {
		h, ok := handler[topic]
		if !ok {
			// Without a handler the callback would have nothing to dispatch
			// to, so the topic is skipped instead of failing at delivery time.
			sysLog.ErrLog(context.Background(), fmt.Errorf("rockmq: no handler registered for topic %q", topic))
			continue
		}
		wg.Add(1)
		go func(topic string, h mq.Message) {
			defer wg.Done()
			subMu.Lock()
			defer subMu.Unlock()
			subErr := c.Subscribe(topic,
				consumer.MessageSelector{},
				func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
					for i := range msgs {
						// The error stays local to this invocation; callbacks
						// may run concurrently and must not share state.
						if handleErr := h.MessageHandler(0, nil, msgs[i].Body); handleErr != nil {
							return consumer.ConsumeRetryLater, handleErr
						}
					}
					return consumer.ConsumeSuccess, nil
				})
			if subErr != nil {
				sysLog.ErrLog(context.Background(), subErr)
			}
		}(topic, h)
	}
	// Every subscription must be registered before the consumer starts.
	wg.Wait()

	if err = c.Start(); err != nil {
		return fmt.Errorf("rockmq start error:%w", err)
	}
	// Keep the calling goroutine parked so the deferred Shutdown does not run
	// while the consumer is delivering messages.
	select {}
}