diff --git a/internal/biz/cmb/cmb.go b/internal/biz/cmb/cmb.go index cf745d0..f366aaf 100644 --- a/internal/biz/cmb/cmb.go +++ b/internal/biz/cmb/cmb.go @@ -5,10 +5,12 @@ import ( "voucher/internal/biz/repo" "voucher/internal/biz/wechatrepo" "voucher/internal/conf" + "voucher/internal/data" ) type Cmb struct { bc *conf.Bootstrap + rdb *data.Rdb OrderRepo repo.OrderRepo OrderWechatRepo repo.OrderWechatRepo ProductRepo repo.ProductRepo @@ -21,6 +23,7 @@ type Cmb struct { func NewCmb( bc *conf.Bootstrap, + rdb *data.Rdb, orderRepo repo.OrderRepo, OrderWechatRepo repo.OrderWechatRepo, ProductRepo repo.ProductRepo, @@ -32,6 +35,7 @@ func NewCmb( ) *Cmb { return &Cmb{ bc: bc, + rdb: rdb, OrderRepo: orderRepo, OrderWechatRepo: OrderWechatRepo, ProductRepo: ProductRepo, diff --git a/internal/biz/cmb/consume.go b/internal/biz/cmb/consume.go index 83bc22c..26abd69 100644 --- a/internal/biz/cmb/consume.go +++ b/internal/biz/cmb/consume.go @@ -3,11 +3,15 @@ package cmb import ( "context" "encoding/json" + "errors" "fmt" + "github.com/redis/go-redis/v9" + "gorm.io/gorm" "time" v1 "voucher/api/v1" "voucher/internal/biz/bo" "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" "voucher/internal/pkg/uid" ) @@ -48,8 +52,68 @@ func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { - if err := v.WechatCpnRepo.RegisterNotifyTag(ctx, stockID); err != nil { - return err + c := vo.WechatNotifyRegisterTagCacheKey.BuildCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) + _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() + + if err == nil { + return nil + } + + if err != redis.Nil { + errMsg := fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err) + return fmt.Errorf(errMsg) + } + + cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) + return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { + // 二次获取,判定处理,以免获取锁后又执行了一次 + + cacheValue, err := v.rdb.Rdb.Get(ctx, c.Key).Result() + + if err != nil && err != redis.Nil { + return fmt.Errorf(fmt.Sprintf("二次获取redis缓存%s异常:%v", c.Key, err)) + } + + if cacheValue != "" { + return nil // 有直接返回 + } + + wechatNotifyTag, err := v.WechatNotifyRegisterTagRepo.GetByStockIdAndMchId(ctx, stockCreatorMchID, stockID) + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + + if wechatNotifyTag != nil { + + return v.setCache(ctx, c, wechatNotifyTag) + } + + wechatNotifyTag, err = v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ + StockID: stockID, + StockCreatorMchID: stockCreatorMchID, + Tag: v.bc.WechatNotifyMQ.Tag, + }) + if err != nil { + return err + } + + if err = v.WechatCpnRepo.RegisterNotifyTag(ctx, stockID); err != nil { + return v.WechatNotifyRegisterTagRepo.Fail(ctx, wechatNotifyTag.ID, err.Error()) + } + + if err = v.WechatNotifyRegisterTagRepo.Success(ctx, wechatNotifyTag.ID); err != nil { + return err + } + + return v.setCache(ctx, c, wechatNotifyTag) + }) +} + +func (v *Cmb) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { + + cacheValue := fmt.Sprintf("%s_%s_%s", wechatNotifyTag.Tag, wechatNotifyTag.StockCreatorMchID, wechatNotifyTag.StockID) + if err := v.rdb.Rdb.Set(ctx, c.Key, cacheValue, c.TTL).Err(); err != nil { + return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) } return nil diff --git a/internal/biz/vo/cache.go b/internal/biz/vo/cache.go new file mode 100644 index 0000000..33d8633 --- /dev/null +++ b/internal/biz/vo/cache.go @@ -0,0 +1,36 @@ +package vo + +import ( + "fmt" + "time" +) + +type CacheKey string + +const ( + WechatNotifyRegisterTagCacheKey CacheKey = "wechat_notify_register_tag" + WechatNotifyRegisterTagCacheLockKey CacheKey = "wechat_notify_register_tag_lock" +) + +var CacheKeyMap = map[CacheKey]time.Duration{ + WechatNotifyRegisterTagCacheKey: 86400 * time.Second, + WechatNotifyRegisterTagCacheLockKey: 30 * time.Second, +} + +type Cache struct { + Key string + TTL time.Duration +} + +func (s CacheKey) BuildCache(tag, stockCreatorMchID, stockID string) *Cache { + k := fmt.Sprintf("%s_%s_%s_%s", s, tag, stockCreatorMchID, stockID) + c := &Cache{ + Key: k, + } + ttl, ok := CacheKeyMap[s] + if !ok { + c.TTL = 30 // 默认30秒 + } + c.TTL = ttl + return c +}