package mqServer

import (
	"context"
	"fmt"
	"trasfer_middleware/cmd/rpc/etc"
	"trasfer_middleware/cmd/rpc/internal/queue/mq"
	mq1 "trasfer_middleware/until/mq"
	"trasfer_middleware/until/sysLog"
)

type RocketMq struct {
	mqConfig *etc.RockerMqConfig
}

func NewRocketmq(config *etc.RockerMqConfig) *RocketMq {
	return &RocketMq{
		mqConfig: config,
	}
}

func (n RocketMq) Consume(c context.Context, handler map[string]mq.Message) error {
	manager := mq1.NewConsumerManager(nil)
	connConf := &mq1.ConsumerConnConfig{
		NameServers: n.mqConfig.Host[0],
		AccessKey:   n.mqConfig.AccessKey,
		SecretKey:   n.mqConfig.SecretKey,
	}

	for topic, _ := range handler {
		err := manager.Subscribe(c, connConf,
			&mq1.ConsumerConfig{
				TopicName:       topic,
				GroupName:       topic + "_consumer",
				PerCoroutineCnt: 2,
			},
			func(ctx context.Context, message *mq1.ConsumerMessage) error {
				err := handler[message.Topic].MessageHandler(0, nil, message.Body)
				if err != nil {
					return fmt.Errorf("链接失败:%v", err)
				}
				return nil
			})
		if err != nil {
			sysLog.ErrQueueLog(context.Background(), err)
		}

	}
	err := manager.Start(c)
	defer manager.Stop(c)
	if err != nil {
		sysLog.ErrQueueLog(context.Background(), fmt.Errorf("启动失败:%v", err))
	}
	select {}
}