Skip to content

Commit ff8f713

Browse files
AilinKidti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#56227
Signed-off-by: ti-chi-bot <[email protected]>
1 parent e329890 commit ff8f713

File tree

4 files changed

+304
-3
lines changed

4 files changed

+304
-3
lines changed

pkg/executor/test/tiflashtest/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ go_test(
99
],
1010
flaky = True,
1111
race = "on",
12+
<<<<<<< HEAD
1213
shard_count = 39,
14+
=======
15+
shard_count = 45,
16+
>>>>>>> 8df006280e9 (planner: make converge index merge path feel the prefer tiflash hint (#56227))
1317
deps = [
1418
"//pkg/config",
1519
"//pkg/domain",

pkg/executor/test/tiflashtest/tiflash_test.go

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1836,3 +1836,283 @@ func checkMPPInExplain(t *testing.T, tk *testkit.TestKit, sql string) {
18361836
res := resBuff.String()
18371837
require.Contains(t, res, "mpp[tiflash]")
18381838
}
1839+
<<<<<<< HEAD
1840+
=======
1841+
1842+
func TestMPPRecovery(t *testing.T) {
1843+
store := testkit.CreateMockStore(t, withMockTiFlash(2))
1844+
tk := testkit.NewTestKit(t, store)
1845+
tk.MustExec("use test")
1846+
1847+
tk.MustExec("create table t(a int, b int)")
1848+
tk.MustExec("alter table t set tiflash replica 1")
1849+
tb := external.GetTableByName(t, tk, "test", "t")
1850+
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1851+
require.NoError(t, err)
1852+
1853+
checkStrs := []string{"0 0"}
1854+
insertStr := "insert into t values(0, 0)"
1855+
for i := 1; i < 1500; i++ {
1856+
insertStr += fmt.Sprintf(",(%d, %d)", i, i)
1857+
checkStrs = append(checkStrs, fmt.Sprintf("%d %d", i, i))
1858+
}
1859+
tk.MustExec(insertStr)
1860+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
1861+
sql := "select * from t order by 1, 2"
1862+
const packagePath = "github.com/pingcap/tidb/pkg/executor/internal/mpp/"
1863+
1864+
require.NoError(t, failpoint.Enable(packagePath+"mpp_recovery_test_mock_enable", "return()"))
1865+
require.NoError(t, failpoint.Enable(packagePath+"mpp_recovery_test_ignore_recovery_err", "return()"))
1866+
// Test different chunk size. And force one mpp err.
1867+
{
1868+
require.NoError(t, failpoint.Enable(packagePath+"mpp_recovery_test_max_err_times", "return(1)"))
1869+
1870+
tk.MustExec("set @@tidb_max_chunk_size = default")
1871+
tk.MustQuery(sql).Check(testkit.Rows(checkStrs...))
1872+
tk.MustExec("set @@tidb_max_chunk_size = 32")
1873+
tk.MustQuery(sql).Check(testkit.Rows(checkStrs...))
1874+
1875+
require.NoError(t, failpoint.Disable(packagePath+"mpp_recovery_test_max_err_times"))
1876+
}
1877+
1878+
// Test exceeds max recovery times. Default max times is 3.
1879+
{
1880+
require.NoError(t, failpoint.Enable(packagePath+"mpp_recovery_test_max_err_times", "return(5)"))
1881+
1882+
tk.MustExec("set @@tidb_max_chunk_size = 32")
1883+
err := tk.QueryToErr(sql)
1884+
strings.Contains(err.Error(), "mock mpp err")
1885+
1886+
require.NoError(t, failpoint.Disable(packagePath+"mpp_recovery_test_max_err_times"))
1887+
}
1888+
1889+
{
1890+
// When AllowFallbackToTiKV, mpp err recovery should be disabled.
1891+
// So event we inject mock err multiple times, the query should be ok.
1892+
tk.MustExec("set @@tidb_allow_fallback_to_tikv = \"tiflash\"")
1893+
require.NoError(t, failpoint.Enable(packagePath+"mpp_recovery_test_max_err_times", "return(5)"))
1894+
1895+
tk.MustExec("set @@tidb_max_chunk_size = 32")
1896+
tk.MustQuery(sql).Check(testkit.Rows(checkStrs...))
1897+
1898+
tk.MustExec("set @@tidb_allow_fallback_to_tikv = default")
1899+
require.NoError(t, failpoint.Disable(packagePath+"mpp_recovery_test_max_err_times"))
1900+
}
1901+
1902+
// Test hold logic. Default hold 4 * MaxChunkSize rows.
1903+
{
1904+
require.NoError(t, failpoint.Enable(packagePath+"mpp_recovery_test_max_err_times", "return(0)"))
1905+
1906+
tk.MustExec("set @@tidb_max_chunk_size = 32")
1907+
expectedHoldSize := 2
1908+
require.NoError(t, failpoint.Enable(packagePath+"mpp_recovery_test_hold_size", fmt.Sprintf("1*return(%d)", expectedHoldSize)))
1909+
tk.MustQuery(sql).Check(testkit.Rows(checkStrs...))
1910+
require.NoError(t, failpoint.Disable(packagePath+"mpp_recovery_test_hold_size"))
1911+
1912+
require.NoError(t, failpoint.Disable(packagePath+"mpp_recovery_test_max_err_times"))
1913+
}
1914+
require.NoError(t, failpoint.Disable(packagePath+"mpp_recovery_test_ignore_recovery_err"))
1915+
require.NoError(t, failpoint.Disable(packagePath+"mpp_recovery_test_mock_enable"))
1916+
1917+
{
1918+
// We have 2 mock tiflash, but the table is small, so only 1 tiflash node is in computation.
1919+
require.NoError(t, failpoint.Enable(packagePath+"mpp_recovery_test_check_node_cnt", "return(1)"))
1920+
1921+
tk.MustExec("set @@tidb_max_chunk_size = 32")
1922+
tk.MustQuery(sql).Check(testkit.Rows(checkStrs...))
1923+
1924+
require.NoError(t, failpoint.Disable(packagePath+"mpp_recovery_test_check_node_cnt"))
1925+
}
1926+
1927+
tk.MustExec("set @@tidb_max_chunk_size = default")
1928+
}
1929+
1930+
func TestIssue50358(t *testing.T) {
1931+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
1932+
tk := testkit.NewTestKit(t, store)
1933+
tk.MustExec("use test")
1934+
tk.MustExec("drop table if exists t")
1935+
tk.MustExec("create table t(a int not null primary key, b int not null)")
1936+
tk.MustExec("alter table t set tiflash replica 1")
1937+
tb := external.GetTableByName(t, tk, "test", "t")
1938+
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1939+
require.NoError(t, err)
1940+
tk.MustExec("insert into t values(1,0)")
1941+
tk.MustExec("insert into t values(2,0)")
1942+
1943+
tk.MustExec("drop table if exists t1")
1944+
tk.MustExec("create table t1(c int not null primary key)")
1945+
tk.MustExec("alter table t1 set tiflash replica 1")
1946+
tb = external.GetTableByName(t, tk, "test", "t1")
1947+
err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1948+
require.NoError(t, err)
1949+
tk.MustExec("insert into t1 values(3)")
1950+
1951+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
1952+
tk.MustExec("set @@session.tidb_allow_mpp=ON")
1953+
for i := 0; i < 20; i++ {
1954+
// test if it is stable.
1955+
tk.MustQuery("select 8 from t join t1").Check(testkit.Rows("8", "8"))
1956+
}
1957+
}
1958+
1959+
func TestMppAggShouldAlignFinalMode(t *testing.T) {
1960+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
1961+
tk := testkit.NewTestKit(t, store)
1962+
tk.MustExec("use test")
1963+
tk.MustExec("create table t (" +
1964+
" d date," +
1965+
" v int," +
1966+
" primary key(d, v)" +
1967+
") partition by range columns (d) (" +
1968+
" partition p1 values less than ('2023-07-02')," +
1969+
" partition p2 values less than ('2023-07-03')" +
1970+
");")
1971+
tk.MustExec("alter table t set tiflash replica 1")
1972+
tb := external.GetTableByName(t, tk, "test", "t")
1973+
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
1974+
require.NoError(t, err)
1975+
tk.MustExec(`set tidb_partition_prune_mode='static';`)
1976+
err = failpoint.Enable("github.com/pingcap/tidb/pkg/expression/aggregation/show-agg-mode", "return(true)")
1977+
require.Nil(t, err)
1978+
1979+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
1980+
tk.MustQuery("explain format='brief' select 1 from (" +
1981+
" select /*+ read_from_storage(tiflash[t]) */ sum(1)" +
1982+
" from t where d BETWEEN '2023-07-01' and '2023-07-03' group by d" +
1983+
") total;").Check(testkit.Rows("Projection 400.00 root 1->Column#4",
1984+
"└─HashAgg 400.00 root group by:test.t.d, funcs:count(complete,1)->Column#8",
1985+
" └─PartitionUnion 400.00 root ",
1986+
" ├─Projection 200.00 root test.t.d",
1987+
" │ └─HashAgg 200.00 root group by:test.t.d, funcs:firstrow(partial2,test.t.d)->test.t.d, funcs:count(final,Column#12)->Column#9",
1988+
" │ └─TableReader 200.00 root MppVersion: 2, data:ExchangeSender",
1989+
" │ └─ExchangeSender 200.00 mpp[tiflash] ExchangeType: PassThrough",
1990+
" │ └─HashAgg 200.00 mpp[tiflash] group by:test.t.d, funcs:count(partial1,1)->Column#12",
1991+
" │ └─TableRangeScan 250.00 mpp[tiflash] table:t, partition:p1 range:[2023-07-01,2023-07-03], keep order:false, stats:pseudo",
1992+
" └─Projection 200.00 root test.t.d",
1993+
" └─HashAgg 200.00 root group by:test.t.d, funcs:firstrow(partial2,test.t.d)->test.t.d, funcs:count(final,Column#14)->Column#10",
1994+
" └─TableReader 200.00 root MppVersion: 2, data:ExchangeSender",
1995+
" └─ExchangeSender 200.00 mpp[tiflash] ExchangeType: PassThrough",
1996+
" └─HashAgg 200.00 mpp[tiflash] group by:test.t.d, funcs:count(partial1,1)->Column#14",
1997+
" └─TableRangeScan 250.00 mpp[tiflash] table:t, partition:p2 range:[2023-07-01,2023-07-03], keep order:false, stats:pseudo"))
1998+
1999+
err = failpoint.Disable("github.com/pingcap/tidb/pkg/expression/aggregation/show-agg-mode")
2000+
require.Nil(t, err)
2001+
}
2002+
2003+
func TestMppTableReaderCacheForSingleSQL(t *testing.T) {
2004+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
2005+
tk := testkit.NewTestKit(t, store)
2006+
tk.MustExec("use test")
2007+
tk.MustExec("create table t(a int, b int, primary key(a))")
2008+
tk.MustExec("alter table t set tiflash replica 1")
2009+
tb := external.GetTableByName(t, tk, "test", "t")
2010+
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
2011+
require.NoError(t, err)
2012+
2013+
tk.MustExec("create table t2(a int, b int) partition by hash(b) partitions 4")
2014+
tk.MustExec("alter table t2 set tiflash replica 1")
2015+
tb = external.GetTableByName(t, tk, "test", "t2")
2016+
err = domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
2017+
require.NoError(t, err)
2018+
tk.MustExec("insert into t values(1, 1)")
2019+
tk.MustExec("insert into t values(2, 2)")
2020+
tk.MustExec("insert into t values(3, 3)")
2021+
tk.MustExec("insert into t values(4, 4)")
2022+
tk.MustExec("insert into t values(5, 5)")
2023+
2024+
tk.MustExec("insert into t2 values(1, 1)")
2025+
tk.MustExec("insert into t2 values(2, 2)")
2026+
tk.MustExec("insert into t2 values(3, 3)")
2027+
tk.MustExec("insert into t2 values(4, 4)")
2028+
tk.MustExec("insert into t2 values(5, 5)")
2029+
2030+
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
2031+
tk.MustExec("set @@session.tidb_allow_mpp=ON")
2032+
tk.MustExec("set @@session.tidb_enforce_mpp=ON")
2033+
tk.MustExec("set @@session.tidb_max_chunk_size=32")
2034+
2035+
// Test TableReader cache for single SQL.
2036+
type testCase struct {
2037+
sql string
2038+
expectHitNum int32
2039+
expectMissNum int32
2040+
}
2041+
2042+
testCases := []testCase{
2043+
// Non-Partition
2044+
// Cache hit
2045+
{"select * from t", 0, 1},
2046+
{"select * from t union select * from t", 1, 1},
2047+
{"select * from t union select * from t t1 union select * from t t2", 2, 1},
2048+
{"select * from t where b <= 3 union select * from t where b > 3", 1, 1}, // both full range
2049+
{"select * from t where a <= 3 union select * from t where a <= 3", 1, 1}, // same range
2050+
{"select * from t t1 join t t2 on t1.b=t2.b", 1, 1},
2051+
2052+
// Cache miss
2053+
{"select * from t union all select * from t", 0, 2}, // different mpp task root
2054+
{"select * from t where a <= 3 union select * from t where a > 3", 0, 2}, // different range
2055+
2056+
// Partition
2057+
// Cache hit
2058+
{"select * from t2 union select * from t2", 1, 1},
2059+
{"select * from t2 where b = 1 union select * from t2 where b = 5", 1, 1}, // same partition, full range
2060+
{"select * from t2 where b = 1 and a < 3 union select * from t2 where b = 5 and a < 3", 1, 1}, // same partition, same range
2061+
{"select * from t2 t1 join t2 t2 on t1.b=t2.b", 1, 1},
2062+
{"select * from t2 t1 join t2 t2 on t1.b=t2.b where t1.a = 2 and t2.a = 2", 1, 1},
2063+
2064+
// Cache miss
2065+
{"select * from t2 union select * from t2 where b = 1", 0, 2}, // different partition
2066+
{"select * from t2 where b = 2 union select * from t2 where b = 1", 0, 2}, // different partition
2067+
}
2068+
2069+
var hitNum, missNum atomic.Int32
2070+
hitFunc := func() {
2071+
hitNum.Add(1)
2072+
}
2073+
missFunc := func() {
2074+
missNum.Add(1)
2075+
}
2076+
failpoint.EnableCall("github.com/pingcap/tidb/pkg/planner/core/mppTaskGeneratorTableReaderCacheHit", hitFunc)
2077+
failpoint.EnableCall("github.com/pingcap/tidb/pkg/planner/core/mppTaskGeneratorTableReaderCacheMiss", missFunc)
2078+
for _, tc := range testCases {
2079+
hitNum.Store(0)
2080+
missNum.Store(0)
2081+
tk.MustQuery(tc.sql)
2082+
require.Equal(t, tc.expectHitNum, hitNum.Load())
2083+
require.Equal(t, tc.expectMissNum, missNum.Load())
2084+
}
2085+
}
2086+
2087+
func TestIndexMergeCarePreferTiflash(t *testing.T) {
2088+
store := testkit.CreateMockStore(t, withMockTiFlash(1))
2089+
tk := testkit.NewTestKit(t, store)
2090+
tk.MustExec("use test")
2091+
2092+
tk.MustExec("drop table if exists t")
2093+
tk.MustExec("CREATE TABLE `t` (" +
2094+
"`i` bigint(20) NOT NULL, " +
2095+
"`w` varchar(32) NOT NULL," +
2096+
"`l` varchar(32) NOT NULL," +
2097+
"`a` tinyint(4) NOT NULL DEFAULT '0'," +
2098+
"`m` int(11) NOT NULL DEFAULT '0'," +
2099+
"`s` int(11) NOT NULL DEFAULT '0'," +
2100+
"PRIMARY KEY (`i`) /*T![clustered_index] NONCLUSTERED */," +
2101+
"KEY `idx_win_user_site_code` (`w`,`m`)," +
2102+
"KEY `idx_lose_user_site_code` (`l`,`m`)," +
2103+
"KEY `idx_win_site_code_status` (`w`,`a`)," +
2104+
"KEY `idx_lose_site_code_status` (`l`,`a`)" +
2105+
")")
2106+
tk.MustExec("alter table t set tiflash replica 1")
2107+
tb := external.GetTableByName(t, tk, "test", "t")
2108+
err := domain.GetDomain(tk.Session()).DDLExecutor().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
2109+
require.NoError(t, err)
2110+
tk.MustQuery("explain format=\"brief\" SELECT" +
2111+
" /*+ read_from_storage(tiflash[a]) */ a.i FROM t a WHERE a.s = 0 AND a.a NOT IN (-1, 0) AND m >= 1726910326 AND m <= 1726910391 AND ( a.w IN ('1123') OR a.l IN ('1123'))").Check(
2112+
testkit.Rows("TableReader 0.00 root MppVersion: 2, data:ExchangeSender",
2113+
"└─ExchangeSender 0.00 mpp[tiflash] ExchangeType: PassThrough",
2114+
" └─Projection 0.00 mpp[tiflash] test.t.i",
2115+
" └─Selection 0.00 mpp[tiflash] ge(test.t.m, 1726910326), le(test.t.m, 1726910391), not(in(test.t.a, -1, 0)), or(eq(test.t.w, \"1123\"), eq(test.t.l, \"1123\"))",
2116+
" └─TableFullScan 10.00 mpp[tiflash] table:a pushed down filter:eq(test.t.s, 0), keep order:false, stats:pseudo"))
2117+
}
2118+
>>>>>>> 8df006280e9 (planner: make converge index merge path feel the prefer tiflash hint (#56227))

pkg/planner/core/casetest/physicalplantest/testdata/plan_suite_out.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,11 +406,11 @@
406406
"└─ExchangeSender 4439.11 mpp[tiflash] ExchangeType: PassThrough",
407407
" └─Projection 4439.11 mpp[tiflash] test.t.a, Column#5",
408408
" └─Projection 4439.11 mpp[tiflash] Column#5, test.t.a",
409-
" └─HashAgg 4439.11 mpp[tiflash] group by:test.t.a, test.t.c, funcs:sum(Column#14)->Column#5, funcs:firstrow(test.t.a)->test.t.a",
409+
" └─HashAgg 4439.11 mpp[tiflash] group by:test.t.a, test.t.c, funcs:sum(Column#8)->Column#5, funcs:firstrow(test.t.a)->test.t.a",
410410
" └─ExchangeReceiver 4439.11 mpp[tiflash] ",
411411
" └─ExchangeSender 4439.11 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t.a, collate: binary], [name: test.t.c, collate: binary]",
412-
" └─HashAgg 4439.11 mpp[tiflash] group by:Column#17, Column#18, funcs:sum(Column#16)->Column#14",
413-
" └─Projection 5548.89 mpp[tiflash] cast(test.t.b, decimal(10,0) BINARY)->Column#16, test.t.a->Column#17, test.t.c->Column#18",
412+
" └─HashAgg 4439.11 mpp[tiflash] group by:Column#11, Column#12, funcs:sum(Column#10)->Column#8",
413+
" └─Projection 5548.89 mpp[tiflash] cast(test.t.b, decimal(10,0) BINARY)->Column#10, test.t.a->Column#11, test.t.c->Column#12",
414414
" └─Selection 5548.89 mpp[tiflash] or(lt(test.t.b, 2), gt(test.t.a, 2))",
415415
" └─TableFullScan 10000.00 mpp[tiflash] table:t pushed down filter:empty, keep order:false, stats:pseudo"
416416
],

pkg/planner/core/find_best_task.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,15 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
11371137
for _, candidate := range candidates {
11381138
path := candidate.path
11391139
if path.PartialIndexPaths != nil {
1140+
<<<<<<< HEAD
11401141
idxMergeTask, err := ds.convertToIndexMergeScan(prop, candidate, opt)
1142+
=======
1143+
// prefer tiflash, while current table path is tikv, skip it.
1144+
if ds.PreferStoreType&h.PreferTiFlash != 0 && path.StoreType == kv.TiKV {
1145+
continue
1146+
}
1147+
idxMergeTask, err := convertToIndexMergeScan(ds, prop, candidate, opt)
1148+
>>>>>>> 8df006280e9 (planner: make converge index merge path feel the prefer tiflash hint (#56227))
11411149
if err != nil {
11421150
return nil, 0, err
11431151
}
@@ -1263,10 +1271,19 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
12631271
}
12641272
}
12651273
if path.IsTablePath() {
1274+
<<<<<<< HEAD
12661275
if ds.preferStoreType&preferTiFlash != 0 && path.StoreType == kv.TiKV {
12671276
continue
12681277
}
12691278
if ds.preferStoreType&preferTiKV != 0 && path.StoreType == kv.TiFlash {
1279+
=======
1280+
// prefer tiflash, while current table path is tikv, skip it.
1281+
if ds.PreferStoreType&h.PreferTiFlash != 0 && path.StoreType == kv.TiKV {
1282+
continue
1283+
}
1284+
// prefer tikv, while current table path is tiflash, skip it.
1285+
if ds.PreferStoreType&h.PreferTiKV != 0 && path.StoreType == kv.TiFlash {
1286+
>>>>>>> 8df006280e9 (planner: make converge index merge path feel the prefer tiflash hint (#56227))
12701287
continue
12711288
}
12721289
var tblTask task

0 commit comments

Comments
 (0)