Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/v/raft/state_machine_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,15 @@ void state_machine_manager::maybe_start_background_apply(
return entry->background_apply_mutex.get_units().then(
[this, entry](auto u) {
return ss::with_scheduling_group(
_apply_sg,
[this, entry] { return background_apply_fiber(entry); })
.finally([u = std::move(u)] {});
_apply_sg, [this, entry, u = std::move(u)]() mutable {
return background_apply_fiber(entry, std::move(u));
});
});
});
}

ss::future<> state_machine_manager::background_apply_fiber(entry_ptr entry) {
ss::future<> state_machine_manager::background_apply_fiber(
entry_ptr entry, ssx::semaphore_units units) {
while (!_as.abort_requested() && entry->stm->next() < _next) {
storage::log_reader_config config(
entry->stm->next(), _next, ss::default_priority_class());
Expand Down Expand Up @@ -421,6 +422,7 @@ ss::future<> state_machine_manager::background_apply_fiber(entry_ptr entry) {
co_await ss::sleep_abortable(1s, _as);
}
}
units.return_all();
vlog(
_log.debug,
"finished background apply for '{}' state machine",
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/state_machine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class state_machine_manager final {
using state_machines_t = absl::flat_hash_map<ss::sstring, entry_ptr>;

void maybe_start_background_apply(const entry_ptr&);
ss::future<> background_apply_fiber(entry_ptr);
ss::future<> background_apply_fiber(entry_ptr, ssx::semaphore_units);

ss::future<> apply_raft_snapshot();
ss::future<> do_apply_raft_snapshot(
Expand Down
25 changes: 25 additions & 0 deletions src/v/raft/tests/stm_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,31 @@ TEST_F_CORO(state_machine_fixture, test_apply_throwing_exception) {
ASSERT_EQ_CORO(stm->state, expected);
}
}
TEST_F_CORO(
state_machine_fixture, test_apply_throwing_exception_waiting_for_each_batch) {
/**
* Create 3 replicas group with simple_kv STM
*/
create_nodes();
std::vector<ss::shared_ptr<simple_kv>> stms;

for (auto& [id, node] : nodes()) {
raft::state_machine_manager_builder builder;
auto kv_stm = builder.create_stm<simple_kv>(*node);
auto throwing_kv_stm = builder.create_stm<throwing_kv>(*node);
co_await node->init_and_start(all_vnodes(), std::move(builder));
stms.push_back(kv_stm);
stms.push_back(ss::dynamic_pointer_cast<simple_kv>(throwing_kv_stm));
}

auto expected = co_await build_random_state(5000, wait_for_each_batch::yes);

co_await wait_for_apply();

for (auto& stm : stms) {
ASSERT_EQ_CORO(stm->state, expected);
}
}

TEST_F_CORO(state_machine_fixture, test_recovery_without_snapshot) {
/**
Expand Down
9 changes: 8 additions & 1 deletion src/v/raft/tests/stm_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/bool_class.hh>
#include <seastar/util/log.hh>

#include <ostream>
Expand Down Expand Up @@ -141,6 +142,7 @@ struct simple_kv : public raft::state_machine_base {
state_t state;
raft_node_instance& raft_node;
};
using wait_for_each_batch = ss::bool_class<struct wait_for_each_tag>;

struct state_machine_fixture : raft_fixture {
ss::future<result<raft::replicate_result>>
Expand Down Expand Up @@ -171,7 +173,8 @@ struct state_machine_fixture : raft_fixture {
}

ss::future<absl::flat_hash_map<ss::sstring, value_entry>>
build_random_state(int op_cnt) {
build_random_state(
int op_cnt, wait_for_each_batch wait_for_each = wait_for_each_batch::no) {
absl::flat_hash_map<ss::sstring, value_entry> state;

for (int i = 0; i < op_cnt;) {
Expand Down Expand Up @@ -212,6 +215,10 @@ struct state_machine_fixture : raft_fixture {
logger().debug,
"replication result: [last_offset: {}]",
result.value().last_offset);

if (wait_for_each) {
co_await wait_for_apply();
}
}
co_return state;
}
Expand Down