Skip to content

Commit a3f4a92

Browse files
authored
[refactor](minor) Init counter in prepare phase (#39287)
1 parent a0cf8b9 commit a3f4a92

File tree

3 files changed

+7
-5
lines changed

3 files changed

+7
-5
lines changed

be/src/pipeline/exec/exchange_sink_operator.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
9999
// Make sure brpc stub is ready before execution.
100100
for (int i = 0; i < channels.size(); ++i) {
101101
RETURN_IF_ERROR(channels[i]->init_stub(state));
102+
_wait_channel_timer.push_back(_profile->add_nonzero_counter(
103+
fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit ::TIME_NS, timer_name, 1));
102104
}
105+
_wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);
103106
return Status::OK();
104107
}
105108

@@ -142,18 +145,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
142145
_sink_buffer->set_broadcast_dependency(_broadcast_dependency);
143146
_broadcast_pb_mem_limiter =
144147
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
145-
_wait_broadcast_buffer_timer =
146-
ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);
147148
} else if (local_size > 0) {
148149
size_t dep_id = 0;
149150
for (auto* channel : channels) {
150151
if (channel->is_local()) {
151152
if (auto dep = channel->get_local_channel_dependency()) {
152153
_local_channels_dependency.push_back(dep);
153154
DCHECK(_local_channels_dependency[dep_id] != nullptr);
154-
_wait_channel_timer.push_back(_profile->add_nonzero_counter(
155-
fmt::format("WaitForLocalExchangeBuffer{}", dep_id), TUnit ::TIME_NS,
156-
timer_name, 1));
157155
dep_id++;
158156
} else {
159157
LOG(WARNING) << "local recvr is null: query id = "

be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase
3434
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
3535
parent->get_name() + "_SPILL_DEPENDENCY", true);
3636
}
37+
3738
Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
3839
doris::pipeline::LocalSinkStateInfo& info) {
3940
RETURN_IF_ERROR(Base::init(state, info));
@@ -66,6 +67,7 @@ Status PartitionedAggSinkLocalState::open(RuntimeState* state) {
6667
SCOPED_TIMER(Base::_open_timer);
6768
return Base::open(state);
6869
}
70+
6971
Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_status) {
7072
SCOPED_TIMER(Base::exec_time_counter());
7173
SCOPED_TIMER(Base::_close_timer);

be/src/vec/common/sort/sorter.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ void MergeSorterState::reset() {
6666
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
6767
in_mem_sorted_bocks_size_ = 0;
6868
}
69+
6970
Status MergeSorterState::add_sorted_block(Block& block) {
7071
auto rows = block.rows();
7172
if (0 == rows) {
@@ -279,6 +280,7 @@ Status FullSorter::_do_sort() {
279280
}
280281
return Status::OK();
281282
}
283+
282284
size_t FullSorter::data_size() const {
283285
return _state->data_size();
284286
}

0 commit comments

Comments
 (0)