Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ add_library(Olap STATIC
reader.cpp
row_block.cpp
row_cursor.cpp
rowset.cpp
segment_group.cpp
run_length_byte_reader.cpp
run_length_byte_writer.cpp
run_length_integer_reader.cpp
Expand Down
20 changes: 10 additions & 10 deletions be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "olap/column_data.h"
#include "olap/olap_engine.h"
#include "olap/olap_header.h"
#include "olap/rowset.h"
#include "olap/segment_group.h"
#include "olap/olap_table.h"
#include "olap/utils.h"
#include "util/doris_metrics.h"
Expand Down Expand Up @@ -119,7 +119,7 @@ OLAPStatus BaseCompaction::run() {
DorisMetrics::base_compaction_deltas_total.increment(_need_merged_versions.size());
int64_t merge_bytes = 0;
for (ColumnData* i_data : base_data_sources) {
merge_bytes += i_data->olap_index()->data_size();
merge_bytes += i_data->segment_group()->data_size();
}
DorisMetrics::base_compaction_bytes_total.increment(merge_bytes);
}
Expand Down Expand Up @@ -148,7 +148,7 @@ OLAPStatus BaseCompaction::run() {
// 4. Make the new versions visible.
// On success, remove the files belonging to the old versions;
// on failure, garbage-collect the files belonging to the new versions.
vector<Rowset*> unused_olap_indices;
vector<SegmentGroup*> unused_olap_indices;
res = _update_header(row_count, &unused_olap_indices);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to update header. table=" << _table->full_name() << ", "
Expand Down Expand Up @@ -323,12 +323,12 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
vector<ColumnData*>* base_data_sources,
uint64_t* row_count) {
// 1. 生成新base文件对应的olap index
Rowset* new_base = new (std::nothrow) Rowset(_table.get(),
SegmentGroup* new_base = new (std::nothrow) SegmentGroup(_table.get(),
_new_base_version,
new_base_version_hash,
false, 0, 0);
if (new_base == NULL) {
OLAP_LOG_WARNING("fail to new Rowset.");
OLAP_LOG_WARNING("fail to new SegmentGroup.");
return OLAP_ERR_MALLOC_ERROR;
}

Expand Down Expand Up @@ -398,7 +398,7 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
// Check row num changes
uint64_t source_rows = 0;
for (ColumnData* i_data : *base_data_sources) {
source_rows += i_data->olap_index()->num_rows();
source_rows += i_data->segment_group()->num_rows();
}
bool row_nums_check = config::row_nums_check;
if (row_nums_check) {
Expand All @@ -423,7 +423,7 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
return OLAP_SUCCESS;
}

OLAPStatus BaseCompaction::_update_header(uint64_t row_count, vector<Rowset*>* unused_olap_indices) {
OLAPStatus BaseCompaction::_update_header(uint64_t row_count, vector<SegmentGroup*>* unused_olap_indices) {
WriteLock wrlock(_table->get_header_lock_ptr());
vector<Version> unused_versions;
_get_unused_versions(&unused_versions);
Expand Down Expand Up @@ -464,11 +464,11 @@ OLAPStatus BaseCompaction::_update_header(uint64_t row_count, vector<Rowset*>* u
return OLAP_SUCCESS;
}

void BaseCompaction::_delete_old_files(vector<Rowset*>* unused_indices) {
void BaseCompaction::_delete_old_files(vector<SegmentGroup*>* unused_indices) {
if (!unused_indices->empty()) {
OLAPEngine* unused_index = OLAPEngine::get_instance();

for (vector<Rowset*>::iterator it = unused_indices->begin();
for (vector<SegmentGroup*>::iterator it = unused_indices->begin();
it != unused_indices->end(); ++it) {
unused_index->add_unused_index(*it);
}
Expand All @@ -477,7 +477,7 @@ void BaseCompaction::_delete_old_files(vector<Rowset*>* unused_indices) {

void BaseCompaction::_garbage_collection() {
// 清理掉已生成的版本文件
for (vector<Rowset*>::iterator it = _new_olap_indices.begin();
for (vector<SegmentGroup*>::iterator it = _new_olap_indices.begin();
it != _new_olap_indices.end(); ++it) {
(*it)->delete_all_files();
SAFE_DELETE(*it);
Expand Down
16 changes: 8 additions & 8 deletions be/src/olap/base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
namespace doris {

class ColumnData;
class Rowset;
class SegmentGroup;

// @brief 实现对START_BASE_COMPACTION命令的处理逻辑,并返回处理结果
class BaseCompaction {
Expand Down Expand Up @@ -94,23 +94,23 @@ class BaseCompaction {

// 更新Header使得修改对外可见
// 输出参数:
// - unused_olap_indices: 需要被物理删除的Rowset*
// - unused_olap_indices: 需要被物理删除的SegmentGroup*
//
// 返回值:
// - 如果执行成功,则返回OLAP_SUCCESS;
// - 其它情况下,返回相应的错误码
OLAPStatus _update_header(uint64_t row_count,
std::vector<Rowset*>* unused_olap_indices);
std::vector<SegmentGroup*>* unused_olap_indices);

// 删除不再使用的Rowset文件
// 删除不再使用的SegmentGroup文件
//
// 输入参数:
// - unused_olap_indices: 需要被物理删除的Rowset*
// - unused_olap_indices: 需要被物理删除的SegmentGroup*
//
// 返回值:
// - 如果执行成功,则返回OLAP_SUCCESS;
// - 其它情况下,返回相应的错误码
void _delete_old_files(std::vector<Rowset*>* unused_indices);
void _delete_old_files(std::vector<SegmentGroup*>* unused_indices);

// 其它函数执行失败时,调用该函数进行清理工作
void _garbage_collection();
Expand Down Expand Up @@ -173,8 +173,8 @@ class BaseCompaction {
Version _latest_cumulative;
// 在此次base compaction执行过程中,将被合并的cumulative文件版本
std::vector<Version> _need_merged_versions;
// 需要新增的版本对应的Rowset
std::vector<Rowset*> _new_olap_indices;
// 需要新增的版本对应的SegmentGroup
std::vector<SegmentGroup*> _new_olap_indices;

bool _base_compaction_locked;

Expand Down
62 changes: 31 additions & 31 deletions be/src/olap/column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@

namespace doris {

ColumnData* ColumnData::create(Rowset* index) {
ColumnData* ColumnData::create(SegmentGroup* segment_group) {
ColumnData* data = NULL;
DataFileType file_type = index->table()->data_file_type();
DataFileType file_type = segment_group->table()->data_file_type();

switch (file_type) {
case COLUMN_ORIENTED_FILE:
data = new(std::nothrow) ColumnData(index);
data = new(std::nothrow) ColumnData(segment_group);
break;

default:
Expand All @@ -40,29 +40,29 @@ ColumnData* ColumnData::create(Rowset* index) {
return data;
}

ColumnData::ColumnData(Rowset* olap_index)
ColumnData::ColumnData(SegmentGroup* segment_group)
: _data_file_type(COLUMN_ORIENTED_FILE),
_olap_index(olap_index),
_segment_group(segment_group),
_eof(false),
_conditions(NULL),
_col_predicates(NULL),
_delete_status(DEL_NOT_SATISFIED),
_runtime_state(NULL),
_is_using_cache(false),
_segment_reader(NULL) {
_table = olap_index->table();
_table = segment_group->table();
_num_rows_per_block = _table->num_rows_per_row_block();
}

ColumnData::~ColumnData() {
_olap_index->release();
_segment_group->release();
SAFE_DELETE(_segment_reader);
}

OLAPStatus ColumnData::init() {
_olap_index->acquire();
_segment_group->acquire();

auto res = _short_key_cursor.init(_olap_index->short_key_fields());
auto res = _short_key_cursor.init(_segment_group->short_key_fields());
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "key cursor init failed, table:" << _table->id()
<< ", res:" << res;
Expand Down Expand Up @@ -105,7 +105,7 @@ OLAPStatus ColumnData::_next_row(const RowCursor** row, bool without_filter) {
} else {
DCHECK(_read_block->block_status() == DEL_PARTIAL_SATISFIED);
bool row_del_filter = _delete_handler.is_filter_data(
_olap_index->version().second, _cursor);
_segment_group->version().second, _cursor);
if (!row_del_filter) {
*row = &_cursor;
return OLAP_SUCCESS;
Expand All @@ -130,16 +130,16 @@ OLAPStatus ColumnData::_seek_to_block(const RowBlockPosition& block_pos, bool wi
// TODO(zc): _segment_readers???
// open segment reader if needed
if (_segment_reader == nullptr || block_pos.segment != _current_segment) {
if (block_pos.segment >= _olap_index->num_segments() ||
if (block_pos.segment >= _segment_group->num_segments() ||
(_end_key_is_set && block_pos.segment > _end_segment)) {
_eof = true;
return OLAP_ERR_DATA_EOF;
}
SAFE_DELETE(_segment_reader);
std::string file_name;
file_name = olap_index()->construct_data_file_path(olap_index()->rowset_id(), block_pos.segment);
file_name = segment_group()->construct_data_file_path(segment_group()->segment_group_id(), block_pos.segment);
_segment_reader = new(std::nothrow) SegmentReader(
file_name, _table, olap_index(), block_pos.segment,
file_name, _table, segment_group(), block_pos.segment,
_seek_columns, _load_bf_columns, _conditions,
_col_predicates, _delete_handler, _delete_status, _runtime_state, _stats);
if (_segment_reader == nullptr) {
Expand Down Expand Up @@ -170,7 +170,7 @@ OLAPStatus ColumnData::_seek_to_block(const RowBlockPosition& block_pos, bool wi
OLAPStatus ColumnData::_find_position_by_short_key(
const RowCursor& key, bool find_last_key, RowBlockPosition *position) {
RowBlockPosition tmp_pos;
auto res = _olap_index->find_short_key(key, &_short_key_cursor, find_last_key, &tmp_pos);
auto res = _segment_group->find_short_key(key, &_short_key_cursor, find_last_key, &tmp_pos);
if (res != OLAP_SUCCESS) {
if (res == OLAP_ERR_INDEX_EOF) {
res = OLAP_ERR_DATA_EOF;
Expand All @@ -179,7 +179,7 @@ OLAPStatus ColumnData::_find_position_by_short_key(
}
return res;
}
res = olap_index()->find_prev_point(tmp_pos, position);
res = segment_group()->find_prev_point(tmp_pos, position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("find prev row block failed. [res=%d]", res);
return res;
Expand All @@ -190,7 +190,7 @@ OLAPStatus ColumnData::_find_position_by_short_key(
OLAPStatus ColumnData::_find_position_by_full_key(
const RowCursor& key, bool find_last_key, RowBlockPosition *position) {
RowBlockPosition tmp_pos;
auto res = _olap_index->find_short_key(key, &_short_key_cursor, false, &tmp_pos);
auto res = _segment_group->find_short_key(key, &_short_key_cursor, false, &tmp_pos);
if (res != OLAP_SUCCESS) {
if (res == OLAP_ERR_INDEX_EOF) {
res = OLAP_ERR_DATA_EOF;
Expand All @@ -200,14 +200,14 @@ OLAPStatus ColumnData::_find_position_by_full_key(
return res;
}
RowBlockPosition start_position;
res = olap_index()->find_prev_point(tmp_pos, &start_position);
res = segment_group()->find_prev_point(tmp_pos, &start_position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("find prev row block failed. [res=%d]", res);
return res;
}

RowBlockPosition end_position;
res = _olap_index->find_short_key(key, &_short_key_cursor, true, &end_position);
res = _segment_group->find_short_key(key, &_short_key_cursor, true, &end_position);
if (res != OLAP_SUCCESS) {
if (res == OLAP_ERR_INDEX_EOF) {
res = OLAP_ERR_DATA_EOF;
Expand All @@ -226,7 +226,7 @@ OLAPStatus ColumnData::_find_position_by_full_key(
OLAPIndexOffset index_offset;
index_offset.segment = _end_segment;
index_offset.offset = _end_block;
res = olap_index()->get_row_block_position(index_offset, &end_position);
res = segment_group()->get_row_block_position(index_offset, &end_position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to get row block position. [res=%d]", res);
return res;
Expand All @@ -235,15 +235,15 @@ OLAPStatus ColumnData::_find_position_by_full_key(
}

// Distance from start_position to end_position; distance + 1 is the upper bound of the binary search below.
uint32_t distance = olap_index()->compute_distance(start_position, end_position);
uint32_t distance = segment_group()->compute_distance(start_position, end_position);

BinarySearchIterator it_start(0u);
BinarySearchIterator it_end(distance + 1);
BinarySearchIterator it_result(0u);
ColumnDataComparator comparator(
start_position,
this,
olap_index());
segment_group());
try {
if (!find_last_key) {
it_result = std::lower_bound(it_start, it_end, key, comparator);
Expand All @@ -261,7 +261,7 @@ OLAPStatus ColumnData::_find_position_by_full_key(
it_result -= 1;
}

if (OLAP_SUCCESS != (res = olap_index()->advance_row_block(*it_result,
if (OLAP_SUCCESS != (res = segment_group()->advance_row_block(*it_result,
&start_position))) {
OLAP_LOG_WARNING("fail to advance row_block. [res=%d it_offset=%u "
"start_pos='%s']", res, *it_result,
Expand Down Expand Up @@ -490,16 +490,16 @@ OLAPStatus ColumnData::get_first_row_block(RowBlock** row_block) {
return res;
}

// to be same with OLAPData, we use olap_index.
// to be same with OLAPData, we use segment_group.
RowBlockPosition block_pos;
res = olap_index()->find_first_row_block(&block_pos);
res = segment_group()->find_first_row_block(&block_pos);
if (res != OLAP_SUCCESS) {
if (res == OLAP_ERR_INDEX_EOF) {
*row_block = nullptr;
_eof = true;
return res;
}
OLAP_LOG_WARNING("fail to find first row block with Rowset.");
OLAP_LOG_WARNING("fail to find first row block with SegmentGroup.");
return res;
}

Expand Down Expand Up @@ -545,11 +545,11 @@ bool ColumnData::delta_pruning_filter() {
return true;
}

if (!_olap_index->has_column_statistics()) {
if (!_segment_group->has_column_statistics()) {
return false;
}

return _conditions->delta_pruning_filter(_olap_index->get_column_statistics());
return _conditions->delta_pruning_filter(_segment_group->get_column_statistics());
}

int ColumnData::delete_pruning_filter() {
Expand All @@ -559,9 +559,9 @@ int ColumnData::delete_pruning_filter() {
return DEL_NOT_SATISFIED;
}

if (false == _olap_index->has_column_statistics()) {
if (false == _segment_group->has_column_statistics()) {
/*
* if olap_index has no column statistics, we cannot judge whether the data can be filtered or not
* if segment_group has no column statistics, we cannot judge whether the data can be filtered or not
*/
return DEL_PARTIAL_SATISFIED;
}
Expand All @@ -576,12 +576,12 @@ int ColumnData::delete_pruning_filter() {
bool del_partial_stastified = false;
bool del_stastified = false;
for (auto& delete_condtion : _delete_handler.get_delete_conditions()) {
if (delete_condtion.filter_version <= _olap_index->version().first) {
if (delete_condtion.filter_version <= _segment_group->version().first) {
continue;
}

Conditions* del_cond = delete_condtion.del_cond;
int del_ret = del_cond->delete_pruning_filter(_olap_index->get_column_statistics());
int del_ret = del_cond->delete_pruning_filter(_segment_group->get_column_statistics());
if (DEL_SATISFIED == del_ret) {
del_stastified = true;
break;
Expand Down
Loading