diff --git a/configs/config.yaml b/configs/config.yaml index c64706d..8af4ea6 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -89,13 +89,13 @@ rdsMQ: wechatQuery: #发放结算 name: "wechatQuery" retryNum: 1 #重试次数 - numWorkers: 1 #协程数量,不配置默认为10 + numWorkers: 2 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 isOpen: true #是否启动消费 true/false orderRetry: #发放结算 name: "orderRetry" retryNum: 1 #重试次数 - numWorkers: 1 #协程数量,不配置默认为10 + numWorkers: 2 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 isOpen: true #是否启动消费 true/false diff --git a/internal/biz/bo/order_bo.go b/internal/biz/bo/order_bo.go index a295dc2..74e8e17 100644 --- a/internal/biz/bo/order_bo.go +++ b/internal/biz/bo/order_bo.go @@ -23,6 +23,7 @@ type OrderBo struct { Channel vo.Channel Attach string Remark string + TransactionId string ReceiveSuccessTime *time.Time LastUseTime *time.Time CreateTime *time.Time diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index 294ce0b..ace3a2f 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -15,12 +15,14 @@ type OrderRepo interface { GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error) GetByVoucherNo(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error) + GetByTransactionId(ctx context.Context, stockCreatorMchId, stockID, transactionId string) (*bo.OrderBo, error) Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error) GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error) Ing(ctx context.Context, id uint64) error Success(ctx context.Context, id uint64, voucherNo string) error Fail(ctx context.Context, id uint64, remark string) error Used(ctx context.Context, id uint64) error + NotifyUsed(ctx context.Context, id uint64, transactionId string) error Available(ctx context.Context, id uint64) error Expired(ctx context.Context, id uint64) error } diff --git a/internal/biz/wechat_notify.go b/internal/biz/wechat_notify.go index d771dae..c7d4c01 100644 --- a/internal/biz/wechat_notify.go +++ b/internal/biz/wechat_notify.go @@ -19,7 +19,7 @@ func (v *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req * return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { - order, err := v.OrderRepo.GetByVoucherNo(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID) + order, err := v.getOrder(ctx, req) if err != nil { return err } @@ -30,11 +30,12 @@ func (v *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req * } else if req.PlainText.Status.IsUsed() { - return v.used(ctx, order, req) + return v.notifyUsed(ctx, order, req) } else if req.PlainText.Status.IsExpired() { return v.expired(ctx, order) + } return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText()) @@ -84,26 +85,46 @@ func (v *VoucherBiz) createUseLog(ctx context.Context, order *bo.OrderBo, req *b return nil } -func (v *VoucherBiz) used(ctx context.Context, order *bo.OrderBo, req *bo.WechatVoucherNotifyBo) error { +func (this *VoucherBiz) getOrder(ctx context.Context, req *bo.WechatVoucherNotifyBo) (*bo.OrderBo, error) { - //var usedTime time.Time - //if req.PlainText.ConsumeInformation.ConsumeTime != "" { - // usedTime, _ = time.Parse(time.RFC3339, req.PlainText.ConsumeInformation.ConsumeTime) - //} else { - // usedTime = time.Now() - //} + order, err := this.OrderRepo.GetByVoucherNo(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID) - //if order.Status.IsUse() && usedTime == *order.LastUseTime { - // log.Warnf("券状态已是可使用,忽略不处理,orderNo:%s", order.OrderNo) - // return nil - //} + if err != nil { + + if !errors.Is(err, gorm.ErrRecordNotFound) { + return nil, err + } + + order, err = this.OrderRepo.GetByTransactionId(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.ConsumeInformation.TransactionID) + + if err != nil { + + if errors.Is(err, gorm.ErrRecordNotFound) { + + return nil, fmt.Errorf("微信回调消费,订单不存在,StockCreatorMchid:%s,StockID:%s,CouponID:%s", + req.PlainText.StockCreatorMchid, + req.PlainText.StockID, + req.PlainText.CouponID, + ) + } + + return nil, err + } + + return order, nil + } + + return order, nil +} + +func (v *VoucherBiz) notifyUsed(ctx context.Context, order *bo.OrderBo, req *bo.WechatVoucherNotifyBo) error { if order.Status.IsUse() { log.Warnf("券状态已是可使用,忽略不处理,orderNo:%s", order.OrderNo) return nil } - if err := v.OrderRepo.Used(ctx, order.ID); err != nil { + if err := v.OrderRepo.NotifyUsed(ctx, order.ID, req.PlainText.ConsumeInformation.TransactionID); err != nil { return err } diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index c3b027d..2098bc9 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -79,10 +79,24 @@ func (v *VoucherBiz) wechatQuery(ctx context.Context, order *bo.OrderBo) error { } if status.IsUse() { - return v.used(ctx, order) + return v.queryUsed(ctx, order) } else if status.IsExpired() { return v.expired(ctx, order) } return nil } + +func (v *VoucherBiz) queryUsed(ctx context.Context, order *bo.OrderBo) error { + + if order.Status.IsUse() { + log.Warnf("券状态已是已使用,忽略不处理,orderNo:%s", order.OrderNo) + return nil + } + + if err := v.OrderRepo.Used(ctx, order.ID); err != nil { + return err + } + + return v.notify(ctx, order) +} diff --git a/internal/data/model/order.gen.go b/internal/data/model/order.gen.go index e0b6ef2..f078044 100644 --- a/internal/data/model/order.gen.go +++ b/internal/data/model/order.gen.go @@ -30,6 +30,7 @@ type Order struct { Attach string `gorm:"column:attach;not null;comment:attach" json:"attach"` ReceiveSuccessTime *time.Time `gorm:"column:receive_success_time" json:"receive_success_time"` LastUseTime *time.Time `gorm:"column:last_use_time" json:"last_use_time"` + TransactionId string `gorm:"column:transaction_id;not null" json:"transaction_id"` CreateTime *time.Time `gorm:"column:create_time" json:"create_time"` UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"` } diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 5348a85..36cccf3 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -46,7 +46,7 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We var results = make([]*model.Order, 0) - result := tx.FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { + result := tx.FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { return fun(ctx, p.ToBos(results)) }) @@ -66,7 +66,7 @@ func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo s Where("batch_no = ?", batchNo). Where("status = ?", vo.OrderStatusFail.GetValue()). //Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat"). - FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { + FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { return fun(ctx, p.ToBos(results)) }) @@ -83,8 +83,8 @@ func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx conte result := p.DB(ctx). Where("status = ?", vo.OrderStatusIng.GetValue()). - Limit(100). - FindInBatches(&results, 20, func(tx *gorm.DB, batch int) error { + Limit(20). + FindInBatches(&results, 10, func(tx *gorm.DB, batch int) error { return fun(ctx, p.ToBos(results)) }) @@ -229,6 +229,22 @@ func (p *OrderRepoImpl) GetByVoucherNo(ctx context.Context, merchantNo, batchNo, return p.ToBo(info), nil } +func (this *OrderRepoImpl) GetByTransactionId(ctx context.Context, stockCreatorMchId, stockID, transactionId string) (*bo.OrderBo, error) { + row := &model.Order{} + + tx := this.DB(ctx).Where(model.Order{MerchantNo: stockCreatorMchId, BatchNo: stockID, TransactionId: transactionId}).First(&row) + + if tx.Error != nil { + return nil, tx.Error + } + + if tx.RowsAffected == 0 { + return nil, gorm.ErrRecordNotFound + } + + return this.ToBo(row), nil +} + func (p *OrderRepoImpl) Ing(ctx context.Context, id uint64) error { now := time.Now() @@ -338,6 +354,27 @@ func (p *OrderRepoImpl) Used(ctx context.Context, id uint64) error { return nil } +func (p *OrderRepoImpl) NotifyUsed(ctx context.Context, id uint64, transactionId string) error { + now := time.Now() + + tx := p.DB(ctx). + Where(model.Order{ + ID: id, + }). + Updates(model.Order{ + Status: vo.OrderStatusUse.GetValue(), + TransactionId: transactionId, + LastUseTime: &now, + UpdateTime: &now, + }) + + if tx.Error != nil { + return fmt.Errorf("update db fail %w", tx.Error) + } + + return nil +} + func (p *OrderRepoImpl) Expired(ctx context.Context, id uint64) error { now := time.Now()