Skip to content

Commit 460e050

Browse files
authored
executor: fix random cte error under apply (#57294) (#59185)
close #55881
1 parent fe3f3c8 commit 460e050

File tree

2 files changed

+68
-41
lines changed

2 files changed

+68
-41
lines changed

executor/cte.go

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,16 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
8585
defer e.producer.resTbl.Unlock()
8686

8787
if e.producer.checkAndUpdateCorColHashCode() {
88-
e.producer.reset()
89-
if err = e.producer.reopenTbls(); err != nil {
88+
err = e.producer.reset()
89+
if err != nil {
9090
return err
9191
}
9292
}
9393
if e.producer.openErr != nil {
9494
return e.producer.openErr
9595
}
96-
if !e.producer.opened {
97-
if err = e.producer.openProducer(ctx, e); err != nil {
96+
if !e.producer.hasCTEResult() && !e.producer.executorOpened {
97+
if err = e.producer.openProducerExecutor(ctx, e); err != nil {
9898
return err
9999
}
100100
}
@@ -105,8 +105,14 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
105105
func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
106106
e.producer.resTbl.Lock()
107107
defer e.producer.resTbl.Unlock()
108-
if !e.producer.resTbl.Done() {
109-
if err = e.producer.produce(ctx); err != nil {
108+
if !e.producer.hasCTEResult() {
109+
// in case that another CTEExec call close without generate CTE result.
110+
if !e.producer.executorOpened {
111+
if err = e.producer.openProducerExecutor(ctx, e); err != nil {
112+
return err
113+
}
114+
}
115+
if err = e.producer.genCTEResult(ctx); err != nil {
110116
return err
111117
}
112118
}
@@ -128,18 +134,23 @@ func (e *CTEExec) Close() (firstErr error) {
128134
func() {
129135
e.producer.resTbl.Lock()
130136
defer e.producer.resTbl.Unlock()
131-
if !e.producer.closed {
137+
if e.producer.executorOpened {
132138
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
133139
if ok := v.(bool); ok {
134140
panic("mock mem oom panic")
135141
}
136142
})
137-
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
138-
// It means you can still read resTbl after call closeProducer().
139-
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
143+
// closeProducerExecutor() only close seedExec and recursiveExec, will not touch resTbl.
144+
// It means you can still read resTbl after call closeProducerExecutor().
145+
// You can even call all three functions(openProducerExecutor/genCTEResult/closeProducerExecutor) in CTEExec.Next().
140146
// Separating these three function calls is only to follow the abstraction of the volcano model.
141-
err := e.producer.closeProducer()
147+
err := e.producer.closeProducerExecutor()
142148
firstErr = setFirstErr(firstErr, err, "close cte producer error")
149+
if !e.producer.hasCTEResult() {
150+
// CTE result is not generated, in this case, we reset it
151+
err = e.producer.reset()
152+
firstErr = setFirstErr(firstErr, err, "close cte producer error")
153+
}
143154
}
144155
}()
145156
err := e.baseExecutor.Close()
@@ -154,10 +165,10 @@ func (e *CTEExec) reset() {
154165
}
155166

156167
type cteProducer struct {
157-
// opened should be false when not open or open fail(a.k.a. openErr != nil)
158-
opened bool
159-
produced bool
160-
closed bool
168+
// executorOpened is used to indicate whether the executor(seedExec/recursiveExec) is opened.
169+
// when executorOpened is true, the executor is opened, otherwise it means the executor is
170+
// not opened or is already closed.
171+
executorOpened bool
161172

162173
// cteProducer is shared by multiple operators, so if the first operator tries to open
163174
// and got error, the second should return open error directly instead of open again.
@@ -196,14 +207,10 @@ type cteProducer struct {
196207
corColHashCodes [][]byte
197208
}
198209

199-
func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
210+
func (p *cteProducer) openProducerExecutor(ctx context.Context, cteExec *CTEExec) (err error) {
200211
defer func() {
201212
p.openErr = err
202-
if err == nil {
203-
p.opened = true
204-
} else {
205-
p.opened = false
206-
}
213+
p.executorOpened = true
207214
}()
208215
if p.seedExec == nil {
209216
return errors.New("seedExec for CTEExec is nil")
@@ -246,7 +253,7 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
246253
return nil
247254
}
248255

249-
func (p *cteProducer) closeProducer() (firstErr error) {
256+
func (p *cteProducer) closeProducerExecutor() (firstErr error) {
250257
err := p.seedExec.Close()
251258
firstErr = setFirstErr(firstErr, err, "close seedExec err")
252259
if p.recursiveExec != nil {
@@ -264,7 +271,7 @@ func (p *cteProducer) closeProducer() (firstErr error) {
264271
// because ExplainExec still needs tracker to get mem usage info.
265272
p.memTracker = nil
266273
p.diskTracker = nil
267-
p.closed = true
274+
p.executorOpened = false
268275
return
269276
}
270277

@@ -331,7 +338,13 @@ func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error {
331338
return nil
332339
}
333340

334-
func (p *cteProducer) produce(ctx context.Context) (err error) {
341+
func (p *cteProducer) hasCTEResult() bool {
342+
return p.resTbl.Done()
343+
}
344+
345+
// genCTEResult generates the result of CTE, and stores the result in resTbl.
346+
// This is a synchronous function, which means it will block until the result is generated.
347+
func (p *cteProducer) genCTEResult(ctx context.Context) (err error) {
335348
if p.resTbl.Error() != nil {
336349
return p.resTbl.Error()
337350
}
@@ -524,14 +537,18 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) {
524537
return nil
525538
}
526539

527-
func (p *cteProducer) reset() {
540+
func (p *cteProducer) reset() error {
528541
p.curIter = 0
529542
p.hashTbl = nil
530-
531-
p.opened = false
543+
p.executorOpened = false
532544
p.openErr = nil
533-
p.produced = false
534-
p.closed = false
545+
546+
// Normally we need to setup tracker after calling Reopen(),
547+
// But reopen resTbl means we need to call genCTEResult() again, it will setup tracker.
548+
if err := p.resTbl.Reopen(); err != nil {
549+
return err
550+
}
551+
return p.iterInTbl.Reopen()
535552
}
536553

537554
func (p *cteProducer) resetTracker() {
@@ -545,18 +562,6 @@ func (p *cteProducer) resetTracker() {
545562
}
546563
}
547564

548-
func (p *cteProducer) reopenTbls() (err error) {
549-
if p.isDistinct {
550-
p.hashTbl = newConcurrentMapHashTable()
551-
}
552-
// Normally we need to setup tracker after calling Reopen(),
553-
// But reopen resTbl means we need to call produce() again, it will setup tracker.
554-
if err := p.resTbl.Reopen(); err != nil {
555-
return err
556-
}
557-
return p.iterInTbl.Reopen()
558-
}
559-
560565
// Check if tbl meets the requirement of limit.
561566
func (p *cteProducer) limitDone(tbl cteutil.Storage) bool {
562567
return p.hasLimit && uint64(tbl.NumRows()) >= p.limitEnd

executor/executor_issue_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,3 +1332,25 @@ func TestIssue49902(t *testing.T) {
13321332
tk.MustQuery("SELECT count(`t`.`c`) FROM (`s`) JOIN `t` GROUP BY `t`.`c`;").Check(testkit.Rows("170"))
13331333
tk.MustExec("set @@tidb_max_chunk_size = default;")
13341334
}
1335+
1336+
func TestIssue55881(t *testing.T) {
1337+
store, clean := testkit.CreateMockStore(t)
1338+
defer clean()
1339+
tk := testkit.NewTestKit(t, store)
1340+
1341+
tk.MustExec("use test;")
1342+
tk.MustExec("drop table if exists aaa;")
1343+
tk.MustExec("drop table if exists bbb;")
1344+
tk.MustExec("create table aaa(id int, value int);")
1345+
tk.MustExec("create table bbb(id int, value int);")
1346+
tk.MustExec("insert into aaa values(1,2),(2,3)")
1347+
tk.MustExec("insert into bbb values(1,2),(2,3),(3,4)")
1348+
// set tidb_executor_concurrency to 1 to let the issue happens with high probability.
1349+
tk.MustExec("set tidb_executor_concurrency=1;")
1350+
1351+
// this is a random issue, so run it 100 times to increase the probability of the issue.
1352+
for i := 0; i < 100; i++ {
1353+
tk.MustQuery("with cte as (select * from aaa) select id, (select id from (select * from aaa where aaa.id != bbb.id union all select * from cte union all select * from cte) d limit 1)," +
1354+
"(select max(value) from (select * from cte union all select * from cte union all select * from aaa where aaa.id > bbb.id)) from bbb;")
1355+
}
1356+
}

0 commit comments

Comments
 (0)