package cmb import ( "context" "encoding/json" "errors" "fmt" "github.com/go-kratos/kratos/v2/log" "github.com/redis/go-redis/v9" "gorm.io/gorm" "time" v1 "voucher/api/v1" "voucher/internal/biz/bo" "voucher/internal/biz/vo" "voucher/internal/pkg/lock" "voucher/internal/pkg/uid" ) func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo string, err error) { if !order.Status.IsWait() { return outRequestNo, fmt.Errorf("订单状态错误,%s", order.Status.GetText()) } if !order.Channel.IsWeChat() { return outRequestNo, fmt.Errorf("订单渠道错误,%s", order.Channel.GetText()) } if err = v.ing(ctx, order.ID); err != nil { return } orderWechat, err := v.create(ctx, order) if err != nil { return } if err = v.registerNotifyTag(ctx, orderWechat.StockCreatorMchid, orderWechat.StockID); err != nil { return outRequestNo, err } couponId, err := v.WechatCpnRepo.Order(ctx, orderWechat) if err != nil { return outRequestNo, v.fail(ctx, order, orderWechat, err.Error()) } if err = v.success(ctx, order, orderWechat, couponId); err != nil { return } return orderWechat.OutRequestNo, err } func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { c := vo.WechatNotifyRegisterTagCacheKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() if err == nil { return nil } if err != redis.Nil { errMsg := fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err) return fmt.Errorf(errMsg) } cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { // 二次获取,判定处理,以免获取锁后又执行了一次 cacheValue, err := v.rdb.Rdb.Get(ctx, c.Key).Result() if err != nil && err != redis.Nil { return fmt.Errorf(fmt.Sprintf("二次获取redis缓存%s异常:%v", c.Key, err)) } if cacheValue != "" { return nil // 有直接返回 } wechatNotifyTag, err := v.WechatNotifyRegisterTagRepo.GetByStockIdAndMchId(ctx, stockCreatorMchID, stockID) if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return err } if wechatNotifyTag != nil { return v.setCache(ctx, c, wechatNotifyTag) } wechatNotifyTag, err = v.createWechatNotifyRegisterTag(ctx, stockCreatorMchID, stockID) if err != nil { return err } if err = v.WechatCpnRepo.RegisterNotifyTag(ctx, stockID); err != nil { return v.WechatNotifyRegisterTagRepo.Fail(ctx, wechatNotifyTag.ID, err.Error()) } if err = v.WechatNotifyRegisterTagRepo.Success(ctx, wechatNotifyTag.ID); err != nil { return err } return v.setCache(ctx, c, wechatNotifyTag) }) } func (v *Cmb) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) { return v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ StockID: stockID, StockCreatorMchID: stockCreatorMchID, Tag: v.bc.WechatNotifyMQ.Tag, }) } func (v *Cmb) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { cacheValue := fmt.Sprintf("%s_%s_%s", wechatNotifyTag.Tag, wechatNotifyTag.StockCreatorMchID, wechatNotifyTag.StockID) if err := v.rdb.Rdb.Set(ctx, c.Key, cacheValue, c.TTL).Err(); err != nil { return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) } return nil } func (v *Cmb) create(ctx context.Context, order *bo.OrderBo) (*bo.OrderWechatBo, error) { outRequestNo, err := v.GenerateMixRepo.GeneratorString(ctx, uid.OrderWechat) if err != nil { return nil, err } req := &bo.OrderWechatBo{ OrderNo: order.OrderNo, OutRequestNo: outRequestNo, AppID: order.AppID, StockCreatorMchid: order.MerchantNo, OpenID: order.Account, StockID: order.BatchNo, Status: vo.OrderWechatStatusWait, } orderWechat, err := v.OrderWechatRepo.Create(ctx, req) if err != nil { return nil, err } return orderWechat, nil } func (v *Cmb) ing(ctx context.Context, id uint64) error { return v.OrderRepo.Ing(ctx, id) } func (v *Cmb) success(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, couponId string) error { if err := v.OrderWechatRepo.Success(ctx, orderWechat.ID, couponId); err != nil { return err } return v.OrderRepo.Success(ctx, order.ID) } func (v *Cmb) fail(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, remarks string) error { if err := v.OrderWechatRepo.Fail(ctx, orderWechat.ID, remarks); err != nil { return err } return v.OrderRepo.Fail(ctx, order.ID) } func (v *Cmb) orderWechat(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) (*bo.OrderWechatBo, error) { orderWechat, err := v.OrderWechatRepo.GetByOutRequestNo(ctx, orderOutRequestNo) if err != nil { return nil, err } if !orderWechat.Status.CanNotify() { return nil, fmt.Errorf("微信订单状态错误,不能通知:%s", order.Status.GetText()) } return orderWechat, err } func (v *Cmb) bizContent(_ context.Context, orderWechat *bo.OrderWechatBo) (string, error) { status, err := orderWechat.Status.GetCmbStatusText() if err != nil { return "", err } req := &v1.CmbNotifyRequest{ Ticket: orderWechat.OrderNo, Status: status.GetValue(), TransDate: time.Now().Format("20060102150405"), OrgNo: v.bc.Cmb.OrgNo, Ext: "", } bizJsonBytes, err := json.Marshal(req) if err != nil { return "", err } return string(bizJsonBytes), nil } func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) error { if !order.Channel.IsWeChat() { return fmt.Errorf("暂不支持订单%s渠道%s回调通知处理", order.OrderNo, order.Channel.GetText()) } orderWechat, err := v.orderWechat(ctx, order, orderOutRequestNo) if err != nil { return err } bizContent, err := v.bizContent(ctx, orderWechat) if err != nil { return err } request, err := v.CmbMixRepo.GetRequest(ctx, &bo.CmbRequestBo{ FuncName: vo.CmbNotifyFuncName, BizContent: bizContent, }) if err != nil { return err } requestBytes, err := json.Marshal(request) if err != nil { return err } orderNotify, err := v.OrderNotifyRepo.Create(ctx, &bo.OrderNotifyBo{ OrderNo: orderWechat.OrderNo, OutRequestNo: orderWechat.OutRequestNo, Request: string(requestBytes), NotifyUrl: order.NotifyUrl, }) if err != nil { return err } x, err := v.CmbMixRepo.Request(ctx, request, v.bc.Cmb.NotifyUrl) if err != nil { return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error()) } bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x) if err != nil { log.Errorf("NotifyConsume CmbMixRepo.VerifyResponse error:%s", err.Error()) return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error()) } var s *v1.CmbNotifyReply if err = json.Unmarshal([]byte(bizStr), &s); err != nil { return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error()) } if s.RespCode != vo.CmbResponseStatusSuccess.GetValue() { return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, s.RespMsg) } return v.OrderNotifyRepo.Success(ctx, orderNotify.ID, bizStr) }