Commit 709ea83

storage: fix race between segment.ms and appends
H/T to VladLazar for a similar change that inspired this one: VladLazar@682aea5

The problem statement:

```
We have seen a couple of races between the application of `segment.ms` and
the normal append path. They had the following pattern in common:
1. application of `segment.ms` begins
2. a call to `segment::append` is interleaved
3. the append finishes first and advances the dirty offsets, which the
   rolling logic in `segment.ms` does not expect
-- or --
4. `segment.ms` releases the current appender while the append is ongoing,
   which the append logic does not expect
```

The proposed fix was to introduce a new appender lock to the segment, and ensure that it is held both while appending and while segment.ms is rolling. This addressed problem #3, but wasn't sufficient to address problem #4.

The issue with introducing another lock to the segment is that the unexpected behavior when appending to a segment happens in the context of an already referenced segment. I.e. the appending fiber may proceed to reference an appender, only for it to be destructed by the housekeeping fiber before segment::append() is called, resulting in a segfault.

This patch extends usage of the existing disk_log_impl::_segments_rolling_lock to cover the entire duration of append (i.e. not just the underlying segment::append() call), ensuring that segment.ms rolls and appends are mutually exclusive.

(cherry picked from commit 78a9749)
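The idea in the last paragraph of the message can be sketched outside of Seastar. The following is a minimal illustration in standard C++, not the Redpanda implementation: `toy_log`, `toy_appender` and `run_demo` are hypothetical names, and a `std::mutex` with threads stands in for the Seastar mutex and fibers. The point it shows is that the lock is held from the moment the appender is looked up until the write completes, so a concurrent roll cannot destroy the appender in between.

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for a segment appender.
struct toy_appender {
    std::vector<std::string> batches;
};

// Hypothetical stand-in for the log; not the real disk_log_impl.
class toy_log {
public:
    toy_log()
      : _appender(std::make_unique<toy_appender>()) {}

    // The lock covers both resolving the appender and writing to it,
    // so roll() cannot release the appender between those two steps.
    void append(const std::string& batch) {
        std::lock_guard<std::mutex> guard(_roll_lock);
        toy_appender* current = _appender.get();
        current->batches.push_back(batch);
        ++_appended;
    }

    // Rolling releases the current appender and installs a fresh one.
    void roll() {
        std::lock_guard<std::mutex> guard(_roll_lock);
        _appender = std::make_unique<toy_appender>();
        ++_rolls;
    }

    int appended() const { return _appended; }
    int rolls() const { return _rolls; }

private:
    std::mutex _roll_lock;
    std::unique_ptr<toy_appender> _appender;
    int _appended = 0;
    int _rolls = 0;
};

// Runs one appending thread against one rolling thread and returns the
// total number of completed operations (read only after both joined).
int run_demo(int n) {
    toy_log log;
    std::thread writer([&] {
        for (int i = 0; i < n; ++i) {
            log.append("batch");
        }
    });
    std::thread roller([&] {
        for (int i = 0; i < n; ++i) {
            log.roll();
        }
    });
    writer.join();
    roller.join();
    return log.appended() + log.rolls();
}
```

Calling `run_demo(1000)` interleaves a thousand appends with a thousand rolls. If `append()` instead took the lock only around `push_back`, after reading `_appender.get()`, the pointer could dangle, which is the shape of problem #4 above.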
1 parent f83434c commit 709ea83

3 files changed

+58
-23
lines changed


src/v/storage/disk_log_appender.cc

Lines changed: 31 additions & 12 deletions
@@ -50,17 +50,22 @@ ss::future<> disk_log_appender::initialize() {
 }
 
 bool disk_log_appender::segment_is_appendable(model::term_id batch_term) const {
-    /**
-     * _log._segs.empty() is a tricky condition. It is here to suppor concurrent
-     * truncation (from 0) of an active log segment while we hold the lock of a
-     * valid segment.
-     *
-     * Checking for term is because we support multiple term appends which
-     * always roll
-     *
-     * _bytes_left_in_segment is for initial condition
-     *
-     */
+    if (!_seg || !_seg->has_appender()) {
+        // The latest segment with which this log_appender has called
+        // initialize() has been rolled and no longer has an segment appender
+        // (e.g. because segment.ms rolled onto a new segment). There is likely
+        // already a new segment and segment appender and we should reset to
+        // use them.
+        return false;
+    }
+    // _log._segs.empty() is a tricky condition. It is here to support
+    // concurrent truncation (from 0) of an active log segment while we hold the
+    // lock of a valid segment.
+    //
+    // Checking for term is because we support multiple term appends which
+    // always roll
+    //
+    // _bytes_left_in_segment is for initial condition
     return _bytes_left_in_segment > 0 && _log.term() == batch_term
       && !_log._segs.empty() /*see above before removing this condition*/;
 }
@@ -73,6 +78,19 @@ void disk_log_appender::release_lock() {
 
 ss::future<ss::stop_iteration>
 disk_log_appender::operator()(model::record_batch& batch) {
+    // We use a fast path here since this lock should very rarely be contested.
+    // An open segment may only have one in-flight append at any given time and
+    // the only other places this lock is held are during truncation
+    // (infrequent) or when enforcing segment.ms (which should rarely happen in
+    // high throughput scenarios).
+    auto segment_roll_lock_holder = _log.try_segment_roll_lock();
+    if (!segment_roll_lock_holder.has_value()) {
+        vlog(
+          stlog.warn,
+          "Segment roll lock contested for {}",
+          _log.config().ntp());
+        segment_roll_lock_holder = co_await _log.segment_roll_lock();
+    }
     batch.header().base_offset = _idx;
     batch.header().header_crc = model::internal_header_only_crc(batch.header());
     if (_last_term != batch.term()) {
@@ -88,7 +106,8 @@ disk_log_appender::operator()(model::record_batch& batch) {
             // we might actually have space in the current log, but the
             // terms do not match for the current append, so we must roll
             release_lock();
-            co_await _log.maybe_roll(_last_term, _idx, _config.io_priority);
+            co_await _log.maybe_roll_unlocked(
+              _last_term, _idx, _config.io_priority);
             co_await initialize();
         }
         co_return co_await append_batch_to_segment(batch);
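The fast path added to `disk_log_appender::operator()` follows a common pattern: try to take the lock without waiting, and only fall back to a blocking (here, suspending) acquire when that fails. A rough analogue in standard C++ is sketched below. The names `acquire_result` and `acquire_roll_lock` are hypothetical, and `std::mutex` is only a stand-in for the Seastar mutex used in the patch.

```cpp
#include <mutex>
#include <utility>

// Hypothetical result type: the lock holder plus a flag recording whether
// the slow path was needed (the patch logs a warning in that case).
struct acquire_result {
    std::unique_lock<std::mutex> holder;
    bool contested;
};

// Try the uncontended fast path first; wait for the lock only on failure.
acquire_result acquire_roll_lock(std::mutex& roll_lock) {
    std::unique_lock<std::mutex> holder(roll_lock, std::try_to_lock);
    if (holder.owns_lock()) {
        return {std::move(holder), false};
    }
    // Slow path: another party (e.g. a roll or truncation) holds the lock.
    holder.lock();
    return {std::move(holder), true};
}
```

The returned holder releases the lock when it goes out of scope, which mirrors how the units returned by `try_segment_roll_lock()` or `segment_roll_lock()` are kept alive for the duration of the append in the patch.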

src/v/storage/disk_log_impl.cc

Lines changed: 10 additions & 10 deletions
@@ -1115,15 +1115,11 @@ ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) {
     });
 }
 
-ss::future<> disk_log_impl::maybe_roll(
+ss::future<> disk_log_impl::maybe_roll_unlocked(
   model::term_id t, model::offset next_offset, ss::io_priority_class iopc) {
-    // This lock will only rarely be contended. If it is held, then
-    // we must wait for do_housekeeping to complete before proceeding, because
-    // the log might be in a state mid-roll where it has no appender.
-    // We need to take this irrespective of whether we're actually rolling
-    // or not, in order to ensure that writers wait for a background roll
-    // to complete if one is ongoing.
-    auto roll_lock_holder = co_await _segments_rolling_lock.get_units();
+    vassert(
+      !_segments_rolling_lock.ready(),
+      "Must have taken _segments_rolling_lock");
 
     vassert(t >= term(), "Term:{} must be greater than base:{}", t, term());
     if (_segs.empty()) {
@@ -1146,8 +1142,12 @@ ss::future<> disk_log_impl::maybe_roll(
 
 ss::future<> disk_log_impl::apply_segment_ms() {
     auto gate = _compaction_housekeeping_gate.hold();
-    // do_housekeeping races with maybe_roll to use new_segment.
-    // take a lock to prevent problems
+    // Holding the lock blocks writes to the last open segment.
+    // This is required in order to avoid the logic in this function
+    // racing with an inflight append. Contention on this lock should
+    // be very light, since we wouldn't need to enforce segment.ms
+    // if this partition was high throughput (segment would have rolled
+    // naturally).
     auto lock = co_await _segments_rolling_lock.get_units();
 
     if (_segs.empty()) {
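The rename to `maybe_roll_unlocked` moves lock acquisition to the caller and replaces it with a precondition check. A small sketch of that convention in standard C++ follows. `checked_mutex` and `rolling_log` are hypothetical names introduced for illustration, and the `ready()` method here is only assumed to behave like the one asserted on in the diff (true when nobody holds the lock). Note that a check of this kind verifies that the lock is held by someone, not that the caller is the holder.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical mutex wrapper that can report whether it is currently held.
class checked_mutex {
public:
    void lock() {
        _m.lock();
        _held.store(true);
    }
    void unlock() {
        _held.store(false);
        _m.unlock();
    }
    // True when the lock is free, loosely mirroring the ready() call
    // asserted on in the diff.
    bool ready() const { return !_held.load(); }

private:
    std::mutex _m;
    std::atomic<bool> _held{false};
};

// Hypothetical log type; not the real disk_log_impl.
class rolling_log {
public:
    // Precondition: the caller already holds _roll_lock.
    void maybe_roll_unlocked(bool term_changed) {
        assert(!_roll_lock.ready() && "Must have taken the roll lock");
        if (term_changed) {
            ++_rolls;
        }
    }

    // The append path takes the lock once and keeps it across the roll
    // decision and the write.
    void append(bool term_changed) {
        std::lock_guard<checked_mutex> guard(_roll_lock);
        maybe_roll_unlocked(term_changed);
        ++_appends;
    }

    int rolls() const { return _rolls; }
    int appends() const { return _appends; }

private:
    checked_mutex _roll_lock;
    int _rolls = 0;
    int _appends = 0;
};
```

With this shape, calling `maybe_roll_unlocked` directly without holding the lock trips the assertion in a debug build, while `append` satisfies the precondition by construction.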

src/v/storage/disk_log_impl.h

Lines changed: 17 additions & 1 deletion
@@ -93,7 +93,8 @@ class disk_log_impl final : public log::impl {
     get_term_last_offset(model::term_id term) const final;
     std::ostream& print(std::ostream&) const final;
 
-    ss::future<> maybe_roll(
+    // Must be called while _segments_rolling_lock is held.
+    ss::future<> maybe_roll_unlocked(
       model::term_id, model::offset next_offset, ss::io_priority_class);
 
     // roll immediately with the current term. users should prefer the
@@ -111,6 +112,14 @@ class disk_log_impl final : public log::impl {
 
     int64_t compaction_backlog() const final;
 
+    std::optional<ssx::semaphore_units> try_segment_roll_lock() {
+        return _segments_rolling_lock.try_get_units();
+    }
+
+    ss::future<ssx::semaphore_units> segment_roll_lock() {
+        return _segments_rolling_lock.get_units();
+    }
+
 private:
     friend class disk_log_appender; // for multi-term appends
     friend class disk_log_builder;  // for tests
@@ -225,6 +234,13 @@ class disk_log_impl final : public log::impl {
 
     // Mutually exclude operations that will cause segment rolling
     // do_housekeeping and maybe_roll
+    //
+    // This lock will only rarely be contended. If it is held, then we must
+    // wait for housekeeping or truncation to complete before proceeding,
+    // because the log might be in a state mid-roll where it has no appender.
+    // We need to take this irrespective of whether we're actually rolling or
+    // not, in order to ensure that writers wait for a background roll to
+    // complete if one is ongoing.
     mutex _segments_rolling_lock;
 };
 