ai_scheduler/internal/server/cron.go

99 lines
2.2 KiB
Go

package server
import (
"ai_scheduler/internal/services"
"context"
"github.com/gofiber/fiber/v2/log"
"github.com/robfig/cron/v3"
)
type CronServer struct {
Cron *cron.Cron
jobs []*cronJob
log log.AllLogger
cronService *services.CronService
ctx context.Context
}
type cronJob struct {
EntryId int32
Func func(context.Context) error
Name string
Schedule string
}
func NewCronServer(
log log.AllLogger,
cronService *services.CronService,
) *CronServer {
return &CronServer{
Cron: cron.New(),
log: log,
cronService: cronService,
ctx: context.Background(),
}
}
func (c *CronServer) InitJobs(ctx context.Context) {
// 创建一个可用于所有定时任务的上下文(可以取消的上下文)
c.ctx = ctx
c.jobs = []*cronJob{
{
Func: c.cronService.CronReportSendDingTalk,
Name: "直连天下报表推送",
Schedule: "20 12,18,23 * * *",
},
{
Func: c.cronService.CronReportSendQywx,
Name: "直连天下报表推送",
Schedule: "20 12,18,23 * * *",
},
}
}
func (c *CronServer) Run(ctx context.Context) {
// 先初始化任务
if c.jobs == nil {
c.InitJobs(ctx)
}
for i, job := range c.jobs {
// 复制变量到闭包内,避免闭包变量捕获问题
job := job
jobID := i + 1
_, err := c.Cron.AddFunc(job.Schedule, func() {
c.log.Infof("任务[%d]:%s开始执行", jobID, job.Name)
defer func() {
if r := recover(); r != nil {
c.log.Errorf("任务[%d]:%s执行时发生panic: %v", jobID, job.Name, r)
}
c.log.Infof("任务[%d]:%s执行结束", jobID, job.Name)
}()
//c.log.Infof("任务[%d]:%s执ddd", jobID, job.Name)
//为每次执行创建新的上下文
ctx := context.Background()
err := job.Func(ctx)
if err != nil {
c.log.Errorf("任务[%d]:%s执行失败: %s", jobID, job.Name, err.Error())
}
})
if err != nil {
c.log.Errorf("添加任务失败:%s", err.Error())
}
}
// 启动cron调度器
c.Cron.Start()
c.log.Info("Cron调度器已启动")
}
// Stop 停止cron调度器
func (c *CronServer) Stop() {
if c.Cron != nil {
c.Cron.Stop()
c.log.Info("Cron调度器已停止")
}
}