@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"math"
 	"slices"
 	"strconv"
 	"strings"
@@ -41,12 +42,12 @@ import (
 	handle_metrics "github.com/pingcap/tidb/statistics/handle/metrics"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
-	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/mathutil"
 	"github.com/pingcap/tidb/util/sqlexec"
 	"github.com/pingcap/tidb/util/syncutil"
+	"github.com/tiancaiamao/gp"
 	"github.com/tikv/client-go/v2/oracle"
 	atomic2 "go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -62,6 +63,9 @@ const (
 
 // Handle can update stats info periodically.
 type Handle struct {
+	// this gpool is used to reuse goroutine in the mergeGlobalStatsTopN.
+	gpool *gp.Pool
+
 	pool sessionPool
 
 	// initStatsCtx is the ctx only used for initStats
@@ -483,6 +487,7 @@ type sessionPool interface {
483
487
func NewHandle (ctx , initStatsCtx sessionctx.Context , lease time.Duration , pool sessionPool , tracker sessionctx.SysProcTracker , autoAnalyzeProcIDGetter func () uint64 ) (* Handle , error ) {
484
488
cfg := config .GetGlobalConfig ()
485
489
handle := & Handle {
490
+ gpool : gp .New (math .MaxInt16 , time .Minute ),
486
491
ddlEventCh : make (chan * ddlUtil.Event , 1000 ),
487
492
listHead : & SessionStatsCollector {mapper : make (tableDeltaMap ), rateMap : make (errorRateDeltaMap )},
488
493
idxUsageListHead : & SessionIndexUsageCollector {mapper : make (indexUsageMap )},
@@ -857,7 +862,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
857
862
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
858
863
var popedTopN []statistics.TopNMeta
859
864
wrapper := statistics .NewStatsWrapper (allHg [i ], allTopN [i ])
860
- globalStats .TopN [i ], popedTopN , allHg [i ], err = mergeGlobalStatsTopN (sc , wrapper , sc .GetSessionVars ().StmtCtx .TimeZone , sc .GetSessionVars ().AnalyzeVersion , uint32 (opts [ast .AnalyzeOptNumTopN ]), isIndex == 1 )
865
+ globalStats .TopN [i ], popedTopN , allHg [i ], err = mergeGlobalStatsTopN (h . gpool , sc , wrapper , sc .GetSessionVars ().StmtCtx .TimeZone , sc .GetSessionVars ().AnalyzeVersion , uint32 (opts [ast .AnalyzeOptNumTopN ]), isIndex == 1 )
861
866
if err != nil {
862
867
return
863
868
}
@@ -889,7 +894,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
 	return
 }
 
-func mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
+func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *statistics.StatsWrapper,
 	timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
 	[]statistics.TopNMeta, []*statistics.Histogram, error) {
 	mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
@@ -904,14 +909,14 @@ func mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrappe
904
909
} else if batchSize > MaxPartitionMergeBatchSize {
905
910
batchSize = MaxPartitionMergeBatchSize
906
911
}
907
- return MergeGlobalStatsTopNByConcurrency (mergeConcurrency , batchSize , wrapper , timeZone , version , n , isIndex , killed )
912
+ return MergeGlobalStatsTopNByConcurrency (gp , mergeConcurrency , batchSize , wrapper , timeZone , version , n , isIndex , killed )
908
913
}
909
914
910
915
// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency
911
916
// To merge global stats topn by concurrency, we will separate the partition topn in concurrency part and deal it with different worker.
912
917
// mergeConcurrency is used to control the total concurrency of the running worker, and mergeBatchSize is sued to control
913
918
// the partition size for each worker to solve it
914
- func MergeGlobalStatsTopNByConcurrency (mergeConcurrency , mergeBatchSize int , wrapper * statistics.StatsWrapper ,
919
+ func MergeGlobalStatsTopNByConcurrency (gp * gp. Pool , mergeConcurrency , mergeBatchSize int , wrapper * statistics.StatsWrapper ,
915
920
timeZone * time.Location , version int , n uint32 , isIndex bool , killed * uint32 ) (* statistics.TopN ,
916
921
[]statistics.TopNMeta , []* statistics.Histogram , error ) {
917
922
if len (wrapper .AllTopN ) < mergeConcurrency {
@@ -927,13 +932,15 @@ func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wra
927
932
tasks = append (tasks , task )
928
933
start = end
929
934
}
930
- var wg util. WaitGroupWrapper
935
+ var wg sync. WaitGroup
931
936
taskNum := len (tasks )
932
937
taskCh := make (chan * statistics.TopnStatsMergeTask , taskNum )
933
938
respCh := make (chan * statistics.TopnStatsMergeResponse , taskNum )
934
939
for i := 0 ; i < mergeConcurrency ; i ++ {
935
940
worker := statistics .NewTopnStatsMergeWorker (taskCh , respCh , wrapper , killed )
936
- wg .Run (func () {
941
+ wg .Add (1 )
942
+ gp .Go (func () {
943
+ defer wg .Done ()
937
944
worker .Run (timeZone , isIndex , n , version )
938
945
})
939
946
}
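For readers unfamiliar with the tiancaiamao/gp pool, the hunk above swaps util.WaitGroupWrapper's spawn-a-goroutine-per-worker approach for goroutines borrowed from the shared pool, with a plain sync.WaitGroup tracking completion. Below is a minimal, self-contained sketch of that fan-out pattern. It assumes only the gp calls that appear in the diff (New, Go, Close) and keeps the import path as written there; the task type and worker body are placeholders, not the real statistics merge worker.

package main

import (
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/tiancaiamao/gp" // import path as written in the diff
)

func main() {
	// Effectively unbounded worker count; idle goroutines are recycled
	// after one minute, mirroring gp.New(math.MaxInt16, time.Minute).
	pool := gp.New(math.MaxInt16, time.Minute)
	defer pool.Close()

	const mergeConcurrency = 4 // stand-in for AnalyzePartitionMergeConcurrency
	taskCh := make(chan int, 16)
	respCh := make(chan int, 16)

	var wg sync.WaitGroup
	for i := 0; i < mergeConcurrency; i++ {
		wg.Add(1)
		// Borrow a goroutine from the pool instead of `go func() {...}()`.
		pool.Go(func() {
			defer wg.Done()
			for t := range taskCh {
				respCh <- t * t // stand-in for worker.Run(...)
			}
		})
	}

	// Dispatch the batched tasks, then signal that no more are coming.
	for i := 0; i < 8; i++ {
		taskCh <- i
	}
	close(taskCh)

	// Wait for every worker, then drain the responses.
	wg.Wait()
	close(respCh)
	for r := range respCh {
		fmt.Println(r)
	}
}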
@@ -2322,4 +2329,5 @@ func (h *Handle) SetStatsCacheCapacity(c int64) {
 // Close stops the background
 func (h *Handle) Close() {
 	h.statsCache.Load().Close()
+	h.gpool.Close()
 }
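Taken together with the NewHandle hunk, this Close change ties the pool's lifetime to the Handle: created once in the constructor, released on shutdown. A hedged sketch of that ownership pattern follows, with a hypothetical Service type standing in for Handle; only the gp.New/Go/Close calls mirror the diff.

package main

import (
	"math"
	"time"

	"github.com/tiancaiamao/gp" // import path as written in the diff
)

// Service owns a goroutine pool for the whole of its lifetime,
// the way Handle owns gpool in the change above.
type Service struct {
	gpool *gp.Pool
}

// NewService creates the pool once, up front.
func NewService() *Service {
	return &Service{
		gpool: gp.New(math.MaxInt16, time.Minute),
	}
}

// Close releases the pool's goroutines when the service shuts down.
func (s *Service) Close() {
	s.gpool.Close()
}

func main() {
	s := NewService()
	defer s.Close()

	// Run a unit of work on a pooled goroutine and wait for it.
	done := make(chan struct{})
	s.gpool.Go(func() { close(done) })
	<-done
}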