This commit is contained in:
李子铭 2025-03-07 17:46:08 +08:00
parent da2fe44408
commit e8e894b8ec
13 changed files with 321 additions and 23 deletions

View File

@ -0,0 +1,36 @@
package bo
import "voucher/internal/biz/vo"
// ConsumeInformation 定义消费信息结构体
type ConsumeInformation struct {
ConsumeTime string `json:"consume_time"`
ConsumeMchid string `json:"consume_mchid"`
TransactionID string `json:"transaction_id"`
}
// PlainText 定义明文数据结构体
type PlainText struct {
StockCreatorMchid string `json:"stock_creator_mchid"`
StockID string `json:"stock_id"`
CouponID string `json:"coupon_id"`
CouponName string `json:"coupon_name"`
Description string `json:"description"`
Status vo.WechatVoucherStatus `json:"status"`
CreateTime string `json:"create_time"`
CouponType string `json:"coupon_type"`
NoCash bool `json:"no_cash"`
Singleitem bool `json:"singleitem"`
ConsumeInformation ConsumeInformation `json:"consume_information,omitempty"`
}
type WechatVoucherNotifyBo struct {
ID string `json:"id"`
CreateTime string `json:"create_time"`
ResourceType string `json:"resource_type"`
EventType string `json:"event_type"`
Summary string `json:"summary"`
OriginalType string `json:"original_type"`
AssociatedData string `json:"associated_data"`
PlainText PlainText `json:"plain_text"`
}

View File

@ -52,7 +52,7 @@ func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo
func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error {
c := vo.WechatNotifyRegisterTagCacheKey.BuildCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) c := vo.WechatNotifyRegisterTagCacheKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID)
_, err := v.rdb.Rdb.Get(ctx, c.Key).Result() _, err := v.rdb.Rdb.Get(ctx, c.Key).Result()
if err == nil { if err == nil {
@ -64,7 +64,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID
return fmt.Errorf(errMsg) return fmt.Errorf(errMsg)
} }
cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID)
return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error {
// 二次获取,判定处理,以免获取锁后又执行了一次 // 二次获取,判定处理,以免获取锁后又执行了一次

View File

@ -13,4 +13,6 @@ type OrderRepo interface {
Ing(ctx context.Context, id uint64) error Ing(ctx context.Context, id uint64) error
Success(ctx context.Context, id uint64) error Success(ctx context.Context, id uint64) error
Fail(ctx context.Context, id uint64) error Fail(ctx context.Context, id uint64) error
Used(ctx context.Context, id uint64) error
Expired(ctx context.Context, id uint64) error
} }

View File

@ -10,4 +10,7 @@ type OrderWechatRepo interface {
Success(ctx context.Context, id uint64, couponId string) error Success(ctx context.Context, id uint64, couponId string) error
Fail(ctx context.Context, id uint64, remark string) error Fail(ctx context.Context, id uint64, remark string) error
GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error)
GetByMSCId(ctx context.Context, mchId, stockId, couponId string) (*bo.OrderWechatBo, error)
Used(ctx context.Context, id uint64) error
Expired(ctx context.Context, id uint64) error
} }

View File

@ -10,6 +10,8 @@ type CacheKey string
const ( const (
WechatNotifyRegisterTagCacheKey CacheKey = "wechat_notify_register_tag" WechatNotifyRegisterTagCacheKey CacheKey = "wechat_notify_register_tag"
WechatNotifyRegisterTagCacheLockKey CacheKey = "wechat_notify_register_tag_lock" WechatNotifyRegisterTagCacheLockKey CacheKey = "wechat_notify_register_tag_lock"
WechatNotifyConsumeKey CacheKey = "wechat_notify_consume"
) )
var CacheKeyMap = map[CacheKey]time.Duration{ var CacheKeyMap = map[CacheKey]time.Duration{
@ -22,15 +24,41 @@ type Cache struct {
TTL time.Duration TTL time.Duration
} }
func (s CacheKey) BuildCache(tag, stockCreatorMchID, stockID string) *Cache { func (s CacheKey) BuildCache(ids []string) *Cache {
k := fmt.Sprintf("%s_%s_%s_%s", s, tag, stockCreatorMchID, stockID)
k := fmt.Sprintf("%s", s)
for _, id := range ids {
k = fmt.Sprintf("%s_%s", k, id)
}
c := &Cache{ c := &Cache{
Key: k, Key: k,
} }
ttl, ok := CacheKeyMap[s] ttl, ok := CacheKeyMap[s]
if !ok { if !ok {
c.TTL = 30 // 默认30秒 c.TTL = 30 // 默认30秒
} }
c.TTL = ttl c.TTL = ttl
return c
}
func (s CacheKey) BuildRegisterCache(tag, stockCreatorMchID, stockID string) *Cache {
k := fmt.Sprintf("%s_%s_%s_%s", s, tag, stockCreatorMchID, stockID)
c := &Cache{
Key: k,
}
ttl, ok := CacheKeyMap[s]
if !ok {
c.TTL = 30 // 默认30秒
}
c.TTL = ttl
return c return c
} }

View File

@ -32,6 +32,10 @@ func (s OrderWechatStatus) IsUse() bool {
return s == OrderWechatStatusUse return s == OrderWechatStatusUse
} }
func (s OrderWechatStatus) IsExpired() bool {
return s == OrderWechatStatusExpired
}
func (s OrderWechatStatus) CanNotify() bool { func (s OrderWechatStatus) CanNotify() bool {
return s.IsSuccess() || s.IsUse() return s.IsSuccess() || s.IsUse()
} }

View File

@ -0,0 +1,38 @@
package vo
type WechatVoucherStatus string
const (
WechatVoucherStatusSended WechatVoucherStatus = "SENDED"
WechatVoucherStatusUsed WechatVoucherStatus = "USED"
WechatVoucherStatusExpired WechatVoucherStatus = "EXPIRED"
)
var VoucherStatusMap = map[WechatVoucherStatus]string{
WechatVoucherStatusSended: "可用",
WechatVoucherStatusUsed: "已实扣",
WechatVoucherStatusExpired: "已过期",
}
func (s WechatVoucherStatus) GetText() string {
if t, ok := VoucherStatusMap[s]; ok {
return t
}
return "未知类型"
}
func (s WechatVoucherStatus) GetValue() string {
return string(s)
}
func (s WechatVoucherStatus) IsSended() bool {
return s == WechatVoucherStatusSended
}
func (s WechatVoucherStatus) IsUsed() bool {
return s == WechatVoucherStatusUsed
}
func (s WechatVoucherStatus) IsExpired() bool {
return s == WechatVoucherStatusExpired
}

View File

@ -15,6 +15,7 @@ type VoucherBiz struct {
Cmb *cmb.Cmb Cmb *cmb.Cmb
ProductRepo repo.ProductRepo ProductRepo repo.ProductRepo
OrderRepo repo.OrderRepo OrderRepo repo.OrderRepo
OrderWechatRepo repo.OrderWechatRepo
MqSendMixRepo mixrepos.MQSendMixRepo MqSendMixRepo mixrepos.MQSendMixRepo
WechatCpnRepo wechatrepo.WechatCpnRepo WechatCpnRepo wechatrepo.WechatCpnRepo
} }
@ -25,6 +26,7 @@ func NewVoucherBiz(
Cmb *cmb.Cmb, Cmb *cmb.Cmb,
ProductRepo repo.ProductRepo, ProductRepo repo.ProductRepo,
OrderRepo repo.OrderRepo, OrderRepo repo.OrderRepo,
OrderWechatRepo repo.OrderWechatRepo,
MqSendMixRepo mixrepos.MQSendMixRepo, MqSendMixRepo mixrepos.MQSendMixRepo,
WechatCpnRepo wechatrepo.WechatCpnRepo, WechatCpnRepo wechatrepo.WechatCpnRepo,
) *VoucherBiz { ) *VoucherBiz {
@ -34,6 +36,7 @@ func NewVoucherBiz(
Cmb: Cmb, Cmb: Cmb,
ProductRepo: ProductRepo, ProductRepo: ProductRepo,
OrderRepo: OrderRepo, OrderRepo: OrderRepo,
OrderWechatRepo: OrderWechatRepo,
MqSendMixRepo: MqSendMixRepo, MqSendMixRepo: MqSendMixRepo,
WechatCpnRepo: WechatCpnRepo, WechatCpnRepo: WechatCpnRepo,
} }

View File

@ -1,8 +1,78 @@
package biz package biz
import "context" import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
"voucher/internal/pkg/lock"
)
func (j *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *bo.WechatVoucherNotifyBo) error {
c := vo.WechatNotifyConsumeKey.BuildCache([]string{tag, req.PlainText.StockID, req.PlainText.CouponID})
return lock.NewMutex(j.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
if req.PlainText.Status.IsSended() {
log.Warnf("券状态可用,忽略不处理,couponId:%s,stockId:%s,status:%s",
req.PlainText.CouponID, req.PlainText.StockID, req.PlainText.Status.GetText())
return nil
}
orderWechat, err := j.OrderWechatRepo.GetByMSCId(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID)
if err != nil {
return err
}
if req.PlainText.Status.IsUsed() {
return j.wechatVoucherUsed(ctx, orderWechat)
} else if req.PlainText.Status.IsExpired() {
return j.wechatVoucherExpired(ctx, orderWechat)
} else {
return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText())
}
})
}
func (j *VoucherBiz) wechatVoucherUsed(ctx context.Context, orderWechat *bo.OrderWechatBo) error {
if orderWechat.Status.IsUse() {
return nil
}
order, err := j.OrderRepo.GetByOrderNo(ctx, orderWechat.OrderNo)
if err != nil {
return err
}
if err = j.OrderWechatRepo.Used(ctx, orderWechat.ID); err != nil {
return err
}
if err = j.OrderRepo.Used(ctx, order.ID); err != nil {
return err
}
return nil
}
func (j *VoucherBiz) wechatVoucherExpired(ctx context.Context, orderWechat *bo.OrderWechatBo) error {
if orderWechat.Status.IsExpired() {
return nil
}
order, err := j.OrderRepo.GetByOrderNo(ctx, orderWechat.OrderNo)
if err != nil {
return err
}
if err = j.OrderWechatRepo.Expired(ctx, orderWechat.ID); err != nil {
return err
}
if err = j.OrderRepo.Expired(ctx, order.ID); err != nil {
return err
}
func (j *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag, msg string) error {
// todo
return nil return nil
} }

View File

@ -159,3 +159,43 @@ func (p *OrderRepoImpl) Fail(ctx context.Context, id uint64) error {
return nil return nil
} }
func (p *OrderRepoImpl) Used(ctx context.Context, id uint64) error {
now := time.Now()
res := p.db.DB(ctx).
Where(model.Order{
ID: id,
Status: vo.OrderStatusSuccess.GetValue(),
}).
Updates(model.Order{
Status: vo.OrderStatusUse.GetValue(),
UpdateTime: &now,
})
if res.Error != nil {
return res.Error
}
return nil
}
func (p *OrderRepoImpl) Expired(ctx context.Context, id uint64) error {
now := time.Now()
res := p.db.DB(ctx).
Where(model.Order{
ID: id,
Status: vo.OrderStatusSuccess.GetValue(),
}).
Updates(model.Order{
Status: vo.OrderStatusExpired.GetValue(),
UpdateTime: &now,
})
if res.Error != nil {
return res.Error
}
return nil
}

View File

@ -49,6 +49,26 @@ func (p *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo)
return p.ToBo(info), nil return p.ToBo(info), nil
} }
func (p *OrderWechatRepoImpl) GetByMSCId(ctx context.Context, mchId, stockId, couponId string) (*bo.OrderWechatBo, error) {
info := &model.OrderWechat{}
tx := p.DB(ctx).Where(model.OrderWechat{
StockCreatorMchid: mchId,
StockID: stockId,
CouponID: couponId,
}).Find(&info)
if tx.Error != nil {
return nil, tx.Error
}
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return p.ToBo(info), nil
}
func (p *OrderWechatRepoImpl) GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) { func (p *OrderWechatRepoImpl) GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) {
info := &model.OrderWechat{} info := &model.OrderWechat{}
@ -113,3 +133,43 @@ func (p *OrderWechatRepoImpl) Fail(ctx context.Context, id uint64, remark string
return nil return nil
} }
func (p *OrderWechatRepoImpl) Used(ctx context.Context, id uint64) error {
now := time.Now()
res := p.db.DB(ctx).
Where(model.OrderWechat{
ID: id,
Status: vo.OrderWechatStatusSuccess.GetValue(),
}).
Updates(model.OrderWechat{
Status: vo.OrderWechatStatusUse.GetValue(),
UpdateTime: &now,
})
if res.Error != nil {
return res.Error
}
return nil
}
func (p *OrderWechatRepoImpl) Expired(ctx context.Context, id uint64) error {
now := time.Now()
res := p.db.DB(ctx).
Where(model.OrderWechat{
ID: id,
Status: vo.OrderWechatStatusSuccess.GetValue(),
}).
Updates(model.OrderWechat{
Status: vo.OrderWechatStatusExpired.GetValue(),
UpdateTime: &now,
})
if res.Error != nil {
return res.Error
}
return nil
}

View File

@ -68,7 +68,6 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
{ {
// 处理业务逻辑。 // 处理业务逻辑。
var handles []string var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages { for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle) handles = append(handles, v.ReceiptHandle)

View File

@ -2,8 +2,23 @@ package service
import ( import (
"context" "context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"voucher/internal/biz/bo"
) )
func (j *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error { func (j *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error {
return j.VoucherBiz.WechatNotifyConsumer(ctx, tag, msg)
var x *bo.WechatVoucherNotifyBo
if err := json.Unmarshal([]byte(msg), x); err != nil {
return fmt.Errorf("consume msg json.Unmarshal error:%s", err.Error())
}
if err := j.VoucherBiz.WechatNotifyConsumer(ctx, tag, x); err != nil {
log.Errorf("WechatNotifyConsumer error:%s", err.Error())
}
return nil
} }