From b2901a91131436f999af2b6bf71e069e46466461 Mon Sep 17 00:00:00 2001 From: zhouyonggao <1971162852@qq.com> Date: Fri, 19 Dec 2025 18:21:28 +0800 Subject: [PATCH] =?UTF-8?q?perf(exporter):=20=E4=BC=98=E5=8C=96=E5=88=86?= =?UTF-8?q?=E7=89=87=E6=95=B0=E6=8D=AE=E7=BB=9F=E8=AE=A1=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 引入sync包实现并发控制 - 使用信号量限制最大并发数为10 - 针对每个时间范围启动goroutine并行执行查询 - 利用互斥锁保护total计数的安全更新 - 等待所有并发查询完成后返回总计数结果 --- server/internal/exporter/stream.go | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/server/internal/exporter/stream.go b/server/internal/exporter/stream.go index 32d598c..8565d54 100644 --- a/server/internal/exporter/stream.go +++ b/server/internal/exporter/stream.go @@ -7,6 +7,7 @@ import ( "server/internal/schema" "server/internal/utils" "strings" + "sync" "time" ) @@ -228,15 +229,32 @@ func CountRowsFastChunked(db *sql.DB, ds, main string, filters map[string]interf }) } + // 并行查询所有分片,最多10个并发 var total int64 + var mu sync.Mutex + var wg sync.WaitGroup + semaphore := make(chan struct{}, 10) // 控制最多10个并发查询 + for _, rg := range ranges { - fl := map[string]interface{}{} - for k, v := range filters { - fl[k] = v - } - fl["create_time_between"] = []string{rg[0], rg[1]} - total += CountRowsFast(db, ds, main, fl) + wg.Add(1) + go func(rangeVal [2]string) { + defer wg.Done() + semaphore <- struct{}{} // 获取信号量 + defer func() { <-semaphore }() // 释放信号量 + + fl := map[string]interface{}{} + for k, v := range filters { + fl[k] = v + } + fl["create_time_between"] = []string{rangeVal[0], rangeVal[1]} + count := CountRowsFast(db, ds, main, fl) + + mu.Lock() + total += count + mu.Unlock() + }(rg) } + wg.Wait() return total }