Skip to content

Commit 731e684

Browse files
authored
ensure Event::onTaskFinish is called to avoid pipeline task hang (#10335)
close #10334 Signed-off-by: guo-shaoge <[email protected]>
1 parent b155ed6 commit 731e684

File tree

6 files changed

+71
-12
lines changed

6 files changed

+71
-12
lines changed

dbms/src/Flash/Executor/tests/gtest_pipeline_executor_context.cpp

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,46 @@
1616
#include <Common/ThreadManager.h>
1717
#include <Flash/Executor/PipelineExecutorContext.h>
1818
#include <Flash/Executor/ResultQueue.h>
19+
#include <TestUtils/ExecutorTestUtils.h>
20+
#include <TestUtils/FailPointUtils.h>
1921
#include <TestUtils/TiFlashTestBasic.h>
2022
#include <gtest/gtest.h>
2123

2224
namespace DB::tests
2325
{
24-
class PipelineExecutorContextTestRunner : public ::testing::Test
26+
class PipelineExecutorContextTestRunner : public ExecutorTest
2527
{
28+
public:
29+
~PipelineExecutorContextTestRunner() override = default;
2630
};
2731

32+
TEST_F(PipelineExecutorContextTestRunner, suffixExceptionTest)
33+
try
34+
{
35+
context.addMockTable(
36+
"simple_test",
37+
"t1",
38+
{{"a", TiDB::TP::TypeString}, {"b", TiDB::TP::TypeString}},
39+
{toNullableVec<String>("a", {"1"}), toNullableVec<String>("b", {"3"})});
40+
41+
auto req = context.scan("simple_test", "t1").aggregation({Count(col("a"))}, {col("a")}).build(context);
42+
43+
const auto failpoints = std::vector{
44+
"random_pipeline_model_execute_suffix_failpoint-1",
45+
"random_pipeline_model_execute_prefix_failpoint-1"};
46+
47+
for (const auto & fp : failpoints)
48+
{
49+
auto config_str = fmt::format("[flash]\nrandom_fail_points = \"{}\"", fp);
50+
initRandomFailPoint(config_str);
51+
enablePipeline(true);
52+
// Expect this case throw failpoint instead of stuck.
53+
ASSERT_THROW(executeStreams(req, 1), Exception);
54+
disableRandomFailPoint(config_str);
55+
}
56+
}
57+
CATCH
58+
2859
TEST_F(PipelineExecutorContextTestRunner, waitTimeout)
2960
try
3061
{

dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void PipelineExec::executePrefix()
9292
for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) // NOLINT(modernize-loop-convert)
9393
(*it)->operatePrefix();
9494
source_op->operatePrefix();
95-
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_execute_suffix_failpoint);
95+
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_execute_prefix_failpoint);
9696
}
9797

9898
void PipelineExec::executeSuffix()
@@ -101,6 +101,7 @@ void PipelineExec::executeSuffix()
101101
for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) // NOLINT(modernize-loop-convert)
102102
(*it)->operateSuffix();
103103
source_op->operateSuffix();
104+
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_execute_suffix_failpoint);
104105
}
105106

106107
void PipelineExec::notify()

dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/EventTask.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,12 @@ EventTask::EventTask(
3737

3838
void EventTask::finalizeImpl()
3939
{
40+
SCOPE_EXIT({
41+
event->onTaskFinish(profile_info);
42+
event.reset();
43+
});
44+
4045
doFinalizeImpl();
41-
event->onTaskFinish(profile_info);
42-
event.reset();
4346
}
4447

4548
UInt64 EventTask::getScheduleDuration() const

dbms/src/Flash/tests/gtest_compute_server.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1329,8 +1329,14 @@ try
13291329
"random_pipeline_model_cancel_failpoint-0.8",
13301330
"random_pipeline_model_execute_prefix_failpoint-1.0",
13311331
"random_pipeline_model_execute_suffix_failpoint-1.0"};
1332+
auto log = Logger::get();
13321333
for (const auto & failpoint : failpoints)
13331334
{
1335+
// Cancel test will make MockTableScanBlockInputStream output infinite blocks,
1336+
// but suffix failpoint will have to be triggered when table scan finish.
1337+
// So for suffix related failpoint, make it non cancel test.
1338+
const bool is_cancel_test = !failpoint.contains("suffix");
1339+
LOG_DEBUG(log, "running failpoint: {}, is_cancel_test: {}", failpoint, is_cancel_test);
13341340
auto config_str = fmt::format("[flash]\nrandom_fail_points = \"{}\"", failpoint);
13351341
initRandomFailPoint(config_str);
13361342
auto properties = DB::tests::getDAGPropertiesForTest(serverNum());
@@ -1349,7 +1355,8 @@ try
13491355
.join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")})
13501356
.aggregation({Max(col("l_table.s"))}, {col("l_table.s")})
13511357
.project({col("max(l_table.s)"), col("l_table.s")}),
1352-
properties);
1358+
properties,
1359+
is_cancel_test);
13531360
}
13541361
catch (...)
13551362
{

dbms/src/TestUtils/MPPTaskTestUtils.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,31 @@ void MPPTaskTestUtils::setCancelTest()
9797
TiFlashTestEnv::getGlobalContext(i).setCancelTest();
9898
}
9999

100-
BlockInputStreamPtr MPPTaskTestUtils::prepareMPPStreams(DAGRequestBuilder builder, const DAGProperties & properties)
100+
BlockInputStreamPtr MPPTaskTestUtils::prepareMPPStreams(
101+
DAGRequestBuilder builder,
102+
const DAGProperties & properties,
103+
bool is_cancel_test)
101104
{
102-
auto tasks = prepareMPPTasks(builder, properties);
105+
auto tasks = prepareMPPTasks(builder, properties, is_cancel_test);
103106
return executeMPPQueryWithMultipleContext(
104107
properties,
105108
tasks,
106109
MockComputeServerManager::instance().getServerConfigMap());
107110
}
108111

109-
std::vector<QueryTask> MPPTaskTestUtils::prepareMPPTasks(DAGRequestBuilder builder, const DAGProperties & properties)
112+
std::vector<QueryTask> MPPTaskTestUtils::prepareMPPTasks(
113+
DAGRequestBuilder builder,
114+
const DAGProperties & properties,
115+
bool is_cancel_test)
110116
{
111117
std::lock_guard lock(mu);
112118
auto tasks = builder.buildMPPTasks(context, properties);
113-
for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i)
114-
TiFlashTestEnv::getGlobalContext(i).setCancelTest();
119+
if (is_cancel_test)
120+
for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i)
121+
TiFlashTestEnv::getGlobalContext(i).setCancelTest();
122+
else
123+
for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i)
124+
TiFlashTestEnv::getGlobalContext(i).setMPPTest();
115125
return tasks;
116126
}
117127

dbms/src/TestUtils/MPPTaskTestUtils.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,17 @@ class MPPTaskTestUtils : public ExecutorTest
7676
static size_t serverNum();
7777

7878
// run mpp tasks which are ready to cancel, the return value is the start_ts of query.
79-
BlockInputStreamPtr prepareMPPStreams(DAGRequestBuilder builder, const DAGProperties & properties);
79+
// if is_cancel_test is true, mock table scan will be infinite.
80+
BlockInputStreamPtr prepareMPPStreams(
81+
DAGRequestBuilder builder,
82+
const DAGProperties & properties,
83+
bool is_cancel_test = true);
8084

8185
// prepareMPPTasks is not thread safe, the builder's executor_index(which is ref to context's index) is updated during this process
82-
std::vector<QueryTask> prepareMPPTasks(DAGRequestBuilder builder, const DAGProperties & properties);
86+
std::vector<QueryTask> prepareMPPTasks(
87+
DAGRequestBuilder builder,
88+
const DAGProperties & properties,
89+
bool is_cancel_test = true);
8390

8491
// prepareMPPTasks is thread safe
8592
std::vector<QueryTask> prepareMPPTasks(

0 commit comments

Comments
 (0)