Skip to content

Commit cd52133

Browse files
committed
[Feature](insert-overwrite) Support create partition for auto partition table when insert overwrite (apache#38628)
Introduces the session variable `enable_auto_create_when_overwrite`. When it is true: 1. `insert overwrite table auto_partition_table [values xxx| select xxx]` overwrites the old data and creates partition(s) for the new data if needed. 2. `insert overwrite table auto_partition_table partition(*) [values xxx| select xxx]` overwrites the old data in the partitions that the inserted values fall into (as it did before) and creates partition(s) for the new data if needed. Doc PR: apache/doris-website#936
1 parent 4086774 commit cd52133

File tree

16 files changed

+357
-103
lines changed

16 files changed

+357
-103
lines changed

be/src/exec/tablet_info.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,7 @@ Status VOlapTablePartitionParam::replace_partitions(
724724

725725
// add new partitions with new id.
726726
_partitions.emplace_back(part);
727+
VLOG_NOTICE << "params add new partition " << part->id;
727728

728729
// replace items in _partition_maps
729730
if (_is_in_partition) {

be/src/pipeline/exec/exchange_sink_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ Status ExchangeSinkLocalState::_send_new_partition_batch() {
265265
vectorized::Block tmp_block =
266266
_row_distribution._batching_block->to_block(); // Borrow out, for lval ref
267267
auto& p = _parent->cast<ExchangeSinkOperatorX>();
268-
// these order is only.
268+
// the following steps must run in exactly this order.
269269
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
270270
// 2. deal batched block
271271
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.

be/src/vec/sink/vrow_distribution.cpp

Lines changed: 117 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
#include <cstdint>
2525
#include <memory>
26-
#include <sstream>
26+
#include <string>
2727

2828
#include "common/logging.h"
2929
#include "common/status.h"
@@ -116,6 +116,10 @@ Status VRowDistribution::automatic_create_partition() {
116116
if (result.status.status_code == TStatusCode::OK) {
117117
// add new created partitions
118118
RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
119+
for (const auto& part : result.partitions) {
120+
_new_partition_ids.insert(part.id);
121+
VLOG_TRACE << "record new id: " << part.id;
122+
}
119123
RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
120124
}
121125

@@ -134,7 +138,7 @@ static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg
134138

135139
// use _partitions and replace them
136140
Status VRowDistribution::_replace_overwriting_partition() {
137-
SCOPED_TIMER(_add_partition_request_timer);
141+
SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
138142
TReplacePartitionRequest request;
139143
TReplacePartitionResult result;
140144
request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
@@ -144,16 +148,20 @@ Status VRowDistribution::_replace_overwriting_partition() {
144148
// only request for partitions not recorded for replacement
145149
std::set<int64_t> id_deduper;
146150
for (const auto* part : _partitions) {
147-
if (part == nullptr) [[unlikely]] {
148-
return Status::InternalError(
149-
"Cannot found origin partitions in auto detect overwriting, stop processing");
150-
}
151-
if (_new_partition_ids.contains(part->id)) {
152-
// this is a new partition. dont replace again.
153-
} else {
154-
// request for replacement
155-
id_deduper.insert(part->id);
156-
}
151+
if (part != nullptr) {
152+
if (_new_partition_ids.contains(part->id)) {
153+
// this is a new partition. dont replace again.
154+
VLOG_TRACE << "skip new partition: " << part->id;
155+
} else {
156+
// request for replacement
157+
id_deduper.insert(part->id);
158+
}
159+
} else if (_missing_map.empty()) {
160+
// no origin partition. and not allow to create.
161+
return Status::InvalidArgument(
162+
"Cannot found origin partitions in auto detect overwriting, stop "
163+
"processing");
164+
} // else: part is null and _missing_map is not empty. handled outside via the auto-partition path; nothing to do here.
157165
}
158166
if (id_deduper.empty()) {
159167
return Status::OK(); // no need to request
@@ -182,6 +190,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
182190
// record new partitions
183191
for (const auto& part : result.partitions) {
184192
_new_partition_ids.insert(part.id);
193+
VLOG_TRACE << "record new id: " << part.id;
185194
}
186195
// replace data in _partitions
187196
RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
@@ -304,6 +313,52 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
304313
return Status::OK();
305314
}
306315

316+
// Processes the rows recorded in _missing_map (rows whose partition was not found).
// For each partition column it renders the value of every missing row as a string,
// then passes those strings, the null maps and the block to _save_missing_values.
// Afterwards it re-reads the size of _batching_block and adjusts rows_stat_val,
// the load totals on _state, the DorisMetrics load counters and the cached
// _batching_rows/_batching_bytes by the change in the batching block's size.
//
// block:              the input block the missing rows belong to.
// partition_cols_idx: positions in `block` of the partition columns, one per partition expr.
// rows_stat_val:      in-out row counter; reduced by the growth in batching-block rows.
// Returns the error of _save_missing_values if it fails, otherwise Status::OK().
Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
317+
const std::vector<uint16_t>& partition_cols_idx,
318+
int64_t& rows_stat_val) {
319+
// for missing partition keys, compute the missing partitions and save them in _partitions_need_create
320+
auto [part_ctxs, part_exprs] = _get_partition_function();
321+
auto part_col_num = part_exprs.size();
322+
// the two vectors are in column-first order: the outer index is the partition column,
// the inner index (for col_strs) follows the iteration order of _missing_map
323+
std::vector<std::vector<std::string>> col_strs;
324+
std::vector<const NullMap*> col_null_maps;
325+
col_strs.resize(part_col_num);
326+
col_null_maps.reserve(part_col_num);
327+
328+
for (int i = 0; i < part_col_num; ++i) {
329+
auto return_type = part_exprs[i]->data_type();
330+
// unwrap a possibly-const column to expose the data column; it may be nullable (checked below)
331+
const auto& [range_left_col, col_const] =
332+
unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
333+
if (range_left_col->is_nullable()) {
334+
col_null_maps.push_back(&(
335+
assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
336+
} else {
337+
// nullptr marks a column that has no null map
col_null_maps.push_back(nullptr);
338+
}
339+
// stringify this column's value for every missing row
for (auto row : _missing_map) {
340+
col_strs[i].push_back(
341+
return_type->to_string(*range_left_col, index_check_const(row, col_const)));
342+
}
343+
}
344+
345+
// compute the end values and save them. at the end of sending, partitions are created for them and the saved rows are handled.
346+
RETURN_IF_ERROR(
347+
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
348+
349+
// NOTE(review): presumably _save_missing_values moved the missing rows into _batching_block,
// so the growth of the batching block is taken back out of the load statistics here — confirm.
size_t new_bt_rows = _batching_block->rows();
350+
size_t new_bt_bytes = _batching_block->bytes();
351+
rows_stat_val -= new_bt_rows - _batching_rows;
352+
_state->update_num_rows_load_total(_batching_rows - new_bt_rows);
353+
_state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
354+
DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
355+
DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
356+
// remember the new batching-block size as the baseline for the next call
_batching_rows = new_bt_rows;
357+
_batching_bytes = new_bt_bytes;
358+
359+
return Status::OK();
360+
}
361+
307362
Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
308363
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
309364
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
@@ -329,63 +384,64 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
329384
RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
330385

331386
if (!_missing_map.empty()) {
332-
// for missing partition keys, calc the missing partition and save in _partitions_need_create
333-
auto [part_ctxs, part_exprs] = _get_partition_function();
334-
auto part_col_num = part_exprs.size();
335-
// the two vectors are in column-first-order
336-
std::vector<std::vector<std::string>> col_strs;
337-
std::vector<const NullMap*> col_null_maps;
338-
col_strs.resize(part_col_num);
339-
col_null_maps.reserve(part_col_num);
340-
341-
for (int i = 0; i < part_col_num; ++i) {
342-
auto return_type = part_exprs[i]->data_type();
343-
// expose the data column. the return type would be nullable
344-
const auto& [range_left_col, col_const] =
345-
unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
346-
if (range_left_col->is_nullable()) {
347-
col_null_maps.push_back(&(assert_cast<const ColumnNullable*>(range_left_col.get())
348-
->get_null_map_data()));
349-
} else {
350-
col_null_maps.push_back(nullptr);
351-
}
352-
for (auto row : _missing_map) {
353-
col_strs[i].push_back(
354-
return_type->to_string(*range_left_col, index_check_const(row, col_const)));
355-
}
356-
}
357-
358-
// calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
359-
RETURN_IF_ERROR(
360-
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
361-
362-
size_t new_bt_rows = _batching_block->rows();
363-
size_t new_bt_bytes = _batching_block->bytes();
364-
rows_stat_val -= new_bt_rows - _batching_rows;
365-
_state->update_num_rows_load_total(_batching_rows - new_bt_rows);
366-
_state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
367-
DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
368-
DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
369-
_batching_rows = new_bt_rows;
370-
_batching_bytes = new_bt_bytes;
387+
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
371388
}
372389
return Status::OK();
373390
}
374391

375392
Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
376-
vectorized::Block* block, bool has_filtered_rows,
377-
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
393+
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
394+
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
395+
int64_t& rows_stat_val) {
378396
auto num_rows = block->rows();
379397

398+
// for the non-auto-partition case, both 'else' branches are taken: just find the origin partitions, replace them by rpc,
399+
// and find the new partitions to use.
400+
// for auto-partition tables, find and save origins in _partitions and replace them. meanwhile, save the missing values for auto
401+
// partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz
402+
// we already saved missing values.
380403
bool stop_processing = false;
381-
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
382-
_tablet_indexes, stop_processing, _skip));
404+
if (_vpartition->is_auto_partition() &&
405+
_state->query_options().enable_auto_create_when_overwrite) {
406+
// allow auto create partition for missing rows.
407+
std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
408+
auto partition_col = block->get_by_position(partition_keys[0]);
409+
_missing_map.clear();
410+
_missing_map.reserve(partition_col.column->size());
411+
412+
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
413+
_tablet_indexes, stop_processing, _skip,
414+
&_missing_map));
415+
416+
// allow and really need to create during auto-detect-overwriting.
417+
if (!_missing_map.empty()) {
418+
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
419+
}
420+
} else {
421+
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
422+
_tablet_indexes, stop_processing, _skip));
423+
}
383424
RETURN_IF_ERROR(_replace_overwriting_partition());
384425

385426
// regenerate locations for new partitions & tablets
386427
_reset_find_tablets(num_rows);
387-
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
388-
_tablet_indexes, stop_processing, _skip));
428+
if (_vpartition->is_auto_partition() &&
429+
_state->query_options().enable_auto_create_when_overwrite) {
430+
// here _missing_map is just a placeholder
431+
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
432+
_tablet_indexes, stop_processing, _skip,
433+
&_missing_map));
434+
if (VLOG_TRACE_IS_ON) {
435+
std::string tmp;
436+
for (auto v : _missing_map) {
437+
tmp += std::to_string(v).append(", ");
438+
}
439+
VLOG_TRACE << "Trace missing map of " << this << ':' << tmp;
440+
}
441+
} else {
442+
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
443+
_tablet_indexes, stop_processing, _skip));
444+
}
389445
if (has_filtered_rows) {
390446
for (int i = 0; i < num_rows; i++) {
391447
_skip[i] = _skip[i] || _block_convertor->filter_map()[i];
@@ -456,10 +512,11 @@ Status VRowDistribution::generate_rows_distribution(
456512
}
457513

458514
Status st = Status::OK();
459-
if (_vpartition->is_auto_detect_overwrite()) {
515+
if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
460516
// auto-detect overwrite. auto create partition is only allowed here when enable_auto_create_when_overwrite is set.
461-
st = _generate_rows_distribution_for_auto_overwrite(block.get(), has_filtered_rows,
462-
row_part_tablet_ids);
517+
st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx,
518+
has_filtered_rows, row_part_tablet_ids,
519+
rows_stat_val);
463520
} else if (_vpartition->is_auto_partition() && !_deal_batched) {
464521
st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
465522
has_filtered_rows, row_part_tablet_ids,

be/src/vec/sink/vrow_distribution.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,19 @@ class VRowDistribution {
162162
vectorized::Block* block, const std::vector<uint16_t>& partition_col_idx,
163163
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
164164
int64_t& rows_stat_val);
165+
// the whole process to deal missing rows. will call _save_missing_values
166+
Status _deal_missing_map(vectorized::Block* block,
167+
const std::vector<uint16_t>& partition_cols_idx,
168+
int64_t& rows_stat_val);
165169

166170
Status _generate_rows_distribution_for_non_auto_partition(
167171
vectorized::Block* block, bool has_filtered_rows,
168172
std::vector<RowPartTabletIds>& row_part_tablet_ids);
169173

170174
Status _generate_rows_distribution_for_auto_overwrite(
171-
vectorized::Block* block, bool has_filtered_rows,
172-
std::vector<RowPartTabletIds>& row_part_tablet_ids);
175+
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
176+
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
177+
int64_t& rows_stat_val);
173178
Status _replace_overwriting_partition();
174179

175180
void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,

be/src/vec/sink/writer/vtablet_writer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1422,7 +1422,7 @@ Status VTabletWriter::_send_new_partition_batch() {
14221422

14231423
Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
14241424

1425-
// these order is only.
1425+
// the following steps must run in exactly this order.
14261426
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
14271427
// 2. deal batched block
14281428
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.

be/src/vec/sink/writer/vtablet_writer_v2.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {
531531

532532
Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
533533

534-
// these order is only.
534+
// the following steps must run in exactly this order.
535535
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
536536
// 2. deal batched block
537537
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.

fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ public class NativeInsertStmt extends InsertStmt {
167167

168168
boolean hasEmptyTargetColumns = false;
169169
private boolean allowAutoPartition = true;
170-
private boolean withAutoDetectOverwrite = false;
171170

172171
enum InsertType {
173172
NATIVE_INSERT("insert_"),
@@ -333,11 +332,6 @@ public boolean isTransactionBegin() {
333332
return isTransactionBegin;
334333
}
335334

336-
public NativeInsertStmt withAutoDetectOverwrite() {
337-
this.withAutoDetectOverwrite = true;
338-
return this;
339-
}
340-
341335
protected void preCheckAnalyze(Analyzer analyzer) throws UserException {
342336
super.analyze(analyzer);
343337

fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,10 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) {
611611
LogicalPlan plan = visitQuery(ctx.query());
612612
// partitionSpec may be NULL. means auto detect partition. only available when IOT
613613
Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
614+
// partitionSpec.second :
615+
// null - auto detect
616+
// zero - whole table
617+
// others - specific partitions
614618
boolean isAutoDetect = partitionSpec.second == null;
615619
LogicalSink<?> sink = UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite(
616620
tableName.build(),

0 commit comments

Comments
 (0)