diff --git a/cmd/server/main.go b/cmd/server/main.go index ee8397b..ddfe0b1 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -57,6 +57,7 @@ func newApp( logger log.Logger, httpServer *http.Server, consumerServer *server.Consumer, + wechatNotifyConsumer *server.WechatNotifyConsumer, ) *kratos.App { return kratos.New( kratos.ID(id), @@ -66,6 +67,7 @@ func newApp( kratos.Server( httpServer, consumerServer, + wechatNotifyConsumer, ), ) } diff --git a/configs/config.yaml b/configs/config.yaml index 02dd14b..b76c112 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -59,18 +59,18 @@ cmb: notifyUrl: "https://sandbox.cdcc.cmbchina.com/AccessGateway/transIn/updateCodeStatus.json" # 招行测试回调地址 wechatNotifyMQ: + isOpenConsumer: false #是否启动消费 true/false accessKeyId: "LTAI5tPyV7FynQNTfEvbEBuX" accessKeySecret: "tZmTh8cV98xAQgtlRU0soWcb6Tpd4T" endPoint: "http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com" regionId: "hangzhou" instanceId: "MQ_INST_1389288909295870_BYSoMttI" topic: "notify" - tag: "coupon_usage_notify_market" # 待取 - groupId: "market_pro" # 待取 + groupId: "market_pro" + tag: "voucher_notify_dev" debug: false registerTagUrl: "https://wpcallbacks.api.1688sup.com/wechatPay/register_tag" - #配置日志 logs: business: business.log #业务日志路径:如果不写日志,则不配置或配置为空 diff --git a/go.mod b/go.mod index 73b1939..5f625ed 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22.2 require ( github.com/ZZMarquis/gm v1.3.2 + github.com/aliyunmq/mq-http-go-sdk v1.0.3 github.com/apache/rocketmq-client-go/v2 v2.1.2 github.com/brianvoe/gofakeit/v6 v6.28.0 github.com/duke-git/lancet/v2 v2.2.8 @@ -11,6 +12,7 @@ require ( github.com/go-kratos/kratos/contrib/config/nacos/v2 v2.0.0-20241105072421-f8b97f675b32 github.com/go-kratos/kratos/v2 v2.8.2 github.com/go-playground/validator/v10 v10.25.0 + github.com/gogap/errors v0.0.0-20210818113853-edfbba0ddea9 github.com/gogf/gf v1.16.9 github.com/google/wire v0.6.0 github.com/gorilla/handlers v1.5.1 @@ -33,10 +35,12 @@ require ( require ( dario.cat/mergo v1.0.0 // indirect github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 // indirect + github.com/andybalholm/brotli v1.1.1 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emirpasic/gods v1.12.0 // indirect + github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/felixge/httpsnoop v1.0.1 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/go-errors/errors v1.0.1 // indirect @@ -47,6 +51,7 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect + github.com/gogap/stack v0.0.0-20150131034635-fef68dddd4f8 // indirect github.com/golang/mock v1.3.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect @@ -55,6 +60,7 @@ require ( github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect @@ -67,17 +73,19 @@ require ( github.com/tidwall/gjson v1.13.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.59.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect go.uber.org/atomic v1.6.0 // indirect go.uber.org/multierr v1.5.0 // indirect go.uber.org/zap v1.15.0 // indirect - golang.org/x/crypto v0.32.0 // indirect + golang.org/x/crypto v0.33.0 // indirect golang.org/x/exp v0.0.0-20221208152030-732eee02a75a // indirect golang.org/x/lint v0.0.0-20241112194109-818c5a804067 // indirect - golang.org/x/net v0.34.0 // indirect - golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.29.0 // indirect - golang.org/x/text v0.21.0 // indirect + golang.org/x/net v0.35.0 // indirect + golang.org/x/sync v0.11.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect golang.org/x/tools v0.28.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect diff --git a/go.sum b/go.sum index cf25808..a7ff27d 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,10 @@ github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3 github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk= +github.com/aliyunmq/mq-http-go-sdk v1.0.3 h1:/uhH7DUoaw9XTtsPgDp7zdPUyG5FBKj2GmJJph9z+6o= +github.com/aliyunmq/mq-http-go-sdk v1.0.3/go.mod h1:JYfRMQoPexERvnNNBcal0ZQ2TVQ5ialDiW9ScjaadEM= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/apache/rocketmq-client-go/v2 v2.1.2 h1:yt73olKe5N6894Dbm+ojRf/JPiP0cxfDNNffKwhpJVg= github.com/apache/rocketmq-client-go/v2 v2.1.2/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE= github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4= @@ -46,6 +50,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A= +github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= @@ -85,6 +91,10 @@ github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/gogap/errors v0.0.0-20210818113853-edfbba0ddea9 h1:qvGIRaCYFKkyFK9SgRXJCc/lmQCeeg2cl3mwBKQd5W0= +github.com/gogap/errors v0.0.0-20210818113853-edfbba0ddea9/go.mod h1:tbRYYYC7g/H7QlCeX0Z2zaThWKowF4QQCFIsGgAsqRo= +github.com/gogap/stack v0.0.0-20150131034635-fef68dddd4f8 h1:AuxION6c7in+AsPmFjQTUKT6/o1suT8XEEpfU0pWsHA= +github.com/gogap/stack v0.0.0-20150131034635-fef68dddd4f8/go.mod h1:6q1WEv2BiAO4FSdwLQTJbWQYAn1/qDNJHUGJNXCj9kM= github.com/gogf/gf v1.16.9 h1:Q803UmmRo59+Ws08sMVFOcd8oNpkSWL9vS33hlo/Cyk= github.com/gogf/gf v1.16.9/go.mod h1:8Q/kw05nlVRp+4vv7XASBsMe9L1tsVKiGoeP2AHnlkk= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= @@ -151,6 +161,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -237,8 +249,14 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tjfoc/gmsm v1.4.1 h1:aMe1GlZb+0bLjn+cKTPEvvn9oUEBlJitaZiiBwsbgho= github.com/tjfoc/gmsm v1.4.1/go.mod h1:j4INPkHWMrhJb38G+J6W4Tw0AbuN8Thu3PbdVYhVcTE= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.59.0 h1:Qu0qYHfXvPk1mSLNqcFtEk6DpxgA26hy6bmydotDpRI= +github.com/valyala/fasthttp v1.59.0/go.mod h1:GTxNb9Bc6r2a9D0TWNSPwDz78UxnTGBViY3xZNEqyYU= github.com/wechatpay-apiv3/wechatpay-go v0.2.20 h1:gS8oFn1bHGnyapR2Zb4aqTV6l4kJWgbtqjCq6k1L9DQ= github.com/wechatpay-apiv3/wechatpay-go v0.2.20/go.mod h1:A254AUBVB6R+EqQFo3yTgeh7HtyqRRtN2w9hQSOrd4Q= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= @@ -273,8 +291,8 @@ golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20221208152030-732eee02a75a h1:4iLhBPcpqFmylhnkbY3W0ONLUYYkDAW9xMFLfxgsvCw= golang.org/x/exp v0.0.0-20221208152030-732eee02a75a/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= @@ -311,8 +329,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -322,8 +340,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -349,8 +367,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -365,8 +383,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/internal/conf/conf.pb.go b/internal/conf/conf.pb.go index 0639167..a45356b 100644 --- a/internal/conf/conf.pb.go +++ b/internal/conf/conf.pb.go @@ -557,6 +557,7 @@ type WechatNotifyMQ struct { GroupId string `protobuf:"bytes,8,opt,name=groupId,proto3" json:"groupId,omitempty"` Debug bool `protobuf:"varint,9,opt,name=debug,proto3" json:"debug,omitempty"` RegisterTagUrl string `protobuf:"bytes,10,opt,name=registerTagUrl,proto3" json:"registerTagUrl,omitempty"` + IsOpenConsumer bool `protobuf:"varint,11,opt,name=isOpenConsumer,proto3" json:"isOpenConsumer,omitempty"` } func (x *WechatNotifyMQ) Reset() { @@ -661,6 +662,13 @@ func (x *WechatNotifyMQ) GetRegisterTagUrl() string { return "" } +func (x *WechatNotifyMQ) GetIsOpenConsumer() bool { + if x != nil { + return x.IsOpenConsumer + } + return false +} + type Logs struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1129,7 +1137,7 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x6f, 0x72, 0x67, 0x4e, 0x6f, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x72, 0x67, 0x4e, 0x6f, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55, 0x72, 0x6c, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55, 0x72, 0x6c, - 0x22, 0xb4, 0x02, 0x0a, 0x0e, 0x57, 0x65, 0x63, 0x68, 0x61, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, + 0x22, 0xdc, 0x02, 0x0a, 0x0e, 0x57, 0x65, 0x63, 0x68, 0x61, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4d, 0x51, 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, @@ -1148,13 +1156,15 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x75, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x12, 0x26, 0x0a, 0x0e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x54, 0x61, 0x67, 0x55, 0x72, 0x6c, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x54, 0x61, 0x67, 0x55, 0x72, 0x6c, 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, - 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, - 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, - 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, - 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x54, 0x61, 0x67, 0x55, 0x72, 0x6c, 0x12, 0x26, 0x0a, 0x0e, 0x69, 0x73, 0x4f, 0x70, 0x65, + 0x6e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x0e, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x22, + 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, + 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, + 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, + 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, + 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/conf/conf.proto b/internal/conf/conf.proto index 4ad609d..e8c9c5e 100644 --- a/internal/conf/conf.proto +++ b/internal/conf/conf.proto @@ -95,6 +95,7 @@ message WechatNotifyMQ { string groupId = 8; bool debug = 9; string registerTagUrl = 10; + bool isOpenConsumer = 11; } message Logs { diff --git a/internal/data/mq.go b/internal/data/mq.go index 12a4337..0d027a2 100644 --- a/internal/data/mq.go +++ b/internal/data/mq.go @@ -2,9 +2,12 @@ package data import ( "fmt" + mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "github.com/go-kratos/kratos/v2/log" + "strconv" + "time" "voucher/internal/conf" "voucher/internal/pkg/mq" ) @@ -55,3 +58,50 @@ func buildMqProducer(c *conf.RocketMQ) (*mq.Producer, error) { //此时并没有发起连接,在使用时才会 return p, nil } + +func wechatNotifyProducer() { + // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 + endpoint := "http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com" + // 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。 + // AccessKey ID,阿里云身份验证标识。 + accessKey := "LTAI5tPyV7FynQNTfEvbEBuX" + // AccessKey Secret,阿里云身份验证密钥。 + secretKey := "tZmTh8cV98xAQgtlRU0soWcb6Tpd4T" + // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 + topic := "notify" + // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。 + // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。 + instanceId := "MQ_INST_1389288909295870_BYSoMttI" + + tag := "voucher_notify_dev" + //tag := "voucher_notify_pro" + + client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "") + + mqProducer := client.GetProducer(instanceId, topic) + + // 循环发送2条消息。 + for i := 0; i < 2; i++ { + var msg mq_http_sdk.PublishMessageRequest + + msg = mq_http_sdk.PublishMessageRequest{ + MessageBody: "hello mq!", //消息内容。 + MessageTag: tag, // 消息标签。 + Properties: map[string]string{}, // 消息属性。 + } + // 设置消息的Key。 + msg.MessageKey = "MessageKey" + // 设置消息自定义属性。 + msg.Properties["a"] = strconv.Itoa(i) + + ret, err := mqProducer.PublishMessage(msg) + + if err != nil { + fmt.Println(err) + return + } else { + fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5) + } + time.Sleep(time.Duration(100) * time.Millisecond) + } +} diff --git a/internal/data/mq_test.go b/internal/data/mq_test.go index ed69ef7..0105c2d 100644 --- a/internal/data/mq_test.go +++ b/internal/data/mq_test.go @@ -76,3 +76,7 @@ func Test_NotifyProducer(t *testing.T) { return } } + +func Test_WechatNotifyProducer(t *testing.T) { + wechatNotifyProducer() +} diff --git a/internal/server/provider_set.go b/internal/server/provider_set.go index 00d9338..0bf9483 100644 --- a/internal/server/provider_set.go +++ b/internal/server/provider_set.go @@ -8,4 +8,5 @@ import ( var ProviderSetServer = wire.NewSet( NewHTTPServer, NewConsumer, + NewWechatNotifyConsumer, ) diff --git a/internal/server/wechat_notify_consume.go b/internal/server/wechat_notify_consume.go new file mode 100644 index 0000000..9ce945d --- /dev/null +++ b/internal/server/wechat_notify_consume.go @@ -0,0 +1,134 @@ +package server + +import ( + "context" + "fmt" + mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport" + "github.com/gogap/errors" + "strings" + "time" + "voucher/internal/conf" +) + +var _ transport.Server = (*WechatNotifyConsumer)(nil) + +type WechatNotifyConsumer struct { + conf *conf.Bootstrap +} + +func NewWechatNotifyConsumer(conf *conf.Bootstrap) *WechatNotifyConsumer { + return &WechatNotifyConsumer{conf: conf} +} + +func (w *WechatNotifyConsumer) Start(ctx context.Context) error { + if !w.conf.WechatNotifyMQ.IsOpenConsumer { + log.Warnf("wechat notify MQ is not open") + return nil + } + + // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 + endpoint := w.conf.WechatNotifyMQ.EndPoint + // 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。 + // AccessKey ID,阿里云身份验证标识。 + accessKey := w.conf.WechatNotifyMQ.AccessKeyId + // AccessKey Secret,阿里云身份验证密钥。 + secretKey := w.conf.WechatNotifyMQ.AccessKeySecret + // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 + //不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。 + topic := w.conf.WechatNotifyMQ.Topic + // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。 + // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。 + instanceId := w.conf.WechatNotifyMQ.InstanceId + // 您在控制台创建的Group ID。 + groupId := w.conf.WechatNotifyMQ.GroupId + + tag := w.conf.WechatNotifyMQ.Tag + + client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "") + + mqConsumer := client.GetConsumer(instanceId, topic, groupId, tag) + + for { + endChan := make(chan int) + respChan := make(chan mq_http_sdk.ConsumeMessageResponse) + errChan := make(chan error) + go func() { + select { + case resp := <-respChan: + { + // 处理业务逻辑。 + var handles []string + fmt.Printf("Consume %d messages---->\n", len(resp.Messages)) + for _, v := range resp.Messages { + handles = append(handles, v.ReceiptHandle) + fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+ + "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+ + "\tBody: %s\n"+ + "\tProps: %s\n", + v.MessageId, + v.PublishTime, + v.MessageTag, + v.ConsumedTimes, + v.FirstConsumeTime, + v.NextConsumeTime, + v.MessageBody, + v.Properties, + ) + } + + // NextConsumeTime前若不确认消息消费成功,则消息会被重复消费。 + // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 + ackerr := mqConsumer.AckMessage(handles) + if ackerr != nil { + // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。 + fmt.Println(ackerr) + if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok { + for _, errAckItem := range errAckItems { + fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", + errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg) + } + } else { + fmt.Println("ack err =", ackerr) + } + time.Sleep(time.Duration(3) * time.Second) + } else { + fmt.Printf("Ack ---->\n\t%s\n", handles) + } + + endChan <- 1 + } + case err := <-errChan: + { + // Topic中没有消息可消费。 + if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") { + fmt.Println("\nNo new message, continue!") + } else { + fmt.Println(err) + time.Sleep(time.Duration(3) * time.Second) + } + endChan <- 1 + } + case <-time.After(35 * time.Second): + { + fmt.Println("Timeout of consumer message ??") + endChan <- 1 + } + } + }() + + // 长轮询消费消息,网络超时时间默认为35s。 + // 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回响应。 + mqConsumer.ConsumeMessage(respChan, errChan, + 3, // 一次最多消费3条(最多可设置为16条)。 + 3, // 长轮询时间3s(最多可设置为30s)。 + ) + <-endChan + } +} + +func (w WechatNotifyConsumer) Stop(ctx context.Context) error { + fmt.Println("关闭 wechat consumer 中...") + return nil +}