Skip to content

Commit dc33637

Browse files
authored
Support graceful shutdown in TiFlash again (#10299)
close #10266 Support graceful shutdown in TiFlash again Signed-off-by: gengliqi <[email protected]>
1 parent b153a8f commit dc33637

File tree

4 files changed

+48
-1
lines changed

4 files changed

+48
-1
lines changed

dbms/src/Flash/FlashService.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ grpc::Status FlashService::IsAlive(
564564
return check_result;
565565

566566
auto & tmt_context = context->getTMTContext();
567-
response->set_available(tmt_context.checkRunning());
567+
response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable());
568568
response->set_mpp_version(DB::GetMppVersion());
569569
return grpc::Status::OK;
570570
}

dbms/src/Flash/Mpp/MPPTaskManager.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include <Common/FailPoint.h>
1616
#include <Common/FmtUtils.h>
17+
#include <Common/Stopwatch.h>
1718
#include <Common/TiFlashMetrics.h>
1819
#include <Flash/Coprocessor/DAGContext.h>
1920
#include <Flash/EstablishCall.h>
@@ -83,6 +84,39 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
8384
return ptr;
8485
}
8586

87+
void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context)
88+
{
89+
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
90+
static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown";
91+
// The default value of flash.graceful_wait_before_shutdown
92+
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600;
93+
auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(
94+
GRACEFUL_WIAT_BEFORE_SHUTDOWN,
95+
DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
96+
LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown);
97+
Stopwatch watch;
98+
// The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched
99+
std::this_thread::sleep_for(std::chrono::seconds(1));
100+
while (true)
101+
{
102+
auto elapsed_ms = watch.elapsedMilliseconds();
103+
{
104+
std::unique_lock lock(mu);
105+
if (monitored_tasks.empty())
106+
{
107+
LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms);
108+
break;
109+
}
110+
}
111+
if (elapsed_ms >= graceful_wait_before_shutdown * 1000)
112+
{
113+
LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms);
114+
break;
115+
}
116+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
117+
}
118+
}
119+
86120
MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
87121
: scheduler(std::move(scheduler_))
88122
, aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)

dbms/src/Flash/Mpp/MPPTaskManager.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ struct MPPTaskMonitor
194194
return monitored_tasks.find(task_unique_id) != monitored_tasks.end();
195195
}
196196

197+
void waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context);
198+
197199
std::mutex mu;
198200
std::condition_variable cv;
199201
bool is_shutdown = false;
@@ -221,6 +223,8 @@ class MPPTaskManager : private boost::noncopyable
221223

222224
std::shared_ptr<MPPTaskMonitor> monitor;
223225

226+
std::atomic<bool> is_available{true};
227+
224228
public:
225229
explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler);
226230

@@ -273,6 +277,9 @@ class MPPTaskManager : private boost::noncopyable
273277

274278
bool isTaskExists(const MPPTaskId & id);
275279

280+
/// Clears the `is_available` flag; nothing in this class body sets it back to true.
void setUnavailable() { is_available.store(false); }
281+
bool isAvailable() { return is_available; }
282+
276283
private:
277284
MPPQueryPtr addMPPQuery(
278285
const MPPQueryId & query_id,

dbms/src/Server/Server.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,6 +1252,12 @@ try
12521252
LOG_INFO(log, "Start to wait for terminal signal");
12531253
waitForTerminationRequest();
12541254

1255+
// Note: `waitAllMPPTasksFinish` must be called before stopping the proxy.
1256+
// Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully.
1257+
LOG_INFO(log, "Set unavailable for MPPTask");
1258+
tmt_context.getMPPTaskManager()->setUnavailable();
1259+
tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context);
1260+
12551261
{
12561262
// Set limiters stopping and wake up threads in the waiting queue.
12571263
global_context->getIORateLimiter().setStop();

0 commit comments

Comments
 (0)