Skip to content

Commit 5228e2f

Browse files
authored
planner: Limit fine-grained shuffle usage for MPP join operators to ensure shuffle keys are the same as the actual join keys (#59884) (#59916)
close #59877
1 parent 781794d commit 5228e2f

File tree

3 files changed

+71
-5
lines changed

3 files changed

+71
-5
lines changed

executor/tiflashtest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go_test(
99
],
1010
flaky = True,
1111
race = "on",
12-
shard_count = 39,
12+
shard_count = 40,
1313
deps = [
1414
"//config",
1515
"//domain",

executor/tiflashtest/tiflash_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1770,3 +1770,57 @@ func TestIndexMergeCarePreferTiflash(t *testing.T) {
17701770
" └─Selection 0.00 mpp[tiflash] ge(test.t.m, 1726910326), le(test.t.m, 1726910391), not(in(test.t.a, -1, 0)), or(eq(test.t.w, \"1123\"), eq(test.t.l, \"1123\"))",
17711771
" └─TableFullScan 10.00 mpp[tiflash] table:a pushed down filter:eq(test.t.s, 0), keep order:false, stats:pseudo"))
17721772
}
1773+
1774+
func TestIssue59877(t *testing.T) {
1775+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
1776+
tk := testkit.NewTestKit(t, store)
1777+
tk.MustExec("use test")
1778+
tk.MustExec("drop table if exists t1, t2, t3")
1779+
tk.MustExec("create table t1(id bigint, v1 int)")
1780+
tk.MustExec("alter table t1 set tiflash replica 1")
1781+
tb := external.GetTableByName(t, tk, "test", "t1")
1782+
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1783+
require.NoError(t, err)
1784+
tk.MustExec("create table t2(id bigint unsigned, v1 int)")
1785+
tk.MustExec("alter table t2 set tiflash replica 1")
1786+
tb = external.GetTableByName(t, tk, "test", "t2")
1787+
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1788+
require.NoError(t, err)
1789+
tk.MustExec("create table t3(id bigint, v1 int)")
1790+
tk.MustExec("alter table t3 set tiflash replica 1")
1791+
tb = external.GetTableByName(t, tk, "test", "t3")
1792+
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1793+
require.NoError(t, err)
1794+
1795+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
1796+
// unistore does not support late materialization
1797+
tk.MustExec("set tidb_opt_enable_late_materialization=0")
1798+
tk.MustExec("set @@session.tidb_allow_mpp=ON")
1799+
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
1800+
tk.MustExec("set tidb_broadcast_join_threshold_size=0")
1801+
tk.MustExec("set tidb_broadcast_join_threshold_count=0")
1802+
tk.MustExec("set tiflash_fine_grained_shuffle_stream_count=8")
1803+
tk.MustExec("set tidb_enforce_mpp=1")
1804+
tk.MustQuery("explain format=\"brief\" select /*+ hash_join_build(t3) */ count(*) from t1 straight_join t2 on t1.id = t2.id straight_join t3 on t1.id = t3.id").Check(
1805+
testkit.Rows("HashAgg 1.00 root funcs:count(Column#18)->Column#10",
1806+
"└─TableReader 1.00 root MppVersion: 1, data:ExchangeSender",
1807+
" └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough",
1808+
" └─HashAgg 1.00 mpp[tiflash] funcs:count(1)->Column#18",
1809+
" └─HashJoin 15609.38 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t3.id)]",
1810+
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
1811+
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#17, collate: binary]",
1812+
" │ └─Projection 9990.00 mpp[tiflash] test.t3.id, cast(test.t3.id, decimal(20,0))->Column#17",
1813+
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t3.id))",
1814+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t3 keep order:false, stats:pseudo",
1815+
" └─HashJoin(Probe) 12487.50 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t2.id)]",
1816+
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
1817+
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#13, collate: binary]",
1818+
" │ └─Projection 9990.00 mpp[tiflash] test.t1.id, cast(test.t1.id, decimal(20,0))->Column#13",
1819+
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t1.id))",
1820+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo",
1821+
" └─ExchangeReceiver(Probe) 9990.00 mpp[tiflash] ",
1822+
" └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#14, collate: binary]",
1823+
" └─Projection 9990.00 mpp[tiflash] test.t2.id, cast(test.t2.id, decimal(20,0) UNSIGNED)->Column#14",
1824+
" └─Selection 9990.00 mpp[tiflash] not(isnull(test.t2.id))",
1825+
" └─TableFullScan 10000.00 mpp[tiflash] table:t2 keep order:false, stats:pseudo"))
1826+
}

planner/core/optimizer.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -743,7 +743,7 @@ const (
743743
type fineGrainedShuffleHelper struct {
744744
shuffleTarget shuffleTarget
745745
plans []*basePhysicalPlan
746-
joinKeysCount int
746+
joinKeys []*expression.Column
747747
}
748748

749749
type tiflashClusterInfoStatus uint8
@@ -762,7 +762,7 @@ type tiflashClusterInfo struct {
762762
func (h *fineGrainedShuffleHelper) clear() {
763763
h.shuffleTarget = unknown
764764
h.plans = h.plans[:0]
765-
h.joinKeysCount = 0
765+
h.joinKeys = nil
766766
}
767767

768768
func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysicalPlan) {
@@ -961,7 +961,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
961961
if len(joinKeys) > 0 { // Not cross join
962962
buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}}
963963
buildHelper.plans = append(buildHelper.plans, &x.basePhysicalPlan)
964-
buildHelper.joinKeysCount = len(joinKeys)
964+
buildHelper.joinKeys = joinKeys
965965
setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
966966
} else {
967967
buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}}
@@ -991,7 +991,19 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
991991
}
992992
case joinBuild:
993993
// Support hashJoin only when the shuffle hash keys equal the join keys, due to the TiFlash implementation
994-
if len(x.HashCols) != helper.joinKeysCount {
994+
if len(x.HashCols) != len(helper.joinKeys) {
995+
break
996+
}
997+
// Check that each shuffle key equals the join key at the same position; otherwise the shuffle hash code
998+
// may differ from the actual join hash code because of a type cast
999+
applyFlag := true
1000+
for i, joinKey := range helper.joinKeys {
1001+
if !x.HashCols[i].Col.Equal(nil, joinKey) {
1002+
applyFlag = false
1003+
break
1004+
}
1005+
}
1006+
if !applyFlag {
9951007
break
9961008
}
9971009
applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result

0 commit comments

Comments
 (0)