Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions pkg/executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,8 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(gp *gp.Pool) *statistics
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
}
statsConcurrncy, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
}
idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1)
// subIndexWorkerWg is better to be initialized in handleNDVForSpecialIndexes, however if we do so, golang would
// report unexpected/unreasonable data race error on subIndexWorkerWg when running TestAnalyzeVirtualCol test
// case with `-race` flag now.
wg := util.NewWaitGroupPool(gp)
wg.Run(func() {
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, statsConcurrncy)
})
defer wg.Wait()

e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, samplingStatsConcurrency)
count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(gp, ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
Expand Down Expand Up @@ -439,7 +426,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
}

// handleNDVForSpecialIndexes deals with the logic to analyze the index containing the virtual column when the mode is full sampling.
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, statsConcurrncy int) {
func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.IndexInfo, totalResultCh chan analyzeIndexNDVTotalResult, samplingStatsConcurrency int) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze ndv for special index panicked", zap.Any("recover", r), zap.Stack("stack"))
Expand All @@ -455,12 +442,12 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
AddNewAnalyzeJob(e.ctx, task.job)
}
resultsCh := make(chan *statistics.AnalyzeResults, len(tasks))
if len(tasks) < statsConcurrncy {
statsConcurrncy = len(tasks)
if len(tasks) < samplingStatsConcurrency {
samplingStatsConcurrency = len(tasks)
}
var subIndexWorkerWg = NewAnalyzeResultsNotifyWaitGroupWrapper(resultsCh)
subIndexWorkerWg.Add(statsConcurrncy)
for i := 0; i < statsConcurrncy; i++ {
subIndexWorkerWg.Add(samplingStatsConcurrency)
for range samplingStatsConcurrency {
subIndexWorkerWg.Run(func() { e.subIndexWorkerForNDV(taskCh, resultsCh) })
}
for _, task := range tasks {
Expand All @@ -473,7 +460,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
}
var err error
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
for panicCnt < statsConcurrncy {
for panicCnt < samplingStatsConcurrency {
results, ok := <-resultsCh
if !ok {
break
Expand Down