Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ go_test(
deps = [
"//pkg/config",
"//pkg/domain",
"//pkg/executor/internal/exec",
"//pkg/executor/internal/testutil",
"//pkg/expression",
"//pkg/parser/ast",
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/join/hash_join_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,17 @@ func (w *buildWorkerBase) fetchBuildSideRows(ctx context.Context, hashJoinCtx *h

for {
err := checkAndSpillRowTableIfNeeded(fetcherAndWorkerSyncer, spillHelper)
issue59377Intest(&err)
if err != nil {
hasError = true
errCh <- errors.Trace(err)
return
}

err = triggerIntest(2)
if err != nil {
hasError = true
errCh <- errors.Trace(err)
return
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,12 @@ func rehash(oldHashValue uint64, rehashBuf []byte, hash hash.Hash64) uint64 {
return hash.Sum64()
}

func issue59377Intest(err *error) {
failpoint.Inject("Issue59377", func() {
*err = errors.New("Random failpoint error is triggered")
})
}

func triggerIntest(errProbability int) error {
failpoint.Inject("slowWorkers", func(val failpoint.Value) {
if val.(bool) {
Expand Down
158 changes: 93 additions & 65 deletions pkg/executor/join/outer_join_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package join

import (
"context"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/testutil"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand All @@ -29,6 +31,76 @@ import (
"github.com/stretchr/testify/require"
)

func prepareSimpleHashJoinEnv() (*testutil.MockDataSource, *testutil.MockDataSource, *hashJoinInfo, *testutil.MockActionOnExceed) {
hardLimitBytesNum := int64(5000000)
newRootExceedAction := new(testutil.MockActionOnExceed)

ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, hardLimitBytesNum)
ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction)
// Consume lots of memory in advance to help to trigger fallback action.
ctx.GetSessionVars().MemTracker.Consume(int64(float64(hardLimitBytesNum) * 0.99999))
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)

leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false)

intTp := types.NewFieldType(mysql.TypeLonglong)
intTp.AddFlag(mysql.NotNullFlag)
stringTp := types.NewFieldType(mysql.TypeVarString)
stringTp.AddFlag(mysql.NotNullFlag)

leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp}
rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp}

leftKeys := []*expression.Column{
{Index: 1, RetType: intTp},
{Index: 3, RetType: stringTp},
}
rightKeys := []*expression.Column{
{Index: 0, RetType: intTp},
{Index: 2, RetType: stringTp},
}

param := spillTestParam{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}

maxRowTableSegmentSize = 100
spillChunkSize = 100
joinType := logicalop.InnerJoin

returnTypes := getReturnTypes(joinType, param)

var buildKeys []*expression.Column
var probeKeys []*expression.Column
if param.rightAsBuildSide {
buildKeys = param.rightKeys
probeKeys = param.leftKeys
} else {
buildKeys = param.leftKeys
probeKeys = param.rightKeys
}

info := &hashJoinInfo{
ctx: ctx,
schema: buildSchema(returnTypes),
leftExec: leftDataSource,
rightExec: rightDataSource,
joinType: joinType,
rightAsBuildSide: param.rightAsBuildSide,
buildKeys: buildKeys,
probeKeys: probeKeys,
lUsed: param.leftUsed,
rUsed: param.rightUsed,
otherCondition: param.otherCondition,
lUsedInOtherCondition: param.leftUsedByOtherCondition,
rUsedInOtherCondition: param.rightUsedByOtherCondition,
}

return leftDataSource, rightDataSource, info, newRootExceedAction
}

func testRandomFail(t *testing.T, ctx *mock.Context, joinType logicalop.JoinType, param spillTestParam, leftDataSource *testutil.MockDataSource, rightDataSource *testutil.MockDataSource) {
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 1500000)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
Expand Down Expand Up @@ -267,71 +339,7 @@ func TestOuterJoinUnderApplyExec(t *testing.T) {
}

func TestFallBackAction(t *testing.T) {
hardLimitBytesNum := int64(5000000)
newRootExceedAction := new(testutil.MockActionOnExceed)

ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, hardLimitBytesNum)
ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction)
// Consume lots of memory in advance to help to trigger fallback action.
ctx.GetSessionVars().MemTracker.Consume(int64(float64(hardLimitBytesNum) * 0.99999))
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)

leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false)

intTp := types.NewFieldType(mysql.TypeLonglong)
intTp.AddFlag(mysql.NotNullFlag)
stringTp := types.NewFieldType(mysql.TypeVarString)
stringTp.AddFlag(mysql.NotNullFlag)

leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp}
rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp}

leftKeys := []*expression.Column{
{Index: 1, RetType: intTp},
{Index: 3, RetType: stringTp},
}
rightKeys := []*expression.Column{
{Index: 0, RetType: intTp},
{Index: 2, RetType: stringTp},
}

param := spillTestParam{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}

maxRowTableSegmentSize = 100
spillChunkSize = 100
joinType := logicalop.InnerJoin

returnTypes := getReturnTypes(joinType, param)

var buildKeys []*expression.Column
var probeKeys []*expression.Column
if param.rightAsBuildSide {
buildKeys = param.rightKeys
probeKeys = param.leftKeys
} else {
buildKeys = param.leftKeys
probeKeys = param.rightKeys
}

info := &hashJoinInfo{
ctx: ctx,
schema: buildSchema(returnTypes),
leftExec: leftDataSource,
rightExec: rightDataSource,
joinType: joinType,
rightAsBuildSide: param.rightAsBuildSide,
buildKeys: buildKeys,
probeKeys: probeKeys,
lUsed: param.leftUsed,
rUsed: param.rightUsed,
otherCondition: param.otherCondition,
lUsedInOtherCondition: param.leftUsedByOtherCondition,
rUsedInOtherCondition: param.rightUsedByOtherCondition,
}
leftDataSource, rightDataSource, info, newRootExceedAction := prepareSimpleHashJoinEnv()

leftDataSource.PrepareChunks()
rightDataSource.PrepareChunks()
Expand All @@ -340,6 +348,26 @@ func TestFallBackAction(t *testing.T) {
require.Less(t, 0, newRootExceedAction.GetTriggeredNum())
}

func TestIssue59377(t *testing.T) {
leftDataSource, rightDataSource, info, _ := prepareSimpleHashJoinEnv()
leftDataSource.PrepareChunks()
rightDataSource.PrepareChunks()
hashJoinExec := buildHashJoinV2Exec(info)

err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/Issue59377", "return")
require.NoError(t, err)
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/join/Issue59377")

tmpCtx := context.Background()
hashJoinExec.isMemoryClearedForTest = true
err = hashJoinExec.Open(tmpCtx)
require.NoError(t, err)
chk := exec.NewFirstChunk(hashJoinExec)
err = hashJoinExec.Next(tmpCtx, chk)
require.True(t, err != nil)
_ = hashJoinExec.Close()
}

func TestHashJoinRandomFail(t *testing.T) {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = 32
Expand Down