
Commit 21b70e1

yibin87 authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#59884
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 26912fc commit 21b70e1

File tree

3 files changed: +231 -2 lines changed


pkg/executor/test/tiflashtest/BUILD.bazel

Lines changed: 4 additions & 0 deletions
@@ -9,7 +9,11 @@ go_test(
     ],
     flaky = True,
     race = "on",
+<<<<<<< HEAD
     shard_count = 43,
+=======
+    shard_count = 48,
+>>>>>>> b2a9059b5e1 (planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884))
     deps = [
         "//pkg/config",
         "//pkg/domain",

pkg/executor/test/tiflashtest/tiflash_test.go

Lines changed: 202 additions & 0 deletions
@@ -2032,3 +2032,205 @@ func TestMppAggShouldAlignFinalMode(t *testing.T) {
     err = failpoint.Disable("github.com/pingcap/tidb/pkg/expression/aggregation/show-agg-mode")
     require.Nil(t, err)
 }
+<<<<<<< HEAD
+=======
+
+func TestMppTableReaderCacheForSingleSQL(t *testing.T) {
+    store := testkit.CreateMockStore(t, withMockTiFlash(1))
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("create table t(a int, b int, primary key(a))")
+    tk.MustExec("alter table t set tiflash replica 1")
+    tb := external.GetTableByName(t, tk, "test", "t")
+    err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+    require.NoError(t, err)
+
+    tk.MustExec("create table t2(a int, b int) partition by hash(b) partitions 4")
+    tk.MustExec("alter table t2 set tiflash replica 1")
+    tb = external.GetTableByName(t, tk, "test", "t2")
+    err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+    require.NoError(t, err)
+    tk.MustExec("insert into t values(1, 1)")
+    tk.MustExec("insert into t values(2, 2)")
+    tk.MustExec("insert into t values(3, 3)")
+    tk.MustExec("insert into t values(4, 4)")
+    tk.MustExec("insert into t values(5, 5)")
+
+    tk.MustExec("insert into t2 values(1, 1)")
+    tk.MustExec("insert into t2 values(2, 2)")
+    tk.MustExec("insert into t2 values(3, 3)")
+    tk.MustExec("insert into t2 values(4, 4)")
+    tk.MustExec("insert into t2 values(5, 5)")
+
+    // unistore does not support later materialization
+    tk.MustExec("set tidb_opt_enable_late_materialization=0")
+    tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
+    tk.MustExec("set @@session.tidb_allow_mpp=ON")
+    tk.MustExec("set @@session.tidb_enforce_mpp=ON")
+    tk.MustExec("set @@session.tidb_max_chunk_size=32")
+
+    // Test TableReader cache for single SQL.
+    type testCase struct {
+        sql           string
+        expectHitNum  int32
+        expectMissNum int32
+    }
+
+    testCases := []testCase{
+        // Non-Partition
+        // Cache hit
+        {"select * from t", 0, 1},
+        {"select * from t union select * from t", 1, 1},
+        {"select * from t union select * from t t1 union select * from t t2", 2, 1},
+        {"select * from t where b <= 3 union select * from t where b > 3", 1, 1}, // both full range
+        {"select * from t where a <= 3 union select * from t where a <= 3", 1, 1}, // same range
+        {"select * from t t1 join t t2 on t1.b=t2.b", 1, 1},
+
+        // Cache miss
+        {"select * from t union all select * from t", 0, 2}, // different mpp task root
+        {"select * from t where a <= 3 union select * from t where a > 3", 0, 2}, // different range
+
+        // Partition
+        // Cache hit
+        {"select * from t2 union select * from t2", 1, 1},
+        {"select * from t2 where b = 1 union select * from t2 where b = 5", 1, 1}, // same partition, full range
+        {"select * from t2 where b = 1 and a < 3 union select * from t2 where b = 5 and a < 3", 1, 1}, // same partition, same range
+        {"select * from t2 t1 join t2 t2 on t1.b=t2.b", 1, 1},
+        {"select * from t2 t1 join t2 t2 on t1.b=t2.b where t1.a = 2 and t2.a = 2", 1, 1},
+
+        // Cache miss
+        {"select * from t2 union select * from t2 where b = 1", 0, 2}, // different partition
+        {"select * from t2 where b = 2 union select * from t2 where b = 1", 0, 2}, // different partition
+    }
+
+    var hitNum, missNum atomic.Int32
+    hitFunc := func() {
+        hitNum.Add(1)
+    }
+    missFunc := func() {
+        missNum.Add(1)
+    }
+    failpoint.EnableCall("github.com/pingcap/tidb/pkg/planner/core/mppTaskGeneratorTableReaderCacheHit", hitFunc)
+    failpoint.EnableCall("github.com/pingcap/tidb/pkg/planner/core/mppTaskGeneratorTableReaderCacheMiss", missFunc)
+    for _, tc := range testCases {
+        hitNum.Store(0)
+        missNum.Store(0)
+        tk.MustQuery(tc.sql)
+        require.Equal(t, tc.expectHitNum, hitNum.Load())
+        require.Equal(t, tc.expectMissNum, missNum.Load())
+    }
+}
+
+func TestIndexMergeCarePreferTiflash(t *testing.T) {
+    store := testkit.CreateMockStore(t, withMockTiFlash(1))
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("CREATE TABLE `t` (" +
+        "`i` bigint(20) NOT NULL, " +
+        "`w` varchar(32) NOT NULL," +
+        "`l` varchar(32) NOT NULL," +
+        "`a` tinyint(4) NOT NULL DEFAULT '0'," +
+        "`m` int(11) NOT NULL DEFAULT '0'," +
+        "`s` int(11) NOT NULL DEFAULT '0'," +
+        "PRIMARY KEY (`i`) /*T![clustered_index] NONCLUSTERED */," +
+        "KEY `idx_win_user_site_code` (`w`,`m`)," +
+        "KEY `idx_lose_user_site_code` (`l`,`m`)," +
+        "KEY `idx_win_site_code_status` (`w`,`a`)," +
+        "KEY `idx_lose_site_code_status` (`l`,`a`)" +
+        ")")
+    tk.MustExec("alter table t set tiflash replica 1")
+    tb := external.GetTableByName(t, tk, "test", "t")
+    err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+    require.NoError(t, err)
+    tk.MustQuery("explain format=\"brief\" SELECT" +
+        " /*+ read_from_storage(tiflash[a]) */ a.i FROM t a WHERE a.s = 0 AND a.a NOT IN (-1, 0) AND m >= 1726910326 AND m <= 1726910391 AND ( a.w IN ('1123') OR a.l IN ('1123'))").Check(
+        testkit.Rows("TableReader 0.00 root MppVersion: 3, data:ExchangeSender",
+            "└─ExchangeSender 0.00 mpp[tiflash] ExchangeType: PassThrough",
+            " └─Projection 0.00 mpp[tiflash] test.t.i",
+            " └─Selection 0.00 mpp[tiflash] ge(test.t.m, 1726910326), le(test.t.m, 1726910391), not(in(test.t.a, -1, 0)), or(eq(test.t.w, \"1123\"), eq(test.t.l, \"1123\"))",
+            " └─TableFullScan 10.00 mpp[tiflash] table:a pushed down filter:eq(test.t.s, 0), keep order:false, stats:pseudo"))
+}
+
+func TestIssue59703(t *testing.T) {
+    store := testkit.CreateMockStore(t, withMockTiFlash(1))
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("create table t(a int not null primary key, b int not null)")
+    tk.MustExec("alter table t set tiflash replica 1")
+    tb := external.GetTableByName(t, tk, "test", "t")
+    err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+    require.NoError(t, err)
+    tk.MustExec("insert into t values(1,0)")
+    tk.MustExec("insert into t values(2,0)")
+    tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
+    // unistore does not support later materialization
+    tk.MustExec("set tidb_opt_enable_late_materialization=0")
+    tk.MustExec("set @@session.tidb_allow_mpp=ON")
+
+    failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/mpp/mpp_coordinator_execute_err", "return()")
+    defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/internal/mpp/mpp_coordinator_execute_err")
+
+    err = tk.ExecToErr("select count(*) from t")
+    require.Contains(t, err.Error(), "mock mpp error")
+    require.Equal(t, mppcoordmanager.InstanceMPPCoordinatorManager.GetCoordCount(), 0)
+}
+
+func TestIssue59877(t *testing.T) {
+    store := testkit.CreateMockStore(t, withMockTiFlash(1))
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t1, t2, t3")
+    tk.MustExec("create table t1(id bigint, v1 int)")
+    tk.MustExec("alter table t1 set tiflash replica 1")
+    tb := external.GetTableByName(t, tk, "test", "t1")
+    err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+    require.NoError(t, err)
+    tk.MustExec("create table t2(id bigint unsigned, v1 int)")
+    tk.MustExec("alter table t2 set tiflash replica 1")
+    tb = external.GetTableByName(t, tk, "test", "t2")
+    err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+    require.NoError(t, err)
+    tk.MustExec("create table t3(id bigint, v1 int)")
+    tk.MustExec("alter table t3 set tiflash replica 1")
+    tb = external.GetTableByName(t, tk, "test", "t3")
+    err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+    require.NoError(t, err)
+
+    tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
+    // unistore does not support later materialization
+    tk.MustExec("set tidb_opt_enable_late_materialization=0")
+    tk.MustExec("set @@session.tidb_allow_mpp=ON")
+    tk.MustExec("set @@session.tidb_enforce_mpp=ON")
+    tk.MustExec("set tidb_broadcast_join_threshold_size=0")
+    tk.MustExec("set tidb_broadcast_join_threshold_count=0")
+    tk.MustExec("set tiflash_fine_grained_shuffle_stream_count=8")
+    tk.MustExec("set tidb_enforce_mpp=1")
+    tk.MustQuery("explain format=\"brief\" select /*+ hash_join_build(t3) */ count(*) from t1 straight_join t2 on t1.id = t2.id straight_join t3 on t1.id = t3.id").Check(
+        testkit.Rows("HashAgg 1.00 root funcs:count(Column#18)->Column#10",
+            "└─TableReader 1.00 root MppVersion: 3, data:ExchangeSender",
+            " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough",
+            " └─HashAgg 1.00 mpp[tiflash] funcs:count(1)->Column#18",
+            " └─Projection 15609.38 mpp[tiflash] test.t1.id, Column#14",
+            " └─HashJoin 15609.38 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t3.id)]",
+            " ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
+            " │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#17, collate: binary]",
+            " │ └─Projection 9990.00 mpp[tiflash] test.t3.id, cast(test.t3.id, decimal(20,0))->Column#17",
+            " │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t3.id))",
+            " │ └─TableFullScan 10000.00 mpp[tiflash] table:t3 keep order:false, stats:pseudo",
+            " └─Projection(Probe) 12487.50 mpp[tiflash] test.t1.id, Column#14",
+            " └─HashJoin 12487.50 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t2.id)]",
+            " ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
+            " │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#13, collate: binary]",
+            " │ └─Projection 9990.00 mpp[tiflash] test.t1.id, cast(test.t1.id, decimal(20,0))->Column#13",
+            " │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t1.id))",
+            " │ └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo",
+            " └─ExchangeReceiver(Probe) 9990.00 mpp[tiflash] ",
+            " └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#14, collate: binary]",
+            " └─Projection 9990.00 mpp[tiflash] test.t2.id, cast(test.t2.id, decimal(20,0) UNSIGNED)->Column#14",
+            " └─Selection 9990.00 mpp[tiflash] not(isnull(test.t2.id))",
+            " └─TableFullScan 10000.00 mpp[tiflash] table:t2 keep order:false, stats:pseudo"))
+}
+>>>>>>> b2a9059b5e1 (planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884))
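
Note on the tests above: TestMppTableReaderCacheForSingleSQL counts planner cache hits and misses by registering Go callbacks on two failpoints with failpoint.EnableCall. A minimal, self-contained sketch of that counting pattern, using a hypothetical failpoint path rather than the real planner ones, could look like this (assumption: the code under test would fire the named failpoint on each cache hit; nothing fires it in the sketch itself):

package example

import (
    "sync/atomic"
    "testing"

    "github.com/pingcap/failpoint"
    "github.com/stretchr/testify/require"
)

// TestCacheHitCounterSketch mirrors the hit/miss counting pattern only; the
// failpoint path "github.com/example/pkg/cacheHit" is hypothetical.
func TestCacheHitCounterSketch(t *testing.T) {
    var hits atomic.Int32
    failpoint.EnableCall("github.com/example/pkg/cacheHit", func() {
        hits.Add(1)
    })
    defer failpoint.Disable("github.com/example/pkg/cacheHit")

    hits.Store(0)
    // ... run the statement whose cache behavior is being checked ...
    require.Equal(t, int32(0), hits.Load()) // stays 0 here because nothing triggers the failpoint
}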

pkg/planner/core/optimizer.go

Lines changed: 25 additions & 2 deletions
@@ -770,8 +770,13 @@ const (
 
 type fineGrainedShuffleHelper struct {
     shuffleTarget shuffleTarget
+<<<<<<< HEAD
     plans         []*basePhysicalPlan
     joinKeysCount int
+=======
+    plans    []*physicalop.BasePhysicalPlan
+    joinKeys []*expression.Column
+>>>>>>> b2a9059b5e1 (planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884))
 }
 
 type tiflashClusterInfoStatus uint8
@@ -790,7 +795,7 @@ type tiflashClusterInfo struct {
 func (h *fineGrainedShuffleHelper) clear() {
     h.shuffleTarget = unknown
     h.plans = h.plans[:0]
-    h.joinKeysCount = 0
+    h.joinKeys = nil
 }
 
 func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysicalPlan) {
@@ -989,9 +994,15 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx PlanContext, plan
             probChild = child0
         }
         if len(joinKeys) > 0 { // Not cross join
+<<<<<<< HEAD
            buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}}
            buildHelper.plans = append(buildHelper.plans, &x.basePhysicalPlan)
            buildHelper.joinKeysCount = len(joinKeys)
+=======
+           buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*physicalop.BasePhysicalPlan{}}
+           buildHelper.plans = append(buildHelper.plans, &x.BasePhysicalPlan)
+           buildHelper.joinKeys = joinKeys
+>>>>>>> b2a9059b5e1 (planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884))
            setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
        } else {
            buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}}
@@ -1021,7 +1032,19 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx PlanContext, plan
            }
        case joinBuild:
            // Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations
-           if len(x.HashCols) != helper.joinKeysCount {
+           if len(x.HashCols) != len(helper.joinKeys) {
+               break
+           }
+           // Check the shuffle key should be equal to joinKey, otherwise the shuffle hash code may not be equal to
+           // actual join hash code due to type cast
+           applyFlag := true
+           for i, joinKey := range helper.joinKeys {
+               if !x.HashCols[i].Col.EqualColumn(joinKey) {
+                   applyFlag = false
+                   break
+               }
+           }
+           if !applyFlag {
                break
            }
            applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result
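
The heart of the change is in the joinBuild case above: fine grained shuffle is now applied only when the exchange's hash columns are exactly the join key columns, not merely the same number of columns, because a cast inserted for type alignment would make the shuffle hash code differ from the join hash code. A standalone sketch of that comparison, using simplified stand-in types rather than the planner's actual expression.Column and HashCols structures (an assumption for illustration), might look like:

package sketch

// column is a simplified stand-in for the planner's expression.Column;
// uniqueID plays the role of the column identity compared by EqualColumn.
type column struct{ uniqueID int64 }

func (c *column) equalColumn(other *column) bool { return c.uniqueID == other.uniqueID }

// hashCol is a simplified stand-in for an exchange sender's hash column entry.
type hashCol struct{ col *column }

// shuffleKeysMatchJoinKeys reports whether every shuffle hash column is the
// same column as the corresponding join key, so the hash code computed for
// the shuffle equals the hash code the join uses (no hidden type cast).
func shuffleKeysMatchJoinKeys(hashCols []hashCol, joinKeys []*column) bool {
    if len(hashCols) != len(joinKeys) {
        return false
    }
    for i, joinKey := range joinKeys {
        if !hashCols[i].col.equalColumn(joinKey) {
            return false
        }
    }
    return true
}

In TestIssue59877 above, the exchanges hash on Column#13, Column#14 and Column#17, which are casted copies of the bigint/bigint unsigned join keys rather than the keys themselves, so this guard appears to be why the expected plan shows no fine grained shuffle on those exchanges even though tiflash_fine_grained_shuffle_stream_count is set to 8.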
