diff --git a/server/internal/exporter/stream.go b/server/internal/exporter/stream.go index 32d598c..8565d54 100644 --- a/server/internal/exporter/stream.go +++ b/server/internal/exporter/stream.go @@ -7,6 +7,7 @@ import ( "server/internal/schema" "server/internal/utils" "strings" + "sync" "time" ) @@ -228,15 +229,32 @@ func CountRowsFastChunked(db *sql.DB, ds, main string, filters map[string]interf }) } + // 并行查询所有分片,最多10个并发 var total int64 + var mu sync.Mutex + var wg sync.WaitGroup + semaphore := make(chan struct{}, 10) // 控制最多10个并发查询 + for _, rg := range ranges { - fl := map[string]interface{}{} - for k, v := range filters { - fl[k] = v - } - fl["create_time_between"] = []string{rg[0], rg[1]} - total += CountRowsFast(db, ds, main, fl) + wg.Add(1) + go func(rangeVal [2]string) { + defer wg.Done() + semaphore <- struct{}{} // 获取信号量 + defer func() { <-semaphore }() // 释放信号量 + + fl := map[string]interface{}{} + for k, v := range filters { + fl[k] = v + } + fl["create_time_between"] = []string{rangeVal[0], rangeVal[1]} + count := CountRowsFast(db, ds, main, fl) + + mu.Lock() + total += count + mu.Unlock() + }(rg) } + wg.Wait() return total }