From e8e894b8ec5bf11ddce4d2bb43816f5f7614b265 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=90=E9=93=AD?= Date: Fri, 7 Mar 2025 17:46:08 +0800 Subject: [PATCH] cmb --- internal/biz/bo/wechat_voucher_bo.go | 36 +++++++++++ internal/biz/cmb/consume.go | 4 +- internal/biz/repo/order.go | 2 + internal/biz/repo/order_wechat.go | 3 + internal/biz/vo/cache.go | 32 +++++++++- internal/biz/vo/order_wechant_status.go | 4 ++ internal/biz/vo/wechat_voucher_status.go | 38 ++++++++++++ internal/biz/voucher.go | 31 ++++----- internal/biz/wechat_notify_consume.go | 76 ++++++++++++++++++++++- internal/data/repoimpl/order.go | 40 ++++++++++++ internal/data/repoimpl/order_wechat.go | 60 ++++++++++++++++++ internal/server/wechat_notify_consume.go | 1 - internal/service/wechat_notify_consume.go | 17 ++++- 13 files changed, 321 insertions(+), 23 deletions(-) create mode 100644 internal/biz/bo/wechat_voucher_bo.go create mode 100644 internal/biz/vo/wechat_voucher_status.go diff --git a/internal/biz/bo/wechat_voucher_bo.go b/internal/biz/bo/wechat_voucher_bo.go new file mode 100644 index 0000000..6c99585 --- /dev/null +++ b/internal/biz/bo/wechat_voucher_bo.go @@ -0,0 +1,36 @@ +package bo + +import "voucher/internal/biz/vo" + +// ConsumeInformation 定义消费信息结构体 +type ConsumeInformation struct { + ConsumeTime string `json:"consume_time"` + ConsumeMchid string `json:"consume_mchid"` + TransactionID string `json:"transaction_id"` +} + +// PlainText 定义明文数据结构体 +type PlainText struct { + StockCreatorMchid string `json:"stock_creator_mchid"` + StockID string `json:"stock_id"` + CouponID string `json:"coupon_id"` + CouponName string `json:"coupon_name"` + Description string `json:"description"` + Status vo.WechatVoucherStatus `json:"status"` + CreateTime string `json:"create_time"` + CouponType string `json:"coupon_type"` + NoCash bool `json:"no_cash"` + Singleitem bool `json:"singleitem"` + ConsumeInformation ConsumeInformation `json:"consume_information,omitempty"` +} + +type WechatVoucherNotifyBo struct { + ID string `json:"id"` + CreateTime string `json:"create_time"` + ResourceType string `json:"resource_type"` + EventType string `json:"event_type"` + Summary string `json:"summary"` + OriginalType string `json:"original_type"` + AssociatedData string `json:"associated_data"` + PlainText PlainText `json:"plain_text"` +} diff --git a/internal/biz/cmb/consume.go b/internal/biz/cmb/consume.go index 26abd69..71ba886 100644 --- a/internal/biz/cmb/consume.go +++ b/internal/biz/cmb/consume.go @@ -52,7 +52,7 @@ func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { - c := vo.WechatNotifyRegisterTagCacheKey.BuildCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) + c := vo.WechatNotifyRegisterTagCacheKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() if err == nil { @@ -64,7 +64,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID return fmt.Errorf(errMsg) } - cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) + cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { // 二次获取,判定处理,以免获取锁后又执行了一次 diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index 6b8dac2..f6547b8 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -13,4 +13,6 @@ type OrderRepo interface { Ing(ctx context.Context, id uint64) error Success(ctx context.Context, id uint64) error Fail(ctx context.Context, id uint64) error + Used(ctx context.Context, id uint64) error + Expired(ctx context.Context, id uint64) error } diff --git a/internal/biz/repo/order_wechat.go b/internal/biz/repo/order_wechat.go index 69be1f7..1b73201 100644 --- a/internal/biz/repo/order_wechat.go +++ b/internal/biz/repo/order_wechat.go @@ -10,4 +10,7 @@ type OrderWechatRepo interface { Success(ctx context.Context, id uint64, couponId string) error Fail(ctx context.Context, id uint64, remark string) error GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) + GetByMSCId(ctx context.Context, mchId, stockId, couponId string) (*bo.OrderWechatBo, error) + Used(ctx context.Context, id uint64) error + Expired(ctx context.Context, id uint64) error } diff --git a/internal/biz/vo/cache.go b/internal/biz/vo/cache.go index 33d8633..e015435 100644 --- a/internal/biz/vo/cache.go +++ b/internal/biz/vo/cache.go @@ -10,6 +10,8 @@ type CacheKey string const ( WechatNotifyRegisterTagCacheKey CacheKey = "wechat_notify_register_tag" WechatNotifyRegisterTagCacheLockKey CacheKey = "wechat_notify_register_tag_lock" + + WechatNotifyConsumeKey CacheKey = "wechat_notify_consume" ) var CacheKeyMap = map[CacheKey]time.Duration{ @@ -22,15 +24,41 @@ type Cache struct { TTL time.Duration } -func (s CacheKey) BuildCache(tag, stockCreatorMchID, stockID string) *Cache { - k := fmt.Sprintf("%s_%s_%s_%s", s, tag, stockCreatorMchID, stockID) +func (s CacheKey) BuildCache(ids []string) *Cache { + + k := fmt.Sprintf("%s", s) + + for _, id := range ids { + k = fmt.Sprintf("%s_%s", k, id) + } + c := &Cache{ Key: k, } + ttl, ok := CacheKeyMap[s] if !ok { c.TTL = 30 // 默认30秒 } + c.TTL = ttl + + return c +} + +func (s CacheKey) BuildRegisterCache(tag, stockCreatorMchID, stockID string) *Cache { + k := fmt.Sprintf("%s_%s_%s_%s", s, tag, stockCreatorMchID, stockID) + + c := &Cache{ + Key: k, + } + + ttl, ok := CacheKeyMap[s] + if !ok { + c.TTL = 30 // 默认30秒 + } + + c.TTL = ttl + return c } diff --git a/internal/biz/vo/order_wechant_status.go b/internal/biz/vo/order_wechant_status.go index b9519af..68654be 100644 --- a/internal/biz/vo/order_wechant_status.go +++ b/internal/biz/vo/order_wechant_status.go @@ -32,6 +32,10 @@ func (s OrderWechatStatus) IsUse() bool { return s == OrderWechatStatusUse } +func (s OrderWechatStatus) IsExpired() bool { + return s == OrderWechatStatusExpired +} + func (s OrderWechatStatus) CanNotify() bool { return s.IsSuccess() || s.IsUse() } diff --git a/internal/biz/vo/wechat_voucher_status.go b/internal/biz/vo/wechat_voucher_status.go new file mode 100644 index 0000000..55d6cbb --- /dev/null +++ b/internal/biz/vo/wechat_voucher_status.go @@ -0,0 +1,38 @@ +package vo + +type WechatVoucherStatus string + +const ( + WechatVoucherStatusSended WechatVoucherStatus = "SENDED" + WechatVoucherStatusUsed WechatVoucherStatus = "USED" + WechatVoucherStatusExpired WechatVoucherStatus = "EXPIRED" +) + +var VoucherStatusMap = map[WechatVoucherStatus]string{ + WechatVoucherStatusSended: "可用", + WechatVoucherStatusUsed: "已实扣", + WechatVoucherStatusExpired: "已过期", +} + +func (s WechatVoucherStatus) GetText() string { + if t, ok := VoucherStatusMap[s]; ok { + return t + } + return "未知类型" +} + +func (s WechatVoucherStatus) GetValue() string { + return string(s) +} + +func (s WechatVoucherStatus) IsSended() bool { + return s == WechatVoucherStatusSended +} + +func (s WechatVoucherStatus) IsUsed() bool { + return s == WechatVoucherStatusUsed +} + +func (s WechatVoucherStatus) IsExpired() bool { + return s == WechatVoucherStatusExpired +} diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index b1d46c8..20f826e 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -10,13 +10,14 @@ import ( ) type VoucherBiz struct { - bc *conf.Bootstrap - rdb *data.Rdb - Cmb *cmb.Cmb - ProductRepo repo.ProductRepo - OrderRepo repo.OrderRepo - MqSendMixRepo mixrepos.MQSendMixRepo - WechatCpnRepo wechatrepo.WechatCpnRepo + bc *conf.Bootstrap + rdb *data.Rdb + Cmb *cmb.Cmb + ProductRepo repo.ProductRepo + OrderRepo repo.OrderRepo + OrderWechatRepo repo.OrderWechatRepo + MqSendMixRepo mixrepos.MQSendMixRepo + WechatCpnRepo wechatrepo.WechatCpnRepo } func NewVoucherBiz( @@ -25,16 +26,18 @@ func NewVoucherBiz( Cmb *cmb.Cmb, ProductRepo repo.ProductRepo, OrderRepo repo.OrderRepo, + OrderWechatRepo repo.OrderWechatRepo, MqSendMixRepo mixrepos.MQSendMixRepo, WechatCpnRepo wechatrepo.WechatCpnRepo, ) *VoucherBiz { return &VoucherBiz{ - bc: bc, - rdb: rdb, - Cmb: Cmb, - ProductRepo: ProductRepo, - OrderRepo: OrderRepo, - MqSendMixRepo: MqSendMixRepo, - WechatCpnRepo: WechatCpnRepo, + bc: bc, + rdb: rdb, + Cmb: Cmb, + ProductRepo: ProductRepo, + OrderRepo: OrderRepo, + OrderWechatRepo: OrderWechatRepo, + MqSendMixRepo: MqSendMixRepo, + WechatCpnRepo: WechatCpnRepo, } } diff --git a/internal/biz/wechat_notify_consume.go b/internal/biz/wechat_notify_consume.go index ca06111..39886ef 100644 --- a/internal/biz/wechat_notify_consume.go +++ b/internal/biz/wechat_notify_consume.go @@ -1,8 +1,78 @@ package biz -import "context" +import ( + "context" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" +) + +func (j *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *bo.WechatVoucherNotifyBo) error { + c := vo.WechatNotifyConsumeKey.BuildCache([]string{tag, req.PlainText.StockID, req.PlainText.CouponID}) + + return lock.NewMutex(j.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { + + if req.PlainText.Status.IsSended() { + log.Warnf("券状态可用,忽略不处理,couponId:%s,stockId:%s,status:%s", + req.PlainText.CouponID, req.PlainText.StockID, req.PlainText.Status.GetText()) + return nil + } + + orderWechat, err := j.OrderWechatRepo.GetByMSCId(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID) + if err != nil { + return err + } + + if req.PlainText.Status.IsUsed() { + return j.wechatVoucherUsed(ctx, orderWechat) + } else if req.PlainText.Status.IsExpired() { + return j.wechatVoucherExpired(ctx, orderWechat) + } else { + return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText()) + } + }) +} + +func (j *VoucherBiz) wechatVoucherUsed(ctx context.Context, orderWechat *bo.OrderWechatBo) error { + if orderWechat.Status.IsUse() { + return nil + } + + order, err := j.OrderRepo.GetByOrderNo(ctx, orderWechat.OrderNo) + if err != nil { + return err + } + + if err = j.OrderWechatRepo.Used(ctx, orderWechat.ID); err != nil { + return err + } + + if err = j.OrderRepo.Used(ctx, order.ID); err != nil { + return err + } + + return nil +} + +func (j *VoucherBiz) wechatVoucherExpired(ctx context.Context, orderWechat *bo.OrderWechatBo) error { + if orderWechat.Status.IsExpired() { + return nil + } + + order, err := j.OrderRepo.GetByOrderNo(ctx, orderWechat.OrderNo) + if err != nil { + return err + } + + if err = j.OrderWechatRepo.Expired(ctx, orderWechat.ID); err != nil { + return err + } + + if err = j.OrderRepo.Expired(ctx, order.ID); err != nil { + return err + } -func (j *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag, msg string) error { - // todo return nil } diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 58c9947..3aac4cf 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -159,3 +159,43 @@ func (p *OrderRepoImpl) Fail(ctx context.Context, id uint64) error { return nil } + +func (p *OrderRepoImpl) Used(ctx context.Context, id uint64) error { + now := time.Now() + + res := p.db.DB(ctx). + Where(model.Order{ + ID: id, + Status: vo.OrderStatusSuccess.GetValue(), + }). + Updates(model.Order{ + Status: vo.OrderStatusUse.GetValue(), + UpdateTime: &now, + }) + + if res.Error != nil { + return res.Error + } + + return nil +} + +func (p *OrderRepoImpl) Expired(ctx context.Context, id uint64) error { + now := time.Now() + + res := p.db.DB(ctx). + Where(model.Order{ + ID: id, + Status: vo.OrderStatusSuccess.GetValue(), + }). + Updates(model.Order{ + Status: vo.OrderStatusExpired.GetValue(), + UpdateTime: &now, + }) + + if res.Error != nil { + return res.Error + } + + return nil +} diff --git a/internal/data/repoimpl/order_wechat.go b/internal/data/repoimpl/order_wechat.go index 4e6e7ba..1b90f92 100644 --- a/internal/data/repoimpl/order_wechat.go +++ b/internal/data/repoimpl/order_wechat.go @@ -49,6 +49,26 @@ func (p *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo) return p.ToBo(info), nil } +func (p *OrderWechatRepoImpl) GetByMSCId(ctx context.Context, mchId, stockId, couponId string) (*bo.OrderWechatBo, error) { + info := &model.OrderWechat{} + + tx := p.DB(ctx).Where(model.OrderWechat{ + StockCreatorMchid: mchId, + StockID: stockId, + CouponID: couponId, + }).Find(&info) + + if tx.Error != nil { + return nil, tx.Error + } + + if tx.RowsAffected == 0 { + return nil, gorm.ErrRecordNotFound + } + + return p.ToBo(info), nil +} + func (p *OrderWechatRepoImpl) GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) { info := &model.OrderWechat{} @@ -113,3 +133,43 @@ func (p *OrderWechatRepoImpl) Fail(ctx context.Context, id uint64, remark string return nil } + +func (p *OrderWechatRepoImpl) Used(ctx context.Context, id uint64) error { + now := time.Now() + + res := p.db.DB(ctx). + Where(model.OrderWechat{ + ID: id, + Status: vo.OrderWechatStatusSuccess.GetValue(), + }). + Updates(model.OrderWechat{ + Status: vo.OrderWechatStatusUse.GetValue(), + UpdateTime: &now, + }) + + if res.Error != nil { + return res.Error + } + + return nil +} + +func (p *OrderWechatRepoImpl) Expired(ctx context.Context, id uint64) error { + now := time.Now() + + res := p.db.DB(ctx). + Where(model.OrderWechat{ + ID: id, + Status: vo.OrderWechatStatusSuccess.GetValue(), + }). + Updates(model.OrderWechat{ + Status: vo.OrderWechatStatusExpired.GetValue(), + UpdateTime: &now, + }) + + if res.Error != nil { + return res.Error + } + + return nil +} diff --git a/internal/server/wechat_notify_consume.go b/internal/server/wechat_notify_consume.go index a9d6dcb..ee5e75d 100644 --- a/internal/server/wechat_notify_consume.go +++ b/internal/server/wechat_notify_consume.go @@ -68,7 +68,6 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { { // 处理业务逻辑。 var handles []string - fmt.Printf("Consume %d messages---->\n", len(resp.Messages)) for _, v := range resp.Messages { handles = append(handles, v.ReceiptHandle) diff --git a/internal/service/wechat_notify_consume.go b/internal/service/wechat_notify_consume.go index b9812ce..7e2ba9e 100644 --- a/internal/service/wechat_notify_consume.go +++ b/internal/service/wechat_notify_consume.go @@ -2,8 +2,23 @@ package service import ( "context" + "encoding/json" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "voucher/internal/biz/bo" ) func (j *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error { - return j.VoucherBiz.WechatNotifyConsumer(ctx, tag, msg) + + var x *bo.WechatVoucherNotifyBo + + if err := json.Unmarshal([]byte(msg), x); err != nil { + return fmt.Errorf("consume msg json.Unmarshal error:%s", err.Error()) + } + + if err := j.VoucherBiz.WechatNotifyConsumer(ctx, tag, x); err != nil { + log.Errorf("WechatNotifyConsumer error:%s", err.Error()) + } + + return nil }