From 6dd34cf4d85b2248777027cec5bc7b9f54ec0c5a Mon Sep 17 00:00:00 2001 From: renzhiyuan <465386466@qq.com> Date: Tue, 3 Dec 2024 17:00:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=99=E6=95=B0=E6=8D=AE=E4=B8=80=E9=94=AE?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controllers/backend/cmd_controller.go | 8 +- app/http/entities/backend/cmd.go | 29 ++- app/http/requestmapping/backend.go | 15 +- app/http/routes/admin.go | 1 + app/services/cmd_service/cmd_service.go | 71 +++++- app/utils/excute/dbExecute.go | 217 +++++++----------- app/utils/util.go | 5 + 7 files changed, 188 insertions(+), 158 deletions(-) diff --git a/app/http/controllers/backend/cmd_controller.go b/app/http/controllers/backend/cmd_controller.go index 7ed88d0..ee5e7de 100644 --- a/app/http/controllers/backend/cmd_controller.go +++ b/app/http/controllers/backend/cmd_controller.go @@ -50,7 +50,13 @@ func CmdStart(c *gin.Context) { } func CmdTestRead(c *gin.Context) { - request := controllers.GetRequest(c).(*backend.CmdTestRequest) + request := controllers.GetRequest(c).(*backend.CmdTestReadRequest) result, time_duration, err := cmd_services.TestRead(request) controllers.HandRes(c, gin.H{"result": result, "time_duration": time_duration}, err) } + +func CmdTestWrite(c *gin.Context) { + request := controllers.GetRequest(c).(*backend.CmdTestWriteRequest) + time_duration, err := cmd_services.TestWrite(request) + controllers.HandRes(c, gin.H{"time_duration": time_duration}, err) +} diff --git a/app/http/entities/backend/cmd.go b/app/http/entities/backend/cmd.go index b61dda4..08097b6 100644 --- a/app/http/entities/backend/cmd.go +++ b/app/http/entities/backend/cmd.go @@ -9,14 +9,14 @@ import ( ) type CmdListRequest struct { - Page int `json:"page" validate:"required" form:"page" example:"1"` - Limit int `json:"limit" validate:"required" form:"limit" example:"10"` - CmdName string `json:"cmd_name" form:"cmd_name" example:"155555555"` - Status int `json:"status" form:"status" example:"1"` - ExecuteType int `json:"execute_type" form:"execute_type" example:"46516"` - ExecuteRead string `json:"execute_read"` - ExecuteWrite string `json:"execute_write"` - CmdIds []string `json:"cmd_id"` + Page int `json:"page" validate:"required" form:"page" example:"1"` + Limit int `json:"limit" validate:"required" form:"limit" example:"10"` + CmdName string `json:"cmd_name" form:"cmd_name" example:"155555555"` + Status int `json:"status" form:"status" example:"1"` + ExecuteType int `json:"execute_type" form:"execute_type" example:"46516"` + ReadDbId int `json:"read_db_id"` + WriteDbId int `json:"write_db_id"` + CmdIds []string `json:"cmd_id"` } type CmdInfoRequest struct { @@ -87,8 +87,17 @@ type CmdStartRequest struct { CmdId int `json:"cmd_id" validate:"required" form:"cmd_id" example:""` } -type CmdTestRequest struct { - CmdId int `json:"cmd_id" validate:"required" form:"cmd_id" example:""` +type CmdTestWriteRequest struct { + ReadDbId int `json:"read_db_id" validate:"required"` + ExecuteRead string `json:"execute_read" validate:"required"` + WriteDbId int `json:"write_db_id" validate:"required"` + ExecuteWrite string `json:"execute_write" validate:"required"` + MatchJson string `json:"match_json" validate:"required"` +} + +type CmdTestReadRequest struct { + ReadDbId int `json:"read_db_id" validate:"required" form:"read_db_id" example:""` + ExecuteRead string `json:"execute_read" validate:"required" form:"execute_read" example:""` } type CmdStopRequest struct { diff --git a/app/http/requestmapping/backend.go b/app/http/requestmapping/backend.go index c512bc9..88d378d 100644 --- a/app/http/requestmapping/backend.go +++ b/app/http/requestmapping/backend.go @@ -38,11 +38,12 @@ var BackendRequestMap = map[string]func() interface{}{ common.ADMIN_OAUTH_V1 + "/log/mes/list": func() interface{} { return new(backend.CronReportLogsListRequest) }, //任务 - common.ADMIN_OAUTH_V1 + "/cmd/list": func() interface{} { return new(backend.CmdListRequest) }, - common.ADMIN_OAUTH_V1 + "/cmd/add": func() interface{} { return new(backend.CmdAddRequest) }, - common.ADMIN_OAUTH_V1 + "/cmd/edit": func() interface{} { return new(backend.CmdEditRequest) }, - common.ADMIN_OAUTH_V1 + "/cmd/del": func() interface{} { return new(backend.CmdDeleteRequest) }, - common.ADMIN_OAUTH_V1 + "/cmd/start": func() interface{} { return new(backend.CmdStartRequest) }, - common.ADMIN_OAUTH_V1 + "/cmd/stop": func() interface{} { return new(backend.CmdStopRequest) }, - common.ADMIN_OAUTH_V1 + "/cmd/test/read": func() interface{} { return new(backend.CmdTestRequest) }, + common.ADMIN_OAUTH_V1 + "/cmd/list": func() interface{} { return new(backend.CmdListRequest) }, + common.ADMIN_OAUTH_V1 + "/cmd/add": func() interface{} { return new(backend.CmdAddRequest) }, + common.ADMIN_OAUTH_V1 + "/cmd/edit": func() interface{} { return new(backend.CmdEditRequest) }, + common.ADMIN_OAUTH_V1 + "/cmd/del": func() interface{} { return new(backend.CmdDeleteRequest) }, + common.ADMIN_OAUTH_V1 + "/cmd/start": func() interface{} { return new(backend.CmdStartRequest) }, + common.ADMIN_OAUTH_V1 + "/cmd/stop": func() interface{} { return new(backend.CmdStopRequest) }, + common.ADMIN_OAUTH_V1 + "/cmd/test/read": func() interface{} { return new(backend.CmdTestReadRequest) }, + common.ADMIN_OAUTH_V1 + "/cmd/test/write": func() interface{} { return new(backend.CmdTestWriteRequest) }, } diff --git a/app/http/routes/admin.go b/app/http/routes/admin.go index 34d1a38..7e4db81 100644 --- a/app/http/routes/admin.go +++ b/app/http/routes/admin.go @@ -56,6 +56,7 @@ func RegisterAdminRoute(router *gin.Engine) { cmd.POST("/start", backend.CmdStart) cmd.POST("/stop", backend.CmdStop) cmd.POST("/test/read", backend.CmdTestRead) + cmd.POST("/test/write", backend.CmdTestWrite) } //消息管理 mes := v1.Group("/channel") diff --git a/app/services/cmd_service/cmd_service.go b/app/services/cmd_service/cmd_service.go index 36067c1..b34e74a 100644 --- a/app/services/cmd_service/cmd_service.go +++ b/app/services/cmd_service/cmd_service.go @@ -35,6 +35,16 @@ func GetListByWhere(request *backend.CmdListRequest, page int, limit int) (count cond = cond.And(builder.In("cmd_id", request.CmdIds)) } + if request.ReadDbId != 0 { + // 使用IN查询 + cond = cond.And(builder.In("read_db_id", request.ReadDbId)) + } + + if request.WriteDbId != 0 { + // 使用IN查询 + cond = cond.And(builder.In("write_db_id", request.WriteDbId)) + } + session := croncmdmodel.GetInstance().GetDb().Where(cond).OrderBy("update_time desc") if page != 0 && limit != 0 { @@ -108,19 +118,11 @@ func Stop(request *backend.CmdStopRequest) (err error) { return } -func TestRead(request *backend.CmdTestRequest) (result interface{}, time_duration float64, err error) { +func TestRead(request *backend.CmdTestReadRequest) (result []map[string]interface{}, time_duration float64, err error) { var ( - data croncmdmodel.CronCmd database crondbmodel.CronDb ) - exists, err := croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Get(&data) - if !exists { - return nil, 0, fmt.Errorf("数据不存在") - } - if err != nil { - return - } - exists, err = crondbmodel.GetInstance().GetDb().ID(data.ReadDbId).Get(&database) + exists, err := crondbmodel.GetInstance().GetDb().ID(request.ReadDbId).Get(&database) if !exists { return nil, 0, fmt.Errorf("数据库不存在") } @@ -130,22 +132,67 @@ func TestRead(request *backend.CmdTestRequest) (result interface{}, time_duratio //执行 start := time.Now() db, err := excute.NewExecuteDb(database.Source, database.DbName) + dbConn, _ := db.Db.DB() + defer dbConn.Close() if err != nil { return nil, 0, err } - result, err = db.ExecuteRead(data.ExecuteRead) + result, err = db.ExecuteRead(request.ExecuteRead) if err != nil { return result, 0, err } // 记录结束时间 end := time.Now() - // 计算执行时间 duration := end.Sub(start) time_duration = duration.Seconds() return } +func TestWrite(request *backend.CmdTestWriteRequest) (time_duration float64, err error) { + var ( + finalData []map[string]interface{} + database crondbmodel.CronDb + ) + readRes, readDuration, err := TestRead(&backend.CmdTestReadRequest{ + ReadDbId: request.ReadDbId, + ExecuteRead: request.ExecuteRead, + }) + if err != nil { + return 0, err + } + if len(readRes) == 0 { + return 0, err + } + //过滤数据 + finalData, err = excute.FilterReadData(&readRes, request.MatchJson) + if err != nil { + return 0, err + } + //执行 + start := time.Now() + exists, err := crondbmodel.GetInstance().GetDb().ID(request.WriteDbId).Get(&database) + if !exists { + return 0, fmt.Errorf("数据库不存在") + } + if err != nil { + return + } + db, err := excute.NewExecuteDb(database.Source, database.DbName) + dbConn, _ := db.Db.DB() + defer dbConn.Close() + _, err = db.ExecuteWriteV2(finalData, request.ExecuteWrite) + if err != nil { + return 0, err + } + // 记录结束时间 + end := time.Now() + // 计算执行时间 + duration := end.Sub(start) + time_duration = duration.Seconds() + readDuration + return +} + func ListToRep(list []croncmdmodel.CronCmd) (cmdList []backend.CmdListResponse, err error) { var ( userIdList []int diff --git a/app/utils/excute/dbExecute.go b/app/utils/excute/dbExecute.go index 2985775..6a89845 100644 --- a/app/utils/excute/dbExecute.go +++ b/app/utils/excute/dbExecute.go @@ -4,10 +4,9 @@ import ( "cron_admin/app/utils" "encoding/json" "fmt" - "github.com/jinzhu/copier" "gorm.io/gorm" + "reflect" "strings" - "time" ) type ExecuteDb struct { @@ -65,150 +64,112 @@ func (db *ExecuteDb) replaceExecuteString(execute string, rowData map[string]int return executeString } -func (db *ExecuteDb) ExecuteWriteV1(readData []map[string]interface{}, execute string) (err error) { - var ( - match MatchWrite - ) - err = json.Unmarshal([]byte(execute), &match) - if err != nil { - return fmt.Errorf("写执行命令数据格式验证失败:%v", err) +func FilterReadData(data *[]map[string]interface{}, matchJson string) (filteredData []map[string]interface{}, err error) { + if matchJson == "" { + return *data, nil } - table, err := db.GetTableColumnsInfo(match.Table) - if err != nil { - return err - } - switch match.Op { - case "insert": - insertData := db.ForInsert(&table, readData, match.Data) - db.Db.Table(match.Table).Create(insertData) - case "update": - for _, v := range readData { - updateData, updateKey := db.ForUpdate(&table, v, match.Data, match.Keys) - db.Db.Table(match.Table).Where(updateKey).Updates(updateData) - } - default: - return fmt.Errorf("未知的写操作执行类型:%v", match.Op) - } - - return nil -} - -func (db *ExecuteDb) GetTableColumnsInfo(tableName string) (tableInfo []TableColumn, err error) { - column, _ := db.Db.Raw(fmt.Sprintf("SHOW FULL COLUMNS FROM `%s`", tableName)).Rows() - defer column.Close() - for column.Next() { - err := db.Db.ScanRows(column, &tableInfo) + for _, v := range *data { + match, err := MatchJSON(v, &matchJson) if err != nil { - return tableInfo, fmt.Errorf("获取表结构失败:%v", err) + return nil, err + } + if match { + filteredData = append(filteredData, v) } } - return tableInfo, nil + return filteredData, nil } -func (db *ExecuteDb) ForInsert(table *[]TableColumn, resultData []map[string]interface{}, matchData string) (insertData []map[string]interface{}) { - var ( - columns = map[string]interface{}{} - mData []string - ) - if matchData != "" { - mData = strings.Split(matchData, ",") +func MatchJSON(mapData map[string]interface{}, jsonString *string) (bool, error) { + // 解析JSON字符串 + var jsonData []Match + err := json.Unmarshal([]byte(*jsonString), &jsonData) + if err != nil { + return false, fmt.Errorf("解析Match_Json失败:%v", err) } - for _, v := range *table { - in := false - if v.Key == "PRI" && v.Extra == "auto_increment" { - continue + for _, v := range jsonData { + // 判断key值 + mapData, err := judgeData(&v, mapData) + if err != nil { + return false, err } - col := NotNullColumnDefault{} - if v.Null == "NO" && v.Default.(*interface{}) == nil { //非null的情况 - //处理非null但是default为空的情况 - if strings.Contains(v.Type, "time") { - col.Default = time.Now().Format(time.DateTime) - } else if strings.Contains(v.Type, "int") { - col.Default = 0 - } else if strings.Contains(v.Type, "json") { - col.Default = "{}" - } else { - col.Default = "" + switch v.Op { + case "=": + if mapData[v.Key] != v.Val { + return false, nil } - in = true - } else { - //null的情况 - if len(mData) != 0 { - if utils.ContainsStr(mData, v.Field) { - in = true - } - } else { - in = true + case "!=": + if mapData[v.Key] == v.Val { + return false, nil } + case "<": + if mapData[v.Key].(int) >= v.Val.(int) { + return false, nil + } + case "<=": + if mapData[v.Key].(int) > v.Val.(int) { + return false, nil + } + case ">": + if mapData[v.Key].(int) <= v.Val.(int) { + return false, nil + } + case ">=": + if mapData[v.Key].(int) < v.Val.(int) { + return false, nil + } + case "regex": + if !utils.Regexp(mapData[v.Key].(string), v.Val.(string)) { + return false, nil + } + default: + return false, fmt.Errorf("未知的比较类型:%v", v.Op) } - if in { - col.Field = v.Field - columns[col.Field] = col.Default - } + } - for _, v := range resultData { - var row map[string]interface{} - copier.Copy(&row, columns) - for key, value := range v { - if _, ok := columns[key]; ok { - row[key] = value - } - } - insertData = append(insertData, row) - } - return insertData + return true, nil } -func (db *ExecuteDb) ForUpdate(table *[]TableColumn, resultData map[string]interface{}, matchData string, matchKeys string) (updateData map[string]interface{}, updateKeys map[string]interface{}) { - var ( - columns []string - mData []string - primary PrimaryKey - keys []string - ) - if matchData != "" { - mData = strings.Split(matchData, ",") +func judgeData(v *Match, mapData map[string]interface{}) (map[string]interface{}, error) { + if v.Key == "" { + return nil, fmt.Errorf("Match_Json的key值未映射") } - if matchKeys != "" { - keys = strings.Split(matchKeys, ",") - } - for _, v := range *table { - in := false - if v.Key == "PRI" && len(keys) == 0 { - primary.Filed = v.Field - continue - } - if len(mData) != 0 { - if utils.ContainsStr(mData, v.Field) { - in = true - } - } else { - in = true - } - if in { - columns = append(columns, v.Field) - } - } - updateData = map[string]interface{}{} - updateKeys = map[string]interface{}{} - for key, value := range resultData { - if key == primary.Filed { - primary.FiledValue = value - continue - } - if utils.ContainsStr(keys, key) { - updateKeys[key] = value - continue - } - if utils.ContainsStr(columns, key) { - updateData[key] = value - } + if v.Op == "" { + return nil, fmt.Errorf("Match_Json的判断方式没找到") } - if len(updateKeys) == 0 && primary.FiledValue != nil { - updateKeys[primary.Filed] = primary.FiledValue + if _, ok := mapData[v.Key]; !ok { + return nil, fmt.Errorf("Match_Json的key值没找到:%s", v.Key) } - return updateData, updateKeys + // 判断类型 + valueKind := reflect.TypeOf(mapData[v.Key]).Kind() + valKind := reflect.TypeOf(v.Val).Kind() + if valKind == reflect.Float64 { + v.Val = int(v.Val.(float64)) + valKind = reflect.TypeOf(v.Val).Kind() + } + if valueKind == reflect.Int64 { + mapData[v.Key] = int(mapData[v.Key].(int64)) + valueKind = reflect.TypeOf(mapData[v.Key]).Kind() + } + + if valueKind != reflect.String && valueKind != reflect.Int { + return nil, fmt.Errorf("未知的val值类型:%s", v.Key) + } + + if v.Op != "regex" && (valueKind != valKind) { + return nil, fmt.Errorf("非正则模式Match_Json的val值类型不匹配:%s", v.Key) + } + + if v.Op == "regex" && (valKind != reflect.String || valueKind != reflect.String) { + return nil, fmt.Errorf("正则模式Match_Json的val值类型不匹配:%s", v.Key) + } + + if utils.ContainsStr([]string{">", "<", ">=", "<="}, v.Op) && (valueKind == reflect.String || valKind == reflect.String) { + return nil, fmt.Errorf("字符串类型val无法进行比较:%s", v.Key) + } + + return mapData, nil } diff --git a/app/utils/util.go b/app/utils/util.go index c96d6c2..25eb341 100644 --- a/app/utils/util.go +++ b/app/utils/util.go @@ -476,3 +476,8 @@ func ContainsStr(slice []string, element string) bool { } return false } + +func Regexp(s string, rule string) bool { + re := regexp.MustCompile(rule) + return re.MatchString(s) +}