From e277439abc5f73479d8e06884419fc56405ea744 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 5 Dec 2024 21:00:47 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #56936 Signed-off-by: ti-chi-bot --- pkg/executor/join/base_join_probe.go | 15 + pkg/executor/join/hash_join_spill_helper.go | 17 + pkg/executor/join/hash_join_v2.go | 27 +- pkg/executor/join/inner_join_spill_test.go | 17 +- .../join/left_outer_semi_join_probe_test.go | 578 ++++++++++++++++++ pkg/executor/join/outer_join_spill_test.go | 20 +- pkg/executor/join/semi_join_probe_test.go | 430 +++++++++++++ pkg/util/chunk/chunk_in_disk.go | 59 +- pkg/util/chunk/chunk_in_disk_test.go | 24 +- 9 files changed, 1145 insertions(+), 42 deletions(-) create mode 100644 pkg/executor/join/left_outer_semi_join_probe_test.go create mode 100644 pkg/executor/join/semi_join_probe_test.go diff --git a/pkg/executor/join/base_join_probe.go b/pkg/executor/join/base_join_probe.go index 71eb53e060813..efc6745e69417 100644 --- a/pkg/executor/join/base_join_probe.go +++ b/pkg/executor/join/base_join_probe.go @@ -185,6 +185,7 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) { return errors.New("Previous chunk is not probed yet") } } + j.currentChunk = chk logicalRows := chk.NumRows() // if chk.sel != nil, then physicalRows is different from logicalRows @@ -312,6 +313,20 @@ func (j *baseJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) { } func (j *baseJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error { + defer func() { + if j.ctx.spillHelper.areAllPartitionsSpilled() { + // We will not call `Probe` function when all partitions are spilled. + // So it's necessary to manually set `currentProbeRow` to avoid check fail. + j.currentProbeRow = j.chunkRows + } + }() + + if j.currentChunk != nil { + if j.currentProbeRow < j.chunkRows { + return errors.New("Previous chunk is not probed yet") + } + } + hashValueCol := chk.Column(0) serializedKeysCol := chk.Column(1) colNum := chk.NumCols() diff --git a/pkg/executor/join/hash_join_spill_helper.go b/pkg/executor/join/hash_join_spill_helper.go index 68bd84d54c8d6..e2612f0d4e11f 100644 --- a/pkg/executor/join/hash_join_spill_helper.go +++ b/pkg/executor/join/hash_join_spill_helper.go @@ -81,6 +81,7 @@ type hashJoinSpillHelper struct { spillTriggedInBuildingStageForTest bool spillTriggeredBeforeBuildingHashTableForTest bool allPartitionsSpilledForTest bool + skipProbeInRestoreForTest bool } func newHashJoinSpillHelper(hashJoinExec *HashJoinV2Exec, partitionNum int, probeFieldTypes []*types.FieldType) *hashJoinSpillHelper { @@ -373,6 +374,18 @@ func (h *hashJoinSpillHelper) init() { h.buildRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency) h.probeRowsInDisk = make([][]*chunk.DataInDiskByChunks, h.hashJoinExec.Concurrency) + + for _, worker := range h.hashJoinExec.BuildWorkers { + if worker.restoredChkBuf == nil { + worker.restoredChkBuf = chunk.NewEmptyChunk(h.buildSpillChkFieldTypes) + } + } + + for _, worker := range h.hashJoinExec.ProbeWorkers { + if worker.restoredChkBuf == nil { + worker.restoredChkBuf = chunk.NewEmptyChunk(h.probeSpillFieldTypes) + } + } } } @@ -540,6 +553,10 @@ func (h *hashJoinSpillHelper) initTmpSpillBuildSideChunks() { } } +func (h *hashJoinSpillHelper) isProbeSkippedInRestoreForTest() bool { + return h.skipProbeInRestoreForTest +} + func (h *hashJoinSpillHelper) isRespillTriggeredForTest() bool { return h.spillRoundForTest > 1 } diff --git a/pkg/executor/join/hash_join_v2.go b/pkg/executor/join/hash_join_v2.go index 6387f717c4b23..ca18d0d58eb63 100644 --- a/pkg/executor/join/hash_join_v2.go +++ b/pkg/executor/join/hash_join_v2.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/disk" "github.com/pingcap/tidb/pkg/util/execdetails" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/memory" ) @@ -359,6 +360,8 @@ type ProbeWorkerV2 struct { // We build individual joinProbe for each join worker when use chunk-based // execution, to avoid the concurrency of joiner.chk and joiner.selected. JoinProbe ProbeV2 + + restoredChkBuf *chunk.Chunk } func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) { @@ -392,8 +395,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) { } failpoint.Inject("ConsumeRandomPanic", nil) - // TODO reuse chunk - chk, err := inDisk.GetChunk(i) + err := inDisk.FillChunk(i, w.restoredChkBuf) if err != nil { joinResult.err = err break @@ -407,7 +409,7 @@ func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks) { start := time.Now() waitTime := int64(0) - ok, waitTime, joinResult = w.processOneRestoredProbeChunk(chk, joinResult) + ok, waitTime, joinResult = w.processOneRestoredProbeChunk(joinResult) probeTime += int64(time.Since(start)) - waitTime if !ok { break @@ -434,6 +436,7 @@ type BuildWorkerV2 struct { HasNullableKey bool WorkerID uint builder *rowTableBuilder + restoredChkBuf *chunk.Chunk } func (b *BuildWorkerV2) getSegmentsInRowTable(partID int) []*rowTableSegment { @@ -449,9 +452,9 @@ func (b *BuildWorkerV2) updatePartitionData(cost int64) { setMaxValue(&b.HashJoinCtx.stats.maxPartitionData, cost) } -func (b *BuildWorkerV2) processOneRestoredChunk(chk *chunk.Chunk, cost *int64) error { +func (b *BuildWorkerV2) processOneRestoredChunk(cost *int64) error { start := time.Now() - err := b.builder.processOneRestoredChunk(chk, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber)) + err := b.builder.processOneRestoredChunk(b.restoredChkBuf, b.HashJoinCtx, int(b.WorkerID), int(b.HashJoinCtx.partitionNumber)) if err != nil { return err } @@ -476,10 +479,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i return nil } - var chk *chunk.Chunk - - // TODO reuse chunk - chk, err = inDisk.GetChunk(i) + err = inDisk.FillChunk(i, b.restoredChkBuf) if err != nil { return err } @@ -489,7 +489,7 @@ func (b *BuildWorkerV2) splitPartitionAndAppendToRowTableForRestoreImpl(i int, i return err } - err = b.processOneRestoredChunk(chk, cost) + err = b.processOneRestoredChunk(cost) if err != nil { return err } @@ -914,8 +914,8 @@ func (w *ProbeWorkerV2) scanRowTableAfterProbeDone() { } } -func (w *ProbeWorkerV2) processOneRestoredProbeChunk(probeChunk *chunk.Chunk, joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) { - joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(probeChunk) +func (w *ProbeWorkerV2) processOneRestoredProbeChunk(joinResult *hashjoinWorkerResult) (ok bool, waitTime int64, _ *hashjoinWorkerResult) { + joinResult.err = w.JoinProbe.SetRestoredChunkForProbe(w.restoredChkBuf) if joinResult.err != nil { return false, 0, joinResult } @@ -932,6 +932,9 @@ func (w *ProbeWorkerV2) processOneProbeChunk(probeChunk *chunk.Chunk, joinResult func (w *ProbeWorkerV2) probeAndSendResult(joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) { if w.HashJoinCtx.spillHelper.areAllPartitionsSpilled() { + if intest.InTest && w.HashJoinCtx.spillHelper.hashJoinExec.inRestore { + w.HashJoinCtx.spillHelper.skipProbeInRestoreForTest = true + } return true, 0, joinResult } diff --git a/pkg/executor/join/inner_join_spill_test.go b/pkg/executor/join/inner_join_spill_test.go index 575c29115f9f3..bc1345c66c1a8 100644 --- a/pkg/executor/join/inner_join_spill_test.go +++ b/pkg/executor/join/inner_join_spill_test.go @@ -147,6 +147,7 @@ func testInnerJoinSpillCase4(t *testing.T, ctx *mock.Context, expectedResult []c require.True(t, hashJoinExec.spillHelper.isSpillTriggedInBuildingStageForTest()) require.True(t, hashJoinExec.spillHelper.areAllPartitionsSpilledForTest()) require.True(t, hashJoinExec.spillHelper.isRespillTriggeredForTest()) + require.True(t, hashJoinExec.spillHelper.isProbeSkippedInRestoreForTest()) checkResults(t, retTypes, result, expectedResult) } @@ -264,14 +265,14 @@ func TestInnerJoinSpillBasic(t *testing.T) { params := []spillTestParam{ // Normal case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, // rightUsed is empty - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, // leftUsed is empty - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, } err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) @@ -319,8 +320,8 @@ func TestInnerJoinSpillWithOtherCondition(t *testing.T) { otherCondition = append(otherCondition, sf) params := []spillTestParam{ - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, } err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) diff --git a/pkg/executor/join/left_outer_semi_join_probe_test.go b/pkg/executor/join/left_outer_semi_join_probe_test.go new file mode 100644 index 0000000000000..08b5ec74aa3a8 --- /dev/null +++ b/pkg/executor/join/left_outer_semi_join_probe_test.go @@ -0,0 +1,578 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package join + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/expression" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tidb/pkg/util/mock" + "github.com/stretchr/testify/require" +) + +func genLeftOuterSemiJoinResult(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int, + leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs, + resultTypes []*types.FieldType) []*chunk.Chunk { + return genLeftOuterSemiOrSemiJoinResultImpl(t, sessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes, rightTypes, leftKeyTypes, rightKeyTypes, leftUsedColumns, otherConditions, resultTypes, true) +} + +func genSemiJoinResult(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int, + leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs, + resultTypes []*types.FieldType) []*chunk.Chunk { + return genLeftOuterSemiOrSemiJoinResultImpl(t, sessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes, rightTypes, leftKeyTypes, rightKeyTypes, leftUsedColumns, otherConditions, resultTypes, false) +} + +// generate left outer semi join result using nested loop +func genLeftOuterSemiOrSemiJoinResultImpl(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int, + leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs, + resultTypes []*types.FieldType, isLeftOuter bool) []*chunk.Chunk { + filterVector := make([]bool, 0) + var err error + returnChks := make([]*chunk.Chunk, 0, 1) + resultChk := chunk.New(resultTypes, sessCtx.GetSessionVars().MaxChunkSize, sessCtx.GetSessionVars().MaxChunkSize) + shallowRowTypes := make([]*types.FieldType, 0, len(leftTypes)+len(rightTypes)) + shallowRowTypes = append(shallowRowTypes, leftTypes...) + shallowRowTypes = append(shallowRowTypes, rightTypes...) + shallowRow := chunk.MutRowFromTypes(shallowRowTypes) + + // For each row in left chunks + for _, leftChunk := range leftChunks { + if leftFilter != nil { + filterVector, err = expression.VectorizedFilter(sessCtx.GetExprCtx().GetEvalCtx(), sessCtx.GetSessionVars().EnableVectorizedExpression, leftFilter, chunk.NewIterator4Chunk(leftChunk), filterVector) + require.NoError(t, err) + } + for leftIndex := 0; leftIndex < leftChunk.NumRows(); leftIndex++ { + filterIndex := leftIndex + if leftChunk.Sel() != nil { + filterIndex = leftChunk.Sel()[leftIndex] + } + if leftFilter != nil && !filterVector[filterIndex] { + if isLeftOuter { + // Filtered by left filter, append 0 for matched flag + appendToResultChk(leftChunk.GetRow(leftIndex), chunk.Row{}, leftUsedColumns, nil, resultChk) + resultChk.AppendInt64(len(leftUsedColumns), 0) + } + + if resultChk.IsFull() { + returnChks = append(returnChks, resultChk) + resultChk = chunk.New(resultTypes, sessCtx.GetSessionVars().MaxChunkSize, sessCtx.GetSessionVars().MaxChunkSize) + } + continue + } + + leftRow := leftChunk.GetRow(leftIndex) + hasMatch := false + hasNull := false + + // For each row in right chunks + for _, rightChunk := range rightChunks { + for rightIndex := 0; rightIndex < rightChunk.NumRows(); rightIndex++ { + rightRow := rightChunk.GetRow(rightIndex) + valid := !containsNullKey(leftRow, leftKeyIndex) && !containsNullKey(rightRow, rightKeyIndex) + if valid { + ok, err := codec.EqualChunkRow(sessCtx.GetSessionVars().StmtCtx.TypeCtx(), leftRow, leftKeyTypes, leftKeyIndex, + rightRow, rightKeyTypes, rightKeyIndex) + require.NoError(t, err) + valid = ok + } + + if valid && otherConditions != nil { + shallowRow.ShallowCopyPartialRow(0, leftRow) + shallowRow.ShallowCopyPartialRow(len(leftTypes), rightRow) + matched, null, err := expression.EvalBool(sessCtx.GetExprCtx().GetEvalCtx(), otherConditions, shallowRow.ToRow()) + require.NoError(t, err) + valid = matched + hasNull = hasNull || null + } + + if valid { + hasMatch = true + break + } + } + if hasMatch { + break + } + } + + if isLeftOuter { + // Append result with matched flag + appendToResultChk(leftRow, chunk.Row{}, leftUsedColumns, nil, resultChk) + if hasMatch { + resultChk.AppendInt64(len(leftUsedColumns), 1) + } else { + if hasNull { + resultChk.AppendNull(len(leftUsedColumns)) + } else { + resultChk.AppendInt64(len(leftUsedColumns), 0) + } + } + } else { + if hasMatch { + appendToResultChk(leftRow, chunk.Row{}, leftUsedColumns, nil, resultChk) + } + } + + if resultChk.IsFull() { + returnChks = append(returnChks, resultChk) + resultChk = chunk.New(resultTypes, sessCtx.GetSessionVars().MaxChunkSize, sessCtx.GetSessionVars().MaxChunkSize) + } + } + } + if resultChk.NumRows() > 0 { + returnChks = append(returnChks, resultChk) + } + return returnChks +} + +func testLeftOuterSemiOrSemiJoinProbeBasic(t *testing.T, isLeftOuter bool) { + // todo test nullable type after builder support nullable type + tinyTp := types.NewFieldType(mysql.TypeTiny) + tinyTp.AddFlag(mysql.NotNullFlag) + intTp := types.NewFieldType(mysql.TypeLonglong) + intTp.AddFlag(mysql.NotNullFlag) + uintTp := types.NewFieldType(mysql.TypeLonglong) + uintTp.AddFlag(mysql.NotNullFlag) + uintTp.AddFlag(mysql.UnsignedFlag) + stringTp := types.NewFieldType(mysql.TypeVarString) + stringTp.AddFlag(mysql.NotNullFlag) + + lTypes := []*types.FieldType{intTp, stringTp, uintTp, stringTp, tinyTp} + rTypes := []*types.FieldType{intTp, stringTp, uintTp, stringTp, tinyTp} + rTypes = append(rTypes, retTypes...) + rTypes1 := []*types.FieldType{uintTp, stringTp, intTp, stringTp, tinyTp} + rTypes1 = append(rTypes1, rTypes1...) + + rightAsBuildSide := []bool{true} + if !isLeftOuter { + rightAsBuildSide = append(rightAsBuildSide, false) + } + + partitionNumber := 4 + simpleFilter := createSimpleFilter(t) + hasFilter := []bool{false} + if isLeftOuter { + hasFilter = append(hasFilter, true) + } + + var joinType logicalop.JoinType + if isLeftOuter { + joinType = logicalop.LeftOuterSemiJoin + } else { + joinType = logicalop.SemiJoin + } + + testCases := []testCase{ + // normal case + {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, nil, []int{}, nil, nil, nil}, + // rightUsed is empty + {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, []int{0, 1, 2, 3}, []int{}, nil, nil, nil}, + // leftUsed is empty + {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, []int{}, []int{}, nil, nil, nil}, + // both left/right Used are empty + {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, []int{}, []int{}, nil, nil, nil}, + // both left/right used is part of all columns + {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, []int{0, 2}, []int{}, nil, nil, nil}, + // int join uint + {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{uintTp}, lTypes, rTypes1, []int{0, 1, 2, 3}, []int{}, nil, nil, nil}, + // multiple join keys + {[]int{0, 1}, []int{0, 1}, []*types.FieldType{intTp, stringTp}, []*types.FieldType{intTp, stringTp}, lTypes, rTypes, []int{0, 1, 2, 3}, []int{}, nil, nil, nil}, + } + + for _, tc := range testCases { + for _, value := range rightAsBuildSide { + for _, testFilter := range hasFilter { + leftFilter := simpleFilter + if !testFilter { + leftFilter = nil + } + testJoinProbe(t, false, tc.leftKeyIndex, tc.rightKeyIndex, tc.leftKeyTypes, tc.rightKeyTypes, tc.leftTypes, tc.rightTypes, value, tc.leftUsed, + tc.rightUsed, tc.leftUsedByOtherCondition, tc.rightUsedByOtherCondition, leftFilter, nil, tc.otherCondition, partitionNumber, joinType, 200) + testJoinProbe(t, false, tc.leftKeyIndex, tc.rightKeyIndex, toNullableTypes(tc.leftKeyTypes), toNullableTypes(tc.rightKeyTypes), + toNullableTypes(tc.leftTypes), toNullableTypes(tc.rightTypes), value, tc.leftUsed, tc.rightUsed, tc.leftUsedByOtherCondition, tc.rightUsedByOtherCondition, + leftFilter, nil, tc.otherCondition, partitionNumber, joinType, 200) + } + } + } +} + +func testLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T, isLeftOuter bool) { + tinyTp := types.NewFieldType(mysql.TypeTiny) + tinyTp.AddFlag(mysql.NotNullFlag) + intTp := types.NewFieldType(mysql.TypeLonglong) + intTp.AddFlag(mysql.NotNullFlag) + uintTp := types.NewFieldType(mysql.TypeLonglong) + uintTp.AddFlag(mysql.UnsignedFlag) + uintTp.AddFlag(mysql.NotNullFlag) + yearTp := types.NewFieldType(mysql.TypeYear) + yearTp.AddFlag(mysql.NotNullFlag) + durationTp := types.NewFieldType(mysql.TypeDuration) + durationTp.AddFlag(mysql.NotNullFlag) + enumTp := types.NewFieldType(mysql.TypeEnum) + enumTp.AddFlag(mysql.NotNullFlag) + enumWithIntFlag := types.NewFieldType(mysql.TypeEnum) + enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag) + enumWithIntFlag.AddFlag(mysql.NotNullFlag) + setTp := types.NewFieldType(mysql.TypeSet) + setTp.AddFlag(mysql.NotNullFlag) + bitTp := types.NewFieldType(mysql.TypeBit) + bitTp.AddFlag(mysql.NotNullFlag) + jsonTp := types.NewFieldType(mysql.TypeJSON) + jsonTp.AddFlag(mysql.NotNullFlag) + floatTp := types.NewFieldType(mysql.TypeFloat) + floatTp.AddFlag(mysql.NotNullFlag) + doubleTp := types.NewFieldType(mysql.TypeDouble) + doubleTp.AddFlag(mysql.NotNullFlag) + stringTp := types.NewFieldType(mysql.TypeVarString) + stringTp.AddFlag(mysql.NotNullFlag) + datetimeTp := types.NewFieldType(mysql.TypeDatetime) + datetimeTp.AddFlag(mysql.NotNullFlag) + decimalTp := types.NewFieldType(mysql.TypeNewDecimal) + decimalTp.AddFlag(mysql.NotNullFlag) + timestampTp := types.NewFieldType(mysql.TypeTimestamp) + timestampTp.AddFlag(mysql.NotNullFlag) + dateTp := types.NewFieldType(mysql.TypeDate) + dateTp.AddFlag(mysql.NotNullFlag) + binaryStringTp := types.NewFieldType(mysql.TypeBlob) + binaryStringTp.AddFlag(mysql.NotNullFlag) + + lTypes := []*types.FieldType{tinyTp, intTp, uintTp, yearTp, durationTp, enumTp, enumWithIntFlag, setTp, bitTp, jsonTp, floatTp, doubleTp, stringTp, datetimeTp, decimalTp, timestampTp, dateTp, binaryStringTp} + rTypes := lTypes + lUsed := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17} + rUsed := []int{} + var joinType logicalop.JoinType + if isLeftOuter { + joinType = logicalop.LeftOuterSemiJoin + } else { + joinType = logicalop.SemiJoin + } + partitionNumber := 4 + + rightAsBuildSide := []bool{true} + if !isLeftOuter { + rightAsBuildSide = append(rightAsBuildSide, false) + } + + // single key + for i := 0; i < len(lTypes); i++ { + lKeyTypes := []*types.FieldType{lTypes[i]} + rKeyTypes := []*types.FieldType{rTypes[i]} + for _, rightAsBuild := range rightAsBuildSide { + testJoinProbe(t, false, []int{i}, []int{i}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + testJoinProbe(t, false, []int{i}, []int{i}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + } + } + // composed key + // fixed size, inlined + for _, rightAsBuild := range rightAsBuildSide { + lKeyTypes := []*types.FieldType{intTp, uintTp} + rKeyTypes := []*types.FieldType{intTp, uintTp} + testJoinProbe(t, false, []int{1, 2}, []int{1, 2}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + testJoinProbe(t, false, []int{1, 2}, []int{1, 2}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + } + // variable size, inlined + for _, rightAsBuild := range rightAsBuildSide { + lKeyTypes := []*types.FieldType{intTp, binaryStringTp} + rKeyTypes := []*types.FieldType{intTp, binaryStringTp} + testJoinProbe(t, false, []int{1, 17}, []int{1, 17}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + testJoinProbe(t, false, []int{1, 17}, []int{1, 17}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + } + // fixed size, not inlined + for _, rightAsBuild := range rightAsBuildSide { + lKeyTypes := []*types.FieldType{intTp, datetimeTp} + rKeyTypes := []*types.FieldType{intTp, datetimeTp} + testJoinProbe(t, false, []int{1, 13}, []int{1, 13}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + testJoinProbe(t, false, []int{1, 13}, []int{1, 13}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + } + // variable size, not inlined + for _, rightAsBuild := range rightAsBuildSide { + lKeyTypes := []*types.FieldType{intTp, decimalTp} + rKeyTypes := []*types.FieldType{intTp, decimalTp} + testJoinProbe(t, false, []int{1, 14}, []int{1, 14}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + testJoinProbe(t, false, []int{1, 14}, []int{1, 14}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) + } +} + +func testLeftOuterSemiJoinProbeOtherCondition(t *testing.T, isLeftOuter bool) { + intTp := types.NewFieldType(mysql.TypeLonglong) + intTp.AddFlag(mysql.NotNullFlag) + nullableIntTp := types.NewFieldType(mysql.TypeLonglong) + uintTp := types.NewFieldType(mysql.TypeLonglong) + uintTp.AddFlag(mysql.NotNullFlag) + uintTp.AddFlag(mysql.UnsignedFlag) + stringTp := types.NewFieldType(mysql.TypeVarString) + stringTp.AddFlag(mysql.NotNullFlag) + + lTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} + rTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} + rTypes = append(rTypes, rTypes...) + + tinyTp := types.NewFieldType(mysql.TypeTiny) + a := &expression.Column{Index: 1, RetType: nullableIntTp} + b := &expression.Column{Index: 8, RetType: nullableIntTp} + sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) + require.NoError(t, err, "error when create other condition") + // test condition `a = b` from `a in (select b from t2)` + a2 := &expression.Column{Index: 1, RetType: nullableIntTp, InOperand: true} + b2 := &expression.Column{Index: 8, RetType: nullableIntTp, InOperand: true} + sf2, err := expression.NewFunction(mock.NewContext(), ast.EQ, tinyTp, a2, b2) + require.NoError(t, err, "error when create other condition") + otherCondition := make(expression.CNFExprs, 0) + otherCondition = append(otherCondition, sf) + otherCondition2 := make(expression.CNFExprs, 0) + otherCondition2 = append(otherCondition2, sf2) + + var joinType logicalop.JoinType + if isLeftOuter { + joinType = logicalop.LeftOuterSemiJoin + } else { + joinType = logicalop.SemiJoin + } + + simpleFilter := createSimpleFilter(t) + + hasFilter := []bool{false} + if isLeftOuter { + hasFilter = append(hasFilter, true) + } + + rightAsBuildSide := []bool{true} + if !isLeftOuter { + rightAsBuildSide = append(rightAsBuildSide, false) + } + + partitionNumber := 4 + rightUsed := []int{} + + for _, rightBuild := range rightAsBuildSide { + for _, testFilter := range hasFilter { + leftFilter := simpleFilter + if !testFilter { + leftFilter = nil + } + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 200) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 200) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 200) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 200) + + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 200) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 200) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 200) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 200) + } + } +} + +func testLeftOuterSemiJoinProbeWithSel(t *testing.T, isLeftOuter bool) { + intTp := types.NewFieldType(mysql.TypeLonglong) + intTp.AddFlag(mysql.NotNullFlag) + nullableIntTp := types.NewFieldType(mysql.TypeLonglong) + uintTp := types.NewFieldType(mysql.TypeLonglong) + uintTp.AddFlag(mysql.NotNullFlag) + uintTp.AddFlag(mysql.UnsignedFlag) + nullableUIntTp := types.NewFieldType(mysql.TypeLonglong) + nullableUIntTp.AddFlag(mysql.UnsignedFlag) + stringTp := types.NewFieldType(mysql.TypeVarString) + stringTp.AddFlag(mysql.NotNullFlag) + + lTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} + rTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} + rTypes = append(rTypes, rTypes...) + + tinyTp := types.NewFieldType(mysql.TypeTiny) + a := &expression.Column{Index: 1, RetType: nullableIntTp} + b := &expression.Column{Index: 8, RetType: nullableUIntTp} + sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) + require.NoError(t, err, "error when create other condition") + otherCondition := make(expression.CNFExprs, 0) + otherCondition = append(otherCondition, sf) + + var joinType logicalop.JoinType + if isLeftOuter { + joinType = logicalop.LeftOuterSemiJoin + } else { + joinType = logicalop.SemiJoin + } + + rightAsBuildSide := []bool{true} + if !isLeftOuter { + rightAsBuildSide = append(rightAsBuildSide, false) + } + + simpleFilter := createSimpleFilter(t) + + hasFilter := []bool{false} + if isLeftOuter { + hasFilter = append(hasFilter, true) + } + + partitionNumber := 4 + rightUsed := []int{} + + for _, rightBuild := range rightAsBuildSide { + for _, useFilter := range hasFilter { + leftFilter := simpleFilter + if !useFilter { + leftFilter = nil + } + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 500) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 500) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 500) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 500) + } + } +} + +func TestLeftOuterSemiJoinProbeBasic(t *testing.T) { + testLeftOuterSemiOrSemiJoinProbeBasic(t, true) +} + +func TestLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T) { + testLeftOuterSemiJoinProbeAllJoinKeys(t, true) +} + +func TestLeftOuterSemiJoinProbeOtherCondition(t *testing.T) { + testLeftOuterSemiJoinProbeOtherCondition(t, true) +} + +func TestLeftOuterSemiJoinProbeWithSel(t *testing.T) { + testLeftOuterSemiJoinProbeWithSel(t, true) +} + +func TestLeftOuterSemiJoinBuildResultFastPath(t *testing.T) { + intTp := types.NewFieldType(mysql.TypeLonglong) + intTp.AddFlag(mysql.NotNullFlag) + nullableIntTp := types.NewFieldType(mysql.TypeLonglong) + uintTp := types.NewFieldType(mysql.TypeLonglong) + uintTp.AddFlag(mysql.NotNullFlag) + uintTp.AddFlag(mysql.UnsignedFlag) + stringTp := types.NewFieldType(mysql.TypeVarString) + stringTp.AddFlag(mysql.NotNullFlag) + + lTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} + rTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} + rTypes = append(rTypes, rTypes...) + + tinyTp := types.NewFieldType(mysql.TypeTiny) + a := &expression.Column{Index: 1, RetType: nullableIntTp} + b := &expression.Column{Index: 8, RetType: nullableIntTp} + sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) + require.NoError(t, err, "error when create other condition") + // test condition `a = b` from `a in (select b from t2)` + a2 := &expression.Column{Index: 1, RetType: nullableIntTp, InOperand: true} + b2 := &expression.Column{Index: 8, RetType: nullableIntTp, InOperand: true} + sf2, err := expression.NewFunction(mock.NewContext(), ast.EQ, tinyTp, a2, b2) + require.NoError(t, err, "error when create other condition") + otherCondition := make(expression.CNFExprs, 0) + otherCondition = append(otherCondition, sf) + otherCondition2 := make(expression.CNFExprs, 0) + otherCondition2 = append(otherCondition2, sf2) + joinType := logicalop.LeftOuterSemiJoin + simpleFilter := createSimpleFilter(t) + hasFilter := []bool{false, true} + rightAsBuildSide := []bool{true} + partitionNumber := 4 + rightUsed := []int{} + + for _, rightBuild := range rightAsBuildSide { + for _, testFilter := range hasFilter { + leftFilter := simpleFilter + if !testFilter { + leftFilter = nil + } + // MockContext set MaxChunkSize to 32, input chunk size should be less than 32 to test fast path + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 30) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 30) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 30) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 30) + + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 30) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 30) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 30) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 30) + + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, nil, partitionNumber, joinType, 30) + testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, nil, partitionNumber, joinType, 30) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, nil, partitionNumber, joinType, 30) + testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, nil, partitionNumber, joinType, 30) + } + } +} + +func TestLeftOuterSemiJoinSpill(t *testing.T) { + ctx := mock.NewContext() + ctx.GetSessionVars().InitChunkSize = 32 + ctx.GetSessionVars().MaxChunkSize = 32 + leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false) + leftDataSourceWithSel, rightDataSourceWithSel := buildLeftAndRightDataSource(ctx, leftCols, rightCols, true) + + intTp := types.NewFieldType(mysql.TypeLonglong) + intTp.AddFlag(mysql.NotNullFlag) + stringTp := types.NewFieldType(mysql.TypeVarString) + stringTp.AddFlag(mysql.NotNullFlag) + + leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp} + rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp} + + leftKeys := []*expression.Column{ + {Index: 1, RetType: intTp}, + {Index: 3, RetType: stringTp}, + } + rightKeys := []*expression.Column{ + {Index: 0, RetType: intTp}, + {Index: 2, RetType: stringTp}, + } + + tinyTp := types.NewFieldType(mysql.TypeTiny) + a := &expression.Column{Index: 1, RetType: intTp} + b := &expression.Column{Index: 8, RetType: intTp} + sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) + require.NoError(t, err, "error when create other condition") + otherCondition := make(expression.CNFExprs, 0) + otherCondition = append(otherCondition, sf) + + maxRowTableSegmentSize = 100 + spillChunkSize = 100 + + joinType := logicalop.LeftOuterSemiJoin + params := []spillTestParam{ + // basic case + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 100000, 10000}}, + // with other condition + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{3000000, 1700000, 3500000, 100000, 10000}}, + } + + for _, param := range params { + testSpill(t, ctx, joinType, leftDataSource, rightDataSource, param) + } + + params2 := []spillTestParam{ + // basic case with sel + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{1000000, 900000, 1700000, 100000, 10000}}, + // with other condition with sel + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{1000000, 900000, 1600000, 100000, 10000}}, + } + + for _, param := range params2 { + testSpill(t, ctx, joinType, leftDataSourceWithSel, rightDataSourceWithSel, param) + } +} diff --git a/pkg/executor/join/outer_join_spill_test.go b/pkg/executor/join/outer_join_spill_test.go index a81ad9a2dc6a6..63c08cb8533b5 100644 --- a/pkg/executor/join/outer_join_spill_test.go +++ b/pkg/executor/join/outer_join_spill_test.go @@ -93,14 +93,14 @@ func TestOuterJoinSpillBasic(t *testing.T) { params := []spillTestParam{ // Normal case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, // rightUsed is empty - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, // leftUsed is empty - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 250000, 10000}}, } err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) @@ -146,8 +146,8 @@ func TestOuterJoinSpillWithSel(t *testing.T) { params := []spillTestParam{ // Normal case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{2000000, 1000000, 3000000, 200000, 10000}}, } err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) @@ -201,8 +201,8 @@ func TestOuterJoinSpillWithOtherCondition(t *testing.T) { otherCondition = append(otherCondition, sf) params := []spillTestParam{ - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 1500000, 10000}}, + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, otherCondition, []int{0}, []int{4}, []int64{5000000, 1700000, 6000000, 500000, 10000}}, } err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/slowWorkers", `return(true)`) diff --git a/pkg/executor/join/semi_join_probe_test.go b/pkg/executor/join/semi_join_probe_test.go new file mode 100644 index 0000000000000..d5d3eb908fb1c --- /dev/null +++ b/pkg/executor/join/semi_join_probe_test.go @@ -0,0 +1,430 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package join + +import ( + "math/rand" + "testing" + + "github.com/pingcap/tidb/pkg/executor/internal/testutil" + "github.com/pingcap/tidb/pkg/expression" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/mock" + "github.com/stretchr/testify/require" +) + +const maxChunkSizeInTest = 32 + +var semiJoinleftCols = []*expression.Column{ + {Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}, + {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, +} + +var semiJoinrightCols = []*expression.Column{ + {Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}, + {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, +} + +var semiJoinRetTypes = []*types.FieldType{ + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeLonglong), +} + +func buildLeftAndRightSemiDataSource(ctx sessionctx.Context, leftCols []*expression.Column, rightCols []*expression.Column, hasSel bool) (*testutil.MockDataSource, *testutil.MockDataSource) { + leftSchema := expression.NewSchema(leftCols...) + rightSchema := expression.NewSchema(rightCols...) + + joinKeyleftIntDatums := buildJoinKeyIntDatums(10000) + joinKeyrightIntDatums := buildJoinKeyIntDatums(10000) + leftMockSrcParm := testutil.MockDataSourceParameters{DataSchema: leftSchema, Ctx: ctx, Rows: 50000, Ndvs: []int{-1, -1}, Datums: [][]any{joinKeyleftIntDatums, joinKeyleftIntDatums}, HasSel: hasSel} + rightMockSrcParm := testutil.MockDataSourceParameters{DataSchema: rightSchema, Ctx: ctx, Rows: 50000, Ndvs: []int{-1, -1}, Datums: [][]any{joinKeyrightIntDatums, joinKeyrightIntDatums}, HasSel: hasSel} + return testutil.BuildMockDataSource(leftMockSrcParm), testutil.BuildMockDataSource(rightMockSrcParm) +} + +func buildSemiDataSourceAndExpectResult(ctx sessionctx.Context, leftCols []*expression.Column, rightCols []*expression.Column, rightAsBuildSide bool, hasOtherCondition bool, hasDuplicateKey bool, isAntiSemiJoin bool) (*testutil.MockDataSource, *testutil.MockDataSource, []chunk.Row) { + leftSchema := expression.NewSchema(leftCols...) + rightSchema := expression.NewSchema(rightCols...) + + rowNum := int64(50000) + leftCol0Datums := make([]any, 0, rowNum) + leftCol1Datums := make([]any, 0, rowNum) + rightCol0Datums := make([]any, 0, rowNum) + rightCol1Datums := make([]any, 0, rowNum) + + intTp := types.NewFieldType(mysql.TypeLonglong) + expectResultChunk := chunk.NewChunkWithCapacity([]*types.FieldType{intTp, intTp}, 10000) + expectResult := make([]chunk.Row, 0, 100000) + + if hasDuplicateKey { + if hasOtherCondition { + if rightAsBuildSide { + differentKeyNum := int64(10000) + for i := int64(0); i < differentKeyNum; i++ { + leftCol0Datums = append(leftCol0Datums, i) + leftCol1Datums = append(leftCol1Datums, int64(1)) + + singleKeyNum := rand.Int31n(2 * maxChunkSizeInTest) + if singleKeyNum == 0 { + if isAntiSemiJoin { + expectResultChunk.AppendInt64(0, i) + expectResultChunk.AppendInt64(1, 1) + } + continue + } + + canOtherConditionSuccess := rand.Int31n(10) < 5 + if canOtherConditionSuccess { + if !isAntiSemiJoin { + expectResultChunk.AppendInt64(0, i) + expectResultChunk.AppendInt64(1, 1) + } + + otherConditionSuccessNum := rand.Int31n(singleKeyNum) + 1 + for j := 0; j < int(singleKeyNum); j++ { + rightCol0Datums = append(rightCol0Datums, i) + if j < int(otherConditionSuccessNum) { + rightCol1Datums = append(rightCol1Datums, int64(0)) + } else { + rightCol1Datums = append(rightCol1Datums, int64(1)) + } + } + } else { + if isAntiSemiJoin { + expectResultChunk.AppendInt64(0, i) + expectResultChunk.AppendInt64(1, 1) + } + + for j := 0; j < int(singleKeyNum); j++ { + rightCol0Datums = append(rightCol0Datums, i) + rightCol1Datums = append(rightCol1Datums, int64(1)) + } + } + } + } else { + differentKeyNum := int64(10000) + for i := int64(0); i < differentKeyNum; i++ { + rightCol0Datums = append(rightCol0Datums, i) + rightCol1Datums = append(rightCol1Datums, int64(0)) + + singleKeyNum := rand.Int31n(2 * maxChunkSizeInTest) + if singleKeyNum == 0 { + continue + } + + canOtherConditionSuccess := rand.Int31n(10) < 5 + if canOtherConditionSuccess { + otherConditionSuccessNum := rand.Int31n(singleKeyNum) + 1 + for j := 0; j < int(singleKeyNum); j++ { + leftCol0Datums = append(leftCol0Datums, i) + if j < int(otherConditionSuccessNum) { + leftCol1Datums = append(leftCol1Datums, int64(1)) + if !isAntiSemiJoin { + expectResultChunk.AppendInt64(0, i) + expectResultChunk.AppendInt64(1, 1) + } + } else { + leftCol1Datums = append(leftCol1Datums, int64(0)) + if isAntiSemiJoin { + expectResultChunk.AppendInt64(0, i) + expectResultChunk.AppendInt64(1, 0) + } + } + } + } else { + for j := 0; j < int(singleKeyNum); j++ { + leftCol0Datums = append(leftCol0Datums, i) + leftCol1Datums = append(leftCol1Datums, int64(0)) + if isAntiSemiJoin { + expectResultChunk.AppendInt64(0, i) + expectResultChunk.AppendInt64(1, 0) + } + } + } + } + } + } else { + differentKeyNum := int64(10000) + for i := int64(0); i < differentKeyNum; i++ { + leftSingleKeyNum := rand.Int31n(2*maxChunkSizeInTest) + 1 + rightSingleKeyNum := rand.Int31n(2*maxChunkSizeInTest) + 1 + + for j := 0; j < int(leftSingleKeyNum); j++ { + leftCol0Datums = append(leftCol0Datums, i) + leftCol1Datums = append(leftCol1Datums, int64(0)) + } + + if i%2 == 0 { + for j := 0; j < int(rightSingleKeyNum); j++ { + rightCol0Datums = append(rightCol0Datums, i) + rightCol1Datums = append(rightCol1Datums, int64(0)) + } + + if !isAntiSemiJoin { + for j := 0; j < int(leftSingleKeyNum); j++ { + expectResultChunk.AppendInt64(0, i) + expectResultChunk.AppendInt64(1, 0) + } + } + } else { + if isAntiSemiJoin { + for j := 0; j < int(leftSingleKeyNum); j++ { + expectResultChunk.AppendInt64(0, i) + expectResultChunk.AppendInt64(1, 0) + } + } + } + } + } + } else { + leftCol0StartNum := int64(30000) + for i := int64(0); i < rowNum; i++ { + leftCol0AppendedData := leftCol0StartNum + i + leftCol0Datums = append(leftCol0Datums, leftCol0AppendedData) + + if hasOtherCondition { + if leftCol0AppendedData%2 == 0 { + leftCol1Datums = append(leftCol1Datums, int64(1)) + if isAntiSemiJoin { + if leftCol0AppendedData >= rowNum { + expectResultChunk.AppendInt64(0, leftCol0AppendedData) + expectResultChunk.AppendInt64(1, 1) + } + } else { + if leftCol0AppendedData < rowNum { + expectResultChunk.AppendInt64(0, leftCol0AppendedData) + expectResultChunk.AppendInt64(1, 1) + } + } + } else { + leftCol1Datums = append(leftCol1Datums, int64(0)) + if isAntiSemiJoin { + expectResultChunk.AppendInt64(0, leftCol0AppendedData) + expectResultChunk.AppendInt64(1, 0) + } + } + } else { + leftCol1Datums = append(leftCol1Datums, int64(1)) + if isAntiSemiJoin { + if leftCol0AppendedData >= rowNum { + expectResultChunk.AppendInt64(0, leftCol0AppendedData) + expectResultChunk.AppendInt64(1, 1) + } + } else { + if leftCol0AppendedData < rowNum { + expectResultChunk.AppendInt64(0, leftCol0AppendedData) + expectResultChunk.AppendInt64(1, 1) + } + } + } + + rightCol0Datums = append(rightCol0Datums, i) + rightCol1Datums = append(rightCol1Datums, int64(0)) + } + } + + leftLen := len(leftCol0Datums) + rightLen := len(rightCol0Datums) + + // Shuffle + for i := int64(0); i < int64(leftLen); i++ { + j := rand.Int63n(i + 1) + leftCol0Datums[i], leftCol0Datums[j] = leftCol0Datums[j], leftCol0Datums[i] + leftCol1Datums[i], leftCol1Datums[j] = leftCol1Datums[j], leftCol1Datums[i] + } + + for i := int64(0); i < int64(rightLen); i++ { + j := rand.Int63n(i + 1) + rightCol0Datums[i], rightCol0Datums[j] = rightCol0Datums[j], rightCol0Datums[i] + rightCol1Datums[i], rightCol1Datums[j] = rightCol1Datums[j], rightCol1Datums[i] + } + + if isAntiSemiJoin { + expectResult = sortRows([]*chunk.Chunk{expectResultChunk}, semiJoinRetTypes) + } else { + resultRowNum := expectResultChunk.NumRows() + for i := 0; i < resultRowNum; i++ { + expectResult = append(expectResult, expectResultChunk.GetRow(i)) + } + } + + leftMockSrcParm := testutil.MockDataSourceParameters{DataSchema: leftSchema, Ctx: ctx, Rows: leftLen, Ndvs: []int{-2, -2}, Datums: [][]any{leftCol0Datums, leftCol1Datums}, HasSel: false} + rightMockSrcParm := testutil.MockDataSourceParameters{DataSchema: rightSchema, Ctx: ctx, Rows: rightLen, Ndvs: []int{-2, -2}, Datums: [][]any{rightCol0Datums, rightCol1Datums}, HasSel: false} + return testutil.BuildMockDataSource(leftMockSrcParm), testutil.BuildMockDataSource(rightMockSrcParm), expectResult +} + +func testSemiJoin(t *testing.T, rightAsBuildSide bool, hasOtherCondition bool, hasDuplicateKey bool) { + testSemiOrAntiSemiJoin(t, rightAsBuildSide, hasOtherCondition, hasDuplicateKey, false) +} + +func testSemiOrAntiSemiJoin(t *testing.T, rightAsBuildSide bool, hasOtherCondition bool, hasDuplicateKey bool, isAntiSemiJoin bool) { + ctx := mock.NewContext() + ctx.GetSessionVars().InitChunkSize = maxChunkSizeInTest + ctx.GetSessionVars().MaxChunkSize = maxChunkSizeInTest + leftDataSource, rightDataSource, expectedResult := buildSemiDataSourceAndExpectResult(ctx, semiJoinleftCols, semiJoinrightCols, rightAsBuildSide, hasOtherCondition, hasDuplicateKey, isAntiSemiJoin) + + maxRowTableSegmentSize = 100 + + intTp := types.NewFieldType(mysql.TypeLonglong) + + leftKeys := []*expression.Column{ + {Index: 0, RetType: intTp}, + } + + rightKeys := []*expression.Column{ + {Index: 0, RetType: intTp}, + } + + var buildKeys []*expression.Column + var probeKeys []*expression.Column + if rightAsBuildSide { + buildKeys = rightKeys + probeKeys = leftKeys + } else { + buildKeys = leftKeys + probeKeys = rightKeys + } + + var otherCondition expression.CNFExprs + lUsedInOtherCondition := []int{} + rUsedInOtherCondition := []int{} + if hasOtherCondition { + lUsedInOtherCondition = append(lUsedInOtherCondition, 1) + rUsedInOtherCondition = append(rUsedInOtherCondition, 1) + + tinyTp := types.NewFieldType(mysql.TypeTiny) + a := &expression.Column{Index: 1, RetType: intTp} + b := &expression.Column{Index: 3, RetType: intTp} + sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) + require.NoError(t, err, "error when create other condition") + otherCondition = append(otherCondition, sf) + } + + var joinType logicalop.JoinType + if isAntiSemiJoin { + joinType = logicalop.AntiSemiJoin + } else { + joinType = logicalop.SemiJoin + } + + info := &hashJoinInfo{ + ctx: ctx, + schema: buildSchema(semiJoinRetTypes), + leftExec: leftDataSource, + rightExec: rightDataSource, + joinType: joinType, + rightAsBuildSide: rightAsBuildSide, + buildKeys: buildKeys, + probeKeys: probeKeys, + lUsed: []int{0, 1}, + rUsed: []int{}, + otherCondition: otherCondition, + lUsedInOtherCondition: lUsedInOtherCondition, + rUsedInOtherCondition: rUsedInOtherCondition, + } + + leftDataSource.PrepareChunks() + rightDataSource.PrepareChunks() + + hashJoinExec := buildHashJoinV2Exec(info) + result := getSortedResults(t, hashJoinExec, semiJoinRetTypes) + checkResults(t, semiJoinRetTypes, result, expectedResult) +} + +func TestSemiJoinBasic(t *testing.T) { + testSemiJoin(t, false, false, false) // Left side build without other condition + testSemiJoin(t, false, true, false) // Left side build with other condition + testSemiJoin(t, true, false, false) // Right side build without other condition + testSemiJoin(t, true, true, false) // Right side build with other condition +} + +func TestSemiJoinDuplicateKeys(t *testing.T) { + testSemiJoin(t, false, false, true) // Left side build without other condition + testSemiJoin(t, false, true, true) // Left side build with other condition + testSemiJoin(t, true, false, true) // Right side build without other condition + testSemiJoin(t, true, true, true) // Right side build with other condition +} + +func TestSemiSpill(t *testing.T) { + var leftCols = []*expression.Column{ + {Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}, + {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, + } + + var rightCols = []*expression.Column{ + {Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}, + {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, + } + + ctx := mock.NewContext() + ctx.GetSessionVars().InitChunkSize = 32 + ctx.GetSessionVars().MaxChunkSize = 32 + leftDataSource, rightDataSource := buildLeftAndRightSemiDataSource(ctx, leftCols, rightCols, false) + + intTp := types.NewFieldType(mysql.TypeLonglong) + + leftTypes := []*types.FieldType{intTp, intTp} + rightTypes := []*types.FieldType{intTp, intTp} + + leftKeys := []*expression.Column{ + {Index: 0, RetType: intTp}, + } + rightKeys := []*expression.Column{ + {Index: 0, RetType: intTp}, + } + + tinyTp := types.NewFieldType(mysql.TypeTiny) + a := &expression.Column{Index: 1, RetType: intTp} + b := &expression.Column{Index: 3, RetType: intTp} + sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) + require.NoError(t, err, "error when create other condition") + otherCondition := make(expression.CNFExprs, 0) + otherCondition = append(otherCondition, sf) + + maxRowTableSegmentSize = 100 + spillChunkSize = 100 + + joinTypes := []logicalop.JoinType{logicalop.SemiJoin} + params := []spillTestParam{ + // basic case + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3000000, 100000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3500000, 100000, 10000}}, + // with other condition + {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 100000, 10000}}, + {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 100000, 10000}}, + } + + for _, joinType := range joinTypes { + for _, param := range params { + testSpill(t, ctx, joinType, leftDataSource, rightDataSource, param) + } + } +} + +func TestSemiJoinProbeBasic(t *testing.T) { + testLeftOuterSemiOrSemiJoinProbeBasic(t, false) +} + +func TestSemiJoinProbeAllJoinKeys(t *testing.T) { + testLeftOuterSemiJoinProbeAllJoinKeys(t, false) +} + +func TestSemiJoinProbeWithSel(t *testing.T) { + testLeftOuterSemiJoinProbeWithSel(t, false) +} diff --git a/pkg/util/chunk/chunk_in_disk.go b/pkg/util/chunk/chunk_in_disk.go index 268f1e1eca25f..1e49e36d1bfd4 100644 --- a/pkg/util/chunk/chunk_in_disk.go +++ b/pkg/util/chunk/chunk_in_disk.go @@ -128,10 +128,9 @@ func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 { return d.offsetOfEachChunk[chkIdx+1] - d.offsetOfEachChunk[chkIdx] } -// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx. -func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { +func (d *DataInDiskByChunks) readFromFisk(chkIdx int) error { if err := injectChunkInDiskRandomError(); err != nil { - return nil, err + return err } reader := d.dataFile.getSectionReader(d.offsetOfEachChunk[chkIdx]) @@ -145,19 +144,39 @@ func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { readByteNum, err := io.ReadFull(reader, d.buf) if err != nil { - return nil, err + return err } if int64(readByteNum) != chkSize { - return nil, errors2.New("Fail to restore the spilled chunk") + return errors2.New("Fail to restore the spilled chunk") + } + + return nil +} + +// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx. +func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { + err := d.readFromFisk(chkIdx) + if err != nil { + return nil, err } chk := NewEmptyChunk(d.fieldTypes) d.deserializeDataToChunk(chk) - return chk, nil } +// FillChunk fills a Chunk from the DataInDiskByChunks by chkIdx. +func (d *DataInDiskByChunks) FillChunk(srcChkIdx int, destChk *Chunk) error { + err := d.readFromFisk(srcChkIdx) + if err != nil { + return err + } + + d.deserializeDataToChunk(destChk) + return nil +} + // Close releases the disk resource. func (d *DataInDiskByChunks) Close() { if d.dataFile.file != nil { @@ -264,7 +283,11 @@ func (d *DataInDiskByChunks) deserializeColMeta(pos *int64) (length int64, nullM func (d *DataInDiskByChunks) deserializeSel(chk *Chunk, pos *int64, selSize int) { selLen := int64(selSize) / intLen - chk.sel = make([]int, selLen) + if int64(cap(chk.sel)) < selLen { + chk.sel = make([]int, selLen) + } else { + chk.sel = chk.sel[:selLen] + } for i := range selLen { chk.sel[i] = *(*int)(unsafe.Pointer(&d.buf[*pos])) *pos += intLen @@ -299,9 +322,25 @@ func (d *DataInDiskByChunks) deserializeOffsets(dst []int64, pos *int64) { func (d *DataInDiskByChunks) deserializeColumns(chk *Chunk, pos *int64) { for _, col := range chk.columns { length, nullMapSize, dataSize, offsetSize := d.deserializeColMeta(pos) - col.nullBitmap = make([]byte, nullMapSize) - col.data = make([]byte, dataSize) - col.offsets = make([]int64, offsetSize/int64Len) + + if int64(cap(col.nullBitmap)) < nullMapSize { + col.nullBitmap = make([]byte, nullMapSize) + } else { + col.nullBitmap = col.nullBitmap[:nullMapSize] + } + + if int64(cap(col.data)) < dataSize { + col.data = make([]byte, dataSize) + } else { + col.data = col.data[:dataSize] + } + + offsetsLen := offsetSize / int64Len + if int64(cap(col.offsets)) < offsetsLen { + col.offsets = make([]int64, offsetsLen) + } else { + col.offsets = col.offsets[:offsetsLen] + } col.length = int(length) copy(col.nullBitmap, d.buf[*pos:*pos+nullMapSize]) diff --git a/pkg/util/chunk/chunk_in_disk_test.go b/pkg/util/chunk/chunk_in_disk_test.go index 46dd5745c9ee8..c780c96bedd1a 100644 --- a/pkg/util/chunk/chunk_in_disk_test.go +++ b/pkg/util/chunk/chunk_in_disk_test.go @@ -66,7 +66,7 @@ func checkChunk(t *testing.T, chk1, chk2 *Chunk) { } } -func TestDataInDiskByChunks(t *testing.T) { +func testImpl(t *testing.T, isNewChunk bool) { numChk, numRow := 100, 1000 chks, fields := initChunks(numChk, numRow) addAuxDataForChunks(chks) @@ -78,9 +78,29 @@ func TestDataInDiskByChunks(t *testing.T) { require.NoError(t, err) } + chk := NewEmptyChunk(fields) + var err error for i := range numChk { - chk, err := dataInDiskByChunks.GetChunk(i) + if isNewChunk { + chk, err = dataInDiskByChunks.GetChunk(i) + } else { + chk.Reset() + err = dataInDiskByChunks.FillChunk(i, chk) + } require.NoError(t, err) checkChunk(t, chk, chks[i]) } } + +func testGetChunk(t *testing.T) { + testImpl(t, true) +} + +func testFillChunk(t *testing.T) { + testImpl(t, false) +} + +func TestDataInDiskByChunks(t *testing.T) { + testGetChunk(t) + testFillChunk(t) +} From 5c18cd135b07681e7555ca8c6a3b2bee23a33571 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 9 Dec 2024 09:42:33 +0800 Subject: [PATCH 2/2] remove useless files --- .../join/left_outer_semi_join_probe_test.go | 578 ------------------ pkg/executor/join/semi_join_probe_test.go | 430 ------------- 2 files changed, 1008 deletions(-) delete mode 100644 pkg/executor/join/left_outer_semi_join_probe_test.go delete mode 100644 pkg/executor/join/semi_join_probe_test.go diff --git a/pkg/executor/join/left_outer_semi_join_probe_test.go b/pkg/executor/join/left_outer_semi_join_probe_test.go deleted file mode 100644 index 08b5ec74aa3a8..0000000000000 --- a/pkg/executor/join/left_outer_semi_join_probe_test.go +++ /dev/null @@ -1,578 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package join - -import ( - "testing" - - "github.com/pingcap/tidb/pkg/expression" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/chunk" - "github.com/pingcap/tidb/pkg/util/codec" - "github.com/pingcap/tidb/pkg/util/mock" - "github.com/stretchr/testify/require" -) - -func genLeftOuterSemiJoinResult(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int, - leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs, - resultTypes []*types.FieldType) []*chunk.Chunk { - return genLeftOuterSemiOrSemiJoinResultImpl(t, sessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes, rightTypes, leftKeyTypes, rightKeyTypes, leftUsedColumns, otherConditions, resultTypes, true) -} - -func genSemiJoinResult(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int, - leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs, - resultTypes []*types.FieldType) []*chunk.Chunk { - return genLeftOuterSemiOrSemiJoinResultImpl(t, sessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes, rightTypes, leftKeyTypes, rightKeyTypes, leftUsedColumns, otherConditions, resultTypes, false) -} - -// generate left outer semi join result using nested loop -func genLeftOuterSemiOrSemiJoinResultImpl(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int, - leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs, - resultTypes []*types.FieldType, isLeftOuter bool) []*chunk.Chunk { - filterVector := make([]bool, 0) - var err error - returnChks := make([]*chunk.Chunk, 0, 1) - resultChk := chunk.New(resultTypes, sessCtx.GetSessionVars().MaxChunkSize, sessCtx.GetSessionVars().MaxChunkSize) - shallowRowTypes := make([]*types.FieldType, 0, len(leftTypes)+len(rightTypes)) - shallowRowTypes = append(shallowRowTypes, leftTypes...) - shallowRowTypes = append(shallowRowTypes, rightTypes...) - shallowRow := chunk.MutRowFromTypes(shallowRowTypes) - - // For each row in left chunks - for _, leftChunk := range leftChunks { - if leftFilter != nil { - filterVector, err = expression.VectorizedFilter(sessCtx.GetExprCtx().GetEvalCtx(), sessCtx.GetSessionVars().EnableVectorizedExpression, leftFilter, chunk.NewIterator4Chunk(leftChunk), filterVector) - require.NoError(t, err) - } - for leftIndex := 0; leftIndex < leftChunk.NumRows(); leftIndex++ { - filterIndex := leftIndex - if leftChunk.Sel() != nil { - filterIndex = leftChunk.Sel()[leftIndex] - } - if leftFilter != nil && !filterVector[filterIndex] { - if isLeftOuter { - // Filtered by left filter, append 0 for matched flag - appendToResultChk(leftChunk.GetRow(leftIndex), chunk.Row{}, leftUsedColumns, nil, resultChk) - resultChk.AppendInt64(len(leftUsedColumns), 0) - } - - if resultChk.IsFull() { - returnChks = append(returnChks, resultChk) - resultChk = chunk.New(resultTypes, sessCtx.GetSessionVars().MaxChunkSize, sessCtx.GetSessionVars().MaxChunkSize) - } - continue - } - - leftRow := leftChunk.GetRow(leftIndex) - hasMatch := false - hasNull := false - - // For each row in right chunks - for _, rightChunk := range rightChunks { - for rightIndex := 0; rightIndex < rightChunk.NumRows(); rightIndex++ { - rightRow := rightChunk.GetRow(rightIndex) - valid := !containsNullKey(leftRow, leftKeyIndex) && !containsNullKey(rightRow, rightKeyIndex) - if valid { - ok, err := codec.EqualChunkRow(sessCtx.GetSessionVars().StmtCtx.TypeCtx(), leftRow, leftKeyTypes, leftKeyIndex, - rightRow, rightKeyTypes, rightKeyIndex) - require.NoError(t, err) - valid = ok - } - - if valid && otherConditions != nil { - shallowRow.ShallowCopyPartialRow(0, leftRow) - shallowRow.ShallowCopyPartialRow(len(leftTypes), rightRow) - matched, null, err := expression.EvalBool(sessCtx.GetExprCtx().GetEvalCtx(), otherConditions, shallowRow.ToRow()) - require.NoError(t, err) - valid = matched - hasNull = hasNull || null - } - - if valid { - hasMatch = true - break - } - } - if hasMatch { - break - } - } - - if isLeftOuter { - // Append result with matched flag - appendToResultChk(leftRow, chunk.Row{}, leftUsedColumns, nil, resultChk) - if hasMatch { - resultChk.AppendInt64(len(leftUsedColumns), 1) - } else { - if hasNull { - resultChk.AppendNull(len(leftUsedColumns)) - } else { - resultChk.AppendInt64(len(leftUsedColumns), 0) - } - } - } else { - if hasMatch { - appendToResultChk(leftRow, chunk.Row{}, leftUsedColumns, nil, resultChk) - } - } - - if resultChk.IsFull() { - returnChks = append(returnChks, resultChk) - resultChk = chunk.New(resultTypes, sessCtx.GetSessionVars().MaxChunkSize, sessCtx.GetSessionVars().MaxChunkSize) - } - } - } - if resultChk.NumRows() > 0 { - returnChks = append(returnChks, resultChk) - } - return returnChks -} - -func testLeftOuterSemiOrSemiJoinProbeBasic(t *testing.T, isLeftOuter bool) { - // todo test nullable type after builder support nullable type - tinyTp := types.NewFieldType(mysql.TypeTiny) - tinyTp.AddFlag(mysql.NotNullFlag) - intTp := types.NewFieldType(mysql.TypeLonglong) - intTp.AddFlag(mysql.NotNullFlag) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.NotNullFlag) - uintTp.AddFlag(mysql.UnsignedFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - stringTp.AddFlag(mysql.NotNullFlag) - - lTypes := []*types.FieldType{intTp, stringTp, uintTp, stringTp, tinyTp} - rTypes := []*types.FieldType{intTp, stringTp, uintTp, stringTp, tinyTp} - rTypes = append(rTypes, retTypes...) - rTypes1 := []*types.FieldType{uintTp, stringTp, intTp, stringTp, tinyTp} - rTypes1 = append(rTypes1, rTypes1...) - - rightAsBuildSide := []bool{true} - if !isLeftOuter { - rightAsBuildSide = append(rightAsBuildSide, false) - } - - partitionNumber := 4 - simpleFilter := createSimpleFilter(t) - hasFilter := []bool{false} - if isLeftOuter { - hasFilter = append(hasFilter, true) - } - - var joinType logicalop.JoinType - if isLeftOuter { - joinType = logicalop.LeftOuterSemiJoin - } else { - joinType = logicalop.SemiJoin - } - - testCases := []testCase{ - // normal case - {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, nil, []int{}, nil, nil, nil}, - // rightUsed is empty - {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, []int{0, 1, 2, 3}, []int{}, nil, nil, nil}, - // leftUsed is empty - {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, []int{}, []int{}, nil, nil, nil}, - // both left/right Used are empty - {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, []int{}, []int{}, nil, nil, nil}, - // both left/right used is part of all columns - {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, []int{0, 2}, []int{}, nil, nil, nil}, - // int join uint - {[]int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{uintTp}, lTypes, rTypes1, []int{0, 1, 2, 3}, []int{}, nil, nil, nil}, - // multiple join keys - {[]int{0, 1}, []int{0, 1}, []*types.FieldType{intTp, stringTp}, []*types.FieldType{intTp, stringTp}, lTypes, rTypes, []int{0, 1, 2, 3}, []int{}, nil, nil, nil}, - } - - for _, tc := range testCases { - for _, value := range rightAsBuildSide { - for _, testFilter := range hasFilter { - leftFilter := simpleFilter - if !testFilter { - leftFilter = nil - } - testJoinProbe(t, false, tc.leftKeyIndex, tc.rightKeyIndex, tc.leftKeyTypes, tc.rightKeyTypes, tc.leftTypes, tc.rightTypes, value, tc.leftUsed, - tc.rightUsed, tc.leftUsedByOtherCondition, tc.rightUsedByOtherCondition, leftFilter, nil, tc.otherCondition, partitionNumber, joinType, 200) - testJoinProbe(t, false, tc.leftKeyIndex, tc.rightKeyIndex, toNullableTypes(tc.leftKeyTypes), toNullableTypes(tc.rightKeyTypes), - toNullableTypes(tc.leftTypes), toNullableTypes(tc.rightTypes), value, tc.leftUsed, tc.rightUsed, tc.leftUsedByOtherCondition, tc.rightUsedByOtherCondition, - leftFilter, nil, tc.otherCondition, partitionNumber, joinType, 200) - } - } - } -} - -func testLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T, isLeftOuter bool) { - tinyTp := types.NewFieldType(mysql.TypeTiny) - tinyTp.AddFlag(mysql.NotNullFlag) - intTp := types.NewFieldType(mysql.TypeLonglong) - intTp.AddFlag(mysql.NotNullFlag) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.UnsignedFlag) - uintTp.AddFlag(mysql.NotNullFlag) - yearTp := types.NewFieldType(mysql.TypeYear) - yearTp.AddFlag(mysql.NotNullFlag) - durationTp := types.NewFieldType(mysql.TypeDuration) - durationTp.AddFlag(mysql.NotNullFlag) - enumTp := types.NewFieldType(mysql.TypeEnum) - enumTp.AddFlag(mysql.NotNullFlag) - enumWithIntFlag := types.NewFieldType(mysql.TypeEnum) - enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag) - enumWithIntFlag.AddFlag(mysql.NotNullFlag) - setTp := types.NewFieldType(mysql.TypeSet) - setTp.AddFlag(mysql.NotNullFlag) - bitTp := types.NewFieldType(mysql.TypeBit) - bitTp.AddFlag(mysql.NotNullFlag) - jsonTp := types.NewFieldType(mysql.TypeJSON) - jsonTp.AddFlag(mysql.NotNullFlag) - floatTp := types.NewFieldType(mysql.TypeFloat) - floatTp.AddFlag(mysql.NotNullFlag) - doubleTp := types.NewFieldType(mysql.TypeDouble) - doubleTp.AddFlag(mysql.NotNullFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - stringTp.AddFlag(mysql.NotNullFlag) - datetimeTp := types.NewFieldType(mysql.TypeDatetime) - datetimeTp.AddFlag(mysql.NotNullFlag) - decimalTp := types.NewFieldType(mysql.TypeNewDecimal) - decimalTp.AddFlag(mysql.NotNullFlag) - timestampTp := types.NewFieldType(mysql.TypeTimestamp) - timestampTp.AddFlag(mysql.NotNullFlag) - dateTp := types.NewFieldType(mysql.TypeDate) - dateTp.AddFlag(mysql.NotNullFlag) - binaryStringTp := types.NewFieldType(mysql.TypeBlob) - binaryStringTp.AddFlag(mysql.NotNullFlag) - - lTypes := []*types.FieldType{tinyTp, intTp, uintTp, yearTp, durationTp, enumTp, enumWithIntFlag, setTp, bitTp, jsonTp, floatTp, doubleTp, stringTp, datetimeTp, decimalTp, timestampTp, dateTp, binaryStringTp} - rTypes := lTypes - lUsed := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17} - rUsed := []int{} - var joinType logicalop.JoinType - if isLeftOuter { - joinType = logicalop.LeftOuterSemiJoin - } else { - joinType = logicalop.SemiJoin - } - partitionNumber := 4 - - rightAsBuildSide := []bool{true} - if !isLeftOuter { - rightAsBuildSide = append(rightAsBuildSide, false) - } - - // single key - for i := 0; i < len(lTypes); i++ { - lKeyTypes := []*types.FieldType{lTypes[i]} - rKeyTypes := []*types.FieldType{rTypes[i]} - for _, rightAsBuild := range rightAsBuildSide { - testJoinProbe(t, false, []int{i}, []int{i}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - testJoinProbe(t, false, []int{i}, []int{i}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - } - } - // composed key - // fixed size, inlined - for _, rightAsBuild := range rightAsBuildSide { - lKeyTypes := []*types.FieldType{intTp, uintTp} - rKeyTypes := []*types.FieldType{intTp, uintTp} - testJoinProbe(t, false, []int{1, 2}, []int{1, 2}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - testJoinProbe(t, false, []int{1, 2}, []int{1, 2}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - } - // variable size, inlined - for _, rightAsBuild := range rightAsBuildSide { - lKeyTypes := []*types.FieldType{intTp, binaryStringTp} - rKeyTypes := []*types.FieldType{intTp, binaryStringTp} - testJoinProbe(t, false, []int{1, 17}, []int{1, 17}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - testJoinProbe(t, false, []int{1, 17}, []int{1, 17}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - } - // fixed size, not inlined - for _, rightAsBuild := range rightAsBuildSide { - lKeyTypes := []*types.FieldType{intTp, datetimeTp} - rKeyTypes := []*types.FieldType{intTp, datetimeTp} - testJoinProbe(t, false, []int{1, 13}, []int{1, 13}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - testJoinProbe(t, false, []int{1, 13}, []int{1, 13}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - } - // variable size, not inlined - for _, rightAsBuild := range rightAsBuildSide { - lKeyTypes := []*types.FieldType{intTp, decimalTp} - rKeyTypes := []*types.FieldType{intTp, decimalTp} - testJoinProbe(t, false, []int{1, 14}, []int{1, 14}, lKeyTypes, rKeyTypes, lTypes, rTypes, rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - testJoinProbe(t, false, []int{1, 14}, []int{1, 14}, toNullableTypes(lKeyTypes), toNullableTypes(rKeyTypes), toNullableTypes(lTypes), toNullableTypes(rTypes), rightAsBuild, lUsed, rUsed, nil, nil, nil, nil, nil, partitionNumber, joinType, 100) - } -} - -func testLeftOuterSemiJoinProbeOtherCondition(t *testing.T, isLeftOuter bool) { - intTp := types.NewFieldType(mysql.TypeLonglong) - intTp.AddFlag(mysql.NotNullFlag) - nullableIntTp := types.NewFieldType(mysql.TypeLonglong) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.NotNullFlag) - uintTp.AddFlag(mysql.UnsignedFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - stringTp.AddFlag(mysql.NotNullFlag) - - lTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} - rTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} - rTypes = append(rTypes, rTypes...) - - tinyTp := types.NewFieldType(mysql.TypeTiny) - a := &expression.Column{Index: 1, RetType: nullableIntTp} - b := &expression.Column{Index: 8, RetType: nullableIntTp} - sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) - require.NoError(t, err, "error when create other condition") - // test condition `a = b` from `a in (select b from t2)` - a2 := &expression.Column{Index: 1, RetType: nullableIntTp, InOperand: true} - b2 := &expression.Column{Index: 8, RetType: nullableIntTp, InOperand: true} - sf2, err := expression.NewFunction(mock.NewContext(), ast.EQ, tinyTp, a2, b2) - require.NoError(t, err, "error when create other condition") - otherCondition := make(expression.CNFExprs, 0) - otherCondition = append(otherCondition, sf) - otherCondition2 := make(expression.CNFExprs, 0) - otherCondition2 = append(otherCondition2, sf2) - - var joinType logicalop.JoinType - if isLeftOuter { - joinType = logicalop.LeftOuterSemiJoin - } else { - joinType = logicalop.SemiJoin - } - - simpleFilter := createSimpleFilter(t) - - hasFilter := []bool{false} - if isLeftOuter { - hasFilter = append(hasFilter, true) - } - - rightAsBuildSide := []bool{true} - if !isLeftOuter { - rightAsBuildSide = append(rightAsBuildSide, false) - } - - partitionNumber := 4 - rightUsed := []int{} - - for _, rightBuild := range rightAsBuildSide { - for _, testFilter := range hasFilter { - leftFilter := simpleFilter - if !testFilter { - leftFilter = nil - } - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 200) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 200) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 200) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 200) - - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 200) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 200) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 200) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 200) - } - } -} - -func testLeftOuterSemiJoinProbeWithSel(t *testing.T, isLeftOuter bool) { - intTp := types.NewFieldType(mysql.TypeLonglong) - intTp.AddFlag(mysql.NotNullFlag) - nullableIntTp := types.NewFieldType(mysql.TypeLonglong) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.NotNullFlag) - uintTp.AddFlag(mysql.UnsignedFlag) - nullableUIntTp := types.NewFieldType(mysql.TypeLonglong) - nullableUIntTp.AddFlag(mysql.UnsignedFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - stringTp.AddFlag(mysql.NotNullFlag) - - lTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} - rTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} - rTypes = append(rTypes, rTypes...) - - tinyTp := types.NewFieldType(mysql.TypeTiny) - a := &expression.Column{Index: 1, RetType: nullableIntTp} - b := &expression.Column{Index: 8, RetType: nullableUIntTp} - sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) - require.NoError(t, err, "error when create other condition") - otherCondition := make(expression.CNFExprs, 0) - otherCondition = append(otherCondition, sf) - - var joinType logicalop.JoinType - if isLeftOuter { - joinType = logicalop.LeftOuterSemiJoin - } else { - joinType = logicalop.SemiJoin - } - - rightAsBuildSide := []bool{true} - if !isLeftOuter { - rightAsBuildSide = append(rightAsBuildSide, false) - } - - simpleFilter := createSimpleFilter(t) - - hasFilter := []bool{false} - if isLeftOuter { - hasFilter = append(hasFilter, true) - } - - partitionNumber := 4 - rightUsed := []int{} - - for _, rightBuild := range rightAsBuildSide { - for _, useFilter := range hasFilter { - leftFilter := simpleFilter - if !useFilter { - leftFilter = nil - } - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 500) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 500) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 500) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 500) - } - } -} - -func TestLeftOuterSemiJoinProbeBasic(t *testing.T) { - testLeftOuterSemiOrSemiJoinProbeBasic(t, true) -} - -func TestLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T) { - testLeftOuterSemiJoinProbeAllJoinKeys(t, true) -} - -func TestLeftOuterSemiJoinProbeOtherCondition(t *testing.T) { - testLeftOuterSemiJoinProbeOtherCondition(t, true) -} - -func TestLeftOuterSemiJoinProbeWithSel(t *testing.T) { - testLeftOuterSemiJoinProbeWithSel(t, true) -} - -func TestLeftOuterSemiJoinBuildResultFastPath(t *testing.T) { - intTp := types.NewFieldType(mysql.TypeLonglong) - intTp.AddFlag(mysql.NotNullFlag) - nullableIntTp := types.NewFieldType(mysql.TypeLonglong) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.NotNullFlag) - uintTp.AddFlag(mysql.UnsignedFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - stringTp.AddFlag(mysql.NotNullFlag) - - lTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} - rTypes := []*types.FieldType{intTp, intTp, stringTp, uintTp, stringTp} - rTypes = append(rTypes, rTypes...) - - tinyTp := types.NewFieldType(mysql.TypeTiny) - a := &expression.Column{Index: 1, RetType: nullableIntTp} - b := &expression.Column{Index: 8, RetType: nullableIntTp} - sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) - require.NoError(t, err, "error when create other condition") - // test condition `a = b` from `a in (select b from t2)` - a2 := &expression.Column{Index: 1, RetType: nullableIntTp, InOperand: true} - b2 := &expression.Column{Index: 8, RetType: nullableIntTp, InOperand: true} - sf2, err := expression.NewFunction(mock.NewContext(), ast.EQ, tinyTp, a2, b2) - require.NoError(t, err, "error when create other condition") - otherCondition := make(expression.CNFExprs, 0) - otherCondition = append(otherCondition, sf) - otherCondition2 := make(expression.CNFExprs, 0) - otherCondition2 = append(otherCondition2, sf2) - joinType := logicalop.LeftOuterSemiJoin - simpleFilter := createSimpleFilter(t) - hasFilter := []bool{false, true} - rightAsBuildSide := []bool{true} - partitionNumber := 4 - rightUsed := []int{} - - for _, rightBuild := range rightAsBuildSide { - for _, testFilter := range hasFilter { - leftFilter := simpleFilter - if !testFilter { - leftFilter = nil - } - // MockContext set MaxChunkSize to 32, input chunk size should be less than 32 to test fast path - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 30) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 30) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 30) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition, partitionNumber, joinType, 30) - - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 30) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 30) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 30) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, otherCondition2, partitionNumber, joinType, 30) - - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, nil, partitionNumber, joinType, 30) - testJoinProbe(t, false, []int{0}, []int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, lTypes, rTypes, rightBuild, []int{}, rightUsed, []int{1}, []int{3}, leftFilter, nil, nil, partitionNumber, joinType, 30) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, []int{1, 2, 4}, rightUsed, []int{1}, []int{3}, leftFilter, nil, nil, partitionNumber, joinType, 30) - testJoinProbe(t, true, []int{0}, []int{0}, []*types.FieldType{nullableIntTp}, []*types.FieldType{nullableIntTp}, toNullableTypes(lTypes), toNullableTypes(rTypes), rightBuild, nil, rightUsed, []int{1}, []int{3}, leftFilter, nil, nil, partitionNumber, joinType, 30) - } - } -} - -func TestLeftOuterSemiJoinSpill(t *testing.T) { - ctx := mock.NewContext() - ctx.GetSessionVars().InitChunkSize = 32 - ctx.GetSessionVars().MaxChunkSize = 32 - leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false) - leftDataSourceWithSel, rightDataSourceWithSel := buildLeftAndRightDataSource(ctx, leftCols, rightCols, true) - - intTp := types.NewFieldType(mysql.TypeLonglong) - intTp.AddFlag(mysql.NotNullFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - stringTp.AddFlag(mysql.NotNullFlag) - - leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp} - rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp} - - leftKeys := []*expression.Column{ - {Index: 1, RetType: intTp}, - {Index: 3, RetType: stringTp}, - } - rightKeys := []*expression.Column{ - {Index: 0, RetType: intTp}, - {Index: 2, RetType: stringTp}, - } - - tinyTp := types.NewFieldType(mysql.TypeTiny) - a := &expression.Column{Index: 1, RetType: intTp} - b := &expression.Column{Index: 8, RetType: intTp} - sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) - require.NoError(t, err, "error when create other condition") - otherCondition := make(expression.CNFExprs, 0) - otherCondition = append(otherCondition, sf) - - maxRowTableSegmentSize = 100 - spillChunkSize = 100 - - joinType := logicalop.LeftOuterSemiJoin - params := []spillTestParam{ - // basic case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 100000, 10000}}, - // with other condition - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{3000000, 1700000, 3500000, 100000, 10000}}, - } - - for _, param := range params { - testSpill(t, ctx, joinType, leftDataSource, rightDataSource, param) - } - - params2 := []spillTestParam{ - // basic case with sel - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{1000000, 900000, 1700000, 100000, 10000}}, - // with other condition with sel - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{1000000, 900000, 1600000, 100000, 10000}}, - } - - for _, param := range params2 { - testSpill(t, ctx, joinType, leftDataSourceWithSel, rightDataSourceWithSel, param) - } -} diff --git a/pkg/executor/join/semi_join_probe_test.go b/pkg/executor/join/semi_join_probe_test.go deleted file mode 100644 index d5d3eb908fb1c..0000000000000 --- a/pkg/executor/join/semi_join_probe_test.go +++ /dev/null @@ -1,430 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package join - -import ( - "math/rand" - "testing" - - "github.com/pingcap/tidb/pkg/executor/internal/testutil" - "github.com/pingcap/tidb/pkg/expression" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/chunk" - "github.com/pingcap/tidb/pkg/util/mock" - "github.com/stretchr/testify/require" -) - -const maxChunkSizeInTest = 32 - -var semiJoinleftCols = []*expression.Column{ - {Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}, - {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, -} - -var semiJoinrightCols = []*expression.Column{ - {Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}, - {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, -} - -var semiJoinRetTypes = []*types.FieldType{ - types.NewFieldType(mysql.TypeLonglong), - types.NewFieldType(mysql.TypeLonglong), -} - -func buildLeftAndRightSemiDataSource(ctx sessionctx.Context, leftCols []*expression.Column, rightCols []*expression.Column, hasSel bool) (*testutil.MockDataSource, *testutil.MockDataSource) { - leftSchema := expression.NewSchema(leftCols...) - rightSchema := expression.NewSchema(rightCols...) - - joinKeyleftIntDatums := buildJoinKeyIntDatums(10000) - joinKeyrightIntDatums := buildJoinKeyIntDatums(10000) - leftMockSrcParm := testutil.MockDataSourceParameters{DataSchema: leftSchema, Ctx: ctx, Rows: 50000, Ndvs: []int{-1, -1}, Datums: [][]any{joinKeyleftIntDatums, joinKeyleftIntDatums}, HasSel: hasSel} - rightMockSrcParm := testutil.MockDataSourceParameters{DataSchema: rightSchema, Ctx: ctx, Rows: 50000, Ndvs: []int{-1, -1}, Datums: [][]any{joinKeyrightIntDatums, joinKeyrightIntDatums}, HasSel: hasSel} - return testutil.BuildMockDataSource(leftMockSrcParm), testutil.BuildMockDataSource(rightMockSrcParm) -} - -func buildSemiDataSourceAndExpectResult(ctx sessionctx.Context, leftCols []*expression.Column, rightCols []*expression.Column, rightAsBuildSide bool, hasOtherCondition bool, hasDuplicateKey bool, isAntiSemiJoin bool) (*testutil.MockDataSource, *testutil.MockDataSource, []chunk.Row) { - leftSchema := expression.NewSchema(leftCols...) - rightSchema := expression.NewSchema(rightCols...) - - rowNum := int64(50000) - leftCol0Datums := make([]any, 0, rowNum) - leftCol1Datums := make([]any, 0, rowNum) - rightCol0Datums := make([]any, 0, rowNum) - rightCol1Datums := make([]any, 0, rowNum) - - intTp := types.NewFieldType(mysql.TypeLonglong) - expectResultChunk := chunk.NewChunkWithCapacity([]*types.FieldType{intTp, intTp}, 10000) - expectResult := make([]chunk.Row, 0, 100000) - - if hasDuplicateKey { - if hasOtherCondition { - if rightAsBuildSide { - differentKeyNum := int64(10000) - for i := int64(0); i < differentKeyNum; i++ { - leftCol0Datums = append(leftCol0Datums, i) - leftCol1Datums = append(leftCol1Datums, int64(1)) - - singleKeyNum := rand.Int31n(2 * maxChunkSizeInTest) - if singleKeyNum == 0 { - if isAntiSemiJoin { - expectResultChunk.AppendInt64(0, i) - expectResultChunk.AppendInt64(1, 1) - } - continue - } - - canOtherConditionSuccess := rand.Int31n(10) < 5 - if canOtherConditionSuccess { - if !isAntiSemiJoin { - expectResultChunk.AppendInt64(0, i) - expectResultChunk.AppendInt64(1, 1) - } - - otherConditionSuccessNum := rand.Int31n(singleKeyNum) + 1 - for j := 0; j < int(singleKeyNum); j++ { - rightCol0Datums = append(rightCol0Datums, i) - if j < int(otherConditionSuccessNum) { - rightCol1Datums = append(rightCol1Datums, int64(0)) - } else { - rightCol1Datums = append(rightCol1Datums, int64(1)) - } - } - } else { - if isAntiSemiJoin { - expectResultChunk.AppendInt64(0, i) - expectResultChunk.AppendInt64(1, 1) - } - - for j := 0; j < int(singleKeyNum); j++ { - rightCol0Datums = append(rightCol0Datums, i) - rightCol1Datums = append(rightCol1Datums, int64(1)) - } - } - } - } else { - differentKeyNum := int64(10000) - for i := int64(0); i < differentKeyNum; i++ { - rightCol0Datums = append(rightCol0Datums, i) - rightCol1Datums = append(rightCol1Datums, int64(0)) - - singleKeyNum := rand.Int31n(2 * maxChunkSizeInTest) - if singleKeyNum == 0 { - continue - } - - canOtherConditionSuccess := rand.Int31n(10) < 5 - if canOtherConditionSuccess { - otherConditionSuccessNum := rand.Int31n(singleKeyNum) + 1 - for j := 0; j < int(singleKeyNum); j++ { - leftCol0Datums = append(leftCol0Datums, i) - if j < int(otherConditionSuccessNum) { - leftCol1Datums = append(leftCol1Datums, int64(1)) - if !isAntiSemiJoin { - expectResultChunk.AppendInt64(0, i) - expectResultChunk.AppendInt64(1, 1) - } - } else { - leftCol1Datums = append(leftCol1Datums, int64(0)) - if isAntiSemiJoin { - expectResultChunk.AppendInt64(0, i) - expectResultChunk.AppendInt64(1, 0) - } - } - } - } else { - for j := 0; j < int(singleKeyNum); j++ { - leftCol0Datums = append(leftCol0Datums, i) - leftCol1Datums = append(leftCol1Datums, int64(0)) - if isAntiSemiJoin { - expectResultChunk.AppendInt64(0, i) - expectResultChunk.AppendInt64(1, 0) - } - } - } - } - } - } else { - differentKeyNum := int64(10000) - for i := int64(0); i < differentKeyNum; i++ { - leftSingleKeyNum := rand.Int31n(2*maxChunkSizeInTest) + 1 - rightSingleKeyNum := rand.Int31n(2*maxChunkSizeInTest) + 1 - - for j := 0; j < int(leftSingleKeyNum); j++ { - leftCol0Datums = append(leftCol0Datums, i) - leftCol1Datums = append(leftCol1Datums, int64(0)) - } - - if i%2 == 0 { - for j := 0; j < int(rightSingleKeyNum); j++ { - rightCol0Datums = append(rightCol0Datums, i) - rightCol1Datums = append(rightCol1Datums, int64(0)) - } - - if !isAntiSemiJoin { - for j := 0; j < int(leftSingleKeyNum); j++ { - expectResultChunk.AppendInt64(0, i) - expectResultChunk.AppendInt64(1, 0) - } - } - } else { - if isAntiSemiJoin { - for j := 0; j < int(leftSingleKeyNum); j++ { - expectResultChunk.AppendInt64(0, i) - expectResultChunk.AppendInt64(1, 0) - } - } - } - } - } - } else { - leftCol0StartNum := int64(30000) - for i := int64(0); i < rowNum; i++ { - leftCol0AppendedData := leftCol0StartNum + i - leftCol0Datums = append(leftCol0Datums, leftCol0AppendedData) - - if hasOtherCondition { - if leftCol0AppendedData%2 == 0 { - leftCol1Datums = append(leftCol1Datums, int64(1)) - if isAntiSemiJoin { - if leftCol0AppendedData >= rowNum { - expectResultChunk.AppendInt64(0, leftCol0AppendedData) - expectResultChunk.AppendInt64(1, 1) - } - } else { - if leftCol0AppendedData < rowNum { - expectResultChunk.AppendInt64(0, leftCol0AppendedData) - expectResultChunk.AppendInt64(1, 1) - } - } - } else { - leftCol1Datums = append(leftCol1Datums, int64(0)) - if isAntiSemiJoin { - expectResultChunk.AppendInt64(0, leftCol0AppendedData) - expectResultChunk.AppendInt64(1, 0) - } - } - } else { - leftCol1Datums = append(leftCol1Datums, int64(1)) - if isAntiSemiJoin { - if leftCol0AppendedData >= rowNum { - expectResultChunk.AppendInt64(0, leftCol0AppendedData) - expectResultChunk.AppendInt64(1, 1) - } - } else { - if leftCol0AppendedData < rowNum { - expectResultChunk.AppendInt64(0, leftCol0AppendedData) - expectResultChunk.AppendInt64(1, 1) - } - } - } - - rightCol0Datums = append(rightCol0Datums, i) - rightCol1Datums = append(rightCol1Datums, int64(0)) - } - } - - leftLen := len(leftCol0Datums) - rightLen := len(rightCol0Datums) - - // Shuffle - for i := int64(0); i < int64(leftLen); i++ { - j := rand.Int63n(i + 1) - leftCol0Datums[i], leftCol0Datums[j] = leftCol0Datums[j], leftCol0Datums[i] - leftCol1Datums[i], leftCol1Datums[j] = leftCol1Datums[j], leftCol1Datums[i] - } - - for i := int64(0); i < int64(rightLen); i++ { - j := rand.Int63n(i + 1) - rightCol0Datums[i], rightCol0Datums[j] = rightCol0Datums[j], rightCol0Datums[i] - rightCol1Datums[i], rightCol1Datums[j] = rightCol1Datums[j], rightCol1Datums[i] - } - - if isAntiSemiJoin { - expectResult = sortRows([]*chunk.Chunk{expectResultChunk}, semiJoinRetTypes) - } else { - resultRowNum := expectResultChunk.NumRows() - for i := 0; i < resultRowNum; i++ { - expectResult = append(expectResult, expectResultChunk.GetRow(i)) - } - } - - leftMockSrcParm := testutil.MockDataSourceParameters{DataSchema: leftSchema, Ctx: ctx, Rows: leftLen, Ndvs: []int{-2, -2}, Datums: [][]any{leftCol0Datums, leftCol1Datums}, HasSel: false} - rightMockSrcParm := testutil.MockDataSourceParameters{DataSchema: rightSchema, Ctx: ctx, Rows: rightLen, Ndvs: []int{-2, -2}, Datums: [][]any{rightCol0Datums, rightCol1Datums}, HasSel: false} - return testutil.BuildMockDataSource(leftMockSrcParm), testutil.BuildMockDataSource(rightMockSrcParm), expectResult -} - -func testSemiJoin(t *testing.T, rightAsBuildSide bool, hasOtherCondition bool, hasDuplicateKey bool) { - testSemiOrAntiSemiJoin(t, rightAsBuildSide, hasOtherCondition, hasDuplicateKey, false) -} - -func testSemiOrAntiSemiJoin(t *testing.T, rightAsBuildSide bool, hasOtherCondition bool, hasDuplicateKey bool, isAntiSemiJoin bool) { - ctx := mock.NewContext() - ctx.GetSessionVars().InitChunkSize = maxChunkSizeInTest - ctx.GetSessionVars().MaxChunkSize = maxChunkSizeInTest - leftDataSource, rightDataSource, expectedResult := buildSemiDataSourceAndExpectResult(ctx, semiJoinleftCols, semiJoinrightCols, rightAsBuildSide, hasOtherCondition, hasDuplicateKey, isAntiSemiJoin) - - maxRowTableSegmentSize = 100 - - intTp := types.NewFieldType(mysql.TypeLonglong) - - leftKeys := []*expression.Column{ - {Index: 0, RetType: intTp}, - } - - rightKeys := []*expression.Column{ - {Index: 0, RetType: intTp}, - } - - var buildKeys []*expression.Column - var probeKeys []*expression.Column - if rightAsBuildSide { - buildKeys = rightKeys - probeKeys = leftKeys - } else { - buildKeys = leftKeys - probeKeys = rightKeys - } - - var otherCondition expression.CNFExprs - lUsedInOtherCondition := []int{} - rUsedInOtherCondition := []int{} - if hasOtherCondition { - lUsedInOtherCondition = append(lUsedInOtherCondition, 1) - rUsedInOtherCondition = append(rUsedInOtherCondition, 1) - - tinyTp := types.NewFieldType(mysql.TypeTiny) - a := &expression.Column{Index: 1, RetType: intTp} - b := &expression.Column{Index: 3, RetType: intTp} - sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) - require.NoError(t, err, "error when create other condition") - otherCondition = append(otherCondition, sf) - } - - var joinType logicalop.JoinType - if isAntiSemiJoin { - joinType = logicalop.AntiSemiJoin - } else { - joinType = logicalop.SemiJoin - } - - info := &hashJoinInfo{ - ctx: ctx, - schema: buildSchema(semiJoinRetTypes), - leftExec: leftDataSource, - rightExec: rightDataSource, - joinType: joinType, - rightAsBuildSide: rightAsBuildSide, - buildKeys: buildKeys, - probeKeys: probeKeys, - lUsed: []int{0, 1}, - rUsed: []int{}, - otherCondition: otherCondition, - lUsedInOtherCondition: lUsedInOtherCondition, - rUsedInOtherCondition: rUsedInOtherCondition, - } - - leftDataSource.PrepareChunks() - rightDataSource.PrepareChunks() - - hashJoinExec := buildHashJoinV2Exec(info) - result := getSortedResults(t, hashJoinExec, semiJoinRetTypes) - checkResults(t, semiJoinRetTypes, result, expectedResult) -} - -func TestSemiJoinBasic(t *testing.T) { - testSemiJoin(t, false, false, false) // Left side build without other condition - testSemiJoin(t, false, true, false) // Left side build with other condition - testSemiJoin(t, true, false, false) // Right side build without other condition - testSemiJoin(t, true, true, false) // Right side build with other condition -} - -func TestSemiJoinDuplicateKeys(t *testing.T) { - testSemiJoin(t, false, false, true) // Left side build without other condition - testSemiJoin(t, false, true, true) // Left side build with other condition - testSemiJoin(t, true, false, true) // Right side build without other condition - testSemiJoin(t, true, true, true) // Right side build with other condition -} - -func TestSemiSpill(t *testing.T) { - var leftCols = []*expression.Column{ - {Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}, - {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, - } - - var rightCols = []*expression.Column{ - {Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)}, - {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, - } - - ctx := mock.NewContext() - ctx.GetSessionVars().InitChunkSize = 32 - ctx.GetSessionVars().MaxChunkSize = 32 - leftDataSource, rightDataSource := buildLeftAndRightSemiDataSource(ctx, leftCols, rightCols, false) - - intTp := types.NewFieldType(mysql.TypeLonglong) - - leftTypes := []*types.FieldType{intTp, intTp} - rightTypes := []*types.FieldType{intTp, intTp} - - leftKeys := []*expression.Column{ - {Index: 0, RetType: intTp}, - } - rightKeys := []*expression.Column{ - {Index: 0, RetType: intTp}, - } - - tinyTp := types.NewFieldType(mysql.TypeTiny) - a := &expression.Column{Index: 1, RetType: intTp} - b := &expression.Column{Index: 3, RetType: intTp} - sf, err := expression.NewFunction(mock.NewContext(), ast.GT, tinyTp, a, b) - require.NoError(t, err, "error when create other condition") - otherCondition := make(expression.CNFExprs, 0) - otherCondition = append(otherCondition, sf) - - maxRowTableSegmentSize = 100 - spillChunkSize = 100 - - joinTypes := []logicalop.JoinType{logicalop.SemiJoin} - params := []spillTestParam{ - // basic case - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3000000, 100000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, nil, nil, nil, []int64{1800000, 1500000, 3500000, 100000, 10000}}, - // with other condition - {true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 100000, 10000}}, - {false, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1}, []int{}, otherCondition, []int{1}, []int{1}, []int64{1800000, 1500000, 3500000, 100000, 10000}}, - } - - for _, joinType := range joinTypes { - for _, param := range params { - testSpill(t, ctx, joinType, leftDataSource, rightDataSource, param) - } - } -} - -func TestSemiJoinProbeBasic(t *testing.T) { - testLeftOuterSemiOrSemiJoinProbeBasic(t, false) -} - -func TestSemiJoinProbeAllJoinKeys(t *testing.T) { - testLeftOuterSemiJoinProbeAllJoinKeys(t, false) -} - -func TestSemiJoinProbeWithSel(t *testing.T) { - testLeftOuterSemiJoinProbeWithSel(t, false) -}