Skip to content

Commit da592dc

Browse files
authored
executor: setup mem tracker for CTE correctly (#54208) (#54300)
close #54181
1 parent 5c20394 commit da592dc

File tree

3 files changed

+84
-6
lines changed

3 files changed

+84
-6
lines changed

pkg/executor/cte.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
110110
e.producer.resTbl.Lock()
111111
defer e.producer.resTbl.Unlock()
112112
if !e.producer.resTbl.Done() {
113-
if err = e.producer.produce(ctx, e); err != nil {
113+
if err = e.producer.produce(ctx); err != nil {
114114
return err
115115
}
116116
}
@@ -338,15 +338,15 @@ func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error {
338338
return nil
339339
}
340340

341-
func (p *cteProducer) produce(ctx context.Context, cteExec *CTEExec) (err error) {
341+
func (p *cteProducer) produce(ctx context.Context) (err error) {
342342
if p.resTbl.Error() != nil {
343343
return p.resTbl.Error()
344344
}
345-
resAction := setupCTEStorageTracker(p.resTbl, cteExec.Ctx(), p.memTracker, p.diskTracker)
346-
iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.Ctx(), p.memTracker, p.diskTracker)
345+
resAction := setupCTEStorageTracker(p.resTbl, p.ctx, p.memTracker, p.diskTracker)
346+
iterInAction := setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker)
347347
var iterOutAction *chunk.SpillDiskAction
348348
if p.iterOutTbl != nil {
349-
iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.Ctx(), p.memTracker, p.diskTracker)
349+
iterOutAction = setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker)
350350
}
351351

352352
failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
@@ -429,12 +429,29 @@ func (p *cteProducer) computeRecursivePart(ctx context.Context) (err error) {
429429
return
430430
}
431431

432+
var iterNum uint64
432433
for {
433434
chk := exec.TryNewCacheChunk(p.recursiveExec)
434435
if err = exec.Next(ctx, p.recursiveExec, chk); err != nil {
435436
return
436437
}
437438
if chk.NumRows() == 0 {
439+
if iterNum%1000 == 0 {
440+
// To avoid too many logs.
441+
p.logTbls(ctx, err, iterNum)
442+
}
443+
iterNum++
444+
failpoint.Inject("assertIterTableSpillToDisk", func(maxIter failpoint.Value) {
445+
if iterNum > 0 && iterNum < uint64(maxIter.(int)) && err == nil {
446+
if p.iterInTbl.GetMemBytes() != 0 || p.iterInTbl.GetDiskBytes() == 0 ||
447+
p.iterOutTbl.GetMemBytes() != 0 || p.iterOutTbl.GetDiskBytes() == 0 ||
448+
p.resTbl.GetMemBytes() != 0 || p.resTbl.GetDiskBytes() == 0 {
449+
p.logTbls(ctx, err, iterNum)
450+
panic("assert row container spill disk failed")
451+
}
452+
}
453+
})
454+
438455
if err = p.setupTblsForNewIteration(); err != nil {
439456
return
440457
}
@@ -493,6 +510,8 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) {
493510
if err = p.iterInTbl.Reopen(); err != nil {
494511
return err
495512
}
513+
setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker)
514+
496515
if p.isDistinct {
497516
// Already deduplicated by resTbl, adding directly is ok.
498517
for _, chk := range chks {
@@ -507,7 +526,11 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) {
507526
}
508527

509528
// Clear data in iterOutTbl.
510-
return p.iterOutTbl.Reopen()
529+
if err = p.iterOutTbl.Reopen(); err != nil {
530+
return err
531+
}
532+
setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker)
533+
return nil
511534
}
512535

513536
func (p *cteProducer) reset() {
@@ -535,6 +558,8 @@ func (p *cteProducer) reopenTbls() (err error) {
535558
if p.isDistinct {
536559
p.hashTbl = join.NewConcurrentMapHashTable()
537560
}
561+
// Normally we need to setup tracker after calling Reopen(),
562+
// But reopen resTbl means we need to call produce() again, it will setup tracker.
538563
if err := p.resTbl.Reopen(); err != nil {
539564
return err
540565
}
@@ -736,3 +761,11 @@ func (p *cteProducer) checkAndUpdateCorColHashCode() bool {
736761
}
737762
return changed
738763
}
764+
765+
func (p *cteProducer) logTbls(ctx context.Context, err error, iterNum uint64) {
766+
logutil.Logger(ctx).Debug("cte iteration info",
767+
zap.Any("iterInTbl mem usage", p.iterInTbl.GetMemBytes()), zap.Any("iterInTbl disk usage", p.iterInTbl.GetDiskBytes()),
768+
zap.Any("iterOutTbl mem usage", p.iterOutTbl.GetMemBytes()), zap.Any("iterOutTbl disk usage", p.iterOutTbl.GetDiskBytes()),
769+
zap.Any("resTbl mem usage", p.resTbl.GetMemBytes()), zap.Any("resTbl disk usage", p.resTbl.GetDiskBytes()),
770+
zap.Any("resTbl rows", p.resTbl.NumRows()), zap.Any("iteration num", iterNum), zap.Error(err))
771+
}

pkg/executor/cte_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,35 @@ func TestCTEShareCorColumn(t *testing.T) {
194194
tk.MustExec("insert into t1 values(1), (2);")
195195
tk.MustQuery("SELECT * FROM t1 dt WHERE EXISTS( WITH RECURSIVE qn AS (SELECT a AS b UNION ALL SELECT b+1 FROM qn WHERE b=0 or b = 1) SELECT * FROM qn dtqn1 where exists (select /*+ NO_DECORRELATE() */ b from qn where dtqn1.b+1));").Check(testkit.Rows("1", "2"))
196196
}
197+
198+
func TestCTEIterationMemTracker(t *testing.T) {
199+
store := testkit.CreateMockStore(t)
200+
tk := testkit.NewTestKit(t, store)
201+
202+
insertStr := "insert into t1 values(0)"
203+
rowNum := 1000
204+
vals := make([]int, rowNum)
205+
vals[0] = 0
206+
for i := 1; i < rowNum; i++ {
207+
v := rand.Intn(100)
208+
vals[i] = v
209+
insertStr += fmt.Sprintf(", (%d)", v)
210+
}
211+
tk.MustExec("use test;")
212+
tk.MustExec("drop table if exists t1;")
213+
tk.MustExec("create table t1(c1 int);")
214+
tk.MustExec(insertStr)
215+
216+
tk.MustExec("set @@cte_max_recursion_depth=1000000")
217+
tk.MustExec("set global tidb_mem_oom_action = 'log';")
218+
defer func() {
219+
tk.MustExec("set global tidb_mem_oom_action = default;")
220+
}()
221+
tk.MustExec("set @@tidb_mem_quota_query=10;")
222+
maxIter := 5000
223+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk", fmt.Sprintf("return(%d)", maxIter)))
224+
defer func() {
225+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk"))
226+
}()
227+
tk.MustQuery(fmt.Sprintf("explain analyze with recursive cte1 as (select c1 from t1 union all select c1 + 1 c1 from cte1 where c1 < %d) select * from cte1", maxIter))
228+
}

pkg/util/cteutil/storage.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ type Storage interface {
8888
GetMemTracker() *memory.Tracker
8989
GetDiskTracker() *disk.Tracker
9090
ActionSpill() *chunk.SpillDiskAction
91+
92+
GetMemBytes() int64
93+
GetDiskBytes() int64
9194
}
9295

9396
// StorageRC implements Storage interface using RowContainer.
@@ -268,3 +271,13 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction {
268271
func (s *StorageRC) valid() bool {
269272
return s.refCnt > 0 && s.rc != nil
270273
}
274+
275+
// GetMemBytes returns memory bytes used by row container.
276+
func (s *StorageRC) GetMemBytes() int64 {
277+
return s.rc.GetMemTracker().BytesConsumed()
278+
}
279+
280+
// GetDiskBytes returns disk bytes used by row container.
281+
func (s *StorageRC) GetDiskBytes() int64 {
282+
return s.rc.GetDiskTracker().BytesConsumed()
283+
}

0 commit comments

Comments
 (0)