Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,46 @@
#include <Common/ThreadManager.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Executor/ResultQueue.h>
#include <TestUtils/ExecutorTestUtils.h>
#include <TestUtils/FailPointUtils.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <gtest/gtest.h>

namespace DB::tests
{
class PipelineExecutorContextTestRunner : public ::testing::Test
class PipelineExecutorContextTestRunner : public ExecutorTest
{
public:
~PipelineExecutorContextTestRunner() override = default;
};

TEST_F(PipelineExecutorContextTestRunner, suffixExceptionTest)
try
{
context.addMockTable(
"simple_test",
"t1",
{{"a", TiDB::TP::TypeString}, {"b", TiDB::TP::TypeString}},
{toNullableVec<String>("a", {"1"}), toNullableVec<String>("b", {"3"})});

auto req = context.scan("simple_test", "t1").aggregation({Count(col("a"))}, {col("a")}).build(context);

const auto failpoints = std::vector{
"random_pipeline_model_execute_suffix_failpoint-1",
"random_pipeline_model_execute_prefix_failpoint-1"};

for (const auto & fp : failpoints)
{
auto config_str = fmt::format("[flash]\nrandom_fail_points = \"{}\"", fp);
initRandomFailPoint(config_str);
enablePipeline(true);
// Expect this case throw failpoint instead of stuck.
ASSERT_THROW(executeStreams(req, 1), Exception);
disableRandomFailPoint(config_str);
}
}
CATCH

TEST_F(PipelineExecutorContextTestRunner, waitTimeout)
try
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void PipelineExec::executePrefix()
for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) // NOLINT(modernize-loop-convert)
(*it)->operatePrefix();
source_op->operatePrefix();
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_execute_suffix_failpoint);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_execute_prefix_failpoint);
}

void PipelineExec::executeSuffix()
Expand All @@ -101,6 +101,7 @@ void PipelineExec::executeSuffix()
for (auto it = transform_ops.rbegin(); it != transform_ops.rend(); ++it) // NOLINT(modernize-loop-convert)
(*it)->operateSuffix();
source_op->operateSuffix();
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_execute_suffix_failpoint);
}

void PipelineExec::notify()
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/EventTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ EventTask::EventTask(

void EventTask::finalizeImpl()
{
SCOPE_EXIT({
event->onTaskFinish(profile_info);
event.reset();
});

doFinalizeImpl();
event->onTaskFinish(profile_info);
event.reset();
}

UInt64 EventTask::getScheduleDuration() const
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Flash/tests/gtest_compute_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1329,8 +1329,14 @@ try
"random_pipeline_model_cancel_failpoint-0.8",
"random_pipeline_model_execute_prefix_failpoint-1.0",
"random_pipeline_model_execute_suffix_failpoint-1.0"};
auto log = Logger::get();
for (const auto & failpoint : failpoints)
{
// Cancel test will make MockTableScanBlockInputStream output infinite blocks,
// but suffix failpoint will have to be triggered when table scan finish.
// So for suffix related failpoint, make it non cancel test.
const bool is_cancel_test = !failpoint.contains("suffix");
LOG_DEBUG(log, "running failpoint: {}, is_cancel_test: {}", failpoint, is_cancel_test);
auto config_str = fmt::format("[flash]\nrandom_fail_points = \"{}\"", failpoint);
initRandomFailPoint(config_str);
auto properties = DB::tests::getDAGPropertiesForTest(serverNum());
Expand All @@ -1349,7 +1355,8 @@ try
.join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")})
.aggregation({Max(col("l_table.s"))}, {col("l_table.s")})
.project({col("max(l_table.s)"), col("l_table.s")}),
properties);
properties,
is_cancel_test);
}
catch (...)
{
Expand Down
20 changes: 15 additions & 5 deletions dbms/src/TestUtils/MPPTaskTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,31 @@ void MPPTaskTestUtils::setCancelTest()
TiFlashTestEnv::getGlobalContext(i).setCancelTest();
}

BlockInputStreamPtr MPPTaskTestUtils::prepareMPPStreams(DAGRequestBuilder builder, const DAGProperties & properties)
BlockInputStreamPtr MPPTaskTestUtils::prepareMPPStreams(
DAGRequestBuilder builder,
const DAGProperties & properties,
bool is_cancel_test)
{
auto tasks = prepareMPPTasks(builder, properties);
auto tasks = prepareMPPTasks(builder, properties, is_cancel_test);
return executeMPPQueryWithMultipleContext(
properties,
tasks,
MockComputeServerManager::instance().getServerConfigMap());
}

std::vector<QueryTask> MPPTaskTestUtils::prepareMPPTasks(DAGRequestBuilder builder, const DAGProperties & properties)
std::vector<QueryTask> MPPTaskTestUtils::prepareMPPTasks(
DAGRequestBuilder builder,
const DAGProperties & properties,
bool is_cancel_test)
{
std::lock_guard lock(mu);
auto tasks = builder.buildMPPTasks(context, properties);
for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i)
TiFlashTestEnv::getGlobalContext(i).setCancelTest();
if (is_cancel_test)
for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i)
TiFlashTestEnv::getGlobalContext(i).setCancelTest();
else
for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i)
TiFlashTestEnv::getGlobalContext(i).setMPPTest();
return tasks;
}

Expand Down
11 changes: 9 additions & 2 deletions dbms/src/TestUtils/MPPTaskTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,17 @@ class MPPTaskTestUtils : public ExecutorTest
static size_t serverNum();

// run mpp tasks which are ready to cancel, the return value is the start_ts of query.
BlockInputStreamPtr prepareMPPStreams(DAGRequestBuilder builder, const DAGProperties & properties);
// if is_cancel_test is true, mock table scan will be infinite.
BlockInputStreamPtr prepareMPPStreams(
DAGRequestBuilder builder,
const DAGProperties & properties,
bool is_cancel_test = true);

// prepareMPPTasks is not thread safe, the builder's executor_index(which is ref to context's index) is updated during this process
std::vector<QueryTask> prepareMPPTasks(DAGRequestBuilder builder, const DAGProperties & properties);
std::vector<QueryTask> prepareMPPTasks(
DAGRequestBuilder builder,
const DAGProperties & properties,
bool is_cancel_test = true);

// prepareMPPTasks is thread safe
std::vector<QueryTask> prepareMPPTasks(
Expand Down