[CORE-1620] Migrate ntp_archiver uploads to segment_collector_stream interface #25951

Conversation
Force-pushed from c1963fb to a3928ed
Force-pushed from a3928ed to 0f7e1cb
Force-pushed from 0f7e1cb to 08a1cac
Force-pushed from 08a1cac to 9fc4458
@Lazin FYI
Pull Request Overview
This PR migrates NTP uploads to the new segment_collector_stream interface and updates associated types and APIs. Key changes include updating function signatures to use new stream-based types, renaming and refactoring functions in archival_policy and ntp_archiver, and switching partition pointer types from ss::lw_shared_ptr to raw pointers in async uploader files.
Reviewed Changes
Copilot reviewed 14 out of 16 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
tests/rptest/utils/si_utils.py | Enhanced assertion message to include restored_ntps for better debugging. |
src/v/cluster/archival/tests/async_data_uploader_test.cc | Updated get_test_partition() to explicitly obtain the partition via get(). |
src/v/cluster/archival/segment_reupload.h | Introduced new types, parameters, and constructors for reupload candidates and adjusted function signatures. |
src/v/cluster/archival/ntp_archiver_service.h | Modified parameter types (from candidate to stream) and renamed internal functions to support the new streaming interface. |
src/v/cluster/archival/async_data_uploader.h/.cc | Changed partition argument types from smart pointers to raw pointers. |
src/v/cluster/archival/archival_policy.h | Renamed candidate retrieval functions to better reflect their behavior. |
src/v/cluster/archival/adjacent_segment_merger.cc | Updated logging and offset computations to use new stream fields. |
Files not reviewed (2)
- src/v/cluster/archival/tests/BUILD: Language not supported
- src/v/cluster/archival/tests/CMakeLists.txt: Language not supported
Comments suppressed due to low confidence (4)
src/v/cluster/archival/segment_reupload.h:245
- The signature of find_replacement_boundary now requires a mode parameter; please update the related comments or documentation to clarify how this parameter influences the replacement boundary computation (see the doc-comment sketch after this list).

```cpp
model::offset find_replacement_boundary(segment_collector_mode mode) const;
```

src/v/cluster/archival/ntp_archiver_service.h:455
- The function do_upload_local now accepts a segment_collector_stream instead of an upload_candidate; please update the function documentation to reflect this interface change (also covered in the sketch after this list).

```cpp
ss::future<bool> do_upload_local(
  archival_stm_fence fence,
  segment_collector_stream strm,
  std::optional<std::reference_wrapper<retry_chain_node>> source_rtc);
```

src/v/cluster/archival/archival_policy.h:36
- [nitpick] The renaming of get_next_candidate to get_next_compacted_segment (and subsequently to get_next_segment) may cause confusion; updating the inline documentation to explain the distinctions between these methods is recommended.

```cpp
ss::future<segment_collector_stream_result> get_next_compacted_segment(
```

src/v/cluster/archival/adjacent_segment_merger.cc:222
- [nitpick] The offset field used for computing the next offset has changed from candidate.final_offset to locks.end_offset; verify that all consumers of this value correctly interpret the new field and update related comments if necessary.

```cpp
auto next = model::next_offset(find_res.locks.value().end_offset);
```
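As referenced above, possible doc-comment wordings for the first two items. This is an editor's sketch: the phrasing is assumed, not taken from the PR.

```cpp
// Sketch of possible doc comments; wording is an assumption, not the authors'.

// Returns the offset at which replacement segments begin. 'mode' controls
// how the collector gathers segments (e.g. compacted vs. regular
// collection), which in turn shifts where the boundary may fall.
model::offset find_replacement_boundary(segment_collector_mode mode) const;

// Uploads data drawn from 'strm', a segment_collector_stream, rather than
// from a materialized upload_candidate; the stream appears to own the byte
// stream and read locks for the duration of the upload.
ss::future<bool> do_upload_local(
  archival_stm_fence fence,
  segment_collector_stream strm,
  std::optional<std::reference_wrapper<retry_chain_node>> source_rtc);
```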
```diff
 static ss::future<result<std::unique_ptr<segment_upload>>>
 make_segment_upload(
-  ss::lw_shared_ptr<cluster::partition> part,
+  cluster::partition* part,
```
The type change from ss::lw_shared_ptr<cluster::partition> to a raw pointer requires careful lifetime management; please ensure that the caller guarantees the partition object's validity throughout the segment_upload usage.
```diff
-cluster::partition* part,
+ss::lw_shared_ptr<cluster::partition> part,
```
bad bot
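For context, the ownership pattern the change relies on (per the commit message later in this page: the archiver holds the partition and the uploader only borrows it) looks roughly like this hedged sketch. Only make_segment_upload comes from the diff; upload_one and do_upload are illustrative names.

```cpp
// Sketch of the assumed ownership pattern: the caller keeps the
// lw_shared_ptr alive across the upload, so the borrowed raw pointer
// cannot dangle.
ss::future<> upload_one(ss::lw_shared_ptr<cluster::partition> part) {
    // The coroutine frame owns 'part' across every co_await below.
    auto upl = co_await make_segment_upload(part.get() /* other args elided */);
    co_await do_upload(std::move(upl)); // illustrative
    // 'part' is released only after the upload completes.
}
```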
Force-pushed from 9fc4458 to fe99ed0
Force-pushed from 196faae to d06aa20
Force-pushed from d06aa20 to d704fc6
LGTM, the upload path looks way cleaner than ever before
```cpp
// background operation. We can't background it as is because the
// 'upload_index' call is taking 'rtc' as a reference. So there should be
// some wrapper for this call.
// QUESTION(oren): if we did background this, where would the future
```
nit: it can be wrapped using ssx::spawn_with_gate and the rtc could be captured, or something similar
I think I'm going to punt this to a followup. A bunch of unit tests need small changes and the rtc accounting is a bit of a pain since they're neither movable nor copyable as written. Better as an isolated change IMO.
i guess we could use a lw_shared_ptr<retry_chain_node> w/o issue. point stands about test changes though.
the followup is OK
it's also not super important, I think that previously it was not running in the background
> previously it was not running in the background

correct. seems like a sensible change to me though 🙂
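A minimal sketch of the wrapper suggested in this thread, assuming ssx::spawn_with_gate (from Redpanda's ssx utilities), an archiver-owned gate, and a child-node constructor on retry_chain_node; _gate, parent_rtc, index, and upload_index stand in for the real members.

```cpp
// Sketch only: hold the retry_chain_node via lw_shared_ptr so the
// backgrounded continuation keeps it alive after the caller returns.
auto rtc = ss::make_lw_shared<retry_chain_node>(&parent_rtc);
ssx::spawn_with_gate(_gate, [this, rtc, ix = std::move(index)]() mutable {
    // 'rtc' lives as long as the lambda does, and the gate holds the
    // archiver open until the returned future resolves.
    return upload_index(*rtc, std::move(ix));
});
```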
```cpp
auto lazy_abort = lazy_abort_source{
  [this]() { return upload_should_abort(); }};
auto stream = strm.create_input_stream();
auto [upload_stream, indexing_stream] = input_stream_fanout<2>(
```
Not right now, but at some point we should get rid of this. Currently, the byte stream is split into two parts: one is uploaded and the other is parsed to create an index. But after conversion to storage::log_reader it will make no sense at all. We will get record batches from the log reader and then serialize them for upload. Somewhere in the middle we can build the index state incrementally.
interesting. the way i have this organized in #26099 has this logic staying largely the same, but with a log_reader feeding the input stream rather than a concat_segment_reader_view. Is there a clear disadvantage to doing it this way?
I think it's fine to do it this way. The disadvantage is that we need to serialize the data coming from the log_reader first and then feed it to the fanout stream, where one of the branches deserializes it again (only the headers, but anyway). We could instead build the index from the data before serialization, which would be more efficient. But it's not necessary for correctness.
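A sketch of that incremental approach: consume each batch once, feed the index from the already-parsed header, then serialize for upload, so no fanout branch needs to re-parse anything. All type and member names here are assumptions for illustration, not the actual Redpanda interfaces.

```cpp
// Illustrative only: the index API, serialize() helper, and wiring are
// assumptions.
struct indexing_serializer {
    ss::future<ss::stop_iteration> operator()(model::record_batch b) {
        // Index state is built from the parsed header, pre-serialization.
        index.add(b.base_offset(), bytes_out); // assumed index API
        // ...then the batch is serialized exactly once for upload.
        auto payload = serialize(std::move(b)); // assumed helper -> iobuf
        bytes_out += payload.size_bytes();
        // write_iobuf_to_output_stream: Redpanda helper, assumed available.
        co_await write_iobuf_to_output_stream(std::move(payload), out);
        co_return ss::stop_iteration::no;
    }
    offset_index& index;          // assumed index type
    ss::output_stream<char>& out; // upload stream
    size_t bytes_out{0};
};
```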
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
The ntp_archiver doesn't store the smart pointer, so there is no point in passing a smart pointer into the async_data_uploader when it is invoked by the ntp_archiver. Signed-off-by: Evgeny Lazin <[email protected]> Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
…tream Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Mostly related to upload_candidates and whatnot Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Force-pushed from 546afcf to 11d1141
force push to fix up some stale comments and rebase dev to fix merge conflict
A previous PR[1] refactored upload_segment and in so doing added retry_strategy::disallow to the retry_chain_node governing the upload. As a result, we saw an uptick in:

- 'cloud_storage_failed_uploads'
- 'cloud_storage_bytes_sent'
- 'io_queue_total_read_bytes'

along with an increased incidence of 'backoff quota exceeded' logs from cloud_io. cloud_io::remote::upload_stream increments 'cloud_storage_failed_uploads' in a number of failure cases, but given the increased frequency of the backoff-quota log line, it is all but certain that the uptick in failures comes from rtc node retries being exhausted by transient errors that the retry logic inside cloud_io would previously have masked. For now, we should return to the default retry strategy (exponential backoff) and assess the viability of offloading retries to the archival loop at a later time.

[1] redpanda-data#25951

Signed-off-by: Oren Leiman <[email protected]>
(cherry picked from commit a9dea64)
In a previous PR[1] we began to rely on the archiver loop to retry, and moved away from relying on `cloud_io::remote` for retries in two ways:

1. setting an explicit `disallow` retry policy on the retry node passed to the remote, and
2. setting the `max_retries` passed to `remote::upload_segment()` to 1.

In practice, we saw that _not_ relying on the remote resulted in an uptick in the `vectorized_cloud_storage_failed_uploads` metric, which is monitored and alerted on. In [2] we reverted (1), but didn't notice (2). This commit reverts (2).

[1] redpanda-data#25951
[2] redpanda-data#26969

(cherry picked from commit 7f409da)
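For reference, the two knobs those commit messages describe, in a hedged sketch. Argument order and values are assumptions; the real signatures live in utils/retry_chain_node.h and the cloud_io remote headers.

```cpp
using namespace std::chrono_literals;

// (1) The retry policy on the node handed to cloud_io: 'disallow' pushed
//     all retrying up to the archival loop; reverted in [2] above.
retry_chain_node local_rtc(
  10s, 100ms,               // timeout / initial backoff (values assumed)
  retry_strategy::disallow, // vs. the default exponential backoff
  &parent_rtc);

// (2) A retry budget of 1 disables retries regardless of strategy; the
//     second commit above reverts this (call shape illustrative):
// co_await remote.upload_segment(..., /*max_retries=*/1);
```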
Rather than a collection of segments for reading, segment_collector produces a segment_collector_stream struct. Includes corresponding changes to ntp_archiver to upload from one of these.

Backports Required
Release Notes
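From the fragments visible in this review (create_input_stream() in the upload path, locks.end_offset in adjacent_segment_merger, and the segment_collector_stream_result return type), the new interface appears to look roughly like the following. Everything beyond those observed names is an assumption; the authoritative definition is src/v/cluster/archival/segment_reupload.h.

```cpp
// Rough shape inferred from the review excerpts; fields beyond the
// observed names are assumptions.
struct segment_collector_stream {
    // Single-use byte stream over the collected segment data
    // (seen as strm.create_input_stream() in do_upload_local).
    ss::input_stream<char> create_input_stream();
    // ... offset range, term, and upload metadata (assumed)
};

struct segment_collector_stream_result {
    // Read locks held over the source segments; adjacent_segment_merger
    // computes the next offset from locks.value().end_offset.
    std::optional<segment_read_locks> locks; // lock-holder type assumed
    // ... the stream itself, or an error (assumed)
};
```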