Skip to content

Commit d078286

Browse files
authored
planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884) (#59915)
close #59877
1 parent 55e84ff commit d078286

File tree

3 files changed

+71
-5
lines changed

3 files changed

+71
-5
lines changed

pkg/executor/test/tiflashtest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go_test(
99
],
1010
flaky = True,
1111
race = "on",
12-
shard_count = 40,
12+
shard_count = 41,
1313
deps = [
1414
"//pkg/config",
1515
"//pkg/domain",

pkg/executor/test/tiflashtest/tiflash_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1868,3 +1868,57 @@ func TestIndexMergeCarePreferTiflash(t *testing.T) {
18681868
" └─Selection 0.00 mpp[tiflash] ge(test.t.m, 1726910326), le(test.t.m, 1726910391), not(in(test.t.a, -1, 0)), or(eq(test.t.w, \"1123\"), eq(test.t.l, \"1123\"))",
18691869
" └─TableFullScan 10.00 mpp[tiflash] table:a pushed down filter:eq(test.t.s, 0), keep order:false, stats:pseudo"))
18701870
}
1871+
1872+
func TestIssue59877(t *testing.T) {
1873+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
1874+
tk := testkit.NewTestKit(t, store)
1875+
tk.MustExec("use test")
1876+
tk.MustExec("drop table if exists t1, t2, t3")
1877+
tk.MustExec("create table t1(id bigint, v1 int)")
1878+
tk.MustExec("alter table t1 set tiflash replica 1")
1879+
tb := external.GetTableByName(t, tk, "test", "t1")
1880+
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1881+
require.NoError(t, err)
1882+
tk.MustExec("create table t2(id bigint unsigned, v1 int)")
1883+
tk.MustExec("alter table t2 set tiflash replica 1")
1884+
tb = external.GetTableByName(t, tk, "test", "t2")
1885+
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1886+
require.NoError(t, err)
1887+
tk.MustExec("create table t3(id bigint, v1 int)")
1888+
tk.MustExec("alter table t3 set tiflash replica 1")
1889+
tb = external.GetTableByName(t, tk, "test", "t3")
1890+
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1891+
require.NoError(t, err)
1892+
1893+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
1894+
// unistore does not support later materialization
1895+
tk.MustExec("set tidb_opt_enable_late_materialization=0")
1896+
tk.MustExec("set @@session.tidb_allow_mpp=ON")
1897+
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
1898+
tk.MustExec("set tidb_broadcast_join_threshold_size=0")
1899+
tk.MustExec("set tidb_broadcast_join_threshold_count=0")
1900+
tk.MustExec("set tiflash_fine_grained_shuffle_stream_count=8")
1901+
tk.MustExec("set tidb_enforce_mpp=1")
1902+
tk.MustQuery("explain format=\"brief\" select /*+ hash_join_build(t3) */ count(*) from t1 straight_join t2 on t1.id = t2.id straight_join t3 on t1.id = t3.id").Check(
1903+
testkit.Rows("HashAgg 1.00 root funcs:count(Column#18)->Column#10",
1904+
"└─TableReader 1.00 root MppVersion: 2, data:ExchangeSender",
1905+
" └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough",
1906+
" └─HashAgg 1.00 mpp[tiflash] funcs:count(1)->Column#18",
1907+
" └─HashJoin 15609.38 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t3.id)]",
1908+
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
1909+
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#17, collate: binary]",
1910+
" │ └─Projection 9990.00 mpp[tiflash] test.t3.id, cast(test.t3.id, decimal(20,0))->Column#17",
1911+
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t3.id))",
1912+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t3 keep order:false, stats:pseudo",
1913+
" └─HashJoin(Probe) 12487.50 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t2.id)]",
1914+
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
1915+
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#13, collate: binary]",
1916+
" │ └─Projection 9990.00 mpp[tiflash] test.t1.id, cast(test.t1.id, decimal(20,0))->Column#13",
1917+
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t1.id))",
1918+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo",
1919+
" └─ExchangeReceiver(Probe) 9990.00 mpp[tiflash] ",
1920+
" └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#14, collate: binary]",
1921+
" └─Projection 9990.00 mpp[tiflash] test.t2.id, cast(test.t2.id, decimal(20,0) UNSIGNED)->Column#14",
1922+
" └─Selection 9990.00 mpp[tiflash] not(isnull(test.t2.id))",
1923+
" └─TableFullScan 10000.00 mpp[tiflash] table:t2 keep order:false, stats:pseudo"))
1924+
}

pkg/planner/core/optimizer.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -785,7 +785,7 @@ const (
785785
type fineGrainedShuffleHelper struct {
786786
shuffleTarget shuffleTarget
787787
plans []*basePhysicalPlan
788-
joinKeysCount int
788+
joinKeys []*expression.Column
789789
}
790790

791791
type tiflashClusterInfoStatus uint8
@@ -804,7 +804,7 @@ type tiflashClusterInfo struct {
804804
func (h *fineGrainedShuffleHelper) clear() {
805805
h.shuffleTarget = unknown
806806
h.plans = h.plans[:0]
807-
h.joinKeysCount = 0
807+
h.joinKeys = nil
808808
}
809809

810810
func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysicalPlan) {
@@ -1005,7 +1005,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
10051005
if len(joinKeys) > 0 { // Not cross join
10061006
buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}}
10071007
buildHelper.plans = append(buildHelper.plans, &x.basePhysicalPlan)
1008-
buildHelper.joinKeysCount = len(joinKeys)
1008+
buildHelper.joinKeys = joinKeys
10091009
setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
10101010
} else {
10111011
buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}}
@@ -1035,7 +1035,19 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
10351035
}
10361036
case joinBuild:
10371037
// Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations
1038-
if len(x.HashCols) != helper.joinKeysCount {
1038+
if len(x.HashCols) != len(helper.joinKeys) {
1039+
break
1040+
}
1041+
// Check the shuffle key should be equal to joinKey, otherwise the shuffle hash code may not be equal to
1042+
// actual join hash code due to type cast
1043+
applyFlag := true
1044+
for i, joinKey := range helper.joinKeys {
1045+
if !x.HashCols[i].Col.Equal(nil, joinKey) {
1046+
applyFlag = false
1047+
break
1048+
}
1049+
}
1050+
if !applyFlag {
10391051
break
10401052
}
10411053
applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result

0 commit comments

Comments
 (0)