ai_scheduler/internal/server/cron.go

139 lines
3.0 KiB
Go

package server
import (
"ai_scheduler/internal/services"
"context"
"fmt"
"github.com/gofiber/fiber/v2/log"
"github.com/robfig/cron/v3"
)
type CronServer struct {
Cron *cron.Cron
jobs []*cronJob
log log.AllLogger
cronService *services.CronService
ctx context.Context
}
type cronJob struct {
EntryId int32
Func func(context.Context) error
Name string
Key string
Schedule string
}
func NewCronServer(
log log.AllLogger,
cronService *services.CronService,
) *CronServer {
return &CronServer{
Cron: cron.New(),
log: log,
cronService: cronService,
ctx: context.Background(),
}
}
func (c *CronServer) InitJobs(ctx context.Context) {
// 创建一个可用于所有定时任务的上下文(可以取消的上下文)
c.ctx = ctx
c.jobs = []*cronJob{
{
Func: c.cronService.CronReportSendDingTalk,
Name: "直连天下报表推送(钉钉)",
Key: "ding_report_dingtalk",
Schedule: "20 12,18,23 * * *",
},
{
Func: c.cronService.CronReportSendQywx,
Name: "直连天下报表推送(微信)",
Key: "ding_report_qywx",
Schedule: "20 12,18,23 * * *",
},
}
}
func (c *CronServer) Run(ctx context.Context) {
// 先初始化任务
if c.jobs == nil {
c.InitJobs(ctx)
}
for i, job := range c.jobs {
// 复制变量到闭包内,避免闭包变量捕获问题
job := job
jobID := i + 1
_, err := c.Cron.AddFunc(job.Schedule, func() {
c.log.Infof("任务[%d]:%s开始执行", jobID, job.Name)
defer func() {
if r := recover(); r != nil {
c.log.Errorf("任务[%d]:%s执行时发生panic: %v", jobID, job.Name, r)
}
c.log.Infof("任务[%d]:%s执行结束", jobID, job.Name)
}()
//c.log.Infof("任务[%d]:%s执ddd", jobID, job.Name)
//为每次执行创建新的上下文
ctx := context.Background()
err := job.Func(ctx)
if err != nil {
c.log.Errorf("任务[%d]:%s执行失败: %s", jobID, job.Name, err.Error())
}
})
if err != nil {
c.log.Errorf("添加任务失败:%s", err.Error())
}
}
// 启动cron调度器
c.Cron.Start()
c.log.Info("Cron调度器已启动")
}
// Stop 停止cron调度器
func (c *CronServer) Stop() {
if c.Cron != nil {
c.Cron.Stop()
c.log.Info("Cron调度器已停止")
}
}
func (c *CronServer) RunOnce(ctx context.Context, key string) error {
if c.jobs == nil {
c.InitJobs(ctx)
}
// 获取key对应的任务
var job *cronJob
for _, j := range c.jobs {
if j.Key == key {
job = j
break
}
}
if job == nil {
return fmt.Errorf("unknown job key: %s\n", key)
}
defer func() {
if r := recover(); r != nil {
fmt.Printf("任务[once]:%s执行时发生panic: %v\n", job.Name, r)
}
fmt.Printf("任务[once]:%s执行结束\n", job.Name)
}()
fmt.Printf("任务[once]:%s开始执行\n", job.Name)
err := job.Func(ctx)
if err != nil {
return fmt.Errorf("任务[once]:%s执行失败: %s\n", job.Name, err.Error())
}
fmt.Printf("任务[once]:%s执行成功\n", job.Name)
return nil
}