Skip to content

Commit 75b473f

Browse files
yibin87 and ti-chi-bot
authored and committed
This is an automated cherry-pick of pingcap#59884
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 55e84ff commit 75b473f

File tree

3 files changed

+113
-2
lines changed

3 files changed

+113
-2
lines changed

pkg/executor/test/tiflashtest/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ go_test(
99
],
1010
flaky = True,
1111
race = "on",
12+
<<<<<<< HEAD
1213
shard_count = 40,
14+
=======
15+
shard_count = 48,
16+
>>>>>>> b2a9059b5e1 (planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884))
1317
deps = [
1418
"//pkg/config",
1519
"//pkg/domain",

pkg/executor/test/tiflashtest/tiflash_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1868,3 +1868,87 @@ func TestIndexMergeCarePreferTiflash(t *testing.T) {
18681868
" └─Selection 0.00 mpp[tiflash] ge(test.t.m, 1726910326), le(test.t.m, 1726910391), not(in(test.t.a, -1, 0)), or(eq(test.t.w, \"1123\"), eq(test.t.l, \"1123\"))",
18691869
" └─TableFullScan 10.00 mpp[tiflash] table:a pushed down filter:eq(test.t.s, 0), keep order:false, stats:pseudo"))
18701870
}
1871+
<<<<<<< HEAD
1872+
=======
1873+
1874+
func TestIssue59703(t *testing.T) {
1875+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
1876+
tk := testkit.NewTestKit(t, store)
1877+
tk.MustExec("use test")
1878+
tk.MustExec("drop table if exists t")
1879+
tk.MustExec("create table t(a int not null primary key, b int not null)")
1880+
tk.MustExec("alter table t set tiflash replica 1")
1881+
tb := external.GetTableByName(t, tk, "test", "t")
1882+
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1883+
require.NoError(t, err)
1884+
tk.MustExec("insert into t values(1,0)")
1885+
tk.MustExec("insert into t values(2,0)")
1886+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
1887+
// unistore does not support later materialization
1888+
tk.MustExec("set tidb_opt_enable_late_materialization=0")
1889+
tk.MustExec("set @@session.tidb_allow_mpp=ON")
1890+
1891+
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/mpp/mpp_coordinator_execute_err", "return()")
1892+
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/internal/mpp/mpp_coordinator_execute_err")
1893+
1894+
err = tk.ExecToErr("select count(*) from t")
1895+
require.Contains(t, err.Error(), "mock mpp error")
1896+
require.Equal(t, mppcoordmanager.InstanceMPPCoordinatorManager.GetCoordCount(), 0)
1897+
}
1898+
1899+
func TestIssue59877(t *testing.T) {
1900+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
1901+
tk := testkit.NewTestKit(t, store)
1902+
tk.MustExec("use test")
1903+
tk.MustExec("drop table if exists t1, t2, t3")
1904+
tk.MustExec("create table t1(id bigint, v1 int)")
1905+
tk.MustExec("alter table t1 set tiflash replica 1")
1906+
tb := external.GetTableByName(t, tk, "test", "t1")
1907+
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1908+
require.NoError(t, err)
1909+
tk.MustExec("create table t2(id bigint unsigned, v1 int)")
1910+
tk.MustExec("alter table t2 set tiflash replica 1")
1911+
tb = external.GetTableByName(t, tk, "test", "t2")
1912+
err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1913+
require.NoError(t, err)
1914+
tk.MustExec("create table t3(id bigint, v1 int)")
1915+
tk.MustExec("alter table t3 set tiflash replica 1")
1916+
tb = external.GetTableByName(t, tk, "test", "t3")
1917+
err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1918+
require.NoError(t, err)
1919+
1920+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
1921+
// unistore does not support later materialization
1922+
tk.MustExec("set tidb_opt_enable_late_materialization=0")
1923+
tk.MustExec("set @@session.tidb_allow_mpp=ON")
1924+
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
1925+
tk.MustExec("set tidb_broadcast_join_threshold_size=0")
1926+
tk.MustExec("set tidb_broadcast_join_threshold_count=0")
1927+
tk.MustExec("set tiflash_fine_grained_shuffle_stream_count=8")
1928+
tk.MustExec("set tidb_enforce_mpp=1")
1929+
tk.MustQuery("explain format=\"brief\" select /*+ hash_join_build(t3) */ count(*) from t1 straight_join t2 on t1.id = t2.id straight_join t3 on t1.id = t3.id").Check(
1930+
testkit.Rows("HashAgg 1.00 root funcs:count(Column#18)->Column#10",
1931+
"└─TableReader 1.00 root MppVersion: 3, data:ExchangeSender",
1932+
" └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough",
1933+
" └─HashAgg 1.00 mpp[tiflash] funcs:count(1)->Column#18",
1934+
" └─Projection 15609.38 mpp[tiflash] test.t1.id, Column#14",
1935+
" └─HashJoin 15609.38 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t3.id)]",
1936+
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
1937+
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#17, collate: binary]",
1938+
" │ └─Projection 9990.00 mpp[tiflash] test.t3.id, cast(test.t3.id, decimal(20,0))->Column#17",
1939+
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t3.id))",
1940+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t3 keep order:false, stats:pseudo",
1941+
" └─Projection(Probe) 12487.50 mpp[tiflash] test.t1.id, Column#14",
1942+
" └─HashJoin 12487.50 mpp[tiflash] inner join, equal:[eq(test.t1.id, test.t2.id)]",
1943+
" ├─ExchangeReceiver(Build) 9990.00 mpp[tiflash] ",
1944+
" │ └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#13, collate: binary]",
1945+
" │ └─Projection 9990.00 mpp[tiflash] test.t1.id, cast(test.t1.id, decimal(20,0))->Column#13",
1946+
" │ └─Selection 9990.00 mpp[tiflash] not(isnull(test.t1.id))",
1947+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo",
1948+
" └─ExchangeReceiver(Probe) 9990.00 mpp[tiflash] ",
1949+
" └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: Column#14, collate: binary]",
1950+
" └─Projection 9990.00 mpp[tiflash] test.t2.id, cast(test.t2.id, decimal(20,0) UNSIGNED)->Column#14",
1951+
" └─Selection 9990.00 mpp[tiflash] not(isnull(test.t2.id))",
1952+
" └─TableFullScan 10000.00 mpp[tiflash] table:t2 keep order:false, stats:pseudo"))
1953+
}
1954+
>>>>>>> b2a9059b5e1 (planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884))

pkg/planner/core/optimizer.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -784,8 +784,13 @@ const (
784784

785785
type fineGrainedShuffleHelper struct {
786786
shuffleTarget shuffleTarget
787+
<<<<<<< HEAD
787788
plans []*basePhysicalPlan
788789
joinKeysCount int
790+
=======
791+
plans []*physicalop.BasePhysicalPlan
792+
joinKeys []*expression.Column
793+
>>>>>>> b2a9059b5e1 (planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884))
789794
}
790795

791796
type tiflashClusterInfoStatus uint8
@@ -804,7 +809,7 @@ type tiflashClusterInfo struct {
804809
func (h *fineGrainedShuffleHelper) clear() {
805810
h.shuffleTarget = unknown
806811
h.plans = h.plans[:0]
807-
h.joinKeysCount = 0
812+
h.joinKeys = nil
808813
}
809814

810815
func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysicalPlan) {
@@ -1003,9 +1008,15 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
10031008
probChild = child0
10041009
}
10051010
if len(joinKeys) > 0 { // Not cross join
1011+
<<<<<<< HEAD
10061012
buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}}
10071013
buildHelper.plans = append(buildHelper.plans, &x.basePhysicalPlan)
10081014
buildHelper.joinKeysCount = len(joinKeys)
1015+
=======
1016+
buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*physicalop.BasePhysicalPlan{}}
1017+
buildHelper.plans = append(buildHelper.plans, &x.BasePhysicalPlan)
1018+
buildHelper.joinKeys = joinKeys
1019+
>>>>>>> b2a9059b5e1 (planner: Limit fine grained shuffle usage for mpp join operators to ensure shuffle keys are the same with actual join keys (#59884))
10091020
setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
10101021
} else {
10111022
buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}}
@@ -1035,7 +1046,19 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
10351046
}
10361047
case joinBuild:
10371048
// Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations
1038-
if len(x.HashCols) != helper.joinKeysCount {
1049+
if len(x.HashCols) != len(helper.joinKeys) {
1050+
break
1051+
}
1052+
// Check the shuffle key should be equal to joinKey, otherwise the shuffle hash code may not be equal to
1053+
// actual join hash code due to type cast
1054+
applyFlag := true
1055+
for i, joinKey := range helper.joinKeys {
1056+
if !x.HashCols[i].Col.EqualColumn(joinKey) {
1057+
applyFlag = false
1058+
break
1059+
}
1060+
}
1061+
if !applyFlag {
10391062
break
10401063
}
10411064
applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result

0 commit comments

Comments (0)