Skip to content

Commit d74298c

Browse files
authored
statistics: correctly handle error when merging global stats (#47770)
close #47771
1 parent 90bd2dd commit d74298c

File tree

3 files changed

+112
-29
lines changed

3 files changed

+112
-29
lines changed

pkg/statistics/handle/globalstats/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ go_test(
4040
"topn_bench_test.go",
4141
],
4242
flaky = True,
43-
shard_count = 14,
43+
shard_count = 18,
4444
deps = [
4545
":globalstats",
4646
"//pkg/config",

pkg/statistics/handle/globalstats/global_stats_async.go

Lines changed: 75 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
3232
"github.com/pingcap/tidb/pkg/statistics/handle/util"
3333
"github.com/pingcap/tidb/pkg/types"
34+
"github.com/pingcap/tidb/pkg/util/logutil"
35+
"go.uber.org/zap"
3436
"golang.org/x/sync/errgroup"
3537
)
3638

@@ -79,8 +81,11 @@ type AsyncMergePartitionStats2GlobalStats struct {
7981
PartitionDefinition map[int64]model.PartitionDefinition
8082
tableInfo map[int64]*model.TableInfo
8183
// key is partition id and histID
82-
skipPartition map[skipItem]struct{}
83-
exitWhenErrChan chan struct{}
84+
skipPartition map[skipItem]struct{}
85+
// If ioWorker meets an error, it closes this channel to notify cpuWorker.
86+
ioWorkerExitWhenErrChan chan struct{}
87+
// When cpuWorker exits, it closes this channel to notify ioWorker.
88+
cpuWorkerExitChan chan struct{}
8489
globalTableInfo *model.TableInfo
8590
histIDs []int64
8691
globalStatsNDV []int64
@@ -97,20 +102,21 @@ func NewAsyncMergePartitionStats2GlobalStats(
97102
is infoschema.InfoSchema) (*AsyncMergePartitionStats2GlobalStats, error) {
98103
partitionNum := len(globalTableInfo.Partition.Definitions)
99104
return &AsyncMergePartitionStats2GlobalStats{
100-
statsHandle: statsHandle,
101-
cmsketch: make(chan mergeItem[*statistics.CMSketch], 5),
102-
fmsketch: make(chan mergeItem[*statistics.FMSketch], 5),
103-
histogramAndTopn: make(chan mergeItem[*StatsWrapper]),
104-
PartitionDefinition: make(map[int64]model.PartitionDefinition),
105-
tableInfo: make(map[int64]*model.TableInfo),
106-
partitionIDs: make([]int64, 0, partitionNum),
107-
exitWhenErrChan: make(chan struct{}),
108-
skipPartition: make(map[skipItem]struct{}),
109-
allPartitionStats: make(map[int64]*statistics.Table),
110-
globalTableInfo: globalTableInfo,
111-
histIDs: histIDs,
112-
is: is,
113-
partitionNum: partitionNum,
105+
statsHandle: statsHandle,
106+
cmsketch: make(chan mergeItem[*statistics.CMSketch], 5),
107+
fmsketch: make(chan mergeItem[*statistics.FMSketch], 5),
108+
histogramAndTopn: make(chan mergeItem[*StatsWrapper]),
109+
PartitionDefinition: make(map[int64]model.PartitionDefinition),
110+
tableInfo: make(map[int64]*model.TableInfo),
111+
partitionIDs: make([]int64, 0, partitionNum),
112+
ioWorkerExitWhenErrChan: make(chan struct{}),
113+
cpuWorkerExitChan: make(chan struct{}),
114+
skipPartition: make(map[skipItem]struct{}),
115+
allPartitionStats: make(map[int64]*statistics.Table),
116+
globalTableInfo: globalTableInfo,
117+
histIDs: histIDs,
118+
is: is,
119+
partitionNum: partitionNum,
114120
}, nil
115121
}
116122

@@ -218,25 +224,32 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealErrPartitionColumnStatsMissin
218224
func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context, isIndex bool) (err error) {
219225
defer func() {
220226
if r := recover(); r != nil {
221-
close(a.exitWhenErrChan)
227+
logutil.BgLogger().Warn("ioWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats"))
228+
close(a.ioWorkerExitWhenErrChan)
222229
err = errors.New(fmt.Sprint(r))
223230
}
224231
}()
225232
err = a.loadFmsketch(sctx, isIndex)
226233
if err != nil {
227-
close(a.exitWhenErrChan)
234+
close(a.ioWorkerExitWhenErrChan)
228235
return err
229236
}
230237
close(a.fmsketch)
231238
err = a.loadCMsketch(sctx, isIndex)
232239
if err != nil {
233-
close(a.exitWhenErrChan)
240+
close(a.ioWorkerExitWhenErrChan)
234241
return err
235242
}
236243
close(a.cmsketch)
244+
failpoint.Inject("PanicSameTime", func(val failpoint.Value) {
245+
if val, _ := val.(bool); val {
246+
time.Sleep(1 * time.Second)
247+
panic("test for PanicSameTime")
248+
}
249+
})
237250
err = a.loadHistogramAndTopN(sctx, a.globalTableInfo, isIndex)
238251
if err != nil {
239-
close(a.exitWhenErrChan)
252+
close(a.ioWorkerExitWhenErrChan)
240253
return err
241254
}
242255
close(a.histogramAndTopn)
@@ -246,13 +259,14 @@ func (a *AsyncMergePartitionStats2GlobalStats) ioWorker(sctx sessionctx.Context,
246259
func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) {
247260
defer func() {
248261
if r := recover(); r != nil {
249-
close(a.exitWhenErrChan)
262+
logutil.BgLogger().Warn("cpuWorker panic", zap.Stack("stack"), zap.Any("error", r), zap.String("category", "stats"))
250263
err = errors.New(fmt.Sprint(r))
251264
}
265+
close(a.cpuWorkerExitChan)
252266
}()
253267
a.dealFMSketch()
254268
select {
255-
case <-a.exitWhenErrChan:
269+
case <-a.ioWorkerExitWhenErrChan:
256270
return nil
257271
default:
258272
for i := 0; i < a.globalStats.Num; i++ {
@@ -267,10 +281,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.Statem
267281
}
268282
err = a.dealCMSketch()
269283
if err != nil {
284+
logutil.BgLogger().Warn("dealCMSketch failed", zap.Error(err), zap.String("category", "stats"))
270285
return err
271286
}
287+
failpoint.Inject("PanicSameTime", func(val failpoint.Value) {
288+
if val, _ := val.(bool); val {
289+
time.Sleep(1 * time.Second)
290+
panic("test for PanicSameTime")
291+
}
292+
})
272293
err = a.dealHistogramAndTopN(stmtCtx, sctx, opts, isIndex, tz, analyzeVersion)
273294
if err != nil {
295+
logutil.BgLogger().Warn("dealHistogramAndTopN failed", zap.Error(err), zap.String("category", "stats"))
274296
return err
275297
}
276298
return nil
@@ -337,7 +359,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadFmsketch(sctx sessionctx.Cont
337359
case a.fmsketch <- mergeItem[*statistics.FMSketch]{
338360
fmsketch, i,
339361
}:
340-
case <-a.exitWhenErrChan:
362+
case <-a.cpuWorkerExitChan:
363+
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
341364
return nil
342365
}
343366
}
@@ -367,7 +390,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont
367390
case a.cmsketch <- mergeItem[*statistics.CMSketch]{
368391
cmsketch, i,
369392
}:
370-
case <-a.exitWhenErrChan:
393+
case <-a.cpuWorkerExitChan:
394+
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
371395
return nil
372396
}
373397
}
@@ -376,6 +400,12 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadCMsketch(sctx sessionctx.Cont
376400
}
377401

378402
func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx sessionctx.Context, tableInfo *model.TableInfo, isIndex bool) error {
403+
failpoint.Inject("ErrorSameTime", func(val failpoint.Value) {
404+
if val, _ := val.(bool); val {
405+
time.Sleep(1 * time.Second)
406+
failpoint.Return(errors.New("ErrorSameTime returned error"))
407+
}
408+
})
379409
for i := 0; i < a.globalStats.Num; i++ {
380410
hists := make([]*statistics.Histogram, 0, a.partitionNum)
381411
topn := make([]*statistics.TopN, 0, a.partitionNum)
@@ -402,7 +432,8 @@ func (a *AsyncMergePartitionStats2GlobalStats) loadHistogramAndTopN(sctx session
402432
case a.histogramAndTopn <- mergeItem[*StatsWrapper]{
403433
NewStatsWrapper(hists, topn), i,
404434
}:
405-
case <-a.exitWhenErrChan:
435+
case <-a.cpuWorkerExitChan:
436+
logutil.BgLogger().Warn("ioWorker detects CPUWorker has exited", zap.String("category", "stats"))
406437
return nil
407438
}
408439
}
@@ -422,13 +453,18 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealFMSketch() {
422453
} else {
423454
a.globalStats.Fms[fms.idx].MergeFMSketch(fms.item)
424455
}
425-
case <-a.exitWhenErrChan:
456+
case <-a.ioWorkerExitWhenErrChan:
426457
return
427458
}
428459
}
429460
}
430461

431462
func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error {
463+
failpoint.Inject("dealCMSketchErr", func(val failpoint.Value) {
464+
if val, _ := val.(bool); val {
465+
failpoint.Return(errors.New("dealCMSketch returned error"))
466+
}
467+
})
432468
for {
433469
select {
434470
case cms, ok := <-a.cmsketch:
@@ -443,13 +479,24 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealCMSketch() error {
443479
return err
444480
}
445481
}
446-
case <-a.exitWhenErrChan:
482+
case <-a.ioWorkerExitWhenErrChan:
447483
return nil
448484
}
449485
}
450486
}
451487

452488
func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stmtctx.StatementContext, sctx sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, isIndex bool, tz *time.Location, analyzeVersion int) (err error) {
489+
failpoint.Inject("dealHistogramAndTopNErr", func(val failpoint.Value) {
490+
if val, _ := val.(bool); val {
491+
failpoint.Return(errors.New("dealHistogramAndTopNErr returned error"))
492+
}
493+
})
494+
failpoint.Inject("ErrorSameTime", func(val failpoint.Value) {
495+
if val, _ := val.(bool); val {
496+
time.Sleep(1 * time.Second)
497+
failpoint.Return(errors.New("ErrorSameTime returned error"))
498+
}
499+
})
453500
for {
454501
select {
455502
case item, ok := <-a.histogramAndTopn:
@@ -478,7 +525,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stm
478525
a.globalStats.Hg[item.idx].Buckets[j].NDV = 0
479526
}
480527
a.globalStats.Hg[item.idx].NDV = a.globalStatsNDV[item.idx]
481-
case <-a.exitWhenErrChan:
528+
case <-a.ioWorkerExitWhenErrChan:
482529
return nil
483530
}
484531
}

pkg/statistics/handle/globalstats/globalstats_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,24 @@ func TestGlobalStatsPanicInIOWorker(t *testing.T) {
7676
simpleTest(t)
7777
}
7878

79+
func TestGlobalStatsWithCMSketchErr(t *testing.T) {
80+
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealCMSketchErr"
81+
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
82+
defer func() {
83+
require.NoError(t, failpoint.Disable(fpName))
84+
}()
85+
simpleTest(t)
86+
}
87+
88+
func TestGlobalStatsWithHistogramAndTopNErr(t *testing.T) {
89+
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/dealHistogramAndTopNErr"
90+
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
91+
defer func() {
92+
require.NoError(t, failpoint.Disable(fpName))
93+
}()
94+
simpleTest(t)
95+
}
96+
7997
func TestGlobalStatsPanicInCPUWorker(t *testing.T) {
8098
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicInCPUWorker"
8199
require.NoError(t, failpoint.Enable(fpName, "panic(\"inject panic\")"))
@@ -85,6 +103,24 @@ func TestGlobalStatsPanicInCPUWorker(t *testing.T) {
85103
simpleTest(t)
86104
}
87105

106+
func TestGlobalStatsPanicSametime(t *testing.T) {
107+
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/PanicSameTime"
108+
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
109+
defer func() {
110+
require.NoError(t, failpoint.Disable(fpName))
111+
}()
112+
simpleTest(t)
113+
}
114+
115+
func TestGlobalStatsErrorSametime(t *testing.T) {
116+
fpName := "github.com/pingcap/tidb/pkg/statistics/handle/globalstats/ErrorSameTime"
117+
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
118+
defer func() {
119+
require.NoError(t, failpoint.Disable(fpName))
120+
}()
121+
simpleTest(t)
122+
}
123+
88124
func TestBuildGlobalLevelStats(t *testing.T) {
89125
store := testkit.CreateMockStore(t)
90126
testKit := testkit.NewTestKit(t, store)

0 commit comments

Comments (0)