Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 21 additions & 27 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>

#include <fstream>
#include <memory>
#include <string>

Expand Down Expand Up @@ -70,7 +71,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
_num_finished_scan_range(0),
_normal_row_number(0),
_error_row_number(0),
_error_log_file(nullptr),
_query_ctx(ctx) {
Status status =
init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env);
Expand Down Expand Up @@ -117,7 +117,6 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_
_num_finished_scan_range(0),
_normal_row_number(0),
_error_row_number(0),
_error_log_file(nullptr),
_query_ctx(ctx) {
[[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env);
DCHECK(status.ok());
Expand Down Expand Up @@ -153,7 +152,6 @@ RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId&
_num_finished_scan_range(0),
_normal_row_number(0),
_error_row_number(0),
_error_log_file(nullptr),
_query_ctx(ctx) {
[[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env);
_query_mem_tracker = ctx->query_mem_tracker;
Expand Down Expand Up @@ -185,7 +183,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
_num_finished_scan_range(0),
_normal_row_number(0),
_error_row_number(0),
_error_log_file(nullptr),
_query_ctx(ctx) {
// TODO: do we really need instance id?
Status status = init(TUniqueId(), query_options, query_globals, exec_env);
Expand Down Expand Up @@ -255,22 +252,6 @@ RuntimeState::~RuntimeState() {
// close error log file
if (_error_log_file != nullptr && _error_log_file->is_open()) {
_error_log_file->close();
delete _error_log_file;
_error_log_file = nullptr;
if (_s3_error_fs) {
std::string error_log_absolute_path =
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
// upload error log file to s3
Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path);
if (st.ok()) {
// remove local error log file
std::filesystem::remove(error_log_absolute_path);
} else {
// remove local error log file later by clean_expired_temp_path thread
LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path="
<< _error_log_file_path << ", error=" << st;
}
}
}

_obj_pool->clear();
Expand Down Expand Up @@ -394,7 +375,7 @@ Status RuntimeState::create_error_log_file() {
_db_name, _import_label, _fragment_instance_id, &_error_log_file_path));
std::string error_log_absolute_path =
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
_error_log_file = new std::ofstream(error_log_absolute_path, std::ifstream::out);
_error_log_file = std::make_unique<std::ofstream>(error_log_absolute_path, std::ifstream::out);
if (!_error_log_file->is_open()) {
std::stringstream error_msg;
error_msg << "Fail to open error file: [" << _error_log_file_path << "].";
Expand All @@ -420,8 +401,6 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
LOG(WARNING) << "Create error file log failed. because: " << status;
if (_error_log_file != nullptr) {
_error_log_file->close();
delete _error_log_file;
_error_log_file = nullptr;
}
return status;
}
Expand Down Expand Up @@ -464,13 +443,28 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
return Status::OK();
}

std::string RuntimeState::get_error_log_file_path() const {
if (_s3_error_fs) {
std::string RuntimeState::get_error_log_file_path() {
if (_s3_error_fs && _error_log_file && _error_log_file->is_open()) {
// close error log file
_error_log_file->close();
std::string error_log_absolute_path =
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
// upload error log file to s3
Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path);
if (st.ok()) {
// remove local error log file
std::filesystem::remove(error_log_absolute_path);
} else {
// upload failed and return local error log file path
LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path="
<< _error_log_file_path << ", error=" << st;
return _error_log_file_path;
}
// expiration must be less than a week (in seconds) for presigned url
static const unsigned EXPIRATION_SECONDS = 7 * 24 * 60 * 60 - 1;
// We should return a public endpoint to user.
return _s3_error_fs->generate_presigned_url(_s3_error_log_file_path, EXPIRATION_SECONDS,
true);
_error_log_file_path = _s3_error_fs->generate_presigned_url(_s3_error_log_file_path,
EXPIRATION_SECONDS, true);
}
return _error_log_file_path;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class RuntimeState {

int64_t load_job_id() const { return _load_job_id; }

std::string get_error_log_file_path() const;
std::string get_error_log_file_path();

// append error msg and error line to file when loading data.
// is_summary is true, means we are going to write the summary line
Expand Down Expand Up @@ -710,7 +710,7 @@ class RuntimeState {
int64_t _normal_row_number;
int64_t _error_row_number;
std::string _error_log_file_path;
std::ofstream* _error_log_file = nullptr; // error file path, absolute path
std::unique_ptr<std::ofstream> _error_log_file; // error file path, absolute path
std::vector<TTabletCommitInfo> _tablet_commit_infos;
std::vector<TErrorTabletInfo> _error_tablet_infos;
int _max_operator_id = 0;
Expand Down