Compare commits

...

2 Commits

Author SHA1 Message Date
renzhiyuan cdbba6b3c0 Merge remote-tracking branch 'origin/main' into main
# Conflicts:
#	app/services/db_service/db_service.go
2024-11-29 14:57:43 +08:00
renzhiyuan 96ad17b6c1 任务模块 2024-11-29 14:56:55 +08:00
12 changed files with 323 additions and 55 deletions

View File

@ -10,3 +10,10 @@ const (
STATUS_DISABLE = 2
STATUS_ENABLE = 1
)
const (
CMD_STATUS_WAIT = 1
CMD_STATUS_RUN = 2
CMD_STATUS_STOP = 3
CMD_STATUS_FAIL = 4
)

View File

@ -4,9 +4,7 @@ import (
"cron_admin/app/constants/errorcode"
"cron_admin/app/http/controllers"
"cron_admin/app/http/entities/backend"
"cron_admin/app/models/croncmdmodel"
cmd_services "cron_admin/app/services/cmd_service"
"github.com/ahmetb/go-linq/v3"
"github.com/gin-gonic/gin"
)
@ -16,33 +14,37 @@ func CmdList(c *gin.Context) {
if err != nil {
controllers.HandRes(c, nil, errorcode.NotFound)
} else {
var (
cmdList = make([]backend.CmdListResponse, 0)
//userIdList []int
)
linq.From(list).SelectT(func(in croncmdmodel.CronCmd) (d backend.CmdListResponse) {
d.ResponseFromDb(in)
return d
}).ToSlice(&cmdList)
cmdList, err := cmd_services.ListToRep(list)
controllers.HandRes(c, gin.H{"data": cmdList, "count": count}, err)
}
}
func CmdAdd(c *gin.Context) {
request := controllers.GetRequest(c).(*backend.CmdAddRequest)
err := cmd_services.UserAdd(request)
err := cmd_services.Add(request)
controllers.HandRes(c, nil, err)
}
func CmdEdit(c *gin.Context) {
request := controllers.GetRequest(c).(*backend.CmdEditRequest)
err := cmd_services.UserEdit(request)
err := cmd_services.Edit(request)
controllers.HandRes(c, nil, err)
}
func CmdDel(c *gin.Context) {
request := controllers.GetRequest(c).(*backend.CmdDeleteRequest)
err := cmd_services.UserDel(request)
err := cmd_services.Del(request)
controllers.HandRes(c, nil, err)
}
func CmdStop(c *gin.Context) {
request := controllers.GetRequest(c).(*backend.CmdStopRequest)
err := cmd_services.Stop(request)
controllers.HandRes(c, nil, err)
}
func CmdStart(c *gin.Context) {
request := controllers.GetRequest(c).(*backend.CmdStartRequest)
err := cmd_services.Start(request)
controllers.HandRes(c, nil, err)
}

View File

@ -2,6 +2,9 @@ package backend
import (
"cron_admin/app/models/croncmdmodel"
"cron_admin/app/models/crondbmodel"
"cron_admin/app/models/cronreportchannelmodel"
"cron_admin/app/models/cronusermodel"
"time"
)
@ -19,28 +22,28 @@ type CmdInfoRequest struct {
}
type CmdListResponse struct {
CmdId int `json:"cmd_id"`
CmdName string `json:"cmd_name"`
UserIds string `json:"user_ids"`
EntryId int `json:"entry_id"`
ReadDbId int `json:"read_db_id"`
WriteDbId int `json:"write_db_id"`
ExecuteType int `json:"execute_type"`
ExecuteRead string `json:"execute_read"`
ExecuteWrite string `json:"execute_write"`
Cron string `json:"cron"`
MatchJson string `json:"match_json"`
SendTimeType int `json:"send_time_type"`
SendLimit int `json:"send_limit"`
ReportChannelId int `json:"report_channel_id"`
CreateTime string `json:"create_time"`
UpdateTime string `json:"update_time"`
Status int `json:"status"`
FailReason string `json:"fail_reason"`
Users []*UserListResponse `json:"users"`
ReadDb DbListResponse `json:"read_db"`
WriteDb DbListResponse `json:"write_db"`
ReportChannel ReportChannelListResponse `json:"report_channel"`
CmdId int `json:"cmd_id"`
CmdName string `json:"cmd_name"`
UserIds string `json:"user_ids"`
EntryId int `json:"entry_id"`
ReadDbId int `json:"read_db_id"`
WriteDbId int `json:"write_db_id"`
ExecuteType int `json:"execute_type"`
ExecuteRead string `json:"execute_read"`
ExecuteWrite string `json:"execute_write"`
Cron string `json:"cron"`
MatchJson string `json:"match_json"`
SendTimeType int `json:"send_time_type"`
SendLimit int `json:"send_limit"`
ReportChannelId int `json:"report_channel_id"`
CreateTime string `json:"create_time"`
UpdateTime string `json:"update_time"`
Status int `json:"status"`
FailReason string `json:"fail_reason"`
Users []*cronusermodel.CronUser `json:"users"`
ReadDb *crondbmodel.CronDb `json:"read_db"`
WriteDb *crondbmodel.CronDb `json:"write_db"`
ReportChannel *cronreportchannelmodel.CronReportChannel `json:"report_channel"`
}
type CmdAddRequest struct {
@ -75,7 +78,15 @@ type CmdEditRequest struct {
}
type CmdDeleteRequest struct {
CmdId int `json:"user_id" validate:"required" form:"user_id" example:""`
CmdId int `json:"cmd_id" validate:"required" form:"cmd_id" example:""`
}
type CmdStartRequest struct {
CmdId int `json:"cmd_id" validate:"required" form:"cmd_id" example:""`
}
type CmdStopRequest struct {
CmdId int `json:"cmd_id" validate:"required" form:"cmd_id" example:""`
}
func (response *CmdListResponse) ResponseFromDb(l croncmdmodel.CronCmd) {

View File

@ -17,6 +17,9 @@ var BackendRequestMap = map[string]func() interface{}{
common.ADMIN_OAUTH_V1 + "/user/edit": func() interface{} {
return new(backend.UserEditRequest)
},
common.ADMIN_OAUTH_V1 + "/user/del": func() interface{} {
return new(backend.UserDeleteRequest)
},
common.ADMIN_OAUTH_V1 + "/sql/list": func() interface{} { return new(backend.DbListRequest) },
common.ADMIN_OAUTH_V1 + "/sql/add": func() interface{} { return new(backend.DbAddRequest) },
@ -33,4 +36,12 @@ var BackendRequestMap = map[string]func() interface{}{
// 日志
common.ADMIN_OAUTH_V1 + "/log/cmd/list": func() interface{} { return new(backend.CronFuncLogsListRequest) },
common.ADMIN_OAUTH_V1 + "/log/mes/list": func() interface{} { return new(backend.CronReportLogsListRequest) },
//任务
common.ADMIN_OAUTH_V1 + "/cmd/list": func() interface{} { return new(backend.CmdListRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/add": func() interface{} { return new(backend.CmdAddRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/edit": func() interface{} { return new(backend.CmdEditRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/del": func() interface{} { return new(backend.CmdDeleteRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/start": func() interface{} { return new(backend.CmdStartRequest) },
common.ADMIN_OAUTH_V1 + "/cmd/stop": func() interface{} { return new(backend.CmdStopRequest) },
}

View File

@ -49,10 +49,12 @@ func RegisterAdminRoute(router *gin.Engine) {
//任务
cmd := v1.Group("/cmd")
{
cmd.GET("/query", backend.Empty)
cmd.GET("/info", backend.Empty)
cmd.PUT("/update", backend.Empty)
cmd.DELETE("/delete", backend.Empty)
cmd.POST("/list", backend.CmdList)
cmd.POST("/add", backend.CmdAdd)
cmd.POST("/edit", backend.CmdEdit)
cmd.DELETE("/del", backend.CmdDel)
cmd.POST("/start", backend.CmdStart)
cmd.POST("/stop", backend.CmdStop)
}
//消息管理
mes := v1.Group("/channel")

View File

@ -1,11 +1,15 @@
package models
import (
"cron_admin/app/models/croncmdmodel"
"cron_admin/app/models/crondbmodel"
"cron_admin/app/models/cronreportchannelmodel"
"cron_admin/app/models/cronusermodel"
)
type PO interface {
cronreportchannelmodel.CronReportChannel |
cronusermodel.CronUser
cronusermodel.CronUser |
crondbmodel.CronDb |
croncmdmodel.CronCmd
}

View File

@ -13,7 +13,7 @@ var (
// 实体
type CronCmd struct {
CmdId int `xorm:"'cmd_id' UNSIGNED INT"`
CmdId int `xorm:"'cmd_id' UNSIGNED INT pk autoincr""`
CmdName string `xorm:"'cmd_name' varchar(20)"`
UserIds string `xorm:"'user_ids' varchar(50)"`
EntryId int `xorm:"'entry_id' int(10)"`
@ -27,11 +27,11 @@ type CronCmd struct {
SendTimeType int `xorm:"'send_time_type' TINYINT"`
SendLimit int `xorm:"'send_limit' SMALLINT"`
ReportChannelId int `xorm:"'report_channel_id' int(11)"`
CreateTime *time.Time `xorm:"'create_time' datetime"`
UpdateTime time.Time `xorm:"'update_time' timestamp"`
CreateTime *time.Time `xorm:"'create_time' created datetime"`
UpdateTime time.Time `xorm:"'update_time' updated timestamp"`
DeletedTime time.Time `xorm:"'deleted_time' deleted datetime"`
Status int `xorm:"'status' TINYINT"`
FailReason string `xorm:"'fail_reason' varchar(200)"`
DeletedTime time.Time `xorm:"'deleted_time' datetime"`
}
// 表名

View File

@ -3,10 +3,12 @@ package repository
import (
"cron_admin/app/http/entities"
"cron_admin/app/models"
"cron_admin/app/utils"
"cron_admin/app/utils/mapstructure"
"github.com/pkg/errors"
"github.com/qit-team/snow-core/db"
"time"
"xorm.io/builder"
"xorm.io/xorm"
)
@ -14,6 +16,10 @@ type CommonRepo[P models.PO] struct {
repo *xorm.Session
}
type ExtraCommonRepo[P models.PO, T any] struct {
repo *xorm.Session
}
type ICommonRepo[P models.PO] interface {
GetSession() *xorm.Session
FindAll(list *[]P, opts ...DBOption) error
@ -88,9 +94,50 @@ func (this *CommonRepo[P]) InsertBatch(db *[]P, opts ...DBOption) (int64, error)
func getDb(repo *xorm.Session, opts ...DBOption) *xorm.Session {
if repo == nil {
repo = db.GetDb().NewSession()
}
for _, opt := range opts {
repo = opt(repo)
}
return repo
}
func InAndToMap[P models.PO, T string | int](engin *xorm.EngineGroup, key string, values []T) (map[T]*P, error) {
var (
results []*P
resultSlice []map[string]interface{}
)
cond := builder.NewCond()
cond = cond.And(builder.Eq{key: values})
err := engin.Where(cond).Find(&results)
if err != nil {
return nil, err
}
for _, v := range results {
var resultsm map[string]interface{}
mapstructure.Decode(v, &resultsm)
resultSlice = append(resultSlice, resultsm)
}
resultMap := make(map[T]*P, len(resultSlice))
for _, v := range resultSlice {
keyC := utils.SnakeToCamel(key)
if _, exist := v[keyC]; !exist {
resultMap = nil
break
}
if v[keyC] == nil || v[keyC] == "" {
continue
}
var vToP P
mapstructure.Decode(v, &vToP)
keyValue := v[keyC].(T)
resultMap[keyValue] = &vToP
}
return resultMap, nil
}

15
app/repository/cron_db.go Normal file
View File

@ -0,0 +1,15 @@
package repository
import (
"cron_admin/app/models/crondbmodel"
)
type Db struct {
ICommonRepo[crondbmodel.CronDb]
}
func NewDbRepo() *Db {
return &Db{
ICommonRepo: NewCommonRepo[crondbmodel.CronDb](),
}
}

View File

@ -4,8 +4,15 @@ import (
"cron_admin/app/constants/common"
"cron_admin/app/http/entities/backend"
"cron_admin/app/models/croncmdmodel"
"cron_admin/app/models/crondbmodel"
"cron_admin/app/models/cronreportchannelmodel"
"cron_admin/app/models/cronusermodel"
"cron_admin/app/repository"
"cron_admin/app/utils/mapstructure"
"fmt"
"github.com/ahmetb/go-linq/v3"
"strconv"
"strings"
"xorm.io/builder"
)
@ -19,7 +26,7 @@ func GetListByWhere(request *backend.CmdListRequest, page int, limit int) (count
cond = cond.And(builder.Eq{"status": request.Status})
}
if request.ExecuteType != 0 {
cond = cond.And(builder.Eq{"status": request.Status})
cond = cond.And(builder.Eq{"execute_type": request.ExecuteType})
}
if len(request.CmdIds) > 0 {
// 使用IN查询
@ -29,7 +36,7 @@ func GetListByWhere(request *backend.CmdListRequest, page int, limit int) (count
session := croncmdmodel.GetInstance().GetDb().Where(cond)
if page != 0 && limit != 0 {
session = session.Limit(page, (page-1)*limit)
session = session.Limit(limit, (page-1)*limit)
}
count, err = session.FindAndCount(&cmdListInfo)
@ -39,22 +46,128 @@ func GetListByWhere(request *backend.CmdListRequest, page int, limit int) (count
return
}
func UserAdd(request *backend.CmdAddRequest) (err error) {
func Add(request *backend.CmdAddRequest) (err error) {
var db croncmdmodel.CronCmd
_ = mapstructure.Decode(request, &db)
db.Status = common.STATUS_ENABLE
db.Status = common.CMD_STATUS_STOP
_, err = croncmdmodel.GetInstance().GetDb().InsertOne(db)
return
}
func UserEdit(request *backend.CmdEditRequest) (err error) {
func Edit(request *backend.CmdEditRequest) (err error) {
var db croncmdmodel.CronCmd
_ = mapstructure.Decode(request, &db)
_, err = croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Update(&db)
return
}
func UserDel(request *backend.CmdDeleteRequest) (err error) {
func Del(request *backend.CmdDeleteRequest) (err error) {
_, err = croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Delete(&croncmdmodel.CronCmd{})
return
}
func Start(request *backend.CmdStartRequest) (err error) {
var data croncmdmodel.CronCmd
exists, err := croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Get(&data)
if !exists {
return fmt.Errorf("数据不存在")
}
if err != nil {
return
}
if data.EntryId != 0 {
return fmt.Errorf("该任务还没有完全关闭,请稍后再试或联系管理员")
}
if data.Status == common.CMD_STATUS_WAIT {
return fmt.Errorf("任务已在启动中")
}
if data.Status == common.CMD_STATUS_RUN {
return fmt.Errorf("任务已启动,无法再次进行此操作")
}
data.Status = common.CMD_STATUS_WAIT
_, err = croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Update(&data)
return
}
func Stop(request *backend.CmdStopRequest) (err error) {
var data croncmdmodel.CronCmd
exists, err := croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Get(&data)
if !exists {
return fmt.Errorf("数据不存在")
}
if err != nil {
return
}
data.Status = common.CMD_STATUS_STOP
_, err = croncmdmodel.GetInstance().GetDb().ID(request.CmdId).Update(&data)
return
}
func ListToRep(list []croncmdmodel.CronCmd) (cmdList []backend.CmdListResponse, err error) {
var (
userIdList []int
dbIdList []int
channelList []int
)
for _, v := range list {
if len(v.UserIds) != 0 {
for _, v := range strings.Split(v.UserIds, ",") {
intVal, _ := strconv.Atoi(v)
userIdList = append(userIdList, intVal)
}
}
if v.WriteDbId != 0 {
dbIdList = append(dbIdList, v.WriteDbId)
}
if v.ReadDbId != 0 {
dbIdList = append(dbIdList, v.ReadDbId)
}
if v.ReportChannelId != 0 {
channelList = append(channelList, v.ReportChannelId)
}
}
dbMap, err := repository.InAndToMap[crondbmodel.CronDb, int](crondbmodel.GetInstance().GetDb(), "db_id", dbIdList)
userMap, err := repository.InAndToMap[cronusermodel.CronUser, int](cronusermodel.GetInstance().GetDb(), "user_id", userIdList)
channelMap, err := repository.InAndToMap[cronreportchannelmodel.CronReportChannel, int](cronreportchannelmodel.GetInstance().GetDb(), "channel_id", channelList)
cmdList = make([]backend.CmdListResponse, len(list))
linq.From(list).SelectT(func(in croncmdmodel.CronCmd) (d backend.CmdListResponse) {
d.ResponseFromDb(in)
if in.WriteDbId != 0 {
if _, exist := dbMap[in.WriteDbId]; exist {
d.WriteDb = dbMap[in.WriteDbId]
d.WriteDb.Source = ""
} else {
d.WriteDb = nil
}
}
if in.ReadDbId != 0 {
if _, exist := dbMap[in.ReadDbId]; exist {
d.ReadDb = dbMap[in.ReadDbId]
d.ReadDb.Source = ""
} else {
d.ReadDb = nil
}
}
if in.ReportChannelId != 0 {
if _, exist := channelMap[in.ReportChannelId]; exist {
d.ReportChannel = channelMap[in.ReportChannelId]
} else {
d.ReportChannel = nil
}
}
if len(in.UserIds) != 0 {
for _, v := range strings.Split(in.UserIds, ",") {
intVal, _ := strconv.Atoi(v)
d.Users = append(d.Users, userMap[intVal])
}
}
return d
}).ToSlice(&cmdList)
return
}

View File

@ -22,7 +22,7 @@ func GetListByWhere(request *backend.UserListRequest, page int, limit int) (coun
// 使用IN查询
cond = cond.And(builder.In("user_id", request.UserIds))
}
//model := repository.NewCommonRepo[userinfomodel.CronUserModel](userinfomodel.GetInstance().GetDb().NewSession())
session := cronusermodel.GetInstance().GetDb().Where(cond)
if page != 0 && limit != 0 {

View File

@ -410,3 +410,59 @@ func ParseToken(tokenString string) (*jwt.Token, *Claims, error) {
})
return token, Claims, err
}
// toMap 将结构体转换为map[string]interface{}
// StructToMap 将一个struct转换为map[string]interface{}
func StructToMap(obj interface{}) map[string]interface{} {
// 获取obj的类型
val := reflect.ValueOf(obj)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
// 确保obj是一个struct
if val.Kind() != reflect.Struct {
return nil
}
// 创建一个map来保存结果
data := make(map[string]interface{})
// 遍历struct的字段
for i := 0; i < val.NumField(); i++ {
// 获取字段的类型和值
valueField := val.Field(i)
typeField := val.Type().Field(i)
jsonTag := typeField.Tag.Get("json")
if idx := strings.Index(jsonTag, ","); idx != -1 {
// 如果有逗号,则取逗号之前的部分
jsonTag = jsonTag[:idx]
}
// 忽略未导出的字段(字段名首字母小写)
if !typeField.IsExported() {
continue
}
// 将字段名和值添加到map中
data[jsonTag] = valueField.Interface()
}
return data
}
// SnakeToCamel 将下划线命名法转换为驼峰命名法
func SnakeToCamel(s string) string {
// 用下划线分割字符串
parts := strings.Split(s, "_")
var camelCase string
for _, part := range parts {
// 将每个部分的首字母转换为大写,如果它不是第一个部分
part = strings.Title(strings.ToLower(part))
camelCase += part
}
return camelCase
}