Merge branch 'pro' into kx
# Conflicts: # internal/pkg/request/request.go
This commit is contained in:
commit
fa36755179
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/go-kratos/kratos/v2/log"
|
"github.com/go-kratos/kratos/v2/log"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
"voucher/internal/biz/bo"
|
"voucher/internal/biz/bo"
|
||||||
"voucher/internal/biz/vo"
|
"voucher/internal/biz/vo"
|
||||||
|
|
@ -89,7 +90,7 @@ func (v *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time
|
||||||
duration := 2 * time.Hour
|
duration := 2 * time.Hour
|
||||||
|
|
||||||
eg := new(errgroup.Group)
|
eg := new(errgroup.Group)
|
||||||
eg.SetLimit(10)
|
eg.SetLimit(8)
|
||||||
|
|
||||||
for start := startTime; start.Before(endTime); start = start.Add(duration) {
|
for start := startTime; start.Before(endTime); start = start.Add(duration) {
|
||||||
|
|
||||||
|
|
@ -116,7 +117,12 @@ func (v *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
log.Errorf("订单定时通知,发生错误:req:%+v,err:%v", req, err)
|
|
||||||
|
// 获取当前堆栈信息
|
||||||
|
buf := make([]byte, 1024)
|
||||||
|
stackSize := runtime.Stack(buf, false)
|
||||||
|
|
||||||
|
log.Errorf("订单定时通知,发生错误:req:%+v,err:%v\n堆栈信息:\n%s", req, err, string(buf[:stackSize]))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
@ -134,15 +140,22 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUse
|
||||||
num := 0
|
num := 0
|
||||||
notifyNum := 0
|
notifyNum := 0
|
||||||
errNum := 0
|
errNum := 0
|
||||||
|
emptyNum := 0
|
||||||
|
|
||||||
err := v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
err := v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||||||
|
|
||||||
for _, order := range rows {
|
for _, order := range rows {
|
||||||
|
|
||||||
|
if order == nil {
|
||||||
|
emptyNum += 1
|
||||||
|
log.Errorf("订单对象为 nil, req:%+v", req)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
num += 1
|
num += 1
|
||||||
if err := v.notice(ctx, order, ¬ifyNum); err != nil {
|
if err := v.notice(ctx, order, ¬ifyNum); err != nil {
|
||||||
errNum += 1
|
errNum += 1
|
||||||
log.Error(err)
|
log.Errorf("订单定时通知,err:%v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -154,6 +167,8 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUse
|
||||||
"searchTime": req.StartTime.Format(time.DateTime) + "到" + req.EndTime.Format(time.DateTime),
|
"searchTime": req.StartTime.Format(time.DateTime) + "到" + req.EndTime.Format(time.DateTime),
|
||||||
"num": num,
|
"num": num,
|
||||||
"notifyNum": notifyNum,
|
"notifyNum": notifyNum,
|
||||||
|
"errNum": errNum,
|
||||||
|
"emptyNum": emptyNum,
|
||||||
"elapsed": time.Now().Sub(start).String(),
|
"elapsed": time.Now().Sub(start).String(),
|
||||||
}
|
}
|
||||||
log.Warnf("订单定时通知,%+v", logFields)
|
log.Warnf("订单定时通知,%+v", logFields)
|
||||||
|
|
@ -191,14 +206,14 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo, notifyNum *i
|
||||||
Type: order.Type,
|
Type: order.Type,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = v.cmbNotice(ctx, order, orderNotify, notifyNum); err != nil {
|
if err = v.request(ctx, order, orderNotify, notifyNum); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return v.UpdateOrderStatus(ctx, order.ID, status)
|
return v.UpdateOrderStatus(ctx, order.ID, status)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo, notifyNum *int) error {
|
func (v *VoucherBiz) request(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo, notifyNum *int) error {
|
||||||
|
|
||||||
if !orderNotify.Event.CanNotify() {
|
if !orderNotify.Event.CanNotify() {
|
||||||
return nil // 不可通知,忽略
|
return nil // 不可通知,忽略
|
||||||
|
|
@ -211,13 +226,13 @@ func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNoti
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
reply, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
|
if request == nil {
|
||||||
if err != nil {
|
return fmt.Errorf("request is nil,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo)
|
||||||
return fmt.Errorf("订单定时通知,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
|
_, err = v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
|
||||||
return errors.New("订单定时通知,招行返回错误:" + reply.RespMsg)
|
if err != nil {
|
||||||
|
return fmt.Errorf("orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ type OrderRepo interface {
|
||||||
FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
||||||
GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error)
|
GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error)
|
||||||
GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error)
|
GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error)
|
||||||
GetByVoucherNo(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error)
|
GetByCouponId(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error)
|
||||||
GetByTransactionId(ctx context.Context, stockCreatorMchId, stockID, transactionId string) (*bo.OrderBo, error)
|
GetByTransactionId(ctx context.Context, stockCreatorMchId, stockID, transactionId string) (*bo.OrderBo, error)
|
||||||
Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error)
|
Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error)
|
||||||
GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error)
|
GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error)
|
||||||
|
|
|
||||||
|
|
@ -12,16 +12,23 @@ import (
|
||||||
|
|
||||||
func (v *Query) execute(ctx context.Context, req *timeslice.Manager) error {
|
func (v *Query) execute(ctx context.Context, req *timeslice.Manager) error {
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
managerStartStr := req.StartTime.Format(time.DateTime)
|
managerStartStr := req.StartTime.Format(time.DateTime)
|
||||||
managerEndStr := req.EndTime.Format(time.DateTime)
|
managerEndStr := req.EndTime.Format(time.DateTime)
|
||||||
|
|
||||||
|
log.Warnf("微信券查询,%s到%s,开始", managerStartStr, managerEndStr)
|
||||||
|
fmt.Printf("微信券查询,%s到%s,开始\n", managerStartStr, managerEndStr)
|
||||||
|
|
||||||
taskCount, err := timeslice.NewManager(v.callbackFunc).Run(ctx, req)
|
taskCount, err := timeslice.NewManager(v.callbackFunc).Run(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("微信券查询处理,%s到%s,失败:%v", managerStartStr, managerEndStr, err)
|
log.Errorf("微信券查询,%s到%s,失败:%v", managerStartStr, managerEndStr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("微信券查询处理,%s到%s,总任务数:%d\n", managerStartStr, managerEndStr, taskCount)
|
elapsed := time.Now().Sub(start).String()
|
||||||
log.Warnf("微信券查询处理,%s到%s,总任务数:%d", managerStartStr, managerEndStr, taskCount)
|
|
||||||
|
log.Warnf("微信券查询,%s到%s,总任务数:%d,总耗时:%s", managerStartStr, managerEndStr, taskCount, elapsed)
|
||||||
|
fmt.Printf("微信券查询,%s到%s,总任务数:%d,总耗时:%s\n", managerStartStr, managerEndStr, taskCount, elapsed)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -62,10 +69,10 @@ func (v *Query) callbackFunc(ctx context.Context, req *timeslice.Task) error {
|
||||||
"stock_id": order.BatchNo,
|
"stock_id": order.BatchNo,
|
||||||
"err": err.Error(),
|
"err": err.Error(),
|
||||||
}
|
}
|
||||||
log.Errorf("微信券查询处理,%s到%s,taskId:%d,错误:%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
|
log.Errorf("微信券查询,%s到%s,taskId:%d,错误:%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
|
||||||
|
|
||||||
if errNum > 20 {
|
if errNum > 20 {
|
||||||
return fmt.Errorf("微信券查询处理,%s到%s,第%d个任务,已经连续发生20次错误%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
|
return fmt.Errorf("微信券查询,%s到%s,第%d个任务,已经连续发生20次错误%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -76,13 +83,14 @@ func (v *Query) callbackFunc(ctx context.Context, req *timeslice.Task) error {
|
||||||
|
|
||||||
end := time.Now()
|
end := time.Now()
|
||||||
|
|
||||||
logFields := map[string]interface{}{
|
logFields := map[string]any{
|
||||||
"searchTime": currentStartTimeStr + "到" + currentEndTimeStr,
|
"searchTime": currentStartTimeStr + "到" + currentEndTimeStr,
|
||||||
"num": num,
|
"num": num,
|
||||||
"notifyNum": notifyNum,
|
"notifyNum": notifyNum,
|
||||||
|
"errNum": errNum,
|
||||||
"elapsed": end.Sub(start).String(),
|
"elapsed": end.Sub(start).String(),
|
||||||
}
|
}
|
||||||
log.Warnf("微信券查询处理,%s到%s,taskId:%d,处理完毕:%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
|
log.Warnf("微信券查询,%s到%s,taskId:%d,处理完毕:%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -106,25 +106,13 @@ func (v *Query) Consumer(ctx context.Context, msg string) error {
|
||||||
|
|
||||||
req, err := v.getManager(msg)
|
req, err := v.getManager(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
log.Errorf("微信券查询,前置参数处理失败,msg:%s,err:%v", msg, err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
reqStr := req.String()
|
|
||||||
|
|
||||||
executeStart := time.Now()
|
|
||||||
executeStartStr := executeStart.Format(time.DateTime)
|
|
||||||
|
|
||||||
log.Warnf("微信券查询处理,开始:%s,msg:%s,manager%s", executeStartStr, msg, reqStr)
|
|
||||||
fmt.Printf("微信券查询处理,开始:%s,msg:%s,manager%s", executeStartStr, msg, reqStr)
|
|
||||||
|
|
||||||
if err = v.execute(ctx, req); err != nil {
|
if err = v.execute(ctx, req); err != nil {
|
||||||
log.Errorf("微信券查询处理,失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
|
log.Errorf("微信券查询,失败,msg:%s,err:%v", msg, err)
|
||||||
return fmt.Errorf("微信券查询处理,失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
executeEnd := time.Now()
|
|
||||||
log.Warnf("微信券查询处理,耗时:%s,结束时间%s,manager%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
|
|
||||||
fmt.Printf("微信券查询处理,耗时:%s,结束时间%s,manager%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,6 @@ package timeslicequery
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/go-kratos/kratos/v2/log"
|
|
||||||
"voucher/internal/biz/bo"
|
"voucher/internal/biz/bo"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -40,7 +39,6 @@ func (v *Query) queryUsed(ctx context.Context, order *bo.OrderBo, notifyNum *int
|
||||||
func (v *Query) queryExpired(ctx context.Context, order *bo.OrderBo) error {
|
func (v *Query) queryExpired(ctx context.Context, order *bo.OrderBo) error {
|
||||||
|
|
||||||
if order.Status.IsExpired() {
|
if order.Status.IsExpired() {
|
||||||
log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", order.OrderNo)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -85,7 +85,7 @@ func (v *VoucherBiz) createUseLog(ctx context.Context, order *bo.OrderBo, req *b
|
||||||
|
|
||||||
func (this *VoucherBiz) getOrder(ctx context.Context, req *bo.WechatVoucherNotifyBo) (*bo.OrderBo, error) {
|
func (this *VoucherBiz) getOrder(ctx context.Context, req *bo.WechatVoucherNotifyBo) (*bo.OrderBo, error) {
|
||||||
|
|
||||||
order, err := this.OrderRepo.GetByVoucherNo(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID)
|
order, err := this.OrderRepo.GetByCouponId(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,10 +24,29 @@ import (
|
||||||
|
|
||||||
type CmbMixRepoImpl struct {
|
type CmbMixRepoImpl struct {
|
||||||
bc *conf.Bootstrap
|
bc *conf.Bootstrap
|
||||||
|
|
||||||
|
// 连接池复用(优化网络开销)
|
||||||
|
options *request.Options
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCmbMixRepoImpl(bc *conf.Bootstrap) mixrepos.CmbMixRepo {
|
func NewCmbMixRepoImpl(bc *conf.Bootstrap) mixrepos.CmbMixRepo {
|
||||||
return &CmbMixRepoImpl{bc: bc}
|
|
||||||
|
h := http.Header{
|
||||||
|
"Content-Type": []string{"application/json"},
|
||||||
|
}
|
||||||
|
hc := &http.Client{
|
||||||
|
Timeout: 20 * time.Second,
|
||||||
|
Transport: &http.Transport{
|
||||||
|
MaxIdleConns: 100, // 最大空闲连接数
|
||||||
|
MaxIdleConnsPerHost: 20, // 每个主机的最大空闲连接数
|
||||||
|
IdleConnTimeout: 30 * time.Second, // 空闲连接超时时间
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return &CmbMixRepoImpl{
|
||||||
|
bc: bc,
|
||||||
|
options: request.NewOptions(request.WithHeaders(h), request.WithHttpClient(hc)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CmbMixRepoImpl) recordBody(ctx context.Context) {
|
func (c *CmbMixRepoImpl) recordBody(ctx context.Context) {
|
||||||
|
|
|
||||||
|
|
@ -212,20 +212,17 @@ func (p *OrderRepoImpl) GetByOrderNo(ctx context.Context, orderNo string) (*bo.O
|
||||||
return p.ToBo(info), nil
|
return p.ToBo(info), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *OrderRepoImpl) GetByVoucherNo(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error) {
|
func (p *OrderRepoImpl) GetByCouponId(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error) {
|
||||||
info := &model.Order{}
|
info := &model.Order{}
|
||||||
|
|
||||||
tx := p.DB(ctx).Where(model.Order{MerchantNo: merchantNo, BatchNo: batchNo, VoucherNo: voucherNo}).First(&info)
|
tx := p.DB(ctx).Where(model.Order{MerchantNo: merchantNo, BatchNo: batchNo, VoucherNo: voucherNo}).First(&info)
|
||||||
|
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
|
return nil, tx.Error
|
||||||
return nil, err2.ErrorDbNotFound("订单数据不存在")
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("db fail %w", tx.Error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if tx.RowsAffected == 0 {
|
if tx.RowsAffected == 0 {
|
||||||
return nil, err2.ErrorDbNotFound("订单数据不存在")
|
return nil, gorm.ErrRecordNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.ToBo(info), nil
|
return p.ToBo(info), nil
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -39,18 +40,18 @@ func NewOptions(options ...Option) *Options {
|
||||||
|
|
||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
|
|
||||||
func WithHttpClient(httpClient *http.Client) Option {
|
|
||||||
return func(options *Options) {
|
|
||||||
options.HttpClient = httpClient
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithHeaders(headers http.Header) Option {
|
func WithHeaders(headers http.Header) Option {
|
||||||
return func(options *Options) {
|
return func(options *Options) {
|
||||||
options.Headers = headers
|
options.Headers = headers
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithHttpClient(httpClient *http.Client) Option {
|
||||||
|
return func(options *Options) {
|
||||||
|
options.HttpClient = httpClient
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithStatusCodeFunc(statusCodeFunc func(int) bool) Option {
|
func WithStatusCodeFunc(statusCodeFunc func(int) bool) Option {
|
||||||
return func(options *Options) {
|
return func(options *Options) {
|
||||||
options.StatusCodeFunc = statusCodeFunc
|
options.StatusCodeFunc = statusCodeFunc
|
||||||
|
|
|
||||||
|
|
@ -152,7 +152,8 @@ func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntr
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
|
if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
|
||||||
log.Errorf("微信回调消费接收消息成功,处理失败 messageId:%s, messageTag:%s, message:%s, err:%+v", msg.MessageId, msg.MessageTag, msg.MessageBody, err)
|
//log.Errorf("微信回调消费接收消息成功,处理失败 messageId:%s, messageTag:%s, message:%s, err:%+v", msg.MessageId, msg.MessageTag, msg.MessageBody, err)
|
||||||
|
log.Errorf("微信回调消费接收消息成功,处理失败 messageId:%s, err:%+v", msg.MessageId, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue