voucher/internal/biz/wechat_query.go

123 lines
3.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package biz
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
)
// PushWechatQuery validates a WeChat coupon query request and appends it, as
// JSON, to the list named by the queue that RdsMQ.GetWechatQuery returns.
//
// The "CMB_WECHAT_QUERY" flag is used as a busy marker: the call is rejected
// while v.Get reports the flag, the flag is added right before the push, and it
// is removed again if the push fails. After a successful push the flag stays
// set; WechatQuery removes it when it returns.
//
// When req.ProductNo is non-empty the product is looked up first and a lookup
// error is returned unchanged. A failed push returns an error that wraps the
// underlying one, so callers can still inspect it with errors.Is / errors.As.
//
// NOTE(review): the Get check and the later Add are separate calls, so two
// concurrent requests could presumably both pass the check — confirm whether
// the flag store already guards against that.
func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error {
	if v.Get("CMB_WECHAT_QUERY") {
		return fmt.Errorf("此台服务队列正在处理中,ip:%s", ctx.Header().Get("X-Forwarded-For"))
	}

	// Only requests that name a product need the existence check.
	if req.ProductNo != "" {
		if _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo); err != nil {
			return err
		}
	}

	queue := v.bc.RdsMQ.GetWechatQuery()
	if queue == nil {
		return fmt.Errorf("队列不存在")
	}

	msg, err := json.Marshal(req)
	if err != nil {
		return err
	}

	// Mark the service busy before pushing; roll the marker back if the push
	// does not go through so later requests are not blocked.
	v.Add("CMB_WECHAT_QUERY")
	if _, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result(); err != nil {
		v.Remove("CMB_WECHAT_QUERY")
		return fmt.Errorf("添加到队列失败:%w", err)
	}
	return nil
}
// WechatQuery processes one queued WeChat coupon query message.
//
// msg is expected to be the JSON form of a do.WechatQuery. The orders selected
// by OrderRepo.FinSucByStockIdInBatches are visited batch by batch and every
// order is checked through wechatQuery. A failure on a single order is logged
// and does not stop the run; only the error returned by the batch iteration
// itself is returned to the caller. Progress is logged after each batch, and a
// start line and a summary line are both logged and printed to stdout.
//
// The "CMB_WECHAT_QUERY" flag is removed when the function returns, whatever
// the outcome. A message that fails to decode, or that decodes to JSON null,
// is rejected with an error before any order is touched.
func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
	defer v.Remove("CMB_WECHAT_QUERY")

	var req *do.WechatQuery
	if err := json.Unmarshal([]byte(msg), &req); err != nil {
		return err
	}
	// A JSON "null" payload decodes without error but leaves req nil; reject it
	// here instead of handing a nil request to the repository.
	if req == nil {
		return fmt.Errorf("微信券查询消息内容为空,msg:%s", msg)
	}

	// Take a single timestamp so the logged start time and every elapsed-time
	// figure refer to the same instant.
	start := time.Now()
	startStr := start.String()
	log.Warnf("微信券查询处理开始:%s,msg:%s", startStr, msg)
	// Stdout lines are newline-terminated so consecutive messages do not run
	// together.
	fmt.Printf("微信券查询处理开始:%s,msg:%s\n", startStr, msg)

	var (
		batches   int // number of batches handed to the callback
		orders    int // number of orders visited across all batches
		notifyNum int // incremented by queryUsed for each order it handles
	)
	err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
		batches++
		for _, order := range rows {
			orders++
			// Best effort per order: log the failure and keep going.
			if err := v.wechatQuery(ctx, order, &notifyNum); err != nil {
				log.Errorf("微信查询券订单状态发生错误,msg:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,err:%v",
					msg, order.OrderNo, order.VoucherNo, order.AppID, order.Account, err)
			}
		}
		now := time.Now()
		log.Warnf("微信券查询处理第:%d组,已执行条数:%d,核销通知条数:%d,执行开始时间:%s当前时间:%s,已耗时:%s", batches, orders, notifyNum, startStr, now.String(), now.Sub(start).String())
		return nil
	})

	end := time.Now()
	elapsed := end.Sub(start).String()
	endStr := end.String()
	log.Warnf("微信券查询处理耗时:%s,结束时间:%s,处理%d组,处理%d单,核销通知条数:%d,msg:%s", elapsed, endStr, batches, orders, notifyNum, msg)
	fmt.Printf("微信券查询处理耗时:%s,结束时间%s,处理%d组,处理%d单,核销通知条数:%d,msg:%s\n", elapsed, endStr, batches, orders, notifyNum, msg)
	return err
}
// wechatQuery asks WechatCpnRepo for the current state of the coupon behind
// order and dispatches on the answer: a state reporting IsUse goes through
// queryUsed (which receives notifyNum), a state reporting IsExpired goes
// through expired, and any other state is left alone. A failed query is
// returned unchanged.
func (v *VoucherBiz) wechatQuery(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
	state, err := v.WechatCpnRepo.Query(ctx, order)
	if err != nil {
		return err
	}

	// IsUse is tested first, so a state reporting both is treated as used.
	switch {
	case state.IsUse():
		return v.queryUsed(ctx, order, notifyNum)
	case state.IsExpired():
		return v.expired(ctx, order)
	default:
		return nil
	}
}
// queryUsed handles an order that wechatQuery routed here because the coupon
// state reported IsUse. It increments *notifyNum, calls OrderRepo.Used for the
// order unless order.Status already reports IsUse, and then calls notify. If
// OrderRepo.Used fails, that error is returned and notify is not called.
func (v *VoucherBiz) queryUsed(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
	*notifyNum++

	// Skip the status write when the local order is already marked as used.
	if !order.Status.IsUse() {
		if err := v.OrderRepo.Used(ctx, order.ID); err != nil {
			return err
		}
	}
	return v.notify(ctx, order)
}