消息中转中心

This commit is contained in:
Rzy 2024-06-18 16:34:14 +08:00
parent a7bd832a00
commit dcfdf2c741
71 changed files with 3072 additions and 567 deletions

1
.gitignore vendored
View File

@ -1,4 +1,3 @@
/.idea
/config
/test
/genModel.sh

32
Dockerfile Normal file
View File

@ -0,0 +1,32 @@
FROM golang:alpine AS builder
LABEL stage=gobuilder
ENV CGO_ENABLED 0
ENV GOPROXY https://goproxy.cn,direct
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
WORKDIR /src
COPY ../.. .
RUN go mod download
RUN go build -ldflags="-s -w" -o /src/cmd/rpc/transfer cmd/rpc/transfer.go
RUN go build -ldflags="-s -w" -o /src/cmd/rpc/queue/queue cmd/rpc/queue/queue.go
FROM alpine:latest AS runtime
RUN apk update --no-cache && apk add --no-cache supervisor
RUN apk add --no-cache make
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
ENV TZ Asia/Shanghai
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=builder /src /src
RUN mkdir "/var/log/supervisor"
WORKDIR /src
ADD ./sh/startup.sh /opt/startup.sh
RUN sed -i 's/\r//g' /opt/startup.sh
ADD ./sh/supervisord.conf /etc/supervisord.conf
WORKDIR /src
EXPOSE 10001
#CMD ["sh","/opt/startup.sh"]

0
Makefile Executable file → Normal file
View File

21
cmd/rpc/etc/config.go Executable file → Normal file
View File

@ -13,7 +13,22 @@ type NacosConf struct {
}
type RockerMqConfig struct {
Host []string
GroupName string
Topic []string
Host []string
GroupName string
TopicPrefix string
AccessKey string
SecretKey string
SecurityToken string
Topic TopicList
}
type TopicList struct {
Market TopicConfig
ZLTX TopicConfig
RS TopicConfig
}
type TopicConfig struct {
Name string
}

1
cmd/rpc/internal/config/config.go Executable file → Normal file
View File

@ -13,6 +13,7 @@ type Config struct {
Cache cache.CacheConf
ZLTX types.ZLTXConf
Market types.MarketConf
RS types.RSConf
DB struct {
Master struct {
DataSource string

View File

@ -25,7 +25,7 @@ func NewMarketKeyDiscardLogic(ctx context.Context, svcCtx *svc.ServiceContext) *
}
func (l *MarketKeyDiscardLogic) MarketKeyDiscard(in *transfer.MarketKeyDiscardReq) (*transfer.MarketKeyDiscardRes, error) {
res, err := l.svcCtx.Market.SetData(common.StructToMap(in), &l.svcCtx.Config.Mq).KeyDiscard()
res, err := l.svcCtx.Market.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).KeyDiscard()
return res, err
}

View File

@ -25,6 +25,6 @@ func NewMarketKeySendLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Mar
}
func (l *MarketKeySendLogic) MarketKeySend(in *transfer.MarketKeySendReq) (*transfer.MarketKeySendRes, error) {
res, err := l.svcCtx.Market.SetData(common.StructToMap(in), &l.svcCtx.Config.Mq).KeySend()
res, err := l.svcCtx.Market.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).KeySend()
return res, err
}

View File

@ -24,6 +24,6 @@ func NewMarketQueryLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Marke
}
func (l *MarketQueryLogic) MarketQuery(in *transfer.MarketQueryReq) (*transfer.MarketQueryRes, error) {
res, err := l.svcCtx.Market.SetData(common.StructToMap(in), &l.svcCtx.Config.Mq).Query()
res, err := l.svcCtx.Market.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).Query()
return res, err
}

View File

@ -4,81 +4,80 @@ import (
"context"
"encoding/json"
"fmt"
"time"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/cmd/rpc/internal/logic/po"
"trasfer_middleware/cmd/rpc/internal/logic/po/zltx/types"
"trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/cmd/rpc/pkg/mq"
"trasfer_middleware/until/common"
"trasfer_middleware/until/request"
"trasfer_middleware/until/sysLog"
)
type Market struct {
Conf *types.MarketConf
RequestBody map[string]string
Conf *types.MarketConf
}
type MarketRequest struct {
ctx context.Context
*Market
RequestBody map[string]string
Config etc.RockerMqConfig
*po.RequestStruct
}
func NewMarket(conf types.MarketConf) *Market {
return &Market{
Conf: &conf,
}
}
func (r *Market) SetData(data map[string]interface{}, config *etc.RockerMqConfig) *MarketRequest {
func (r *Market) SetData(c context.Context, data map[string]interface{}, config *etc.RockerMqConfig) *MarketRequest {
data["timestamp"] = time.Now().Format("20060102150405")
private1 := "MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC5FGH7Tq5u7pA/eh6AjAS0InykvWDJt095go8yK3w7+TRIhSYDdbRHlTgOQm4nWuMPfz3U2Rs1vJQwyyEYdylcYJ2zFLr7Vb1BdvkJ3Kz/2yJ6sz3BNq6xAHaeCKzA/WZxnc/ypfkGlrmfr2tNqCM9CUHUWryihBjLxwRiWLmo0aKgYpKLKYNixLgyqUYAifD3APncAduv6sSjUPMTyXMOlP1DXgVwX6IaUG/yV8/56Ew72Vdi/y4qZmCKMmXq4PovWrs8ISOEuhxbfLrGWbGCAVYPq7d7XaH+AOY4dhJZm7OZ43UGWw80QKGEPkvU4Oquzu8BqBh12md7Zsd6r0XzAgMBAAECggEAcLgTPKUc437z51UOwqeELdlbJFIaYn/8LTrwz1NgpH4P86L0FeNX2sjsjPK0d8+IvmV2WO2o/r9NWbI9A9N/Iz3MjcawYmZDj11QK0t1KZZil2wWzlfpaO+pTnJmFFvASq4ceeHPms2tW63QokkmvQOoTha9EBV3rJQW/XagDEolty57kkfmB31cQHJuAt+BF5EzBqv3q3jnqhsj8J/ddT0hadyKq65u85VomLH92asu/KKMKYYXC8aHjgX48chAmQUAHGM/HCD2owLHwtei2kPWNDx85ecBsglIX3wy0yhH1dnL+o3eeskVLl89ye3QCJPHJBaNUUfbgucgWT0bsQKBgQD1pPMAe31ZXajl9WlHMtn8qhpAGzi/GiiH6YrrHMQECC2GGuAakBko1Vhc+2HU35gwlPOhwMIOCapB0cCqcZVo3+71AKo78YvZLQ7yMuSsp0/Wn2N79NZ6+++wtHGPP9eHrLuWm23l15W7W0RcQptTaQupbculMQZ8b6cAjh6d1QKBgQDA4c4Xl2ePbQdgMMOuKTPPKF3QI1VhCVtxSV+Gj9MZBZedstz9+ZO3oxHhy8D5S9it1hE6dn6/a+7OWibZ/gBr1S0+11LcwKDb7q30dimr9bQs/srIywpoIIN8wVEkX4P9JLOWgQeAtq53IMba+cElef916aqyJpXuIek9lvUQpwKBgQCD7alNMwWpf3H8v4dhY+BLoRgkIfqiOGxYQogHqhVkjPfWNIzz9zxr/9lLZv+uEsBsJzOKRjpyy6ITY5H0eLhj8REnqMnFE/+mDlsenVLPn7Rzcns90ct3leOvpdnvs7wP9CdzxdqKPPUAAQ5/9o3xiFNpFbzv5Zq0LkslMy8iWQKBgQCiRJWctUxzllcRLpVBTPqAOkaKV195zmR2rzLFQvRmZZUDH7nZlQEYCgF+Q2tqj8uPm7tMwumo4wW55pAu7witr19sMbxNaWUrAeao9kvilkfpXsV9HYv4w/m6l+xKvGyPKDRJ1u1X9Nhb8mA5UsqSW8t2CIoJbHrQJwlRPlGXmwKBgQDg4rcsM2PmShOg8lSrHXPATXiZyyqpPJLpXbV6DRKyt7U6KWjyrplQN7yOoIUgsuD2OC/q67y7w1P3OY7X0RDnMr6MtIV0JyBJHg24eyBTqeLai2DqoHlsBOSvpJDZf+g/DXCjvHMWp1h0wqdj3aLthmU0dHM/CEqr/o7d8GwrGQ=="
p := "-----BEGIN RSA PRIVATE KEY-----\n" + private1 + "\n-----END RSA PRIVATE KEY-----"
data["app_id"] = "2783278"
data["mem_id"] = "2783278"
data["pos_id"] = "2783278"
//data["timestamp"] = time.Now().Format("20060102150405")
//private1 := "MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC5FGH7Tq5u7pA/eh6AjAS0InykvWDJt095go8yK3w7+TRIhSYDdbRHlTgOQm4nWuMPfz3U2Rs1vJQwyyEYdylcYJ2zFLr7Vb1BdvkJ3Kz/2yJ6sz3BNq6xAHaeCKzA/WZxnc/ypfkGlrmfr2tNqCM9CUHUWryihBjLxwRiWLmo0aKgYpKLKYNixLgyqUYAifD3APncAduv6sSjUPMTyXMOlP1DXgVwX6IaUG/yV8/56Ew72Vdi/y4qZmCKMmXq4PovWrs8ISOEuhxbfLrGWbGCAVYPq7d7XaH+AOY4dhJZm7OZ43UGWw80QKGEPkvU4Oquzu8BqBh12md7Zsd6r0XzAgMBAAECggEAcLgTPKUc437z51UOwqeELdlbJFIaYn/8LTrwz1NgpH4P86L0FeNX2sjsjPK0d8+IvmV2WO2o/r9NWbI9A9N/Iz3MjcawYmZDj11QK0t1KZZil2wWzlfpaO+pTnJmFFvASq4ceeHPms2tW63QokkmvQOoTha9EBV3rJQW/XagDEolty57kkfmB31cQHJuAt+BF5EzBqv3q3jnqhsj8J/ddT0hadyKq65u85VomLH92asu/KKMKYYXC8aHjgX48chAmQUAHGM/HCD2owLHwtei2kPWNDx85ecBsglIX3wy0yhH1dnL+o3eeskVLl89ye3QCJPHJBaNUUfbgucgWT0bsQKBgQD1pPMAe31ZXajl9WlHMtn8qhpAGzi/GiiH6YrrHMQECC2GGuAakBko1Vhc+2HU35gwlPOhwMIOCapB0cCqcZVo3+71AKo78YvZLQ7yMuSsp0/Wn2N79NZ6+++wtHGPP9eHrLuWm23l15W7W0RcQptTaQupbculMQZ8b6cAjh6d1QKBgQDA4c4Xl2ePbQdgMMOuKTPPKF3QI1VhCVtxSV+Gj9MZBZedstz9+ZO3oxHhy8D5S9it1hE6dn6/a+7OWibZ/gBr1S0+11LcwKDb7q30dimr9bQs/srIywpoIIN8wVEkX4P9JLOWgQeAtq53IMba+cElef916aqyJpXuIek9lvUQpwKBgQCD7alNMwWpf3H8v4dhY+BLoRgkIfqiOGxYQogHqhVkjPfWNIzz9zxr/9lLZv+uEsBsJzOKRjpyy6ITY5H0eLhj8REnqMnFE/+mDlsenVLPn7Rzcns90ct3leOvpdnvs7wP9CdzxdqKPPUAAQ5/9o3xiFNpFbzv5Zq0LkslMy8iWQKBgQCiRJWctUxzllcRLpVBTPqAOkaKV195zmR2rzLFQvRmZZUDH7nZlQEYCgF+Q2tqj8uPm7tMwumo4wW55pAu7witr19sMbxNaWUrAeao9kvilkfpXsV9HYv4w/m6l+xKvGyPKDRJ1u1X9Nhb8mA5UsqSW8t2CIoJbHrQJwlRPlGXmwKBgQDg4rcsM2PmShOg8lSrHXPATXiZyyqpPJLpXbV6DRKyt7U6KWjyrplQN7yOoIUgsuD2OC/q67y7w1P3OY7X0RDnMr6MtIV0JyBJHg24eyBTqeLai2DqoHlsBOSvpJDZf+g/DXCjvHMWp1h0wqdj3aLthmU0dHM/CEqr/o7d8GwrGQ=="
//p := "-----BEGIN RSA PRIVATE KEY-----\n" + private1 + "\n-----END RSA PRIVATE KEY-----"
//data["app_id"] = "2783278"
//data["mem_id"] = "2783278"
//data["pos_id"] = "2783278"
sign, err := common.MarketMakeRsaSign(p, data)
if err != nil {
panic(err)
}
data["sign"] = sign
//sign, err := common.MarketMakeRsaSign(p, data)
//if err != nil {
// panic(err)
//}
//data["sign"] = sign
//data := common.MergeMaps(common.ToMap(p), common.ToMap(r), common.ToMap(e))*/
requestBody := make(map[string]string, len(data))
for key, value := range data {
requestBody[key] = fmt.Sprintf("%v", value)
}
return &MarketRequest{
Market: r,
RequestBody: requestBody,
Config: *config,
ctx: c,
Market: r,
RequestStruct: &po.RequestStruct{
Config: *config,
RequestBody: requestBody,
},
}
}
func (r *MarketRequest) request(url string) (*request.Response, error) {
reqInfo := make(map[string]interface{}, 4)
reqInfo["url"] = fmt.Sprintf("%s%s", r.Conf.Host, url)
reqUrl := fmt.Sprintf("%s%s", r.Conf.Host, url)
req := request.Request{
Method: "POST",
Url: reqInfo["url"].(string),
Url: reqUrl,
Data: r.RequestBody,
}
resp, _ := req.Send()
//异步存入请求记录
reqStr, _ := json.Marshal(r.RequestBody)
reqInfo["data"] = string(reqStr)
reqInfo["resp"] = resp.Text
reqInfo["code"] = resp.StatusCode
sendMq := mq.RocketMq{}
send, err := sendMq.Produce(r.Config.Host, r.Config.GroupName, "MARKET", reqInfo, 0)
sendMq := mq.AliyunRocketMq{
AccessKey: r.Config.AccessKey,
SecretKey: r.Config.SecretKey,
SecurityToken: r.Config.SecurityToken,
ServerAddress: r.Config.Host,
}
err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.Market.Name, po.SetMqSendData(r.RequestStruct, &resp, reqUrl))
if err != nil {
sysLog.ErrLog(context.Background(), err)
} else {
sysLog.LogMq(context.Background(), send)
sysLog.LogSendMq(r.ctx, err)
}
/*r.Model.Insert(context.Background(), &genModel.ServerMiddleMarketLogs{
Url : url,
@ -87,7 +86,7 @@ func (r *MarketRequest) request(url string) (*request.Response, error) {
Resp :resp.Text,
CreateTime :time.Now(),
})*/
return &resp, nil
return &resp, err
}
func (r *MarketRequest) KeySend() (*transfer.MarketKeySendRes, error) {

View File

@ -0,0 +1,79 @@
package rs
import (
"context"
"encoding/json"
"fmt"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/cmd/rpc/internal/logic/po"
"trasfer_middleware/cmd/rpc/internal/logic/po/zltx/types"
"trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/cmd/rpc/pkg/mq"
"trasfer_middleware/until/request"
"trasfer_middleware/until/sysLog"
)
type RS struct {
Conf *types.RSConf
}
type RsRequest struct {
ctx context.Context
*RS
*po.RequestStruct
}
func NewRs(conf types.RSConf) *RS {
return &RS{
Conf: &conf,
}
}
func (r *RS) SetData(c context.Context, data map[string]interface{}, config *etc.RockerMqConfig) *RsRequest {
requestBody := make(map[string]string, len(data))
for key, value := range data {
requestBody[key] = fmt.Sprintf("%v", value)
}
return &RsRequest{
ctx: c,
RS: r,
RequestStruct: &po.RequestStruct{
Config: *config,
RequestBody: requestBody,
},
}
}
func (r *RsRequest) request(url string) (*request.Response, error) {
reqUrl := fmt.Sprintf("%s%s", r.Conf.Host, url)
req := request.Request{
Method: "POST",
Url: reqUrl,
Data: r.RequestBody,
}
resp, _ := req.Send()
//异步存入请求记录
sendMq := mq.AliyunRocketMq{
AccessKey: r.Config.AccessKey,
SecretKey: r.Config.SecretKey,
SecurityToken: r.Config.SecurityToken,
ServerAddress: r.Config.Host,
}
err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.RS.Name, po.SetMqSendData(r.RequestStruct, &resp, reqUrl))
if err != nil {
sysLog.LogSendMq(r.ctx, err)
}
return &resp, err
}
func (r *RsRequest) CouponGrant() (*transfer.RsCouponGrantRes, error) {
var res transfer.RsCouponGrantRes
req, err := r.request(vo.RS_COUPON_GRANT)
if err != nil {
return nil, err
}
_ = json.Unmarshal([]byte(req.Text), &res)
return &res, nil
}

View File

@ -0,0 +1,25 @@
package po
import (
"encoding/json"
"time"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/until/request"
)
type RequestStruct struct {
RequestBody map[string]string
Config etc.RockerMqConfig
}
func SetMqSendData(reqInfo *RequestStruct, respInfo *request.Response, url string) []byte {
log := make(map[string]interface{}, 5)
reqStr, _ := json.Marshal(reqInfo.RequestBody)
log["data"] = string(reqStr)
log["url"] = url
log["resp"] = respInfo.Text
log["code"] = respInfo.StatusCode
log["create_time"] = time.Now().Format("2006-01-02 15:04:05")
logByte, _ := json.Marshal(log)
return logByte
}

View File

@ -18,6 +18,10 @@ type MarketConf struct {
Host string
}
type RSConf struct {
Host string
}
type BaseRes struct {
Code string `json:"code"`
Message string `json:"message"`

View File

@ -1,22 +1,27 @@
package zltx
import (
"context"
"encoding/json"
"fmt"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/cmd/rpc/internal/logic/po"
"trasfer_middleware/cmd/rpc/internal/logic/po/zltx/types"
"trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/pb/transfer"
"trasfer_middleware/cmd/rpc/pkg/mq"
"trasfer_middleware/until/request"
"trasfer_middleware/until/sysLog"
)
type ZltxOrder struct {
Conf *types.ZLTXConf
RequestBody map[string]string
Conf *types.ZLTXConf
}
type ZltxOrderRequest struct {
ctx context.Context
*ZltxOrder
RequestBody map[string]string
*po.RequestStruct
}
func NewZltxOrder(conf types.ZLTXConf) *ZltxOrder {
@ -26,7 +31,7 @@ func NewZltxOrder(conf types.ZLTXConf) *ZltxOrder {
}
}
func (r *ZltxOrder) SetData(data map[string]interface{}) *ZltxOrderRequest {
func (r *ZltxOrder) SetData(c context.Context, data map[string]interface{}, config *etc.RockerMqConfig) *ZltxOrderRequest {
/*data["timeStamp"] = time.Now().Unix()
a := data
delete(a, "extendParameter")
@ -38,19 +43,35 @@ func (r *ZltxOrder) SetData(data map[string]interface{}) *ZltxOrderRequest {
requestBody[key] = fmt.Sprintf("%v", value)
}
return &ZltxOrderRequest{
ZltxOrder: r,
RequestBody: requestBody,
ctx: c,
ZltxOrder: r,
RequestStruct: &po.RequestStruct{
Config: *config,
RequestBody: requestBody,
},
}
}
func (r *ZltxOrderRequest) request(url string) (*request.Response, error) {
reqUrl := fmt.Sprintf("%s%s", r.Conf.Host, url)
req := request.Request{
Method: "POST",
Url: fmt.Sprintf("%s%s", r.Conf.Host, url),
Url: reqUrl,
Data: r.RequestBody,
}
resp, _ := req.Send()
return &resp, nil
//异步存入请求记录
sendMq := mq.AliyunRocketMq{
AccessKey: r.Config.AccessKey,
SecretKey: r.Config.SecretKey,
SecurityToken: r.Config.SecurityToken,
ServerAddress: r.Config.Host,
}
err := sendMq.Produce(r.ctx, r.Config.TopicPrefix+r.Config.Topic.ZLTX.Name, po.SetMqSendData(r.RequestStruct, &resp, reqUrl))
if err != nil {
sysLog.LogSendMq(r.ctx, err)
}
return &resp, err
}
func (r *ZltxOrderRequest) RechargeOrder() (*transfer.DefaultRes, error) {
@ -122,3 +143,13 @@ func (r *ZltxOrderRequest) RechargeInfo() (*transfer.ZltxRechargeInfoRes, error)
_ = json.Unmarshal([]byte(req.Text), &res)
return &res, nil
}
func (r *ZltxOrderRequest) RsMiXueCouponGrant() (*transfer.RsCouponGrantRes, error) {
var res transfer.RsCouponGrantRes
req, err := r.request(vo.ZLTX_RS_MIXUE_COUPON_GRANT)
if err != nil {
return nil, err
}
_ = json.Unmarshal([]byte(req.Text), &res)
return &res, nil
}

View File

@ -0,0 +1,31 @@
package logic
import (
"context"
"trasfer_middleware/until/common"
"trasfer_middleware/cmd/rpc/internal/svc"
"trasfer_middleware/cmd/rpc/pb/transfer"
"github.com/zeromicro/go-zero/core/logx"
)
type RsCouponGrantLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewRsCouponGrantLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RsCouponGrantLogic {
return &RsCouponGrantLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *RsCouponGrantLogic) RsCouponGrant(in *transfer.RsCouponGrantReq) (*transfer.RsCouponGrantRes, error) {
res, err := l.svcCtx.RS.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).CouponGrant()
return res, err
}

View File

@ -9,4 +9,6 @@ const (
//券码详情
MARKET_KEY_QUERY = "openApi/v1/market/key/query"
MARKET_LOG_STATU_DEFAULT = 1
)

View File

@ -0,0 +1,8 @@
package vo
const (
// 荣数立减金
RS_COUPON_GRANT = "cross/v1/coupon/grant"
RS_LOG_STATU_DEFAULT = 1
)

View File

@ -21,4 +21,9 @@ const (
// 余额查询
ZLTX_RECHARGE_INFO = "recharge/info"
// 荣数立减金(蜜雪冰城)
ZLTX_RS_MIXUE_COUPON_GRANT = "supplier/order/RongShuMiXue"
ZLTX_LOG_STATU_DEFAULT = 1
)

View File

@ -25,7 +25,7 @@ func NewZltxOrderCardLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Zlt
}
func (l *ZltxOrderCardLogic) ZltxOrderCard(in *transfer.ZltxOrderCardReq) (*transfer.DefaultRes, error) {
res, err := l.svcCtx.ZltxOrder.SetData(common.StructToMap(in)).CardOrder()
res, err := l.svcCtx.ZltxOrder.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).CardOrder()
return &transfer.DefaultRes{
Code: res.Code,
Message: res.Message,

View File

@ -25,7 +25,7 @@ func NewZltxOrderCardQueryLogic(ctx context.Context, svcCtx *svc.ServiceContext)
func (l *ZltxOrderCardQueryLogic) ZltxOrderCardQuery(in *transfer.ZltxOrderCardQueryReq) (*transfer.ZltxOrderCardQueryRes, error) {
res, err := l.svcCtx.ZltxOrder.SetData(common.StructToMap(in)).CardQuery()
res, err := l.svcCtx.ZltxOrder.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).CardQuery()
return &transfer.ZltxOrderCardQueryRes{
Code: res.Code,
Message: res.Message,

View File

@ -25,7 +25,7 @@ func NewZltxOrderRechargeLogic(ctx context.Context, svcCtx *svc.ServiceContext)
}
func (l *ZltxOrderRechargeLogic) ZltxOrderRecharge(in *transfer.ZltxOrderRechargeReq) (*transfer.DefaultRes, error) {
res, err := l.svcCtx.ZltxOrder.SetData(common.StructToMap(in)).RechargeOrder()
res, err := l.svcCtx.ZltxOrder.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).RechargeOrder()
return &transfer.DefaultRes{
Code: res.Code,
Message: res.Message,

View File

@ -24,7 +24,7 @@ func NewZltxOrderRechargeQueryLogic(ctx context.Context, svcCtx *svc.ServiceCont
}
func (l *ZltxOrderRechargeQueryLogic) ZltxOrderRechargeQuery(in *transfer.ZltxOrderRechargeQueryReq) (*transfer.ZltxOrderRechargeQueryRes, error) {
res, err := l.svcCtx.ZltxOrder.SetData(common.StructToMap(in)).RechargeQuery()
res, err := l.svcCtx.ZltxOrder.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).RechargeQuery()
return &transfer.ZltxOrderRechargeQueryRes{
Code: res.Code,
Message: res.Message,

View File

@ -27,7 +27,7 @@ func NewZltxOrderSmsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Zltx
func (l *ZltxOrderSmsLogic) ZltxOrderSms(in *transfer.ZltxOrderSmsReq) (*transfer.ZltxOrderSmsRes, error) {
var result = &transfer.ZltxOrderSmsRes{}
res, err := l.svcCtx.ZltxOrder.SetData(common.StructToMap(in)).OrderSendSms()
res, err := l.svcCtx.ZltxOrder.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).OrderSendSms()
_ = copier.Copy(result, &res)
return result, err
}

View File

@ -26,7 +26,7 @@ func NewZltxRechargeInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *
func (l *ZltxRechargeInfoLogic) ZltxRechargeInfo(in *transfer.DefaultReq) (*transfer.ZltxRechargeInfoRes, error) {
res, err := l.svcCtx.ZltxOrder.SetData(common.StructToMap(in)).RechargeInfo()
res, err := l.svcCtx.ZltxOrder.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).RechargeInfo()
//_ = copier.Copy(result, &res)
return res, err
}

View File

@ -27,7 +27,7 @@ func NewZltxRechargeProductLogic(ctx context.Context, svcCtx *svc.ServiceContext
func (l *ZltxRechargeProductLogic) ZltxRechargeProduct(in *transfer.DefaultReq) (*transfer.ZltxRechargeProductRes, error) {
var result = &transfer.ZltxRechargeProductRes{}
res, err := l.svcCtx.ZltxOrder.SetData(common.StructToMap(in)).RechargeProduct()
res, err := l.svcCtx.ZltxOrder.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).RechargeProduct()
_ = copier.Copy(result, &res)
return result, err
}

View File

@ -0,0 +1,30 @@
package logic
import (
"context"
"trasfer_middleware/until/common"
"trasfer_middleware/cmd/rpc/internal/svc"
"trasfer_middleware/cmd/rpc/pb/transfer"
"github.com/zeromicro/go-zero/core/logx"
)
type ZltxRsMiXueLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewZltxRsMiXueLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ZltxRsMiXueLogic {
return &ZltxRsMiXueLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *ZltxRsMiXueLogic) ZltxRsMiXue(in *transfer.RsCouponGrantReq) (*transfer.RsCouponGrantRes, error) {
res, err := l.svcCtx.ZltxOrder.SetData(l.ctx, common.StructToMap(in), &l.svcCtx.Config.Mq).RsMiXueCouponGrant()
return res, err
}

View File

@ -7,6 +7,7 @@ import (
"github.com/streadway/amqp"
"time"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/cmd/rpc/internal/logic/vo"
"trasfer_middleware/cmd/rpc/internal/queue/mq/mqSvc"
"trasfer_middleware/genModel"
)
@ -23,6 +24,11 @@ type ZLTX struct {
ctx context.Context
}
type RS struct {
svc *mqSvc.ServiceContext
ctx context.Context
}
func NewMarket(svc *mqSvc.ServiceContext, ctx context.Context) *Market {
return &Market{
svc: svc,
@ -37,10 +43,18 @@ func NewZLTX(svc *mqSvc.ServiceContext, ctx context.Context) *ZLTX {
}
}
func NewRS(svc *mqSvc.ServiceContext, ctx context.Context) *RS {
return &RS{
svc: svc,
ctx: ctx,
}
}
func (m *Market) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var market = &genModel.ServerMiddleMarketLogs{}
json.Unmarshal(msg, market)
market.CreateTime = time.Now()
market.Status = vo.MARKET_LOG_STATU_DEFAULT
_, err := m.svc.DbWrite.MarketLogs.Insert(m.ctx, market)
if err != nil {
return fmt.Errorf("market数据保存失败%s,原因:%s", msg, err)
@ -52,16 +66,30 @@ func (m *ZLTX) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var zltx = &genModel.ServerMiddleZltxLogs{}
json.Unmarshal(msg, zltx)
zltx.CreateTime = time.Now()
zltx.Status = vo.ZLTX_LOG_STATU_DEFAULT
_, err := m.svc.DbWrite.ZLTXLogs.Insert(m.ctx, zltx)
if err != nil {
return fmt.Errorf("zltx数据保存失败%s", msg)
return fmt.Errorf("zltx数据保存失败%s,原因:%s", msg, err)
}
return nil
}
func (m *RS) MessageHandler(tag uint64, ch *amqp.Channel, msg []byte) error {
var rs = &genModel.ServerMiddleRsLogs{}
json.Unmarshal(msg, rs)
rs.CreateTime = time.Now()
rs.Status = vo.RS_LOG_STATU_DEFAULT
_, err := m.svc.DbWrite.RSLogs.Insert(m.ctx, rs)
if err != nil {
return fmt.Errorf("rs数据保存失败%s,原因:%s", msg, err)
}
return nil
}
func AllHandle(c *etc.RockerMqConfig, svc *mqSvc.ServiceContext, ctx context.Context) map[string]Message {
result := make(map[string]Message, len(c.Topic))
result["MARKET"] = NewMarket(svc, ctx)
result["ZLTX"] = NewZLTX(svc, ctx)
result := make(map[string]Message)
result[c.TopicPrefix+c.Topic.Market.Name] = NewMarket(svc, ctx)
result[c.TopicPrefix+c.Topic.ZLTX.Name] = NewZLTX(svc, ctx)
result[c.TopicPrefix+c.Topic.RS.Name] = NewRS(svc, ctx)
return result
}

View File

@ -3,11 +3,9 @@ package mqServer
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"sync"
"trasfer_middleware/cmd/rpc/etc"
"trasfer_middleware/cmd/rpc/internal/queue/mq"
mq1 "trasfer_middleware/until/mq"
"trasfer_middleware/until/sysLog"
)
@ -21,42 +19,37 @@ func NewRocketmq(config *etc.RockerMqConfig) *RocketMq {
}
}
func (n RocketMq) Consume(handler map[string]mq.Message) error {
c, err := consumer.NewPushConsumer(
consumer.WithGroupName(n.mqConfig.GroupName),
consumer.WithNsResolver(primitive.NewPassthroughResolver(n.mqConfig.Host)),
)
defer c.Shutdown()
// 设置回调函数
if err != nil {
return fmt.Errorf("rockmq error:%v", err)
func (n RocketMq) Consume(c context.Context, handler map[string]mq.Message) error {
manager := mq1.NewConsumerManager(nil)
connConf := &mq1.ConsumerConnConfig{
NameServers: n.mqConfig.Host[0],
AccessKey: n.mqConfig.AccessKey,
SecretKey: n.mqConfig.SecretKey,
}
var wg sync.WaitGroup
for _, topic := range n.mqConfig.Topic {
wg.Add(1)
go func(topic string) {
defer wg.Done()
err := c.Subscribe(topic,
consumer.MessageSelector{},
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
err = handler[topic].MessageHandler(0, nil, msgs[i].Body)
if err != nil {
return consumer.ConsumeRetryLater, err
}
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
sysLog.ErrLog(context.Background(), err)
}
}(topic)
for topic, _ := range handler {
err := manager.Subscribe(c, connConf,
&mq1.ConsumerConfig{
TopicName: topic,
GroupName: topic + "_consumer",
PerCoroutineCnt: 2,
},
func(ctx context.Context, message *mq1.ConsumerMessage) error {
err := handler[message.Topic].MessageHandler(0, nil, message.Body)
if err != nil {
return fmt.Errorf("链接失败:%v", err)
}
return nil
})
if err != nil {
sysLog.ErrQueueLog(context.Background(), err)
}
}
err = c.Start()
err := manager.Start(c)
defer manager.Stop(c)
if err != nil {
sysLog.ErrLog(context.Background(), err)
sysLog.ErrQueueLog(context.Background(), fmt.Errorf("启动失败:%v", err))
}
select {}
}

View File

@ -24,10 +24,12 @@ func DbModel(datasource string, c config.Config) *Model {
return &Model{
MarketLogs: genModel.NewServerMiddleMarketLogsModel(sqlConn),
ZLTXLogs: genModel.NewServerMiddleZltxLogsModel(sqlConn),
RSLogs: genModel.NewServerMiddleRsLogsModel(sqlConn),
}
}
type Model struct {
MarketLogs genModel.ServerMiddleMarketLogsModel
ZLTXLogs genModel.ServerMiddleZltxLogsModel
RSLogs genModel.ServerMiddleRsLogsModel
}

10
cmd/rpc/internal/server/transferServer.go Executable file → Normal file
View File

@ -57,6 +57,11 @@ func (s *TransferServer) ZltxRechargeProduct(ctx context.Context, in *transfer.D
return l.ZltxRechargeProduct(in)
}
func (s *TransferServer) ZltxRsMiXue(ctx context.Context, in *transfer.RsCouponGrantReq) (*transfer.RsCouponGrantRes, error) {
l := logic.NewZltxRsMiXueLogic(ctx, s.svcCtx)
return l.ZltxRsMiXue(in)
}
func (s *TransferServer) MarketKeySend(ctx context.Context, in *transfer.MarketKeySendReq) (*transfer.MarketKeySendRes, error) {
l := logic.NewMarketKeySendLogic(ctx, s.svcCtx)
return l.MarketKeySend(in)
@ -71,3 +76,8 @@ func (s *TransferServer) MarketQuery(ctx context.Context, in *transfer.MarketQue
l := logic.NewMarketQueryLogic(ctx, s.svcCtx)
return l.MarketQuery(in)
}
func (s *TransferServer) RsCouponGrant(ctx context.Context, in *transfer.RsCouponGrantReq) (*transfer.RsCouponGrantRes, error) {
l := logic.NewRsCouponGrantLogic(ctx, s.svcCtx)
return l.RsCouponGrant(in)
}

3
cmd/rpc/internal/svc/serviceContext.go Executable file → Normal file
View File

@ -4,6 +4,7 @@ import (
"github.com/zeromicro/go-zero/core/stores/redis"
"trasfer_middleware/cmd/rpc/internal/config"
"trasfer_middleware/cmd/rpc/internal/logic/po/market"
"trasfer_middleware/cmd/rpc/internal/logic/po/rs"
"trasfer_middleware/cmd/rpc/internal/logic/po/zltx"
)
@ -12,6 +13,7 @@ type ServiceContext struct {
RedisClient *redis.Redis
ZltxOrder *zltx.ZltxOrder
Market *market.Market
RS *rs.RS
}
func NewServiceContext(c config.Config) *ServiceContext {
@ -25,6 +27,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
}),
ZltxOrder: zltx.NewZltxOrder(c.ZLTX),
Market: market.NewMarket(c.Market),
RS: rs.NewRs(c.RS),
}
}

38
cmd/rpc/pb/transfer.proto Executable file → Normal file
View File

@ -25,10 +25,48 @@ service Transfer {
rpc zltxOrderSms(ZltxOrderSmsReq) returns(ZltxOrderSmsRes);
rpc zltxRechargeInfo(DefaultReq) returns(ZltxRechargeInfoRes);
rpc zltxRechargeProduct(DefaultReq) returns(ZltxRechargeProductRes);
rpc zltxRsMiXue(RsCouponGrantReq) returns(RsCouponGrantRes);
rpc marketKeySend(MarketKeySendReq) returns(MarketKeySendRes);
rpc marketKeyDiscard(MarketKeyDiscardReq) returns(MarketKeyDiscardRes);
rpc marketQuery(MarketQueryReq) returns(MarketQueryRes);
rpc rsCouponGrant(RsCouponGrantReq) returns(RsCouponGrantRes);
}
message RsCouponGrantReq {
string vendorNo = 1;
Data data = 2;
message Data{
string sipOrderNo = 1;
string voucherTag = 2;
int32 accountType = 3;
string accountNo = 4;
string accountInfo = 5;
int32 num = 6;
}
string sign = 3;
}
message RsCouponGrantRes {
string code = 1;
string message = 2;
string status = 3;
Data data = 4;
message Data {
string sipOrderNo = 1;
string vendorOrderNo = 2;
repeated VoucherInfo voucherInfo = 3;
message VoucherInfo {
string voucherCode = 1;
string voucherPassword = 2;
string voucherDesc = 3;
string qrCodeUrl = 4;
string startTime = 5;
string endTime = 6;
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -26,9 +26,11 @@ const (
Transfer_ZltxOrderSms_FullMethodName = "/transfer.Transfer/zltxOrderSms"
Transfer_ZltxRechargeInfo_FullMethodName = "/transfer.Transfer/zltxRechargeInfo"
Transfer_ZltxRechargeProduct_FullMethodName = "/transfer.Transfer/zltxRechargeProduct"
Transfer_ZltxRsMiXue_FullMethodName = "/transfer.Transfer/zltxRsMiXue"
Transfer_MarketKeySend_FullMethodName = "/transfer.Transfer/marketKeySend"
Transfer_MarketKeyDiscard_FullMethodName = "/transfer.Transfer/marketKeyDiscard"
Transfer_MarketQuery_FullMethodName = "/transfer.Transfer/marketQuery"
Transfer_RsCouponGrant_FullMethodName = "/transfer.Transfer/rsCouponGrant"
)
// TransferClient is the client API for Transfer service.
@ -42,9 +44,11 @@ type TransferClient interface {
ZltxOrderSms(ctx context.Context, in *ZltxOrderSmsReq, opts ...grpc.CallOption) (*ZltxOrderSmsRes, error)
ZltxRechargeInfo(ctx context.Context, in *DefaultReq, opts ...grpc.CallOption) (*ZltxRechargeInfoRes, error)
ZltxRechargeProduct(ctx context.Context, in *DefaultReq, opts ...grpc.CallOption) (*ZltxRechargeProductRes, error)
ZltxRsMiXue(ctx context.Context, in *RsCouponGrantReq, opts ...grpc.CallOption) (*RsCouponGrantRes, error)
MarketKeySend(ctx context.Context, in *MarketKeySendReq, opts ...grpc.CallOption) (*MarketKeySendRes, error)
MarketKeyDiscard(ctx context.Context, in *MarketKeyDiscardReq, opts ...grpc.CallOption) (*MarketKeyDiscardRes, error)
MarketQuery(ctx context.Context, in *MarketQueryReq, opts ...grpc.CallOption) (*MarketQueryRes, error)
RsCouponGrant(ctx context.Context, in *RsCouponGrantReq, opts ...grpc.CallOption) (*RsCouponGrantRes, error)
}
type transferClient struct {
@ -118,6 +122,15 @@ func (c *transferClient) ZltxRechargeProduct(ctx context.Context, in *DefaultReq
return out, nil
}
func (c *transferClient) ZltxRsMiXue(ctx context.Context, in *RsCouponGrantReq, opts ...grpc.CallOption) (*RsCouponGrantRes, error) {
out := new(RsCouponGrantRes)
err := c.cc.Invoke(ctx, Transfer_ZltxRsMiXue_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *transferClient) MarketKeySend(ctx context.Context, in *MarketKeySendReq, opts ...grpc.CallOption) (*MarketKeySendRes, error) {
out := new(MarketKeySendRes)
err := c.cc.Invoke(ctx, Transfer_MarketKeySend_FullMethodName, in, out, opts...)
@ -145,6 +158,15 @@ func (c *transferClient) MarketQuery(ctx context.Context, in *MarketQueryReq, op
return out, nil
}
func (c *transferClient) RsCouponGrant(ctx context.Context, in *RsCouponGrantReq, opts ...grpc.CallOption) (*RsCouponGrantRes, error) {
out := new(RsCouponGrantRes)
err := c.cc.Invoke(ctx, Transfer_RsCouponGrant_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// TransferServer is the server API for Transfer service.
// All implementations must embed UnimplementedTransferServer
// for forward compatibility
@ -156,9 +178,11 @@ type TransferServer interface {
ZltxOrderSms(context.Context, *ZltxOrderSmsReq) (*ZltxOrderSmsRes, error)
ZltxRechargeInfo(context.Context, *DefaultReq) (*ZltxRechargeInfoRes, error)
ZltxRechargeProduct(context.Context, *DefaultReq) (*ZltxRechargeProductRes, error)
ZltxRsMiXue(context.Context, *RsCouponGrantReq) (*RsCouponGrantRes, error)
MarketKeySend(context.Context, *MarketKeySendReq) (*MarketKeySendRes, error)
MarketKeyDiscard(context.Context, *MarketKeyDiscardReq) (*MarketKeyDiscardRes, error)
MarketQuery(context.Context, *MarketQueryReq) (*MarketQueryRes, error)
RsCouponGrant(context.Context, *RsCouponGrantReq) (*RsCouponGrantRes, error)
mustEmbedUnimplementedTransferServer()
}
@ -187,6 +211,9 @@ func (UnimplementedTransferServer) ZltxRechargeInfo(context.Context, *DefaultReq
func (UnimplementedTransferServer) ZltxRechargeProduct(context.Context, *DefaultReq) (*ZltxRechargeProductRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method ZltxRechargeProduct not implemented")
}
func (UnimplementedTransferServer) ZltxRsMiXue(context.Context, *RsCouponGrantReq) (*RsCouponGrantRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method ZltxRsMiXue not implemented")
}
func (UnimplementedTransferServer) MarketKeySend(context.Context, *MarketKeySendReq) (*MarketKeySendRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method MarketKeySend not implemented")
}
@ -196,6 +223,9 @@ func (UnimplementedTransferServer) MarketKeyDiscard(context.Context, *MarketKeyD
func (UnimplementedTransferServer) MarketQuery(context.Context, *MarketQueryReq) (*MarketQueryRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method MarketQuery not implemented")
}
func (UnimplementedTransferServer) RsCouponGrant(context.Context, *RsCouponGrantReq) (*RsCouponGrantRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method RsCouponGrant not implemented")
}
func (UnimplementedTransferServer) mustEmbedUnimplementedTransferServer() {}
// UnsafeTransferServer may be embedded to opt out of forward compatibility for this service.
@ -335,6 +365,24 @@ func _Transfer_ZltxRechargeProduct_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler)
}
func _Transfer_ZltxRsMiXue_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RsCouponGrantReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TransferServer).ZltxRsMiXue(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Transfer_ZltxRsMiXue_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TransferServer).ZltxRsMiXue(ctx, req.(*RsCouponGrantReq))
}
return interceptor(ctx, in, info, handler)
}
func _Transfer_MarketKeySend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(MarketKeySendReq)
if err := dec(in); err != nil {
@ -389,6 +437,24 @@ func _Transfer_MarketQuery_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler)
}
func _Transfer_RsCouponGrant_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RsCouponGrantReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TransferServer).RsCouponGrant(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Transfer_RsCouponGrant_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TransferServer).RsCouponGrant(ctx, req.(*RsCouponGrantReq))
}
return interceptor(ctx, in, info, handler)
}
// Transfer_ServiceDesc is the grpc.ServiceDesc for Transfer service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -424,6 +490,10 @@ var Transfer_ServiceDesc = grpc.ServiceDesc{
MethodName: "zltxRechargeProduct",
Handler: _Transfer_ZltxRechargeProduct_Handler,
},
{
MethodName: "zltxRsMiXue",
Handler: _Transfer_ZltxRsMiXue_Handler,
},
{
MethodName: "marketKeySend",
Handler: _Transfer_MarketKeySend_Handler,
@ -436,6 +506,10 @@ var Transfer_ServiceDesc = grpc.ServiceDesc{
MethodName: "marketQuery",
Handler: _Transfer_MarketQuery_Handler,
},
{
MethodName: "rsCouponGrant",
Handler: _Transfer_RsCouponGrant_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "transfer.proto",

View File

@ -2,38 +2,34 @@ package mq
import (
"context"
"encoding/json"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"trasfer_middleware/until/mq"
)
type RocketMq struct {
}
func (n RocketMq) Produce(host []string, groupName string, topic string, log interface{}, delayTime int) (string, error) {
p, err := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver(host)),
producer.WithRetry(2), //指定重试次数
producer.WithGroupName(groupName),
)
defer p.Shutdown()
if err != nil {
return "", fmt.Errorf("rockmq error:%v", err)
}
if err = p.Start(); err != nil {
return "", fmt.Errorf("rockmq error:%v", err)
}
var body, _ = json.Marshal(log)
// 构建一个消息
message := primitive.NewMessage(topic, body)
// 给message设置延迟级别
message.WithDelayTimeLevel(delayTime)
res, err := p.SendSync(context.Background(), message)
if err != nil {
fmt.Errorf("rockmq send message fail:%v", err)
}
return res.String(), nil
type AliyunRocketMq struct {
AccessKey string
SecretKey string
SecurityToken string
ServerAddress []string
}
func (n *AliyunRocketMq) Produce(c context.Context, topic string, body []byte) error {
p, err := mq.NewProducer(n.ServerAddress[0], mq.WithProducerCredentials(n.AccessKey, n.SecretKey, n.SecurityToken))
if err != nil {
return err
}
err = p.Start()
if err != nil {
return err
}
err = p.SendSync(c, topic, body)
if err != nil {
return err
}
p.Shutdown()
return nil
}

View File

@ -28,9 +28,9 @@ func main() {
mqSv := mqServer.NewRocketmq(&c.Mq)
err := mqSv.Consume(res)
err := mqSv.Consume(ctx, res)
if err != nil {
sysLog.ErrLog(ctx, err)
sysLog.ErrQueueLog(ctx, err)
panic(err)
}
}

11
cmd/rpc/transfer.go Executable file → Normal file
View File

@ -4,19 +4,18 @@ import (
"flag"
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/zrpc"
"github.com/zeromicro/zero-contrib/zrpc/registry/nacos"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"strconv"
"strings"
"trasfer_middleware/cmd/rpc/internal/config"
"trasfer_middleware/cmd/rpc/internal/server"
"trasfer_middleware/cmd/rpc/internal/svc"
"trasfer_middleware/cmd/rpc/pb/transfer"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
var configFile = flag.String("f", "../../config/transfer.yaml", "the config file")

View File

@ -1,2 +0,0 @@
{"@timestamp":"2024-06-12T10:43:56.711+08:00","caller":"queue/queue.go:31","content":"errlog:%!(EXTRA []interface {}=[the topic=ZLTX route info not found, it may not exist])","level":"error"}
{"@timestamp":"2024-06-12T10:44:29.461+08:00","caller":"queue/queue.go:31","content":"errlog:%!(EXTRA []interface {}=[the topic=ZLTX route info not found, it may not exist])","level":"error"}

0
genModel/serverMiddleMarketLogsModel.go Executable file → Normal file
View File

0
genModel/serverMiddleMarketLogsModel_gen.go Executable file → Normal file
View File

View File

@ -0,0 +1,29 @@
package genModel
import "github.com/zeromicro/go-zero/core/stores/sqlx"
var _ ServerMiddleRsLogsModel = (*customServerMiddleRsLogsModel)(nil)
type (
// ServerMiddleRsLogsModel is an interface to be customized, add more methods here,
// and implement the added methods in customServerMiddleRsLogsModel.
ServerMiddleRsLogsModel interface {
serverMiddleRsLogsModel
withSession(session sqlx.Session) ServerMiddleRsLogsModel
}
customServerMiddleRsLogsModel struct {
*defaultServerMiddleRsLogsModel
}
)
// NewServerMiddleRsLogsModel returns a model for the database table.
func NewServerMiddleRsLogsModel(conn sqlx.SqlConn) ServerMiddleRsLogsModel {
return &customServerMiddleRsLogsModel{
defaultServerMiddleRsLogsModel: newServerMiddleRsLogsModel(conn),
}
}
func (m *customServerMiddleRsLogsModel) withSession(session sqlx.Session) ServerMiddleRsLogsModel {
return NewServerMiddleRsLogsModel(sqlx.NewSqlConnFromSession(session))
}

View File

@ -0,0 +1,90 @@
// Code generated by goctl. DO NOT EDIT.
package genModel
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/zeromicro/go-zero/core/stores/builder"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/core/stringx"
)
var (
serverMiddleRsLogsFieldNames = builder.RawFieldNames(&ServerMiddleRsLogs{})
serverMiddleRsLogsRows = strings.Join(serverMiddleRsLogsFieldNames, ",")
serverMiddleRsLogsRowsExpectAutoSet = strings.Join(stringx.Remove(serverMiddleRsLogsFieldNames, "`log_id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",")
serverMiddleRsLogsRowsWithPlaceHolder = strings.Join(stringx.Remove(serverMiddleRsLogsFieldNames, "`log_id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?"
)
type (
serverMiddleRsLogsModel interface {
Insert(ctx context.Context, data *ServerMiddleRsLogs) (sql.Result, error)
FindOne(ctx context.Context, logId uint64) (*ServerMiddleRsLogs, error)
Update(ctx context.Context, data *ServerMiddleRsLogs) error
Delete(ctx context.Context, logId uint64) error
}
defaultServerMiddleRsLogsModel struct {
conn sqlx.SqlConn
table string
}
ServerMiddleRsLogs struct {
LogId uint64 `db:"log_id"`
Url string `db:"url"`
Code int64 `db:"code"`
Data string `db:"data"`
Resp string `db:"resp"`
CreateTime time.Time `db:"create_time"`
UpdateTime time.Time `db:"update_time"`
Status int64 `db:"status"`
}
)
func newServerMiddleRsLogsModel(conn sqlx.SqlConn) *defaultServerMiddleRsLogsModel {
return &defaultServerMiddleRsLogsModel{
conn: conn,
table: "`server_middle_rs_logs`",
}
}
func (m *defaultServerMiddleRsLogsModel) Delete(ctx context.Context, logId uint64) error {
query := fmt.Sprintf("delete from %s where `log_id` = ?", m.table)
_, err := m.conn.ExecCtx(ctx, query, logId)
return err
}
func (m *defaultServerMiddleRsLogsModel) FindOne(ctx context.Context, logId uint64) (*ServerMiddleRsLogs, error) {
query := fmt.Sprintf("select %s from %s where `log_id` = ? limit 1", serverMiddleRsLogsRows, m.table)
var resp ServerMiddleRsLogs
err := m.conn.QueryRowCtx(ctx, &resp, query, logId)
switch err {
case nil:
return &resp, nil
case sqlx.ErrNotFound:
return nil, ErrNotFound
default:
return nil, err
}
}
func (m *defaultServerMiddleRsLogsModel) Insert(ctx context.Context, data *ServerMiddleRsLogs) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?)", m.table, serverMiddleRsLogsRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.Url, data.Code, data.Data, data.Resp, data.Status)
return ret, err
}
func (m *defaultServerMiddleRsLogsModel) Update(ctx context.Context, data *ServerMiddleRsLogs) error {
query := fmt.Sprintf("update %s set %s where `log_id` = ?", m.table, serverMiddleRsLogsRowsWithPlaceHolder)
_, err := m.conn.ExecCtx(ctx, query, data.Url, data.Code, data.Data, data.Resp, data.Status, data.LogId)
return err
}
func (m *defaultServerMiddleRsLogsModel) tableName() string {
return m.table
}

0
genModel/serverMiddleZltxLogsModel.go Executable file → Normal file
View File

0
genModel/serverMiddleZltxLogsModel_gen.go Executable file → Normal file
View File

9
go.mod Executable file → Normal file
View File

@ -3,7 +3,6 @@ module trasfer_middleware
go 1.21
require (
gitee.com/chengdu_blue_brothers/openapi-go-sdk v0.0.2
github.com/apache/rocketmq-client-go/v2 v2.1.2
github.com/jinzhu/copier v0.4.0
github.com/nacos-group/nacos-sdk-go/v2 v2.2.3
@ -12,6 +11,10 @@ require (
github.com/streadway/amqp v1.1.0
github.com/zeromicro/go-zero v1.6.5
github.com/zeromicro/zero-contrib/zrpc/registry/nacos v0.0.0-20231030135404-af9ae855016f
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/exporters/jaeger v1.17.0
go.opentelemetry.io/otel/sdk v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.34.1
)
@ -87,16 +90,12 @@ require (
go.etcd.io/etcd/api/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/v3 v3.5.13 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect

2
go.sum Executable file → Normal file
View File

@ -723,8 +723,6 @@ filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0pAQhH8yz+DNjUbjppKQzKFAn28TMYPB6IU=
gitee.com/chengdu_blue_brothers/openapi-go-sdk v0.0.2 h1:f4Rj4jVshXYX7wl7aIrd7W9DbGfqxYrZ/c7ppoVL4a4=
gitee.com/chengdu_blue_brothers/openapi-go-sdk v0.0.2/go.mod h1:OEBHFTBQOvsJGzLyMZS8K98F8aZHWg+O8Stycuh94Dk=
gitee.com/travelliu/dm v1.8.11192/go.mod h1:DHTzyhCrM843x9VdKVbZ+GKXGRbKM2sJ4LxihRxShkE=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=

View File

@ -1,17 +0,0 @@
{"@timestamp":"2024-06-12T10:39:16.749+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003aef38200001, offsetMsgId=C0A86E5D00002A9F000000000000EB1E, queueOffset=8, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:25.182+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af130080002, offsetMsgId=C0A86E5D00002A9F000000000000F380, queueOffset=9, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:34.998+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af153300003, offsetMsgId=C0A86E5D00002A9F000000000000F766, queueOffset=10, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:35.768+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af157180004, offsetMsgId=C0A86E5D00002A9F000000000000FB4C, queueOffset=11, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:36.456+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af15b000005, offsetMsgId=C0A86E5D00002A9F000000000000FF32, queueOffset=12, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:37.195+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af15ee80006, offsetMsgId=C0A86E5D00002A9F0000000000010318, queueOffset=13, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:37.735+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af15ee80007, offsetMsgId=C0A86E5D00002A9F00000000000106FE, queueOffset=14, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:38.383+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af162d00008, offsetMsgId=C0A86E5D00002A9F0000000000010AE4, queueOffset=15, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:38.992+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af162d00009, offsetMsgId=C0A86E5D00002A9F0000000000010ECA, queueOffset=16, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:39.515+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af166b8000a, offsetMsgId=C0A86E5D00002A9F00000000000112B0, queueOffset=17, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:40.147+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af16aa0000b, offsetMsgId=C0A86E5D00002A9F0000000000011B13, queueOffset=18, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:40.687+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af16aa0000c, offsetMsgId=C0A86E5D00002A9F0000000000011EF9, queueOffset=19, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:41.310+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af16e88000d, offsetMsgId=C0A86E5D00002A9F00000000000122DF, queueOffset=20, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:41:41.851+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af16e88000e, offsetMsgId=C0A86E5D00002A9F00000000000126C5, queueOffset=21, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:44:00.912+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af38d80000f, offsetMsgId=C0A86E5D00002A9F0000000000012AAB, queueOffset=22, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T10:49:45.709+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003af8d1280010, offsetMsgId=C0A86E5D00002A9F0000000000012E91, queueOffset=23, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}
{"@timestamp":"2024-06-12T13:33:05.542+08:00","caller":"market/market.go:95","content":"mq:[SendResult [sendStatus=0, msgIds=AC1A15CC3879000000003b8e5a680011, offsetMsgId=C0A86E5D00002A9F0000000000013277, queueOffset=24, messageQueue=MessageQueue [topic=MARKET, brokerName=broker-a, queueId=1]]]","level":"info"}

14
sh/create.sh Executable file
View File

@ -0,0 +1,14 @@
#!/bin/bash
IMAGE="transfer_middleware:v1"
RPC_CONTAINER="transfer_middleware"
RPC_PORT="10001"
V_REFLECT=""
docker build -t "${IMAGE}" . --no-cache
docker stop "${RPC_CONTAINER}"
docker rm "${RPC_CONTAINER}"
docker run -it -p "${RPC_PORT}:${RPC_PORT}" --name "$RPC_CONTAINER" "${IMAGE}"

9
sh/startup.sh Executable file
View File

@ -0,0 +1,9 @@
#!/bin/bash
# supervisord
supervisord -c /etc/supervisord.conf

29
sh/supervisord.conf Executable file
View File

@ -0,0 +1,29 @@
; supervisor config file
[unix_http_server]
file=/var/run/supervisor.sock ; (the path to the socket file)
chmod=0700 ; sockef file mode (default 0700)
[supervisord]
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor ; ('AUTO' child log dir, default $TEMP)
; the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket
; The [include] section can just contain the "files" setting. This
; setting can list multiple files (separated by whitespace or
; newlines). It can also contain wildcards. The filenames are
; interpreted as relative to this file. Included files *cannot*
; include files themselves.
[include]
files = /src/sh/supervisord_include/*.conf

View File

@ -0,0 +1,39 @@
[program:rpc]
directory=/src/cmd/rpc
# 执行的命令
command=/src/cmd/rpc/transfer
#在 supervisord 启动的时候也自动启动
autorstart=false
#程序异常退出后自动重启
autorestart=true
#启动 5 秒后没有异常退出,就当作已经正常启动了
startsecs=5
#启动失败自动重试次数,默认是 3
startretries=3
#把 stderr 重定向到 stdout默认 false
redirect_stderr=false
#stdout 日志文件大小,默认 50MB
stdout_logfile_maxbytes = 20MB
#stdout 日志文件备份数
stdout_logfile_backups = 20
[program:queue]
directory=/src/cmd/rpc/queue
# 执行的命令
command=/src/cmd/rpc/queue/queue
#在 supervisord 启动的时候也自动启动
autorstart=false
#程序异常退出后自动重启
autorestart=true
#启动 5 秒后没有异常退出,就当作已经正常启动了
startsecs=5
#启动失败自动重试次数,默认是 3
startretries=3
#把 stderr 重定向到 stdout默认 false
redirect_stderr=false
#stdout 日志文件大小,默认 50MB
stdout_logfile_maxbytes = 20MB
#stdout 日志文件备份数
stdout_logfile_backups = 20

156
test/encrypt.go Normal file
View File

@ -0,0 +1,156 @@
package test
import (
"bytes"
"crypto/aes"
"crypto/cipher"
crand "crypto/rand"
"encoding/base64"
"io"
"math/rand"
"unsafe"
)
const lettersString = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
// 字符串长度
const number = 16
/*
16位码前15位随机字符串最后一位通过前15位字符串计算校验生成
*/
func LotteryEncryptEncode() string {
b := make([]byte, number)
var sum byte
for i := 0; i < number-1; i++ {
b[i] = lettersString[rand.Int63()%int64(len(lettersString))]
sum += b[i]
}
b[number-1] = lettersString[sum%byte(len(lettersString))]
return *(*string)(unsafe.Pointer(&b))
}
func LotteryEncryptDecode(str string) bool {
if len(str) != number {
return false
}
var sum byte
for i := 0; i < len(str)-1; i++ {
sum += str[i]
}
if lettersString[sum%byte(len(lettersString))] != str[len(str)-1] {
return false
}
return true
}
// =================== CBC ======================
func AesEncryptCBC(origData []byte, key []byte) (str string) {
// 分组秘钥
// NewCipher该函数限制了输入k的长度必须为16, 24或者32
block, _ := aes.NewCipher(key)
blockSize := block.BlockSize() // 获取秘钥块的长度
origData = pkcs5Padding(origData, blockSize) // 补全码
blockMode := cipher.NewCBCEncrypter(block, key[:blockSize]) // 加密模式
encrypted := make([]byte, len(origData)) // 创建数组
blockMode.CryptBlocks(encrypted, origData) // 加密
return base64.StdEncoding.EncodeToString(encrypted)
}
func AesDecryptCBC(data string, key []byte) (decrypted []byte) {
encrypted, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return
}
block, _ := aes.NewCipher(key) // 分组秘钥
blockSize := block.BlockSize() // 获取秘钥块的长度
blockMode := cipher.NewCBCDecrypter(block, key[:blockSize]) // 加密模式
decrypted = make([]byte, len(encrypted)) // 创建数组
blockMode.CryptBlocks(decrypted, encrypted) // 解密
decrypted = pkcs5UnPadding(decrypted) // 去除补全码
return decrypted
}
func pkcs5Padding(ciphertext []byte, blockSize int) []byte {
padding := blockSize - len(ciphertext)%blockSize
padtext := bytes.Repeat([]byte{byte(padding)}, padding)
return append(ciphertext, padtext...)
}
func pkcs5UnPadding(origData []byte) []byte {
length := len(origData)
unpadding := int(origData[length-1])
return origData[:(length - unpadding)]
}
// =================== ECB ======================
func AesEncryptECB(origData []byte, key []byte) (encrypted []byte) {
cipher, _ := aes.NewCipher(generateKey(key))
length := (len(origData) + aes.BlockSize) / aes.BlockSize
plain := make([]byte, length*aes.BlockSize)
copy(plain, origData)
pad := byte(len(plain) - len(origData))
for i := len(origData); i < len(plain); i++ {
plain[i] = pad
}
encrypted = make([]byte, len(plain))
// 分组分块加密
for bs, be := 0, cipher.BlockSize(); bs <= len(origData); bs, be = bs+cipher.BlockSize(), be+cipher.BlockSize() {
cipher.Encrypt(encrypted[bs:be], plain[bs:be])
}
return encrypted
}
func AesDecryptECB(encrypted []byte, key []byte) (decrypted []byte) {
cipher, _ := aes.NewCipher(generateKey(key))
decrypted = make([]byte, len(encrypted))
//
for bs, be := 0, cipher.BlockSize(); bs < len(encrypted); bs, be = bs+cipher.BlockSize(), be+cipher.BlockSize() {
cipher.Decrypt(decrypted[bs:be], encrypted[bs:be])
}
trim := 0
if len(decrypted) > 0 {
trim = len(decrypted) - int(decrypted[len(decrypted)-1])
}
return decrypted[:trim]
}
func generateKey(key []byte) (genKey []byte) {
genKey = make([]byte, 16)
copy(genKey, key)
for i := 16; i < len(key); {
for j := 0; j < 16 && i < len(key); j, i = j+1, i+1 {
genKey[j] ^= key[i]
}
}
return genKey
}
// =================== CFB ======================
func AesEncryptCFB(origData []byte, key []byte) (encrypted []byte) {
block, err := aes.NewCipher(key)
if err != nil {
panic(err)
}
encrypted = make([]byte, aes.BlockSize+len(origData))
iv := encrypted[:aes.BlockSize]
if _, err := io.ReadFull(crand.Reader, iv); err != nil {
panic(err)
}
stream := cipher.NewCFBEncrypter(block, iv)
stream.XORKeyStream(encrypted[aes.BlockSize:], origData)
return encrypted
}
func AesDecryptCFB(encrypted []byte, key []byte) (decrypted []byte) {
block, _ := aes.NewCipher(key)
if len(encrypted) < aes.BlockSize {
panic("ciphertext too short")
}
iv := encrypted[:aes.BlockSize]
encrypted = encrypted[aes.BlockSize:]
stream := cipher.NewCFBDecrypter(block, iv)
stream.XORKeyStream(encrypted, encrypted)
return encrypted
}

86
test/market/config.go Normal file
View File

@ -0,0 +1,86 @@
package market
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"qteam/config"
)
type MarketClient struct {
cfg config.MarketConfig
}
type MarketSendRequest struct {
AppId string `json:"app_id"` //APP ID
Sign string `json:"sign"` //签名
ReqCode string `json:"req_code"` //固定值voucher.create
MemId string `json:"mem_id"` //商户号
ReqSerialNo string `json:"req_serial_no"` //请求唯一流水号 最大32位
TimeTamp string `json:"timestamp"` //时间戳 yyyyMMddHHmmss
PosId string `json:"pos_id"` //商户方平台号
VoucherId string `json:"voucher_id"` //制码批次号
VoucherNum int `json:"voucher_num"` //请券数量,默认是 1
MobileNo string `json:"mobile_no"` //11 手机号,可传空字符串
SendMsg string `json:"send_msg"` //是否发送短信2- 发送 1-不发送
}
type MarketSenResponse struct {
VoucherId string `json:"voucher_id"` //制码批次号
VoucherCode string `json:"voucher_code"` //券码
ShortUrl string `json:"short_url"` //含二维码、条码的短链接
VoucherSdate string `json:"voucher_sdate"` //有效期起
VoucherEdate string `json:"voucher_edate"` //有效期止
CodeType string `json:"code_type"` //码类型: 00- 代金券 01- 满减券
}
type MarketResponse struct {
ErrCode string `json:"errCode"` //00-成功 其他:失败
Msg string `json:"msg"` //描 述 (失败时必填)
Data MarketSenResponse `json:"data"`
}
func (this *MarketSendRequest) toMap() (resultMap map[string]interface{}) {
// Marshal the struct to JSON, ignoring omitempty fields.
jsonBytes, err := json.Marshal(this)
if err != nil {
return
}
// Unmarshal the JSON into a map to get the final result.
err = json.Unmarshal(jsonBytes, &resultMap)
if err != nil {
return
}
return resultMap
}
func (this *MarketClient) doPost(url string, jsonBytes []byte) (body []byte, err error) {
// 创建POST请求
url = this.cfg.Host + url
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
if err != nil {
return
}
// 设置Content-Type头
req.Header.Set("Content-Type", "application/json")
// 创建HTTP客户端
client := &http.Client{}
// 发送请求并处理响应
resp, err := client.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
// 读取响应体
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
return
}
return
}

60
test/market/market_api.go Normal file
View File

@ -0,0 +1,60 @@
package market
import (
"encoding/json"
"qteam/app/utils/encrypt"
"qteam/config"
"time"
)
func NewMarketClient(cfg config.MarketConfig) *MarketClient {
cfg.Sign = "-----BEGIN RSA PRIVATE KEY-----\n" + cfg.Sign + "\n-----END RSA PRIVATE KEY-----"
return &MarketClient{
cfg: cfg,
}
}
/*
MarketSend
券码生成接口
- 请求地址/openApi/v1/market/key/send
- 说明发券接口应支持使用同一流水号进行重复请求当调用该接口失败时 以使用同一流水号进行再次请求接口需要根据请求的流水号进行判断若无该流水 号的券码信息则新生成后返回若有该流水号的券码信息则直接返回该券码的信息
orderNo: 订单号
VoucherId: 制码批次号
MobileNo: 11 手机号可传空字符串
SendMsg: 是否发送短信2- 发送 1-不发送
*/
func (this *MarketClient) MarketSend(orderNo, VoucherId, MobileNo, SendMsg string) (res MarketResponse, err error) {
url := "/openApi/v1/market/key/send"
request := MarketSendRequest{
AppId: this.cfg.AppId,
ReqCode: this.cfg.ReqCode,
MemId: this.cfg.MemId,
PosId: this.cfg.PosId,
TimeTamp: time.Now().Format("20060102150405"),
VoucherId: VoucherId,
ReqSerialNo: orderNo,
VoucherNum: 1,
MobileNo: MobileNo,
SendMsg: SendMsg,
}
request.Sign, err = MakeRsaSign(this.cfg.Sign, request.toMap())
if err != nil {
return res, err
}
bytes, err := json.Marshal(request)
if err != nil {
return res, err
}
data, err := this.doPost(url, bytes)
if err != nil {
return res, err
}
err = json.Unmarshal(data, &res)
// 加密
res.Data.ShortUrl = encrypt.AesEncryptCBC([]byte(res.Data.ShortUrl), []byte(this.cfg.SecretKey))
return res, err
}

View File

@ -0,0 +1,33 @@
package market
import (
"fmt"
"github.com/qit-team/snow-core/kernel/server"
"os"
"qteam/app/utils"
"qteam/config"
"testing"
)
func TestMarketSendRequest_Market(t *testing.T) {
opts := config.GetOptions()
if opts.ShowVersion {
fmt.Printf("%s\ncommit %s\nbuilt on %s\n", server.Version, server.BuildCommit, server.BuildDate)
os.Exit(0)
}
//加载配置
conf, err := config.Load(opts.ConfFile)
if err != nil {
utils.Log(nil, "err", err.Error())
return
}
client := NewMarketClient(conf.OpenApiMarketConfig)
data, err := client.MarketSend("123456789111", "1717567048171", "", "2")
if err != nil {
t.Error(err)
}
t.Log(data)
}

137
test/market/rsa.go Normal file
View File

@ -0,0 +1,137 @@
package market
import (
"crypto"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"errors"
"fmt"
"sort"
)
// getSignString 使用 xx=aa&yy=bb 的字符串拼接
func getSignString(data map[string]interface{}) string {
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)
}
sort.Strings(keys)
signString := ""
separator := ""
for _, key := range keys {
value := data[key]
if key == "sign" || value == nil {
continue
}
signString += fmt.Sprintf("%s%s=%v", separator, key, value)
separator = "&"
}
return signString
}
// VerifyRsaSign 签名验证
func VerifyRsaSign(publicKey string, data map[string]interface{}) (map[string]interface{}, error) {
// 对 sign nonce timestamp appId 升序排序
// 使用 xx=aa&yy=bb 的字符串拼接
// 商户的公钥验签 RSA2验签
signString := getSignString(data)
rsaPubKey, err := parseRSAPublicKeyFromPEM([]byte(publicKey))
if err != nil {
return nil, err
}
signature, err := base64.StdEncoding.DecodeString(data["sign"].(string))
if err != nil {
return nil, err
}
hashed := sha256.Sum256([]byte(signString))
err = rsa.VerifyPKCS1v15(rsaPubKey, crypto.SHA256, hashed[:], signature)
if err != nil {
return nil, errors.New("签名验证失败")
}
return data, nil
}
// MakeRsaSign 生成签名
func MakeRsaSign(privateKey string, data map[string]interface{}) (string, error) {
// 对 sign nonce timestamp appId 升序排序
// 使用 xx=aa&yy=bb 的字符串拼接
// 营销系统生成的私钥生成签名 RSA2加签
signString := getSignString(data)
privKey, err := parseRSAPrivateKeyFromPEM([]byte(privateKey))
if err != nil {
return "", errors.New("私钥解析失败")
}
hashed := sha256.Sum256([]byte(signString))
signature, err := rsa.SignPKCS1v15(rand.Reader, privKey, crypto.SHA256, hashed[:])
if err != nil {
return "", errors.New("签名失败")
}
return base64.StdEncoding.EncodeToString(signature), nil
}
// ParseRSAPrivateKeyFromPEM 解析私钥
func parseRSAPrivateKeyFromPEM(key []byte) (*rsa.PrivateKey, error) {
var err error
// Parse PEM block
var block *pem.Block
if block, _ = pem.Decode(key); block == nil {
return nil, errors.New("私钥解析失败: 无效的PEM格式")
}
var parsedKey interface{}
if parsedKey, err = x509.ParsePKCS1PrivateKey(block.Bytes); err != nil {
if parsedKey, err = x509.ParsePKCS8PrivateKey(block.Bytes); err != nil {
return nil, err
}
}
var pkey *rsa.PrivateKey
var ok bool
if pkey, ok = parsedKey.(*rsa.PrivateKey); !ok {
return nil, errors.New("密钥不是有效的RSA私钥")
}
return pkey, nil
}
// parseRSAPublicKeyFromPEM parses a PEM encoded PKCS1 or PKCS8 public key
func parseRSAPublicKeyFromPEM(key []byte) (*rsa.PublicKey, error) {
var err error
// Parse PEM block
var block *pem.Block
if block, _ = pem.Decode(key); block == nil {
return nil, errors.New("公钥解析失败: 无效的PEM格式")
}
// Parse the key
var parsedKey interface{}
if parsedKey, err = x509.ParsePKIXPublicKey(block.Bytes); err != nil {
if cert, err := x509.ParseCertificate(block.Bytes); err == nil {
parsedKey = cert.PublicKey
} else {
return nil, err
}
}
var pkey *rsa.PublicKey
var ok bool
if pkey, ok = parsedKey.(*rsa.PublicKey); !ok {
return nil, errors.New("密钥不是有效的RSA公钥")
}
return pkey, nil
}

5
until/Dockerfile Normal file
View File

@ -0,0 +1,5 @@
FROM alpine:latest AS runtime
RUN apk update && apk add supervisor
RUN apk add make
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
ENV TZ Asia/Shanghai

76
until/mq/README.md Normal file
View File

@ -0,0 +1,76 @@
# 事件总线
统一的事件总线低层使用RocketMQ进一步封装约束命名规范、权限验证、链路追踪等
## 发送
[参考:](/event/producer_test.go)
1. 生成发送实例
```go
producer, err := event.NewProducer("192.168.6.107:9876")
// 带有验证凭证的示例
producer, err := NewProducer("192.168.6.107:9876", producer.WithProducerCredentials("accessKey", "secretKey", "securityToken"))
```
生成时可选的配置项有:
* event.WithProducerCredentials 设置生产者的凭证
2. 在Data层中注入生成的实例
3. 在biz中声明event的interface
4. 在data层中实现biz声明的interface同repository类似
5. 使用的方法:
* SendSync 同步发送
* SendAsync 异步发送
* BatchSendSync 批量同步发送
* BatchSendAsync 批量异步发送
6. 默认支持链路追踪如要关闭请调用DisableTraceTelemetry方法
## 消费
[参考:](/event/producer_test.go)
### 一、随着Kratos服务一起启动
1. 在server中新增consumer.go
```go
// ConsumerServer 消费者Server
type ConsumerServer struct {
manager *event.ConsumerManager
}
// NewConsumerServer 工厂方法
func NewConsumerServer(
//注入一些依赖的对象包括biz同http、grpc的Sever类似
) *ConsumerServer {
manager := event.NewConsumerManager()
// 添加一些订阅方法,示例:
_ = manager.Subscribe(ctx, connConf, consumerConf, func(message *ConsumerMessage) error {
// mock 业务耗时
time.Sleep(10 * time.Second)
// 返回nil才会commit否则会重试
return nil
})
return &ConsumerServer{manager:manager}
}
func (c *ConsumerServer) Start(ctx context.Context) error {
return c.manager.Start()
}
func (c *ConsumerServer) Stop(ctx context.Context) error {
return c.manager.Stop()
}
```
2. 添加进provider_set.go
3. 打开main.go
```go
// 在newApp方法中注入ConsumerServer,并添加到
serverOption := kratos.Server(
//在这里面添加ConsumerServer的实例
)
```
### 二、使用cli模式启动
参考上面的启动方式顺序为subscribe -> start -> stop

View File

@ -0,0 +1,32 @@
package mq
import "github.com/apache/rocketmq-client-go/v2"
// ConsumerConfig 消费者配置
type ConsumerConfig struct {
TopicName string // 必填,主题名称
// GroupName 必填消费者分组名称填写消费的业务名称例如CreatePay-创建支付单
// 实际注册时会自动在前面加上TopicName
GroupName string
// ConsumerCnt消费者个数默认为1不能超过MQ的消费Queue个数
// 需要保证顺序消费的场景才推荐使用否则推荐配置PerCoroutineCnt参数来开启并发消费即可
ConsumerCnt int
// PerCoroutineCnt 每个consumer的协程个数默认为20它与ConsumerCnt都能实现并发消费区别在于此参数是一个消费者多个协程并发消费性能更高
// 由于是单消费者的并发处理,可能存在后拉取的比先拉取的先处理完,即无法保证严格的顺序消费要求(但大部分场景都没有顺序要求)
// SDK在收到消息时会启动一个新协程运行回调函数最大运行协程数不超过此配置
// 示例2则实际的并发数是4正常消费2个重试2个协程
PerCoroutineCnt int
// RetryCnt 消费最大重试次数间隔时间查看https://rocketmq.apache.org/zh/docs/featureBehavior/10consumerretrypolicy
// 我们默认为38次目的是给异常业务方保留最多48小时去修复超过之后进入死信队列3天后会被自动删除
RetryCnt *int
// 指定消费的tag为空则消费所有tag
Tags []string
//消费者实例
pushConsumers []rocketmq.PushConsumer
}

View File

@ -0,0 +1,9 @@
package mq
// ConsumerConnConfig 消费者配置
type ConsumerConnConfig struct {
NameServers string // 必填,多个用,号隔开
AccessKey string // 连接rocketMQ的accessKey
SecretKey string // 连接rocketMQ的secretKey
SecurityToken string // 连接rocketMQ的securityToken
}

View File

@ -0,0 +1,289 @@
package mq
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"os"
"strings"
"sync/atomic"
"time"
)
// ConsumerManager 消费者管理器
type ConsumerManager struct {
consumerConfigs []*ConsumerConfig
activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出
shutdownFlag atomic.Bool // 关闭标记
logger Logger
IsOpenTelemetry bool // 默认开启链路追踪,及时未配置上报地址也不影响业务,只是实际不会上报上去而已
}
// NewConsumerManager 创建一个消费者管理器
func NewConsumerManager(logger Logger) *ConsumerManager {
return &ConsumerManager{logger: logger, IsOpenTelemetry: true}
}
func (c *ConsumerManager) DisableOpenTelemetry() {
c.IsOpenTelemetry = false
}
// Subscribe 订阅一个主题
func (c *ConsumerManager) Subscribe(ctx context.Context, connConf *ConsumerConnConfig, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, opts ...consumer.Option) error {
// 检查参数规范
if !c.checkGroupName(consumerConf.TopicName, consumerConf.GroupName) {
return fmt.Errorf("groupName不符合规范前缀必须是\"${topicName}_\"开头如trade_recharge_dispatcher_pay")
}
if consumerConf.RetryCnt == nil || *consumerConf.RetryCnt < 0 {
retryCnt := 38
consumerConf.RetryCnt = &retryCnt
}
if consumerConf.ConsumerCnt <= 0 {
consumerConf.ConsumerCnt = 1
}
if consumerConf.PerCoroutineCnt <= 0 {
consumerConf.PerCoroutineCnt = 20
}
credentials := primitive.Credentials{
AccessKey: connConf.AccessKey,
SecretKey: connConf.SecretKey,
SecurityToken: connConf.SecurityToken,
}
//限制groupName名称必须与topic捆绑
nameServers := strings.Split(connConf.NameServers, ",")
opts = append(opts,
consumer.WithNameServer(nameServers),
consumer.WithCredentials(credentials),
consumer.WithConsumerModel(consumer.Clustering),
// 不要开启此参数开启顺序消费通过返回consumer.ConsumeRetryLater将会失效需要自己实现重试机制
//consumer.WithConsumerOrder(true),
consumer.WithGroupName(consumerConf.GroupName),
consumer.WithConsumeGoroutineNums(consumerConf.PerCoroutineCnt),
consumer.WithRetry(*consumerConf.RetryCnt),
// 不启用批消费,多个一起消费的特点是则其中一个失败则整体都会失败
consumer.WithConsumeMessageBatchMaxSize(1),
//consumer.WithPullThresholdForQueue(50),
)
// 启动指定个数的consumer
hostName, _ := os.Hostname()
now := time.Now().Unix()
for i := 0; i < consumerConf.ConsumerCnt; i++ {
currOpts := make([]consumer.Option, len(opts))
copy(currOpts, opts)
currOpts = append(currOpts, consumer.WithInstance(fmt.Sprintf("%s:%s:%d:%d", hostName, consumerConf.GroupName, now, i+1)))
pushConsumer, err := rocketmq.NewPushConsumer(currOpts...)
if err != nil {
return err
}
selector := consumer.MessageSelector{}
// 过滤tag
if len(consumerConf.Tags) > 0 {
selector.Type = consumer.TAG
selector.Expression = strings.Join(consumerConf.Tags, " || ")
}
err = pushConsumer.Subscribe(consumerConf.TopicName, selector, func(subCtx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
return c.callbackForReceive(subCtx, consumerConf, fn, ext...)
})
if err != nil {
return err
}
consumerConf.pushConsumers = append(consumerConf.pushConsumers, pushConsumer)
}
c.consumerConfigs = append(c.consumerConfigs, consumerConf)
return nil
}
var consumerPropagator = propagation.TraceContext{}
// callbackForReceive 收到消息的回调
func (c *ConsumerManager) callbackForReceive(ctx context.Context, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, ext ...*primitive.MessageExt) (cr consumer.ConsumeResult, fnErr error) {
// 收到消息
if c.shutdownFlag.Load() {
cr = consumer.ConsumeRetryLater
fnErr = fmt.Errorf("正在退出中,延期处理:%s,%s", consumerConf.TopicName, consumerConf.GroupName)
// 卡住,不再继续消费,等待退出
// 测试发现在重试主题消费时返回retryLater有时会被commit掉导致消息丢失
time.Sleep(24 * time.Hour)
return
}
// 标记活跃状态
c.activeCnt.Add(1)
defer func() {
c.activeCnt.Add(-1)
if v := recover(); v != nil {
cr = consumer.ConsumeRetryLater
fnErr = errors.Errorf("处理消息panic, groupName=%s%+v", consumerConf.GroupName, v)
c.LogErrorf("%+v", fnErr)
return
}
}()
var tracer trace.Tracer
if c.IsOpenTelemetry {
tracer = otel.GetTracerProvider().Tracer("LSXD_Util")
}
// WithConsumeMessageBatchMaxSize 配置大于1时ext才会存在多个它的特点是要么全成功或全失败
cr = consumer.ConsumeSuccess
spanName := fmt.Sprintf("%s %s %s", consumerConf.TopicName, semconv.MessagingOperationProcess.Value.AsString(), consumerConf.GroupName)
for _, v := range ext {
func() {
message := &ConsumerMessage{
MsgId: v.MsgId,
Topic: v.Topic,
Body: v.Body,
ReconsumeTimes: v.ReconsumeTimes,
CompressedBody: v.CompressedBody,
Flag: v.Flag,
TransactionId: v.TransactionId,
Batch: v.Batch,
Compress: v.Compress,
Properties: v.GetProperties(),
}
// 链路追踪
var span trace.Span
if tracer != nil {
if traceParent, ok := message.Properties[openTelemetryPropertyName]; ok {
var mapCarrier propagation.MapCarrier = map[string]string{
openTelemetryPropertyName: traceParent,
}
ctx = consumerPropagator.Extract(ctx, mapCarrier)
}
ctx, span = tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindConsumer))
span.SetAttributes(
semconv.MessagingSystem("RocketMQ"),
semconv.MessagingSourceName(consumerConf.TopicName),
semconv.MessagingRocketmqClientGroup(consumerConf.GroupName),
semconv.MessagingRocketmqMessageKeys(message.GetKeys()...),
semconv.MessagingRocketmqMessageTag(message.GetTags()),
semconv.MessagingRocketmqMessageDelayTimeLevel(message.GetDelayTimeLevel()),
semconv.MessageIDKey.String(message.MsgId),
)
defer func() {
// 记录追踪信息
spanErr := fnErr
var panicVal any
if panicVal = recover(); panicVal != nil {
spanErr = fmt.Errorf("%s消费者处理方法panic:%s", consumerConf.GroupName, panicVal)
}
if spanErr != nil {
span.RecordError(spanErr)
span.SetStatus(codes.Error, spanErr.Error())
}
span.End()
if panicVal != nil {
panic(panicVal)
}
}()
}
// 回调业务函数
currErr := fn(ctx, message)
if currErr != nil {
cr = consumer.ConsumeRetryLater
fnErr = currErr
c.LogErrorf("%s消费者处理方法返回失败,body=%s%+v", consumerConf.GroupName, string(message.Body), currErr)
}
}()
}
return
}
// Start 启动所有消费者
func (c *ConsumerManager) Start(_ context.Context) error {
for _, consumerConf := range c.consumerConfigs {
for _, pushConsumer := range consumerConf.pushConsumers {
err := pushConsumer.Start()
if err != nil {
return err
}
}
}
return nil
}
// Stop 停止所有消费者
func (c *ConsumerManager) Stop(_ context.Context) error {
fmt.Println("开始停止消费者")
c.shutdownFlag.Store(true)
for _, consumerConf := range c.consumerConfigs {
for _, pushConsumer := range consumerConf.pushConsumers {
pushConsumer.Suspend() // 似乎没起使用
}
}
fmt.Println("已suspend所有消费者")
//shutdown之间保证正在处理的消费先提交
_ = c.blockWaitFinish()
var err error = nil
for _, consumerConf := range c.consumerConfigs {
for _, pushConsumer := range consumerConf.pushConsumers {
if closeErr := pushConsumer.Shutdown(); closeErr != nil {
err = closeErr
fmt.Println("消费者shutdown失败", closeErr)
}
}
}
fmt.Println("已shutdown所有消费者")
return err
}
// blockWaitFinish 阻塞等待业务完成
func (c *ConsumerManager) blockWaitFinish() error {
// 每1s检查下业务是否都处理完成
for {
cnt := c.activeCnt.Load()
if cnt == 0 {
//无业务处理,正常退
break
} else {
fmt.Printf("等待消费者退出,%d 个正在运行\n", cnt)
}
time.Sleep(1 * time.Second)
}
//防止极端情况下commit未完成
// nolint
time.Sleep(1 * time.Second)
return nil
}
// LogErrorf 记录错误日志
func (c *ConsumerManager) LogErrorf(format string, args ...any) {
if c.logger != nil {
c.logger.Errorf(format, args...)
} else {
fmt.Printf(format+"\n", args...)
}
}
// checkGroupName 检查groupName是否符合规范
// 必须是以${topicName}_开头目的是
// 1. 防止相同相同groupName与多个topic的情况避免出现消费不符异常的情况
// 2. 在管理group时能更清晰地体现出对应的topic
func (c *ConsumerManager) checkGroupName(topicName string, groupName string) bool {
if groupName == "" {
return false
}
return strings.HasPrefix(groupName, topicName+"_")
}

View File

@ -0,0 +1,64 @@
package mq
import (
"github.com/apache/rocketmq-client-go/v2/primitive"
"strconv"
"strings"
)
// ConsumerMessage 消费者收到的消息
type ConsumerMessage struct {
MsgId string
Topic string
Body []byte
ReconsumeTimes int32
CompressedBody []byte
Flag int32
TransactionId string
Batch bool
Compress bool
Properties map[string]string
}
// GetKeys 获取消息的key
func (c *ConsumerMessage) GetKeys() []string {
if len(c.Properties) == 0 {
return nil
}
val, isOk := c.Properties[primitive.PropertyKeys]
if !isOk {
return nil
}
return strings.Split(val, primitive.PropertyKeySeparator)
}
// GetTags 获取消息的tag
func (c *ConsumerMessage) GetTags() string {
if len(c.Properties) == 0 {
return ""
}
// nolint
val, _ := c.Properties[primitive.PropertyTags]
return val
}
// GetShardingKey 获取消息的分区key
func (c *ConsumerMessage) GetShardingKey() string {
if len(c.Properties) == 0 {
return ""
}
// nolint
val, _ := c.Properties[primitive.PropertyShardingKey]
return val
}
// GetDelayTimeLevel 获取消息的延迟级别
func (c *ConsumerMessage) GetDelayTimeLevel() int {
if len(c.Properties) == 0 {
return 0
}
// nolint
val, _ := c.Properties[primitive.PropertyDelayTimeLevel]
level, _ := strconv.Atoi(val)
return level
}

64
until/mq/consumer_test.go Normal file
View File

@ -0,0 +1,64 @@
package mq
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
)
func TestConsumer_Start(t *testing.T) {
//initTracer("http://192.168.6.194:14268/api/traces", 1, "go_unit_test_for_util_consumer")
manager := NewConsumerManager(nil)
ctx := context.Background()
consumerConf := &ConsumerConfig{
TopicName: "test_transfer_rs",
GroupName: "test_transfer_rs_consumer",
PerCoroutineCnt: 2,
}
connConf := &ConsumerConnConfig{
NameServers: "http://rmq-cn-j4g3sem5i05.cn-chengdu.rmq.aliyuncs.com:8080",
AccessKey: "1i3vEsceLzcYD26p",
SecretKey: "192602NYDuAHQ76a",
}
i := atomic.Int32{}
err := manager.Subscribe(ctx, connConf, consumerConf, func(ctx context.Context, message *ConsumerMessage) error {
cnt := i.Add(1)
fmt.Printf("%d开始消费%s body=%s \n", cnt, message.MsgId, message.Body)
fmt.Println(message.Properties)
// mock 业务耗时
time.Sleep(1 * time.Second)
fmt.Printf("%d已完成%s body=%s \n", cnt, message.MsgId, message.Body)
return nil
})
fmt.Println(err)
consumerConf = &ConsumerConfig{
TopicName: "test_transfer_rs",
GroupName: "test_transfer_rs_consumer",
PerCoroutineCnt: 2,
}
err = manager.Subscribe(ctx, connConf, consumerConf, func(ctx context.Context, message *ConsumerMessage) error {
cnt := i.Add(1)
fmt.Printf("%d开始消费%s body=%s \n", cnt, message.MsgId, message.Body)
fmt.Println(message.Properties)
// mock 业务耗时
time.Sleep(1 * time.Second)
fmt.Printf("%d已完成%s body=%s \n", cnt, message.MsgId, message.Body)
return nil
})
fmt.Println(err)
_ = manager.Start(context.Background())
time.Sleep(90 * time.Second)
// mock 退出
_ = manager.Stop(context.Background())
}

249
until/mq/producer.go Normal file
View File

@ -0,0 +1,249 @@
package mq
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"strings"
)
type Producer struct {
ProducerClient rocketmq.Producer
IsOpenTelemetry bool // 默认开启链路追踪,及时未配置上报地址也不影响业务,只是实际不会上报上去而已
}
// WithProducerCredentials 设置生产者的凭证
func WithProducerCredentials(accessKey, secretKey, securityToken string) producer.Option {
return producer.WithCredentials(primitive.Credentials{
AccessKey: accessKey,
SecretKey: secretKey,
SecurityToken: securityToken,
})
}
// NewProducer 创建一个生产者
// nameServer: 连接地址,多个中间用,号隔开
// opts: 配置项
func NewProducer(nameServer string, opts ...producer.Option) (*Producer, error) {
//检查参数
if nameServer == "" {
return nil, fmt.Errorf("rocketMQ NameServer 不能为空")
}
//创建生产者
nameServers := strings.Split(nameServer, ",")
opts = append(opts, producer.WithNameServer(nameServers))
p, err := rocketmq.NewProducer(opts...)
if err != nil {
fmt.Println("创建 rocketMQ producer 失败: ", err)
return nil, err
}
//此时并没有发起连接,在使用时才会连接
return &Producer{ProducerClient: p, IsOpenTelemetry: true}, nil
}
// DisableTelemetry 关闭链路追踪
func (p *Producer) DisableTelemetry() {
p.IsOpenTelemetry = false
}
// Start 启动生产者
func (p *Producer) Start() error {
return p.ProducerClient.Start()
}
// Shutdown 关闭生产者
func (p *Producer) Shutdown() error {
return p.ProducerClient.Shutdown()
}
// SendOption 发送消息选项
type SendOption func(*primitive.Message)
// WithSendKeysOption 设置消息的key
func WithSendKeysOption(keys []string) SendOption {
return func(msg *primitive.Message) {
msg.WithKeys(keys)
}
}
// WithSendShardingKeysOption 设置消息的key
func WithSendShardingKeysOption(key string) SendOption {
return func(msg *primitive.Message) {
msg.WithShardingKey(key)
}
}
// WithSendTagOption 设置消息的Tag
func WithSendTagOption(tag string) SendOption {
return func(msg *primitive.Message) {
msg.WithTag(tag)
}
}
// WithSendDelayLevelOption 设置消息的延迟级别
// reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// delay level starts from 1. for example, if we set param level=1, then the delay time is 1s.
func WithSendDelayLevelOption(level int) SendOption {
return func(msg *primitive.Message) {
msg.WithDelayTimeLevel(level)
}
}
const openTelemetryPropertyName = "traceparent"
// WithOpenTelemetryOption 设置消息的链接追踪信息
func WithOpenTelemetryOption(value string) SendOption {
return func(msg *primitive.Message) {
msg.WithProperty(openTelemetryPropertyName, value)
}
}
// WithSendWithPropertyOption 设置消息的属性
func WithSendWithPropertyOption(key, value string) SendOption {
return func(msg *primitive.Message) {
msg.WithProperty(key, value)
}
}
// SendSync 同步发送消息
// topic: 主题
// sendOptions: 发送选项如WithSendTagOption
// bodyList: 支持发送多个
func (p *Producer) SendSync(ctx context.Context, topic string, body []byte, sendOptions ...SendOption) error {
return p.BatchSendSync(ctx, topic, sendOptions, body)
}
// SendAsync 异步发送消息
// topic: 主题
// sendOptions: 发送选项如WithSendTagOption
// callbackFn: 回调函数无论成功与否都会回调失败时err!=nil
// bodyList: 支持发送多个
func (p *Producer) SendAsync(ctx context.Context, topic string, body []byte, callbackFn func(error), sendOptions ...SendOption) error {
return p.BatchSendAsync(ctx, topic, callbackFn, sendOptions, body)
}
// BatchSendSync 同步发送消息
// topic: 主题
// sendOptions: 发送选项如WithSendTagOption
// bodyList: 支持发送多个
func (p *Producer) BatchSendSync(ctx context.Context, topic string, sendOptions []SendOption, bodyList ...[]byte) error {
if err := p.checkSend(topic); err != nil {
return err
}
msgList := make([]*primitive.Message, len(bodyList))
for i, body := range bodyList {
msgList[i] = &primitive.Message{
Topic: topic,
Body: body,
}
for _, option := range sendOptions {
option(msgList[i])
}
}
// 链路追踪
var err error
if p.IsOpenTelemetry {
_, span := p.generateTraceSpan(ctx, topic, msgList)
defer func() {
// 记录错误
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.End()
}()
}
_, err = p.ProducerClient.SendSync(context.Background(), msgList...)
if err != nil {
return err
}
return nil
}
// generateTraceSpan 生成链路追踪的span
var produceTraceContext = propagation.TraceContext{}
func (p *Producer) generateTraceSpan(ctx context.Context, topic string, msgList []*primitive.Message) (context.Context, trace.Span) {
tracer := otel.GetTracerProvider().Tracer("LSXD_Util")
spanName := fmt.Sprintf("%s %s", topic, semconv.MessagingOperationPublish.Value.AsString())
spanCtx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(
semconv.MessagingSystem("RocketMQ"),
semconv.MessagingDestinationName(topic),
semconv.MessagingBatchMessageCount(len(msgList)),
semconv.MessagingRocketmqMessageKeys(msgList[0].GetKeys()),
semconv.MessagingRocketmqMessageTag(msgList[0].GetTags()),
)
//将span的trace数据写入
carrier := propagation.MapCarrier{}
produceTraceContext.Inject(spanCtx, carrier)
traceParent := carrier[openTelemetryPropertyName]
for _, message := range msgList {
message.WithProperty(openTelemetryPropertyName, traceParent)
}
return spanCtx, span
}
// BatchSendAsync 异步发送消息
// topic: 主题
// callbackFn: 回调方法无论成功与否都会回调失败时err!=nil
// sendOptions: 发送选项如WithSendTagOption
// bodyList: 支持发送多个
func (p *Producer) BatchSendAsync(ctx context.Context, topic string, callbackFn func(error), sendOptions []SendOption, bodyList ...[]byte) error {
if err := p.checkSend(topic); err != nil {
return err
}
msgList := make([]*primitive.Message, len(bodyList))
for i, body := range bodyList {
msgList[i] = &primitive.Message{
Topic: topic,
Body: body,
}
for _, option := range sendOptions {
option(msgList[i])
}
}
var err error
var span trace.Span
if p.IsOpenTelemetry {
_, span = p.generateTraceSpan(ctx, topic, msgList)
}
err = p.ProducerClient.SendAsync(context.Background(), func(ctxErr context.Context, result *primitive.SendResult, err error) {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.End()
callbackFn(err)
}, msgList...)
return err
}
// checkTopicName 检查topicName是否符合规范
// 名字分三部分,由字母和.号组成:${模块名}_${业务名}_${事件}trade-order-created
// 权限:
func (p *Producer) checkSend(topicName string) error {
arr := strings.Split(topicName, "_")
isOk := len(arr) == 3 && arr[0] != "" && arr[1] != "" && arr[2] != ""
if !isOk {
return fmt.Errorf("topic名称不符合规范")
}
// 检查权限:待完善
return nil
}

89
until/mq/producer_test.go Normal file
View File

@ -0,0 +1,89 @@
package mq
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
// nolint
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"os"
"testing"
"time"
)
func TestProducer_SendSync(t *testing.T) {
//initTracer("http://192.168.6.194:14268/api/traces", 1, "go_unit_test_for_util_producer")
//http://192.168.6.107:9876/
//p, err := NewProducer("192.168.6.107:9876")
p, err := NewProducer("http://rmq-cn-j4g3sem5i05.cn-chengdu.rmq.aliyuncs.com:8080", WithProducerCredentials("1i3vEsceLzcYD26p", "192602NYDuAHQ76a", ""))
if err != nil {
t.Fatal(err)
}
err = p.Start()
if err != nil {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
err = p.SendSync(
context.Background(),
"test_transfer_rs",
[]byte("hello world"+time.Now().Format(time.DateTime)+fmt.Sprintf("%d", i)),
WithSendKeysOption([]string{"ttt"}),
WithSendTagOption("TagA"),
WithSendWithPropertyOption("pk", "pv"),
)
if err != nil {
fmt.Println("同步发送失败:", err)
} else {
fmt.Println("同步发送成功")
}
//err = p.SendAsync(
// context.Background(),
// "test_transfer_rs",
// []byte("hello world"+time.Now().Format(time.DateTime)+fmt.Sprintf("%d", i)),
// func(err error) {
// if err != nil {
// //出错了
// fmt.Println("异步回调错误", err)
// } else {
// fmt.Println("异步回调成功")
// }
// },
// WithSendKeysOption([]string{"ttt"}),
// WithSendTagOption("TagA"),
// WithSendWithPropertyOption("pk", "pv"),
//)
//if err != nil {
// fmt.Println("异步发起失败", err)
//}
}
_ = p.Shutdown()
time.Sleep(1 * time.Second) // 防止测试时span未上报完成
}
func initTracer(url string, sampler float64, name string) {
exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
panic(fmt.Errorf("failed to initialize jaeger exporter:%s", url))
}
hostName, _ := os.Hostname()
tp := trace.NewTracerProvider(
// 将基于父span的采样率设置为100%
trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(sampler))),
// 单元测试使用同步发送
//trace.WithBatcher(exporter),
trace.WithSyncer(exporter),
// 在资源中记录有关此应用程序的信息
trace.WithResource(resource.NewSchemaless(
semconv.ServiceNameKey.String(name),
semconv.ServiceVersionKey.String("v1.0.0"),
semconv.ServiceInstanceIDKey.String(hostName),
semconv.GCPGceInstanceHostnameKey.String(hostName),
)),
)
otel.SetTracerProvider(tp)
}

40
until/mq/recover.go Normal file
View File

@ -0,0 +1,40 @@
package mq
import (
"fmt"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
type Logger interface {
// Debugf logs a formatted debugging message.
Debugf(format string, args ...interface{})
// Infof logs a formatted informational message.
Infof(format string, args ...interface{})
// Warnf logs a formatted warning message.
Warnf(format string, args ...interface{})
// Errorf logs a formatted error message.
Errorf(format string, args ...interface{})
}
func init() {
SetRecoverLogger(nil)
}
// SetRecoverLogger 设置RocketMQ的recover的处理函数日志不设置会导致协程panic后程序退出
func SetRecoverLogger(logger Logger) {
SetRecoverHandler(func(err any) {
if logger == nil {
fmt.Printf("rocketmq 发生 panic%+v\n", err)
return
}
logger.Errorf("rocketmq 发生 panic%+v", err)
})
}
// SetRecoverHandler 设置RocketMQ的recover的处理函数不设置会导致协程panic后程序退出
func SetRecoverHandler(fn func(any)) {
primitive.PanicHandler = fn
}

0
until/request/request.go Executable file → Normal file
View File

0
until/request/test/wxbiz_test.go Executable file → Normal file
View File

39
until/sysLog/log.go Executable file → Normal file
View File

@ -10,7 +10,7 @@ import (
)
func ErrLog(ctx context.Context, errContent ...any) {
path, _ := errLogFile()
path, _ := errLogFile("err")
// 创建一个文件对象
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
@ -25,8 +25,8 @@ func ErrLog(ctx context.Context, errContent ...any) {
logx.WithContext(ctx).WithCallerSkip(2).Errorf("errlog:", errContent)
}
func LogMq(ctx context.Context, content ...any) {
path, _ := logMqPath()
func LogSendMq(ctx context.Context, content ...any) {
path, _ := errLogFile("produce")
// 创建一个文件对象
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
@ -38,15 +38,31 @@ func LogMq(ctx context.Context, content ...any) {
writer := logx.NewWriter(file)
// 使用该写入器记录日志
logx.SetWriter(writer)
logx.WithContext(ctx).WithCallerSkip(2).Info("mq:", content)
logx.WithContext(ctx).WithCallerSkip(2).Info("sendMq:", content)
}
func logMqPath() (string, error) {
logPath, err := runtimePath()
func ErrQueueLog(ctx context.Context, content ...any) {
path, _ := logQueuePath()
// 创建一个文件对象
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
logx.Error(err)
return
}
defer file.Close()
// 创建一个日志写入器,将日志写入文件
writer := logx.NewWriter(file)
// 使用该写入器记录日志
logx.SetWriter(writer)
logx.WithContext(ctx).WithCallerSkip(2).Errorf("queue:", content)
}
func logQueuePath() (string, error) {
logPath, err := runtimePathForQueue()
if err != nil {
return "", err
}
path := fmt.Sprintf("%s/%s", logPath, "mq")
path := fmt.Sprintf("%s/%s", logPath, "queue")
err = CheckDir(path)
if err != nil {
return "", err
@ -54,12 +70,12 @@ func logMqPath() (string, error) {
return fmt.Sprintf("%s/%s", path, time.Now().Format(time.DateOnly)), nil
}
func errLogFile() (string, error) {
func errLogFile(pathName string) (string, error) {
logPath, err := runtimePath()
if err != nil {
return "", err
}
path := fmt.Sprintf("%s/%s", logPath, "err")
path := fmt.Sprintf("%s/%s", logPath, pathName)
err = CheckDir(path)
if err != nil {
return "", err
@ -72,6 +88,11 @@ func runtimePath() (string, error) {
return fmt.Sprintf("%s/%s/", filepath.Dir(filepath.Dir(path)), "runtime"), err
}
func runtimePathForQueue() (string, error) {
path, err := os.Getwd()
return fmt.Sprintf("%s/%s/", filepath.Dir(filepath.Dir(filepath.Dir(path))), "runtime"), err
}
func CheckDir(path string) error {
// 判断目录是否存在
if _, err := os.Stat(path); os.IsNotExist(err) {

0
until/xerr/errCode.go Executable file → Normal file
View File

0
until/xerr/errMsg.go Executable file → Normal file
View File

0
until/xerr/errors.go Executable file → Normal file
View File