diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index f941997..66e6bfb 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -10,6 +10,7 @@ import ( "server/internal/db" "server/internal/logging" "server/internal/repo" + "sync" "time" ) @@ -35,13 +36,15 @@ func main() { if err != nil { log.Fatal(err) } - // apply pool settings from env for YMT + // apply default pool settings, can be overridden by env + applyDefaultPool(ymt) db.ApplyPoolFromEnv(ymt, "YMT_DB_") marketing, err := db.ConnectMySQL(cfg.MarketingDB.DSN()) if err != nil { log.Fatal(err) } - // apply pool settings from env for Marketing + // apply default pool settings, can be overridden by env + applyDefaultPool(marketing) db.ApplyPoolFromEnv(marketing, "MARKETING_DB_") // Marketing Authorization DB for creators var marketingAuth *sql.DB @@ -50,6 +53,7 @@ func main() { if err != nil { log.Fatal(err) } + applyDefaultPool(marketingAuth) db.ApplyPoolFromEnv(marketingAuth, "MARKETING_AUTH_DB_") log.Println("connecting Marketing Authorization MySQL:", cfg.MarketingAuthorizationDB.Host+":"+cfg.MarketingAuthorizationDB.Port, "db", cfg.MarketingAuthorizationDB.Name) } else { @@ -60,6 +64,7 @@ func main() { log.Fatal(err) } // apply pool settings from env for Meta (templates/jobs) + applyDefaultPool(meta) db.ApplyPoolFromEnv(meta, "YMT_TEST_DB_") // Marketing Reseller DB var resellerDB *sql.DB @@ -91,7 +96,7 @@ func main() { log.Fatal(srv.ListenAndServe()) } -// recoverRunningJobs 恢复服务中断前未完成的导出任务 +// recoverRunningJobs 恢复服务中断前未完成的导出任务(最多并发3个) func recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt *sql.DB) { rrepo := repo.NewExportRepo() @@ -112,16 +117,34 @@ func recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt *sql.DB) // 创建一个 ExportsAPI 实例用于恢复任务 exportsAPI := &api.ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt} + // 使用信号量限制并发数(最多3个) + semaphore := make(chan struct{}, 3) + var wg sync.WaitGroup + // 启动每个任务的恢复 for _, job := range jobs { - jobID := job.ID - log.Printf("[任务恢复] 启动任务恢复 - ID: %d\n", jobID) - // 通过 goroutine 并行启动任务恢复 - go exportsAPI.RunJobByID(jobID) + wg.Add(1) + go func(jobID uint64) { + defer wg.Done() + semaphore <- struct{}{} // 获取信号量 + defer func() { <-semaphore }() // 释放信号量 + + log.Printf("[任务恢复] 启动任务恢复 - ID: %d\n", jobID) + exportsAPI.RunJobByID(jobID) + }(job.ID) } - // 给予一点时间让 goroutine 启动 - time.Sleep(1 * time.Second) + // 等待所有任务启动完成 + wg.Wait() + log.Println("[任务恢复] 所有任务已启动") } // buildDSN deprecated; use cfg.YMTDB.DSN()/cfg.MarketingDB.DSN() + +// applyDefaultPool 应用连接池默认值 +func applyDefaultPool(db *sql.DB) { + db.SetMaxOpenConns(25) // 最多25个开放连接 + db.SetMaxIdleConns(5) // 保持5个空闲连接 + db.SetConnMaxLifetime(time.Hour) // 连接最长生存时间1小时 + db.SetConnMaxIdleTime(10 * time.Minute) // 连接空闲10分钟后关闭 +}