Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions pkg/executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,8 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
}
statsConcurrncy, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
}
idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1)
<<<<<<< HEAD
// subIndexWorkerWg is better to be initialized in handleNDVForSpecialIndexes, however if we do so, golang would
// report unexpected/unreasonable data race error on subIndexWorkerWg when running TestAnalyzeVirtualCol test
// case with `-race` flag now.
Expand All @@ -105,6 +101,10 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
defer wg.Wait()

count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
=======
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, samplingStatsConcurrency)
count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(gp, ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
>>>>>>> 1c05c7fe383 (planner: avoid exceeding the configured concurrency limit (#61786))
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
Expand Down Expand Up @@ -432,7 +432,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
}

// handleNDVForSpecialIndexes deals with the logic to analyze the index containing the virtual column when the mode is full sampling.
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, statsConcurrncy int) {
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, samplingStatsConcurrency int) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze ndv for special index panicked", zap.Any("recover", r), zap.Stack("stack"))
Expand All @@ -448,12 +448,17 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
AddNewAnalyzeJob(e.ctx, task.job)
}
resultsCh := make(chan *statistics.AnalyzeResults, len(tasks))
if len(tasks) < statsConcurrncy {
statsConcurrncy = len(tasks)
if len(tasks) < samplingStatsConcurrency {
samplingStatsConcurrency = len(tasks)
}
var subIndexWorkerWg = NewAnalyzeResultsNotifyWaitGroupWrapper(resultsCh)
<<<<<<< HEAD
subIndexWorkerWg.Add(statsConcurrncy)
for i := 0; i < statsConcurrncy; i++ {
=======
subIndexWorkerWg.Add(samplingStatsConcurrency)
for range samplingStatsConcurrency {
>>>>>>> 1c05c7fe383 (planner: avoid exceeding the configured concurrency limit (#61786))
subIndexWorkerWg.Run(func() { e.subIndexWorkerForNDV(taskCh, resultsCh) })
}
for _, task := range tasks {
Expand All @@ -465,7 +470,12 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
results: make(map[int64]*statistics.AnalyzeResults, len(indexInfos)),
}
var err error
<<<<<<< HEAD
for panicCnt < statsConcurrncy {
=======
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
for panicCnt < samplingStatsConcurrency {
>>>>>>> 1c05c7fe383 (planner: avoid exceeding the configured concurrency limit (#61786))
results, ok := <-resultsCh
if !ok {
break
Expand Down