
Commit 7d4ce60

[improvement](cloud) Accelerate creating table by batching RPC
fix fix feut fix fix fix fe merge rpc
1 parent 89265cf commit 7d4ce60

File tree: 20 files changed, +437 -47 lines changed

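At a high level, this commit batches the cloud meta-service calls made while creating a table (and on the first run of the dynamic partition scheduler) into one prepare RPC, one commit RPC, and one final check_kv RPC. A minimal sketch of the FE-side sequence in cloud mode follows; the method names and signatures are taken from the diffs below, while the wrapper method itself, its parameters, and the omitted error handling are illustrative assumptions, and checkCreatePartitions is presumed to be backed by the new check_kv RPC added to the meta-service.

    // Sketch only: the real logic lives in DynamicPartitionScheduler.executeDynamicPartition
    // (last file in this diff). Assumes partitionIds were preallocated via
    // Env.getCurrentEnv().getIdGeneratorBuffer(...) as done there.
    static void createPartitionsBatched(Database db, String tableName, OlapTable olapTable,
            List<AddPartitionClause> clauses, List<Long> partitionIds) throws Exception {
        List<Long> indexIds = new ArrayList<>(olapTable.getCopiedIndexIdToMeta().keySet());

        // 1. One prepare RPC for all partitions/indexes (writes recycle marker keys in the meta-service).
        Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(),
                partitionIds, indexIds, true);

        // 2. Add partitions in FE memory with preset ids; edit logs are postponed (writeEditLog = false).
        List<PartitionPersistInfo> partsInfo = new ArrayList<>();
        for (int i = 0; i < clauses.size(); i++) {
            partsInfo.add(Env.getCurrentEnv().addPartition(db, tableName, clauses.get(i),
                    true /* isCreateTable */, partitionIds.get(i), false /* writeEditLog */));
        }
        List<Long> succeeded = partsInfo.stream()
                .map(p -> p.getPartition().getId()).collect(Collectors.toList());

        // 3. One commit RPC; ids that never produced a PartitionPersistInfo keep their
        //    recycle keys and are cleaned up by the recycler.
        Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(),
                succeeded, indexIds, true);

        // 4. Edit logs are written only after the commit RPC succeeds, so a failed commit
        //    cannot leave persisted FE state the meta-service never accepted.
        for (PartitionPersistInfo info : partsInfo) {
            Env.getCurrentEnv().getEditLog().logAddPartition(info);
        }

        // 5. One check_kv RPC verifies that no recycle key remained for the created partitions.
        Env.getCurrentInternalCatalog().checkCreatePartitions(db.getId(), olapTable.getId(),
                succeeded, indexIds);
    }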

cloud/src/common/bvars.cpp

Lines changed: 2 additions & 0 deletions
@@ -79,6 +79,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"
 BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_status");
 BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status");
 
+BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");
+
 // txn_kv's bvars
 bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
 bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");

cloud/src/common/bvars.h

Lines changed: 1 addition & 0 deletions
@@ -174,6 +174,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
+extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
 
 // txn_kv's bvars
 extern bvar::LatencyRecorder g_bvar_txn_kv_get;

cloud/src/meta-service/meta_service.h

Lines changed: 15 additions & 0 deletions
@@ -145,6 +145,9 @@ class MetaServiceImpl : public cloud::MetaService {
     void drop_index(::google::protobuf::RpcController* controller, const IndexRequest* request,
                     IndexResponse* response, ::google::protobuf::Closure* done) override;
 
+    void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
+                  CheckKVResponse* response, ::google::protobuf::Closure* done) override;
+
     void prepare_partition(::google::protobuf::RpcController* controller,
                            const PartitionRequest* request, PartitionResponse* response,
                            ::google::protobuf::Closure* done) override;
@@ -280,6 +283,13 @@ class MetaServiceImpl : public cloud::MetaService {
                         const AlterInstanceRequest* request,
                         std::function<std::pair<MetaServiceCode, std::string>(InstanceInfoPB*)> action);
 
+    using check_create_table_type = std::function<const std::tuple<
+            const ::google::protobuf::RepeatedField<int64_t>, std::string,
+            std::function<std::string(std::string, int64_t)>>(const CheckKVRequest* request)>;
+    void check_create_table(std::string instance_id, const CheckKVRequest* request,
+                            CheckKVResponse* response, MetaServiceCode* code, std::string* msg,
+                            check_create_table_type get_check_info);
+
     std::shared_ptr<TxnKv> txn_kv_;
     std::shared_ptr<ResourceManager> resource_mgr_;
     std::shared_ptr<RateLimiter> rate_limiter_;
@@ -426,6 +436,11 @@ class MetaServiceProxy final : public MetaService {
         call_impl(&cloud::MetaService::drop_index, controller, request, response, done);
     }
 
+    void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
+                  CheckKVResponse* response, ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::check_kv, controller, request, response, done);
+    }
+
     void prepare_partition(::google::protobuf::RpcController* controller,
                            const PartitionRequest* request, PartitionResponse* response,
                            ::google::protobuf::Closure* done) override {

cloud/src/meta-service/meta_service_partition.cpp

Lines changed: 91 additions & 0 deletions
@@ -67,6 +67,11 @@ static TxnErrorCode index_exists(Transaction* txn, const std::string& instance_i
     return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND;
 }
 
+static TxnErrorCode check_recycle_key_exist(Transaction* txn, const std::string& key) {
+    std::string val;
+    return txn->get(key, &val);
+}
+
 void MetaServiceImpl::prepare_index(::google::protobuf::RpcController* controller,
                                     const IndexRequest* request, IndexResponse* response,
                                     ::google::protobuf::Closure* done) {
@@ -614,4 +619,90 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
     }
 }
 
+void MetaServiceImpl::check_create_table(std::string instance_id, const CheckKVRequest* request,
+                                         CheckKVResponse* response, MetaServiceCode* code,
+                                         std::string* msg, check_create_table_type get_check_info) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        *code = cast_as<ErrCategory::READ>(err);
+        *msg = "failed to create txn";
+        return;
+    }
+    auto& [keys, hint, key_func] = get_check_info(request);
+
+    if (keys.empty()) {
+        *code = MetaServiceCode::INVALID_ARGUMENT;
+        *msg = "empty partition_ids";
+        return;
+    }
+
+    for (auto id : keys) {
+        auto key = key_func(instance_id, id);
+        err = check_recycle_key_exist(txn.get(), key);
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            continue;
+        } else if (err == TxnErrorCode::TXN_OK) {
+            // find not match, prepare commit
+            *code = MetaServiceCode::INVALID_ARGUMENT;
+            *msg = "prepare and commit rpc not match, recycle key remained";
+            return;
+        } else {
+            // err != TXN_OK, fdb read err
+            *code = MetaServiceCode::INVALID_ARGUMENT;
+            *msg = "ms read key error";
+            return;
+        }
+    }
+    LOG_INFO("check {} success request={}", hint, request->ShortDebugString());
+    return;
+}
+
+void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller,
+                               const CheckKVRequest* request, CheckKVResponse* response,
+                               ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(check_kv);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        return;
+    }
+    if (!request->has_op()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "op not given";
+        return;
+    }
+    if (!request->has_check_keys()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty check keys";
+        return;
+    }
+    RPC_RATE_LIMIT(check_kv);
+    switch (request->op()) {
+    case CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT: {
+        check_create_table(
+                instance_id, request, response, &code, &msg, [](const CheckKVRequest* request) {
+                    return std::make_tuple(request->check_keys().index_ids(), "index",
+                                           [](std::string instance_id, int64_t id) {
+                                               return recycle_index_key({instance_id, id});
+                                           });
+                });
+        break;
+    }
+    case CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT: {
+        check_create_table(
+                instance_id, request, response, &code, &msg, [](const CheckKVRequest* request) {
+                    return std::make_tuple(request->check_keys().partition_ids(), "partition",
+                                           [](std::string instance_id, int64_t id) {
+                                               return recycle_partition_key({instance_id, id});
+                                           });
+                });
+        break;
+    }
+    default:
+        DCHECK(false);
+    };
+}
+
 } // namespace doris::cloud

fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java

Lines changed: 1 addition & 1 deletion
@@ -475,7 +475,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
                 DynamicPartitionUtil.checkAlterAllowed(
                         (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
             }
-            Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause);
+            Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true);
         } else if (alterClause instanceof AddPartitionLikeClause) {
             if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) {
                 DynamicPartitionUtil.checkAlterAllowed(

fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java

Lines changed: 17 additions & 2 deletions
@@ -3232,8 +3232,23 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio
         getInternalCatalog().createTableAsSelect(stmt);
     }
 
-    public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
-        getInternalCatalog().addPartition(db, tableName, addPartitionClause);
+    /**
+     * Adds a partition to a table.
+     *
+     * @param db the database of the target table
+     * @param tableName the target table name
+     * @param addPartitionClause clause in the CreateTableStmt
+     * @param isCreateTable whether this call is part of creating a table
+     * @param generatedPartitionId the preset partition id for the partition to add
+     * @param writeEditLog whether to write an edit log for this addition
+     * @return PartitionPersistInfo to be written to the edit log; may be null if no partition was added
+     * @throws DdlException
+     */
+    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
+                                             boolean isCreateTable, long generatedPartitionId,
+                                             boolean writeEditLog) throws DdlException {
+        return getInternalCatalog().addPartition(db, tableName, addPartitionClause,
+                isCreateTable, generatedPartitionId, writeEditLog);
     }
 
     public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause)
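The two call sites touched by this commit show both modes of the widened signature; a condensed comparison is below. The first line is the Alter.java change above, the second mirrors the DynamicPartitionScheduler call further down, with clause and generatedPartitionId standing in for the loop variables used there.

    // ALTER TABLE ADD PARTITION path (Alter.java): not part of creating a table, no preset id,
    // edit log written immediately.
    Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true);

    // Batched create-table path in cloud mode (DynamicPartitionScheduler.java): preset id from the
    // IdGeneratorBuffer, edit log postponed until the batched commit RPC succeeds.
    PartitionPersistInfo info = Env.getCurrentEnv().addPartition(db, tableName, clause,
            true /* isCreateTable */, generatedPartitionId, false /* writeEditLog */);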

fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java

Lines changed: 107 additions & 2 deletions
@@ -33,6 +33,7 @@
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.MetaIdGenerator;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionItem;
@@ -45,11 +46,14 @@
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.AutoBucketUtils;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.persist.PartitionPersistInfo;
 import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.base.Strings;
@@ -71,6 +75,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class is used to periodically add or drop partition on an olapTable which specify dynamic partition properties
@@ -582,9 +587,60 @@ private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartiti
             }
 
             if (!skipAddPartition) {
-                for (AddPartitionClause addPartitionClause : addPartitionClauses) {
+                // get partitionIds and indexIds
+                List<Long> indexIds = new ArrayList<>(olapTable.getCopiedIndexIdToMeta().keySet());
+                List<Long> generatedPartitionIds = new ArrayList<>();
+                if (executeFirstTime && Config.isCloudMode() && !addPartitionClauses.isEmpty()) {
+                    AddPartitionClause addPartitionClause = addPartitionClauses.get(0);
+                    DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
                     try {
-                        Env.getCurrentEnv().addPartition(db, tableName, addPartitionClause);
+                        DistributionInfo distributionInfo = distributionDesc
+                                .toDistributionInfo(olapTable.getBaseSchema());
+                        if (distributionDesc == null) {
+                            distributionInfo = olapTable.getDefaultDistributionInfo()
+                                    .toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema());
+                        }
+                        long allPartitionBufferSize = 0;
+                        for (int i = 0; i < addPartitionClauses.size(); i++) {
+                            long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(),
+                                    distributionInfo.getBucketNum(),
+                                    addPartitionClause.getSingeRangePartitionDesc()
+                                            .getReplicaAlloc().getTotalReplicaNum(),
+                                    db, tableName);
+                            allPartitionBufferSize += bufferSize;
+                        }
+                        MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv()
+                                .getIdGeneratorBuffer(allPartitionBufferSize);
+                        addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId()));
+                        // executeFirstTime true
+                        Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(),
+                                generatedPartitionIds, indexIds, true);
+                    } catch (Exception e) {
+                        LOG.warn("cloud in prepare step, dbName {}, tableName {}, tableId {} indexId {} exception {}",
+                                db.getFullName(), tableName, olapTable.getId(), indexIds, e.getMessage());
+                        recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
+                        throw new DdlException("cloud in prepare step err");
+                    }
+                }
+
+                List<PartitionPersistInfo> partsInfo = new ArrayList<>();
+                for (int i = 0; i < addPartitionClauses.size(); i++) {
+                    try {
+                        boolean needWriteEditLog = true;
+                        // ATTN: !executeFirstTime, needWriteEditLog
+                        // here executeFirstTime is create table, so in cloud edit log will postpone
+                        if (Config.isCloudMode()) {
+                            needWriteEditLog = !executeFirstTime;
+                        }
+                        PartitionPersistInfo info =
+                                Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i),
+                                        executeFirstTime,
+                                        executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0,
+                                        needWriteEditLog);
+                        if (info == null) {
+                            throw new Exception("null persisted partition returned");
+                        }
+                        partsInfo.add(info);
                         clearCreatePartitionFailedMsg(olapTable.getId());
                     } catch (Exception e) {
                         recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
@@ -594,6 +650,55 @@ private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartiti
                        }
                     }
                 }
+                List<Long> succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo
+                        -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
+                if (executeFirstTime && Config.isCloudMode() && !addPartitionClauses.isEmpty()) {
+                    try {
+                        // ATTN: failedPids = generatedPartitionIds - succeedPartitionIds,
+                        // means some partitions failed when addPartition, failedPids will be recycled by recycler
+                        if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) {
+                            LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e");
+                            // not commit, not log edit
+                            throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
+                        }
+                        Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(),
+                                succeedPartitionIds, indexIds, true);
+                        LOG.info("begin write edit log to add partitions in batch, "
+                                + "numPartitions: {}, db: {}, table: {}, tableId: {}",
+                                partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
+                        // ATTN: here, edit log must after commit cloud partition,
+                        // prevent commit RPC failure from causing data loss
+                        if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) {
+                            LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e");
+                            // committed, but not log edit
+                            throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
+                        }
+                        for (int i = 0; i < partsInfo.size(); i++) {
+                            Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
+                            if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
+                                if (i == partsInfo.size() / 2) {
+                                    LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
+                                    // committed, but log some edit, others failed
+                                    throw new Exception("debug point FE.DynamicPartitionScheduler"
+                                            + ".in.commitCloudPartition");
+                                }
+                            }
+                        }
+                        LOG.info("finish write edit log to add partitions in batch, "
+                                + "numPartitions: {}, db: {}, table: {}, tableId: {}",
+                                partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
+                    } catch (Exception e) {
+                        LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}",
+                                db.getFullName(), tableName, olapTable.getId(), e.getMessage());
+                        recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
+                        throw new DdlException("cloud in commit step err");
+                    }
+                }
+                // cloud mode, check recycle key not remained
+                if (Config.isCloudMode() && executeFirstTime) {
+                    Env.getCurrentInternalCatalog().checkCreatePartitions(db.getId(), olapTable.getId(),
+                            succeedPartitionIds, indexIds);
+                }
             }
         }
     }
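The ATTN comments above define the failure accounting: partition ids that were preallocated but never produced a PartitionPersistInfo are simply left out of the commit RPC, so their recycle keys stay in the meta-service for the recycler, while check_kv later rejects the create if a recycle key for a supposedly committed partition is still present. A small hypothetical helper (not part of this commit) makes that set arithmetic explicit:

    // failedPids = generatedPartitionIds - succeedPartitionIds (see the ATTN comment above);
    // these ids keep their recycle keys and are cleaned up by the recycler.
    static List<Long> failedPartitionIds(List<Long> generatedPartitionIds,
            List<PartitionPersistInfo> partsInfo) {
        Set<Long> succeeded = partsInfo.stream()
                .map(info -> info.getPartition().getId())
                .collect(Collectors.toSet());
        return generatedPartitionIds.stream()
                .filter(id -> !succeeded.contains(id))
                .collect(Collectors.toList());
    }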