@@ -106,7 +106,8 @@ consensus::consensus(
   config::binding<std::chrono::milliseconds> disk_timeout,
   config::binding<bool> enable_longest_log_detection,
   consensus_client_protocol client,
-  consensus::leader_cb_t cb,
+  remake_cb_t remake_cb,
+  consensus::leader_cb_t leader_cb,
   storage::api& storage,
   std::optional<std::reference_wrapper<coordinated_recovery_throttle>>
     recovery_throttle,
@@ -123,7 +124,8 @@ consensus::consensus(
   , _disk_timeout(std::move(disk_timeout))
   , _enable_longest_log_detection(std::move(enable_longest_log_detection))
   , _client_protocol(client)
-  , _leader_notification(std::move(cb))
+  , _remake_notification(std::move(remake_cb))
+  , _leader_notification(std::move(leader_cb))
   , _fstats(_self)
   , _batcher(this, config::shard_local_cfg().raft_replicate_batch_window_size())
   , _event_manager(this)
@@ -1709,6 +1711,23 @@ ss::future<> consensus::write_last_applied(model::offset o) {
       storage::kvstore::key_space::consensus, std::move(key), std::move(val));
 }
 
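+// Roll the replica's state back to `truncate_at`: truncate the log, clamp
+// the flushed offset, and truncate the configuration manager. Factored out
+// of do_append_entries() so all truncation steps live in one place.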
+ss::future<> consensus::truncate_state(model::offset truncate_at) {
+    co_await _log->truncate(storage::truncate_config(truncate_at));
+    _probe->log_truncated();
+    // update flushed offset once again after truncation as flush is
+    // executed concurrently to append entries and it may race with
+    // the truncation
+    _flushed_offset = std::min(
+      model::prev_offset(truncate_at), _flushed_offset);
+
+    co_await _configuration_manager.truncate(truncate_at);
+    _probe->configuration_update();
+    update_follower_stats(_configuration_manager.get_latest());
+}
+
 model::offset consensus::read_last_applied() const {
     const auto key = last_applied_key();
     auto value = _storage.kvs().get(
@@ -2235,7 +2249,6 @@ consensus::do_append_entries(append_entries_request&& r) {
       last_visible_index(),
       _last_leader_visible_offset,
       truncate_at);
-    _probe->log_truncated();
 
     _majority_replicated_index = std::min(
       model::prev_offset(truncate_at), _majority_replicated_index);
@@ -2248,17 +2261,7 @@ consensus::do_append_entries(append_entries_request&& r) {
       model::prev_offset(truncate_at), _flushed_offset);
 
     try {
-        co_await _log->truncate(storage::truncate_config(truncate_at));
-        // update flushed offset once again after truncation as flush is
-        // executed concurrently to append entries and it may race with
-        // the truncation
-        _flushed_offset = std::min(
-          model::prev_offset(truncate_at), _flushed_offset);
-
-        co_await _configuration_manager.truncate(truncate_at);
-        _probe->configuration_update();
-        update_follower_stats(_configuration_manager.get_latest());
-
+        co_await truncate_state(truncate_at);
         auto lstats = _log->offsets();
         if (unlikely(lstats.dirty_offset != adjusted_prev_log_index)) {
             vlog(
@@ -4287,4 +4290,93 @@ size_t consensus::bytes_to_deliver_to_learners() const {
     return total;
 }
 
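+// Leader side: ask `target` to rebuild its learner state from scratch. An
+// RPC failure is logged and surfaces to the caller as a default-constructed
+// reply.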
+ss::future<remake_learner_state_reply>
+consensus::remake_learner_state(vnode target) {
+    _probe->recovery_reset();
+    remake_learner_state_request req{
+      .node_id = _self,
+      .target_node_id = target,
+      .group = _group,
+      .term = _term};
+    vlog(_ctxlog.info, "Issuing remake group request {}", req);
+    static constexpr auto timeout = 10s;
+    result<remake_learner_state_reply> reply
+      = co_await _client_protocol.remake_learner_state(
+        target.id(), req, rpc::client_opts(timeout));
+    if (!reply) {
+        vlog(
+          _ctxlog.warn,
+          "Unable to issue remake group request {}, {}",
+          req,
+          reply.error());
+        co_return remake_learner_state_reply{};
+    }
+
+    co_return reply.value();
+}
+
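+// Learner side: validate the request under _op_lock, then delegate to the
+// _remake_notification callback, which performs the actual state rebuild
+// in the layer above raft.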
+ss::future<remake_learner_state_reply>
+consensus::do_remake_learner_state(remake_learner_state_request req) {
+    remake_learner_state_reply reply{};
+    using is_success = remake_learner_state_reply::is_success;
+    try {
+        auto units = co_await _op_lock.get_units();
+
+        // Perform validation of request under _op_lock
+        auto maybe_err = [&]() -> std::optional<raft::errc> {
+            if (req.term != _term) {
+                return raft::errc::not_leader;
+            }
+            if (req.source_node() != _leader_id) {
+                return raft::errc::leadership_transfer_in_progress;
+            }
+            if (req.target_node() != _self) {
+                return raft::errc::invalid_target_node;
+            }
+            if (!is_learner()) {
+                return raft::errc::not_learner;
+            }
+            if (req.group != _group) {
+                return raft::errc::group_not_exists;
+            }
+
+            return std::nullopt;
+        }();
+
+        if (maybe_err.has_value()) {
+            reply.success = is_success::no;
+            vlog(
+              _ctxlog.warn,
+              "Unable to process remake group request {}, raft::errc {}",
+              req,
+              maybe_err.value());
+        } else {
+            auto cluster_err = co_await _remake_notification(req.group);
+            reply.success = cluster_err ? is_success::no : is_success::yes;
+            if (cluster_err) {
+                vlog(
+                  _ctxlog.warn,
+                  "Unable to process remake group request {}, cluster::errc {}",
+                  req,
+                  cluster_err);
+            }
+        }
+    } catch (...) {
+        vlog(
+          _ctxlog.warn,
+          "Unable to process remake group request {}, caught exception: {}",
+          req,
+          std::current_exception());
+        reply.success = is_success::no;
+    }
+
+    co_return reply;
+}
+
 } // namespace raft