263 lines
6.8 KiB
Go
263 lines
6.8 KiB
Go
package cmd_services
|
|
|
|
import (
|
|
"cron_admin/app/constants/common"
|
|
"cron_admin/app/http/entities/backend"
|
|
"cron_admin/app/models/croncmdmodel"
|
|
"cron_admin/app/models/crondbmodel"
|
|
"cron_admin/app/models/cronreportchannelmodel"
|
|
"cron_admin/app/models/cronusermodel"
|
|
"cron_admin/app/repository"
|
|
"cron_admin/app/utils/excute"
|
|
"cron_admin/app/utils/mapstructure"
|
|
"fmt"
|
|
"github.com/ahmetb/go-linq/v3"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
"xorm.io/builder"
|
|
)
|
|
|
|
func GetListByWhere(request *backend.CmdListRequest, page int, limit int) (count int64, cmdListInfo []croncmdmodel.CronCmd, err error) {
|
|
cond := builder.NewCond()
|
|
|
|
if request.CmdName != "" {
|
|
cond = cond.And(builder.Like{"cmd_name", request.CmdName})
|
|
}
|
|
if request.Status != 0 {
|
|
cond = cond.And(builder.Eq{"status": request.Status})
|
|
}
|
|
if request.ExecuteType != 0 {
|
|
cond = cond.And(builder.Eq{"execute_type": request.ExecuteType})
|
|
}
|
|
if len(request.CmdIds) > 0 {
|
|
// 使用IN查询
|
|
cond = cond.And(builder.In("cmd_id", request.CmdIds))
|
|
}
|
|
|
|
if request.ReadDbId != 0 {
|
|
// 使用IN查询
|
|
cond = cond.And(builder.In("read_db_id", request.ReadDbId))
|
|
}
|
|
|
|
if request.WriteDbId != 0 {
|
|
// 使用IN查询
|
|
cond = cond.And(builder.In("write_db_id", request.WriteDbId))
|
|
}
|
|
|
|
session := croncmdmodel.GetInstance().GetDb().Where(cond).OrderBy("update_time desc")
|
|
|
|
if page != 0 && limit != 0 {
|
|
session = session.Limit(limit, (page-1)*limit)
|
|
}
|
|
count, err = session.FindAndCount(&cmdListInfo)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
func Add(request *backend.CmdAddRequest) (err error) {
|
|
var db croncmdmodel.CronCmd
|
|
_ = mapstructure.Decode(request, &db)
|
|
db.Status = common.CMD_STATUS_STOP
|
|
if db.MatchJson == "" {
|
|
db.MatchJson = "null"
|
|
}
|
|
_, err = croncmdmodel.GetInstance().GetDb().InsertOne(db)
|
|
return
|
|
}
|
|
|
|
func Edit(request *backend.CmdEditRequest) (err error) {
|
|
var db croncmdmodel.CronCmd
|
|
_ = mapstructure.Decode(request, &db)
|
|
_, err = croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Update(&db)
|
|
return
|
|
}
|
|
|
|
func Del(request *backend.CmdDeleteRequest) (err error) {
|
|
_, err = croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Delete(&croncmdmodel.CronCmd{})
|
|
return
|
|
}
|
|
|
|
func Start(request *backend.CmdStartRequest) (err error) {
|
|
var data croncmdmodel.CronCmd
|
|
exists, err := croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Get(&data)
|
|
if !exists {
|
|
return fmt.Errorf("数据不存在")
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
if data.EntryId != 0 {
|
|
return fmt.Errorf("该任务还没有完全关闭,请稍后再试或联系管理员")
|
|
}
|
|
if data.Status == common.CMD_STATUS_WAIT {
|
|
return fmt.Errorf("任务已在启动中")
|
|
}
|
|
if data.Status == common.CMD_STATUS_RUN {
|
|
return fmt.Errorf("任务已启动,无法再次进行此操作")
|
|
}
|
|
data.Status = common.CMD_STATUS_WAIT
|
|
_, err = croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Update(&data)
|
|
return
|
|
}
|
|
|
|
func Stop(request *backend.CmdStopRequest) (err error) {
|
|
var data croncmdmodel.CronCmd
|
|
exists, err := croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Get(&data)
|
|
if !exists {
|
|
return fmt.Errorf("数据不存在")
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
data.Status = common.CMD_STATUS_STOP
|
|
_, err = croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Update(&data)
|
|
return
|
|
}
|
|
|
|
func TestRead(request *backend.CmdTestReadRequest) (result []map[string]interface{}, time_duration float64, err error) {
|
|
var (
|
|
database crondbmodel.CronDb
|
|
)
|
|
exists, err := crondbmodel.GetInstance().GetDb().ID(request.ReadDbId).Get(&database)
|
|
if !exists {
|
|
return nil, 0, fmt.Errorf("数据库不存在")
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
//执行
|
|
start := time.Now()
|
|
db, err := excute.NewExecuteDb(database.Source, database.DbName)
|
|
dbConn, _ := db.Db.DB()
|
|
defer dbConn.Close()
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
result, err = db.ExecuteRead(request.ExecuteRead)
|
|
if err != nil {
|
|
return result, 0, err
|
|
}
|
|
// 记录结束时间
|
|
end := time.Now()
|
|
// 计算执行时间
|
|
duration := end.Sub(start)
|
|
time_duration = duration.Seconds()
|
|
return
|
|
}
|
|
|
|
func TestWrite(request *backend.CmdTestWriteRequest) (time_duration float64, err error) {
|
|
var (
|
|
finalData []map[string]interface{}
|
|
database crondbmodel.CronDb
|
|
)
|
|
readRes, readDuration, err := TestRead(&backend.CmdTestReadRequest{
|
|
ReadDbId: request.ReadDbId,
|
|
ExecuteRead: request.ExecuteRead,
|
|
})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if len(readRes) == 0 {
|
|
return 0, err
|
|
}
|
|
//过滤数据
|
|
finalData, err = excute.FilterReadData(&readRes, request.MatchJson)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
//执行
|
|
start := time.Now()
|
|
exists, err := crondbmodel.GetInstance().GetDb().ID(request.WriteDbId).Get(&database)
|
|
if !exists {
|
|
return 0, fmt.Errorf("数据库不存在")
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
db, err := excute.NewExecuteDb(database.Source, database.DbName)
|
|
dbConn, _ := db.Db.DB()
|
|
defer dbConn.Close()
|
|
_, err = db.ExecuteWriteV2(finalData, request.ExecuteWrite)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
// 记录结束时间
|
|
end := time.Now()
|
|
// 计算执行时间
|
|
duration := end.Sub(start)
|
|
time_duration = duration.Seconds() + readDuration
|
|
return
|
|
}
|
|
|
|
func ListToRep(list []croncmdmodel.CronCmd) (cmdList []backend.CmdListResponse, err error) {
|
|
var (
|
|
userIdList []int
|
|
dbIdList []int
|
|
channelList []int
|
|
)
|
|
for _, v := range list {
|
|
if len(v.UserIds) != 0 {
|
|
for _, v := range strings.Split(v.UserIds, ",") {
|
|
intVal, _ := strconv.Atoi(v)
|
|
userIdList = append(userIdList, intVal)
|
|
}
|
|
|
|
}
|
|
if v.WriteDbId != 0 {
|
|
dbIdList = append(dbIdList, v.WriteDbId)
|
|
}
|
|
if v.ReadDbId != 0 {
|
|
dbIdList = append(dbIdList, v.ReadDbId)
|
|
}
|
|
if v.ReportChannelId != 0 {
|
|
channelList = append(channelList, v.ReportChannelId)
|
|
}
|
|
}
|
|
|
|
dbMap, err := repository.InAndToMap[crondbmodel.CronDb, int](crondbmodel.GetInstance().GetDb(), "db_id", dbIdList)
|
|
userMap, err := repository.InAndToMap[cronusermodel.CronUser, int](cronusermodel.GetInstance().GetDb(), "user_id", userIdList)
|
|
channelMap, err := repository.InAndToMap[cronreportchannelmodel.CronReportChannel, int](cronreportchannelmodel.GetInstance().GetDb(), "report_channel_id", channelList)
|
|
cmdList = make([]backend.CmdListResponse, len(list))
|
|
linq.From(list).SelectT(func(in croncmdmodel.CronCmd) (d backend.CmdListResponse) {
|
|
d.ResponseFromDb(in)
|
|
if in.WriteDbId != 0 {
|
|
if _, exist := dbMap[in.WriteDbId]; exist {
|
|
d.WriteDb = dbMap[in.WriteDbId]
|
|
d.WriteDb.Source = ""
|
|
} else {
|
|
d.WriteDb = nil
|
|
}
|
|
|
|
}
|
|
if in.ReadDbId != 0 {
|
|
if _, exist := dbMap[in.ReadDbId]; exist {
|
|
d.ReadDb = dbMap[in.ReadDbId]
|
|
d.ReadDb.Source = ""
|
|
} else {
|
|
d.ReadDb = nil
|
|
}
|
|
|
|
}
|
|
if in.ReportChannelId != 0 {
|
|
if _, exist := channelMap[in.ReportChannelId]; exist {
|
|
d.ReportChannel = channelMap[in.ReportChannelId]
|
|
} else {
|
|
d.ReportChannel = nil
|
|
}
|
|
|
|
}
|
|
if len(in.UserIds) != 0 {
|
|
for _, v := range strings.Split(in.UserIds, ",") {
|
|
intVal, _ := strconv.Atoi(v)
|
|
d.Users = append(d.Users, userMap[intVal])
|
|
}
|
|
}
|
|
return d
|
|
}).ToSlice(&cmdList)
|
|
return
|
|
}
|