Skip to content

Commit ad18e98

Browse files
guo-shaogeti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#54208
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 4a6f3e5 commit ad18e98

File tree

3 files changed

+85
-6
lines changed

3 files changed

+85
-6
lines changed

pkg/executor/cte.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
108108
e.producer.resTbl.Lock()
109109
defer e.producer.resTbl.Unlock()
110110
if !e.producer.resTbl.Done() {
111-
if err = e.producer.produce(ctx, e); err != nil {
111+
if err = e.producer.produce(ctx); err != nil {
112112
return err
113113
}
114114
}
@@ -335,15 +335,15 @@ func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error {
335335
return nil
336336
}
337337

338-
func (p *cteProducer) produce(ctx context.Context, cteExec *CTEExec) (err error) {
338+
func (p *cteProducer) produce(ctx context.Context) (err error) {
339339
if p.resTbl.Error() != nil {
340340
return p.resTbl.Error()
341341
}
342-
resAction := setupCTEStorageTracker(p.resTbl, cteExec.Ctx(), p.memTracker, p.diskTracker)
343-
iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.Ctx(), p.memTracker, p.diskTracker)
342+
resAction := setupCTEStorageTracker(p.resTbl, p.ctx, p.memTracker, p.diskTracker)
343+
iterInAction := setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker)
344344
var iterOutAction *chunk.SpillDiskAction
345345
if p.iterOutTbl != nil {
346-
iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.Ctx(), p.memTracker, p.diskTracker)
346+
iterOutAction = setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker)
347347
}
348348

349349
failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
@@ -426,12 +426,29 @@ func (p *cteProducer) computeRecursivePart(ctx context.Context) (err error) {
426426
return
427427
}
428428

429+
var iterNum uint64
429430
for {
430431
chk := exec.TryNewCacheChunk(p.recursiveExec)
431432
if err = exec.Next(ctx, p.recursiveExec, chk); err != nil {
432433
return
433434
}
434435
if chk.NumRows() == 0 {
436+
if iterNum%1000 == 0 {
437+
// To avoid too many logs.
438+
p.logTbls(ctx, err, iterNum)
439+
}
440+
iterNum++
441+
failpoint.Inject("assertIterTableSpillToDisk", func(maxIter failpoint.Value) {
442+
if iterNum > 0 && iterNum < uint64(maxIter.(int)) && err == nil {
443+
if p.iterInTbl.GetMemBytes() != 0 || p.iterInTbl.GetDiskBytes() == 0 ||
444+
p.iterOutTbl.GetMemBytes() != 0 || p.iterOutTbl.GetDiskBytes() == 0 ||
445+
p.resTbl.GetMemBytes() != 0 || p.resTbl.GetDiskBytes() == 0 {
446+
p.logTbls(ctx, err, iterNum)
447+
panic("assert row container spill disk failed")
448+
}
449+
}
450+
})
451+
435452
if err = p.setupTblsForNewIteration(); err != nil {
436453
return
437454
}
@@ -490,6 +507,8 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) {
490507
if err = p.iterInTbl.Reopen(); err != nil {
491508
return err
492509
}
510+
setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker)
511+
493512
if p.isDistinct {
494513
// Already deduplicated by resTbl, adding directly is ok.
495514
for _, chk := range chks {
@@ -504,7 +523,11 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) {
504523
}
505524

506525
// Clear data in iterOutTbl.
507-
return p.iterOutTbl.Reopen()
526+
if err = p.iterOutTbl.Reopen(); err != nil {
527+
return err
528+
}
529+
setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker)
530+
return nil
508531
}
509532

510533
func (p *cteProducer) reset() {
@@ -532,6 +555,8 @@ func (p *cteProducer) reopenTbls() (err error) {
532555
if p.isDistinct {
533556
p.hashTbl = newConcurrentMapHashTable()
534557
}
558+
// Normally we need to setup tracker after calling Reopen(),
559+
// But reopen resTbl means we need to call produce() again, it will setup tracker.
535560
if err := p.resTbl.Reopen(); err != nil {
536561
return err
537562
}
@@ -736,3 +761,11 @@ func (p *cteProducer) checkAndUpdateCorColHashCode() bool {
736761
}
737762
return changed
738763
}
764+
765+
func (p *cteProducer) logTbls(ctx context.Context, err error, iterNum uint64) {
766+
logutil.Logger(ctx).Debug("cte iteration info",
767+
zap.Any("iterInTbl mem usage", p.iterInTbl.GetMemBytes()), zap.Any("iterInTbl disk usage", p.iterInTbl.GetDiskBytes()),
768+
zap.Any("iterOutTbl mem usage", p.iterOutTbl.GetMemBytes()), zap.Any("iterOutTbl disk usage", p.iterOutTbl.GetDiskBytes()),
769+
zap.Any("resTbl mem usage", p.resTbl.GetMemBytes()), zap.Any("resTbl disk usage", p.resTbl.GetDiskBytes()),
770+
zap.Any("resTbl rows", p.resTbl.NumRows()), zap.Any("iteration num", iterNum), zap.Error(err))
771+
}

pkg/executor/cte_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,7 @@ func TestCTEShareCorColumn(t *testing.T) {
537537
tk.MustQuery("SELECT * FROM t1 dt WHERE EXISTS( WITH RECURSIVE qn AS (SELECT a AS b UNION ALL SELECT b+1 FROM qn WHERE b=0 or b = 1) SELECT * FROM qn dtqn1 where exists (select /*+ NO_DECORRELATE() */ b from qn where dtqn1.b+1));").Check(testkit.Rows("1", "2"))
538538
}
539539

540+
<<<<<<< HEAD
540541
func TestCTESmallChunkSize(t *testing.T) {
541542
store := testkit.CreateMockStore(t)
542543
tk := testkit.NewTestKit(t, store)
@@ -571,4 +572,36 @@ func TestIssue46522(t *testing.T) {
571572
tk1.MustExec("commit;")
572573

573574
tk.MustExec("commit;")
575+
=======
576+
func TestCTEIterationMemTracker(t *testing.T) {
577+
store := testkit.CreateMockStore(t)
578+
tk := testkit.NewTestKit(t, store)
579+
580+
insertStr := "insert into t1 values(0)"
581+
rowNum := 1000
582+
vals := make([]int, rowNum)
583+
vals[0] = 0
584+
for i := 1; i < rowNum; i++ {
585+
v := rand.Intn(100)
586+
vals[i] = v
587+
insertStr += fmt.Sprintf(", (%d)", v)
588+
}
589+
tk.MustExec("use test;")
590+
tk.MustExec("drop table if exists t1;")
591+
tk.MustExec("create table t1(c1 int);")
592+
tk.MustExec(insertStr)
593+
594+
tk.MustExec("set @@cte_max_recursion_depth=1000000")
595+
tk.MustExec("set global tidb_mem_oom_action = 'log';")
596+
defer func() {
597+
tk.MustExec("set global tidb_mem_oom_action = default;")
598+
}()
599+
tk.MustExec("set @@tidb_mem_quota_query=10;")
600+
maxIter := 5000
601+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk", fmt.Sprintf("return(%d)", maxIter)))
602+
defer func() {
603+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk"))
604+
}()
605+
tk.MustQuery(fmt.Sprintf("explain analyze with recursive cte1 as (select c1 from t1 union all select c1 + 1 c1 from cte1 where c1 < %d) select * from cte1", maxIter))
606+
>>>>>>> 479f4be0920 (executor: setup mem tracker for CTE correctly (#54208))
574607
}

pkg/util/cteutil/storage.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ type Storage interface {
8888
GetMemTracker() *memory.Tracker
8989
GetDiskTracker() *disk.Tracker
9090
ActionSpill() *chunk.SpillDiskAction
91+
92+
GetMemBytes() int64
93+
GetDiskBytes() int64
9194
}
9295

9396
// StorageRC implements Storage interface using RowContainer.
@@ -268,3 +271,13 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction {
268271
func (s *StorageRC) valid() bool {
269272
return s.refCnt > 0 && s.rc != nil
270273
}
274+
275+
// GetMemBytes returns memory bytes used by row container.
276+
func (s *StorageRC) GetMemBytes() int64 {
277+
return s.rc.GetMemTracker().BytesConsumed()
278+
}
279+
280+
// GetDiskBytes returns disk bytes used by row container.
281+
func (s *StorageRC) GetDiskBytes() int64 {
282+
return s.rc.GetDiskTracker().BytesConsumed()
283+
}

0 commit comments

Comments
 (0)