diff --git a/Dockerfile b/Dockerfile index 85d0751..2c7ae64 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,4 @@ -FROM registry.cn-chengdu.aliyuncs.com/go_ls/go-zero:v1 AS builder - +FROM registry.cn-chengdu.aliyuncs.com/lansexiongdi/build:1.22.2 AS builder LABEL stage=gobuilder ENV CGO_ENABLED 0 @@ -17,6 +16,7 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifi COPY --from=builder /src /src RUN mkdir "/var/log/supervisor" + RUN mkdir "/var/log/queue" WORKDIR /src ADD ./sh/startup.sh /opt/startup.sh diff --git a/cmd/rpc/internal/logic/physicalOrderListLogic.go b/cmd/rpc/internal/logic/physicalOrderListLogic.go index 9b76039..2e1ee7a 100644 --- a/cmd/rpc/internal/logic/physicalOrderListLogic.go +++ b/cmd/rpc/internal/logic/physicalOrderListLogic.go @@ -2,12 +2,10 @@ package logic import ( "context" - "trasfer_middleware/until/common" - + "github.com/zeromicro/go-zero/core/logx" "trasfer_middleware/cmd/rpc/internal/svc" "trasfer_middleware/cmd/rpc/pb/transfer" - - "github.com/zeromicro/go-zero/core/logx" + "trasfer_middleware/until/common" ) type PhysicalOrderListLogic struct { diff --git a/cmd/rpc/internal/logic/po/market/market.go b/cmd/rpc/internal/logic/po/market/market.go index e1ce8ea..dd6eef7 100644 --- a/cmd/rpc/internal/logic/po/market/market.go +++ b/cmd/rpc/internal/logic/po/market/market.go @@ -9,14 +9,15 @@ import ( "trasfer_middleware/cmd/rpc/internal/logic/po/market/types" "trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/pb/transfer" - "trasfer_middleware/cmd/rpc/pkg/mq" "trasfer_middleware/until/common" + mqs "trasfer_middleware/until/mq" "trasfer_middleware/until/request" "trasfer_middleware/until/sysLog" ) type Market struct { Conf *types.MarketConf + rmq *mqs.Producer } type MarketRequest struct { @@ -25,9 +26,10 @@ type MarketRequest struct { *po.RequestStruct } -func NewMarket(conf types.MarketConf) *Market { +func NewMarket(conf types.MarketConf, rmq *mqs.Producer) *Market { return &Market{ Conf: &conf, + rmq: rmq, } } @@ -63,13 +65,7 @@ func (r *MarketRequest) request(url string) (*request.Response, error) { resp, _ := req.Send() //异步存入请求记录 - sendMq := mq.AliyunRocketMq{ - AccessKey: r.Config.AccessKey, - SecretKey: r.Config.SecretKey, - SecurityToken: r.Config.SecurityToken, - ServerAddress: r.Config.Host, - } - err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Market.Name, po.SetMqSendDataMarket(r.RequestStruct, &resp, reqUrl)) + err := r.rmq.SendSync(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Market.Name, po.SetMqSendDataMarket(r.RequestStruct, &resp, reqUrl)) if err != nil { sysLog.LogSendMq(r.ctx, err) } diff --git a/cmd/rpc/internal/logic/po/new_market/new_market.go b/cmd/rpc/internal/logic/po/new_market/new_market.go index c563a46..ff50ca6 100644 --- a/cmd/rpc/internal/logic/po/new_market/new_market.go +++ b/cmd/rpc/internal/logic/po/new_market/new_market.go @@ -6,13 +6,12 @@ import ( "github.com/mitchellh/mapstructure" "github.com/sleepinggodoflove/lansexiongdi-marketing-sdk/api/v1/key" "github.com/sleepinggodoflove/lansexiongdi-marketing-sdk/core" - "trasfer_middleware/cmd/rpc/etc" + mqs "trasfer_middleware/until/mq" "trasfer_middleware/cmd/rpc/internal/logic/po" "trasfer_middleware/cmd/rpc/internal/logic/po/new_market/types" "trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/pb/transfer" - "trasfer_middleware/cmd/rpc/pkg/mq" "trasfer_middleware/until/common" "trasfer_middleware/until/sysLog" ) @@ -22,9 +21,10 @@ type NewMarketStruct struct { AppId string CoreConfig *core.Config *po.RequestStruct + rmq *mqs.Producer } -func NewNewMarket(conf types.NewMarketConf, mq etc.RockerMqConfig) *NewMarketStruct { +func NewNewMarket(conf types.NewMarketConf, rmq *mqs.Producer) *NewMarketStruct { return &NewMarketStruct{ Conf: &conf, CoreConfig: &core.Config{ @@ -33,9 +33,7 @@ func NewNewMarket(conf types.NewMarketConf, mq etc.RockerMqConfig) *NewMarketStr Key: conf.KEY, BaseURL: conf.Host, }, - RequestStruct: &po.RequestStruct{ - Config: mq, - }, + rmq: rmq, } } @@ -114,13 +112,7 @@ func (r *NewMarketStruct) coreKey(appId string) (*key.Key, error) { func (r *NewMarketStruct) sendMq(ctx context.Context, url string, resp *key.Response) error { //异步存入请求记录 - sendMq := mq.AliyunRocketMq{ - AccessKey: r.Config.AccessKey, - SecretKey: r.Config.SecretKey, - SecurityToken: r.Config.SecurityToken, - ServerAddress: r.Config.Host, - } - err := sendMq.Produce(ctx, r.Config.TopicPrefix+r.Config.Topic.NewMarket.Name, po.SetMqSendDataNewMarket(r.RequestStruct, resp, url)) + err := r.rmq.SendSync(ctx, r.Config.TopicPrefix+r.Config.Topic.NewMarket.Name, po.SetMqSendDataNewMarket(r.RequestStruct, resp, url)) if err != nil { sysLog.LogSendMq(ctx, err) } diff --git a/cmd/rpc/internal/logic/po/physical/physical.go b/cmd/rpc/internal/logic/po/physical/physical.go index bf88b3a..63162c1 100644 --- a/cmd/rpc/internal/logic/po/physical/physical.go +++ b/cmd/rpc/internal/logic/po/physical/physical.go @@ -9,7 +9,7 @@ import ( "trasfer_middleware/cmd/rpc/internal/logic/po/physical/types" "trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/pb/transfer" - "trasfer_middleware/cmd/rpc/pkg/mq" + mqs "trasfer_middleware/until/mq" "trasfer_middleware/until/request" "trasfer_middleware/until/sysLog" ) @@ -17,6 +17,7 @@ import ( type Physical struct { Conf *types.PhysicalConf AppId string + rmq *mqs.Producer } type PhysicalRequest struct { @@ -25,10 +26,11 @@ type PhysicalRequest struct { *po.RequestStructWithJson } -func NewPhysical(conf types.PhysicalConf) *Physical { +func NewPhysical(conf types.PhysicalConf, rmq *mqs.Producer) *Physical { return &Physical{ Conf: &conf, + rmq: rmq, } } @@ -58,13 +60,7 @@ func (r *PhysicalRequest) request(url string) (*request.Response, error) { } resp, _ := req.Send() //异步存入请求记录 - sendMq := mq.AliyunRocketMq{ - AccessKey: r.Config.AccessKey, - SecretKey: r.Config.SecretKey, - SecurityToken: r.Config.SecurityToken, - ServerAddress: r.Config.Host, - } - err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Physical.Name, po.SetMqSendDataPhysical(r.RequestStructWithJson, &resp, url)) + err := r.rmq.SendSync(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Physical.Name, po.SetMqSendDataPhysical(r.RequestStructWithJson, &resp, url)) handlerResCode(&resp.Text) if err != nil { sysLog.LogSendMq(r.ctx, err) diff --git a/cmd/rpc/internal/logic/po/rs/coupon.go b/cmd/rpc/internal/logic/po/rs/coupon.go index fa070e8..2d05969 100644 --- a/cmd/rpc/internal/logic/po/rs/coupon.go +++ b/cmd/rpc/internal/logic/po/rs/coupon.go @@ -11,15 +11,16 @@ import ( "trasfer_middleware/cmd/rpc/internal/logic/po/rs/types" "trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/pb/transfer" - "trasfer_middleware/cmd/rpc/pkg/mq" "trasfer_middleware/genModel" "trasfer_middleware/until/common" + mqs "trasfer_middleware/until/mq" "trasfer_middleware/until/request" "trasfer_middleware/until/sysLog" ) type RS struct { Conf *types.RSConf + rmq *mqs.Producer } type RsRequest struct { @@ -35,9 +36,10 @@ type RequestData struct { *po.RequestStruct } -func NewRs(conf types.RSConf) *RS { +func NewRs(conf types.RSConf, rmq *mqs.Producer) *RS { return &RS{ Conf: &conf, + rmq: rmq, } } @@ -74,13 +76,7 @@ func (r *RsRequest) request(url string) (*request.Response, error) { } resp, _ := req.Send() //异步存入请求记录 - sendMq := mq.AliyunRocketMq{ - AccessKey: r.Config.AccessKey, - SecretKey: r.Config.SecretKey, - SecurityToken: r.Config.SecurityToken, - ServerAddress: r.Config.Host, - } - err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.RS.Name, po.SetMqSendDataRs(r.RequestStruct, r.Order, &resp, url)) + err := r.rmq.SendSync(r.ctx, r.Config.TopicPrefix+r.Config.Topic.RS.Name, po.SetMqSendDataRs(r.RequestStruct, r.Order, &resp, url)) if err != nil { sysLog.LogSendMq(r.ctx, err) } diff --git a/cmd/rpc/internal/logic/po/zltx/zltxOrder.go b/cmd/rpc/internal/logic/po/zltx/zltxOrder.go index d6fc280..d5e7a03 100644 --- a/cmd/rpc/internal/logic/po/zltx/zltxOrder.go +++ b/cmd/rpc/internal/logic/po/zltx/zltxOrder.go @@ -9,13 +9,14 @@ import ( "trasfer_middleware/cmd/rpc/internal/logic/po/zltx/types" "trasfer_middleware/cmd/rpc/internal/logic/vo" "trasfer_middleware/cmd/rpc/pb/transfer" - "trasfer_middleware/cmd/rpc/pkg/mq" + mqs "trasfer_middleware/until/mq" "trasfer_middleware/until/request" "trasfer_middleware/until/sysLog" ) type ZltxOrder struct { Conf *types.ZLTXConf + rmq *mqs.Producer } type ZltxOrderRequest struct { @@ -24,10 +25,11 @@ type ZltxOrderRequest struct { *po.RequestStruct } -func NewZltxOrder(conf types.ZLTXConf) *ZltxOrder { +func NewZltxOrder(conf types.ZLTXConf, rmq *mqs.Producer) *ZltxOrder { return &ZltxOrder{ Conf: &conf, + rmq: rmq, } } @@ -61,13 +63,8 @@ func (r *ZltxOrderRequest) request(url string) (*request.Response, error) { } resp, _ := req.Send() //异步存入请求记录 - sendMq := mq.AliyunRocketMq{ - AccessKey: r.Config.AccessKey, - SecretKey: r.Config.SecretKey, - SecurityToken: r.Config.SecurityToken, - ServerAddress: r.Config.Host, - } - err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.ZLTX.Name, po.SetMqSendDataZLTX(r.RequestStruct, &resp, url)) + + err := r.rmq.SendSync(r.ctx, r.Config.TopicPrefix+r.Config.Topic.ZLTX.Name, po.SetMqSendDataZLTX(r.RequestStruct, &resp, url)) handlerResCode(&resp.Text) if err != nil { sysLog.LogSendMq(r.ctx, err) diff --git a/cmd/rpc/internal/svc/serviceContext.go b/cmd/rpc/internal/svc/serviceContext.go index c204fe0..bc9d929 100644 --- a/cmd/rpc/internal/svc/serviceContext.go +++ b/cmd/rpc/internal/svc/serviceContext.go @@ -8,6 +8,7 @@ import ( "trasfer_middleware/cmd/rpc/internal/logic/po/physical" "trasfer_middleware/cmd/rpc/internal/logic/po/rs" "trasfer_middleware/cmd/rpc/internal/logic/po/zltx" + mqs "trasfer_middleware/until/mq" ) type ServiceContext struct { @@ -18,10 +19,18 @@ type ServiceContext struct { RS *rs.RS NewMarket *new_market.NewMarketStruct Physical *physical.Physical + Rmq *mqs.Producer } func NewServiceContext(c config.Config) *ServiceContext { - + sendMq := AliyunRocketMq{ + AccessKey: c.ExtraConfig.Mq.AccessKey, + SecretKey: c.ExtraConfig.Mq.SecretKey, + SecurityToken: c.ExtraConfig.Mq.SecurityToken, + ServerAddress: c.ExtraConfig.Mq.Host, + } + rmq, _ := mqs.NewProducer(sendMq.ServerAddress[0], mqs.WithProducerCredentials(sendMq.AccessKey, sendMq.SecretKey, sendMq.SecurityToken)) + rmq.Start() return &ServiceContext{ Config: c, RedisClient: redis.MustNewRedis(redis.RedisConf{ @@ -29,10 +38,17 @@ func NewServiceContext(c config.Config) *ServiceContext { Type: c.Redis.Type, Pass: c.Redis.Pass, }), - ZltxOrder: zltx.NewZltxOrder(c.ZLTX), - Market: market.NewMarket(c.Market), - RS: rs.NewRs(c.RS), - NewMarket: new_market.NewNewMarket(c.NewMarket, c.Mq), - Physical: physical.NewPhysical(c.Physical), + ZltxOrder: zltx.NewZltxOrder(c.ZLTX, rmq), + Market: market.NewMarket(c.Market, rmq), + RS: rs.NewRs(c.RS, rmq), + NewMarket: new_market.NewNewMarket(c.NewMarket, rmq), + Physical: physical.NewPhysical(c.Physical, rmq), } } + +type AliyunRocketMq struct { + AccessKey string + SecretKey string + SecurityToken string + ServerAddress []string +} diff --git a/cmd/rpc/pkg/mq/rocketmq.go b/cmd/rpc/pkg/mq/rocketmq.go index 98d9424..f20dad5 100644 --- a/cmd/rpc/pkg/mq/rocketmq.go +++ b/cmd/rpc/pkg/mq/rocketmq.go @@ -1,10 +1,5 @@ package mq -import ( - "context" - "trasfer_middleware/until/mq" -) - type RocketMq struct { } @@ -15,21 +10,16 @@ type AliyunRocketMq struct { ServerAddress []string } -func (n *AliyunRocketMq) Produce(c context.Context, topic string, body []byte) error { - - p, err := mq.NewProducer(n.ServerAddress[0], mq.WithProducerCredentials(n.AccessKey, n.SecretKey, n.SecurityToken)) - if err != nil { - return err - } - err = p.Start() - if err != nil { - return err - } - err = p.SendSync(c, topic, body) - if err != nil { - return err - } - p.Shutdown() - - return nil -} +//func (n *AliyunRocketMq) Produce(c context.Context, p mq.Producer, topic string, body []byte) error { +// +// err := p.Start() +// if err != nil { +// return err +// } +// err = p.SendSync(c, topic, body) +// if err != nil { +// return err +// } +// +// return nil +//} diff --git a/cmd/rpc/transfer.go b/cmd/rpc/transfer.go index 2470f7a..b937940 100644 --- a/cmd/rpc/transfer.go +++ b/cmd/rpc/transfer.go @@ -23,7 +23,6 @@ var configFile = flag.String("f", fmt.Sprintf("%s%s", "../../config/", until.Get func main() { flag.Parse() - var c config.Config conf.MustLoad(*configFile, &c) ctx := svc.NewServiceContext(c) @@ -35,7 +34,7 @@ func main() { } }) - registerNacos(&c) + //registerNacos(&c) defer s.Stop() fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) diff --git a/sh/build.sh b/sh/build.sh old mode 100644 new mode 100755 diff --git a/sh/build_produce.sh b/sh/build_produce.sh old mode 100644 new mode 100755 index 1697f8c..28ccefd --- a/sh/build_produce.sh +++ b/sh/build_produce.sh @@ -1,5 +1,4 @@ #!/bin/bash - . $(pwd)/sh/config.sh Environment="produce" ADDRESS="${PRODUCE_ADDRESS}" diff --git a/sh/build_test.sh b/sh/build_test.sh old mode 100644 new mode 100755 diff --git a/sh/config.sh b/sh/config.sh old mode 100644 new mode 100755 index 6acd568..741dd85 --- a/sh/config.sh +++ b/sh/config.sh @@ -4,4 +4,4 @@ PORT="10001" TEST_ADDRESS="registry.cn-chengdu.aliyuncs.com/go_ls/transfer_middleware" PRODUCE_ADDRESS="registry.cn-chengdu.aliyuncs.com/go_ls/transfer_middleware_produce" COMMIT_CONTENT="UPDATE" -VERSION="v1" +VERSION="v3.0.0" diff --git a/sh/depoly.sh b/sh/depoly.sh index a939783..b54bf22 100755 --- a/sh/depoly.sh +++ b/sh/depoly.sh @@ -12,6 +12,6 @@ else ADDRESS="${PRODUCE_ADDRESS}" fi -docker pull ${ADDRESS}:${VERSION} -docker rm -f "${CONTAINER}" -docker run -it -p "${PORT}:${PORT}" --name "${CONTAINER}" "${IMAGE}:${VERSION}" \ No newline at end of file +echo docker pull ${ADDRESS}:${VERSION} +echo docker rm -f "${CONTAINER}" +echo docker run -it -p "${PORT}:${PORT}" --network transfer_middleware --name "${CONTAINER}" "${IMAGE}:${VERSION}" \ No newline at end of file diff --git a/sh/env b/sh/env old mode 100644 new mode 100755