Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,6 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {

void create_dependencies(int local_exchange_id) override {
sink_deps.resize(source_deps.size());
std::vector<DependencySPtr> new_deps(sink_deps.size(), nullptr);
source_deps.swap(new_deps);
for (size_t i = 0; i < source_deps.size(); i++) {
source_deps[i] =
std::make_shared<Dependency>(local_exchange_id, local_exchange_id,
Expand Down
42 changes: 20 additions & 22 deletions be/src/vec/common/sort/partition_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,17 @@ Status PartitionSorter::append_block(Block* input_block) {
Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
DCHECK(input_block->columns() == sorted_block.columns());
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
return Status::OK();
}

Status PartitionSorter::prepare_for_read() {
auto& cursors = _state->get_cursors();
auto& blocks = _state->get_sorted_block();
auto& priority_queue = _state->get_priority_queue();
for (auto& block : blocks) {
cursors.emplace_back(block, _sort_description);
}
for (auto& cursor : cursors) {
priority_queue.push(MergeSortCursor(&cursor));
priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description));
}
blocks.clear();
return Status::OK();
}

Expand All @@ -84,29 +81,30 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
}

Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
if (_state->get_sorted_block().empty()) {
if (_state->get_priority_queue().empty()) {
*eos = true;
} else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
block->swap(*_state->get_priority_queue().top().impl->block);
block->set_num_rows(_partition_inner_limit);
*eos = true;
} else {
if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
auto& sorted_block = _state->get_sorted_block()[0];
block->swap(sorted_block);
block->set_num_rows(_partition_inner_limit);
*eos = true;
} else {
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
}
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
}
return Status::OK();
}

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'partition_sort_read' has cognitive complexity of 55 (threshold 50) [readability-function-cognitive-complexity]

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
                        ^
Additional context

be/src/vec/common/sort/partition_sorter.cpp:102: +1, including nesting penalty of 0, nesting level increased to 1

    if (priority_queue.empty()) {
    ^

be/src/vec/common/sort/partition_sorter.cpp:114: +1, including nesting penalty of 0, nesting level increased to 1

    while (!priority_queue.empty()) {
    ^

be/src/vec/common/sort/partition_sorter.cpp:117: +2, including nesting penalty of 1, nesting level increased to 2

        if (UNLIKELY(_previous_row->impl == nullptr)) {
        ^

be/src/vec/common/sort/partition_sorter.cpp:121: +2, including nesting penalty of 1, nesting level increased to 2

        switch (_top_n_algorithm) {
        ^

be/src/vec/common/sort/partition_sorter.cpp:124: +3, including nesting penalty of 2, nesting level increased to 3

            if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:125: +4, including nesting penalty of 3, nesting level increased to 4

                for (size_t i = 0; i < num_columns; ++i) {
                ^

be/src/vec/common/sort/partition_sorter.cpp:128: +1, nesting level increased to 3

            } else {
              ^

be/src/vec/common/sort/partition_sorter.cpp:140: +3, including nesting penalty of 2, nesting level increased to 3

            if (_has_global_limit &&
            ^

be/src/vec/common/sort/partition_sorter.cpp:140: +1

            if (_has_global_limit &&
                                  ^

be/src/vec/common/sort/partition_sorter.cpp:145: +3, including nesting penalty of 2, nesting level increased to 3

            if (_has_global_limit) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:147: +1, nesting level increased to 3

            } else {
              ^

be/src/vec/common/sort/partition_sorter.cpp:150: +4, including nesting penalty of 3, nesting level increased to 4

                if (cmp_res == false) {
                ^

be/src/vec/common/sort/partition_sorter.cpp:152: +5, including nesting penalty of 4, nesting level increased to 5

                    if (_output_distinct_rows >= _partition_inner_limit) {
                    ^

be/src/vec/common/sort/partition_sorter.cpp:159: +3, including nesting penalty of 2, nesting level increased to 3

            for (size_t i = 0; i < num_columns; ++i) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:169: +3, including nesting penalty of 2, nesting level increased to 3

            if (_has_global_limit &&
            ^

be/src/vec/common/sort/partition_sorter.cpp:169: +1

            if (_has_global_limit &&
                                  ^

be/src/vec/common/sort/partition_sorter.cpp:176: +3, including nesting penalty of 2, nesting level increased to 3

            if (cmp_res == false) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:178: +4, including nesting penalty of 3, nesting level increased to 4

                if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) {
                ^

be/src/vec/common/sort/partition_sorter.cpp:184: +3, including nesting penalty of 2, nesting level increased to 3

            for (size_t i = 0; i < num_columns; ++i) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:194: +2, including nesting penalty of 1, nesting level increased to 2

        if (!current->is_last()) {
        ^

be/src/vec/common/sort/partition_sorter.cpp:199: +2, including nesting penalty of 1, nesting level increased to 2

        if (current_output_rows == batch_size || get_enough_data == true) {
        ^

be/src/vec/common/sort/partition_sorter.cpp:199: +1

        if (current_output_rows == batch_size || get_enough_data == true) {
                                              ^

be/src/vec/common/sort/partition_sorter.cpp:205: +1, including nesting penalty of 0, nesting level increased to 1

    if (current_output_rows == 0 || get_enough_data == true) {
    ^

be/src/vec/common/sort/partition_sorter.cpp:205: +1

    if (current_output_rows == 0 || get_enough_data == true) {
                                 ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'partition_sort_read' has cognitive complexity of 55 (threshold 50) [readability-function-cognitive-complexity]

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
                        ^
Additional context

be/src/vec/common/sort/partition_sorter.cpp:97: +1, including nesting penalty of 0, nesting level increased to 1

    if (priority_queue.empty()) {
    ^

be/src/vec/common/sort/partition_sorter.cpp:109: +1, including nesting penalty of 0, nesting level increased to 1

    while (!priority_queue.empty()) {
    ^

be/src/vec/common/sort/partition_sorter.cpp:112: +2, including nesting penalty of 1, nesting level increased to 2

        if (UNLIKELY(_previous_row->impl == nullptr)) {
        ^

be/src/vec/common/sort/partition_sorter.cpp:116: +2, including nesting penalty of 1, nesting level increased to 2

        switch (_top_n_algorithm) {
        ^

be/src/vec/common/sort/partition_sorter.cpp:119: +3, including nesting penalty of 2, nesting level increased to 3

            if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:120: +4, including nesting penalty of 3, nesting level increased to 4

                for (size_t i = 0; i < num_columns; ++i) {
                ^

be/src/vec/common/sort/partition_sorter.cpp:123: +1, nesting level increased to 3

            } else {
              ^

be/src/vec/common/sort/partition_sorter.cpp:135: +3, including nesting penalty of 2, nesting level increased to 3

            if (_has_global_limit &&
            ^

be/src/vec/common/sort/partition_sorter.cpp:135: +1

            if (_has_global_limit &&
                                  ^

be/src/vec/common/sort/partition_sorter.cpp:140: +3, including nesting penalty of 2, nesting level increased to 3

            if (_has_global_limit) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:142: +1, nesting level increased to 3

            } else {
              ^

be/src/vec/common/sort/partition_sorter.cpp:145: +4, including nesting penalty of 3, nesting level increased to 4

                if (cmp_res == false) {
                ^

be/src/vec/common/sort/partition_sorter.cpp:147: +5, including nesting penalty of 4, nesting level increased to 5

                    if (_output_distinct_rows >= _partition_inner_limit) {
                    ^

be/src/vec/common/sort/partition_sorter.cpp:154: +3, including nesting penalty of 2, nesting level increased to 3

            for (size_t i = 0; i < num_columns; ++i) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:164: +3, including nesting penalty of 2, nesting level increased to 3

            if (_has_global_limit &&
            ^

be/src/vec/common/sort/partition_sorter.cpp:164: +1

            if (_has_global_limit &&
                                  ^

be/src/vec/common/sort/partition_sorter.cpp:171: +3, including nesting penalty of 2, nesting level increased to 3

            if (cmp_res == false) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:173: +4, including nesting penalty of 3, nesting level increased to 4

                if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) {
                ^

be/src/vec/common/sort/partition_sorter.cpp:179: +3, including nesting penalty of 2, nesting level increased to 3

            for (size_t i = 0; i < num_columns; ++i) {
            ^

be/src/vec/common/sort/partition_sorter.cpp:189: +2, including nesting penalty of 1, nesting level increased to 2

        if (!current->is_last()) {
        ^

be/src/vec/common/sort/partition_sorter.cpp:194: +2, including nesting penalty of 1, nesting level increased to 2

        if (current_output_rows == batch_size || get_enough_data == true) {
        ^

be/src/vec/common/sort/partition_sorter.cpp:194: +1

        if (current_output_rows == batch_size || get_enough_data == true) {
                                              ^

be/src/vec/common/sort/partition_sorter.cpp:200: +1, including nesting penalty of 0, nesting level increased to 1

    if (current_output_rows == 0 || get_enough_data == true) {
    ^

be/src/vec/common/sort/partition_sorter.cpp:200: +1

    if (current_output_rows == 0 || get_enough_data == true) {
                                 ^

const auto& sorted_block = _state->get_sorted_block()[0];
size_t num_columns = sorted_block.columns();
auto& priority_queue = _state->get_priority_queue();
if (priority_queue.empty()) {
*eos = true;
return Status::OK();
}
const auto& sorted_block = priority_queue.top().impl->block;
size_t num_columns = sorted_block->columns();
MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
MutableColumns& merged_columns = m_block.mutable_columns();
size_t current_output_rows = 0;
auto& priority_queue = _state->get_priority_queue();

bool get_enough_data = false;
while (!priority_queue.empty()) {
Expand All @@ -121,7 +119,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
// 1 row_number: no need to check for distinct values, just output partition_inner_limit rows
if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
}
} else {
// enough rows have already been collected
Expand Down Expand Up @@ -155,7 +153,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
}
}
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
}
break;
}
Expand All @@ -180,7 +178,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
*_previous_row = current;
}
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
}
current_output_rows++;
break;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/common/sort/partition_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct SortCursorCmp {
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}

void reset() {
impl = nullptr;
impl->reset();
row = 0;
}
bool compare_two_rows(const MergeSortCursor& rhs) const {
Expand All @@ -67,7 +67,7 @@ struct SortCursorCmp {
return true;
}
int row = 0;
MergeSortCursorImpl* impl = nullptr;
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
};

class PartitionSorter final : public Sorter {
Expand Down
71 changes: 37 additions & 34 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,48 +59,44 @@ namespace doris::vectorized {
void MergeSorterState::reset() {
auto empty_queue = std::priority_queue<MergeSortCursor>();
priority_queue_.swap(empty_queue);
std::vector<MergeSortCursorImpl> empty_cursors(0);
cursors_.swap(empty_cursors);
std::vector<Block> empty_blocks(0);
std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
std::vector<std::shared_ptr<Block>> empty_blocks(0);
sorted_blocks_.swap(empty_blocks);
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
in_mem_sorted_bocks_size_ = 0;
}

Status MergeSorterState::add_sorted_block(Block& block) {
auto rows = block.rows();
void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
auto rows = block->rows();
if (0 == rows) {
return Status::OK();
return;
}
in_mem_sorted_bocks_size_ += block.bytes();
sorted_blocks_.emplace_back(std::move(block));
in_mem_sorted_bocks_size_ += block->bytes();
sorted_blocks_.emplace_back(block);
num_rows_ += rows;
return Status::OK();
}

Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
for (auto& block : sorted_blocks_) {
cursors_.emplace_back(block, sort_description);
}

if (sorted_blocks_.size() > 1) {
for (auto& cursor : cursors_) {
priority_queue_.emplace(&cursor);
}
priority_queue_.emplace(
MergeSortCursorImpl::create_shared(std::move(block), sort_description));
}

sorted_blocks_.clear();
return Status::OK();
}

Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size,
bool* eos) {
if (sorted_blocks_.empty()) {
DCHECK(sorted_blocks_.empty());
DCHECK(unsorted_block_->empty());
if (priority_queue_.empty()) {
*eos = true;
} else if (sorted_blocks_.size() == 1) {
} else if (priority_queue_.size() == 1) {
if (offset_ != 0) {
sorted_blocks_[0].skip_num_rows(offset_);
priority_queue_.top().impl->block->skip_num_rows(offset_);
}
block->swap(sorted_blocks_[0]);
block->swap(*priority_queue_.top().impl->block);
*eos = true;
} else {
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
Expand All @@ -110,9 +106,14 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba

Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block,
bool* eos) {
size_t num_columns = sorted_blocks_[0].columns();
if (priority_queue_.empty()) {
*eos = true;
return Status::OK();
}
size_t num_columns = priority_queue_.top().impl->block->columns();

MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
block, *priority_queue_.top().impl->block);
MutableColumns& merged_columns = m_block.mutable_columns();

/// Take rows from queue in right order and push to 'merged'.
Expand All @@ -123,7 +124,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized

if (offset_ == 0) {
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
++merged_rows;
} else {
offset_--;
Expand All @@ -134,7 +135,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
priority_queue_.push(current);
}

if (merged_rows == batch_size) break;
if (merged_rows == batch_size) {
break;
}
}
block->set_columns(std::move(merged_columns));

Expand Down Expand Up @@ -261,22 +264,22 @@ Status FullSorter::_do_sort() {
// If a block is totally greater than the heap top of _block_priority_queue,
// we can discard the block's data directly.
if (_state->num_rows() < _offset + _limit) {
static_cast<void>(_state->add_sorted_block(desc_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
} else {
auto tmp_cursor_impl =
std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
Block::create_shared(std::move(desc_block)), _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
static_cast<void>(_state->add_sorted_block(desc_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
_state->add_sorted_block(tmp_cursor_impl->block);
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
}
}
} else {
// dispose normal sort logic
static_cast<void>(_state->add_sorted_block(desc_block));
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
}
return Status::OK();
}
Expand Down
12 changes: 4 additions & 8 deletions be/src/vec/common/sort/sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class MergeSorterState {

~MergeSorterState() = default;

Status add_sorted_block(Block& block);
void add_sorted_block(std::shared_ptr<Block> block);

Status build_merge_tree(const SortDescription& sort_description);

Expand All @@ -72,23 +72,19 @@ class MergeSorterState {

uint64_t num_rows() const { return num_rows_; }

Block& last_sorted_block() { return sorted_blocks_.back(); }
std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); }

std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; }
std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
void reset();

std::unique_ptr<Block> unsorted_block_;

private:
int _calc_spill_blocks_to_merge() const;

Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos);

std::priority_queue<MergeSortCursor> priority_queue_;
std::vector<MergeSortCursorImpl> cursors_;
std::vector<Block> sorted_blocks_;
std::vector<std::shared_ptr<Block>> sorted_blocks_;
size_t in_mem_sorted_bocks_size_ = 0;
uint64_t num_rows_ = 0;

Expand Down
17 changes: 8 additions & 9 deletions be/src/vec/common/sort/topn_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,16 @@ Status TopNSorter::_do_sort(Block* block) {
// If a block is totally greater than the heap top of _block_priority_queue,
// we can discard the block's data directly.
if (_state->num_rows() < _offset + _limit) {
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
} else {
auto tmp_cursor_impl =
std::make_unique<MergeSortCursorImpl>(sorted_block, _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
Block::create_shared(std::move(sorted_block)), _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
_state->add_sorted_block(block_cursor.impl->block);
_block_priority_queue.emplace(tmp_cursor_impl);
}
}
} else {
Expand Down
Loading
Loading