diff --git a/internal/biz/cmb/consume.go b/internal/biz/cmb/consume.go index f5f6c4b..2b3a680 100644 --- a/internal/biz/cmb/consume.go +++ b/internal/biz/cmb/consume.go @@ -66,6 +66,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID } cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) + return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { // 二次获取,判定处理,以免获取锁后又执行了一次 @@ -89,11 +90,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID return v.setCache(ctx, c, wechatNotifyTag) } - wechatNotifyTag, err = v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ - StockID: stockID, - StockCreatorMchID: stockCreatorMchID, - Tag: v.bc.WechatNotifyMQ.Tag, - }) + wechatNotifyTag, err = v.createWechatNotifyRegisterTag(ctx, stockCreatorMchID, stockID) if err != nil { return err } @@ -110,6 +107,14 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID }) } +func (v *Cmb) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) { + return v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ + StockID: stockID, + StockCreatorMchID: stockCreatorMchID, + Tag: v.bc.WechatNotifyMQ.Tag, + }) +} + func (v *Cmb) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { cacheValue := fmt.Sprintf("%s_%s_%s", wechatNotifyTag.Tag, wechatNotifyTag.StockCreatorMchID, wechatNotifyTag.StockID)