Skip to content

Commit 033b175

Browse files
executor: support anti semi join (#57971)
close #56793
1 parent 5e8d1b8 commit 033b175

File tree

12 files changed

+912
-58
lines changed

12 files changed

+912
-58
lines changed

pkg/executor/join/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
33
go_library(
44
name = "join",
55
srcs = [
6+
"anti_semi_join_probe.go",
67
"base_join_probe.go",
78
"base_semi_join.go",
89
"concurrent_map.go",
@@ -77,6 +78,7 @@ go_test(
7778
name = "join_test",
7879
timeout = "short",
7980
srcs = [
81+
"anti_semi_join_probe_test.go",
8082
"bench_test.go",
8183
"concurrent_map_test.go",
8284
"hash_table_v1_test.go",
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package join
16+
17+
import (
18+
"unsafe"
19+
20+
"github.com/pingcap/tidb/pkg/expression"
21+
"github.com/pingcap/tidb/pkg/util/chunk"
22+
"github.com/pingcap/tidb/pkg/util/sqlkiller"
23+
)
24+
25+
type antiSemiJoinProbe struct {
26+
baseSemiJoin
27+
}
28+
29+
func newAntiSemiJoinProbe(base baseJoinProbe, isLeftSideBuild bool) *antiSemiJoinProbe {
30+
ret := &antiSemiJoinProbe{
31+
baseSemiJoin: *newBaseSemiJoin(base, isLeftSideBuild),
32+
}
33+
34+
if ret.ctx.hasOtherCondition() {
35+
ret.isNulls = make([]bool, 0, chunk.InitialCapacity)
36+
}
37+
38+
return ret
39+
}
40+
41+
func (a *antiSemiJoinProbe) InitForScanRowTable() {
42+
if !a.isLeftSideBuild {
43+
panic("should not reach here")
44+
}
45+
a.rowIter = commonInitForScanRowTable(&a.baseJoinProbe)
46+
}
47+
48+
func (a *antiSemiJoinProbe) NeedScanRowTable() bool {
49+
return a.isLeftSideBuild
50+
}
51+
52+
func (a *antiSemiJoinProbe) IsScanRowTableDone() bool {
53+
if !a.isLeftSideBuild {
54+
panic("should not reach here")
55+
}
56+
return a.rowIter.isEnd()
57+
}
58+
59+
func (a *antiSemiJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) *hashjoinWorkerResult {
60+
if !a.isLeftSideBuild {
61+
panic("should not reach here")
62+
}
63+
if joinResult.chk.IsFull() {
64+
return joinResult
65+
}
66+
if a.rowIter == nil {
67+
panic("scanRowTable before init")
68+
}
69+
a.nextCachedBuildRowIndex = 0
70+
meta := a.ctx.hashTableMeta
71+
insertedRows := 0
72+
remainCap := joinResult.chk.RequiredRows() - joinResult.chk.NumRows()
73+
for insertedRows < remainCap && !a.rowIter.isEnd() {
74+
currentRow := a.rowIter.getValue()
75+
if !meta.isCurrentRowUsed(currentRow) {
76+
// append build side of this row
77+
a.appendBuildRowToCachedBuildRowsV1(0, currentRow, joinResult.chk, 0, false)
78+
insertedRows++
79+
}
80+
a.rowIter.next()
81+
}
82+
err := checkSQLKiller(sqlKiller, "killedDuringProbe")
83+
if err != nil {
84+
joinResult.err = err
85+
return joinResult
86+
}
87+
if a.nextCachedBuildRowIndex > 0 {
88+
a.batchConstructBuildRows(joinResult.chk, 0, false)
89+
}
90+
return joinResult
91+
}
92+
93+
func (a *antiSemiJoinProbe) ResetProbe() {
94+
a.rowIter = nil
95+
a.baseJoinProbe.ResetProbe()
96+
}
97+
98+
func (a *antiSemiJoinProbe) resetProbeState() {
99+
a.baseSemiJoin.resetProbeState()
100+
101+
if !a.isLeftSideBuild {
102+
if a.ctx.spillHelper.isSpillTriggered() {
103+
for _, idx := range a.spilledIdx {
104+
// We see rows that have be spilled as matched rows
105+
a.isMatchedRows[idx] = true
106+
}
107+
}
108+
}
109+
}
110+
111+
func (a *antiSemiJoinProbe) SetChunkForProbe(chk *chunk.Chunk) (err error) {
112+
err = a.baseJoinProbe.SetChunkForProbe(chk)
113+
if err != nil {
114+
return err
115+
}
116+
117+
a.resetProbeState()
118+
return nil
119+
}
120+
121+
func (a *antiSemiJoinProbe) SetRestoredChunkForProbe(chk *chunk.Chunk) error {
122+
err := a.baseJoinProbe.SetRestoredChunkForProbe(chk)
123+
if err != nil {
124+
return err
125+
}
126+
127+
a.resetProbeState()
128+
return nil
129+
}
130+
131+
func (a *antiSemiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, _ *hashjoinWorkerResult) {
132+
if joinResult.chk.IsFull() {
133+
return true, joinResult
134+
}
135+
136+
joinedChk, remainCap, err := a.prepareForProbe(joinResult.chk)
137+
if err != nil {
138+
joinResult.err = err
139+
return false, joinResult
140+
}
141+
142+
hasOtherCondition := a.ctx.hasOtherCondition()
143+
if a.isLeftSideBuild {
144+
if hasOtherCondition {
145+
err = a.probeForLeftSideBuildHasOtherCondition(joinedChk, sqlKiller)
146+
} else {
147+
err = a.probeForLeftSideBuildNoOtherCondition(sqlKiller)
148+
}
149+
} else {
150+
if hasOtherCondition {
151+
err = a.probeForRightSideBuildHasOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
152+
} else {
153+
err = a.probeForRightSideBuildNoOtherCondition(joinResult.chk, remainCap, sqlKiller)
154+
}
155+
}
156+
if err != nil {
157+
joinResult.err = err
158+
return false, joinResult
159+
}
160+
return true, joinResult
161+
}
162+
163+
func (a *antiSemiJoinProbe) probeForLeftSideBuildHasOtherCondition(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
164+
err = a.concatenateProbeAndBuildRows(joinedChk, sqlKiller, false)
165+
if err != nil {
166+
return err
167+
}
168+
169+
if a.unFinishedProbeRowIdxQueue.IsEmpty() {
170+
// To avoid `Previous chunk is not probed yet` error
171+
a.currentProbeRow = a.chunkRows
172+
}
173+
174+
meta := a.ctx.hashTableMeta
175+
if joinedChk.NumRows() > 0 {
176+
a.selected, a.isNulls, err = expression.VecEvalBool(a.ctx.SessCtx.GetExprCtx().GetEvalCtx(), a.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, a.ctx.OtherCondition, joinedChk, a.selected, a.isNulls)
177+
if err != nil {
178+
return err
179+
}
180+
181+
for index, result := range a.selected {
182+
if result || a.isNulls[index] {
183+
meta.setUsedFlag(*(*unsafe.Pointer)(unsafe.Pointer(&a.rowIndexInfos[index].buildRowStart)))
184+
}
185+
}
186+
}
187+
188+
return
189+
}
190+
191+
func (a *antiSemiJoinProbe) probeForLeftSideBuildNoOtherCondition(sqlKiller *sqlkiller.SQLKiller) (err error) {
192+
meta := a.ctx.hashTableMeta
193+
tagHelper := a.ctx.hashTableContext.tagHelper
194+
195+
loopCnt := 0
196+
197+
for a.currentProbeRow < a.chunkRows {
198+
if a.matchedRowsHeaders[a.currentProbeRow] != 0 {
199+
candidateRow := tagHelper.toUnsafePointer(a.matchedRowsHeaders[a.currentProbeRow])
200+
if !meta.isCurrentRowUsedWithAtomic(candidateRow) {
201+
if isKeyMatched(meta.keyMode, a.serializedKeys[a.currentProbeRow], candidateRow, meta) {
202+
meta.setUsedFlag(candidateRow)
203+
} else {
204+
a.probeCollision++
205+
}
206+
}
207+
a.matchedRowsHeaders[a.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, a.matchedRowsHashValue[a.currentProbeRow])
208+
} else {
209+
a.currentProbeRow++
210+
}
211+
212+
loopCnt++
213+
if loopCnt%2000 == 0 {
214+
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
215+
if err != nil {
216+
return err
217+
}
218+
}
219+
}
220+
221+
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
222+
return err
223+
}
224+
225+
func (a *antiSemiJoinProbe) produceResult(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
226+
err = a.concatenateProbeAndBuildRows(joinedChk, sqlKiller, true)
227+
if err != nil {
228+
return err
229+
}
230+
231+
if joinedChk.NumRows() > 0 {
232+
a.selected, a.isNulls, err = expression.VecEvalBool(a.ctx.SessCtx.GetExprCtx().GetEvalCtx(), a.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, a.ctx.OtherCondition, joinedChk, a.selected, a.isNulls)
233+
if err != nil {
234+
return err
235+
}
236+
237+
length := len(a.selected)
238+
for i := range length {
239+
if a.selected[i] || a.isNulls[i] {
240+
probeRowIdx := a.rowIndexInfos[i].probeRowIndex
241+
a.isMatchedRows[probeRowIdx] = true
242+
}
243+
}
244+
}
245+
return
246+
}
247+
248+
func (a *antiSemiJoinProbe) probeForRightSideBuildHasOtherCondition(chk, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
249+
if !a.unFinishedProbeRowIdxQueue.IsEmpty() {
250+
err = a.produceResult(joinedChk, sqlKiller)
251+
if err != nil {
252+
return err
253+
}
254+
a.currentProbeRow = 0
255+
}
256+
257+
if a.unFinishedProbeRowIdxQueue.IsEmpty() {
258+
a.generateResultChkForRightBuildWithOtherCondition(remainCap, chk, a.isMatchedRows, false)
259+
}
260+
261+
return
262+
}
263+
264+
func (a *antiSemiJoinProbe) probeForRightSideBuildNoOtherCondition(chk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
265+
meta := a.ctx.hashTableMeta
266+
tagHelper := a.ctx.hashTableContext.tagHelper
267+
matched := false
268+
269+
if cap(a.offsets) == 0 {
270+
a.offsets = make([]int, 0, remainCap)
271+
}
272+
273+
a.offsets = a.offsets[:0]
274+
275+
for remainCap > 0 && a.currentProbeRow < a.chunkRows {
276+
if a.matchedRowsHeaders[a.currentProbeRow] != 0 {
277+
candidateRow := tagHelper.toUnsafePointer(a.matchedRowsHeaders[a.currentProbeRow])
278+
if isKeyMatched(meta.keyMode, a.serializedKeys[a.currentProbeRow], candidateRow, meta) {
279+
matched = true
280+
a.matchedRowsHeaders[a.currentProbeRow] = 0
281+
} else {
282+
a.probeCollision++
283+
a.matchedRowsHeaders[a.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, a.matchedRowsHashValue[a.currentProbeRow])
284+
}
285+
} else {
286+
if a.ctx.spillHelper.isSpillTriggered() && a.isMatchedRows[a.currentProbeRow] {
287+
// We see rows that have be spilled as matched rows
288+
matched = true
289+
}
290+
291+
if !matched {
292+
remainCap--
293+
a.offsets = append(a.offsets, a.usedRows[a.currentProbeRow])
294+
}
295+
296+
matched = false
297+
a.currentProbeRow++
298+
}
299+
}
300+
301+
err = checkSQLKiller(sqlKiller, "killedDuringProbe")
302+
if err != nil {
303+
return err
304+
}
305+
306+
a.generateResultChkForRightBuildNoOtherCondition(chk)
307+
return
308+
}
309+
310+
func (a *antiSemiJoinProbe) IsCurrentChunkProbeDone() bool {
311+
if a.ctx.hasOtherCondition() && !a.unFinishedProbeRowIdxQueue.IsEmpty() {
312+
return false
313+
}
314+
return a.baseJoinProbe.IsCurrentChunkProbeDone()
315+
}

0 commit comments

Comments
 (0)