voucher/internal/server/rds_consume.go

51 lines
1.1 KiB
Go

package server
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport"
"voucher/internal/conf"
"voucher/internal/pkg/rdsmq"
"voucher/internal/service"
)
// Compile-time assertion that *RdbConsumer satisfies transport.Server.
var _ transport.Server = (*RdbConsumer)(nil)
// RdbConsumer is a transport.Server that wraps an rdsmq.ConsumerManager and
// forwards its Start and Stop calls to that manager.
type RdbConsumer struct {
// hLog is the log helper handed to NewRdbConsumer.
hLog *log.Helper
// conf is the bootstrap configuration handed to NewRdbConsumer.
conf *conf.Bootstrap
// manager is the consumer manager built in NewRdbConsumer; Start and Stop delegate to it.
manager *rdsmq.ConsumerManager
// voucherService is a reference to the voucher service; it is not read by any method visible here.
voucherService *service.VoucherService
}
// NewRdbConsumer builds an RdbConsumer backed by a fresh rdsmq.ConsumerManager.
//
// The WeChat query config and the WeChat retry config are each obtained from
// voucherService and added to the manager only when the returned value is
// non-nil. hLog, conf and voucherService are all stored on the result.
func NewRdbConsumer(
	hLog *log.Helper,
	conf *conf.Bootstrap,
	voucherService *service.VoucherService,
) *RdbConsumer {
	manager := rdsmq.NewConsumerManager()
	if queryCfg := voucherService.GetWechatQueryConfig(); queryCfg != nil {
		manager.Add(queryCfg)
	}
	if retryCfg := voucherService.GetWechatRetryConfig(); retryCfg != nil {
		manager.Add(retryCfg)
	}

	// voucherService must be assigned explicitly: leaving it out of the
	// literal keeps the struct field nil even though the dependency was
	// injected.
	return &RdbConsumer{
		hLog:           hLog,
		conf:           conf,
		manager:        manager,
		voucherService: voucherService,
	}
}
// Start implements transport.Server by handing ctx to the consumer manager's
// Start method. It always returns nil.
func (c *RdbConsumer) Start(ctx context.Context) error {
	mgr := c.manager
	mgr.Start(ctx)
	return nil
}
// Stop implements transport.Server by handing ctx to the consumer manager's
// Stop method. A progress line is written to standard output before and
// after that call ("shutting down" / "shutdown complete", in Chinese).
// It always returns nil.
func (c *RdbConsumer) Stop(ctx context.Context) error {
	const (
		stoppingMsg = "关闭 RdbConsumer 中..."
		stoppedMsg  = "关闭 RdbConsumer 完成..."
	)

	fmt.Println(stoppingMsg)
	c.manager.Stop(ctx)
	fmt.Println(stoppedMsg)
	return nil
}