This commit is contained in:
duyu 2024-09-10 14:26:07 +08:00
parent b32c731969
commit 977a64bd55
15 changed files with 176 additions and 18 deletions

View File

@ -44,6 +44,7 @@ func GetById(c *gin.Context) {
ExtendParameter: item.ExtendParameter,
Status: item.Status,
TransferStatus: item.TransferStatus,
QueueNo: item.QueueNo,
FailReason: item.FailReason,
CreatedAt: item.CreatedAt.Format(time.RFC3339),
}
@ -75,6 +76,7 @@ func Search(c *gin.Context) {
request.AccountType,
request.Status,
request.TransferStatus,
request.QueueNo,
start,
end,
request.PageSize,
@ -103,6 +105,7 @@ func Search(c *gin.Context) {
ExtendParameter: item.ExtendParameter,
Status: item.Status,
TransferStatus: item.TransferStatus,
QueueNo: item.QueueNo,
FailReason: item.FailReason,
CreatedAt: item.CreatedAt.Format(time.RFC3339),
})
@ -120,6 +123,7 @@ func Search(c *gin.Context) {
request.AccountType,
request.Status,
request.TransferStatus,
request.QueueNo,
start,
end,
)

View File

@ -28,7 +28,6 @@ func GetById(c *gin.Context) {
var response *proEnt.GetListByIdResp = nil
if res != nil {
response = &proEnt.GetListByIdResp{
Id: res.Id,
Name: res.Name,
@ -38,6 +37,7 @@ func GetById(c *gin.Context) {
CreatedAt: res.CreatedAt.Format(time.RFC3339),
Type: res.Type,
ExtendParameter: res.ExtendParameter,
QueueNo: res.QueueNo,
}
}
@ -82,6 +82,7 @@ func Search(c *gin.Context) {
CreatedAt: item.CreatedAt.Format(time.RFC3339),
Type: item.Type,
ExtendParameter: item.ExtendParameter,
QueueNo: item.QueueNo,
})
}
}
@ -115,6 +116,7 @@ func Create(c *gin.Context) {
Status: 1, //1.上架 2.下架
Type: request.Type,
ExtendParameter: request.ExtendParameter,
QueueNo: request.QueueNo,
}
affected, err := proServ.Create(product)
@ -145,6 +147,7 @@ func Update(c *gin.Context) {
Price: request.Price,
Type: request.Type,
ExtendParameter: request.ExtendParameter,
QueueNo: request.QueueNo,
}
affected, err := proServ.Update(product)

View File

@ -13,25 +13,25 @@ import (
)
func GetOrder(c *gin.Context) {
item, err := transServ.ReadGroup()
//队列里取出一个订单
item, err := transServ.ReadGroup(0)
if err != nil {
common.Error(c, 400, err.Error())
return
}
//标记订单已经去除
err = orderMod.GetInstance().OrderOutQueue(item.Id)
if err != nil {
common.Error(c, 400, err.Error())
return
}
var response *transEnt.GetOrderResp = nil
//获取产品信息
product, err := proServ.GetById(item.ProductId)
if err != nil {
common.Error(c, 400, "产品不存在")
return
}
if item != nil {
response = &transEnt.GetOrderResp{
Id: item.Id,
@ -39,6 +39,68 @@ func GetOrder(c *gin.Context) {
MerchantId: item.MerchantId,
ProductId: item.ProductId,
ProductUrl: product.ProductUrl,
QueueNo: item.QueueNo,
OutTradeNo: item.OutTradeNo,
RechargeAccount: item.RechargeAccount,
AccountType: item.AccountType,
Number: item.Number,
NotifyUrl: item.NotifyUrl,
ExtendParameter: item.ExtendParameter,
Status: item.Status,
TransferStatus: item.TransferStatus,
FailReason: item.FailReason,
CreatedAt: item.CreatedAt.Format(time.RFC3339),
}
} else {
c.JSON(500, gin.H{
"code": 500,
"message": "no data",
})
return
}
common.Success(c, response)
}
func GetOrderFromQueue(c *gin.Context) {
request := new(transEnt.GetOrderReq)
err := common.GenRequest(c, request)
if err != nil {
common.Error(c, 400, err.Error())
return
}
var queue_no int64 = 0
if request.QueueNo != 0 {
queue_no = request.QueueNo
}
//队列里取出一个订单
item, err := transServ.ReadGroup(queue_no)
if err != nil {
common.Error(c, 400, err.Error())
return
}
//标记订单已经去除
err = orderMod.GetInstance().OrderOutQueue(item.Id)
if err != nil {
common.Error(c, 400, err.Error())
return
}
var response *transEnt.GetOrderResp = nil
//获取产品信息
product, err := proServ.GetById(item.ProductId)
if err != nil {
common.Error(c, 400, "产品不存在")
return
}
if item != nil {
response = &transEnt.GetOrderResp{
Id: item.Id,
OrderNo: item.OrderNo,
MerchantId: item.MerchantId,
ProductId: item.ProductId,
ProductUrl: product.ProductUrl,
QueueNo: item.QueueNo,
OutTradeNo: item.OutTradeNo,
RechargeAccount: item.RechargeAccount,
AccountType: item.AccountType,

View File

@ -18,6 +18,7 @@ type GetListByIdResp struct {
ExtendParameter string `json:"extend_parameter"`
Status int64 `json:"status"`
TransferStatus int64 `json:"transfer_status"`
QueueNo int64 `json:"queue_no"`
FailReason string `json:"fail_reason"`
CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"`
}
@ -33,6 +34,7 @@ type SearchReq struct {
AccountType int64 `json:"account_type"`
Status int64 `json:"status"`
TransferStatus int64 `json:"transfer_status"`
QueueNo int64 `json:"queue_no"`
CreatedAt []string `json:"created_at" form:"created_at"`
PageNum int `json:"page_num" form:"page" validate:"required"`
PageSize int `json:"page_size" form:"page_size" validate:"required"`
@ -52,6 +54,7 @@ type SearchResp struct {
ExtendParameter string `json:"extend_parameter"`
Status int64 `json:"status"`
TransferStatus int64 `json:"transfer_status"`
QueueNo int64 `json:"queue_no"`
FailReason string `json:"fail_reason"`
CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"`
}

View File

@ -13,6 +13,7 @@ type GetListByIdResp struct {
CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"`
Type int64 `json:"type" example:"1"`
ExtendParameter string `json:"extend_parameter" example:"{}"`
QueueNo int64 `json:"queue_no" example:"1"`
}
type SearcgReq struct {
@ -32,6 +33,7 @@ type SearchResp struct {
CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"`
Type int64 `json:"type" example:"1"`
ExtendParameter string `json:"extend_parameter" example:"{}"`
QueueNo int64 `json:"queue_no" example:"1"`
}
type CreateReq struct {
@ -40,6 +42,7 @@ type CreateReq struct {
ProductUrl string `json:"product_url"`
Type int64 `json:"type" validate:"required"`
ExtendParameter string `json:"extend_parameter"`
QueueNo int64 `json:"queue_no" example:"1"`
}
type CreateResp struct {
@ -53,6 +56,7 @@ type UpdateReq struct {
ProductUrl string `json:"product_url" `
Type int64 `json:"type" `
ExtendParameter string `json:"extend_parameter"`
QueueNo int64 `json:"queue_no" example:"1"`
}
type UpdateResp struct {

View File

@ -1,6 +1,7 @@
package merchant
type GetOrderReq struct {
QueueNo int64 `json:"queue_no"`
}
type GetOrderResp struct {
@ -9,6 +10,7 @@ type GetOrderResp struct {
MerchantId int64 `json:"merchant_id"`
ProductId int64 `json:"product_id"`
ProductUrl string `json:"product_url"`
QueueNo int64 `json:"queue_no"`
OutTradeNo string `json:"out_trade_no"`
RechargeAccount string `json:"recharge_account"`
AccountType int64 `json:"account_type"`
@ -21,6 +23,10 @@ type GetOrderResp struct {
CreatedAt string `json:"created_at" example:"2020-01-01 00:00:00"`
}
type GetOrderFromQueueReq struct {
QueueNo int64 `json:"queue_no"`
}
type FinishOrderReq struct {
Id int64 `json:"id" validate:"required"`
OrderNo string `json:"order_no" validate:"required"`
@ -46,4 +52,3 @@ type SetOrderMobileReq struct {
type SetOrderMobileResp struct {
Id int64 `json:"id"`
}

View File

@ -125,6 +125,7 @@ func RegisterRoute(router *gin.Engine) {
transfersys.Use(middlewares.RequestLog())
{
transfersys.GET("/recharge/get_order", transCon.GetOrder)
transfersys.POST("/recharge/get_queue_order", transCon.GetOrderFromQueue)
transfersys.POST("/recharge/finish", transCon.FinishOrder)
transfersys.POST("/recharge/set_order_mobile", transCon.SetOrderMobile)
}

View File

@ -35,6 +35,8 @@ type Orders struct {
FailReason string
DeviceNo string
CreatedAt time.Time `xorm:"created"`
QueueNo int64
}
func (o *Orders) MarshalBinary() ([]byte, error) {
@ -102,6 +104,7 @@ func (m *ordersModel) Search(
status int64,
transfer_status int64,
queue_no int64,
startTime string,
endTime string,
limit int,
@ -149,6 +152,10 @@ func (m *ordersModel) Search(
sql += " and transfer_status = ?"
args = append(args, transfer_status)
}
if queue_no != 0 {
sql += " and queue_no = ?"
args = append(args, queue_no)
}
if startTime != "" && endTime != "" {
sql += " and created_at >= ? and created_at <= ?"
args = append(args, startTime, endTime)
@ -169,6 +176,7 @@ func (m *ordersModel) CountAll(
status int64,
transfer_status int64,
queue_no int64,
startTime string,
endTime string,
) (res int64, err error) {
@ -214,6 +222,10 @@ func (m *ordersModel) CountAll(
sql += " and transfer_status = ?"
args = append(args, transfer_status)
}
if queue_no != 0 {
sql += " and queue_no = ?"
args = append(args, queue_no)
}
if startTime != "" && endTime != "" {
sql += " and created_at >= ? and created_at <= ?"
args = append(args, startTime, endTime)

View File

@ -25,6 +25,7 @@ type Product struct {
Type int64
ExtendParameter string
QueueNo int64
}
/**

View File

@ -1,7 +1,10 @@
package orders
import (
"errors"
models "com.snow.auto_monitor/app/models/orders"
promod "com.snow.auto_monitor/app/models/product"
)
func GetLimitStart(limit int, page int) (int, int) {
@ -32,13 +35,14 @@ func Search(id int64,
status int64,
transfer_status int64,
queue_no int64,
startTime string,
endTime string,
limit int,
page int) (res []*models.Orders, err error) {
limit, page = GetLimitStart(limit, page)
res, err = models.GetInstance().Search(id, order_no, merchant_id, product_id, device_no, out_trade_no,
recharge_account, account_type, status, transfer_status,
recharge_account, account_type, status, transfer_status, queue_no,
startTime, endTime, limit, page)
return
}
@ -54,15 +58,25 @@ func CountAll(id int64,
status int64,
transfer_status int64,
queue_no int64,
startTime string,
endTime string) (res int64, err error) {
res, err = models.GetInstance().CountAll(id, order_no, merchant_id, product_id, device_no, out_trade_no,
recharge_account, account_type, status, transfer_status,
recharge_account, account_type, status, transfer_status, queue_no,
startTime, endTime)
return
}
func Create(orders *models.Orders) (affected int64, err error) {
//验证产品是否存在
product, has, err := promod.GetInstance().GetById(orders.ProductId)
if err != nil {
return
}
if !has {
return 0, errors.New("产品不存在")
}
orders.QueueNo = product.QueueNo
affected, err = models.GetInstance().Create(orders)
return
}

View File

@ -20,8 +20,16 @@ import (
// return
// }
func ReadGroup() (order *models.Orders, err error) {
res, err := rdbmq.ReadOne("orders", "orders_one", "cosumer_one")
func ReadGroup(queue_no int64) (order *models.Orders, err error) {
streamName := "orders"
groupName := "orders_one"
consumerName := "cosumer_one"
if queue_no > 0 {
streamName = fmt.Sprintf("orders_%d", queue_no)
groupName = fmt.Sprintf("orders_%d", queue_no)
consumerName = fmt.Sprintf("cosumer_%d", queue_no)
}
id, res, err := rdbmq.ReadOne(streamName, groupName, consumerName)
if err != nil {
if err.Error() == "context deadline exceeded" {
return nil, errors.New("no data")
@ -36,18 +44,34 @@ func ReadGroup() (order *models.Orders, err error) {
jsonstr := res[key].(string)
json.Unmarshal([]byte(jsonstr), &order)
//确认信息
err = rdbmq.AckOne(streamName, groupName, id)
if err != nil {
return nil, err
}
return order, nil
}
func WriteOrder() {
fmt.Println("开始执行周期任务WriteOrder")
// 创建分组
// 创建默认分组
err := rdbmq.CreateStreamAndGroup("orders", "orders_one")
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
fmt.Println("分组失败")
fmt.Println(err)
return
}
// 创建价格分组
var queue_nos = [...]int{10, 20, 50, 100, 1000}
for _, value := range queue_nos {
err = rdbmq.CreateStreamAndGroup(fmt.Sprintf("orders_%d", value), fmt.Sprintf("orders_%d", value))
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
fmt.Println("分组失败")
fmt.Println(err)
return
}
}
// 创建一个新的Ticker每3秒钟触发一次
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop() // 在函数结束时停止Ticker
@ -59,7 +83,12 @@ func WriteOrder() {
continue
}
if res != nil {
err = rdbmq.WriteOne("orders", res.OrderNo, res)
//获取产品信息
streamName := "orders"
if res.QueueNo != 0 {
streamName = fmt.Sprintf("orders_%d", res.QueueNo)
}
err = rdbmq.WriteOne(streamName, res.OrderNo, res)
if err != nil {
fmt.Println(err.Error())
continue

View File

@ -48,5 +48,5 @@ func DoRechargeTimeout() {
}
func init() {
go DoRechargeTimeout()
// go DoRechargeTimeout()
}

View File

@ -35,5 +35,5 @@ func DoTimeout() {
}
func init() {
go DoTimeout()
// go DoTimeout()
}

View File

@ -25,7 +25,7 @@ func CreateOrder(orders *models.Orders) (affected int64, err error) {
// }
//验证产品是否存在
_, has, err := promod.GetInstance().GetById(orders.ProductId)
product, has, err := promod.GetInstance().GetById(orders.ProductId)
if err != nil {
return
}
@ -37,6 +37,7 @@ func CreateOrder(orders *models.Orders) (affected int64, err error) {
orders.OrderNo = uuid.New().String()
orders.Status = 2 // 1.成功 2.充值中 3.充值失败 4.异常需要人工处理
orders.TransferStatus = 3 // 1.成功 2.充值中 3. 等待充值 4.充值失败 5.异常需要人工处理
orders.QueueNo = product.QueueNo
affected, err = models.GetInstance().Create(orders)
return
}

View File

@ -58,7 +58,25 @@ func WriteOne(stream string, key string, value interface{}) (err error) {
return
}
func ReadOne(stream string, group string, consumer string) (res map[string]interface{}, err error) {
func AckOne(stream string, group string, id string) (err error) {
rdb := redis.GetRedis(redis.SingletonMain)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// XACK确认消息
err = rdb.XAck(ctx, stream, group, id).Err()
if err != nil {
fmt.Println(err)
return
}
_, err = rdb.XDel(ctx, stream, id).Result()
if err != nil {
fmt.Println("删除消息失败:", err)
}
return
}
func ReadOne(stream string, group string, consumer string) (id string, res map[string]interface{}, err error) {
rdb := redis.GetRedis(redis.SingletonMain)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
@ -76,9 +94,10 @@ func ReadOne(stream string, group string, consumer string) (res map[string]inter
Block: 0,
NoAck: true, // true-表示读取消息时确认消息
}).Result()
if err != nil {
return nil, err
return "", nil, err
}
return readgroupval[0].Messages[0].Values, err
return readgroupval[0].Messages[0].ID, readgroupval[0].Messages[0].Values, err
}