215 lines
5.1 KiB
Go
215 lines
5.1 KiB
Go
package excute
|
||
|
||
import (
|
||
"cron_admin/app/utils"
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/jinzhu/copier"
|
||
"gorm.io/gorm"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
type ExecuteDb struct {
|
||
Db *gorm.DB
|
||
DbName string
|
||
}
|
||
|
||
func NewExecuteDb(source string, dbName string) (*ExecuteDb, error) {
|
||
db, err := ExecuteDbConn(source)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("%s链接失败:%v", dbName, err)
|
||
}
|
||
return &ExecuteDb{Db: db, DbName: dbName}, nil
|
||
}
|
||
|
||
func (db *ExecuteDb) ExecuteRead(execute string) (result []map[string]interface{}, err error) {
|
||
rows, err := db.Db.Raw(execute).Rows()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("数据执行失败:%v", err.Error())
|
||
}
|
||
defer rows.Close()
|
||
for rows.Next() {
|
||
err = db.Db.ScanRows(rows, &result)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("数据映射失败:%v", err)
|
||
}
|
||
}
|
||
|
||
return result, nil
|
||
}
|
||
|
||
func (db *ExecuteDb) ExecuteWriteV2(readData []map[string]interface{}, execute string) (dbString string, err error) {
|
||
var (
|
||
dbExecuteSql []string
|
||
)
|
||
for _, v := range readData {
|
||
dbExecuteSql = append(dbExecuteSql, db.replaceExecuteString(execute, v))
|
||
}
|
||
dbString = strings.Join(dbExecuteSql, ";")
|
||
for _, v := range dbExecuteSql {
|
||
d := db.Db.Exec(v)
|
||
if d.Error != nil {
|
||
return dbString, d.Error
|
||
}
|
||
}
|
||
return dbString, nil
|
||
}
|
||
|
||
func (db *ExecuteDb) replaceExecuteString(execute string, rowData map[string]interface{}) string {
|
||
executeString := execute
|
||
for key, value := range rowData {
|
||
key := fmt.Sprintf("${%s}", key)
|
||
executeString = strings.Replace(executeString, key, fmt.Sprintf("%v", value), -1)
|
||
}
|
||
return executeString
|
||
}
|
||
|
||
func (db *ExecuteDb) ExecuteWriteV1(readData []map[string]interface{}, execute string) (err error) {
|
||
var (
|
||
match MatchWrite
|
||
)
|
||
err = json.Unmarshal([]byte(execute), &match)
|
||
if err != nil {
|
||
return fmt.Errorf("写执行命令数据格式验证失败:%v", err)
|
||
}
|
||
table, err := db.GetTableColumnsInfo(match.Table)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
switch match.Op {
|
||
case "insert":
|
||
insertData := db.ForInsert(&table, readData, match.Data)
|
||
db.Db.Table(match.Table).Create(insertData)
|
||
case "update":
|
||
for _, v := range readData {
|
||
updateData, updateKey := db.ForUpdate(&table, v, match.Data, match.Keys)
|
||
db.Db.Table(match.Table).Where(updateKey).Updates(updateData)
|
||
}
|
||
default:
|
||
return fmt.Errorf("未知的写操作执行类型:%v", match.Op)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (db *ExecuteDb) GetTableColumnsInfo(tableName string) (tableInfo []TableColumn, err error) {
|
||
column, _ := db.Db.Raw(fmt.Sprintf("SHOW FULL COLUMNS FROM `%s`", tableName)).Rows()
|
||
defer column.Close()
|
||
for column.Next() {
|
||
err := db.Db.ScanRows(column, &tableInfo)
|
||
if err != nil {
|
||
return tableInfo, fmt.Errorf("获取表结构失败:%v", err)
|
||
}
|
||
}
|
||
return tableInfo, nil
|
||
}
|
||
|
||
func (db *ExecuteDb) ForInsert(table *[]TableColumn, resultData []map[string]interface{}, matchData string) (insertData []map[string]interface{}) {
|
||
var (
|
||
columns = map[string]interface{}{}
|
||
mData []string
|
||
)
|
||
if matchData != "" {
|
||
mData = strings.Split(matchData, ",")
|
||
}
|
||
for _, v := range *table {
|
||
in := false
|
||
if v.Key == "PRI" && v.Extra == "auto_increment" {
|
||
continue
|
||
}
|
||
col := NotNullColumnDefault{}
|
||
if v.Null == "NO" && v.Default.(*interface{}) == nil { //非null的情况
|
||
//处理非null但是default为空的情况
|
||
if strings.Contains(v.Type, "time") {
|
||
col.Default = time.Now().Format(time.DateTime)
|
||
} else if strings.Contains(v.Type, "int") {
|
||
col.Default = 0
|
||
} else if strings.Contains(v.Type, "json") {
|
||
col.Default = "{}"
|
||
} else {
|
||
col.Default = ""
|
||
}
|
||
in = true
|
||
} else {
|
||
//null的情况
|
||
if len(mData) != 0 {
|
||
if utils.ContainsStr(mData, v.Field) {
|
||
in = true
|
||
}
|
||
} else {
|
||
in = true
|
||
}
|
||
}
|
||
if in {
|
||
col.Field = v.Field
|
||
columns[col.Field] = col.Default
|
||
}
|
||
}
|
||
|
||
for _, v := range resultData {
|
||
var row map[string]interface{}
|
||
copier.Copy(&row, columns)
|
||
for key, value := range v {
|
||
if _, ok := columns[key]; ok {
|
||
row[key] = value
|
||
}
|
||
}
|
||
insertData = append(insertData, row)
|
||
}
|
||
return insertData
|
||
}
|
||
|
||
func (db *ExecuteDb) ForUpdate(table *[]TableColumn, resultData map[string]interface{}, matchData string, matchKeys string) (updateData map[string]interface{}, updateKeys map[string]interface{}) {
|
||
var (
|
||
columns []string
|
||
mData []string
|
||
primary PrimaryKey
|
||
keys []string
|
||
)
|
||
if matchData != "" {
|
||
mData = strings.Split(matchData, ",")
|
||
}
|
||
if matchKeys != "" {
|
||
keys = strings.Split(matchKeys, ",")
|
||
}
|
||
for _, v := range *table {
|
||
in := false
|
||
if v.Key == "PRI" && len(keys) == 0 {
|
||
primary.Filed = v.Field
|
||
continue
|
||
}
|
||
if len(mData) != 0 {
|
||
if utils.ContainsStr(mData, v.Field) {
|
||
in = true
|
||
}
|
||
} else {
|
||
in = true
|
||
}
|
||
if in {
|
||
columns = append(columns, v.Field)
|
||
}
|
||
}
|
||
updateData = map[string]interface{}{}
|
||
updateKeys = map[string]interface{}{}
|
||
for key, value := range resultData {
|
||
if key == primary.Filed {
|
||
primary.FiledValue = value
|
||
continue
|
||
}
|
||
if utils.ContainsStr(keys, key) {
|
||
updateKeys[key] = value
|
||
continue
|
||
}
|
||
if utils.ContainsStr(columns, key) {
|
||
updateData[key] = value
|
||
}
|
||
}
|
||
|
||
if len(updateKeys) == 0 && primary.FiledValue != nil {
|
||
updateKeys[primary.Filed] = primary.FiledValue
|
||
}
|
||
|
||
return updateData, updateKeys
|
||
}
|