59 lines
1.5 KiB
Go
59 lines
1.5 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"github.com/go-kratos/kratos/v2/transport"
|
|
"voucher/internal/conf"
|
|
"voucher/internal/pkg/mq"
|
|
"voucher/internal/service"
|
|
)
|
|
|
|
// Compile-time check that *Consumer satisfies the Kratos transport.Server
// interface.
var _ transport.Server = (*Consumer)(nil)
|
|
|
|
type Consumer struct {
|
|
hLog *log.Helper
|
|
conf *conf.Bootstrap
|
|
manager *mq.ConsumerManager
|
|
}
|
|
|
|
func NewConsumer(
|
|
hLog *log.Helper,
|
|
conf *conf.Bootstrap,
|
|
voucherService *service.VoucherService,
|
|
) *Consumer {
|
|
|
|
manager := mq.NewConsumerManager(hLog)
|
|
|
|
cf := &mq.ConsumerConnConfig{
|
|
NameServers: conf.RocketMQ.Addr,
|
|
AccessKey: conf.RocketMQ.AccessKey,
|
|
SecretKey: conf.RocketMQ.SecretKey,
|
|
}
|
|
|
|
if c := voucherService.GetOrderConfig(); c != nil {
|
|
if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderConsumer); err != nil {
|
|
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
|
|
}
|
|
}
|
|
|
|
if c := voucherService.GetNotifyConfig(); c != nil {
|
|
if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyConsumer); err != nil {
|
|
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
|
|
}
|
|
}
|
|
|
|
return &Consumer{manager: manager, hLog: hLog, conf: conf}
|
|
}
|
|
|
|
func (c *Consumer) Start(ctx context.Context) error {
|
|
// 当返回 err 时,框架会统一调用 stop,不用担心部分启动成功没有 stop 掉
|
|
return c.manager.Start(ctx)
|
|
}
|
|
|
|
func (c *Consumer) Stop(ctx context.Context) error {
|
|
fmt.Println("关闭 consumer 中...")
|
|
return c.manager.Stop(ctx)
|
|
}
|