Skip to content

Commit 0a7b552

Browse files
wshwsh12ti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#54855
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 033d247 commit 0a7b552

File tree

3 files changed

+288
-0
lines changed

3 files changed

+288
-0
lines changed

pkg/executor/index_lookup_hash_join.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde
562562
select {
563563
case joinResult.chk, ok = <-iw.joinChkResourceCh:
564564
case <-ctx.Done():
565+
joinResult.err = ctx.Err()
565566
return joinResult, false
566567
}
567568
return joinResult, ok
@@ -783,7 +784,10 @@ func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Contex
783784
select {
784785
case iw.resultCh <- joinResult:
785786
case <-ctx.Done():
787+
joinResult.err = ctx.Err()
788+
return false, joinResult
786789
}
790+
failpoint.InjectCall("joinMatchedInnerRow2Chunk")
787791
joinResult, ok = iw.getNewJoinResult(ctx)
788792
if !ok {
789793
return false, joinResult

pkg/executor/join/BUILD.bazel

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "join",
5+
srcs = [
6+
"base_join_probe.go",
7+
"concurrent_map.go",
8+
"hash_join_base.go",
9+
"hash_join_v1.go",
10+
"hash_join_v2.go",
11+
"hash_table_v1.go",
12+
"hash_table_v2.go",
13+
"index_lookup_hash_join.go",
14+
"index_lookup_join.go",
15+
"index_lookup_merge_join.go",
16+
"inner_join_probe.go",
17+
"join_row_table.go",
18+
"joiner.go",
19+
"merge_join.go",
20+
"outer_join_probe.go",
21+
],
22+
importpath = "github.com/pingcap/tidb/pkg/executor/join",
23+
visibility = ["//visibility:public"],
24+
deps = [
25+
"//pkg/executor/aggregate",
26+
"//pkg/executor/internal/applycache",
27+
"//pkg/executor/internal/exec",
28+
"//pkg/executor/internal/vecgroupchecker",
29+
"//pkg/executor/unionexec",
30+
"//pkg/expression",
31+
"//pkg/parser/mysql",
32+
"//pkg/parser/terror",
33+
"//pkg/planner/core",
34+
"//pkg/sessionctx",
35+
"//pkg/sessionctx/stmtctx",
36+
"//pkg/sessionctx/variable",
37+
"//pkg/types",
38+
"//pkg/util",
39+
"//pkg/util/bitmap",
40+
"//pkg/util/channel",
41+
"//pkg/util/chunk",
42+
"//pkg/util/codec",
43+
"//pkg/util/collate",
44+
"//pkg/util/dbterror/exeerrors",
45+
"//pkg/util/disk",
46+
"//pkg/util/execdetails",
47+
"//pkg/util/hack",
48+
"//pkg/util/logutil",
49+
"//pkg/util/memory",
50+
"//pkg/util/mvmap",
51+
"//pkg/util/ranger",
52+
"//pkg/util/serialization",
53+
"//pkg/util/sqlkiller",
54+
"//pkg/util/syncutil",
55+
"@com_github_pingcap_errors//:errors",
56+
"@com_github_pingcap_failpoint//:failpoint",
57+
"@org_uber_go_zap//:zap",
58+
],
59+
)
60+
61+
go_test(
62+
name = "join_test",
63+
timeout = "short",
64+
srcs = [
65+
"bench_test.go",
66+
"concurrent_map_test.go",
67+
"hash_table_v1_test.go",
68+
"hash_table_v2_test.go",
69+
"index_lookup_join_test.go",
70+
"index_lookup_merge_join_test.go",
71+
"inner_join_probe_test.go",
72+
"join_row_table_test.go",
73+
"join_stats_test.go",
74+
"joiner_test.go",
75+
"left_outer_join_probe_test.go",
76+
"merge_join_test.go",
77+
"right_outer_join_probe_test.go",
78+
"row_table_builder_test.go",
79+
],
80+
embed = [":join"],
81+
flaky = True,
82+
shard_count = 48,
83+
deps = [
84+
"//pkg/config",
85+
"//pkg/domain",
86+
"//pkg/executor/internal/testutil",
87+
"//pkg/expression",
88+
"//pkg/parser/ast",
89+
"//pkg/parser/mysql",
90+
"//pkg/planner/core",
91+
"//pkg/session",
92+
"//pkg/sessionctx",
93+
"//pkg/sessionctx/variable",
94+
"//pkg/testkit",
95+
"//pkg/types",
96+
"//pkg/util",
97+
"//pkg/util/chunk",
98+
"//pkg/util/codec",
99+
"//pkg/util/disk",
100+
"//pkg/util/hack",
101+
"//pkg/util/memory",
102+
"//pkg/util/mock",
103+
"//pkg/util/sqlkiller",
104+
"@com_github_pingcap_errors//:errors",
105+
"@com_github_pingcap_failpoint//:failpoint",
106+
"@com_github_stretchr_testify//require",
107+
],
108+
)
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright 2018 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package join_test
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"math/rand"
21+
"runtime"
22+
"strings"
23+
"testing"
24+
25+
"github.com/pingcap/failpoint"
26+
"github.com/pingcap/tidb/pkg/session"
27+
"github.com/pingcap/tidb/pkg/testkit"
28+
"github.com/stretchr/testify/require"
29+
)
30+
31+
func TestIndexLookupJoinHang(t *testing.T) {
32+
store := testkit.CreateMockStore(t)
33+
34+
tk := testkit.NewTestKit(t, store)
35+
tk.MustExec("use test")
36+
tk.MustExec("create table idxJoinOuter (a int unsigned)")
37+
tk.MustExec("create table idxJoinInner (a int unsigned unique)")
38+
tk.MustExec("insert idxJoinOuter values (1), (1), (1), (1), (1)")
39+
tk.MustExec("insert idxJoinInner values (1)")
40+
tk.Session().GetSessionVars().IndexJoinBatchSize = 1
41+
tk.Session().GetSessionVars().SetIndexLookupJoinConcurrency(1)
42+
43+
rs, err := tk.Exec("select /*+ INL_JOIN(i)*/ * from idxJoinOuter o left join idxJoinInner i on o.a = i.a where o.a in (1, 2) and (i.a - 3) > 0")
44+
require.NoError(t, err)
45+
req := rs.NewChunk(nil)
46+
for i := 0; i < 5; i++ {
47+
// FIXME: cannot check err, since err exists, Panic: [tikv:1690]BIGINT UNSIGNED value is out of range in '(Column#0 - 3)'
48+
_ = rs.Next(context.Background(), req)
49+
}
50+
err = rs.Close()
51+
require.NoError(t, err)
52+
53+
rs, err = tk.Exec("select /*+ INL_HASH_JOIN(i)*/ * from idxJoinOuter o left join idxJoinInner i on o.a = i.a where o.a in (1, 2) and (i.a - 3) > 0")
54+
require.NoError(t, err)
55+
req = rs.NewChunk(nil)
56+
for i := 0; i < 5; i++ {
57+
// to fix: cannot check err, since err exists, Panic: [tikv:1690]BIGINT UNSIGNED value is out of range in '(Column#0 - 3)'
58+
_ = rs.Next(context.Background(), req)
59+
}
60+
err = rs.Close()
61+
require.NoError(t, err)
62+
63+
rs, err = tk.Exec("select /*+ INL_MERGE_JOIN(i)*/ * from idxJoinOuter o left join idxJoinInner i on o.a = i.a where o.a in (1, 2) and (i.a - 3) > 0")
64+
require.NoError(t, err)
65+
req = rs.NewChunk(nil)
66+
for i := 0; i < 5; i++ {
67+
// to fix: cannot check err, since err exists, Panic: [tikv:1690]BIGINT UNSIGNED value is out of range in '(Column#0 - 3)'
68+
_ = rs.Next(context.Background(), req)
69+
}
70+
err = rs.Close()
71+
require.NoError(t, err)
72+
}
73+
74+
func TestIssue16887(t *testing.T) {
75+
store := testkit.CreateMockStore(t)
76+
77+
tk := testkit.NewTestKit(t, store)
78+
tk.MustExec("use test")
79+
tk.MustExec("drop table if exists admin_roles, admin_role_has_permissions")
80+
tk.MustExec("CREATE TABLE `admin_role_has_permissions` (`permission_id` bigint(20) unsigned NOT NULL, `role_id` bigint(20) unsigned NOT NULL, PRIMARY KEY (`permission_id`,`role_id`), KEY `admin_role_has_permissions_role_id_foreign` (`role_id`))")
81+
tk.MustExec("CREATE TABLE `admin_roles` (`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL COMMENT '角色名称', `created_at` timestamp NULL DEFAULT NULL, `updated_at` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`))")
82+
tk.MustExec("INSERT INTO `admin_roles` (`id`, `name`, `created_at`, `updated_at`) VALUES(1, 'admin','2020-04-27 02:40:03', '2020-04-27 02:40:03'),(2, 'developer','2020-04-27 02:40:03', '2020-04-27 02:40:03'),(3, 'analyst','2020-04-27 02:40:03', '2020-04-27 02:40:03'),(4, 'channel_admin','2020-04-27 02:40:03', '2020-04-27 02:40:03'),(5, 'test','2020-04-27 02:40:08', '2020-04-27 02:40:08')")
83+
tk.MustExec("INSERT INTO `admin_role_has_permissions` (`permission_id`, `role_id`) VALUES(1, 1),(2, 1),(3, 1),(4, 1),(5, 1),(6, 1),(7, 1),(8, 1),(9, 1),(10, 1),(11, 1),(12, 1),(13, 1),(14, 1),(15, 1),(16, 1),(17, 1),(18, 1),(19, 1),(20, 1),(21, 1),(22, 1),(23, 1),(24, 1),(25, 1),(26, 1),(27, 1),(28, 1),(29, 1),(30, 1),(31, 1),(32, 1),(33, 1),(34, 1),(35, 1),(36, 1),(37, 1),(38, 1),(39, 1),(40, 1),(41, 1),(42, 1),(43, 1),(44, 1),(45, 1),(46, 1),(47, 1),(48, 1),(49, 1),(50, 1),(51, 1),(52, 1),(53, 1),(54, 1),(55, 1),(56, 1),(57, 1),(58, 1),(59, 1),(60, 1),(61, 1),(62, 1),(63, 1),(64, 1),(65, 1),(66, 1),(67, 1),(68, 1),(69, 1),(70, 1),(71, 1),(72, 1),(73, 1),(74, 1),(75, 1),(76, 1),(77, 1),(78, 1),(79, 1),(80, 1),(81, 1),(82, 1),(83, 1),(5, 4),(6, 4),(7, 4),(84, 5),(85, 5),(86, 5)")
84+
rows := tk.MustQuery("SELECT /*+ inl_merge_join(admin_role_has_permissions) */ `admin_roles`.* FROM `admin_roles` INNER JOIN `admin_role_has_permissions` ON `admin_roles`.`id` = `admin_role_has_permissions`.`role_id` WHERE `admin_role_has_permissions`.`permission_id`\n IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67)").Rows()
85+
require.Len(t, rows, 70)
86+
rows = tk.MustQuery("show warnings").Rows()
87+
require.Less(t, 0, len(rows))
88+
}
89+
90+
func TestPartitionTableIndexJoinAndIndexReader(t *testing.T) {
91+
store := testkit.CreateMockStore(t)
92+
93+
tk := testkit.NewTestKit(t, store)
94+
tk.MustExec("use test")
95+
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
96+
tk.MustExec(`create table t (a int, b int, key(a)) partition by hash(a) partitions 4`)
97+
tk.MustExec("create table tnormal (a int, b int, key(a), key(b))")
98+
nRows := 512
99+
values := make([]string, 0, nRows)
100+
for i := 0; i < nRows; i++ {
101+
values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(nRows), rand.Intn(nRows)))
102+
}
103+
tk.MustExec(fmt.Sprintf("insert into t values %v", strings.Join(values, ", ")))
104+
tk.MustExec(fmt.Sprintf("insert into tnormal values %v", strings.Join(values, ", ")))
105+
106+
randRange := func() (int, int) {
107+
a, b := rand.Intn(nRows), rand.Intn(nRows)
108+
if a > b {
109+
return b, a
110+
}
111+
return a, b
112+
}
113+
for i := 0; i < nRows; i++ {
114+
lb, rb := randRange()
115+
cond := fmt.Sprintf("(t2.b between %v and %v)", lb, rb)
116+
result := tk.MustQuery("select t1.a from tnormal t1, tnormal t2 where t1.a=t2.b and " + cond).Sort().Rows()
117+
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.a from t t1, t t2 where t1.a=t2.b and " + cond).Sort().Check(result)
118+
}
119+
}
120+
121+
func TestIssue45716(t *testing.T) {
122+
store := testkit.CreateMockStore(t)
123+
124+
tk := testkit.NewTestKit(t, store)
125+
tk.MustExec("use test")
126+
tk.MustExec("set tidb_mem_quota_query = 120000;")
127+
tk.MustExec("drop table if exists t1, t2;")
128+
tk.MustExec("create table t1(a int, index(a));")
129+
tk.MustExec("create table t2(a int, index(a));")
130+
tk.MustExec("insert into t1 values (1), (2);")
131+
tk.MustExec("insert into t2 values (1),(1),(2),(2);")
132+
133+
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/inlNewInnerPanic", `return(true)`)
134+
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/join/inlNewInnerPanic")
135+
err := tk.QueryToErr("select /*+ inl_join(t2) */ * from t1 join t2 on t1.a = t2.a;")
136+
require.Error(t, err)
137+
tk.MustContainErrMsg(err.Error(), "test inlNewInnerPanic")
138+
}
139+
140+
func TestIssue54688(t *testing.T) {
141+
val := runtime.GOMAXPROCS(1)
142+
defer func() {
143+
runtime.GOMAXPROCS(val)
144+
}()
145+
store := testkit.CreateMockStore(t)
146+
tk := testkit.NewTestKit(t, store)
147+
tk.MustExec("use test;")
148+
tk.MustExec("drop table if exists t, s;")
149+
tk.MustExec("create table t(a int, index(a));")
150+
tk.MustExec("create table s(a int, index(a));")
151+
tk.MustExec("insert into t values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16);")
152+
tk.MustExec("insert into s values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16);")
153+
tk.MustExec("insert into s select * from s")
154+
tk.MustExec("insert into s select * from s")
155+
tk.MustExec("insert into s select * from s")
156+
tk.MustExec("insert into s select * from s")
157+
tk.MustExec("insert into s select * from s")
158+
tk.MustExec("insert into s select * from s")
159+
tk.MustExec("insert into s select * from s")
160+
tk.MustExec("insert into s select * from s")
161+
tk.MustExec("set @@tidb_index_lookup_join_concurrency=1;")
162+
tk.MustExec("set @@tidb_index_join_batch_size=1000000;")
163+
164+
for i := 0; i <= 100; i++ {
165+
rs, err := tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a")
166+
require.NoError(t, err)
167+
context, cancel := context.WithCancel(context.Background())
168+
require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/pkg/executor/join/joinMatchedInnerRow2Chunk",
169+
func() {
170+
cancel()
171+
},
172+
))
173+
_, _ = session.GetRows4Test(context, nil, rs)
174+
rs.Close()
175+
}
176+
}

0 commit comments

Comments
 (0)