
Commit d187d42

Raft: Increase max retry times to avoid too large remote requests (#10301) (#10307)
close #10300

Raft: Increase max retry times to avoid too large remote requests

* Increase the max retry number between LearnerRead and acquiring snapshot
  from the storage layer by the number of query regions

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: JaySon-Huang <[email protected]>
Co-authored-by: JaySon <[email protected]>
Co-authored-by: JaySon-Huang <[email protected]>
1 parent 6c4f337 commit d187d42

2 files changed (+60, -9 lines)


dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp

Lines changed: 47 additions & 8 deletions
@@ -945,15 +945,12 @@ bool DAGStorageInterpreter::checkRetriableForBatchCopOrMPP(
     const TableID & table_id,
     const SelectQueryInfo & query_info,
     const RegionException & e,
-    int num_allow_retry)
+    const Int32 num_allow_retry)
 {
     const DAGContext & dag_context = *context.getDAGContext();
     assert((dag_context.isBatchCop() || dag_context.isMPPTask()));
     const auto & dag_regions = dag_context.getTableRegionsInfoByTableID(table_id).local_regions;
     FmtBuffer buffer;
-    // Normally there is only few regions need to retry when super batch is enabled. Retry to read
-    // from local first. However, too many retry in different places may make the whole process
-    // time out of control. We limit the number of retries to 1 now.
     if (likely(num_allow_retry > 0))
     {
         auto & regions_query_info = query_info.mvcc_query_info->regions_query_info;
@@ -967,16 +964,22 @@ bool DAGStorageInterpreter::checkRetriableForBatchCopOrMPP(
                     region_retry_from_local_region.emplace_back(region_iter->second);
                     buffer.fmtAppend("{},", region_iter->first);
                 }
+                // remove the unavailable region for next local read attempt
                 iter = regions_query_info.erase(iter);
             }
             else
             {
                 ++iter;
             }
         }
+        // `tot_num_remote_region` is the total number of regions that we will retry from other tiflash nodes among all retries
+        // `current_retry_regions` is the number of regions that we will retry from other tiflash nodes in this retry
         LOG_WARNING(
             log,
-            "RegionException after read from storage, regions [{}], message: {}{}",
+            "RegionException after read from storage, tot_num_remote_region={} cur_retry_regions={}"
+            " regions [{}], message: {}{}",
+            region_retry_from_local_region.size(),
+            e.unavailable_region.size(),
             buffer.toString(),
             e.message(),
             (regions_query_info.empty() ? "" : ", retry to read from local"));
@@ -996,15 +999,44 @@ bool DAGStorageInterpreter::checkRetriableForBatchCopOrMPP(
                 buffer.fmtAppend("{},", iter->first);
             }
         }
+        // `tot_num_remote_region` is the total number of regions that we will retry from other tiflash nodes among all retries
+        // `current_retry_regions` is the number of regions that we will retry from other tiflash nodes in this retry
         LOG_WARNING(
             log,
-            "RegionException after read from storage, regions [{}], message: {}",
+            "RegionException after read from storage, tot_num_remote_region={} cur_retry_regions={}"
+            " regions [{}], message: {}",
+            region_retry_from_local_region.size(),
+            e.unavailable_region.size(),
             buffer.toString(),
             e.message());
         return false; // break retry loop
     }
 }
 
+namespace
+{
+Int32 getMaxAllowRetryForLocalRead(const SelectQueryInfo & query_info)
+{
+    size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
+    if (region_num > 1000)
+    {
+        // 1000 regions is about 93GB for 96MB region size / 250GB for 256MB region size.
+        return 10;
+    }
+    else if (region_num > 500)
+    {
+        // 500 regions is about 46.5GB for 96MB region size / 125GB for 256MB region size.
+        return 8;
+    }
+    else if (region_num > 100)
+    {
+        // 100 regions is about 9.3GB for 96MB region size / 25GB for 256MB region size.
+        return 5;
+    }
+    return 1;
+}
+} // namespace
+
 DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocalStreamsForPhysicalTable(
     const TableID & table_id,
     const SelectQueryInfo & query_info,
@@ -1021,7 +1053,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
 
     const DAGContext & dag_context = *context.getDAGContext();
     const auto keyspace_id = dag_context.getKeyspaceID();
-    for (int num_allow_retry = 1; num_allow_retry >= 0; --num_allow_retry)
+    // Normally there is only few regions need to retry when super batch is enabled. Retry to read
+    // from local first.
+    // When the table is large and too hot for writing, the number of regions may be large
+    // and region split is frequent. In this case, we allow more retries for building
+    // inputstream from local in order to avoid large number of RemoteRead requests.
+    // However, too many retry may make the whole execution time out of control.
+    Int32 num_allow_retry = getMaxAllowRetryForLocalRead(query_info);
+    for (; num_allow_retry >= 0; --num_allow_retry)
     {
         try
         {
@@ -1063,7 +1102,7 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
             // clean all streams from local because we are not sure the correctness of those streams
             pipeline.streams.clear();
             if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry)))
-                continue;
+                continue; // next retry to read from local storage
             else
                 break;
         }
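For illustration, here is a minimal standalone sketch of the retry-budget idea introduced in the diff above: the local-read retry count scales with the number of regions the query touches, so that a handful of transient RegionExceptions on a large table does not immediately turn every region into a RemoteRead request. The thresholds mirror getMaxAllowRetryForLocalRead from the diff; everything else (tryReadFromLocal, the numbers in main) is a made-up stand-in rather than the actual TiFlash code.

// Sketch only: simplified stand-ins, not the TiFlash API.
#include <cstddef>
#include <cstdint>
#include <cstdio>

using Int32 = std::int32_t;

Int32 maxAllowRetryForLocalRead(std::size_t region_num)
{
    // Larger queries (more regions) get a larger local-read retry budget.
    if (region_num > 1000)
        return 10; // ~93GB at 96MB regions / ~250GB at 256MB regions
    if (region_num > 500)
        return 8; // ~46.5GB / ~125GB
    if (region_num > 100)
        return 5; // ~9.3GB / ~25GB
    return 1;
}

// Stand-in for building the local input streams: pretend the first two attempts
// hit a RegionException (e.g. regions split or moved while reading).
bool tryReadFromLocal(std::size_t attempt)
{
    return attempt >= 2;
}

int main()
{
    const std::size_t region_num = 600; // e.g. regions_query_info.size()
    Int32 num_allow_retry = maxAllowRetryForLocalRead(region_num); // -> 8
    std::size_t attempt = 0;
    for (; num_allow_retry >= 0; --num_allow_retry, ++attempt)
    {
        if (tryReadFromLocal(attempt))
        {
            std::printf("local read succeeded on attempt %zu\n", attempt);
            return 0;
        }
        // On a failed attempt the unavailable regions would be dropped from the
        // local set and collected for RemoteRead, as checkRetriableForBatchCopOrMPP does.
    }
    std::printf("retry budget exhausted, remaining regions go to RemoteRead\n");
    return 0;
}

With the old hard-coded budget of 1, the same failure pattern would have sent the remaining regions to RemoteRead after the second attempt.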

dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp

Lines changed: 13 additions & 1 deletion
@@ -349,7 +349,19 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr &
                 || (file->getId() != (*old_persisted_files_iter)->getId())
                 || (file->getRows() != (*old_persisted_files_iter)->getRows())))
             {
-                throw Exception("Compaction algorithm broken", ErrorCodes::LOGICAL_ERROR);
+                throw Exception(
+                    ErrorCodes::LOGICAL_ERROR,
+                    "Compaction algorithm broken, "
+                    "compaction={{{}}} persisted_files={} "
+                    "old_persisted_files_iter.is_end={} "
+                    "file->getId={} old_persist_files->getId={} file->getRows={} old_persist_files->getRows={}",
+                    compaction->info(),
+                    detailInfo(),
+                    old_persisted_files_iter == persisted_files.end(),
+                    file->getId(),
+                    old_persisted_files_iter == persisted_files.end() ? -1 : (*old_persisted_files_iter)->getId(),
+                    file->getRows(),
+                    old_persisted_files_iter == persisted_files.end() ? -1 : (*old_persisted_files_iter)->getRows());
             }
             old_persisted_files_iter++;
         }
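The ColumnFilePersistedSet.cpp change enriches the "Compaction algorithm broken" error with the state needed to diagnose it, taking care never to dereference old_persisted_files_iter once it has reached the end. A minimal standalone sketch of that guard pattern, using std::vector and printf in place of TiFlash's Exception and FmtBuffer (illustration only, not the project's API):

#include <cstdio>
#include <vector>

int main()
{
    std::vector<int> persisted_ids{4, 7, 9};
    auto iter = persisted_ids.end(); // pretend the walk already reached the end

    // Substitute a sentinel (-1) instead of dereferencing an end iterator,
    // mirroring the `iter == end() ? -1 : ...` guards in the diff above.
    const bool is_end = (iter == persisted_ids.end());
    const int old_id = is_end ? -1 : *iter;
    std::printf("compaction broken: iter.is_end=%d old_id=%d\n", is_end, old_id);
    return 0;
}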
