解决消息中间件并发环境下报错问题

This commit is contained in:
renzhiyuan 2024-12-02 17:59:33 +08:00
parent 0a3cde2f76
commit 7eab49cabe
16 changed files with 70 additions and 91 deletions

View File

@ -1,5 +1,4 @@
FROM registry.cn-chengdu.aliyuncs.com/go_ls/go-zero:v1 AS builder FROM registry.cn-chengdu.aliyuncs.com/lansexiongdi/build:1.22.2 AS builder
LABEL stage=gobuilder LABEL stage=gobuilder
ENV CGO_ENABLED 0 ENV CGO_ENABLED 0
@ -17,6 +16,7 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifi
COPY --from=builder /src /src COPY --from=builder /src /src
RUN mkdir "/var/log/supervisor" RUN mkdir "/var/log/supervisor"
RUN mkdir "/var/log/queue" RUN mkdir "/var/log/queue"
WORKDIR /src WORKDIR /src
ADD ./sh/startup.sh /opt/startup.sh ADD ./sh/startup.sh /opt/startup.sh

View File

@ -2,12 +2,10 @@ package logic
import ( import (
"context" "context"
"trasfer_middleware/until/common" "github.com/zeromicro/go-zero/core/logx"
"trasfer_middleware/cmd/rpc/internal/svc" "trasfer_middleware/cmd/rpc/internal/svc"
"trasfer_middleware/cmd/rpc/pb/transfer" "trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/until/common"
"github.com/zeromicro/go-zero/core/logx"
) )
type PhysicalOrderListLogic struct { type PhysicalOrderListLogic struct {

View File

@ -9,14 +9,15 @@ import (
"trasfer_middleware/cmd/rpc/internal/logic/po/market/types" "trasfer_middleware/cmd/rpc/internal/logic/po/market/types"
"trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/pb/transfer" "trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/cmd/rpc/pkg/mq"
"trasfer_middleware/until/common" "trasfer_middleware/until/common"
mqs "trasfer_middleware/until/mq"
"trasfer_middleware/until/request" "trasfer_middleware/until/request"
"trasfer_middleware/until/sysLog" "trasfer_middleware/until/sysLog"
) )
type Market struct { type Market struct {
Conf *types.MarketConf Conf *types.MarketConf
rmq *mqs.Producer
} }
type MarketRequest struct { type MarketRequest struct {
@ -25,9 +26,10 @@ type MarketRequest struct {
*po.RequestStruct *po.RequestStruct
} }
func NewMarket(conf types.MarketConf) *Market { func NewMarket(conf types.MarketConf, rmq *mqs.Producer) *Market {
return &Market{ return &Market{
Conf: &conf, Conf: &conf,
rmq: rmq,
} }
} }
@ -63,13 +65,7 @@ func (r *MarketRequest) request(url string) (*request.Response, error) {
resp, _ := req.Send() resp, _ := req.Send()
//异步存入请求记录 //异步存入请求记录
sendMq := mq.AliyunRocketMq{ err := r.rmq.SendSync(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Market.Name, po.SetMqSendDataMarket(r.RequestStruct, &resp, reqUrl))
AccessKey: r.Config.AccessKey,
SecretKey: r.Config.SecretKey,
SecurityToken: r.Config.SecurityToken,
ServerAddress: r.Config.Host,
}
err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Market.Name, po.SetMqSendDataMarket(r.RequestStruct, &resp, reqUrl))
if err != nil { if err != nil {
sysLog.LogSendMq(r.ctx, err) sysLog.LogSendMq(r.ctx, err)
} }

View File

@ -6,13 +6,12 @@ import (
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/sleepinggodoflove/lansexiongdi-marketing-sdk/api/v1/key" "github.com/sleepinggodoflove/lansexiongdi-marketing-sdk/api/v1/key"
"github.com/sleepinggodoflove/lansexiongdi-marketing-sdk/core" "github.com/sleepinggodoflove/lansexiongdi-marketing-sdk/core"
"trasfer_middleware/cmd/rpc/etc" mqs "trasfer_middleware/until/mq"
"trasfer_middleware/cmd/rpc/internal/logic/po" "trasfer_middleware/cmd/rpc/internal/logic/po"
"trasfer_middleware/cmd/rpc/internal/logic/po/new_market/types" "trasfer_middleware/cmd/rpc/internal/logic/po/new_market/types"
"trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/pb/transfer" "trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/cmd/rpc/pkg/mq"
"trasfer_middleware/until/common" "trasfer_middleware/until/common"
"trasfer_middleware/until/sysLog" "trasfer_middleware/until/sysLog"
) )
@ -22,9 +21,10 @@ type NewMarketStruct struct {
AppId string AppId string
CoreConfig *core.Config CoreConfig *core.Config
*po.RequestStruct *po.RequestStruct
rmq *mqs.Producer
} }
func NewNewMarket(conf types.NewMarketConf, mq etc.RockerMqConfig) *NewMarketStruct { func NewNewMarket(conf types.NewMarketConf, rmq *mqs.Producer) *NewMarketStruct {
return &NewMarketStruct{ return &NewMarketStruct{
Conf: &conf, Conf: &conf,
CoreConfig: &core.Config{ CoreConfig: &core.Config{
@ -33,9 +33,7 @@ func NewNewMarket(conf types.NewMarketConf, mq etc.RockerMqConfig) *NewMarketStr
Key: conf.KEY, Key: conf.KEY,
BaseURL: conf.Host, BaseURL: conf.Host,
}, },
RequestStruct: &po.RequestStruct{ rmq: rmq,
Config: mq,
},
} }
} }
@ -114,13 +112,7 @@ func (r *NewMarketStruct) coreKey(appId string) (*key.Key, error) {
func (r *NewMarketStruct) sendMq(ctx context.Context, url string, resp *key.Response) error { func (r *NewMarketStruct) sendMq(ctx context.Context, url string, resp *key.Response) error {
//异步存入请求记录 //异步存入请求记录
sendMq := mq.AliyunRocketMq{ err := r.rmq.SendSync(ctx, r.Config.TopicPrefix+r.Config.Topic.NewMarket.Name, po.SetMqSendDataNewMarket(r.RequestStruct, resp, url))
AccessKey: r.Config.AccessKey,
SecretKey: r.Config.SecretKey,
SecurityToken: r.Config.SecurityToken,
ServerAddress: r.Config.Host,
}
err := sendMq.Produce(ctx, r.Config.TopicPrefix+r.Config.Topic.NewMarket.Name, po.SetMqSendDataNewMarket(r.RequestStruct, resp, url))
if err != nil { if err != nil {
sysLog.LogSendMq(ctx, err) sysLog.LogSendMq(ctx, err)
} }

View File

@ -9,7 +9,7 @@ import (
"trasfer_middleware/cmd/rpc/internal/logic/po/physical/types" "trasfer_middleware/cmd/rpc/internal/logic/po/physical/types"
"trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/pb/transfer" "trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/cmd/rpc/pkg/mq" mqs "trasfer_middleware/until/mq"
"trasfer_middleware/until/request" "trasfer_middleware/until/request"
"trasfer_middleware/until/sysLog" "trasfer_middleware/until/sysLog"
) )
@ -17,6 +17,7 @@ import (
type Physical struct { type Physical struct {
Conf *types.PhysicalConf Conf *types.PhysicalConf
AppId string AppId string
rmq *mqs.Producer
} }
type PhysicalRequest struct { type PhysicalRequest struct {
@ -25,10 +26,11 @@ type PhysicalRequest struct {
*po.RequestStructWithJson *po.RequestStructWithJson
} }
func NewPhysical(conf types.PhysicalConf) *Physical { func NewPhysical(conf types.PhysicalConf, rmq *mqs.Producer) *Physical {
return &Physical{ return &Physical{
Conf: &conf, Conf: &conf,
rmq: rmq,
} }
} }
@ -58,13 +60,7 @@ func (r *PhysicalRequest) request(url string) (*request.Response, error) {
} }
resp, _ := req.Send() resp, _ := req.Send()
//异步存入请求记录 //异步存入请求记录
sendMq := mq.AliyunRocketMq{ err := r.rmq.SendSync(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Physical.Name, po.SetMqSendDataPhysical(r.RequestStructWithJson, &resp, url))
AccessKey: r.Config.AccessKey,
SecretKey: r.Config.SecretKey,
SecurityToken: r.Config.SecurityToken,
ServerAddress: r.Config.Host,
}
err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Physical.Name, po.SetMqSendDataPhysical(r.RequestStructWithJson, &resp, url))
handlerResCode(&resp.Text) handlerResCode(&resp.Text)
if err != nil { if err != nil {
sysLog.LogSendMq(r.ctx, err) sysLog.LogSendMq(r.ctx, err)

View File

@ -11,15 +11,16 @@ import (
"trasfer_middleware/cmd/rpc/internal/logic/po/rs/types" "trasfer_middleware/cmd/rpc/internal/logic/po/rs/types"
"trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/pb/transfer" "trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/cmd/rpc/pkg/mq"
"trasfer_middleware/genModel" "trasfer_middleware/genModel"
"trasfer_middleware/until/common" "trasfer_middleware/until/common"
mqs "trasfer_middleware/until/mq"
"trasfer_middleware/until/request" "trasfer_middleware/until/request"
"trasfer_middleware/until/sysLog" "trasfer_middleware/until/sysLog"
) )
type RS struct { type RS struct {
Conf *types.RSConf Conf *types.RSConf
rmq *mqs.Producer
} }
type RsRequest struct { type RsRequest struct {
@ -35,9 +36,10 @@ type RequestData struct {
*po.RequestStruct *po.RequestStruct
} }
func NewRs(conf types.RSConf) *RS { func NewRs(conf types.RSConf, rmq *mqs.Producer) *RS {
return &RS{ return &RS{
Conf: &conf, Conf: &conf,
rmq: rmq,
} }
} }
@ -74,13 +76,7 @@ func (r *RsRequest) request(url string) (*request.Response, error) {
} }
resp, _ := req.Send() resp, _ := req.Send()
//异步存入请求记录 //异步存入请求记录
sendMq := mq.AliyunRocketMq{ err := r.rmq.SendSync(r.ctx, r.Config.TopicPrefix+r.Config.Topic.RS.Name, po.SetMqSendDataRs(r.RequestStruct, r.Order, &resp, url))
AccessKey: r.Config.AccessKey,
SecretKey: r.Config.SecretKey,
SecurityToken: r.Config.SecurityToken,
ServerAddress: r.Config.Host,
}
err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.RS.Name, po.SetMqSendDataRs(r.RequestStruct, r.Order, &resp, url))
if err != nil { if err != nil {
sysLog.LogSendMq(r.ctx, err) sysLog.LogSendMq(r.ctx, err)
} }

View File

@ -9,13 +9,14 @@ import (
"trasfer_middleware/cmd/rpc/internal/logic/po/zltx/types" "trasfer_middleware/cmd/rpc/internal/logic/po/zltx/types"
"trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/pb/transfer" "trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/cmd/rpc/pkg/mq" mqs "trasfer_middleware/until/mq"
"trasfer_middleware/until/request" "trasfer_middleware/until/request"
"trasfer_middleware/until/sysLog" "trasfer_middleware/until/sysLog"
) )
type ZltxOrder struct { type ZltxOrder struct {
Conf *types.ZLTXConf Conf *types.ZLTXConf
rmq *mqs.Producer
} }
type ZltxOrderRequest struct { type ZltxOrderRequest struct {
@ -24,10 +25,11 @@ type ZltxOrderRequest struct {
*po.RequestStruct *po.RequestStruct
} }
func NewZltxOrder(conf types.ZLTXConf) *ZltxOrder { func NewZltxOrder(conf types.ZLTXConf, rmq *mqs.Producer) *ZltxOrder {
return &ZltxOrder{ return &ZltxOrder{
Conf: &conf, Conf: &conf,
rmq: rmq,
} }
} }
@ -61,13 +63,8 @@ func (r *ZltxOrderRequest) request(url string) (*request.Response, error) {
} }
resp, _ := req.Send() resp, _ := req.Send()
//异步存入请求记录 //异步存入请求记录
sendMq := mq.AliyunRocketMq{
AccessKey: r.Config.AccessKey, err := r.rmq.SendSync(r.ctx, r.Config.TopicPrefix+r.Config.Topic.ZLTX.Name, po.SetMqSendDataZLTX(r.RequestStruct, &resp, url))
SecretKey: r.Config.SecretKey,
SecurityToken: r.Config.SecurityToken,
ServerAddress: r.Config.Host,
}
err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.ZLTX.Name, po.SetMqSendDataZLTX(r.RequestStruct, &resp, url))
handlerResCode(&resp.Text) handlerResCode(&resp.Text)
if err != nil { if err != nil {
sysLog.LogSendMq(r.ctx, err) sysLog.LogSendMq(r.ctx, err)

View File

@ -8,6 +8,7 @@ import (
"trasfer_middleware/cmd/rpc/internal/logic/po/physical" "trasfer_middleware/cmd/rpc/internal/logic/po/physical"
"trasfer_middleware/cmd/rpc/internal/logic/po/rs" "trasfer_middleware/cmd/rpc/internal/logic/po/rs"
"trasfer_middleware/cmd/rpc/internal/logic/po/zltx" "trasfer_middleware/cmd/rpc/internal/logic/po/zltx"
mqs "trasfer_middleware/until/mq"
) )
type ServiceContext struct { type ServiceContext struct {
@ -18,10 +19,18 @@ type ServiceContext struct {
RS *rs.RS RS *rs.RS
NewMarket *new_market.NewMarketStruct NewMarket *new_market.NewMarketStruct
Physical *physical.Physical Physical *physical.Physical
Rmq *mqs.Producer
} }
func NewServiceContext(c config.Config) *ServiceContext { func NewServiceContext(c config.Config) *ServiceContext {
sendMq := AliyunRocketMq{
AccessKey: c.ExtraConfig.Mq.AccessKey,
SecretKey: c.ExtraConfig.Mq.SecretKey,
SecurityToken: c.ExtraConfig.Mq.SecurityToken,
ServerAddress: c.ExtraConfig.Mq.Host,
}
rmq, _ := mqs.NewProducer(sendMq.ServerAddress[0], mqs.WithProducerCredentials(sendMq.AccessKey, sendMq.SecretKey, sendMq.SecurityToken))
rmq.Start()
return &ServiceContext{ return &ServiceContext{
Config: c, Config: c,
RedisClient: redis.MustNewRedis(redis.RedisConf{ RedisClient: redis.MustNewRedis(redis.RedisConf{
@ -29,10 +38,17 @@ func NewServiceContext(c config.Config) *ServiceContext {
Type: c.Redis.Type, Type: c.Redis.Type,
Pass: c.Redis.Pass, Pass: c.Redis.Pass,
}), }),
ZltxOrder: zltx.NewZltxOrder(c.ZLTX), ZltxOrder: zltx.NewZltxOrder(c.ZLTX, rmq),
Market: market.NewMarket(c.Market), Market: market.NewMarket(c.Market, rmq),
RS: rs.NewRs(c.RS), RS: rs.NewRs(c.RS, rmq),
NewMarket: new_market.NewNewMarket(c.NewMarket, c.Mq), NewMarket: new_market.NewNewMarket(c.NewMarket, rmq),
Physical: physical.NewPhysical(c.Physical), Physical: physical.NewPhysical(c.Physical, rmq),
} }
} }
type AliyunRocketMq struct {
AccessKey string
SecretKey string
SecurityToken string
ServerAddress []string
}

View File

@ -1,10 +1,5 @@
package mq package mq
import (
"context"
"trasfer_middleware/until/mq"
)
type RocketMq struct { type RocketMq struct {
} }
@ -15,21 +10,16 @@ type AliyunRocketMq struct {
ServerAddress []string ServerAddress []string
} }
func (n *AliyunRocketMq) Produce(c context.Context, topic string, body []byte) error { //func (n *AliyunRocketMq) Produce(c context.Context, p mq.Producer, topic string, body []byte) error {
//
p, err := mq.NewProducer(n.ServerAddress[0], mq.WithProducerCredentials(n.AccessKey, n.SecretKey, n.SecurityToken)) // err := p.Start()
if err != nil { // if err != nil {
return err // return err
} // }
err = p.Start() // err = p.SendSync(c, topic, body)
if err != nil { // if err != nil {
return err // return err
} // }
err = p.SendSync(c, topic, body) //
if err != nil { // return nil
return err //}
}
p.Shutdown()
return nil
}

View File

@ -23,7 +23,6 @@ var configFile = flag.String("f", fmt.Sprintf("%s%s", "../../config/", until.Get
func main() { func main() {
flag.Parse() flag.Parse()
var c config.Config var c config.Config
conf.MustLoad(*configFile, &c) conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c) ctx := svc.NewServiceContext(c)
@ -35,7 +34,7 @@ func main() {
} }
}) })
registerNacos(&c) //registerNacos(&c)
defer s.Stop() defer s.Stop()
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)

0
sh/build.sh Normal file → Executable file
View File

1
sh/build_produce.sh Normal file → Executable file
View File

@ -1,5 +1,4 @@
#!/bin/bash #!/bin/bash
. $(pwd)/sh/config.sh . $(pwd)/sh/config.sh
Environment="produce" Environment="produce"
ADDRESS="${PRODUCE_ADDRESS}" ADDRESS="${PRODUCE_ADDRESS}"

0
sh/build_test.sh Normal file → Executable file
View File

2
sh/config.sh Normal file → Executable file
View File

@ -4,4 +4,4 @@ PORT="10001"
TEST_ADDRESS="registry.cn-chengdu.aliyuncs.com/go_ls/transfer_middleware" TEST_ADDRESS="registry.cn-chengdu.aliyuncs.com/go_ls/transfer_middleware"
PRODUCE_ADDRESS="registry.cn-chengdu.aliyuncs.com/go_ls/transfer_middleware_produce" PRODUCE_ADDRESS="registry.cn-chengdu.aliyuncs.com/go_ls/transfer_middleware_produce"
COMMIT_CONTENT="UPDATE" COMMIT_CONTENT="UPDATE"
VERSION="v1" VERSION="v3.0.0"

View File

@ -12,6 +12,6 @@ else
ADDRESS="${PRODUCE_ADDRESS}" ADDRESS="${PRODUCE_ADDRESS}"
fi fi
docker pull ${ADDRESS}:${VERSION} echo docker pull ${ADDRESS}:${VERSION}
docker rm -f "${CONTAINER}" echo docker rm -f "${CONTAINER}"
docker run -it -p "${PORT}:${PORT}" --name "${CONTAINER}" "${IMAGE}:${VERSION}" echo docker run -it -p "${PORT}:${PORT}" --network transfer_middleware --name "${CONTAINER}" "${IMAGE}:${VERSION}"

0
sh/env Normal file → Executable file
View File