voucher/internal/server/consume.go

53 lines
1.3 KiB
Go

package server
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport"
"voucher/internal/conf"
"voucher/internal/pkg/mq"
"voucher/internal/service"
)
// Compile-time assertion that *Consumer satisfies the transport.Server interface.
var _ transport.Server = (*Consumer)(nil)
// Consumer wraps an mq.ConsumerManager so that it can be started and stopped
// through the Start and Stop methods declared on *Consumer.
type Consumer struct {
// hLog is the log helper handed to NewConsumer; it is also passed to the manager.
hLog *log.Helper
// conf is the bootstrap configuration handed to NewConsumer.
conf *conf.Bootstrap
// manager is the consumer manager that Start and Stop delegate to.
manager *mq.ConsumerManager
}
// NewConsumer builds a Consumer backed by a freshly created
// mq.ConsumerManager, which receives the supplied log helper.
//
// voucherService is accepted but not referenced by the code that is currently
// enabled; no topic subscription is registered by this constructor right now.
func NewConsumer(
	hLog *log.Helper,
	conf *conf.Bootstrap,
	voucherService *service.VoucherService,
) *Consumer {
	consumerManager := mq.NewConsumerManager(hLog)

	// Disabled subscription wiring, kept for reference.
	// NOTE(review): it refers to keyService, which is not declared in this
	// function — presumably left over from another service; confirm before
	// re-enabling.
	//
	//cf := &mq.ConsumerConnConfig{
	//	NameServers: conf.RocketMQ.Addr,
	//	AccessKey:   conf.RocketMQ.AccessKey,
	//	SecretKey:   conf.RocketMQ.SecretKey,
	//}
	//if c := keyService.GetConfig(); c != nil {
	//	if err := manager.Subscribe(context.Background(), cf, c, keyService.Handle); err != nil {
	//		panic(fmt.Sprintf("subscribe failed: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
	//	}
	//}

	consumer := &Consumer{
		hLog:    hLog,
		conf:    conf,
		manager: consumerManager,
	}
	return consumer
}
// Start starts the underlying consumer manager with ctx and returns whatever
// error the manager reports.
//
// Per the original author's note: when an error is returned the framework
// calls Stop for everything, so a partially successful start does not need to
// be rolled back here. NOTE(review): that guarantee comes from the framework
// and cannot be confirmed from this file.
func (c *Consumer) Start(ctx context.Context) error {
	startErr := c.manager.Start(ctx)
	return startErr
}
// Stop prints a shutdown notice ("shutting down consumer...") to standard
// output, then stops the underlying consumer manager with ctx and returns the
// manager's result.
//
// NOTE(review): the notice goes through fmt.Println rather than c.hLog;
// consider whether it should use the structured logger instead.
func (c *Consumer) Stop(ctx context.Context) error {
	fmt.Println("关闭 consumer 中...")

	stopErr := c.manager.Stop(ctx)
	return stopErr
}