diff --git a/api/v1/cmb_cpn.proto b/api/v1/cmb_cpn.proto index e3c10d9..a3cd78e 100644 --- a/api/v1/cmb_cpn.proto +++ b/api/v1/cmb_cpn.proto @@ -29,6 +29,12 @@ service Cmb { }; } + rpc OrderRetry (OrderRetryRequest) returns (Empty) { + option (google.api.http) = { + post: "/voucher/cmb/v1/orderRetry", + body: "*" + }; + } rpc OrderMock (CmbOrderRequest) returns (CmbRequest) { option (google.api.http) = { @@ -56,23 +62,18 @@ service Cmb { }; } - rpc BatchQuery (BatchQueryRequest) returns (Empty) { + rpc Test (Empty) returns (Empty) { option (google.api.http) = { - post: "/voucher/cmb/v1/batchQuery", + post: "/voucher/cmb/v1/test", body: "*" }; } - } - -message BatchQueryRequest { - repeated string order_ids = 5 [json_name = "order_ids"]; - string begin_time = 6 [json_name = "begin_time"]; - string end_time = 7 [json_name = "end_time"]; +message OrderRetryRequest { + repeated string transactionIds = 1 [json_name = "transactionIds"]; } - message CmbRequest { // 请求公共参数 // 合作方唯一ID,32位定长 diff --git a/internal/biz/cron_notice.go b/internal/biz/cron_notice.go index b7539bf..547e60f 100644 --- a/internal/biz/cron_notice.go +++ b/internal/biz/cron_notice.go @@ -86,10 +86,10 @@ func (v *VoucherBiz) isCanNotice(ctx context.Context) error { // 二次获取,判定处理,以免获取锁后又执行了一次 - cacheValue, err := v.rdb.Rdb.Get(ctx, cache.Key).Result() + cacheValue, err2 := v.rdb.Rdb.Get(ctx, cache.Key).Result() - if err != nil && err != redis.Nil { - return fmt.Errorf(fmt.Sprintf("notice 二次获取redis缓存%s异常:%v", cache.Key, err)) + if err2 != nil && err2 != redis.Nil { + return fmt.Errorf(fmt.Sprintf("notice 二次获取redis缓存%s异常:%v", cache.Key, err2)) } if len(cacheValue) > 0 { @@ -119,7 +119,7 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo) error { } if order.Status == status { - log.Warnf("notice 券状态未改变:%s,忽略不处理,orderNo:%s", order.Status.GetText(), order.OrderNo) + //log.Warnf("notice 券状态未改变:%s,忽略不处理,orderNo:%s", order.Status.GetText(), order.OrderNo) return nil } @@ -146,7 +146,7 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo) error { func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) error { if !orderNotify.Event.CanNotify() { - log.Warnf("notice 券状态:%s,忽略不通知,orderNo:%s", orderNotify.Event.GetText(), order.OrderNo) + //log.Warnf("notice 券状态:%s,忽略不通知,orderNo:%s", orderNotify.Event.GetText(), order.OrderNo) return nil } diff --git a/internal/biz/order.go b/internal/biz/order.go index c69cc20..a7b5b40 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -2,139 +2,78 @@ package biz import ( "context" - "encoding/json" "fmt" - "github.com/go-kratos/kratos/v2/errors" - "github.com/go-kratos/kratos/v2/log" - "time" err2 "voucher/api/err" - v1 "voucher/api/v1" "voucher/internal/biz/bo" "voucher/internal/biz/vo" ) -func (v *VoucherBiz) CmbOrder(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { +func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { - order, err := v.cmbOrder(ctx, request) - - if err != nil { - return v.OrderFail(ctx, err) - } - - reply, err := v.OrderSuccess(ctx, order.OrderNo) - if err != nil { - return nil, err - } - - _ = v.bbToWx(ctx, order, reply) - - return reply, nil -} - -func (v *VoucherBiz) cmbOrder(ctx context.Context, request *v1.CmbRequest) (*bo.OrderBo, error) { - - bizContent, err := v.CmbMixRepo.OrderVerify(ctx, request) - if err != nil { - return nil, err - } - - ctx2 := context.Background() - - product, err3 := v.ProductRepo.GetByProductNo(ctx2, bizContent.ActivityId) + order, err3 := v.GetByOutBizNo(ctx, req) if err3 != nil { - return nil, err - } - - order, err := v.Order(ctx2, product, bizContent) - if err != nil { - return nil, err - } - - order.MiniMum = product.MiniMum - order.CouponValue = product.Amount - - return order, nil -} - -func (v *VoucherBiz) Order(ctx context.Context, product *bo.ProductBo, bizContent *v1.CmbOrderRequest) (order *bo.OrderBo, err error) { - - order, err = v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, bizContent.TransactionId) - if err != nil && !err2.IsDbNotFound(err) { - return order, err + return "", err3 } if order != nil { - if order.Status.IsFail() { + if order.Status.IsFail() || order.Status.IsIng() { if err4 := v.orderRetry(ctx, order); err4 != nil { - return order, err4 + return "", err4 } } - return order, err + return order.OrderNo, err } - order, err = v.order(ctx, product, bizContent) - if err != nil { - return nil, err + product, err3 := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) + if err3 != nil { + return "", err3 } - return order, nil + order, err3 = v.order(ctx, req, product) + if err3 != nil { + return "", err3 + } + + return order.OrderNo, nil } -func (v *VoucherBiz) order(ctx context.Context, product *bo.ProductBo, bizContent *v1.CmbOrderRequest) (*bo.OrderBo, error) { - - req := &bo.OrderCreateReqBo{ - OutBizNo: bizContent.TransactionId, - ProductNo: bizContent.ActivityId, - Account: bizContent.CmbUid, - AppID: bizContent.AppId, - Attach: bizContent.Attach, - AccountType: vo.OrderAccountTypeOpenId, - Type: vo.OrderTypeCmb, - NotifyUrl: v.bc.Cmb.NotifyUrl, - } +func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) { order, err := v.create(ctx, req, product) if err != nil { return nil, err } - _ = v.cmbToBB(ctx, bizContent) - - //voucherNo, err := v.WechatCpnRepo.Order(ctx, order) - //if err != nil { - // if err3 := v.fail(ctx, order, err); err3 != nil { - // return nil, err3 - // } - // return nil, err - //} - // 休眠100微妙 - time.Sleep(time.Millisecond * 100) - - // mock发券成功,测试使用 - voucherNo := order.OrderNo + voucherNo, err := v.WechatCpnRepo.Order(ctx, order) + if err != nil { + if err3 := v.fail(ctx, order, err); err3 != nil { + return nil, err3 + } + return nil, err + } if err = v.success(ctx, order, voucherNo); err != nil { - return order, err + return nil, err } + //_ = v.bbToWx(ctx, order, reply) + return order, nil } func (v *VoucherBiz) orderRetry(ctx context.Context, order *bo.OrderBo) error { - // mock发券成功,测试使用 - voucherNo := order.OrderNo - //voucherNo, err := v.WechatCpnRepo.Order(ctx, order) - // - //if err != nil { - // if err3 := v.fail(ctx, order, err); err3 != nil { - // return err3 - // } - // return err - //} + voucherNo, err := v.WechatCpnRepo.Order(ctx, order) + + if err != nil { + if err3 := v.fail(ctx, order, err); err3 != nil { + return err3 + } + return err + } return v.success(ctx, order, voucherNo) } @@ -161,6 +100,7 @@ func (v *VoucherBiz) create(ctx context.Context, req *bo.OrderCreateReqBo, produ } func (v *VoucherBiz) ing(ctx context.Context, id uint64) error { + return v.OrderRepo.Ing(ctx, id) } @@ -182,37 +122,31 @@ func (v *VoucherBiz) fail(ctx context.Context, order *bo.OrderBo, errReq error) return v.alarm(ctx, order, errReq.Error()) } -func (c *VoucherBiz) OrderSuccess(ctx context.Context, orderNo string) (*v1.CmbReply, error) { +func (v *VoucherBiz) GetByOutBizNo(ctx context.Context, req *bo.OrderCreateReqBo) (*bo.OrderBo, error) { - bizReply := &v1.CmbOrderReply{ - RespCode: vo.CmbResponseStatusSuccess.GetValue(), - RespMsg: "成功", - CodeNo: orderNo, + order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, req.OutBizNo) + + if err != nil && !err2.IsDbNotFound(err) { + return nil, err } - replyBizContent, _ := json.Marshal(bizReply) - - return c.GetResponse(ctx, replyBizContent) + return order, nil } -func (c *VoucherBiz) OrderFail(ctx context.Context, err error) (*v1.CmbReply, error) { +func (v *VoucherBiz) UpdateOrderStatus(ctx context.Context, orderId uint64, status vo.OrderStatus) error { - se := errors.FromError(err) + if status.IsSuccess() { - if len(se.Reason) == 0 { - se.Reason = err2.CmbErr_CMB_UNKNOWN.String() + return v.OrderRepo.Available(ctx, orderId) + + } else if status.IsUse() { + + return v.OrderRepo.Used(ctx, orderId) + + } else if status.IsExpired() { + + return v.OrderRepo.Expired(ctx, orderId) } - log.Errorf("order fail: %v", se) - - bizReply := &v1.CmbOrderReply{ - RespCode: vo.CmbResponseStatusFail.GetValue(), - RespMsg: se.Message, - CodeNo: "", - ThirdErrCode: se.Reason, - } - - replyBizContent, _ := json.Marshal(bizReply) - - return c.GetResponse(ctx, replyBizContent) + return fmt.Errorf("notice 未知券状态,orderId:%d,statuText:%s", orderId, status.GetText()) } diff --git a/internal/biz/query.go b/internal/biz/query.go index 7d47a98..5d8515c 100644 --- a/internal/biz/query.go +++ b/internal/biz/query.go @@ -8,7 +8,6 @@ import ( v1 "voucher/api/v1" "voucher/internal/biz/bo" "voucher/internal/biz/vo" - "voucher/internal/data/wechatrepoimpl" "voucher/internal/pkg/lock" ) @@ -67,24 +66,6 @@ func (v *VoucherBiz) Query(ctx context.Context, order *bo.OrderBo) error { return nil } -func (v *VoucherBiz) UpdateOrderStatus(ctx context.Context, orderId uint64, status vo.OrderStatus) error { - - if status.IsSuccess() { - - return v.OrderRepo.Available(ctx, orderId) - - } else if status.IsUse() { - - return v.OrderRepo.Used(ctx, orderId, time.Now()) - - } else if status.IsExpired() { - - return v.OrderRepo.Expired(ctx, orderId) - } - - return fmt.Errorf("notice 未知券状态,orderId:%d,statuText:%s", orderId, status.GetText()) -} - func (v *VoucherBiz) QueryOrder(ctx context.Context, orderNo string) (string, error) { order, err3 := v.OrderRepo.GetByOrderNo(ctx, orderNo) @@ -92,15 +73,10 @@ func (v *VoucherBiz) QueryOrder(ctx context.Context, orderNo string) (string, er return "", err3 } - resp, err := v.WechatCpnRepo.QueryCoupon(ctx, order) + status, err := v.WechatCpnRepo.Query(ctx, order) if err != nil { return "", err } - t, err := wechatrepoimpl.CpnStatus(*resp.Status).GetStatus() - if err != nil { - return "", err - } - - return fmt.Sprintf("orderNo:%s,订单状态:%s,微信查询返回状态:%s,wxResp:%+v", orderNo, order.Status.GetText(), t, resp), nil + return fmt.Sprintf("orderNo:%s,订单状态:%s,微信查询返回状态:%s", orderNo, order.Status.GetText(), status.GetText()), nil } diff --git a/internal/biz/register_tag.go b/internal/biz/register_tag.go new file mode 100644 index 0000000..cb8314d --- /dev/null +++ b/internal/biz/register_tag.go @@ -0,0 +1,102 @@ +package biz + +import ( + "context" + "fmt" + "github.com/redis/go-redis/v9" + err2 "voucher/api/err" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" +) + +// RegisterTag 注册通知标签 stock.MchId 批次创建商户, stock.BatchNo 商品批次号 +func (this *VoucherBiz) RegisterTag(ctx context.Context, productNo string) error { + + stock, err := this.ProductRepo.GetByProductNo(ctx, productNo) + if err != nil { + return err + } + + return this.registerNotifyTag(ctx, stock.MchId, stock.BatchNo) +} + +func (v *VoucherBiz) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { + + c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) + + _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() + + if err == nil { + // 缓存存在,直接返回 + return nil + } + + if err != redis.Nil { + return fmt.Errorf(fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err)) + } + + cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) + + return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { + // 二次获取,判定处理,以免获取锁后又执行了一次 + + cacheValue, err3 := v.rdb.Rdb.Get(ctx, c.Key).Result() + + if err3 != nil && err3 != redis.Nil { + return fmt.Errorf(fmt.Sprintf("二次获取redis缓存%s异常:%v", c.Key, err)) + } + + if cacheValue != "" { + return nil // 有直接返回 + } + + wechatNotifyTag, err3 := v.WechatNotifyRegisterTagRepo.GetByStockIdAndMchId(ctx, stockCreatorMchID, stockID) + if err3 != nil && !err2.IsDbNotFound(err3) { + return err3 + } + + if wechatNotifyTag != nil { + if wechatNotifyTag.Tag != v.bc.WechatNotifyMQ.Tag { + return fmt.Errorf("tag不一致,请检查tag配置:%s", wechatNotifyTag.Tag) + } + + if wechatNotifyTag.Status.IsSuccess() { + return v.setCache(ctx, c, wechatNotifyTag) + } + } else { + wechatNotifyTag, err3 = v.createWechatNotifyRegisterTag(ctx, stockCreatorMchID, stockID) + if err3 != nil { + return err3 + } + } + + if err = v.WechatCpnRepo.RegisterNotifyTag(ctx, stockID); err != nil { + + return v.WechatNotifyRegisterTagRepo.Fail(ctx, wechatNotifyTag.ID, err.Error()) + } + + if err = v.WechatNotifyRegisterTagRepo.Success(ctx, wechatNotifyTag.ID); err != nil { + return err + } + + return v.setCache(ctx, c, wechatNotifyTag) + }) +} + +func (v *VoucherBiz) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) { + return v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ + StockID: stockID, + StockCreatorMchID: stockCreatorMchID, + Tag: v.bc.WechatNotifyMQ.Tag, + }) +} + +func (v *VoucherBiz) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { + + if err := v.rdb.Rdb.Set(ctx, c.Key, wechatNotifyTag.Tag, c.TTL).Err(); err != nil { + return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) + } + + return nil +} diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index 7e3b768..9c43d90 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -8,6 +8,7 @@ import ( ) type OrderRepo interface { + FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error) diff --git a/internal/biz/retry.go b/internal/biz/retry.go new file mode 100644 index 0000000..2816fdf --- /dev/null +++ b/internal/biz/retry.go @@ -0,0 +1,45 @@ +package biz + +import ( + "context" + "fmt" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" +) + +func (v *VoucherBiz) OrderRetry(ctx context.Context, outBizNos []string) error { + + if len(outBizNos) > 0 { + + for _, outBizNo := range outBizNos { + + order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, outBizNo) + if err != nil { + return fmt.Errorf(fmt.Sprintf("获取订单%s异常:%v", outBizNo, err)) + } + + if !order.Status.IsIng() { + return fmt.Errorf(fmt.Sprintf("订单%s状态异常:%s", order.OrderNo, order.Status)) + } + + if err4 := v.orderRetry(ctx, order); err4 != nil { + return err4 + } + } + + return nil + } + + return v.OrderRepo.FindIngInBatches(ctx, func(ctx context.Context, rows []*bo.OrderBo) error { + + for _, order := range rows { + + if err4 := v.orderRetry(ctx, order); err4 != nil { + return err4 + } + } + + return nil + }) + +} diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 0aa58a4..fd1d49c 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -30,6 +30,24 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB { return p.db.DB(ctx).Model(model.Order{}) } +func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { + + var results = make([]*model.Order, 0) + + result := p.DB(ctx). + Where("status = ?", vo.OrderStatusIng.GetValue()). + Limit(100). + FindInBatches(&results, 20, func(tx *gorm.DB, batch int) error { + return fun(ctx, p.ToBos(results)) + }) + + if result.Error != nil { + return result.Error + } + + return nil +} + func (p *OrderRepoImpl) FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { var results = make([]*model.Order, 0) @@ -74,8 +92,7 @@ func (p *OrderRepoImpl) Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderB UpdateTime: &now, } - db := p.DB(ctx) - tx := db.Create(info) + tx := p.DB(ctx).Create(info) if tx.Error != nil { return nil, fmt.Errorf("create db fail %w", tx.Error) @@ -88,8 +105,7 @@ func (p *OrderRepoImpl) GetByOutBizNo(ctx context.Context, t vo.OrderType, outBi info := &model.Order{} - db := p.DB(ctx) - tx := db.Where(model.Order{Type: t.GetValue(), OutBizNo: outBizNo}).First(&info) + tx := p.DB(ctx).Where(model.Order{Type: t.GetValue(), OutBizNo: outBizNo}).First(&info) if tx.Error != nil { @@ -191,8 +207,7 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64, voucherNo string tx := p.DB(ctx). Where(model.Order{ - ID: id, - Status: vo.OrderStatusIng.GetValue(), + ID: id, }). Updates(model.Order{ Status: vo.OrderStatusSuccess.GetValue(), diff --git a/internal/data/wechatrepoimpl/cpn.go b/internal/data/wechatrepoimpl/cpn.go index f60738e..7754d9a 100644 --- a/internal/data/wechatrepoimpl/cpn.go +++ b/internal/data/wechatrepoimpl/cpn.go @@ -87,7 +87,11 @@ func (c *CpnRepoImpl) Order(ctx context.Context, order *bo.OrderBo) (string, err if err != nil { - return "", c.bodyErr(ctx, result) + if result.Response != nil && result.Response.Body != nil { + return "", c.bodyErr(ctx, result) + } + + return "", err } return *resp.CouponId, nil @@ -135,7 +139,11 @@ func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderBo) (vo.Or resp, result, err := svc.QueryCoupon(ctx, req) if err != nil { - return 0, c.bodyErr(ctx, result) + if result.Response != nil && result.Response.Body != nil { + return 0, c.bodyErr(ctx, result) + } + + return 0, err } return CpnStatus(*resp.Status).GetStatus() @@ -162,7 +170,11 @@ func (c *CpnRepoImpl) QueryProduct(ctx context.Context, stockCreatorMchId, stock if err != nil { - return nil, c.bodyErr(ctx, result) + if result.Response != nil && result.Response.Body != nil { + return nil, c.bodyErr(ctx, result) + } + + return nil, err } return response, nil @@ -184,7 +196,11 @@ func (c *CpnRepoImpl) QueryCallback(ctx context.Context) (*cashcoupons.Callback, if err != nil { - return nil, c.bodyErr(ctx, result) + if result.Response != nil && result.Response.Body != nil { + return nil, c.bodyErr(ctx, result) + } + + return nil, err } return response, nil @@ -207,7 +223,11 @@ func (c *CpnRepoImpl) SetCallback(ctx context.Context, url string) (*cashcoupons if err != nil { - return nil, c.bodyErr(ctx, result) + if result.Response != nil && result.Response.Body != nil { + return nil, c.bodyErr(ctx, result) + } + + return nil, err } return response, nil diff --git a/internal/pkg/helper/utils_test.go b/internal/pkg/helper/utils_test.go index 5ab23c1..64996c6 100644 --- a/internal/pkg/helper/utils_test.go +++ b/internal/pkg/helper/utils_test.go @@ -3,9 +3,25 @@ package helper import ( "fmt" "testing" + "time" ) func TestHashMod(t *testing.T) { serverId := HashMod("1dfsfdsfsddf12dddd5451212iodewnsanf2") fmt.Println(serverId) } + +func TestNoticeTime(t *testing.T) { + now := time.Now() + + // 获取七天前的日期 + noticeStartDay := now.AddDate(0, 0, -29) + // 获取七天前 00:00:00 的时间 + startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location()) + + noticeEndDay := now.AddDate(0, 0, -28) + // 获取昨天 23:59:59 的时间 + endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location()) + + t.Logf("startTime:%s,endTime:%s", startTime, endTime) +} diff --git a/internal/pkg/mq_http/mq_http_test.go b/internal/pkg/mq_http/mq_http_test.go index 36134bb..2102a2b 100644 --- a/internal/pkg/mq_http/mq_http_test.go +++ b/internal/pkg/mq_http/mq_http_test.go @@ -19,9 +19,9 @@ func Test_WechatNotifyProducer(t *testing.T) { "original_type":"coupon", "associated_data":"coupon", "plain_text":{ -"stock_creator_mchid":"1652465541", -"stock_id":"10000000", -"coupon_id":"192720602126930329826", +"stock_creator_mchid":"1676203838", +"stock_id":"20215869", +"coupon_id":"754343650536853505", "coupon_name":"萧山农商新客激活礼", "description":"","status":"USED", "create_time":"2025-03-07T15:49:31+08:00", @@ -38,27 +38,26 @@ func Test_WechatNotifyProducer(t *testing.T) { func Test_WechatNotifyProducer2(t *testing.T) { - tag := "voucher_notify_pro" + tag := "voucher_notify_dev" bodyStr := `{"id":"5465699d-de6a-5414-a8df-283167b577ca", - "create_time":"2025-03-07T15:57:24+08:00", - "resource_type":"encrypt-resource", - "event_type":"COUPON.USE", - "summary":"代金券核销通知", - "original_type":"coupon", - "associated_data":"coupon", - "plain_text":{ - "stock_creator_mchid":"1652465541", - "stock_id":"20392273", - "coupon_id":"101286678322", - "coupon_name":"test", - "description":"", - "status":"USED", - "create_time":"2025-03-07T15:49:31+08:00", - "coupon_type":"NORMAL", - "no_cash":false, - "singleitem":false, - "consume_information":{"consume_time":"2025-03-07T15:57:24+08:00","consume_mchid":"1800002761","transaction_id":"4200002544202503077103159055"}}}` +"create_time":"2025-03-07T15:57:24+08:00", +"resource_type":"encrypt-resource", +"event_type":"COUPON.USE", +"summary":"代金券核销通知", +"original_type":"coupon", +"associated_data":"coupon", +"plain_text":{ +"stock_creator_mchid":"1652465541", +"stock_id":"20393435", +"coupon_id":"101423873113", +"coupon_name":"test", +"description":"","status":"USED", +"create_time":"2025-03-07T15:49:31+08:00", +"coupon_type":"NORMAL", +"no_cash":false, +"singleitem":false, +"consume_information":{"consume_time":"2025-05-13T15:57:24+08:00","consume_mchid":"1800002761","transaction_id":"4200002544202503077103159055"}}}` if err := wechatNotifyProducer(tag, bodyStr); err != nil { t.Errorf("入队失败 error = %v", err) diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_consumer.go similarity index 87% rename from internal/server/wechat_notify_consumer.go rename to internal/server/wechat_consumer.go index 1bafc1b..3017144 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_consumer.go @@ -62,12 +62,12 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "") mqConsumer := client.GetConsumer(instanceId, topic, groupId, w.conf.WechatNotifyMQ.Tag) - w.consumeMessages(ctx, mqConsumer) + w.consumeMessages(mqConsumer) return nil } -func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mq_http_sdk.MQConsumer) { +func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer) { for { endChan := make(chan int) respChan := make(chan mq_http_sdk.ConsumeMessageResponse) @@ -81,7 +81,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m for _, v := range resp.Messages { handles = append(handles, v.ReceiptHandle) - w.processMessage(ctx, v) + w.processMessage(v) } // NextConsumeTime前若不确认消息消费成功,则消息会被重复消费。 @@ -131,11 +131,10 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m } // 业务逻辑处理 -func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_sdk.ConsumeMessageEntry) { - +func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntry) { // 收到消息 if w.shutdownFlag.Load() { - fmt.Println("正在退出中,延期处理") + fmt.Println("wechat consumer 正在退出中,延期处理") // 卡住,不再继续消费,等待退出 time.Sleep(24 * time.Hour) return @@ -146,15 +145,14 @@ func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_s defer func() { w.activeCnt.Add(-1) if v := recover(); v != nil { - log.Errorf("处理消息panic, ,%+v", v) + log.Errorf("wechat consumer 处理消息panic, ,%+v", v) return } }() - log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody) - + ctx := context.Background() if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil { - log.Errorf("微信回调消费处理失败:%+v", err) + log.Errorf("微信回调消费接收消息成功,处理失败 messageId:%s, messageTag:%s, message:%s, err:%+v", msg.MessageId, msg.MessageTag, msg.MessageBody, err) } } @@ -162,18 +160,18 @@ func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_s func (w *WechatNotifyConsumer) Stop(_ context.Context) error { if !w.conf.WechatNotifyMQ.IsOpenConsumer { - fmt.Println("关闭 wechat consumer 完成!") + fmt.Println("wechat consumer 关闭完成!") return nil } - fmt.Println("关闭 wechat consumer 中...") + fmt.Println("wechat consumer 关闭中...") w.shutdownFlag.Store(true) //shutdown之间,保证正在处理的消费先提交 _ = w.blockWaitFinish() - fmt.Println("关闭 wechat consumer 完成") + fmt.Println("wechat consumer 关闭完成") return nil } @@ -185,10 +183,10 @@ func (c *WechatNotifyConsumer) blockWaitFinish() error { for { cnt := c.activeCnt.Load() if cnt == 0 { - //无业务处理,正常退 + fmt.Println("wechat consumer 无业务处理,正常退") break } else { - fmt.Printf("等待消费者退出,%d 个正在运行\n", cnt) + fmt.Printf("wechat consumer 等待消费者退出,%d 个正在运行\n", cnt) } time.Sleep(1 * time.Second) } diff --git a/internal/service/cmb.go b/internal/service/cmb.go index 84d9126..6ceabdf 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -1,14 +1,18 @@ package service import ( + "context" "fmt" + "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport/http" "github.com/robfig/cron" http2 "net/http" "strconv" v1 "voucher/api/v1" "voucher/internal/biz" + "voucher/internal/biz/bo" "voucher/internal/biz/mixrepos" + "voucher/internal/biz/vo" "voucher/internal/biz/wechatrepo" "voucher/internal/conf" ) @@ -39,6 +43,23 @@ func NewCmbService( } } +func (c *CmbService) GetResponse(ctx context.Context, replyBizContent []byte) (*v1.CmbReply, error) { + + req := &bo.CmbResponseBo{ + RespCode: vo.CmbResponseStatusSuccess.GetValue(), + RespMsg: "成功", + BizContent: string(replyBizContent), + } + + reply, err := c.CmbMixRepo.GetResponse(ctx, req) + if err != nil { + log.Errorf("build cmb response fail: %v", err) + return nil, err + } + + return reply, nil +} + func (this *CmbService) NotifyRetry(ctx http.Context) error { id := ctx.Vars().Get("id") if id == "" { @@ -70,14 +91,24 @@ func (this *CmbService) QueryOrder(ctx http.Context) error { }) } +func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) { + + return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds()) +} + func (this *CmbService) RegisterTag(ctx http.Context) error { productNo := ctx.Vars().Get("product_no") if productNo == "" { - return fmt.Errorf("productNo is empty") + return fmt.Errorf("product_no is empty") + } + + err := this.VoucherBiz.RegisterTag(ctx, productNo) + if err != nil { + return err } return ctx.JSON(http2.StatusOK, map[string]interface{}{ - "data": this.VoucherBiz.RegisterTag(ctx, productNo), + "data": productNo, }) } diff --git a/internal/service/order.go b/internal/service/order.go index 3ac794c..398076b 100644 --- a/internal/service/order.go +++ b/internal/service/order.go @@ -2,9 +2,80 @@ package service import ( "context" + "encoding/json" + "github.com/go-kratos/kratos/v2/errors" + err2 "voucher/api/err" v1 "voucher/api/v1" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" ) func (c *CmbService) Order(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { - return c.VoucherBiz.CmbOrder(ctx, request) + + orderNo, err := c.order(ctx, request) + + if err != nil { + return c.OrderFail(ctx, err) + } + + return c.OrderSuccess(ctx, orderNo) +} + +func (c *CmbService) order(ctx context.Context, request *v1.CmbRequest) (string, error) { + + bizContent, err := c.CmbMixRepo.OrderVerify(ctx, request) + if err != nil { + return "", err + } + + boReq := &bo.OrderCreateReqBo{ + OutBizNo: bizContent.TransactionId, + ProductNo: bizContent.ActivityId, + Account: bizContent.CmbUid, + AppID: bizContent.AppId, + Attach: bizContent.Attach, + AccountType: vo.OrderAccountTypeOpenId, + Type: vo.OrderTypeCmb, + NotifyUrl: c.bc.Cmb.NotifyUrl, + } + + orderNo, err := c.VoucherBiz.CmbOrder(ctx, boReq) + if err != nil { + return "", err + } + + return orderNo, nil +} + +func (c *CmbService) OrderSuccess(ctx context.Context, orderNo string) (*v1.CmbReply, error) { + + bizReply := &v1.CmbOrderReply{ + RespCode: vo.CmbResponseStatusSuccess.GetValue(), + RespMsg: "成功", + CodeNo: orderNo, + } + + replyBizContent, _ := json.Marshal(bizReply) + + return c.GetResponse(ctx, replyBizContent) +} + +func (c *CmbService) OrderFail(ctx context.Context, err error) (*v1.CmbReply, error) { + + se := errors.FromError(err) + + if len(se.Reason) == 0 { + se.Reason = err2.CmbErr_CMB_UNKNOWN.String() + } + + bizReply := &v1.CmbOrderReply{ + RespCode: vo.CmbResponseStatusFail.GetValue(), + RespMsg: se.Message, + CodeNo: "", + ThirdErrCode: se.Reason, + } + + replyBizContent, _ := json.Marshal(bizReply) + + return c.GetResponse(ctx, replyBizContent) } diff --git a/internal/service/product.go b/internal/service/product.go index 8b319bc..4516cbb 100644 --- a/internal/service/product.go +++ b/internal/service/product.go @@ -10,6 +10,26 @@ import ( "voucher/internal/biz/vo" ) +func (c *CmbService) QueryProduct(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { + + bizReply, err := c.queryProduct(ctx, request) + if err != nil { + return c.QueryProductFail(ctx, err) + } + + return c.QueryProductSuccess(ctx, bizReply) +} + +func (c *CmbService) queryProduct(ctx context.Context, request *v1.CmbRequest) (*v1.CmbQueryProductReply, error) { + + bizContent, err := c.CmbMixRepo.ProductQueryVerify(ctx, request) + if err != nil { + return nil, err + } + + return c.VoucherBiz.CmbProductQuery(ctx, bizContent.ActivityId) +} + func (c *CmbService) QueryProductSuccess(ctx context.Context, bizReply *v1.CmbQueryProductReply) (*v1.CmbReply, error) { replyBizContent, _ := json.Marshal(bizReply) @@ -37,23 +57,3 @@ func (c *CmbService) QueryProductFail(ctx context.Context, err error) (*v1.CmbRe return c.VoucherBiz.GetResponse(ctx, replyBizContent) } - -func (c *CmbService) QueryProduct(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { - - bizReply, err := c.queryProduct(ctx, request) - if err != nil { - return c.QueryProductFail(ctx, err) - } - - return c.QueryProductSuccess(ctx, bizReply) -} - -func (c *CmbService) queryProduct(ctx context.Context, request *v1.CmbRequest) (*v1.CmbQueryProductReply, error) { - - bizContent, err := c.CmbMixRepo.ProductQueryVerify(ctx, request) - if err != nil { - return nil, err - } - - return c.VoucherBiz.CmbProductQuery(ctx, bizContent.ActivityId) -} diff --git a/internal/service/query.go b/internal/service/query.go index 2412950..11a4ec8 100644 --- a/internal/service/query.go +++ b/internal/service/query.go @@ -8,6 +8,27 @@ import ( v1 "voucher/api/v1" ) +func (c *CmbService) Query(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { + + orderNo, err := c.query(ctx, request) + + if err != nil { + return c.QueryFail(ctx, err) + } + + return c.QuerySuccess(ctx, orderNo) +} + +func (c *CmbService) query(ctx context.Context, request *v1.CmbRequest) (*v1.CmbQueryReply, error) { + + bizContent, err := c.CmbMixRepo.QueryVerify(ctx, request) + if err != nil { + return nil, err + } + + return c.VoucherBiz.CmbQuery(ctx, bizContent.CodeNo) +} + func (c *CmbService) QuerySuccess(ctx context.Context, bizReply *v1.CmbQueryReply) (*v1.CmbReply, error) { replyBizContent, _ := json.Marshal(bizReply) @@ -29,24 +50,3 @@ func (c *CmbService) QueryFail(ctx context.Context, err error) (*v1.CmbReply, er return c.VoucherBiz.GetResponse(ctx, replyBizContent) } - -func (c *CmbService) Query(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { - - orderNo, err := c.query(ctx, request) - - if err != nil { - return c.QueryFail(ctx, err) - } - - return c.QuerySuccess(ctx, orderNo) -} - -func (c *CmbService) query(ctx context.Context, request *v1.CmbRequest) (*v1.CmbQueryReply, error) { - - bizContent, err := c.CmbMixRepo.QueryVerify(ctx, request) - if err != nil { - return nil, err - } - - return c.VoucherBiz.CmbQuery(ctx, bizContent.CodeNo) -} diff --git a/third_party/swagger_ui/openapi.yaml b/third_party/swagger_ui/openapi.yaml index 360dcdc..2136ec6 100644 --- a/third_party/swagger_ui/openapi.yaml +++ b/third_party/swagger_ui/openapi.yaml @@ -60,6 +60,24 @@ paths: application/json: schema: $ref: '#/components/schemas/api.v1.CmbRequest' + /voucher/cmb/v1/orderRetry: + post: + tags: + - Cmb + operationId: Cmb_OrderRetry + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/api.v1.OrderRetryRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/api.v1.Empty' /voucher/cmb/v1/product/query: post: tags: @@ -274,5 +292,12 @@ components: properties: encryptBody: type: string + api.v1.OrderRetryRequest: + type: object + properties: + transactionIds: + type: array + items: + type: string tags: - name: Cmb