diff --git a/internal/biz/cron_notice.go b/internal/biz/cron_notice.go index e2fe857..5de2e3f 100644 --- a/internal/biz/cron_notice.go +++ b/internal/biz/cron_notice.go @@ -7,6 +7,7 @@ import ( "github.com/go-kratos/kratos/v2/log" "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" + "runtime" "time" "voucher/internal/biz/bo" "voucher/internal/biz/vo" @@ -115,7 +116,12 @@ func (v *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time defer func() { if err := recover(); err != nil { - log.Errorf("订单定时通知,发生错误:req:%+v,err:%v", req, err) + + // 获取当前堆栈信息 + buf := make([]byte, 1024) + stackSize := runtime.Stack(buf, false) + + log.Errorf("订单定时通知,发生错误:req:%+v,err:%v\n堆栈信息:\n%s", req, err, string(buf[:stackSize])) } }() @@ -133,15 +139,22 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUse num := 0 notifyNum := 0 errNum := 0 + emptyNum := 0 err := v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error { for _, order := range rows { + if order == nil { + emptyNum += 1 + log.Errorf("订单对象为 nil, req:%+v", req) + continue + } + num += 1 if err := v.notice(ctx, order, ¬ifyNum); err != nil { errNum += 1 - log.Error(err) + log.Errorf("订单定时通知,err:%v", err) } } @@ -154,6 +167,7 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUse "num": num, "notifyNum": notifyNum, "errNum": errNum, + "emptyNum": emptyNum, "elapsed": time.Now().Sub(start).String(), } log.Warnf("订单定时通知,%+v", logFields) @@ -186,14 +200,14 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo, notifyNum *i Type: order.Type, } - if err = v.cmbNotice(ctx, order, orderNotify, notifyNum); err != nil { + if err = v.request(ctx, order, orderNotify, notifyNum); err != nil { return err } return v.UpdateOrderStatus(ctx, order.ID, status) } -func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo, notifyNum *int) error { +func (v *VoucherBiz) request(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo, notifyNum *int) error { if !orderNotify.Event.CanNotify() { return nil // 不可通知,忽略 @@ -206,13 +220,13 @@ func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNoti return err } - reply, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl) - if err != nil { - return fmt.Errorf("订单定时通知,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error()) + if request == nil { + return fmt.Errorf("request is nil,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo) } - if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() { - return errors.New("订单定时通知,招行返回错误:" + reply.RespMsg) + _, err = v.CmbMixRepo.Request(ctx, request, order.NotifyUrl) + if err != nil { + return fmt.Errorf("orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error()) } return nil diff --git a/internal/data/mixrepoimpl/cmb.go b/internal/data/mixrepoimpl/cmb.go index 570e024..d786431 100644 --- a/internal/data/mixrepoimpl/cmb.go +++ b/internal/data/mixrepoimpl/cmb.go @@ -24,10 +24,29 @@ import ( type CmbMixRepoImpl struct { bc *conf.Bootstrap + + // 连接池复用(优化网络开销) + options *request.Options } func NewCmbMixRepoImpl(bc *conf.Bootstrap) mixrepos.CmbMixRepo { - return &CmbMixRepoImpl{bc: bc} + + h := http.Header{ + "Content-Type": []string{"application/json"}, + } + hc := &http.Client{ + Timeout: 20 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 100, // 最大空闲连接数 + MaxIdleConnsPerHost: 20, // 每个主机的最大空闲连接数 + IdleConnTimeout: 30 * time.Second, // 空闲连接超时时间 + }, + } + + return &CmbMixRepoImpl{ + bc: bc, + options: request.NewOptions(request.WithHeaders(h), request.WithHttpClient(hc)), + } } func (c *CmbMixRepoImpl) recordBody(ctx context.Context) { @@ -262,26 +281,26 @@ func (s *CmbMixRepoImpl) Request(ctx context.Context, req *v1.CmbRequest, uri st uv.Set(kv.Key, fmt.Sprintf("%v", kv.Value)) } - h := http.Header{ - "Content-Type": []string{"application/x-www-form-urlencoded"}, + requestUrl := uri + "?" + uv.Encode() + + _, bodyBytes, err := request.POST(ctx, requestUrl, nil, s.options) + if err != nil { + log.Errorf("请求掌上生活错误,url:%s,err:%v", requestUrl, err) + return nil, err } - r := uri + "?" + uv.Encode() - - _, bodyBytes, err := request.Post(ctx, r, nil, request.WithHeaders(h), request.WithTimeout(time.Second*20)) - if err != nil { - log.Errorf("请求掌上生活报错,url:%s,err:%v", r, err) - return nil, err + if len(bodyBytes) == 0 { + return nil, errors.New("请求掌上生活错误,请求失败,返回数据为空") } var response *v1.CmbReply if err = json.Unmarshal(bodyBytes, &response); err != nil { - log.Errorf("请求掌上生活返回数据解析报错:%s,url:%s,bodyBytes:%s", err.Error(), r, string(bodyBytes)) + log.Errorf("请求掌上生活错误,数据解析报错:%s,url:%s,bodyBytes:%s", err.Error(), requestUrl, string(bodyBytes)) return nil, err } if response.RespCode != vo.CmbResponseStatusSuccess.GetValue() { - log.Errorf("请求掌上生活返回报错:msg:%s,url:%s,bodyBytes:%s", response.RespMsg, r, string(bodyBytes)) + log.Errorf("请求掌上生活错误:msg:%s,url:%s,bodyBytes:%s", response.RespMsg, requestUrl, string(bodyBytes)) return nil, fmt.Errorf(response.RespMsg) } diff --git a/internal/pkg/request/request.go b/internal/pkg/request/request.go index 28da386..f1b1d59 100644 --- a/internal/pkg/request/request.go +++ b/internal/pkg/request/request.go @@ -12,6 +12,8 @@ import ( type Options struct { Headers http.Header + HttpClient *http.Client + StatusCodeFunc func(int) bool Timeout time.Duration @@ -51,12 +53,22 @@ func WithHeaders(headers http.Header) Option { } } +func WithHttpClient(httpClient *http.Client) Option { + return func(options *Options) { + options.HttpClient = httpClient + } +} + func WithStatusCodeFunc(statusCodeFunc func(int) bool) Option { return func(options *Options) { options.StatusCodeFunc = statusCodeFunc } } +func POST(ctx context.Context, url string, body []byte, options *Options) (http.Header, []byte, error) { + return Request(ctx, http.MethodPost, url, body, options) +} + func Post(ctx context.Context, url string, body []byte, options ...Option) (http.Header, []byte, error) { return Request(ctx, http.MethodPost, url, body, NewOptions(options...)) }