Skip to content

Commit 44aa314

Browse files
authored
[Improvement](sort) Free sort blocks if this block is exhausted (#39306)
1 parent f1c8426 commit 44aa314

File tree

9 files changed

+112
-151
lines changed

9 files changed

+112
-151
lines changed

be/src/pipeline/dependency.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -886,8 +886,6 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
886886

887887
void create_dependencies(int local_exchange_id) override {
888888
sink_deps.resize(source_deps.size());
889-
std::vector<DependencySPtr> new_deps(sink_deps.size(), nullptr);
890-
source_deps.swap(new_deps);
891889
for (size_t i = 0; i < source_deps.size(); i++) {
892890
source_deps[i] =
893891
std::make_shared<Dependency>(local_exchange_id, local_exchange_id,

be/src/vec/common/sort/partition_sorter.cpp

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -58,20 +58,17 @@ Status PartitionSorter::append_block(Block* input_block) {
5858
Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
5959
DCHECK(input_block->columns() == sorted_block.columns());
6060
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
61-
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
61+
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
6262
return Status::OK();
6363
}
6464

6565
Status PartitionSorter::prepare_for_read() {
66-
auto& cursors = _state->get_cursors();
6766
auto& blocks = _state->get_sorted_block();
6867
auto& priority_queue = _state->get_priority_queue();
6968
for (auto& block : blocks) {
70-
cursors.emplace_back(block, _sort_description);
71-
}
72-
for (auto& cursor : cursors) {
73-
priority_queue.push(MergeSortCursor(&cursor));
69+
priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description));
7470
}
71+
blocks.clear();
7572
return Status::OK();
7673
}
7774

@@ -84,29 +81,30 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
8481
}
8582

8683
Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
87-
if (_state->get_sorted_block().empty()) {
84+
if (_state->get_priority_queue().empty()) {
85+
*eos = true;
86+
} else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
87+
block->swap(*_state->get_priority_queue().top().impl->block);
88+
block->set_num_rows(_partition_inner_limit);
8889
*eos = true;
8990
} else {
90-
if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
91-
auto& sorted_block = _state->get_sorted_block()[0];
92-
block->swap(sorted_block);
93-
block->set_num_rows(_partition_inner_limit);
94-
*eos = true;
95-
} else {
96-
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
97-
}
91+
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
9892
}
9993
return Status::OK();
10094
}
10195

10296
Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
103-
const auto& sorted_block = _state->get_sorted_block()[0];
104-
size_t num_columns = sorted_block.columns();
97+
auto& priority_queue = _state->get_priority_queue();
98+
if (priority_queue.empty()) {
99+
*eos = true;
100+
return Status::OK();
101+
}
102+
const auto& sorted_block = priority_queue.top().impl->block;
103+
size_t num_columns = sorted_block->columns();
105104
MutableBlock m_block =
106-
VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
105+
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
107106
MutableColumns& merged_columns = m_block.mutable_columns();
108107
size_t current_output_rows = 0;
109-
auto& priority_queue = _state->get_priority_queue();
110108

111109
bool get_enough_data = false;
112110
while (!priority_queue.empty()) {
@@ -121,7 +119,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
121119
//1 row_number no need to check distinct, just output partition_inner_limit row
122120
if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
123121
for (size_t i = 0; i < num_columns; ++i) {
124-
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
122+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
125123
}
126124
} else {
127125
//rows has get enough
@@ -155,7 +153,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
155153
}
156154
}
157155
for (size_t i = 0; i < num_columns; ++i) {
158-
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
156+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
159157
}
160158
break;
161159
}
@@ -180,7 +178,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
180178
*_previous_row = current;
181179
}
182180
for (size_t i = 0; i < num_columns; ++i) {
183-
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
181+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
184182
}
185183
current_output_rows++;
186184
break;

be/src/vec/common/sort/partition_sorter.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ struct SortCursorCmp {
5050
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}
5151

5252
void reset() {
53-
impl = nullptr;
53+
impl->reset();
5454
row = 0;
5555
}
5656
bool compare_two_rows(const MergeSortCursor& rhs) const {
@@ -67,7 +67,7 @@ struct SortCursorCmp {
6767
return true;
6868
}
6969
int row = 0;
70-
MergeSortCursorImpl* impl = nullptr;
70+
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
7171
};
7272

7373
class PartitionSorter final : public Sorter {

be/src/vec/common/sort/sorter.cpp

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -59,48 +59,44 @@ namespace doris::vectorized {
5959
void MergeSorterState::reset() {
6060
auto empty_queue = std::priority_queue<MergeSortCursor>();
6161
priority_queue_.swap(empty_queue);
62-
std::vector<MergeSortCursorImpl> empty_cursors(0);
63-
cursors_.swap(empty_cursors);
64-
std::vector<Block> empty_blocks(0);
62+
std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
63+
std::vector<std::shared_ptr<Block>> empty_blocks(0);
6564
sorted_blocks_.swap(empty_blocks);
6665
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
6766
in_mem_sorted_bocks_size_ = 0;
6867
}
6968

70-
Status MergeSorterState::add_sorted_block(Block& block) {
71-
auto rows = block.rows();
69+
void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
70+
auto rows = block->rows();
7271
if (0 == rows) {
73-
return Status::OK();
72+
return;
7473
}
75-
in_mem_sorted_bocks_size_ += block.bytes();
76-
sorted_blocks_.emplace_back(std::move(block));
74+
in_mem_sorted_bocks_size_ += block->bytes();
75+
sorted_blocks_.emplace_back(block);
7776
num_rows_ += rows;
78-
return Status::OK();
7977
}
8078

8179
Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
8280
for (auto& block : sorted_blocks_) {
83-
cursors_.emplace_back(block, sort_description);
84-
}
85-
86-
if (sorted_blocks_.size() > 1) {
87-
for (auto& cursor : cursors_) {
88-
priority_queue_.emplace(&cursor);
89-
}
81+
priority_queue_.emplace(
82+
MergeSortCursorImpl::create_shared(std::move(block), sort_description));
9083
}
9184

85+
sorted_blocks_.clear();
9286
return Status::OK();
9387
}
9488

9589
Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size,
9690
bool* eos) {
97-
if (sorted_blocks_.empty()) {
91+
DCHECK(sorted_blocks_.empty());
92+
DCHECK(unsorted_block_->empty());
93+
if (priority_queue_.empty()) {
9894
*eos = true;
99-
} else if (sorted_blocks_.size() == 1) {
95+
} else if (priority_queue_.size() == 1) {
10096
if (offset_ != 0) {
101-
sorted_blocks_[0].skip_num_rows(offset_);
97+
priority_queue_.top().impl->block->skip_num_rows(offset_);
10298
}
103-
block->swap(sorted_blocks_[0]);
99+
block->swap(*priority_queue_.top().impl->block);
104100
*eos = true;
105101
} else {
106102
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
@@ -110,9 +106,14 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
110106

111107
Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block,
112108
bool* eos) {
113-
size_t num_columns = sorted_blocks_[0].columns();
109+
if (priority_queue_.empty()) {
110+
*eos = true;
111+
return Status::OK();
112+
}
113+
size_t num_columns = priority_queue_.top().impl->block->columns();
114114

115-
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
115+
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
116+
block, *priority_queue_.top().impl->block);
116117
MutableColumns& merged_columns = m_block.mutable_columns();
117118

118119
/// Take rows from queue in right order and push to 'merged'.
@@ -123,7 +124,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
123124

124125
if (offset_ == 0) {
125126
for (size_t i = 0; i < num_columns; ++i)
126-
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
127+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
127128
++merged_rows;
128129
} else {
129130
offset_--;
@@ -134,7 +135,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
134135
priority_queue_.push(current);
135136
}
136137

137-
if (merged_rows == batch_size) break;
138+
if (merged_rows == batch_size) {
139+
break;
140+
}
138141
}
139142
block->set_columns(std::move(merged_columns));
140143

@@ -261,22 +264,22 @@ Status FullSorter::_do_sort() {
261264
// if one block totally greater the heap top of _block_priority_queue
262265
// we can throw the block data directly.
263266
if (_state->num_rows() < _offset + _limit) {
264-
static_cast<void>(_state->add_sorted_block(desc_block));
265-
_block_priority_queue.emplace(_pool->add(
266-
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
267+
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
268+
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
269+
_state->last_sorted_block(), _sort_description));
267270
} else {
268-
auto tmp_cursor_impl =
269-
std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description);
270-
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
271+
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
272+
Block::create_shared(std::move(desc_block)), _sort_description);
273+
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
271274
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
272-
static_cast<void>(_state->add_sorted_block(desc_block));
273-
_block_priority_queue.emplace(_pool->add(
274-
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
275+
_state->add_sorted_block(tmp_cursor_impl->block);
276+
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
277+
_state->last_sorted_block(), _sort_description));
275278
}
276279
}
277280
} else {
278281
// dispose normal sort logic
279-
static_cast<void>(_state->add_sorted_block(desc_block));
282+
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
280283
}
281284
return Status::OK();
282285
}

be/src/vec/common/sort/sorter.h

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class MergeSorterState {
5959

6060
~MergeSorterState() = default;
6161

62-
Status add_sorted_block(Block& block);
62+
void add_sorted_block(std::shared_ptr<Block> block);
6363

6464
Status build_merge_tree(const SortDescription& sort_description);
6565

@@ -72,23 +72,19 @@ class MergeSorterState {
7272

7373
uint64_t num_rows() const { return num_rows_; }
7474

75-
Block& last_sorted_block() { return sorted_blocks_.back(); }
75+
std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); }
7676

77-
std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
77+
std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; }
7878
std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
79-
std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
8079
void reset();
8180

8281
std::unique_ptr<Block> unsorted_block_;
8382

8483
private:
85-
int _calc_spill_blocks_to_merge() const;
86-
8784
Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos);
8885

8986
std::priority_queue<MergeSortCursor> priority_queue_;
90-
std::vector<MergeSortCursorImpl> cursors_;
91-
std::vector<Block> sorted_blocks_;
87+
std::vector<std::shared_ptr<Block>> sorted_blocks_;
9288
size_t in_mem_sorted_bocks_size_ = 0;
9389
uint64_t num_rows_ = 0;
9490

be/src/vec/common/sort/topn_sorter.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,16 @@ Status TopNSorter::_do_sort(Block* block) {
7272
// if one block totally greater the heap top of _block_priority_queue
7373
// we can throw the block data directly.
7474
if (_state->num_rows() < _offset + _limit) {
75-
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
76-
_block_priority_queue.emplace(_pool->add(
77-
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
75+
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
76+
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
77+
_state->last_sorted_block(), _sort_description));
7878
} else {
79-
auto tmp_cursor_impl =
80-
std::make_unique<MergeSortCursorImpl>(sorted_block, _sort_description);
81-
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
79+
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
80+
Block::create_shared(std::move(sorted_block)), _sort_description);
81+
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
8282
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
83-
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
84-
_block_priority_queue.emplace(_pool->add(
85-
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
83+
_state->add_sorted_block(block_cursor.impl->block);
84+
_block_priority_queue.emplace(tmp_cursor_impl);
8685
}
8786
}
8887
} else {

0 commit comments

Comments
 (0)