Skip to content

Commit 8832684

Browse files
authored
executor: support left outer semi join for hash join v2 (#57053)
ref #53127
1 parent 4cca1ff commit 8832684

File tree

11 files changed

+980
-2
lines changed

11 files changed

+980
-2
lines changed

pkg/executor/join/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ go_library(
2020
"join_row_table.go",
2121
"join_table_meta.go",
2222
"joiner.go",
23+
"left_outer_semi_join_probe.go",
2324
"merge_join.go",
2425
"outer_join_probe.go",
2526
"row_table_builder.go",
@@ -58,6 +59,7 @@ go_library(
5859
"//pkg/util/logutil",
5960
"//pkg/util/memory",
6061
"//pkg/util/mvmap",
62+
"//pkg/util/queue",
6163
"//pkg/util/ranger",
6264
"//pkg/util/serialization",
6365
"//pkg/util/sqlkiller",
@@ -86,6 +88,7 @@ go_test(
8688
"join_table_meta_test.go",
8789
"joiner_test.go",
8890
"left_outer_join_probe_test.go",
91+
"left_outer_semi_join_probe_test.go",
8992
"merge_join_test.go",
9093
"outer_join_spill_test.go",
9194
"right_outer_join_probe_test.go",

pkg/executor/join/base_join_probe.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,11 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType,
747747
return newOuterJoinProbe(base, !rightAsBuildSide, rightAsBuildSide)
748748
case logicalop.RightOuterJoin:
749749
return newOuterJoinProbe(base, rightAsBuildSide, rightAsBuildSide)
750+
case logicalop.LeftOuterSemiJoin:
751+
if rightAsBuildSide {
752+
return newLeftOuterSemiJoinProbe(base)
753+
}
754+
fallthrough
750755
default:
751756
panic("unsupported join type")
752757
}

pkg/executor/join/inner_join_probe_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,9 @@ func testJoinProbe(t *testing.T, withSel bool, leftKeyIndex []int, rightKeyIndex
301301
resultTypes[len(resultTypes)-1].DelFlag(mysql.NotNullFlag)
302302
}
303303
}
304+
if joinType == logicalop.LeftOuterSemiJoin {
305+
resultTypes = append(resultTypes, types.NewFieldType(mysql.TypeTiny))
306+
}
304307

305308
meta := newTableMeta(buildKeyIndex, buildTypes, buildKeyTypes, probeKeyTypes, buildUsedByOtherCondition, buildUsed, needUsedFlag)
306309
hashJoinCtx := &HashJoinCtxV2{
@@ -458,6 +461,10 @@ func testJoinProbe(t *testing.T, withSel bool, leftKeyIndex []int, rightKeyIndex
458461
expectedChunks := genRightOuterJoinResult(t, hashJoinCtx.SessCtx, rightFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes,
459462
rightTypes, leftKeyTypes, rightKeyTypes, leftUsed, rightUsed, otherCondition, resultTypes)
460463
checkChunksEqual(t, expectedChunks, resultChunks, resultTypes)
464+
case logicalop.LeftOuterSemiJoin:
465+
expectedChunks := genLeftOuterSemiJoinResult(t, hashJoinCtx.SessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes,
466+
rightTypes, leftKeyTypes, rightKeyTypes, leftUsed, rightUsed, otherCondition, resultTypes)
467+
checkChunksEqual(t, expectedChunks, resultChunks, resultTypes)
461468
default:
462469
require.NoError(t, errors.New("not supported join type"))
463470
}

pkg/executor/join/inner_join_spill_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ func getReturnTypes(joinType logicalop.JoinType, param spillTestParam) []*types.
194194
resultTypes[len(resultTypes)-1].DelFlag(mysql.NotNullFlag)
195195
}
196196
}
197+
if joinType == logicalop.LeftOuterSemiJoin {
198+
resultTypes = append(resultTypes, types.NewFieldType(mysql.TypeTiny))
199+
}
197200
return resultTypes
198201
}
199202

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package join
16+
17+
import (
18+
"github.com/pingcap/tidb/pkg/expression"
19+
"github.com/pingcap/tidb/pkg/util/chunk"
20+
"github.com/pingcap/tidb/pkg/util/queue"
21+
"github.com/pingcap/tidb/pkg/util/sqlkiller"
22+
)
23+
24+
type leftOuterSemiJoinProbe struct {
25+
baseJoinProbe
26+
27+
// isMatchedRows marks whether the left side row is matched
28+
isMatchedRows []bool
29+
// isNullRows marks whether the left side row matched result is null
30+
isNullRows []bool
31+
32+
// buffer isNull for other condition evaluation
33+
isNulls []bool
34+
35+
// used in other condition to record which rows need to be processed
36+
unFinishedProbeRowIdxQueue *queue.Queue[int]
37+
}
38+
39+
var _ ProbeV2 = &leftOuterSemiJoinProbe{}
40+
41+
func newLeftOuterSemiJoinProbe(base baseJoinProbe) *leftOuterSemiJoinProbe {
42+
probe := &leftOuterSemiJoinProbe{
43+
baseJoinProbe: base,
44+
}
45+
if base.ctx.hasOtherCondition() {
46+
probe.unFinishedProbeRowIdxQueue = queue.NewQueue[int](32)
47+
}
48+
return probe
49+
}
50+
51+
func (j *leftOuterSemiJoinProbe) SetChunkForProbe(chunk *chunk.Chunk) (err error) {
52+
err = j.baseJoinProbe.SetChunkForProbe(chunk)
53+
if err != nil {
54+
return err
55+
}
56+
j.resetProbeState()
57+
return nil
58+
}
59+
60+
func (j *leftOuterSemiJoinProbe) SetRestoredChunkForProbe(chunk *chunk.Chunk) (err error) {
61+
err = j.baseJoinProbe.SetRestoredChunkForProbe(chunk)
62+
if err != nil {
63+
return err
64+
}
65+
j.resetProbeState()
66+
return nil
67+
}
68+
69+
func (j *leftOuterSemiJoinProbe) resetProbeState() {
70+
j.isMatchedRows = j.isMatchedRows[:0]
71+
for i := 0; i < j.chunkRows; i++ {
72+
j.isMatchedRows = append(j.isMatchedRows, false)
73+
}
74+
j.isNullRows = j.isNullRows[:0]
75+
for i := 0; i < j.chunkRows; i++ {
76+
j.isNullRows = append(j.isNullRows, false)
77+
}
78+
if j.ctx.hasOtherCondition() {
79+
j.unFinishedProbeRowIdxQueue.Clear()
80+
for i := 0; i < j.chunkRows; i++ {
81+
if j.matchedRowsHeaders[i] != 0 {
82+
j.unFinishedProbeRowIdxQueue.Push(i)
83+
}
84+
}
85+
}
86+
}
87+
88+
func (*leftOuterSemiJoinProbe) NeedScanRowTable() bool {
89+
return false
90+
}
91+
92+
func (*leftOuterSemiJoinProbe) IsScanRowTableDone() bool {
93+
panic("should not reach here")
94+
}
95+
96+
func (*leftOuterSemiJoinProbe) InitForScanRowTable() {
97+
panic("should not reach here")
98+
}
99+
100+
func (*leftOuterSemiJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, _ *sqlkiller.SQLKiller) *hashjoinWorkerResult {
101+
return joinResult
102+
}
103+
104+
func (j *leftOuterSemiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, _ *hashjoinWorkerResult) {
105+
joinedChk, remainCap, err := j.prepareForProbe(joinResult.chk)
106+
if err != nil {
107+
joinResult.err = err
108+
return false, joinResult
109+
}
110+
111+
if j.ctx.hasOtherCondition() {
112+
err = j.probeWithOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
113+
} else {
114+
err = j.probeWithoutOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
115+
}
116+
if err != nil {
117+
joinResult.err = err
118+
return false, joinResult
119+
}
120+
return true, joinResult
121+
}
122+
123+
func (j *leftOuterSemiJoinProbe) probeWithOtherCondition(chk, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
124+
if !j.unFinishedProbeRowIdxQueue.IsEmpty() {
125+
err = j.produceResult(joinedChk, sqlKiller)
126+
if err != nil {
127+
return err
128+
}
129+
j.currentProbeRow = 0
130+
}
131+
132+
if j.unFinishedProbeRowIdxQueue.IsEmpty() {
133+
startProbeRow := j.currentProbeRow
134+
j.currentProbeRow = min(startProbeRow+remainCap, j.chunkRows)
135+
j.buildResult(chk, startProbeRow)
136+
}
137+
return
138+
}
139+
140+
func (j *leftOuterSemiJoinProbe) produceResult(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
141+
err = j.concatenateProbeAndBuildRows(joinedChk, sqlKiller)
142+
if err != nil {
143+
return err
144+
}
145+
146+
if joinedChk.NumRows() > 0 {
147+
j.selected, j.isNulls, err = expression.VecEvalBool(j.ctx.SessCtx.GetExprCtx().GetEvalCtx(), j.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, j.ctx.OtherCondition, joinedChk, j.selected, j.isNulls)
148+
if err != nil {
149+
return err
150+
}
151+
152+
for i := 0; i < joinedChk.NumRows(); i++ {
153+
if j.selected[i] {
154+
j.isMatchedRows[j.rowIndexInfos[i].probeRowIndex] = true
155+
}
156+
if j.isNulls[i] {
157+
j.isNullRows[j.rowIndexInfos[i].probeRowIndex] = true
158+
}
159+
}
160+
}
161+
return nil
162+
}
163+
164+
func (j *leftOuterSemiJoinProbe) probeWithoutOtherCondition(_, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
165+
meta := j.ctx.hashTableMeta
166+
startProbeRow := j.currentProbeRow
167+
tagHelper := j.ctx.hashTableContext.tagHelper
168+
169+
for remainCap > 0 && j.currentProbeRow < j.chunkRows {
170+
if j.matchedRowsHeaders[j.currentProbeRow] != 0 {
171+
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
172+
if !isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
173+
j.probeCollision++
174+
j.matchedRowsHeaders[j.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, j.matchedRowsHashValue[j.currentProbeRow])
175+
continue
176+
}
177+
j.isMatchedRows[j.currentProbeRow] = true
178+
}
179+
j.matchedRowsHeaders[j.currentProbeRow] = 0
180+
remainCap--
181+
j.currentProbeRow++
182+
}
183+
184+
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
185+
186+
if err != nil {
187+
return err
188+
}
189+
190+
j.buildResult(joinedChk, startProbeRow)
191+
return nil
192+
}
193+
194+
func (j *leftOuterSemiJoinProbe) buildResult(chk *chunk.Chunk, startProbeRow int) {
195+
var selected []bool
196+
if startProbeRow == 0 && j.currentProbeRow == j.chunkRows && j.currentChunk.Sel() == nil && chk.NumRows() == 0 && len(j.spilledIdx) == 0 {
197+
// TODO: Can do a shallow copy by directly copying the Column pointers
198+
for index, colIndex := range j.lUsed {
199+
j.currentChunk.Column(colIndex).CopyConstruct(chk.Column(index))
200+
}
201+
} else {
202+
selected = make([]bool, j.chunkRows)
203+
for i := startProbeRow; i < j.currentProbeRow; i++ {
204+
selected[i] = true
205+
}
206+
for _, spilledIdx := range j.spilledIdx {
207+
selected[spilledIdx] = false // ignore spilled rows
208+
}
209+
for index, colIndex := range j.lUsed {
210+
dstCol := chk.Column(index)
211+
srcCol := j.currentChunk.Column(colIndex)
212+
chunk.CopySelectedRowsWithRowIDFunc(dstCol, srcCol, selected, 0, len(selected), func(i int) int {
213+
return j.usedRows[i]
214+
})
215+
}
216+
}
217+
218+
for i := startProbeRow; i < j.currentProbeRow; i++ {
219+
if selected != nil && !selected[i] {
220+
continue
221+
}
222+
if j.isMatchedRows[i] {
223+
chk.AppendInt64(len(j.lUsed), 1)
224+
} else if j.isNullRows[i] {
225+
chk.AppendNull(len(j.lUsed))
226+
} else {
227+
chk.AppendInt64(len(j.lUsed), 0)
228+
}
229+
}
230+
chk.SetNumVirtualRows(chk.NumRows())
231+
}
232+
233+
var maxMatchedRowNum = 4
234+
235+
func (j *leftOuterSemiJoinProbe) matchMultiBuildRows(joinedChk *chunk.Chunk, joinedChkRemainCap *int) {
236+
tagHelper := j.ctx.hashTableContext.tagHelper
237+
meta := j.ctx.hashTableMeta
238+
for j.matchedRowsHeaders[j.currentProbeRow] != 0 && *joinedChkRemainCap > 0 && j.matchedRowsForCurrentProbeRow < maxMatchedRowNum {
239+
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
240+
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
241+
j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, true)
242+
j.matchedRowsForCurrentProbeRow++
243+
*joinedChkRemainCap--
244+
} else {
245+
j.probeCollision++
246+
}
247+
j.matchedRowsHeaders[j.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, j.matchedRowsHashValue[j.currentProbeRow])
248+
}
249+
250+
j.finishLookupCurrentProbeRow()
251+
}
252+
253+
func (j *leftOuterSemiJoinProbe) concatenateProbeAndBuildRows(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) error {
254+
joinedChkRemainCap := joinedChk.Capacity() - joinedChk.NumRows()
255+
256+
for joinedChkRemainCap > 0 && !j.unFinishedProbeRowIdxQueue.IsEmpty() {
257+
probeRowIdx := j.unFinishedProbeRowIdxQueue.Pop()
258+
if j.isMatchedRows[probeRowIdx] {
259+
continue
260+
}
261+
j.currentProbeRow = probeRowIdx
262+
j.matchMultiBuildRows(joinedChk, &joinedChkRemainCap)
263+
if j.matchedRowsHeaders[probeRowIdx] == 0 {
264+
continue
265+
}
266+
j.unFinishedProbeRowIdxQueue.Push(probeRowIdx)
267+
}
268+
269+
err := checkSQLKiller(sqlKiller, "killedDuringProbe")
270+
if err != nil {
271+
return err
272+
}
273+
274+
j.finishCurrentLookupLoop(joinedChk)
275+
return nil
276+
}
277+
278+
func (j *leftOuterSemiJoinProbe) IsCurrentChunkProbeDone() bool {
279+
if j.ctx.hasOtherCondition() && !j.unFinishedProbeRowIdxQueue.IsEmpty() {
280+
return false
281+
}
282+
return j.baseJoinProbe.IsCurrentChunkProbeDone()
283+
}

0 commit comments

Comments
 (0)