refactor(server): 优化数据库连接池设置及任务恢复并发控制

- 新增 applyDefaultPool 函数统一设置数据库连接池默认参数
- 在主函数中各数据库连接初始化后调用 applyDefaultPool 进行默认配置
- recoverRunningJobs 中使用信号量限制任务恢复最大并发数为3
- 使用 WaitGroup 等待所有恢复任务启动完成后再继续执行
- 删除无用注释,提升代码可读性和并发性能
This commit is contained in:
zhouyonggao 2025-12-19 18:28:32 +08:00
parent b2901a9113
commit 56e298d79b
1 changed files with 32 additions and 9 deletions

View File

@ -10,6 +10,7 @@ import (
"server/internal/db" "server/internal/db"
"server/internal/logging" "server/internal/logging"
"server/internal/repo" "server/internal/repo"
"sync"
"time" "time"
) )
@ -35,13 +36,15 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// apply pool settings from env for YMT // apply default pool settings, can be overridden by env
applyDefaultPool(ymt)
db.ApplyPoolFromEnv(ymt, "YMT_DB_") db.ApplyPoolFromEnv(ymt, "YMT_DB_")
marketing, err := db.ConnectMySQL(cfg.MarketingDB.DSN()) marketing, err := db.ConnectMySQL(cfg.MarketingDB.DSN())
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// apply pool settings from env for Marketing // apply default pool settings, can be overridden by env
applyDefaultPool(marketing)
db.ApplyPoolFromEnv(marketing, "MARKETING_DB_") db.ApplyPoolFromEnv(marketing, "MARKETING_DB_")
// Marketing Authorization DB for creators // Marketing Authorization DB for creators
var marketingAuth *sql.DB var marketingAuth *sql.DB
@ -50,6 +53,7 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
applyDefaultPool(marketingAuth)
db.ApplyPoolFromEnv(marketingAuth, "MARKETING_AUTH_DB_") db.ApplyPoolFromEnv(marketingAuth, "MARKETING_AUTH_DB_")
log.Println("connecting Marketing Authorization MySQL:", cfg.MarketingAuthorizationDB.Host+":"+cfg.MarketingAuthorizationDB.Port, "db", cfg.MarketingAuthorizationDB.Name) log.Println("connecting Marketing Authorization MySQL:", cfg.MarketingAuthorizationDB.Host+":"+cfg.MarketingAuthorizationDB.Port, "db", cfg.MarketingAuthorizationDB.Name)
} else { } else {
@ -60,6 +64,7 @@ func main() {
log.Fatal(err) log.Fatal(err)
} }
// apply pool settings from env for Meta (templates/jobs) // apply pool settings from env for Meta (templates/jobs)
applyDefaultPool(meta)
db.ApplyPoolFromEnv(meta, "YMT_TEST_DB_") db.ApplyPoolFromEnv(meta, "YMT_TEST_DB_")
// Marketing Reseller DB // Marketing Reseller DB
var resellerDB *sql.DB var resellerDB *sql.DB
@ -91,7 +96,7 @@ func main() {
log.Fatal(srv.ListenAndServe()) log.Fatal(srv.ListenAndServe())
} }
// recoverRunningJobs 恢复服务中断前未完成的导出任务 // recoverRunningJobs 恢复服务中断前未完成的导出任务最多并发3个
func recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt *sql.DB) { func recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt *sql.DB) {
rrepo := repo.NewExportRepo() rrepo := repo.NewExportRepo()
@ -112,16 +117,34 @@ func recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt *sql.DB)
// 创建一个 ExportsAPI 实例用于恢复任务 // 创建一个 ExportsAPI 实例用于恢复任务
exportsAPI := &api.ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt} exportsAPI := &api.ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt}
// 使用信号量限制并发数最多3个
semaphore := make(chan struct{}, 3)
var wg sync.WaitGroup
// 启动每个任务的恢复 // 启动每个任务的恢复
for _, job := range jobs { for _, job := range jobs {
jobID := job.ID wg.Add(1)
log.Printf("[任务恢复] 启动任务恢复 - ID: %d\n", jobID) go func(jobID uint64) {
// 通过 goroutine 并行启动任务恢复 defer wg.Done()
go exportsAPI.RunJobByID(jobID) semaphore <- struct{}{} // 获取信号量
defer func() { <-semaphore }() // 释放信号量
log.Printf("[任务恢复] 启动任务恢复 - ID: %d\n", jobID)
exportsAPI.RunJobByID(jobID)
}(job.ID)
} }
// 给予一点时间让 goroutine 启动 // 等待所有任务启动完成
time.Sleep(1 * time.Second) wg.Wait()
log.Println("[任务恢复] 所有任务已启动")
} }
// buildDSN deprecated; use cfg.YMTDB.DSN()/cfg.MarketingDB.DSN() // buildDSN deprecated; use cfg.YMTDB.DSN()/cfg.MarketingDB.DSN()
// applyDefaultPool 应用连接池默认值
func applyDefaultPool(db *sql.DB) {
db.SetMaxOpenConns(25) // 最多25个开放连接
db.SetMaxIdleConns(5) // 保持5个空闲连接
db.SetConnMaxLifetime(time.Hour) // 连接最长生存时间1小时
db.SetConnMaxIdleTime(10 * time.Minute) // 连接空闲10分钟后关闭
}