diff --git a/app/http/controllers/orders/orders.go b/app/http/controllers/orders/orders.go index 62bfa53..1843b01 100644 --- a/app/http/controllers/orders/orders.go +++ b/app/http/controllers/orders/orders.go @@ -44,6 +44,7 @@ func GetById(c *gin.Context) { ExtendParameter: item.ExtendParameter, Status: item.Status, TransferStatus: item.TransferStatus, + QueueNo: item.QueueNo, FailReason: item.FailReason, CreatedAt: item.CreatedAt.Format(time.RFC3339), } @@ -75,6 +76,7 @@ func Search(c *gin.Context) { request.AccountType, request.Status, request.TransferStatus, + request.QueueNo, start, end, request.PageSize, @@ -103,6 +105,7 @@ func Search(c *gin.Context) { ExtendParameter: item.ExtendParameter, Status: item.Status, TransferStatus: item.TransferStatus, + QueueNo: item.QueueNo, FailReason: item.FailReason, CreatedAt: item.CreatedAt.Format(time.RFC3339), }) @@ -120,6 +123,7 @@ func Search(c *gin.Context) { request.AccountType, request.Status, request.TransferStatus, + request.QueueNo, start, end, ) diff --git a/app/http/controllers/product/product.go b/app/http/controllers/product/product.go index 72acb95..bb8fa84 100644 --- a/app/http/controllers/product/product.go +++ b/app/http/controllers/product/product.go @@ -28,7 +28,6 @@ func GetById(c *gin.Context) { var response *proEnt.GetListByIdResp = nil if res != nil { - response = &proEnt.GetListByIdResp{ Id: res.Id, Name: res.Name, @@ -38,6 +37,7 @@ func GetById(c *gin.Context) { CreatedAt: res.CreatedAt.Format(time.RFC3339), Type: res.Type, ExtendParameter: res.ExtendParameter, + QueueNo: res.QueueNo, } } @@ -82,6 +82,7 @@ func Search(c *gin.Context) { CreatedAt: item.CreatedAt.Format(time.RFC3339), Type: item.Type, ExtendParameter: item.ExtendParameter, + QueueNo: item.QueueNo, }) } } @@ -115,6 +116,7 @@ func Create(c *gin.Context) { Status: 1, //1.上架 2.下架 Type: request.Type, ExtendParameter: request.ExtendParameter, + QueueNo: request.QueueNo, } affected, err := proServ.Create(product) @@ -145,6 +147,7 @@ func Update(c *gin.Context) { Price: request.Price, Type: request.Type, ExtendParameter: request.ExtendParameter, + QueueNo: request.QueueNo, } affected, err := proServ.Update(product) diff --git a/app/http/controllers/transfersys/transfersys.go b/app/http/controllers/transfersys/transfersys.go index 44c73ce..40f06b7 100644 --- a/app/http/controllers/transfersys/transfersys.go +++ b/app/http/controllers/transfersys/transfersys.go @@ -13,25 +13,25 @@ import ( ) func GetOrder(c *gin.Context) { - item, err := transServ.ReadGroup() + //队列里取出一个订单 + item, err := transServ.ReadGroup(0) if err != nil { common.Error(c, 400, err.Error()) return } + //标记订单已经去除 err = orderMod.GetInstance().OrderOutQueue(item.Id) if err != nil { common.Error(c, 400, err.Error()) return } - var response *transEnt.GetOrderResp = nil - + //获取产品信息 product, err := proServ.GetById(item.ProductId) if err != nil { common.Error(c, 400, "产品不存在") return } - if item != nil { response = &transEnt.GetOrderResp{ Id: item.Id, @@ -39,6 +39,68 @@ func GetOrder(c *gin.Context) { MerchantId: item.MerchantId, ProductId: item.ProductId, ProductUrl: product.ProductUrl, + QueueNo: item.QueueNo, + OutTradeNo: item.OutTradeNo, + RechargeAccount: item.RechargeAccount, + AccountType: item.AccountType, + Number: item.Number, + NotifyUrl: item.NotifyUrl, + ExtendParameter: item.ExtendParameter, + Status: item.Status, + TransferStatus: item.TransferStatus, + FailReason: item.FailReason, + CreatedAt: item.CreatedAt.Format(time.RFC3339), + } + } else { + c.JSON(500, gin.H{ + "code": 500, + "message": "no data", + }) + return + } + common.Success(c, response) +} + +func GetOrderFromQueue(c *gin.Context) { + request := new(transEnt.GetOrderReq) + err := common.GenRequest(c, request) + if err != nil { + common.Error(c, 400, err.Error()) + return + } + + var queue_no int64 = 0 + if request.QueueNo != 0 { + queue_no = request.QueueNo + } + + //队列里取出一个订单 + item, err := transServ.ReadGroup(queue_no) + if err != nil { + common.Error(c, 400, err.Error()) + return + } + //标记订单已经去除 + err = orderMod.GetInstance().OrderOutQueue(item.Id) + if err != nil { + common.Error(c, 400, err.Error()) + return + } + var response *transEnt.GetOrderResp = nil + //获取产品信息 + product, err := proServ.GetById(item.ProductId) + if err != nil { + common.Error(c, 400, "产品不存在") + return + } + if item != nil { + response = &transEnt.GetOrderResp{ + Id: item.Id, + OrderNo: item.OrderNo, + MerchantId: item.MerchantId, + ProductId: item.ProductId, + ProductUrl: product.ProductUrl, + QueueNo: item.QueueNo, OutTradeNo: item.OutTradeNo, RechargeAccount: item.RechargeAccount, AccountType: item.AccountType, diff --git a/app/http/entities/orders/orders.go b/app/http/entities/orders/orders.go index 8620a32..902679a 100644 --- a/app/http/entities/orders/orders.go +++ b/app/http/entities/orders/orders.go @@ -18,6 +18,7 @@ type GetListByIdResp struct { ExtendParameter string `json:"extend_parameter"` Status int64 `json:"status"` TransferStatus int64 `json:"transfer_status"` + QueueNo int64 `json:"queue_no"` FailReason string `json:"fail_reason"` CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"` } @@ -33,6 +34,7 @@ type SearchReq struct { AccountType int64 `json:"account_type"` Status int64 `json:"status"` TransferStatus int64 `json:"transfer_status"` + QueueNo int64 `json:"queue_no"` CreatedAt []string `json:"created_at" form:"created_at"` PageNum int `json:"page_num" form:"page" validate:"required"` PageSize int `json:"page_size" form:"page_size" validate:"required"` @@ -52,6 +54,7 @@ type SearchResp struct { ExtendParameter string `json:"extend_parameter"` Status int64 `json:"status"` TransferStatus int64 `json:"transfer_status"` + QueueNo int64 `json:"queue_no"` FailReason string `json:"fail_reason"` CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"` } diff --git a/app/http/entities/product/product.go b/app/http/entities/product/product.go index 39b62eb..bf5abba 100644 --- a/app/http/entities/product/product.go +++ b/app/http/entities/product/product.go @@ -13,6 +13,7 @@ type GetListByIdResp struct { CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"` Type int64 `json:"type" example:"1"` ExtendParameter string `json:"extend_parameter" example:"{}"` + QueueNo int64 `json:"queue_no" example:"1"` } type SearcgReq struct { @@ -32,6 +33,7 @@ type SearchResp struct { CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"` Type int64 `json:"type" example:"1"` ExtendParameter string `json:"extend_parameter" example:"{}"` + QueueNo int64 `json:"queue_no" example:"1"` } type CreateReq struct { @@ -40,6 +42,7 @@ type CreateReq struct { ProductUrl string `json:"product_url"` Type int64 `json:"type" validate:"required"` ExtendParameter string `json:"extend_parameter"` + QueueNo int64 `json:"queue_no" example:"1"` } type CreateResp struct { @@ -53,6 +56,7 @@ type UpdateReq struct { ProductUrl string `json:"product_url" ` Type int64 `json:"type" ` ExtendParameter string `json:"extend_parameter"` + QueueNo int64 `json:"queue_no" example:"1"` } type UpdateResp struct { diff --git a/app/http/entities/transfersys/transfersys.go b/app/http/entities/transfersys/transfersys.go index 77610eb..c04e76e 100644 --- a/app/http/entities/transfersys/transfersys.go +++ b/app/http/entities/transfersys/transfersys.go @@ -1,6 +1,7 @@ package merchant type GetOrderReq struct { + QueueNo int64 `json:"queue_no"` } type GetOrderResp struct { @@ -9,6 +10,7 @@ type GetOrderResp struct { MerchantId int64 `json:"merchant_id"` ProductId int64 `json:"product_id"` ProductUrl string `json:"product_url"` + QueueNo int64 `json:"queue_no"` OutTradeNo string `json:"out_trade_no"` RechargeAccount string `json:"recharge_account"` AccountType int64 `json:"account_type"` @@ -21,6 +23,10 @@ type GetOrderResp struct { CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"` } +type GetOrderFromQueueReq struct { + QueueNo int64 `json:"queue_no"` +} + type FinishOrderReq struct { Id int64 `json:"id" validate:"required"` OrderNo string `json:"order_no" validate:"required"` @@ -46,4 +52,3 @@ type SetOrderMobileReq struct { type SetOrderMobileResp struct { Id int64 `json:"id"` } - diff --git a/app/http/routes/route.go b/app/http/routes/route.go index 8af2ddf..eda2628 100644 --- a/app/http/routes/route.go +++ b/app/http/routes/route.go @@ -125,6 +125,7 @@ func RegisterRoute(router *gin.Engine) { transfersys.Use(middlewares.RequestLog()) { transfersys.GET("/recharge/get_order", transCon.GetOrder) + transfersys.POST("/recharge/get_queue_order", transCon.GetOrderFromQueue) transfersys.POST("/recharge/finish", transCon.FinishOrder) transfersys.POST("/recharge/set_order_mobile", transCon.SetOrderMobile) } diff --git a/app/models/orders/orders.go b/app/models/orders/orders.go index 75073b1..5f44cd9 100644 --- a/app/models/orders/orders.go +++ b/app/models/orders/orders.go @@ -35,6 +35,8 @@ type Orders struct { FailReason string DeviceNo string CreatedAt time.Time `xorm:"created"` + + QueueNo int64 } func (o *Orders) MarshalBinary() ([]byte, error) { @@ -102,6 +104,7 @@ func (m *ordersModel) Search( status int64, transfer_status int64, + queue_no int64, startTime string, endTime string, limit int, @@ -149,6 +152,10 @@ func (m *ordersModel) Search( sql += " and transfer_status = ?" args = append(args, transfer_status) } + if queue_no != 0 { + sql += " and queue_no = ?" + args = append(args, queue_no) + } if startTime != "" && endTime != "" { sql += " and created_at >= ? and created_at <= ?" args = append(args, startTime, endTime) @@ -169,6 +176,7 @@ func (m *ordersModel) CountAll( status int64, transfer_status int64, + queue_no int64, startTime string, endTime string, ) (res int64, err error) { @@ -214,6 +222,10 @@ func (m *ordersModel) CountAll( sql += " and transfer_status = ?" args = append(args, transfer_status) } + if queue_no != 0 { + sql += " and queue_no = ?" + args = append(args, queue_no) + } if startTime != "" && endTime != "" { sql += " and created_at >= ? and created_at <= ?" args = append(args, startTime, endTime) diff --git a/app/models/product/product.go b/app/models/product/product.go index ea8aa79..d391ad5 100644 --- a/app/models/product/product.go +++ b/app/models/product/product.go @@ -25,6 +25,7 @@ type Product struct { Type int64 ExtendParameter string + QueueNo int64 } /** diff --git a/app/services/orders/orders.go b/app/services/orders/orders.go index 791261f..1ccb98f 100644 --- a/app/services/orders/orders.go +++ b/app/services/orders/orders.go @@ -1,7 +1,10 @@ package orders import ( + "errors" + models "com.snow.auto_monitor/app/models/orders" + promod "com.snow.auto_monitor/app/models/product" ) func GetLimitStart(limit int, page int) (int, int) { @@ -32,13 +35,14 @@ func Search(id int64, status int64, transfer_status int64, + queue_no int64, startTime string, endTime string, limit int, page int) (res []*models.Orders, err error) { limit, page = GetLimitStart(limit, page) res, err = models.GetInstance().Search(id, order_no, merchant_id, product_id, device_no, out_trade_no, - recharge_account, account_type, status, transfer_status, + recharge_account, account_type, status, transfer_status, queue_no, startTime, endTime, limit, page) return } @@ -54,15 +58,25 @@ func CountAll(id int64, status int64, transfer_status int64, + queue_no int64, startTime string, endTime string) (res int64, err error) { res, err = models.GetInstance().CountAll(id, order_no, merchant_id, product_id, device_no, out_trade_no, - recharge_account, account_type, status, transfer_status, + recharge_account, account_type, status, transfer_status, queue_no, startTime, endTime) return } func Create(orders *models.Orders) (affected int64, err error) { + //验证产品是否存在 + product, has, err := promod.GetInstance().GetById(orders.ProductId) + if err != nil { + return + } + if !has { + return 0, errors.New("产品不存在") + } + orders.QueueNo = product.QueueNo affected, err = models.GetInstance().Create(orders) return } diff --git a/app/services/transfersys/transfersys_rdb.go b/app/services/transfersys/transfersys_rdb.go index f9a4ac0..1568b76 100644 --- a/app/services/transfersys/transfersys_rdb.go +++ b/app/services/transfersys/transfersys_rdb.go @@ -20,8 +20,16 @@ import ( // return // } -func ReadGroup() (order *models.Orders, err error) { - res, err := rdbmq.ReadOne("orders", "orders_one", "cosumer_one") +func ReadGroup(queue_no int64) (order *models.Orders, err error) { + streamName := "orders" + groupName := "orders_one" + consumerName := "cosumer_one" + if queue_no > 0 { + streamName = fmt.Sprintf("orders_%d", queue_no) + groupName = fmt.Sprintf("orders_%d", queue_no) + consumerName = fmt.Sprintf("cosumer_%d", queue_no) + } + id, res, err := rdbmq.ReadOne(streamName, groupName, consumerName) if err != nil { if err.Error() == "context deadline exceeded" { return nil, errors.New("no data") @@ -36,18 +44,34 @@ func ReadGroup() (order *models.Orders, err error) { jsonstr := res[key].(string) json.Unmarshal([]byte(jsonstr), &order) + //确认信息 + err = rdbmq.AckOne(streamName, groupName, id) + if err != nil { + return nil, err + } return order, nil } func WriteOrder() { fmt.Println("开始执行周期任务:WriteOrder") - // 创建分组 + // 创建默认分组 err := rdbmq.CreateStreamAndGroup("orders", "orders_one") if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { fmt.Println("分组失败") fmt.Println(err) return } + // 创建价格分组 + var queue_nos = [...]int{10, 20, 50, 100, 1000} + for _, value := range queue_nos { + err = rdbmq.CreateStreamAndGroup(fmt.Sprintf("orders_%d", value), fmt.Sprintf("orders_%d", value)) + if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { + fmt.Println("分组失败") + fmt.Println(err) + return + } + } + // 创建一个新的Ticker,每3秒钟触发一次 ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() // 在函数结束时停止Ticker @@ -59,7 +83,12 @@ func WriteOrder() { continue } if res != nil { - err = rdbmq.WriteOne("orders", res.OrderNo, res) + //获取产品信息 + streamName := "orders" + if res.QueueNo != 0 { + streamName = fmt.Sprintf("orders_%d", res.QueueNo) + } + err = rdbmq.WriteOne(streamName, res.OrderNo, res) if err != nil { fmt.Println(err.Error()) continue diff --git a/app/services/transfersys/transfersys_recharge_timeout.go b/app/services/transfersys/transfersys_recharge_timeout.go index ed9b0fe..ea840ce 100644 --- a/app/services/transfersys/transfersys_recharge_timeout.go +++ b/app/services/transfersys/transfersys_recharge_timeout.go @@ -48,5 +48,5 @@ func DoRechargeTimeout() { } func init() { - go DoRechargeTimeout() + // go DoRechargeTimeout() } diff --git a/app/services/transfersys/transfersys_timeout.go b/app/services/transfersys/transfersys_timeout.go index 142dfd5..9419ede 100644 --- a/app/services/transfersys/transfersys_timeout.go +++ b/app/services/transfersys/transfersys_timeout.go @@ -35,5 +35,5 @@ func DoTimeout() { } func init() { - go DoTimeout() + // go DoTimeout() } diff --git a/app/services/zhiliansys/zhiliansys.go b/app/services/zhiliansys/zhiliansys.go index 5f84957..6f0c31e 100644 --- a/app/services/zhiliansys/zhiliansys.go +++ b/app/services/zhiliansys/zhiliansys.go @@ -25,7 +25,7 @@ func CreateOrder(orders *models.Orders) (affected int64, err error) { // } //验证产品是否存在 - _, has, err := promod.GetInstance().GetById(orders.ProductId) + product, has, err := promod.GetInstance().GetById(orders.ProductId) if err != nil { return } @@ -37,6 +37,7 @@ func CreateOrder(orders *models.Orders) (affected int64, err error) { orders.OrderNo = uuid.New().String() orders.Status = 2 // 1.成功 2.充值中 3.充值失败 4.异常需要人工处理 orders.TransferStatus = 3 // 1.成功 2.充值中 3. 等待充值 4.充值失败 5.异常需要人工处理 + orders.QueueNo = product.QueueNo affected, err = models.GetInstance().Create(orders) return } diff --git a/app/utils/rdbmq/rdbmq.go b/app/utils/rdbmq/rdbmq.go index d672487..e44d307 100644 --- a/app/utils/rdbmq/rdbmq.go +++ b/app/utils/rdbmq/rdbmq.go @@ -58,7 +58,25 @@ func WriteOne(stream string, key string, value interface{}) (err error) { return } -func ReadOne(stream string, group string, consumer string) (res map[string]interface{}, err error) { +func AckOne(stream string, group string, id string) (err error) { + rdb := redis.GetRedis(redis.SingletonMain) + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // XACK,确认消息 + err = rdb.XAck(ctx, stream, group, id).Err() + if err != nil { + fmt.Println(err) + return + } + _, err = rdb.XDel(ctx, stream, id).Result() + if err != nil { + fmt.Println("删除消息失败:", err) + } + return +} + +func ReadOne(stream string, group string, consumer string) (id string, res map[string]interface{}, err error) { rdb := redis.GetRedis(redis.SingletonMain) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() @@ -76,9 +94,10 @@ func ReadOne(stream string, group string, consumer string) (res map[string]inter Block: 0, NoAck: true, // true-表示读取消息时确认消息 }).Result() + if err != nil { - return nil, err + return "", nil, err } - return readgroupval[0].Messages[0].Values, err + return readgroupval[0].Messages[0].ID, readgroupval[0].Messages[0].Values, err }