transfer_middleware/cmd/rpc/internal/queue/mq/mqServer/server.go

56 lines
1.3 KiB
Go

package mqServer
import (
"context"
"fmt"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/cmd/rpc/internal/queue/mq"
mq1 "trasfer_middleware/until/mq"
"trasfer_middleware/until/sysLog"
)
// RocketMq is the consumer-side entry point for RocketMQ. It carries the
// connection configuration that Consume reads when subscribing to topics.
type RocketMq struct {
// mqConfig supplies the name-server host list and the access/secret keys
// used to build the consumer connection.
mqConfig *etc.RockerMqConfig
}
// NewRocketmq builds a RocketMq that uses config for every later connection.
// The pointer is stored as-is (not copied), so a nil config yields a RocketMq
// whose mqConfig is nil.
func NewRocketmq(config *etc.RockerMqConfig) *RocketMq {
	server := new(RocketMq)
	server.mqConfig = config
	return server
}
// Consume subscribes to every topic that appears as a key in handler, starts
// the consumer manager and then blocks until c is done.
//
// Each topic is subscribed with the consumer group "<topic>_consumer". Every
// received message is dispatched to the handler registered under the
// message's topic. A subscription failure is logged and the remaining topics
// are still subscribed.
//
// Consume returns an error when no name-server host is configured or when the
// manager fails to start. Otherwise it waits for c to be cancelled, stops the
// manager and returns c.Err(). With a context that is never cancelled (for
// example context.Background()) it blocks forever.
func (n RocketMq) Consume(c context.Context, handler map[string]mq.Message) error {
	// Guard the Host[0] access below; indexing an empty list would panic.
	if n.mqConfig == nil || len(n.mqConfig.Host) == 0 {
		return fmt.Errorf("rocketmq consume: no name server host configured")
	}

	manager := mq1.NewConsumerManager(nil)
	connConf := &mq1.ConsumerConnConfig{
		NameServers: n.mqConfig.Host[0],
		AccessKey:   n.mqConfig.AccessKey,
		SecretKey:   n.mqConfig.SecretKey,
	}

	// dispatch routes a message to the handler registered for its topic.
	dispatch := func(ctx context.Context, message *mq1.ConsumerMessage) error {
		h, ok := handler[message.Topic]
		if !ok {
			// Without this check a missing key would yield the map's zero
			// value and the call below would run on it.
			return fmt.Errorf("no handler registered for topic %q", message.Topic)
		}
		if err := h.MessageHandler(0, nil, message.Body); err != nil {
			return fmt.Errorf("链接失败:%w", err)
		}
		return nil
	}

	for topic := range handler {
		err := manager.Subscribe(c, connConf,
			&mq1.ConsumerConfig{
				TopicName:       topic,
				GroupName:       topic + "_consumer",
				PerCoroutineCnt: 2,
			},
			dispatch)
		if err != nil {
			// Best effort: one failed topic must not prevent the others.
			sysLog.ErrQueueLog(context.Background(), err)
		}
	}

	if err := manager.Start(c); err != nil {
		startErr := fmt.Errorf("启动失败:%w", err)
		// Still logged because existing callers may ignore the return value,
		// which was never produced before this change.
		sysLog.ErrQueueLog(context.Background(), startErr)
		return startErr
	}
	// Registered only after a successful Start so Stop runs on shutdown.
	defer manager.Stop(c)

	// Block until the caller cancels instead of hanging unconditionally, so
	// the deferred Stop actually runs.
	<-c.Done()
	return c.Err()
}