timeSliceQuery 8
This commit is contained in:
parent
792ce23435
commit
87820f6836
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/go-kratos/kratos/v2/log"
|
"github.com/go-kratos/kratos/v2/log"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
"voucher/internal/biz/bo"
|
"voucher/internal/biz/bo"
|
||||||
"voucher/internal/biz/vo"
|
"voucher/internal/biz/vo"
|
||||||
|
|
@ -115,7 +116,12 @@ func (v *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
log.Errorf("订单定时通知,发生错误:req:%+v,err:%v", req, err)
|
|
||||||
|
// 获取当前堆栈信息
|
||||||
|
buf := make([]byte, 1024)
|
||||||
|
stackSize := runtime.Stack(buf, false)
|
||||||
|
|
||||||
|
log.Errorf("订单定时通知,发生错误:req:%+v,err:%v\n堆栈信息:\n%s", req, err, string(buf[:stackSize]))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
@ -133,15 +139,22 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUse
|
||||||
num := 0
|
num := 0
|
||||||
notifyNum := 0
|
notifyNum := 0
|
||||||
errNum := 0
|
errNum := 0
|
||||||
|
emptyNum := 0
|
||||||
|
|
||||||
err := v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
err := v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||||||
|
|
||||||
for _, order := range rows {
|
for _, order := range rows {
|
||||||
|
|
||||||
|
if order == nil {
|
||||||
|
emptyNum += 1
|
||||||
|
log.Errorf("订单对象为 nil, req:%+v", req)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
num += 1
|
num += 1
|
||||||
if err := v.notice(ctx, order, ¬ifyNum); err != nil {
|
if err := v.notice(ctx, order, ¬ifyNum); err != nil {
|
||||||
errNum += 1
|
errNum += 1
|
||||||
log.Error(err)
|
log.Errorf("订单定时通知,err:%v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -154,6 +167,7 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUse
|
||||||
"num": num,
|
"num": num,
|
||||||
"notifyNum": notifyNum,
|
"notifyNum": notifyNum,
|
||||||
"errNum": errNum,
|
"errNum": errNum,
|
||||||
|
"emptyNum": emptyNum,
|
||||||
"elapsed": time.Now().Sub(start).String(),
|
"elapsed": time.Now().Sub(start).String(),
|
||||||
}
|
}
|
||||||
log.Warnf("订单定时通知,%+v", logFields)
|
log.Warnf("订单定时通知,%+v", logFields)
|
||||||
|
|
@ -186,14 +200,14 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo, notifyNum *i
|
||||||
Type: order.Type,
|
Type: order.Type,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = v.cmbNotice(ctx, order, orderNotify, notifyNum); err != nil {
|
if err = v.request(ctx, order, orderNotify, notifyNum); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return v.UpdateOrderStatus(ctx, order.ID, status)
|
return v.UpdateOrderStatus(ctx, order.ID, status)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo, notifyNum *int) error {
|
func (v *VoucherBiz) request(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo, notifyNum *int) error {
|
||||||
|
|
||||||
if !orderNotify.Event.CanNotify() {
|
if !orderNotify.Event.CanNotify() {
|
||||||
return nil // 不可通知,忽略
|
return nil // 不可通知,忽略
|
||||||
|
|
@ -206,13 +220,13 @@ func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNoti
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
reply, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
|
if request == nil {
|
||||||
if err != nil {
|
return fmt.Errorf("request is nil,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo)
|
||||||
return fmt.Errorf("订单定时通知,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
|
_, err = v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
|
||||||
return errors.New("订单定时通知,招行返回错误:" + reply.RespMsg)
|
if err != nil {
|
||||||
|
return fmt.Errorf("orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -24,10 +24,29 @@ import (
|
||||||
|
|
||||||
type CmbMixRepoImpl struct {
|
type CmbMixRepoImpl struct {
|
||||||
bc *conf.Bootstrap
|
bc *conf.Bootstrap
|
||||||
|
|
||||||
|
// 连接池复用(优化网络开销)
|
||||||
|
options *request.Options
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCmbMixRepoImpl(bc *conf.Bootstrap) mixrepos.CmbMixRepo {
|
func NewCmbMixRepoImpl(bc *conf.Bootstrap) mixrepos.CmbMixRepo {
|
||||||
return &CmbMixRepoImpl{bc: bc}
|
|
||||||
|
h := http.Header{
|
||||||
|
"Content-Type": []string{"application/json"},
|
||||||
|
}
|
||||||
|
hc := &http.Client{
|
||||||
|
Timeout: 20 * time.Second,
|
||||||
|
Transport: &http.Transport{
|
||||||
|
MaxIdleConns: 100, // 最大空闲连接数
|
||||||
|
MaxIdleConnsPerHost: 20, // 每个主机的最大空闲连接数
|
||||||
|
IdleConnTimeout: 30 * time.Second, // 空闲连接超时时间
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return &CmbMixRepoImpl{
|
||||||
|
bc: bc,
|
||||||
|
options: request.NewOptions(request.WithHeaders(h), request.WithHttpClient(hc)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CmbMixRepoImpl) recordBody(ctx context.Context) {
|
func (c *CmbMixRepoImpl) recordBody(ctx context.Context) {
|
||||||
|
|
@ -262,26 +281,26 @@ func (s *CmbMixRepoImpl) Request(ctx context.Context, req *v1.CmbRequest, uri st
|
||||||
uv.Set(kv.Key, fmt.Sprintf("%v", kv.Value))
|
uv.Set(kv.Key, fmt.Sprintf("%v", kv.Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
h := http.Header{
|
requestUrl := uri + "?" + uv.Encode()
|
||||||
"Content-Type": []string{"application/x-www-form-urlencoded"},
|
|
||||||
|
_, bodyBytes, err := request.POST(ctx, requestUrl, nil, s.options)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("请求掌上生活错误,url:%s,err:%v", requestUrl, err)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
r := uri + "?" + uv.Encode()
|
if len(bodyBytes) == 0 {
|
||||||
|
return nil, errors.New("请求掌上生活错误,请求失败,返回数据为空")
|
||||||
_, bodyBytes, err := request.Post(ctx, r, nil, request.WithHeaders(h), request.WithTimeout(time.Second*20))
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("请求掌上生活报错,url:%s,err:%v", r, err)
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var response *v1.CmbReply
|
var response *v1.CmbReply
|
||||||
if err = json.Unmarshal(bodyBytes, &response); err != nil {
|
if err = json.Unmarshal(bodyBytes, &response); err != nil {
|
||||||
log.Errorf("请求掌上生活返回数据解析报错:%s,url:%s,bodyBytes:%s", err.Error(), r, string(bodyBytes))
|
log.Errorf("请求掌上生活错误,数据解析报错:%s,url:%s,bodyBytes:%s", err.Error(), requestUrl, string(bodyBytes))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if response.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
|
if response.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
|
||||||
log.Errorf("请求掌上生活返回报错:msg:%s,url:%s,bodyBytes:%s", response.RespMsg, r, string(bodyBytes))
|
log.Errorf("请求掌上生活错误:msg:%s,url:%s,bodyBytes:%s", response.RespMsg, requestUrl, string(bodyBytes))
|
||||||
return nil, fmt.Errorf(response.RespMsg)
|
return nil, fmt.Errorf(response.RespMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,8 @@ import (
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Headers http.Header
|
Headers http.Header
|
||||||
|
|
||||||
|
HttpClient *http.Client
|
||||||
|
|
||||||
StatusCodeFunc func(int) bool
|
StatusCodeFunc func(int) bool
|
||||||
|
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
|
@ -51,12 +53,22 @@ func WithHeaders(headers http.Header) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithHttpClient(httpClient *http.Client) Option {
|
||||||
|
return func(options *Options) {
|
||||||
|
options.HttpClient = httpClient
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithStatusCodeFunc(statusCodeFunc func(int) bool) Option {
|
func WithStatusCodeFunc(statusCodeFunc func(int) bool) Option {
|
||||||
return func(options *Options) {
|
return func(options *Options) {
|
||||||
options.StatusCodeFunc = statusCodeFunc
|
options.StatusCodeFunc = statusCodeFunc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func POST(ctx context.Context, url string, body []byte, options *Options) (http.Header, []byte, error) {
|
||||||
|
return Request(ctx, http.MethodPost, url, body, options)
|
||||||
|
}
|
||||||
|
|
||||||
func Post(ctx context.Context, url string, body []byte, options ...Option) (http.Header, []byte, error) {
|
func Post(ctx context.Context, url string, body []byte, options ...Option) (http.Header, []byte, error) {
|
||||||
return Request(ctx, http.MethodPost, url, body, NewOptions(options...))
|
return Request(ctx, http.MethodPost, url, body, NewOptions(options...))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue