Skip to content

Commit ba7f5d8

Browse files
authored
[Improvement]Log be thread num (#37289)
1 parent 2c12d2b commit ba7f5d8

File tree

10 files changed

+208
-0
lines changed

10 files changed

+208
-0
lines changed

be/src/common/config.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,9 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
11651165
// cgroup
11661166
DEFINE_mString(doris_cgroup_cpu_path, "");
11671167

1168+
DEFINE_mBool(enable_be_proc_monitor, "false");
1169+
DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");
1170+
11681171
DEFINE_mBool(enable_workload_group_memory_gc, "true");
11691172

11701173
DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,8 @@ DECLARE_mBool(exit_on_exception);
12481248

12491249
// cgroup
12501250
DECLARE_mString(doris_cgroup_cpu_path);
1251+
DECLARE_mBool(enable_be_proc_monitor);
1252+
DECLARE_mInt32(be_proc_monitor_interval_ms);
12511253

12521254
DECLARE_mBool(enable_workload_group_memory_gc);
12531255

be/src/common/daemon.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "olap/options.h"
4949
#include "olap/storage_engine.h"
5050
#include "olap/tablet_manager.h"
51+
#include "runtime/be_proc_monitor.h"
5152
#include "runtime/client_cache.h"
5253
#include "runtime/exec_env.h"
5354
#include "runtime/fragment_mgr.h"
@@ -399,6 +400,13 @@ void Daemon::wg_mem_used_refresh_thread() {
399400
}
400401
}
401402

403+
void Daemon::be_proc_monitor_thread() {
404+
while (!_stop_background_threads_latch.wait_for(
405+
std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) {
406+
LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info();
407+
}
408+
}
409+
402410
void Daemon::start() {
403411
Status st;
404412
st = Thread::create(
@@ -435,6 +443,12 @@ void Daemon::start() {
435443
st = Thread::create(
436444
"Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); },
437445
&_threads.emplace_back());
446+
447+
if (config::enable_be_proc_monitor) {
448+
st = Thread::create(
449+
"Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); },
450+
&_threads.emplace_back());
451+
}
438452
CHECK(st.ok()) << st;
439453
}
440454

be/src/common/daemon.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class Daemon {
4545
void je_purge_dirty_pages_thread() const;
4646
void report_runtime_query_statistics_thread();
4747
void wg_mem_used_refresh_thread();
48+
void be_proc_monitor_thread();
4849

4950
CountDownLatch _stop_background_threads_latch;
5051
std::vector<scoped_refptr<Thread>> _threads;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "http/action/be_proc_thread_action.h"
19+
20+
#include "http/http_channel.h"
21+
#include "http/http_headers.h"
22+
#include "http/http_status.h"
23+
#include "runtime/be_proc_monitor.h"
24+
25+
namespace doris {
26+
27+
const static std::string HEADER_JSON = "application/json";
28+
29+
void BeProcThreadAction::handle(HttpRequest* req) {
30+
req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
31+
HttpChannel::send_reply(req, HttpStatus::OK, BeProcMonitor::get_be_thread_info());
32+
}
33+
34+
}; // namespace doris
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
#pragma once
18+
19+
#include "http/http_handler.h"
20+
#include "http/http_request.h"
21+
22+
namespace doris {
23+
24+
class HttpRequest;
25+
26+
class BeProcThreadAction : public HttpHandler {
27+
public:
28+
BeProcThreadAction() = default;
29+
~BeProcThreadAction() override = default;
30+
void handle(HttpRequest* req) override;
31+
};
32+
33+
}; // namespace doris

be/src/runtime/be_proc_monitor.cpp

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "runtime/be_proc_monitor.h"
19+
20+
#include <fmt/format.h>
21+
#include <glog/logging.h>
22+
#include <sys/stat.h>
23+
#include <unistd.h>
24+
25+
#include <deque>
26+
#include <filesystem>
27+
#include <fstream>
28+
#include <map>
29+
#include <nlohmann/json.hpp>
30+
31+
std::string BeProcMonitor::get_be_thread_info() {
32+
int32_t pid = getpid();
33+
std::string proc_path = fmt::format("/proc/{}/task", pid);
34+
if (access(proc_path.c_str(), F_OK) != 0) {
35+
LOG(WARNING) << "be proc path " << proc_path << " not exists.";
36+
return "";
37+
}
38+
39+
std::map<std::string, int> thread_num_map;
40+
41+
int total_thread_count = 0;
42+
int distinct_thread_name_count = 0;
43+
for (const auto& entry : std::filesystem::directory_iterator(proc_path)) {
44+
const std::string tid_path = entry.path().string();
45+
std::string thread_name_path = tid_path + "/comm";
46+
struct stat st;
47+
// == 0 means exists
48+
if (stat(thread_name_path.c_str(), &st) == 0) {
49+
// NOTE: there is no need to close std::ifstream, it's called during deconstruction.
50+
// refer:https://stackoverflow.com/questions/748014/do-i-need-to-manually-close-an-ifstream
51+
std::ifstream file(thread_name_path.c_str());
52+
if (!file.is_open()) {
53+
continue;
54+
}
55+
std::stringstream str_buf;
56+
str_buf << file.rdbuf();
57+
std::string thread_name = str_buf.str();
58+
thread_name.erase(std::remove(thread_name.begin(), thread_name.end(), '\n'),
59+
thread_name.end());
60+
61+
if (thread_num_map.find(thread_name) != thread_num_map.end()) {
62+
thread_num_map[thread_name]++;
63+
} else {
64+
distinct_thread_name_count++;
65+
thread_num_map.emplace(thread_name, 1);
66+
}
67+
total_thread_count++;
68+
}
69+
}
70+
71+
std::deque<std::pair<std::string, int>> ordered_list(thread_num_map.begin(),
72+
thread_num_map.end());
73+
std::sort(ordered_list.begin(), ordered_list.end(),
74+
[](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; });
75+
76+
ordered_list.push_front(
77+
std::make_pair("distinct_thread_name_count", distinct_thread_name_count));
78+
ordered_list.push_front(std::make_pair("total_thread_count", total_thread_count));
79+
ordered_list.push_front(std::make_pair("be_process_id", pid));
80+
81+
nlohmann::json js = nlohmann::json::array();
82+
for (const auto& p : ordered_list) {
83+
js.push_back({p.first, p.second});
84+
}
85+
86+
std::string output_json_str = js.dump();
87+
return output_json_str;
88+
}

be/src/runtime/be_proc_monitor.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Currently BeProcMonitor used to read proc/<pid>/task/ and log be's thread num, we can find
19+
// which logic cost too much thread when BE core because of thread exhaustion.
20+
#include <string>
21+
class BeProcMonitor {
22+
public:
23+
static std::string get_be_thread_info();
24+
};

be/src/service/http_service.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "common/status.h"
3131
#include "http/action/adjust_log_level.h"
3232
#include "http/action/adjust_tracing_dump.h"
33+
#include "http/action/be_proc_thread_action.h"
3334
#include "http/action/calc_file_crc_action.h"
3435
#include "http/action/check_rpc_channel_action.h"
3536
#include "http/action/check_tablet_segment_action.h"
@@ -175,6 +176,11 @@ Status HttpService::start() {
175176
_ev_http_server->register_handler(HttpMethod::GET, "/api/query_pipeline_tasks/{query_id}",
176177
query_pipeline_task_action);
177178

179+
// Dump all be process thread num
180+
BeProcThreadAction* be_proc_thread_action = _pool.add(new BeProcThreadAction());
181+
_ev_http_server->register_handler(HttpMethod::GET, "/api/be_process_thread_num",
182+
be_proc_thread_action);
183+
178184
// Register BE LoadStream action
179185
LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction());
180186
_ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action);

regression-test/pipeline/p0/conf/be.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,6 @@ trino_connector_plugin_dir=/tmp/trino_connector/connectors
6464

6565
enable_jvm_monitor = true
6666

67+
enable_be_proc_monitor = true
68+
be_proc_monitor_interval_ms = 30000
69+

0 commit comments

Comments
 (0)