transfer_middleware/cmd/rpc/internal/queue/mq/mq.go

333 lines
8.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package mq
import (
"context"
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"time"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/cmd/rpc/internal/logic/do"
"trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/internal/queue/mq/mqSvc"
"trasfer_middleware/genModel"
)
type Message interface {
MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error
}
type Market struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
type ZLTX struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
type RS struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
type NewMarkets struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
type Physical struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
func NewMarket(svc *mqSvc.ServiceContext, ctx context.Context) *Market {
return &Market{
svc: svc,
ctx: ctx,
}
}
func NewZLTX(svc *mqSvc.ServiceContext, ctx context.Context) *ZLTX {
return &ZLTX{
svc: svc,
ctx: ctx,
}
}
func NewRS(svc *mqSvc.ServiceContext, ctx context.Context) *RS {
return &RS{
svc: svc,
ctx: ctx,
}
}
func NewNewMarkets(svc *mqSvc.ServiceContext, ctx context.Context) *NewMarkets {
return &NewMarkets{
svc: svc,
ctx: ctx,
}
}
func NewPhysical(svc *mqSvc.ServiceContext, ctx context.Context) *Physical {
return &Physical{
svc: svc,
ctx: ctx,
}
}
func (m *Market) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var market = &genModel.ServerMiddleMarketLogs{}
err := json.Unmarshal(msg, market)
market.CreateTime = time.Now()
market.Status = vo.MARKET_LOG_STATU_DEFAULT
logInfo, err := m.svc.DbWrite.MarketLogs.Insert(m.ctx, market)
if err != nil {
return fmt.Errorf("market数据保存失败%s,原因:%s", msg, err)
}
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("market日志数据保存失败%s,原因:%s", msg, err)
}
if market.Url == vo.MARKET_KEY_SEND {
err = m.saveMarketOrder(logId, market.Data, market.Resp)
if err != nil {
return fmt.Errorf("market订单数据保存失败%s,原因:%s", msg, err)
}
}
return nil
}
func (m *ZLTX) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var (
zltx = &genModel.ServerMiddleZltxLogs{}
)
json.Unmarshal(msg, zltx)
zltx.CreateTime = time.Now()
zltx.Status = vo.ZLTX_LOG_STATU_DEFAULT
logInfo, err := m.svc.DbWrite.ZLTXLogs.Insert(m.ctx, zltx)
if err != nil {
return fmt.Errorf("zltx数据保存失败%s,原因:%s", msg, err)
}
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("zltx日志数据保存失败%s,原因:%s", msg, err)
}
if zltx.Url == vo.ZLTX_RECHARGE || zltx.Url == vo.ZLTX_RS_MIXUE_COUPON_GRANT {
err = m.saveZLTXOrder(logId, zltx.Data, zltx.Resp, zltx.Url)
if err != nil {
return fmt.Errorf("zltx订单数据保存失败%s,原因:%s", msg, err)
}
}
return nil
}
func (m *ZLTX) saveZLTXOrder(logId int64, resq string, resp string, url string) error {
var order = &genModel.ServerOrderZltx{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
order.Status = vo.ORDER_STATUS_DEFAULT
switch url {
case vo.ZLTX_RS_MIXUE_COUPON_GRANT:
zltxRs := do.ZltxRsData{Order: order}
err := zltxRs.SetData(resq, resp)
if err != nil {
return err
}
default:
err := do.ZLTXRechargeData(order, resq, resp)
if err != nil {
return err
}
}
//检查是否存在,存在则不插入
_, err := m.svc.DbWrite.OrderZLTX.FindOneByBizNo(m.ctx, order.OutBizNo)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderZLTX.Insert(m.ctx, order)
if err != nil {
return err
}
}
return nil
}
func (m *RS) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var (
rs = &genModel.ServerMiddleRsLogs{}
)
json.Unmarshal(msg, rs)
rs.CreateTime = time.Now()
rs.Status = vo.RS_LOG_STATU_DEFAULT
logInfo, err := m.svc.DbWrite.RSLogs.Insert(m.ctx, rs)
if err != nil {
return fmt.Errorf("rs数据保存失败%s,原因:%s", msg, err)
}
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("rs日志数据保存失败%s,原因:%s", msg, err)
}
err = m.saveRsOrder(logId, rs.Data, rs.Resp)
if err != nil {
return fmt.Errorf("rs订单数据保存失败%s,原因:%s", msg, err)
}
return nil
}
func (m *RS) saveRsOrder(logId int64, resq string, resp string) error {
var order = &genModel.ServerOrderRs{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
data := do.RsData{Order: order}
err := data.SetData(resq, resp)
if err != nil {
return err
}
_, err = m.svc.DbWrite.OrderRs.FindByOutBizId(m.ctx, order.OutBizNo)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderRs.Insert(m.ctx, order)
}
if err != nil {
return err
}
return nil
}
func (m *NewMarkets) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var (
rs = &genModel.ServerMiddleNewMarketLogs{}
)
json.Unmarshal(msg, rs)
rs.CreateTime = time.Now()
rs.Status = vo.RS_LOG_STATU_DEFAULT
logInfo, err := m.svc.DbWrite.NewMarketLogs.Insert(m.ctx, rs)
if err != nil {
return fmt.Errorf("new_market数据保存失败%s,原因:%s", msg, err)
}
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("new_market日志数据保存失败%s,原因:%s", msg, err)
}
err = m.saveNewMarketOrder(logId, rs.Data, rs.Resp)
if err != nil {
return fmt.Errorf("new_market订单数据保存失败%s,原因:%s", msg, err)
}
return nil
}
func (m *Physical) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var (
rs = &genModel.ServerMiddlePhysicalLogs{}
)
json.Unmarshal(msg, rs)
rs.CreateTime = time.Now()
rs.Status = vo.RS_LOG_STATU_DEFAULT
logInfo, err := m.svc.DbWrite.PhysicalLogs.Insert(m.ctx, rs)
if err != nil {
return fmt.Errorf("new_market数据保存失败%s,原因:%s", msg, err)
}
logId, err := logInfo.LastInsertId()
if err != nil {
return fmt.Errorf("new_market日志数据保存失败%s,原因:%s", msg, err)
}
if rs.Url == vo.PHYSICAL_ORDER_SUBMIT {
err = m.savePhysicalOrder(logId, rs.Data, rs.Resp)
if err != nil {
return fmt.Errorf("new_market订单数据保存失败%s,原因:%s", msg, err)
}
}
return nil
}
func (m *NewMarkets) saveNewMarketOrder(logId int64, resq string, resp string) error {
var order = &genModel.ServerOrderNewMarket{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
order.Status = vo.MARKET_LOG_STATU_DEFAULT
err := do.NewMarketKeyDataSet(order, resq, resp)
if err != nil {
return err
}
_, err = m.svc.DbWrite.OrderNewMarket.FindByOutBizId(m.ctx, order.OutBizNo)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderNewMarket.Insert(m.ctx, order)
if err != nil {
return err
}
}
if err != nil {
return err
}
return nil
}
func (m *Market) saveMarketOrder(logId int64, resq string, resp string) error {
var order = &genModel.ServerOrderMarket{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
order.Status = vo.MARKET_LOG_STATU_DEFAULT
err := do.MarketKeyDataSet(order, resq, resp)
if err != nil {
return err
}
_, err = m.svc.DbWrite.OrderMarket.FindByOutBizId(m.ctx, order.OutBizNo)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderMarket.Insert(m.ctx, order)
if err != nil {
return err
}
}
if err != nil {
return err
}
return nil
}
func (m *Physical) savePhysicalOrder(logId int64, resq string, resp string) error {
var order = &genModel.ServerOrderPhysical{}
order.LogId = logId
order.ReqTime = time.Now()
order.CreateTime = time.Now()
order.Status = vo.PHYSICAL_LOG_STATU_DEFAULT
err := do.PhysicalDataSet(order, resq, resp)
if err != nil {
return err
}
_, err = m.svc.DbWrite.OrderPhysical.FindByOutBizId(m.ctx, order.CustomerOrderNum)
if err == sqlx.ErrNotFound {
_, err = m.svc.DbWrite.OrderPhysical.Insert(m.ctx, order)
if err != nil {
return err
}
}
if err != nil {
return err
}
return nil
}
func AllHandle(c *etc.RockerMqConfig, svc *mqSvc.ServiceContext, ctx context.Context) map[string]Message {
result := make(map[string]Message)
result[c.TopicPrefix+c.Topic.Market.Name] = NewMarket(svc, ctx)
result[c.TopicPrefix+c.Topic.ZLTX.Name] = NewZLTX(svc, ctx)
result[c.TopicPrefix+c.Topic.RS.Name] = NewRS(svc, ctx)
result[c.TopicPrefix+c.Topic.NewMarket.Name] = NewNewMarkets(svc, ctx)
result[c.TopicPrefix+c.Topic.Physical.Name] = NewPhysical(svc, ctx)
return result
}