Compare commits

..

No commits in common. "f9f652eb3282f082b620cbbf2eedcb5ac5c5d81f" and "814e9572a879ea6aa07ce332a9a98a4d1df46737" have entirely different histories.

7 changed files with 167 additions and 197 deletions

View File

@ -50,13 +50,7 @@ func CmdStart(c *gin.Context) {
} }
func CmdTestRead(c *gin.Context) { func CmdTestRead(c *gin.Context) {
request := controllers.GetRequest(c).(*backend.CmdTestReadRequest) request := controllers.GetRequest(c).(*backend.CmdTestRequest)
result, time_duration, err := cmd_services.TestRead(request) result, time_duration, err := cmd_services.TestRead(request)
controllers.HandRes(c, gin.H{"result": result, "time_duration": time_duration}, err) controllers.HandRes(c, gin.H{"result": result, "time_duration": time_duration}, err)
} }
func CmdTestWrite(c *gin.Context) {
request := controllers.GetRequest(c).(*backend.CmdTestWriteRequest)
time_duration, err := cmd_services.TestWrite(request)
controllers.HandRes(c, gin.H{"time_duration": time_duration}, err)
}

View File

@ -9,14 +9,14 @@ import (
) )
type CmdListRequest struct { type CmdListRequest struct {
Page int `json:"page" validate:"required" form:"page" example:"1"` Page int `json:"page" validate:"required" form:"page" example:"1"`
Limit int `json:"limit" validate:"required" form:"limit" example:"10"` Limit int `json:"limit" validate:"required" form:"limit" example:"10"`
CmdName string `json:"cmd_name" form:"cmd_name" example:"155555555"` CmdName string `json:"cmd_name" form:"cmd_name" example:"155555555"`
Status int `json:"status" form:"status" example:"1"` Status int `json:"status" form:"status" example:"1"`
ExecuteType int `json:"execute_type" form:"execute_type" example:"46516"` ExecuteType int `json:"execute_type" form:"execute_type" example:"46516"`
ReadDbId int `json:"read_db_id"` ExecuteRead string `json:"execute_read"`
WriteDbId int `json:"write_db_id"` ExecuteWrite string `json:"execute_write"`
CmdIds []string `json:"cmd_id"` CmdIds []string `json:"cmd_id"`
} }
type CmdInfoRequest struct { type CmdInfoRequest struct {
@ -87,17 +87,8 @@ type CmdStartRequest struct {
CmdId int `json:"cmd_id" validate:"required" form:"cmd_id" example:""` CmdId int `json:"cmd_id" validate:"required" form:"cmd_id" example:""`
} }
type CmdTestWriteRequest struct { type CmdTestRequest struct {
ReadDbId int `json:"read_db_id" validate:"required"` CmdId int `json:"cmd_id" validate:"required" form:"cmd_id" example:""`
ExecuteRead string `json:"execute_read" validate:"required"`
WriteDbId int `json:"write_db_id" validate:"required"`
ExecuteWrite string `json:"execute_write" validate:"required"`
MatchJson string `json:"match_json" validate:"required"`
}
type CmdTestReadRequest struct {
ReadDbId int `json:"read_db_id" validate:"required" form:"read_db_id" example:""`
ExecuteRead string `json:"execute_read" validate:"required" form:"execute_read" example:""`
} }
type CmdStopRequest struct { type CmdStopRequest struct {

View File

@ -38,12 +38,11 @@ var BackendRequestMap = map[string]func() interface{}{
common.ADMIN_OAUTH_V1 + "/log/mes/list": func() interface{} { return new(backend.CronReportLogsListRequest) }, common.ADMIN_OAUTH_V1 + "/log/mes/list": func() interface{} { return new(backend.CronReportLogsListRequest) },
//任务 //任务
common.ADMIN_OAUTH_V1 + "/cmd/list": func() interface{} { return new(backend.CmdListRequest) }, common.ADMIN_OAUTH_V1 + "/cmd/list": func() interface{} { return new(backend.CmdListRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/add": func() interface{} { return new(backend.CmdAddRequest) }, common.ADMIN_OAUTH_V1 + "/cmd/add": func() interface{} { return new(backend.CmdAddRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/edit": func() interface{} { return new(backend.CmdEditRequest) }, common.ADMIN_OAUTH_V1 + "/cmd/edit": func() interface{} { return new(backend.CmdEditRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/del": func() interface{} { return new(backend.CmdDeleteRequest) }, common.ADMIN_OAUTH_V1 + "/cmd/del": func() interface{} { return new(backend.CmdDeleteRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/start": func() interface{} { return new(backend.CmdStartRequest) }, common.ADMIN_OAUTH_V1 + "/cmd/start": func() interface{} { return new(backend.CmdStartRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/stop": func() interface{} { return new(backend.CmdStopRequest) }, common.ADMIN_OAUTH_V1 + "/cmd/stop": func() interface{} { return new(backend.CmdStopRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/test/read": func() interface{} { return new(backend.CmdTestReadRequest) }, common.ADMIN_OAUTH_V1 + "/cmd/test/read": func() interface{} { return new(backend.CmdTestRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/test/write": func() interface{} { return new(backend.CmdTestWriteRequest) },
} }

View File

@ -59,7 +59,6 @@ func RegisterAdminRoute(router *gin.Engine) {
cmd.POST("/start", backend.CmdStart) cmd.POST("/start", backend.CmdStart)
cmd.POST("/stop", backend.CmdStop) cmd.POST("/stop", backend.CmdStop)
cmd.POST("/test/read", backend.CmdTestRead) cmd.POST("/test/read", backend.CmdTestRead)
cmd.POST("/test/write", backend.CmdTestWrite)
} }
//消息管理 //消息管理
mes := v1.Group("/channel") mes := v1.Group("/channel")

View File

@ -35,16 +35,6 @@ func GetListByWhere(request *backend.CmdListRequest, page int, limit int) (count
cond = cond.And(builder.In("cmd_id", request.CmdIds)) cond = cond.And(builder.In("cmd_id", request.CmdIds))
} }
if request.ReadDbId != 0 {
// 使用IN查询
cond = cond.And(builder.In("read_db_id", request.ReadDbId))
}
if request.WriteDbId != 0 {
// 使用IN查询
cond = cond.And(builder.In("write_db_id", request.WriteDbId))
}
session := croncmdmodel.GetInstance().GetDb().Where(cond).OrderBy("update_time desc") session := croncmdmodel.GetInstance().GetDb().Where(cond).OrderBy("update_time desc")
if page != 0 && limit != 0 { if page != 0 && limit != 0 {
@ -118,11 +108,19 @@ func Stop(request *backend.CmdStopRequest) (err error) {
return return
} }
func TestRead(request *backend.CmdTestReadRequest) (result []map[string]interface{}, time_duration float64, err error) { func TestRead(request *backend.CmdTestRequest) (result interface{}, time_duration float64, err error) {
var ( var (
data croncmdmodel.CronCmd
database crondbmodel.CronDb database crondbmodel.CronDb
) )
exists, err := crondbmodel.GetInstance().GetDb().ID(request.ReadDbId).Get(&database) exists, err := croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Get(&data)
if !exists {
return nil, 0, fmt.Errorf("数据不存在")
}
if err != nil {
return
}
exists, err = crondbmodel.GetInstance().GetDb().ID(data.ReadDbId).Get(&database)
if !exists { if !exists {
return nil, 0, fmt.Errorf("数据库不存在") return nil, 0, fmt.Errorf("数据库不存在")
} }
@ -132,67 +130,22 @@ func TestRead(request *backend.CmdTestReadRequest) (result []map[string]interfac
//执行 //执行
start := time.Now() start := time.Now()
db, err := excute.NewExecuteDb(database.Source, database.DbName) db, err := excute.NewExecuteDb(database.Source, database.DbName)
dbConn, _ := db.Db.DB()
defer dbConn.Close()
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
result, err = db.ExecuteRead(request.ExecuteRead) result, err = db.ExecuteRead(data.ExecuteRead)
if err != nil { if err != nil {
return result, 0, err return result, 0, err
} }
// 记录结束时间 // 记录结束时间
end := time.Now() end := time.Now()
// 计算执行时间 // 计算执行时间
duration := end.Sub(start) duration := end.Sub(start)
time_duration = duration.Seconds() time_duration = duration.Seconds()
return return
} }
func TestWrite(request *backend.CmdTestWriteRequest) (time_duration float64, err error) {
var (
finalData []map[string]interface{}
database crondbmodel.CronDb
)
readRes, readDuration, err := TestRead(&backend.CmdTestReadRequest{
ReadDbId: request.ReadDbId,
ExecuteRead: request.ExecuteRead,
})
if err != nil {
return 0, err
}
if len(readRes) == 0 {
return 0, err
}
//过滤数据
finalData, err = excute.FilterReadData(&readRes, request.MatchJson)
if err != nil {
return 0, err
}
//执行
start := time.Now()
exists, err := crondbmodel.GetInstance().GetDb().ID(request.WriteDbId).Get(&database)
if !exists {
return 0, fmt.Errorf("数据库不存在")
}
if err != nil {
return
}
db, err := excute.NewExecuteDb(database.Source, database.DbName)
dbConn, _ := db.Db.DB()
defer dbConn.Close()
_, err = db.ExecuteWriteV2(finalData, request.ExecuteWrite)
if err != nil {
return 0, err
}
// 记录结束时间
end := time.Now()
// 计算执行时间
duration := end.Sub(start)
time_duration = duration.Seconds() + readDuration
return
}
func ListToRep(list []croncmdmodel.CronCmd) (cmdList []backend.CmdListResponse, err error) { func ListToRep(list []croncmdmodel.CronCmd) (cmdList []backend.CmdListResponse, err error) {
var ( var (
userIdList []int userIdList []int

View File

@ -4,9 +4,10 @@ import (
"cron_admin/app/utils" "cron_admin/app/utils"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/jinzhu/copier"
"gorm.io/gorm" "gorm.io/gorm"
"reflect"
"strings" "strings"
"time"
) )
type ExecuteDb struct { type ExecuteDb struct {
@ -64,112 +65,150 @@ func (db *ExecuteDb) replaceExecuteString(execute string, rowData map[string]int
return executeString return executeString
} }
func FilterReadData(data *[]map[string]interface{}, matchJson string) (filteredData []map[string]interface{}, err error) { func (db *ExecuteDb) ExecuteWriteV1(readData []map[string]interface{}, execute string) (err error) {
if matchJson == "" { var (
return *data, nil match MatchWrite
} )
for _, v := range *data { err = json.Unmarshal([]byte(execute), &match)
match, err := MatchJSON(v, &matchJson)
if err != nil {
return nil, err
}
if match {
filteredData = append(filteredData, v)
}
}
return filteredData, nil
}
func MatchJSON(mapData map[string]interface{}, jsonString *string) (bool, error) {
// 解析JSON字符串
var jsonData []Match
err := json.Unmarshal([]byte(*jsonString), &jsonData)
if err != nil { if err != nil {
return false, fmt.Errorf("解析Match_Json失败:%v", err) return fmt.Errorf("写执行命令数据格式验证失败:%v", err)
} }
for _, v := range jsonData { table, err := db.GetTableColumnsInfo(match.Table)
// 判断key值 if err != nil {
mapData, err := judgeData(&v, mapData) return err
}
switch match.Op {
case "insert":
insertData := db.ForInsert(&table, readData, match.Data)
db.Db.Table(match.Table).Create(insertData)
case "update":
for _, v := range readData {
updateData, updateKey := db.ForUpdate(&table, v, match.Data, match.Keys)
db.Db.Table(match.Table).Where(updateKey).Updates(updateData)
}
default:
return fmt.Errorf("未知的写操作执行类型:%v", match.Op)
}
return nil
}
func (db *ExecuteDb) GetTableColumnsInfo(tableName string) (tableInfo []TableColumn, err error) {
column, _ := db.Db.Raw(fmt.Sprintf("SHOW FULL COLUMNS FROM `%s`", tableName)).Rows()
defer column.Close()
for column.Next() {
err := db.Db.ScanRows(column, &tableInfo)
if err != nil { if err != nil {
return false, err return tableInfo, fmt.Errorf("获取表结构失败:%v", err)
} }
switch v.Op {
case "=":
if mapData[v.Key] != v.Val {
return false, nil
}
case "!=":
if mapData[v.Key] == v.Val {
return false, nil
}
case "<":
if mapData[v.Key].(int) >= v.Val.(int) {
return false, nil
}
case "<=":
if mapData[v.Key].(int) > v.Val.(int) {
return false, nil
}
case ">":
if mapData[v.Key].(int) <= v.Val.(int) {
return false, nil
}
case ">=":
if mapData[v.Key].(int) < v.Val.(int) {
return false, nil
}
case "regex":
if !utils.Regexp(mapData[v.Key].(string), v.Val.(string)) {
return false, nil
}
default:
return false, fmt.Errorf("未知的比较类型:%v", v.Op)
}
} }
return tableInfo, nil
return true, nil
} }
func judgeData(v *Match, mapData map[string]interface{}) (map[string]interface{}, error) { func (db *ExecuteDb) ForInsert(table *[]TableColumn, resultData []map[string]interface{}, matchData string) (insertData []map[string]interface{}) {
if v.Key == "" { var (
return nil, fmt.Errorf("Match_Json的key值未映射") columns = map[string]interface{}{}
mData []string
)
if matchData != "" {
mData = strings.Split(matchData, ",")
} }
if v.Op == "" { for _, v := range *table {
return nil, fmt.Errorf("Match_Json的判断方式没找到") in := false
if v.Key == "PRI" && v.Extra == "auto_increment" {
continue
}
col := NotNullColumnDefault{}
if v.Null == "NO" && v.Default.(*interface{}) == nil { //非null的情况
//处理非null但是default为空的情况
if strings.Contains(v.Type, "time") {
col.Default = time.Now().Format(time.DateTime)
} else if strings.Contains(v.Type, "int") {
col.Default = 0
} else if strings.Contains(v.Type, "json") {
col.Default = "{}"
} else {
col.Default = ""
}
in = true
} else {
//null的情况
if len(mData) != 0 {
if utils.ContainsStr(mData, v.Field) {
in = true
}
} else {
in = true
}
}
if in {
col.Field = v.Field
columns[col.Field] = col.Default
}
} }
if _, ok := mapData[v.Key]; !ok { for _, v := range resultData {
return nil, fmt.Errorf("Match_Json的key值没找到%s", v.Key) var row map[string]interface{}
copier.Copy(&row, columns)
for key, value := range v {
if _, ok := columns[key]; ok {
row[key] = value
}
}
insertData = append(insertData, row)
} }
return insertData
// 判断类型 }
valueKind := reflect.TypeOf(mapData[v.Key]).Kind()
valKind := reflect.TypeOf(v.Val).Kind() func (db *ExecuteDb) ForUpdate(table *[]TableColumn, resultData map[string]interface{}, matchData string, matchKeys string) (updateData map[string]interface{}, updateKeys map[string]interface{}) {
if valKind == reflect.Float64 { var (
v.Val = int(v.Val.(float64)) columns []string
valKind = reflect.TypeOf(v.Val).Kind() mData []string
} primary PrimaryKey
if valueKind == reflect.Int64 { keys []string
mapData[v.Key] = int(mapData[v.Key].(int64)) )
valueKind = reflect.TypeOf(mapData[v.Key]).Kind() if matchData != "" {
} mData = strings.Split(matchData, ",")
}
if valueKind != reflect.String && valueKind != reflect.Int { if matchKeys != "" {
return nil, fmt.Errorf("未知的val值类型%s", v.Key) keys = strings.Split(matchKeys, ",")
} }
for _, v := range *table {
if v.Op != "regex" && (valueKind != valKind) { in := false
return nil, fmt.Errorf("非正则模式Match_Json的val值类型不匹配%s", v.Key) if v.Key == "PRI" && len(keys) == 0 {
} primary.Filed = v.Field
continue
if v.Op == "regex" && (valKind != reflect.String || valueKind != reflect.String) { }
return nil, fmt.Errorf("正则模式Match_Json的val值类型不匹配%s", v.Key) if len(mData) != 0 {
} if utils.ContainsStr(mData, v.Field) {
in = true
if utils.ContainsStr([]string{">", "<", ">=", "<="}, v.Op) && (valueKind == reflect.String || valKind == reflect.String) { }
return nil, fmt.Errorf("字符串类型val无法进行比较%s", v.Key) } else {
} in = true
}
return mapData, nil if in {
columns = append(columns, v.Field)
}
}
updateData = map[string]interface{}{}
updateKeys = map[string]interface{}{}
for key, value := range resultData {
if key == primary.Filed {
primary.FiledValue = value
continue
}
if utils.ContainsStr(keys, key) {
updateKeys[key] = value
continue
}
if utils.ContainsStr(columns, key) {
updateData[key] = value
}
}
if len(updateKeys) == 0 && primary.FiledValue != nil {
updateKeys[primary.Filed] = primary.FiledValue
}
return updateData, updateKeys
} }

View File

@ -476,8 +476,3 @@ func ContainsStr(slice []string, element string) bool {
} }
return false return false
} }
func Regexp(s string, rule string) bool {
re := regexp.MustCompile(rule)
return re.MatchString(s)
}