package mqServer import ( "context" "fmt" "trasfer_middleware/cmd/rpc/etc" "trasfer_middleware/cmd/rpc/internal/queue/mq" mq1 "trasfer_middleware/until/mq" "trasfer_middleware/until/sysLog" ) type RocketMq struct { mqConfig *etc.RockerMqConfig } func NewRocketmq(config *etc.RockerMqConfig) *RocketMq { return &RocketMq{ mqConfig: config, } } func (n RocketMq) Consume(c context.Context, handler map[string]mq.Message) error { manager := mq1.NewConsumerManager(nil) connConf := &mq1.ConsumerConnConfig{ NameServers: n.mqConfig.Host[0], AccessKey: n.mqConfig.AccessKey, SecretKey: n.mqConfig.SecretKey, } for topic, _ := range handler { err := manager.Subscribe(c, connConf, &mq1.ConsumerConfig{ TopicName: topic, GroupName: topic + "_consumer", PerCoroutineCnt: 2, }, func(ctx context.Context, message *mq1.ConsumerMessage) error { err := handler[message.Topic].MessageHandler(0, nil, message.Body) if err != nil { return fmt.Errorf("链接失败:%v", err) } return nil }) if err != nil { sysLog.ErrQueueLog(context.Background(), err) } } err := manager.Start(c) defer manager.Stop(c) if err != nil { sysLog.ErrQueueLog(context.Background(), fmt.Errorf("启动失败:%v", err)) } select {} }