56 lines
1.3 KiB
Go
56 lines
1.3 KiB
Go
package mqServer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"trasfer_middleware/cmd/rpc/etc"
|
|
"trasfer_middleware/cmd/rpc/internal/queue/mq"
|
|
mq1 "trasfer_middleware/until/mq"
|
|
"trasfer_middleware/until/sysLog"
|
|
)
|
|
|
|
type RocketMq struct {
|
|
mqConfig *etc.RockerMqConfig
|
|
}
|
|
|
|
func NewRocketmq(config *etc.RockerMqConfig) *RocketMq {
|
|
return &RocketMq{
|
|
mqConfig: config,
|
|
}
|
|
}
|
|
|
|
func (n RocketMq) Consume(c context.Context, handler map[string]mq.Message) error {
|
|
manager := mq1.NewConsumerManager(nil)
|
|
connConf := &mq1.ConsumerConnConfig{
|
|
NameServers: n.mqConfig.Host[0],
|
|
AccessKey: n.mqConfig.AccessKey,
|
|
SecretKey: n.mqConfig.SecretKey,
|
|
}
|
|
|
|
for topic, _ := range handler {
|
|
err := manager.Subscribe(c, connConf,
|
|
&mq1.ConsumerConfig{
|
|
TopicName: topic,
|
|
GroupName: topic + "_consumer",
|
|
PerCoroutineCnt: 2,
|
|
},
|
|
func(ctx context.Context, message *mq1.ConsumerMessage) error {
|
|
err := handler[message.Topic].MessageHandler(0, nil, message.Body)
|
|
if err != nil {
|
|
return fmt.Errorf("链接失败:%v", err)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
sysLog.ErrQueueLog(context.Background(), err)
|
|
}
|
|
|
|
}
|
|
err := manager.Start(c)
|
|
defer manager.Stop(c)
|
|
if err != nil {
|
|
sysLog.ErrQueueLog(context.Background(), fmt.Errorf("启动失败:%v", err))
|
|
}
|
|
select {}
|
|
}
|