Skip to content

Commit 6ab9aa3

Browse files
authored
planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884) (#59914)
close #59877
1 parent 26912fc commit 6ab9aa3

File tree

3 files changed

+71
-5
lines changed

3 files changed

+71
-5
lines changed

pkg/executor/test/tiflashtest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go_test(
99
],
1010
flaky = True,
1111
race = "on",
12-
shard_count = 43,
12+
shard_count = 44,
1313
deps = [
1414
"//pkg/config",
1515
"//pkg/domain",

pkg/executor/test/tiflashtest/tiflash_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2032,3 +2032,57 @@ func TestMppAggShouldAlignFinalMode(t *testing.T) {
20322032
err = failpoint.Disable("github.com/pingcap/tidb/pkg/expression/aggregation/show-agg-mode")
20332033
require.Nil(t, err)
20342034
}
2035+
2036+
func TestIssue59877(t *testing.T) {
2037+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
2038+
tk := testkit.NewTestKit(t, store)
2039+
tk.MustExec("use test")
2040+
tk.MustExec("drop table if exists t1, t2, t3")
2041+
tk.MustExec("create table t1(id bigint, v1 int)")
2042+
tk.MustExec("alter table t1 set tiflash replica 1")
2043+
tb := external.GetTableByName(t, tk, "test", "t1")
2044+
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
2045+
require.NoError(t, err)
2046+
tk.MustExec("create table t2(id bigint unsigned, v1 int)")
2047+
tk.MustExec("alter table t2 set tiflash replica 1")
2048+
tb = external.GetTableByName(t, tk, "test", "t2")
2049+
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
2050+
require.NoError(t, err)
2051+
tk.MustExec("create table t3(id bigint, v1 int)")
2052+
tk.MustExec("alter table t3 set tiflash replica 1")
2053+
tb = external.GetTableByName(t, tk, "test", "t3")
2054+
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
2055+
require.NoError(t, err)
2056+
2057+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
2058+
// unistore does not support later materialization
2059+
tk.MustExec("set tidb_opt_enable_late_materialization=0")
2060+
tk.MustExec("set @@session.tidb_allow_mpp=ON")
2061+
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
2062+
tk.MustExec("set tidb_broadcast_join_threshold_size=0")
2063+
tk.MustExec("set tidb_broadcast_join_threshold_count=0")
2064+
tk.MustExec("set tiflash_fine_grained_shuffle_stream_count=8")
2065+
tk.MustExec("set tidb_enforce_mpp=1")
2066+
tk.MustQuery("explain format=\"brief\" select /*+ hash_join_build(t3) */ count(*) from t1 straight_join t2 on t1.id = t2.id straight_join t3 on t1.id = t3.id").Check(
2067+
testkit.Rows("HashAgg 1.00 root funcs:count(Column#18)->Column#10",
2068+
"└─TableReader 1.00 root MppVersion: 2, data:ExchangeSender",
2069+
" └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough",
2070+
" └─HashAgg 1.00 mpp[tiflash] funcs:count(1)->Column#18",
2071+
" └─HashJoin 15609.38 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t3.id)]",
2072+
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
2073+
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#17, collate: binary]",
2074+
" │ └─Projection 9990.00 mpp[tiflash] test.t3.id, cast(test.t3.id, decimal(20,0))->Column#17",
2075+
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t3.id))",
2076+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t3 keep order:false, stats:pseudo",
2077+
" └─HashJoin(Probe) 12487.50 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t2.id)]",
2078+
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
2079+
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#13, collate: binary]",
2080+
" │ └─Projection 9990.00 mpp[tiflash] test.t1.id, cast(test.t1.id, decimal(20,0))->Column#13",
2081+
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t1.id))",
2082+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo",
2083+
" └─ExchangeReceiver(Probe) 9990.00 mpp[tiflash] ",
2084+
" └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#14, collate: binary]",
2085+
" └─Projection 9990.00 mpp[tiflash] test.t2.id, cast(test.t2.id, decimal(20,0) UNSIGNED)->Column#14",
2086+
" └─Selection 9990.00 mpp[tiflash] not(isnull(test.t2.id))",
2087+
" └─TableFullScan 10000.00 mpp[tiflash] table:t2 keep order:false, stats:pseudo"))
2088+
}

pkg/planner/core/optimizer.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@ const (
771771
type fineGrainedShuffleHelper struct {
772772
shuffleTarget shuffleTarget
773773
plans []*basePhysicalPlan
774-
joinKeysCount int
774+
joinKeys []*expression.Column
775775
}
776776

777777
type tiflashClusterInfoStatus uint8
@@ -790,7 +790,7 @@ type tiflashClusterInfo struct {
790790
func (h *fineGrainedShuffleHelper) clear() {
791791
h.shuffleTarget = unknown
792792
h.plans = h.plans[:0]
793-
h.joinKeysCount = 0
793+
h.joinKeys = nil
794794
}
795795

796796
func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysicalPlan) {
@@ -991,7 +991,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx PlanContext, plan
991991
if len(joinKeys) > 0 { // Not cross join
992992
buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}}
993993
buildHelper.plans = append(buildHelper.plans, &x.basePhysicalPlan)
994-
buildHelper.joinKeysCount = len(joinKeys)
994+
buildHelper.joinKeys = joinKeys
995995
setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
996996
} else {
997997
buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}}
@@ -1021,7 +1021,19 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx PlanContext, plan
10211021
}
10221022
case joinBuild:
10231023
// Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations
1024-
if len(x.HashCols) != helper.joinKeysCount {
1024+
if len(x.HashCols) != len(helper.joinKeys) {
1025+
break
1026+
}
1027+
// Check the shuffle key should be equal to joinKey, otherwise the shuffle hash code may not be equal to
1028+
// actual join hash code due to type cast
1029+
applyFlag := true
1030+
for i, joinKey := range helper.joinKeys {
1031+
if !x.HashCols[i].Col.Equal(nil, joinKey) {
1032+
applyFlag = false
1033+
break
1034+
}
1035+
}
1036+
if !applyFlag {
10251037
break
10261038
}
10271039
applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result

0 commit comments

Comments
 (0)