53 lines
1.3 KiB
Go
53 lines
1.3 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"github.com/go-kratos/kratos/v2/transport"
|
|
"voucher/internal/conf"
|
|
"voucher/internal/pkg/mq"
|
|
"voucher/internal/service"
|
|
)
|
|
|
|
// Compile-time assertion that *Consumer satisfies transport.Server.
var _ transport.Server = (*Consumer)(nil)
|
|
|
|
type Consumer struct {
|
|
hLog *log.Helper
|
|
conf *conf.Bootstrap
|
|
manager *mq.ConsumerManager
|
|
}
|
|
|
|
func NewConsumer(
|
|
hLog *log.Helper,
|
|
conf *conf.Bootstrap,
|
|
voucherService *service.VoucherService,
|
|
) *Consumer {
|
|
|
|
manager := mq.NewConsumerManager(hLog)
|
|
|
|
//cf := &mq.ConsumerConnConfig{
|
|
// NameServers: conf.RocketMQ.Addr,
|
|
// AccessKey: conf.RocketMQ.AccessKey,
|
|
// SecretKey: conf.RocketMQ.SecretKey,
|
|
//}
|
|
|
|
//if c := keyService.GetConfig(); c != nil {
|
|
// if err := manager.Subscribe(context.Background(), cf, c, keyService.Handle); err != nil {
|
|
// panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
|
|
// }
|
|
//}
|
|
|
|
return &Consumer{manager: manager, hLog: hLog, conf: conf}
|
|
}
|
|
|
|
func (c *Consumer) Start(ctx context.Context) error {
|
|
// 当返回 err 时,框架会统一调用 stop,不用担心部分启动成功没有 stop 掉
|
|
return c.manager.Start(ctx)
|
|
}
|
|
|
|
func (c *Consumer) Stop(ctx context.Context) error {
|
|
fmt.Println("关闭 consumer 中...")
|
|
return c.manager.Stop(ctx)
|
|
}
|