transfer_middleware/cmd/rpc/internal/queue/mq/mq.go

333 lines
8.4 KiB
Go
Raw Normal View History

2024-06-12 13:46:14 +08:00
package mq
import (
"context"
"encoding/json"
"fmt"
"github.com/streadway/amqp"
2024-07-09 17:07:22 +08:00
"github.com/zeromicro/go-zero/core/stores/sqlx"
2024-06-12 13:46:14 +08:00
"time"
"trasfer_middleware/cmd/rpc/etc"
2024-07-09 17:07:22 +08:00
"trasfer_middleware/cmd/rpc/internal/logic/do"
2024-06-18 16:34:14 +08:00
"trasfer_middleware/cmd/rpc/internal/logic/vo"
2024-06-12 13:46:14 +08:00
"trasfer_middleware/cmd/rpc/internal/queue/mq/mqSvc"
"trasfer_middleware/genModel"
)
type Message interface {
MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error
}
type Market struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
type ZLTX struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
2024-06-18 16:34:14 +08:00
type RS struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
2024-10-15 18:35:45 +08:00
type NewMarkets struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
2024-11-06 17:35:59 +08:00
type Physical struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
2024-06-12 13:46:14 +08:00
func NewMarket(svc *mqSvc.ServiceContext, ctx context.Context) *Market {
return &Market{
svc: svc,
ctx: ctx,
}
}
func NewZLTX(svc *mqSvc.ServiceContext, ctx context.Context) *ZLTX {
return &ZLTX{
svc: svc,
ctx: ctx,
}
}
2024-06-18 16:34:14 +08:00
func NewRS(svc *mqSvc.ServiceContext, ctx context.Context) *RS {
return &RS{
svc: svc,
ctx: ctx,
}
}
2024-10-15 18:35:45 +08:00
func NewNewMarkets(svc *mqSvc.ServiceContext, ctx context.Context) *NewMarkets {
return &NewMarkets{
svc: svc,
ctx: ctx,
}
}
2024-11-06 17:35:59 +08:00
func NewPhysical(svc *mqSvc.ServiceContext, ctx context.Context) *Physical {
return &Physical{
svc: svc,
ctx: ctx,
}
}
2024-06-12 13:46:14 +08:00
func (m *Market) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var market = &genModel.ServerMiddleMarketLogs{}
2024-10-15 18:35:45 +08:00
err := json.Unmarshal(msg, market)
2024-06-12 13:46:14 +08:00
market.CreateTime = time.Now()
2024-06-18 16:34:14 +08:00
market.Status = vo.MARKET_LOG_STATU_DEFAULT
2024-07-09 17:07:22 +08:00
logInfo, err := m.svc.DbWrite.MarketLogs.Insert(m.ctx, market)
2024-06-12 13:46:14 +08:00
if err != nil {
return fmt.Errorf("market数据保存失败%s,原因:%s", msg, err)
}
2024-07-09 17:07:22 +08:00
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("market日志数据保存失败%s,原因:%s", msg, err)
}
if market.Url == vo.MARKET_KEY_SEND {
err = m.saveMarketOrder(logId, market.Data, market.Resp)
if err != nil {
return fmt.Errorf("market订单数据保存失败%s,原因:%s", msg, err)
}
}
2024-06-12 13:46:14 +08:00
return nil
}
func (m *ZLTX) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
2024-07-09 17:07:22 +08:00
var (
zltx = &genModel.ServerMiddleZltxLogs{}
)
2024-06-12 13:46:14 +08:00
json.Unmarshal(msg, zltx)
2024-07-09 17:07:22 +08:00
2024-06-12 13:46:14 +08:00
zltx.CreateTime = time.Now()
2024-06-18 16:34:14 +08:00
zltx.Status = vo.ZLTX_LOG_STATU_DEFAULT
2024-07-09 17:07:22 +08:00
logInfo, err := m.svc.DbWrite.ZLTXLogs.Insert(m.ctx, zltx)
2024-06-12 13:46:14 +08:00
if err != nil {
2024-06-18 16:34:14 +08:00
return fmt.Errorf("zltx数据保存失败%s,原因:%s", msg, err)
}
2024-07-09 17:07:22 +08:00
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("zltx日志数据保存失败%s,原因:%s", msg, err)
}
if zltx.Url == vo.ZLTX_RECHARGE || zltx.Url == vo.ZLTX_RS_MIXUE_COUPON_GRANT {
err = m.saveZLTXOrder(logId, zltx.Data, zltx.Resp, zltx.Url)
if err != nil {
return fmt.Errorf("zltx订单数据保存失败%s,原因:%s", msg, err)
}
}
return nil
}
func (m *ZLTX) saveZLTXOrder(logId int64, resq string, resp string, url string) error {
var order = &genModel.ServerOrderZltx{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
order.Status = vo.ORDER_STATUS_DEFAULT
switch url {
case vo.ZLTX_RS_MIXUE_COUPON_GRANT:
zltxRs := do.ZltxRsData{Order: order}
err := zltxRs.SetData(resq, resp)
if err != nil {
return err
}
default:
err := do.ZLTXRechargeData(order, resq, resp)
if err != nil {
return err
}
}
//检查是否存在,存在则不插入
_, err := m.svc.DbWrite.OrderZLTX.FindOneByBizNo(m.ctx, order.OutBizNo)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderZLTX.Insert(m.ctx, order)
if err != nil {
return err
}
}
2024-06-18 16:34:14 +08:00
return nil
}
func (m *RS) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
2024-07-09 17:07:22 +08:00
var (
rs = &genModel.ServerMiddleRsLogs{}
)
2024-06-18 16:34:14 +08:00
json.Unmarshal(msg, rs)
rs.CreateTime = time.Now()
rs.Status = vo.RS_LOG_STATU_DEFAULT
2024-07-09 17:07:22 +08:00
logInfo, err := m.svc.DbWrite.RSLogs.Insert(m.ctx, rs)
2024-06-18 16:34:14 +08:00
if err != nil {
return fmt.Errorf("rs数据保存失败%s,原因:%s", msg, err)
2024-06-12 13:46:14 +08:00
}
2024-07-09 17:07:22 +08:00
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("rs日志数据保存失败%s,原因:%s", msg, err)
}
err = m.saveRsOrder(logId, rs.Data, rs.Resp)
if err != nil {
return fmt.Errorf("rs订单数据保存失败%s,原因:%s", msg, err)
}
return nil
}
func (m *RS) saveRsOrder(logId int64, resq string, resp string) error {
var order = &genModel.ServerOrderRs{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
data := do.RsData{Order: order}
err := data.SetData(resq, resp)
if err != nil {
return err
}
_, err = m.svc.DbWrite.OrderRs.FindByOutBizId(m.ctx, order.OutBizNo)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderRs.Insert(m.ctx, order)
}
if err != nil {
return err
}
return nil
}
2024-10-15 18:35:45 +08:00
func (m *NewMarkets) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var (
2024-11-06 17:35:59 +08:00
rs = &genModel.ServerMiddleNewMarketLogs{}
2024-10-15 18:35:45 +08:00
)
json.Unmarshal(msg, rs)
rs.CreateTime = time.Now()
rs.Status = vo.RS_LOG_STATU_DEFAULT
2024-11-06 17:35:59 +08:00
logInfo, err := m.svc.DbWrite.NewMarketLogs.Insert(m.ctx, rs)
2024-10-15 18:35:45 +08:00
if err != nil {
2024-11-06 17:35:59 +08:00
return fmt.Errorf("new_market数据保存失败%s,原因:%s", msg, err)
2024-10-15 18:35:45 +08:00
}
logId, err := logInfo.LastInsertId()
if err != nil {
2024-11-06 17:35:59 +08:00
return fmt.Errorf("new_market日志数据保存失败%s,原因:%s", msg, err)
2024-10-15 18:35:45 +08:00
}
2024-11-06 17:35:59 +08:00
err = m.saveNewMarketOrder(logId, rs.Data, rs.Resp)
2024-10-15 18:35:45 +08:00
if err != nil {
2024-11-06 17:35:59 +08:00
return fmt.Errorf("new_market订单数据保存失败%s,原因:%s", msg, err)
2024-10-15 18:35:45 +08:00
}
return nil
}
2024-11-06 17:35:59 +08:00
func (m *Physical) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var (
rs = &genModel.ServerMiddlePhysicalLogs{}
)
json.Unmarshal(msg, rs)
rs.CreateTime = time.Now()
rs.Status = vo.RS_LOG_STATU_DEFAULT
logInfo, err := m.svc.DbWrite.PhysicalLogs.Insert(m.ctx, rs)
if err != nil {
return fmt.Errorf("new_market数据保存失败%s,原因:%s", msg, err)
}
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("new_market日志数据保存失败%s,原因:%s", msg, err)
}
if rs.Url == vo.PHYSICAL_ORDER_SUBMIT {
err = m.savePhysicalOrder(logId, rs.Data, rs.Resp)
if err != nil {
return fmt.Errorf("new_market订单数据保存失败%s,原因:%s", msg, err)
}
}
return nil
}
func (m *NewMarkets) saveNewMarketOrder(logId int64, resq string, resp string) error {
var order = &genModel.ServerOrderNewMarket{}
2024-10-15 18:35:45 +08:00
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
2024-11-06 17:35:59 +08:00
order.Status = vo.MARKET_LOG_STATU_DEFAULT
err := do.NewMarketKeyDataSet(order, resq, resp)
2024-10-15 18:35:45 +08:00
if err != nil {
return err
}
2024-11-06 17:35:59 +08:00
_, err = m.svc.DbWrite.OrderNewMarket.FindByOutBizId(m.ctx, order.OutBizNo)
2024-10-15 18:35:45 +08:00
if err == sqlx.ErrNotFound {
2024-11-06 17:35:59 +08:00
_, err = m.svc.DbWrite.OrderNewMarket.Insert(m.ctx, order)
if err != nil {
return err
}
2024-10-15 18:35:45 +08:00
}
2024-11-06 17:35:59 +08:00
2024-10-15 18:35:45 +08:00
if err != nil {
return err
}
return nil
}
2024-07-09 17:07:22 +08:00
func (m *Market) saveMarketOrder(logId int64, resq string, resp string) error {
var order = &genModel.ServerOrderMarket{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
2024-07-16 16:57:26 +08:00
order.Status = vo.MARKET_LOG_STATU_DEFAULT
2024-07-09 17:07:22 +08:00
err := do.MarketKeyDataSet(order, resq, resp)
if err != nil {
return err
}
_, err = m.svc.DbWrite.OrderMarket.FindByOutBizId(m.ctx, order.OutBizNo)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderMarket.Insert(m.ctx, order)
if err != nil {
return err
}
}
if err != nil {
return err
}
2024-06-12 13:46:14 +08:00
return nil
}
2024-11-06 17:35:59 +08:00
func (m *Physical) savePhysicalOrder(logId int64, resq string, resp string) error {
var order = &genModel.ServerOrderPhysical{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
order.Status = vo.PHYSICAL_LOG_STATU_DEFAULT
err := do.PhysicalDataSet(order, resq, resp)
if err != nil {
return err
}
_, err = m.svc.DbWrite.OrderPhysical.FindByOutBizId(m.ctx, order.CustomerOrderNum)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderPhysical.Insert(m.ctx, order)
if err != nil {
return err
}
}
if err != nil {
return err
}
return nil
}
2024-06-12 13:46:14 +08:00
func AllHandle(c *etc.RockerMqConfig, svc *mqSvc.ServiceContext, ctx context.Context) map[string]Message {
2024-06-18 16:34:14 +08:00
result := make(map[string]Message)
result[c.TopicPrefix+c.Topic.Market.Name] = NewMarket(svc, ctx)
result[c.TopicPrefix+c.Topic.ZLTX.Name] = NewZLTX(svc, ctx)
result[c.TopicPrefix+c.Topic.RS.Name] = NewRS(svc, ctx)
2024-10-15 18:35:45 +08:00
result[c.TopicPrefix+c.Topic.NewMarket.Name] = NewNewMarkets(svc, ctx)
2024-11-06 17:35:59 +08:00
result[c.TopicPrefix+c.Topic.Physical.Name] = NewPhysical(svc, ctx)
2024-06-12 13:46:14 +08:00
return result
}