Commit ef808fc

Raft: Backport FAP enhancement (#10208)

close #8673

Signed-off-by: JaySon-Huang <[email protected]>

1 parent edcbbfc
13 files changed: +304 additions, -54 deletions
dbms/src/Common/ProfileEvents.cpp

Lines changed: 1 addition & 0 deletions

@@ -144,6 +144,7 @@
     M(S3PutDMFile) \
     M(S3PutDMFileRetry) \
     M(S3WriteDMFileBytes) \
+    M(S3PageReaderRead) \
     M(DTDeltaIndexError)
 
 namespace ProfileEvents
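
For context: counters in this macro list become `ProfileEvents::Event` symbols that call sites bump with `ProfileEvents::increment`. A minimal sketch of how the new `S3PageReaderRead` counter might be used (the wrapper function below is hypothetical, not part of this commit):

    #include <Common/ProfileEvents.h>

    namespace ProfileEvents
    {
    extern const Event S3PageReaderRead;
    }

    // Hypothetical call site: bump the counter once per page read that the
    // S3 page reader actually serves.
    static void recordS3PageRead()
    {
        ProfileEvents::increment(ProfileEvents::S3PageReaderRead);
    }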

dbms/src/Storages/DeltaMerge/DMContext.cpp

Lines changed: 6 additions & 0 deletions

@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <Common/config.h> // for ENABLE_NEXT_GEN
 #include <Interpreters/Context.h>
 #include <Storages/DeltaMerge/DMContext.h>
 #include <Storages/DeltaMerge/ScanContext.h>
@@ -67,6 +68,11 @@ DMContext::DMContext(
     , read_stable_only(settings.dt_read_stable_only)
     , enable_relevant_place(settings.dt_enable_relevant_place)
     , enable_skippable_place(settings.dt_enable_skippable_place)
+#if ENABLE_NEXT_GEN
+    , fap_use_segment_to_end_map_cache(settings.fap_use_segment_to_end_map_cache)
+#else
+    , fap_use_segment_to_end_map_cache(true)
+#endif
     , tracing_id(tracing_id_)
     , scan_context(scan_context_ ? scan_context_ : std::make_shared<ScanContext>())
 {}

dbms/src/Storages/DeltaMerge/DMContext.h

Lines changed: 1 addition & 0 deletions

@@ -99,6 +99,7 @@ struct DMContext : private boost::noncopyable
     const bool read_stable_only;
     const bool enable_relevant_place;
     const bool enable_skippable_place;
+    const bool fap_use_segment_to_end_map_cache;
 
     String tracing_id;
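
The `#if ENABLE_NEXT_GEN` initializer in DMContext.cpp makes the new flag configurable only on next-gen builds, while classic builds pin it to `true`. A hedged sketch of the equivalent decision (the helper is hypothetical; `fap_use_segment_to_end_map_cache` is the setting referenced by the diff):

    #include <Common/config.h> // for ENABLE_NEXT_GEN

    // Sketch: mirror the DMContext constructor's initializer logic above.
    static bool resolveFapCacheFlag(const DB::Settings & settings)
    {
    #if ENABLE_NEXT_GEN
        // Next-gen builds respect the user-tunable setting.
        return settings.fap_use_segment_to_end_map_cache;
    #else
        // Classic builds always use the end-key -> segment-id cache.
        return true;
    #endif
    }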

dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp

Lines changed: 1 addition & 1 deletion

@@ -1281,8 +1281,8 @@ UInt64 DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
 
     auto restored_segments = checkpoint_info->getRestoredSegments();
     auto updated_segments = ingestSegmentsUsingSplit(dm_context, range, restored_segments);
-    size_t estimated_bytes = 0;
 
+    size_t estimated_bytes = 0;
     for (const auto & segment : restored_segments)
     {
         estimated_bytes += segment->getEstimatedBytes();

dbms/src/Storages/DeltaMerge/Segment.cpp

Lines changed: 88 additions & 37 deletions

@@ -66,6 +66,7 @@
 #include <Storages/KVStore/TMTContext.h>
 #include <Storages/KVStore/Utils/AsyncTasks.h>
 #include <Storages/Page/V3/PageEntryCheckpointInfo.h>
+#include <Storages/Page/V3/Universal/S3PageReader.h>
 #include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
 #include <Storages/Page/V3/Universal/UniversalPageStorage.h>
 #include <Storages/PathPool.h>
@@ -477,7 +478,7 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
 
     auto end_to_segment_id_cache = checkpoint_info->checkpoint_data_holder->getEndToSegmentIdCache(
         KeyspaceTableID{context.keyspace_id, context.physical_table_id});
-
+    bool use_cache = context.fap_use_segment_to_end_map_cache;
     // Protected by whatever lock.
     auto build_segments = [&](bool is_cache_ready, PageIdU64 current_segment_id)
         -> std::optional<std::pair<std::vector<std::pair<DM::RowKeyValue, UInt64>>, SegmentMetaInfos>> {
@@ -490,19 +491,34 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
         // The map is used to build cache.
         std::vector<std::pair<DM::RowKeyValue, UInt64>> end_key_and_segment_ids;
         SegmentMetaInfos segment_infos;
+        ReadBufferFromRandomAccessFilePtr reusable_buf = nullptr;
+        size_t total_processed_segments = 0;
+        size_t total_skipped_segments = 0;
+        PS::V3::S3PageReader::ReuseStatAgg reused_agg;
+        // TODO If the regions are added in a slower rate, the cache may not be reused even if the TiFlash region replicas are always added in one table as a whole.
+        // This is because later added regions could use later checkpoints. So, there could be another optimization to avoid generating the cache.
         while (current_segment_id != 0)
         {
             if (cancel_handle->isCanceled())
             {
-                LOG_INFO(log, "FAP is canceled when building segments, built={}", end_key_and_segment_ids.size());
+                LOG_INFO(
+                    log,
+                    "FAP is canceled when building segments, built={}, total_processed_segments={} "
+                    "total_skipped_segments={} reused_agg={}",
+                    end_key_and_segment_ids.size(),
+                    total_processed_segments,
+                    total_skipped_segments,
+                    reused_agg.toString());
                 // FAP task would be cleaned in FastAddPeerImplWrite. So returning empty result is OK.
                 return std::nullopt;
             }
             Segment::SegmentMetaInfo segment_info;
             auto target_id = UniversalPageIdFormat::toFullPageId(
                 UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Meta, context.physical_table_id),
                 current_segment_id);
-            auto page = checkpoint_info->temp_ps->read(target_id, nullptr, {}, false);
+            PS::V3::S3PageReader::ReuseStat reason = PS::V3::S3PageReader::ReuseStat::Reused;
+            auto page = checkpoint_info->temp_ps->read(target_id, nullptr, {}, false, reusable_buf, reason);
+            reused_agg.observe(reason);
             if unlikely (!page.isValid())
             {
                 // After #7642, DELTA_MERGE_FIRST_SEGMENT_ID may not exist, however, such checkpoint won't be selected.
@@ -519,6 +535,7 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
             readSegmentMetaInfo(buf, segment_info);
             if (!is_cache_ready)
             {
+                FAIL_POINT_PAUSE(FailPoints::pause_when_building_fap_segments);
                 end_key_and_segment_ids.emplace_back(
                     segment_info.range.getEnd().toRowKeyValue(),
                     segment_info.segment_id);
@@ -528,61 +545,95 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
             {
                 segment_infos.emplace_back(segment_info);
             }
-            // if not build cache, stop as early as possible.
-            if (is_cache_ready && segment_info.range.end.value->compare(*target_range.end.value) >= 0)
+            else
             {
-                break;
+                total_skipped_segments++;
             }
+            if (segment_info.range.end.value->compare(*target_range.end.value) >= 0)
+            {
+                // if not build cache, stop as early as possible.
+                if (is_cache_ready)
+                    break;
+            }
+            total_processed_segments++;
         }
+        LOG_INFO(
+            log,
+            "Finish building segments, target_range={} infos_size={} total_processed_segments={} "
+            "total_skipped_segments={} reused_agg={} use_cache={}",
+            target_range.toDebugString(),
+            segment_infos.size(),
+            total_processed_segments,
+            total_skipped_segments,
+            reused_agg.toString(),
+            use_cache);
         return std::make_pair(end_key_and_segment_ids, segment_infos);
     };
 
+    if (use_cache)
     {
-        // If there is a table building cache, then other table may block to read the built cache.
-        // If the remote reader causes much time to retrieve data, then these tasks could block here.
-        // However, when the execlusive holder is canceled due to timeout, the readers could eventually get the lock.
-        auto lock = end_to_segment_id_cache->writeLock();
-        // - Set to `true`: The building task is done.
-        // - Set to `false`: It is not build yet, or it is building.
-        bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
-        GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_wait_build)
-            .Observe(sw.elapsedSecondsFromLastTime());
-
-        if (!is_cache_ready)
+        LOG_DEBUG(log, "Start read all segments meta info by cache");
         {
-            // We are the cache builder.
-            FAIL_POINT_PAUSE(FailPoints::pause_when_building_fap_segments);
+            // If there is a table building cache, then other table may block to read the built cache.
+            // If the remote reader causes much time to retrieve data, then these tasks could block here.
+            // However, when the exclusive holder is canceled due to timeout, the readers could eventually get the lock.
+            auto lock = end_to_segment_id_cache->writeLock();
+            // - Set to `true`: The building task is done.
+            // - Set to `false`: It is not build yet, or it is building.
+            bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
+            GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_wait_build)
+                .Observe(sw.elapsedSecondsFromLastTime());
 
-            auto res = build_segments(is_cache_ready, DELTA_MERGE_FIRST_SEGMENT_ID);
-            // After all segments are scanned, we try to build a cache,
-            // so other FAP tasks that share the same checkpoint could reuse the cache.
+            if (!is_cache_ready)
+            {
+                // We are the cache builder.
+
+                auto res = build_segments(is_cache_ready, DELTA_MERGE_FIRST_SEGMENT_ID);
+                // After all segments are scanned, we try to build a cache,
+                // so other FAP tasks that share the same checkpoint could reuse the cache.
+                if (!res)
+                    return {};
+                auto & [end_key_and_segment_ids, segment_infos] = *res;
+                LOG_DEBUG(
+                    log,
+                    "Segment meta info cache has been built, num_segments={}",
+                    end_key_and_segment_ids.size());
+                end_to_segment_id_cache->build(lock, std::move(end_key_and_segment_ids));
+                return std::move(segment_infos);
+            }
+        }
+        {
+            // If we found the cache is built, which could be normal cases when the checkpoint is reused.
+            auto lock = end_to_segment_id_cache->readLock();
+            bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
+            RUNTIME_CHECK(is_cache_ready, checkpoint_info->region_id, context.keyspace_id, context.physical_table_id);
+            GET_METRIC(tiflash_fap_task_result, type_reuse_chkpt_cache).Increment();
+            // ... then we could seek to `current_segment_id` in cache to avoid some read.
+            auto current_segment_id
+                = end_to_segment_id_cache->getSegmentIdContainingKey(lock, target_range.getStart().toRowKeyValue());
+            auto res = build_segments(is_cache_ready, current_segment_id);
             if (!res)
                 return {};
-            auto & [end_key_and_segment_ids, segment_infos] = *res;
-            LOG_DEBUG(log, "Segment meta info cache has been built, num_segments={}", end_key_and_segment_ids.size());
-            end_to_segment_id_cache->build(lock, std::move(end_key_and_segment_ids));
-            return std::move(segment_infos);
+            return std::move(res->second);
         }
     }
+    else
     {
-        // If we found the cache is built, which could be normal cases when the checkpoint is reused.
-        auto lock = end_to_segment_id_cache->readLock();
-        bool is_cache_ready = end_to_segment_id_cache->isReady(lock);
-        RUNTIME_CHECK(is_cache_ready, checkpoint_info->region_id, context.keyspace_id, context.physical_table_id);
-        GET_METRIC(tiflash_fap_task_result, type_reuse_chkpt_cache).Increment();
-        // ... then we could seek to `current_segment_id` in cache to avoid some read.
-        auto current_segment_id
-            = end_to_segment_id_cache->getSegmentIdContainingKey(lock, target_range.getStart().toRowKeyValue());
-        auto res = build_segments(is_cache_ready, current_segment_id);
+        LOG_DEBUG(log, "Start read all segments meta info by direct");
+        // Set `is_cache_ready == true` to let `build_segments` return once it finds all
+        // overlapped segments
+        auto res = build_segments(true, DELTA_MERGE_FIRST_SEGMENT_ID);
         if (!res)
             return {};
-        return std::move(res->second);
+        auto & [_end_key_and_segment_ids, segment_infos] = *res;
+        UNUSED(_end_key_and_segment_ids);
+        return std::move(segment_infos);
    }
 
     if (cancel_handle->isCanceled())
     {
         LOG_INFO(log, "FAP is canceled when building segments");
-        // FAP task would be cleaned in FastAddPeerImplWrite. So returning incompelete result could be OK.
+        // FAP task would be cleaned in FastAddPeerImplWrite. So returning incomplete result could be OK.
         return {};
     }
 }
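
Taken together, the Segment.cpp changes implement a build-once, read-many map from segment end key to segment id, gated by `use_cache`, plus reuse of the underlying S3 read buffer across consecutive meta reads (accounted by `ReuseStatAgg`). A simplified standalone sketch of the cache protocol, with hypothetical names and `std::shared_mutex` standing in for the cache's own lock type:

    #include <cstdint>
    #include <map>
    #include <shared_mutex>
    #include <string>

    // Sketch: the first FAP task to arrive takes the write lock and builds the
    // map while scanning every segment meta; later tasks sharing the same
    // checkpoint take the read lock and seek straight to the first segment
    // whose range may contain their start key.
    struct EndToSegmentIdCacheSketch
    {
        std::shared_mutex mu;
        bool ready = false;
        // Ordered by segment end key; value is the segment id.
        std::map<std::string, uint64_t> end_key_to_segment_id;

        uint64_t segmentIdContainingKey(const std::string & key) const
        {
            // Segment ranges are [start, end), so the owning segment is the
            // first entry whose end key is strictly greater than `key`.
            auto it = end_key_to_segment_id.upper_bound(key);
            return it == end_key_to_segment_id.end() ? 0 : it->second;
        }
    };

A reader would hold a `std::shared_lock` while calling `segmentIdContainingKey`, and the single builder would hold a `std::unique_lock` while populating the map and flipping `ready`, which matches the writeLock/readLock split in the diff above.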

dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp

Lines changed: 16 additions & 2 deletions

@@ -275,12 +275,14 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
         LOG_INFO(
             log,
             "Select checkpoint with data_seq={}, remote_store_id={} elapsed={} size(candidate_store_id)={} "
-            "region_id={}",
+            "region_id={} region={} range={}",
             data_seq,
             checkpoint_info->remote_store_id,
             watch.elapsedSeconds(),
             candidate_store_ids.size(),
-            region_id);
+            region_id,
+            region->getDebugString(),
+            region->getRange()->toDebugString());
         GET_METRIC(tiflash_fap_task_duration_seconds, type_select_stage).Observe(watch.elapsedSeconds());
         return maybe_region_info.value();
     }
@@ -376,6 +378,7 @@ FastAddPeerRes FastAddPeerImplWrite(
     DM::Segments segments;
     try
     {
+        LOG_INFO(log, "FAP begins to build segments, range={}", new_key_range.toDebugString());
         segments = dm_storage->buildSegmentsFromCheckpointInfo(cancel_handle, new_key_range, checkpoint_info, settings);
     }
     catch (...)
@@ -857,6 +860,17 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
         // Otherwise, we will let the next polling do the check again.
         [[maybe_unused]] auto s = fap_ctx->tasks_trace->blockedCancelRunningTask(region_id);
     }
+    // Previously, `fap_fallback_to_slow` could do the cleaning job,
+    // however, it will not clean what's in memory.
+    // So, if a msgSnapshot is sent, then a regular raft snapshot handling may be started,
+    // and if we failed to clean the fap snapshot by `fap_fallback_to_slow` before prehandling,
+    // the process could panic.
+    fap_ctx->cleanTask(
+        *(server->tmt),
+        server->proxy_helper,
+        region_id,
+        CheckpointIngestInfo::CleanReason::ProxyFallback);
+    LOG_INFO(log, "Finished clean task from proxy region_id={} new_peer_id={}", region_id, new_peer_id);
     GET_METRIC(tiflash_fap_task_state, type_blocking_cancel_stage).Decrement();
     // Return Canceled because it is cancel from outside FAP worker.
     return genFastAddPeerResFail(FastAddPeerStatus::Canceled);

dbms/src/Storages/KVStore/MultiRaft/Disagg/ServerlessUtils.cpp

Lines changed: 2 additions & 2 deletions

@@ -148,7 +148,7 @@ String getCompactibleInnerKey(UniversalPageStoragePtr uni_ps, uint32_t keyspace_
     {
         LOG_INFO(
             DB::Logger::get(),
-            "Failed to find compactible inner key, region_id={}, keyspace={}",
+            "Failed to find compactible inner key, region_id={} keyspace={}",
             region_id,
             keyspace_id);
     }
@@ -214,7 +214,7 @@ String getCompactibleEncKey(UniversalPageStoragePtr uni_ps, uint32_t keyspace_id
     {
         LOG_INFO(
             DB::Logger::get(),
-            "Failed to find compactible enc key, region_id={}, keyspace={}",
+            "Failed to find compactible enc key, region_id={} keyspace={}",
             region_id,
             keyspace_id);
     }
