Skip to content

Commit 7e48ab1

Browse files
authored
executor: skip inner rows when the join keys contains NULL (#7255)
1 parent 6178cfd commit 7e48ab1

File tree

3 files changed

+60
-15
lines changed

3 files changed

+60
-15
lines changed

executor/index_lookup_join.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,10 @@ func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error {
532532
chk := task.innerResult.GetChunk(i)
533533
for j := 0; j < chk.NumRows(); j++ {
534534
innerRow := chk.GetRow(j)
535+
if iw.hasNullInJoinKey(innerRow) {
536+
continue
537+
}
538+
535539
keyBuf = keyBuf[:0]
536540
for _, keyCol := range iw.keyCols {
537541
d := innerRow.GetDatum(keyCol, iw.rowTypes[keyCol])
@@ -549,6 +553,15 @@ func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error {
549553
return nil
550554
}
551555

556+
func (iw *innerWorker) hasNullInJoinKey(row chunk.Row) bool {
557+
for _, ordinal := range iw.keyCols {
558+
if row.IsNull(ordinal) {
559+
return true
560+
}
561+
}
562+
return false
563+
}
564+
552565
// Close implements the Executor interface.
553566
func (e *IndexLookUpJoin) Close() error {
554567
if e.cancelFunc != nil {

executor/join_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,14 @@ func (s *testSuite) TestIndexLookupJoin(c *C) {
843843
tk.MustExec("CREATE TABLE t(a BIGINT PRIMARY KEY, b BIGINT);")
844844
tk.MustExec("INSERT INTO t VALUES(1, 2);")
845845
tk.MustQuery("SELECT /*+ TIDB_INLJ(t1, t2) */ * FROM t t1 JOIN t t2 ON t1.a=t2.a UNION ALL SELECT /*+ TIDB_INLJ(t1, t2) */ * FROM t t1 JOIN t t2 ON t1.a=t2.a;").Check(testkit.Rows("1 2 1 2", "1 2 1 2"))
846+
847+
tk.MustExec(`drop table if exists t;`)
848+
tk.MustExec(`create table t(a decimal(6,2), index idx(a));`)
849+
tk.MustExec(`insert into t values(1.01), (2.02), (NULL);`)
850+
tk.MustQuery(`select /*+ TIDB_INLJ(t1) */ t1.a from t t1 join t t2 on t1.a=t2.a order by t1.a;`).Check(testkit.Rows(
851+
`1.01`,
852+
`2.02`,
853+
))
846854
}
847855

848856
func (s *testSuite) TestMergejoinOrder(c *C) {
@@ -880,4 +888,12 @@ func (s *testSuite) TestMergejoinOrder(c *C) {
880888
`2 1 2 1`,
881889
`2 2 2 2`,
882890
))
891+
892+
tk.MustExec(`drop table if exists t;`)
893+
tk.MustExec(`create table t(a decimal(6,2), index idx(a));`)
894+
tk.MustExec(`insert into t values(1.01), (2.02), (NULL);`)
895+
tk.MustQuery(`select /*+ TIDB_SMJ(t1) */ t1.a from t t1 join t t2 on t1.a=t2.a order by t1.a;`).Check(testkit.Rows(
896+
`1.01`,
897+
`2.02`,
898+
))
883899
}

executor/merge_join.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -131,23 +131,39 @@ func (t *mergeJoinInnerTable) rowsWithSameKey() ([]chunk.Row, error) {
131131
}
132132

133133
func (t *mergeJoinInnerTable) nextRow() (chunk.Row, error) {
134-
if t.curRow == t.curIter.End() {
135-
t.reallocReaderResult()
136-
oldMemUsage := t.curResult.MemoryUsage()
137-
err := t.reader.Next(t.ctx, t.curResult)
138-
// error happens or no more data.
139-
if err != nil || t.curResult.NumRows() == 0 {
140-
t.curRow = t.curIter.End()
141-
return t.curRow, errors.Trace(err)
134+
for {
135+
if t.curRow == t.curIter.End() {
136+
t.reallocReaderResult()
137+
oldMemUsage := t.curResult.MemoryUsage()
138+
err := t.reader.Next(t.ctx, t.curResult)
139+
// error happens or no more data.
140+
if err != nil || t.curResult.NumRows() == 0 {
141+
t.curRow = t.curIter.End()
142+
return t.curRow, errors.Trace(err)
143+
}
144+
newMemUsage := t.curResult.MemoryUsage()
145+
t.memTracker.Consume(newMemUsage - oldMemUsage)
146+
t.curRow = t.curIter.Begin()
147+
}
148+
149+
result := t.curRow
150+
t.curResultInUse = true
151+
t.curRow = t.curIter.Next()
152+
153+
if !t.hasNullInJoinKey(result) {
154+
return result, nil
155+
}
156+
}
157+
}
158+
159+
func (t *mergeJoinInnerTable) hasNullInJoinKey(row chunk.Row) bool {
160+
for _, col := range t.joinKeys {
161+
ordinal := col.Index
162+
if row.IsNull(ordinal) {
163+
return true
142164
}
143-
newMemUsage := t.curResult.MemoryUsage()
144-
t.memTracker.Consume(newMemUsage - oldMemUsage)
145-
t.curRow = t.curIter.Begin()
146165
}
147-
result := t.curRow
148-
t.curResultInUse = true
149-
t.curRow = t.curIter.Next()
150-
return result, nil
166+
return false
151167
}
152168

153169
// reallocReaderResult resets "t.curResult" to an empty Chunk to buffer the result of "t.reader".

0 commit comments

Comments
 (0)