Skip to content

Commit 19f4843

Browse files
authored
pkg/executor: fix the hang issue in indexHashJoin (#49218) (#49411)
close #49033
1 parent aab2da0 commit 19f4843

File tree

3 files changed

+100
-42
lines changed

3 files changed

+100
-42
lines changed

executor/builder.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4795,6 +4795,9 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
47954795
}
47964796
}
47974797
if len(kvRanges) != 0 && memTracker != nil {
4798+
failpoint.Inject("testIssue49033", func() {
4799+
panic("testIssue49033")
4800+
})
47984801
memTracker.Consume(int64(2 * cap(kvRanges[0].StartKey) * len(kvRanges)))
47994802
}
48004803
if len(tmpDatumRanges) != 0 && memTracker != nil {

executor/index_lookup_hash_join.go

Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ type IndexNestedLoopHashJoin struct {
7474

7575
stats *indexLookUpJoinRuntimeStats
7676
prepared bool
77+
// panicErr records the error generated by panic recover. This is introduced to
78+
// return the actual error message instead of `context cancelled` to the client.
79+
panicErr error
80+
ctxWithCancel context.Context
7781
}
7882

7983
type indexHashJoinOuterWorker struct {
@@ -149,7 +153,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
149153
e.stats.concurrency = concurrency
150154
}
151155
workerCtx, cancelFunc := context.WithCancel(ctx)
152-
e.cancelFunc = cancelFunc
156+
e.ctxWithCancel, e.cancelFunc = workerCtx, cancelFunc
153157
innerCh := make(chan *indexHashJoinTask, concurrency)
154158
if e.keepOuterOrder {
155159
e.taskCh = make(chan *indexHashJoinTask, concurrency)
@@ -162,7 +166,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
162166
e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency)
163167
e.workerWg.Add(1)
164168
ow := e.newOuterWorker(innerCh)
165-
go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers)
169+
go util.WithRecovery(func() { ow.run(e.ctxWithCancel) }, e.finishJoinWorkers)
166170

167171
for i := 0; i < concurrency; i++ {
168172
if !e.keepOuterOrder {
@@ -179,7 +183,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
179183
e.workerWg.Add(concurrency)
180184
for i := 0; i < concurrency; i++ {
181185
workerID := i
182-
go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(workerCtx, cancelFunc) }, e.finishJoinWorkers)
186+
go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(e.ctxWithCancel, cancelFunc) }, e.finishJoinWorkers)
183187
}
184188
go e.wait4JoinWorkers()
185189
}
@@ -194,6 +198,7 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
194198
task := &indexHashJoinTask{err: err}
195199
e.taskCh <- task
196200
}
201+
e.panicErr = err
197202
if e.cancelFunc != nil {
198203
e.cancelFunc()
199204
}
@@ -219,59 +224,39 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
219224
}
220225
req.Reset()
221226
if e.keepOuterOrder {
222-
return e.runInOrder(ctx, req)
227+
return e.runInOrder(e.ctxWithCancel, req)
223228
}
224-
// unordered run
225-
var (
226-
result *indexHashJoinResult
227-
ok bool
228-
)
229-
select {
230-
case result, ok = <-e.resultCh:
231-
if !ok {
232-
return nil
233-
}
234-
if result.err != nil {
235-
return result.err
236-
}
237-
case <-ctx.Done():
238-
return ctx.Err()
239-
}
240-
req.SwapColumns(result.chk)
241-
result.src <- result.chk
242-
return nil
229+
return e.runUnordered(e.ctxWithCancel, req)
243230
}
244231

245232
func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
246-
var (
247-
result *indexHashJoinResult
248-
ok bool
249-
)
250233
for {
251234
if e.isDryUpTasks(ctx) {
252-
return nil
235+
return e.panicErr
253236
}
254237
if e.curTask.err != nil {
255238
return e.curTask.err
256239
}
257-
select {
258-
case result, ok = <-e.curTask.resultCh:
259-
if !ok {
260-
e.curTask = nil
261-
continue
262-
}
263-
if result.err != nil {
264-
return result.err
265-
}
266-
case <-ctx.Done():
267-
return ctx.Err()
240+
result, err := e.getResultFromChannel(ctx, e.curTask.resultCh)
241+
if err != nil {
242+
return err
268243
}
269-
req.SwapColumns(result.chk)
270-
result.src <- result.chk
271-
return nil
244+
if result == nil {
245+
e.curTask = nil
246+
continue
247+
}
248+
return e.handleResult(req, result)
272249
}
273250
}
274251

252+
func (e *IndexNestedLoopHashJoin) runUnordered(ctx context.Context, req *chunk.Chunk) error {
253+
result, err := e.getResultFromChannel(ctx, e.resultCh)
254+
if err != nil {
255+
return err
256+
}
257+
return e.handleResult(req, result)
258+
}
259+
275260
// isDryUpTasks indicates whether all the tasks have been processed.
276261
func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
277262
if e.curTask != nil {
@@ -289,6 +274,38 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
289274
return false
290275
}
291276

277+
func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resultCh <-chan *indexHashJoinResult) (*indexHashJoinResult, error) {
278+
var (
279+
result *indexHashJoinResult
280+
ok bool
281+
)
282+
select {
283+
case result, ok = <-resultCh:
284+
if !ok {
285+
return nil, nil
286+
}
287+
if result.err != nil {
288+
return nil, result.err
289+
}
290+
case <-ctx.Done():
291+
err := e.panicErr
292+
if err == nil {
293+
err = ctx.Err()
294+
}
295+
return nil, err
296+
}
297+
return result, nil
298+
}
299+
300+
func (*IndexNestedLoopHashJoin) handleResult(req *chunk.Chunk, result *indexHashJoinResult) error {
301+
if result == nil {
302+
return nil
303+
}
304+
req.SwapColumns(result.chk)
305+
result.src <- result.chk
306+
return nil
307+
}
308+
292309
// Close implements the IndexNestedLoopHashJoin Executor interface.
293310
func (e *IndexNestedLoopHashJoin) Close() error {
294311
if e.stats != nil {
@@ -311,6 +328,7 @@ func (e *IndexNestedLoopHashJoin) Close() error {
311328
e.joinChkResourceCh = nil
312329
e.finished.Store(false)
313330
e.prepared = false
331+
e.ctxWithCancel = nil
314332
return e.baseExecutor.Close()
315333
}
316334

executor/join_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@
1515
package executor_test
1616

1717
import (
18+
"context"
1819
"testing"
1920

21+
"github.com/pingcap/failpoint"
22+
"github.com/pingcap/tidb/session"
2023
"github.com/pingcap/tidb/testkit"
2124
"github.com/pingcap/tidb/testkit/testdata"
25+
"github.com/stretchr/testify/require"
2226
)
2327

2428
func TestNaturalJoin(t *testing.T) {
@@ -119,3 +123,36 @@ func TestIssue48991(t *testing.T) {
119123
res := tk.MustQuery("SELECT `col_14` FROM `test`.`tbl_3` WHERE ((`tbl_3`.`col_15` < 'dV') AND `tbl_3`.`col_12` IN (SELECT `col_12` FROM `test`.`tbl_3` WHERE NOT (ISNULL(`tbl_3`.`col_12`)))) ORDER BY IF(ISNULL(`col_14`),0,1),`col_14`;")
120124
res.Check(testkit.Rows("1984-06-10 00:00:00", "1984-07-31 00:00:00", "2017-06-07 00:00:00"))
121125
}
126+
127+
func TestIssue49033(t *testing.T) {
128+
store := testkit.CreateMockStore(t)
129+
tk := testkit.NewTestKit(t, store)
130+
tk.MustExec("use test;")
131+
tk.MustExec("drop table if exists t, s;")
132+
tk.MustExec("create table t(a int, index(a));")
133+
tk.MustExec("create table s(a int, index(a));")
134+
tk.MustExec("insert into t values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16), (17), (18), (19), (20), (21), (22), (23), (24), (25), (26), (27), (28), (29), (30), (31), (32), (33), (34), (35), (36), (37), (38), (39), (40), (41), (42), (43), (44), (45), (46), (47), (48), (49), (50), (51), (52), (53), (54), (55), (56), (57), (58), (59), (60), (61), (62), (63), (64), (65), (66), (67), (68), (69), (70), (71), (72), (73), (74), (75), (76), (77), (78), (79), (80), (81), (82), (83), (84), (85), (86), (87), (88), (89), (90), (91), (92), (93), (94), (95), (96), (97), (98), (99), (100), (101), (102), (103), (104), (105), (106), (107), (108), (109), (110), (111), (112), (113), (114), (115), (116), (117), (118), (119), (120), (121), (122), (123), (124), (125), (126), (127), (128);")
135+
tk.MustExec("insert into s values(1), (128);")
136+
tk.MustExec("set @@tidb_max_chunk_size=32;")
137+
tk.MustExec("set @@tidb_index_lookup_join_concurrency=1;")
138+
tk.MustExec("set @@tidb_index_join_batch_size=32;")
139+
tk.MustQuery("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a;")
140+
tk.MustQuery("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a order by t.a;")
141+
142+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIssue49033", "return"))
143+
defer func() {
144+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIssue49033"))
145+
}()
146+
147+
rs, err := tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a order by t.a;")
148+
require.NoError(t, err)
149+
_, err = session.GetRows4Test(context.Background(), nil, rs)
150+
require.EqualError(t, err, "testIssue49033")
151+
require.NoError(t, rs.Close())
152+
153+
rs, err = tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a")
154+
require.NoError(t, err)
155+
_, err = session.GetRows4Test(context.Background(), nil, rs)
156+
require.EqualError(t, err, "testIssue49033")
157+
require.NoError(t, rs.Close())
158+
}

0 commit comments

Comments
 (0)