From 3b35517fc4509d1f9f732932013ec89c20f16912 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 28 Jun 2024 12:11:53 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #54208 Signed-off-by: ti-chi-bot --- executor/cte.go | 47 +++++++- pkg/executor/cte_test.go | 228 +++++++++++++++++++++++++++++++++++++++ util/cteutil/storage.go | 13 +++ 3 files changed, 285 insertions(+), 3 deletions(-) create mode 100644 pkg/executor/cte_test.go diff --git a/executor/cte.go b/executor/cte.go index 4ba59c4fbbeb3..db0845883f5d2 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -106,7 +106,7 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { e.producer.resTbl.Lock() defer e.producer.resTbl.Unlock() if !e.producer.resTbl.Done() { - if err = e.producer.produce(ctx, e); err != nil { + if err = e.producer.produce(ctx); err != nil { return err } } @@ -331,15 +331,23 @@ func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error { return nil } -func (p *cteProducer) produce(ctx context.Context, cteExec *CTEExec) (err error) { +func (p *cteProducer) produce(ctx context.Context) (err error) { if p.resTbl.Error() != nil { return p.resTbl.Error() } +<<<<<<< HEAD:executor/cte.go resAction := setupCTEStorageTracker(p.resTbl, cteExec.ctx, p.memTracker, p.diskTracker) iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.ctx, p.memTracker, p.diskTracker) var iterOutAction *chunk.SpillDiskAction if p.iterOutTbl != nil { iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.ctx, p.memTracker, p.diskTracker) +======= + resAction := setupCTEStorageTracker(p.resTbl, p.ctx, p.memTracker, p.diskTracker) + iterInAction := setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker) + var iterOutAction *chunk.SpillDiskAction + if p.iterOutTbl != nil { + iterOutAction = setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker) +>>>>>>> 479f4be0920 (executor: setup mem tracker for CTE correctly (#54208)):pkg/executor/cte.go } failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { @@ -422,12 +430,29 @@ func (p *cteProducer) computeRecursivePart(ctx context.Context) (err error) { return } + var iterNum uint64 for { chk := newFirstChunk(p.recursiveExec) if err = Next(ctx, p.recursiveExec, chk); err != nil { return } if chk.NumRows() == 0 { + if iterNum%1000 == 0 { + // To avoid too many logs. + p.logTbls(ctx, err, iterNum) + } + iterNum++ + failpoint.Inject("assertIterTableSpillToDisk", func(maxIter failpoint.Value) { + if iterNum > 0 && iterNum < uint64(maxIter.(int)) && err == nil { + if p.iterInTbl.GetMemBytes() != 0 || p.iterInTbl.GetDiskBytes() == 0 || + p.iterOutTbl.GetMemBytes() != 0 || p.iterOutTbl.GetDiskBytes() == 0 || + p.resTbl.GetMemBytes() != 0 || p.resTbl.GetDiskBytes() == 0 { + p.logTbls(ctx, err, iterNum) + panic("assert row container spill disk failed") + } + } + }) + if err = p.setupTblsForNewIteration(); err != nil { return } @@ -486,6 +511,8 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) { if err = p.iterInTbl.Reopen(); err != nil { return err } + setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker) + if p.isDistinct { // Already deduplicated by resTbl, adding directly is ok. for _, chk := range chks { @@ -500,7 +527,11 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) { } // Clear data in iterOutTbl. - return p.iterOutTbl.Reopen() + if err = p.iterOutTbl.Reopen(); err != nil { + return err + } + setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker) + return nil } func (p *cteProducer) reset() { @@ -528,6 +559,8 @@ func (p *cteProducer) reopenTbls() (err error) { if p.isDistinct { p.hashTbl = newConcurrentMapHashTable() } + // Normally we need to setup tracker after calling Reopen(), + // But reopen resTbl means we need to call produce() again, it will setup tracker. if err := p.resTbl.Reopen(); err != nil { return err } @@ -732,3 +765,11 @@ func (p *cteProducer) checkAndUpdateCorColHashCode() bool { } return changed } + +func (p *cteProducer) logTbls(ctx context.Context, err error, iterNum uint64) { + logutil.Logger(ctx).Debug("cte iteration info", + zap.Any("iterInTbl mem usage", p.iterInTbl.GetMemBytes()), zap.Any("iterInTbl disk usage", p.iterInTbl.GetDiskBytes()), + zap.Any("iterOutTbl mem usage", p.iterOutTbl.GetMemBytes()), zap.Any("iterOutTbl disk usage", p.iterOutTbl.GetDiskBytes()), + zap.Any("resTbl mem usage", p.resTbl.GetMemBytes()), zap.Any("resTbl disk usage", p.resTbl.GetDiskBytes()), + zap.Any("resTbl rows", p.resTbl.NumRows()), zap.Any("iteration num", iterNum), zap.Error(err)) +} diff --git a/pkg/executor/cte_test.go b/pkg/executor/cte_test.go new file mode 100644 index 0000000000000..68d6c322e4f22 --- /dev/null +++ b/pkg/executor/cte_test.go @@ -0,0 +1,228 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "fmt" + "math/rand" + "slices" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/types" + "github.com/stretchr/testify/require" +) + +func TestCTEIssue49096(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test;") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/mock_cte_exec_panic_avoid_deadlock", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/mock_cte_exec_panic_avoid_deadlock")) + }() + insertStr := "insert into t1 values(0)" + rowNum := 10 + vals := make([]int, rowNum) + vals[0] = 0 + for i := 1; i < rowNum; i++ { + v := rand.Intn(100) + vals[i] = v + insertStr += fmt.Sprintf(", (%d)", v) + } + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int);") + tk.MustExec("create table t2(c1 int);") + tk.MustExec(insertStr) + // should be insert statement, otherwise it couldn't step int resetCTEStorageMap in handleNoDelay func. + sql := "insert into t2 with cte1 as ( " + + "select c1 from t1) " + + "select c1 from cte1 natural join (select * from cte1 where c1 > 0) cte2 order by c1;" + tk.MustExec(sql) // No deadlock +} + +func TestSpillToDisk(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("SET GLOBAL tidb_enable_tmp_storage_on_oom = 1") + defer tk.MustExec("SET GLOBAL tidb_enable_tmp_storage_on_oom = 0") + tk.MustExec("use test;") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/testCTEStorageSpill", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/testCTEStorageSpill")) + tk.MustExec("set tidb_mem_quota_query = 1073741824;") + }() + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/testSortedRowContainerSpill", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/sortexec/testSortedRowContainerSpill")) + }() + + // Use duplicated rows to test UNION DISTINCT. + tk.MustExec("set tidb_mem_quota_query = 1073741824;") + insertStr := "insert into t1 values(0)" + rowNum := 1000 + vals := make([]int, rowNum) + vals[0] = 0 + for i := 1; i < rowNum; i++ { + v := rand.Intn(100) + vals[i] = v + insertStr += fmt.Sprintf(", (%d)", v) + } + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int);") + tk.MustExec(insertStr) + tk.MustExec("set tidb_mem_quota_query = 40000;") + tk.MustExec("set cte_max_recursion_depth = 500000;") + sql := fmt.Sprintf("with recursive cte1 as ( "+ + "select c1 from t1 "+ + "union "+ + "select c1 + 1 c1 from cte1 where c1 < %d) "+ + "select c1 from cte1 order by c1;", rowNum) + rows := tk.MustQuery(sql) + + memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker + diskTracker := tk.Session().GetSessionVars().StmtCtx.DiskTracker + require.Greater(t, memTracker.MaxConsumed(), int64(0)) + require.Greater(t, diskTracker.MaxConsumed(), int64(0)) + + slices.Sort(vals) + resRows := make([]string, 0, rowNum) + for i := vals[0]; i <= rowNum; i++ { + resRows = append(resRows, fmt.Sprintf("%d", i)) + } + rows.Check(testkit.Rows(resRows...)) +} + +func TestCTEExecError(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists src;") + tk.MustExec("create table src(first int, second int);") + + insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000)) + for i := 0; i < 1000; i++ { + insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000)) + } + insertStr += ";" + tk.MustExec(insertStr) + + // Increase projection concurrency and decrease chunk size + // to increase the probability of reproducing the problem. + tk.MustExec("set tidb_max_chunk_size = 32") + tk.MustExec("set tidb_projection_concurrency = 20") + for i := 0; i < 10; i++ { + err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " + + "(select 1, first, second, first+second from src " + + " union all " + + "select iter+1, second, result, second+result from cte where iter < 80 )" + + "select * from cte") + require.True(t, terror.ErrorEqual(err, types.ErrOverflow)) + } +} + +func TestCTEPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t1(c1 int)") + tk.MustExec("insert into t1 values(1), (2), (3)") + + fpPathPrefix := "github.com/pingcap/tidb/pkg/executor/" + fp := "testCTESeedPanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) + err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) + + fp = "testCTERecursivePanic" + require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) + err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") + require.Contains(t, err.Error(), fp) + require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) +} + +func TestCTEDelSpillFile(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int, c2 int);") + tk.MustExec("create table t2(c1 int);") + tk.MustExec("set @@cte_max_recursion_depth = 1000000;") + tk.MustExec("set global tidb_mem_oom_action = 'log';") + tk.MustExec("set @@tidb_mem_quota_query = 100;") + tk.MustExec("insert into t2 values(1);") + tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;") + require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap) +} + +func TestCTEShareCorColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(c1 int, c2 varchar(100));") + tk.MustExec("insert into t1 values(1, '2020-10-10');") + tk.MustExec("create table t2(c1 int, c2 date);") + tk.MustExec("insert into t2 values(1, '2020-10-10');") + for i := 0; i < 100; i++ { + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias1) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias2) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) + } + + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(a int);") + tk.MustExec("insert into t1 values(1), (2);") + tk.MustQuery("SELECT * FROM t1 dt WHERE EXISTS( WITH RECURSIVE qn AS (SELECT a AS b UNION ALL SELECT b+1 FROM qn WHERE b=0 or b = 1) SELECT * FROM qn dtqn1 where exists (select /*+ NO_DECORRELATE() */ b from qn where dtqn1.b+1));").Check(testkit.Rows("1", "2")) +} + +func TestCTEIterationMemTracker(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + insertStr := "insert into t1 values(0)" + rowNum := 1000 + vals := make([]int, rowNum) + vals[0] = 0 + for i := 1; i < rowNum; i++ { + v := rand.Intn(100) + vals[i] = v + insertStr += fmt.Sprintf(", (%d)", v) + } + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int);") + tk.MustExec(insertStr) + + tk.MustExec("set @@cte_max_recursion_depth=1000000") + tk.MustExec("set global tidb_mem_oom_action = 'log';") + defer func() { + tk.MustExec("set global tidb_mem_oom_action = default;") + }() + tk.MustExec("set @@tidb_mem_quota_query=10;") + maxIter := 5000 + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk", fmt.Sprintf("return(%d)", maxIter))) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk")) + }() + tk.MustQuery(fmt.Sprintf("explain analyze with recursive cte1 as (select c1 from t1 union all select c1 + 1 c1 from cte1 where c1 < %d) select * from cte1", maxIter)) +} diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index 1e6109eeb5715..f70f5180e0092 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -89,6 +89,9 @@ type Storage interface { GetMemTracker() *memory.Tracker GetDiskTracker() *disk.Tracker ActionSpill() *chunk.SpillDiskAction + + GetMemBytes() int64 + GetDiskBytes() int64 } // StorageRC implements Storage interface using RowContainer. @@ -271,3 +274,13 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction { func (s *StorageRC) valid() bool { return s.refCnt > 0 && s.rc != nil } + +// GetMemBytes returns memory bytes used by row container. +func (s *StorageRC) GetMemBytes() int64 { + return s.rc.GetMemTracker().BytesConsumed() +} + +// GetDiskBytes returns disk bytes used by row container. +func (s *StorageRC) GetDiskBytes() int64 { + return s.rc.GetDiskTracker().BytesConsumed() +} From f7330b681586c247d75cc755dbc158f4cb569c20 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 1 Jul 2024 10:54:32 +0800 Subject: [PATCH 2/2] fix Signed-off-by: guo-shaoge --- executor/cte.go | 8 -- executor/cte_test.go | 33 ++++++ pkg/executor/cte_test.go | 228 --------------------------------------- 3 files changed, 33 insertions(+), 236 deletions(-) delete mode 100644 pkg/executor/cte_test.go diff --git a/executor/cte.go b/executor/cte.go index db0845883f5d2..aec4c58e44eda 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -335,19 +335,11 @@ func (p *cteProducer) produce(ctx context.Context) (err error) { if p.resTbl.Error() != nil { return p.resTbl.Error() } -<<<<<<< HEAD:executor/cte.go - resAction := setupCTEStorageTracker(p.resTbl, cteExec.ctx, p.memTracker, p.diskTracker) - iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.ctx, p.memTracker, p.diskTracker) - var iterOutAction *chunk.SpillDiskAction - if p.iterOutTbl != nil { - iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.ctx, p.memTracker, p.diskTracker) -======= resAction := setupCTEStorageTracker(p.resTbl, p.ctx, p.memTracker, p.diskTracker) iterInAction := setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker) var iterOutAction *chunk.SpillDiskAction if p.iterOutTbl != nil { iterOutAction = setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker) ->>>>>>> 479f4be0920 (executor: setup mem tracker for CTE correctly (#54208)):pkg/executor/cte.go } failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { diff --git a/executor/cte_test.go b/executor/cte_test.go index e80df8f1845cb..dd614adab4296 100644 --- a/executor/cte_test.go +++ b/executor/cte_test.go @@ -536,3 +536,36 @@ func TestCTESmallChunkSize(t *testing.T) { tk.MustQuery("with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 1 offset 100) select * from cte1;").Check(testkit.Rows("100")) tk.MustExec("set @@tidb_max_chunk_size = default;") } + +func TestCTEIterationMemTracker(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + + insertStr := "insert into t1 values(0)" + rowNum := 1000 + vals := make([]int, rowNum) + vals[0] = 0 + for i := 1; i < rowNum; i++ { + v := rand.Intn(100) + vals[i] = v + insertStr += fmt.Sprintf(", (%d)", v) + } + tk.MustExec("use test;") + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int);") + tk.MustExec(insertStr) + + tk.MustExec("set @@cte_max_recursion_depth=1000000") + tk.MustExec("set global tidb_mem_oom_action = 'log';") + defer func() { + tk.MustExec("set global tidb_mem_oom_action = default;") + }() + tk.MustExec("set @@tidb_mem_quota_query=10;") + maxIter := 5000 + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk", fmt.Sprintf("return(%d)", maxIter))) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk")) + }() + tk.MustQuery(fmt.Sprintf("explain analyze with recursive cte1 as (select c1 from t1 union all select c1 + 1 c1 from cte1 where c1 < %d) select * from cte1", maxIter)) +} diff --git a/pkg/executor/cte_test.go b/pkg/executor/cte_test.go deleted file mode 100644 index 68d6c322e4f22..0000000000000 --- a/pkg/executor/cte_test.go +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package executor_test - -import ( - "fmt" - "math/rand" - "slices" - "testing" - - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pingcap/tidb/pkg/testkit" - "github.com/pingcap/tidb/pkg/types" - "github.com/stretchr/testify/require" -) - -func TestCTEIssue49096(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - - tk.MustExec("use test;") - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/mock_cte_exec_panic_avoid_deadlock", "return(true)")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/mock_cte_exec_panic_avoid_deadlock")) - }() - insertStr := "insert into t1 values(0)" - rowNum := 10 - vals := make([]int, rowNum) - vals[0] = 0 - for i := 1; i < rowNum; i++ { - v := rand.Intn(100) - vals[i] = v - insertStr += fmt.Sprintf(", (%d)", v) - } - tk.MustExec("drop table if exists t1, t2;") - tk.MustExec("create table t1(c1 int);") - tk.MustExec("create table t2(c1 int);") - tk.MustExec(insertStr) - // should be insert statement, otherwise it couldn't step int resetCTEStorageMap in handleNoDelay func. - sql := "insert into t2 with cte1 as ( " + - "select c1 from t1) " + - "select c1 from cte1 natural join (select * from cte1 where c1 > 0) cte2 order by c1;" - tk.MustExec(sql) // No deadlock -} - -func TestSpillToDisk(t *testing.T) { - store := testkit.CreateMockStore(t) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("SET GLOBAL tidb_enable_tmp_storage_on_oom = 1") - defer tk.MustExec("SET GLOBAL tidb_enable_tmp_storage_on_oom = 0") - tk.MustExec("use test;") - - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/testCTEStorageSpill", "return(true)")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/testCTEStorageSpill")) - tk.MustExec("set tidb_mem_quota_query = 1073741824;") - }() - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/testSortedRowContainerSpill", "return(true)")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/sortexec/testSortedRowContainerSpill")) - }() - - // Use duplicated rows to test UNION DISTINCT. - tk.MustExec("set tidb_mem_quota_query = 1073741824;") - insertStr := "insert into t1 values(0)" - rowNum := 1000 - vals := make([]int, rowNum) - vals[0] = 0 - for i := 1; i < rowNum; i++ { - v := rand.Intn(100) - vals[i] = v - insertStr += fmt.Sprintf(", (%d)", v) - } - tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1(c1 int);") - tk.MustExec(insertStr) - tk.MustExec("set tidb_mem_quota_query = 40000;") - tk.MustExec("set cte_max_recursion_depth = 500000;") - sql := fmt.Sprintf("with recursive cte1 as ( "+ - "select c1 from t1 "+ - "union "+ - "select c1 + 1 c1 from cte1 where c1 < %d) "+ - "select c1 from cte1 order by c1;", rowNum) - rows := tk.MustQuery(sql) - - memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker - diskTracker := tk.Session().GetSessionVars().StmtCtx.DiskTracker - require.Greater(t, memTracker.MaxConsumed(), int64(0)) - require.Greater(t, diskTracker.MaxConsumed(), int64(0)) - - slices.Sort(vals) - resRows := make([]string, 0, rowNum) - for i := vals[0]; i <= rowNum; i++ { - resRows = append(resRows, fmt.Sprintf("%d", i)) - } - rows.Check(testkit.Rows(resRows...)) -} - -func TestCTEExecError(t *testing.T) { - store := testkit.CreateMockStore(t) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test;") - tk.MustExec("drop table if exists src;") - tk.MustExec("create table src(first int, second int);") - - insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000)) - for i := 0; i < 1000; i++ { - insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000)) - } - insertStr += ";" - tk.MustExec(insertStr) - - // Increase projection concurrency and decrease chunk size - // to increase the probability of reproducing the problem. - tk.MustExec("set tidb_max_chunk_size = 32") - tk.MustExec("set tidb_projection_concurrency = 20") - for i := 0; i < 10; i++ { - err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " + - "(select 1, first, second, first+second from src " + - " union all " + - "select iter+1, second, result, second+result from cte where iter < 80 )" + - "select * from cte") - require.True(t, terror.ErrorEqual(err, types.ErrOverflow)) - } -} - -func TestCTEPanic(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test;") - tk.MustExec("create table t1(c1 int)") - tk.MustExec("insert into t1 values(1), (2), (3)") - - fpPathPrefix := "github.com/pingcap/tidb/pkg/executor/" - fp := "testCTESeedPanic" - require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) - err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") - require.Contains(t, err.Error(), fp) - require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) - - fp = "testCTERecursivePanic" - require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp))) - err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1") - require.Contains(t, err.Error(), fp) - require.NoError(t, failpoint.Disable(fpPathPrefix+fp)) -} - -func TestCTEDelSpillFile(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test;") - tk.MustExec("drop table if exists t1, t2;") - tk.MustExec("create table t1(c1 int, c2 int);") - tk.MustExec("create table t2(c1 int);") - tk.MustExec("set @@cte_max_recursion_depth = 1000000;") - tk.MustExec("set global tidb_mem_oom_action = 'log';") - tk.MustExec("set @@tidb_mem_quota_query = 100;") - tk.MustExec("insert into t2 values(1);") - tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;") - require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap) -} - -func TestCTEShareCorColumn(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test;") - tk.MustExec("drop table if exists t1, t2;") - tk.MustExec("create table t1(c1 int, c2 varchar(100));") - tk.MustExec("insert into t1 values(1, '2020-10-10');") - tk.MustExec("create table t2(c1 int, c2 date);") - tk.MustExec("insert into t2 values(1, '2020-10-10');") - for i := 0; i < 100; i++ { - tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias1) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) - tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias2) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10")) - } - - tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1(a int);") - tk.MustExec("insert into t1 values(1), (2);") - tk.MustQuery("SELECT * FROM t1 dt WHERE EXISTS( WITH RECURSIVE qn AS (SELECT a AS b UNION ALL SELECT b+1 FROM qn WHERE b=0 or b = 1) SELECT * FROM qn dtqn1 where exists (select /*+ NO_DECORRELATE() */ b from qn where dtqn1.b+1));").Check(testkit.Rows("1", "2")) -} - -func TestCTEIterationMemTracker(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - - insertStr := "insert into t1 values(0)" - rowNum := 1000 - vals := make([]int, rowNum) - vals[0] = 0 - for i := 1; i < rowNum; i++ { - v := rand.Intn(100) - vals[i] = v - insertStr += fmt.Sprintf(", (%d)", v) - } - tk.MustExec("use test;") - tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1(c1 int);") - tk.MustExec(insertStr) - - tk.MustExec("set @@cte_max_recursion_depth=1000000") - tk.MustExec("set global tidb_mem_oom_action = 'log';") - defer func() { - tk.MustExec("set global tidb_mem_oom_action = default;") - }() - tk.MustExec("set @@tidb_mem_quota_query=10;") - maxIter := 5000 - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk", fmt.Sprintf("return(%d)", maxIter))) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk")) - }() - tk.MustQuery(fmt.Sprintf("explain analyze with recursive cte1 as (select c1 from t1 union all select c1 + 1 c1 from cte1 where c1 < %d) select * from cte1", maxIter)) -}