Skip to content

Commit 24d0b8c

Browse files
authored
executor: fix the incorrect return when hash join encounters error (#59381) (#59399)
close #59377
1 parent d90faf1 commit 24d0b8c

File tree

4 files changed

+103
-65
lines changed

4 files changed

+103
-65
lines changed

pkg/executor/join/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ go_test(
9898
deps = [
9999
"//pkg/config",
100100
"//pkg/domain",
101+
"//pkg/executor/internal/exec",
101102
"//pkg/executor/internal/testutil",
102103
"//pkg/expression",
103104
"//pkg/parser/ast",

pkg/executor/join/hash_join_base.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,14 +312,17 @@ func (w *buildWorkerBase) fetchBuildSideRows(ctx context.Context, hashJoinCtx *h
312312

313313
for {
314314
err := checkAndSpillRowTableIfNeeded(fetcherAndWorkerSyncer, spillHelper)
315+
issue59377Intest(&err)
315316
if err != nil {
316317
hasError = true
318+
errCh <- errors.Trace(err)
317319
return
318320
}
319321

320322
err = triggerIntest(2)
321323
if err != nil {
322324
hasError = true
325+
errCh <- errors.Trace(err)
323326
return
324327
}
325328

pkg/executor/join/hash_join_v2.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1576,6 +1576,12 @@ func rehash(oldHashValue uint64, rehashBuf []byte, hash hash.Hash64) uint64 {
15761576
return hash.Sum64()
15771577
}
15781578

1579+
func issue59377Intest(err *error) {
1580+
failpoint.Inject("Issue59377", func() {
1581+
*err = errors.New("Random failpoint error is triggered")
1582+
})
1583+
}
1584+
15791585
func triggerIntest(errProbability int) error {
15801586
failpoint.Inject("slowWorkers", func(val failpoint.Value) {
15811587
if val.(bool) {

pkg/executor/join/outer_join_spill_test.go

Lines changed: 93 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
package join
1616

1717
import (
18+
"context"
1819
"testing"
1920

2021
"github.com/pingcap/failpoint"
22+
"github.com/pingcap/tidb/pkg/executor/internal/exec"
2123
"github.com/pingcap/tidb/pkg/executor/internal/testutil"
2224
"github.com/pingcap/tidb/pkg/expression"
2325
"github.com/pingcap/tidb/pkg/parser/ast"
@@ -29,6 +31,76 @@ import (
2931
"github.com/stretchr/testify/require"
3032
)
3133

34+
func prepareSimpleHashJoinEnv() (*testutil.MockDataSource, *testutil.MockDataSource, *hashJoinInfo, *testutil.MockActionOnExceed) {
35+
hardLimitBytesNum := int64(5000000)
36+
newRootExceedAction := new(testutil.MockActionOnExceed)
37+
38+
ctx := mock.NewContext()
39+
ctx.GetSessionVars().InitChunkSize = 32
40+
ctx.GetSessionVars().MaxChunkSize = 32
41+
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, hardLimitBytesNum)
42+
ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction)
43+
// Consume lots of memory in advance to help to trigger fallback action.
44+
ctx.GetSessionVars().MemTracker.Consume(int64(float64(hardLimitBytesNum) * 0.99999))
45+
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
46+
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
47+
48+
leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false)
49+
50+
intTp := types.NewFieldType(mysql.TypeLonglong)
51+
intTp.AddFlag(mysql.NotNullFlag)
52+
stringTp := types.NewFieldType(mysql.TypeVarString)
53+
stringTp.AddFlag(mysql.NotNullFlag)
54+
55+
leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp}
56+
rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp}
57+
58+
leftKeys := []*expression.Column{
59+
{Index: 1, RetType: intTp},
60+
{Index: 3, RetType: stringTp},
61+
}
62+
rightKeys := []*expression.Column{
63+
{Index: 0, RetType: intTp},
64+
{Index: 2, RetType: stringTp},
65+
}
66+
67+
param := spillTestParam{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}
68+
69+
maxRowTableSegmentSize = 100
70+
spillChunkSize = 100
71+
joinType := logicalop.InnerJoin
72+
73+
returnTypes := getReturnTypes(joinType, param)
74+
75+
var buildKeys []*expression.Column
76+
var probeKeys []*expression.Column
77+
if param.rightAsBuildSide {
78+
buildKeys = param.rightKeys
79+
probeKeys = param.leftKeys
80+
} else {
81+
buildKeys = param.leftKeys
82+
probeKeys = param.rightKeys
83+
}
84+
85+
info := &hashJoinInfo{
86+
ctx: ctx,
87+
schema: buildSchema(returnTypes),
88+
leftExec: leftDataSource,
89+
rightExec: rightDataSource,
90+
joinType: joinType,
91+
rightAsBuildSide: param.rightAsBuildSide,
92+
buildKeys: buildKeys,
93+
probeKeys: probeKeys,
94+
lUsed: param.leftUsed,
95+
rUsed: param.rightUsed,
96+
otherCondition: param.otherCondition,
97+
lUsedInOtherCondition: param.leftUsedByOtherCondition,
98+
rUsedInOtherCondition: param.rightUsedByOtherCondition,
99+
}
100+
101+
return leftDataSource, rightDataSource, info, newRootExceedAction
102+
}
103+
32104
func testRandomFail(t *testing.T, ctx *mock.Context, joinType logicalop.JoinType, param spillTestParam, leftDataSource *testutil.MockDataSource, rightDataSource *testutil.MockDataSource) {
33105
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 1500000)
34106
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
@@ -267,71 +339,7 @@ func TestOuterJoinUnderApplyExec(t *testing.T) {
267339
}
268340

269341
func TestFallBackAction(t *testing.T) {
270-
hardLimitBytesNum := int64(5000000)
271-
newRootExceedAction := new(testutil.MockActionOnExceed)
272-
273-
ctx := mock.NewContext()
274-
ctx.GetSessionVars().InitChunkSize = 32
275-
ctx.GetSessionVars().MaxChunkSize = 32
276-
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, hardLimitBytesNum)
277-
ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction)
278-
// Consume lots of memory in advance to help to trigger fallback action.
279-
ctx.GetSessionVars().MemTracker.Consume(int64(float64(hardLimitBytesNum) * 0.99999))
280-
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
281-
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
282-
283-
leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false)
284-
285-
intTp := types.NewFieldType(mysql.TypeLonglong)
286-
intTp.AddFlag(mysql.NotNullFlag)
287-
stringTp := types.NewFieldType(mysql.TypeVarString)
288-
stringTp.AddFlag(mysql.NotNullFlag)
289-
290-
leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp}
291-
rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp}
292-
293-
leftKeys := []*expression.Column{
294-
{Index: 1, RetType: intTp},
295-
{Index: 3, RetType: stringTp},
296-
}
297-
rightKeys := []*expression.Column{
298-
{Index: 0, RetType: intTp},
299-
{Index: 2, RetType: stringTp},
300-
}
301-
302-
param := spillTestParam{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}
303-
304-
maxRowTableSegmentSize = 100
305-
spillChunkSize = 100
306-
joinType := logicalop.InnerJoin
307-
308-
returnTypes := getReturnTypes(joinType, param)
309-
310-
var buildKeys []*expression.Column
311-
var probeKeys []*expression.Column
312-
if param.rightAsBuildSide {
313-
buildKeys = param.rightKeys
314-
probeKeys = param.leftKeys
315-
} else {
316-
buildKeys = param.leftKeys
317-
probeKeys = param.rightKeys
318-
}
319-
320-
info := &hashJoinInfo{
321-
ctx: ctx,
322-
schema: buildSchema(returnTypes),
323-
leftExec: leftDataSource,
324-
rightExec: rightDataSource,
325-
joinType: joinType,
326-
rightAsBuildSide: param.rightAsBuildSide,
327-
buildKeys: buildKeys,
328-
probeKeys: probeKeys,
329-
lUsed: param.leftUsed,
330-
rUsed: param.rightUsed,
331-
otherCondition: param.otherCondition,
332-
lUsedInOtherCondition: param.leftUsedByOtherCondition,
333-
rUsedInOtherCondition: param.rightUsedByOtherCondition,
334-
}
342+
leftDataSource, rightDataSource, info, newRootExceedAction := prepareSimpleHashJoinEnv()
335343

336344
leftDataSource.PrepareChunks()
337345
rightDataSource.PrepareChunks()
@@ -340,6 +348,26 @@ func TestFallBackAction(t *testing.T) {
340348
require.Less(t, 0, newRootExceedAction.GetTriggeredNum())
341349
}
342350

351+
func TestIssue59377(t *testing.T) {
352+
leftDataSource, rightDataSource, info, _ := prepareSimpleHashJoinEnv()
353+
leftDataSource.PrepareChunks()
354+
rightDataSource.PrepareChunks()
355+
hashJoinExec := buildHashJoinV2Exec(info)
356+
357+
err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/Issue59377", "return")
358+
require.NoError(t, err)
359+
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/join/Issue59377")
360+
361+
tmpCtx := context.Background()
362+
hashJoinExec.isMemoryClearedForTest = true
363+
err = hashJoinExec.Open(tmpCtx)
364+
require.NoError(t, err)
365+
chk := exec.NewFirstChunk(hashJoinExec)
366+
err = hashJoinExec.Next(tmpCtx, chk)
367+
require.True(t, err != nil)
368+
_ = hashJoinExec.Close()
369+
}
370+
343371
func TestHashJoinRandomFail(t *testing.T) {
344372
ctx := mock.NewContext()
345373
ctx.GetSessionVars().InitChunkSize = 32

0 commit comments

Comments
 (0)