From ef2b051f5a76bd4f476409179e4003110d7acc5a Mon Sep 17 00:00:00 2001
From: Borys
Date: Fri, 28 Mar 2025 10:42:58 +0200
Subject: [PATCH] refactor: rename cntx variable to exec_st

---
 src/server/cluster/outgoing_slot_migration.cc | 52 ++++++-------
 src/server/cluster/outgoing_slot_migration.h  |  6 +-
 src/server/dflycmd.cc                         | 34 +++++----
 src/server/dflycmd.h                          |  4 +-
 src/server/protocol_client.cc                 |  2 +-
 src/server/protocol_client.h                  |  2 +-
 src/server/replica.cc                         | 76 +++++++++----------
 src/server/replica.h                          |  2 +-
 8 files changed, 90 insertions(+), 88 deletions(-)

diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index 8949578faac2..3d2971e6f3ce 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -35,13 +35,13 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
  public:
   SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
                      journal::Journal* journal, OutgoingMigration* om)
-      : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
-    cntx_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
+      : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
+    exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
   }
 
   ~SliceSlotMigration() {
     Cancel();
-    cntx_.JoinErrorHandler();
+    exec_st_.JoinErrorHandler();
   }
 
   // Send DFLYMIGRATE FLOW
@@ -50,8 +50,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
     VLOG(1) << "Connecting to source node_id " << node_id << " shard_id " << shard_id;
 
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
-    if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
-      cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
+    if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
+      exec_st_.ReportError(GenericError(ec, "Couldn't connect to source."));
       return;
     }
 
@@ -61,13 +61,13 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
     VLOG(1) << "cmd: " << cmd;
 
     if (auto ec = SendCommandAndReadResponse(cmd); ec) {
-      cntx_.ReportError(GenericError(ec, cmd));
+      exec_st_.ReportError(GenericError(ec, cmd));
       return;
     }
 
     if (!CheckRespIsSimpleReply("OK")) {
-      cntx_.ReportError(absl::StrCat("Incorrect response for FLOW cmd: ",
-                                     ToSV(LastResponseArgs().front().GetBuf())));
+      exec_st_.ReportError(absl::StrCat("Incorrect response for FLOW cmd: ",
+                                        ToSV(LastResponseArgs().front().GetBuf())));
       return;
     }
   }
@@ -93,7 +93,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
   }
 
   const dfly::GenericError GetError() const {
-    return cntx_.GetError();
+    return exec_st_.GetError();
   }
 
  private:
@@ -113,7 +113,7 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
 OutgoingMigration::~OutgoingMigration() {
   main_sync_fb_.JoinIfNeeded();
 
-  cntx_.JoinErrorHandler();
+  exec_st_.JoinErrorHandler();
   // Destroy each flow in its dedicated thread, because we could be the last
   // owner of the db tables
   OnAllShards([](auto& migration) { migration.reset(); });
@@ -144,7 +144,7 @@ void OutgoingMigration::Finish(GenericError error) {
     next_state = MigrationState::C_ERROR;
     LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
                  << migration_info_.node_info.id << " with error: " << error.Format();
-    cntx_.ReportError(std::move(error));
+    exec_st_.ReportError(std::move(error));
   } else {
     LOG(INFO) << "Finish outgoing migration for " << cf_->MyID() << ": "
               << migration_info_.node_info.id;
@@ -175,7 +175,7 @@ void OutgoingMigration::Finish(GenericError error) {
       CHECK(migration != nullptr);
       migration->Cancel();
     });
-    cntx_.JoinErrorHandler();
+    exec_st_.JoinErrorHandler();
   }
 }
 
@@ -195,16 +195,16 @@ void OutgoingMigration::SyncFb() {
       break;
     }
 
-    if (cntx_.IsError()) {
+    if (exec_st_.IsError()) {
       ResetError();
       ThisFiber::SleepFor(500ms);  // wait some time before next retry
     }
 
    VLOG(1) << "Connecting to target node";
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
-    if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
+    if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
       LOG(WARNING) << "Can't connect to target node";
-      cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
+      exec_st_.ReportError(GenericError(ec, "Couldn't connect to source."));
       continue;
     }
 
@@ -217,7 +217,7 @@ void OutgoingMigration::SyncFb() {
 
     if (auto ec = SendCommandAndReadResponse(cmd); ec) {
       LOG(WARNING) << "Can't connect to target node";
-      cntx_.ReportError(GenericError(ec, "Could not send INIT command."));
+      exec_st_.ReportError(GenericError(ec, "Could not send INIT command."));
       continue;
     }
 
@@ -227,12 +227,12 @@ void OutgoingMigration::SyncFb() {
         // we provide 30 seconds to distribute the config to all nodes to avoid extra errors
         // reporting
         if (passed >= absl::Milliseconds(30000)) {
-          cntx_.ReportError(GenericError(LastResponseArgs().front().GetString()));
+          exec_st_.ReportError(GenericError(LastResponseArgs().front().GetString()));
         } else {
           ThisFiber::SleepFor(500ms);  // to prevent too many attempts
         }
       } else {
-        cntx_.ReportError(GenericError(LastResponseArgs().front().GetString()));
+        exec_st_.ReportError(GenericError(LastResponseArgs().front().GetString()));
       }
       continue;
     }
@@ -249,7 +249,7 @@ void OutgoingMigration::SyncFb() {
     }
 
     OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
-    if (!cntx_.IsRunning()) {
+    if (!exec_st_.IsRunning()) {
       continue;
     }
 
@@ -260,13 +260,13 @@ void OutgoingMigration::SyncFb() {
       OnAllShards([](auto& migration) { migration->PrepareSync(); });
     }
 
-    if (!cntx_.IsRunning()) {
+    if (!exec_st_.IsRunning()) {
       continue;
     }
 
     OnAllShards([](auto& migration) { migration->RunSync(); });
 
-    if (!cntx_.IsRunning()) {
+    if (!exec_st_.IsRunning()) {
       continue;
     }
@@ -276,7 +276,7 @@ void OutgoingMigration::SyncFb() {
       VLOG(1) << "Waiting for migration to finalize...";
       ThisFiber::SleepFor(500ms);
     }
-    if (!cntx_.IsRunning()) {
+    if (!exec_st_.IsRunning()) {
       continue;
     }
     break;
@@ -291,11 +291,11 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   LOG(INFO) << "Finalize migration for " << cf_->MyID() << " : " << migration_info_.node_info.id
             << " attempt " << attempt;
   if (attempt > 1) {
-    if (!cntx_.IsRunning()) {
+    if (!exec_st_.IsRunning()) {
       return true;
     }
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
-    if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
+    if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
       LOG(WARNING) << "Couldn't connect " << cf_->MyID() << " : " << migration_info_.node_info.id
                    << " attempt " << attempt;
       return false;
@@ -369,7 +369,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
     }
   }
 
-  if (!cntx_.GetError()) {
+  if (!exec_st_.GetError()) {
     Finish();
     keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
     cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
@@ -381,7 +381,7 @@
 void OutgoingMigration::Start() {
   VLOG(1) << "Resolving host DNS for outgoing migration";
   if (error_code ec = ResolveHostDns(); ec) {
-    cntx_.ReportError(GenericError(ec, "Could not resolve host dns."));
+    exec_st_.ReportError(GenericError(ec, "Could not resolve host dns."));
     return;
   }
 
diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h
index b7152d883309..4cab0a22dccc 100644
--- a/src/server/cluster/outgoing_slot_migration.h
+++ b/src/server/cluster/outgoing_slot_migration.h
@@ -51,9 +51,9 @@ class OutgoingMigration : private ProtocolClient {
   }
 
   void ResetError() {
-    if (cntx_.IsError()) {
-      SetLastError(cntx_.GetError());
-      cntx_.Reset(nullptr);
+    if (exec_st_.IsError()) {
+      SetLastError(exec_st_.GetError());
+      exec_st_.Reset(nullptr);
     }
   }
 
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index a69f8c493b93..57f092124e8d 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -88,7 +88,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
             << ", expecting " << shard->journal()->GetLsn();
     return false;
   }
-  if (!replica->cntx.IsRunning()) {
+  if (!replica->exec_st.IsRunning()) {
     return false;
   }
   VLOG(1) << "Replica lsn:" << flow->last_acked_lsn
@@ -111,7 +111,7 @@ void DflyCmd::ReplicaInfo::Cancel() {
   // Update state and cancel context.
   replica_state = SyncState::CANCELLED;
-  cntx.ReportCancelError();
+  exec_st.ReportCancelError();
 
   // Wait for tasks to finish.
   shard_set->RunBlockingInParallel([this](EngineShard* shard) {
     VLOG(2) << "Disconnecting flow " << shard->shard_id();
@@ -124,7 +124,7 @@ void DflyCmd::ReplicaInfo::Cancel() {
     flow->conn = nullptr;
   });
 
   // Wait for error handler to quit.
-  cntx.JoinErrorHandler();
+  exec_st.JoinErrorHandler();
   VLOG(1) << "Disconnecting replica " << address << ":" << listening_port;
 }
@@ -332,8 +332,8 @@ void DflyCmd::Sync(CmdArgList args, Transaction* tx, RedisReplyBuilder* rb) {
 
   // Use explicit assignment for replica_ptr, because capturing structured bindings is C++20.
   auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
-    status =
-        StartFullSyncInThread(&replica_ptr->flows[shard->shard_id()], &replica_ptr->cntx, shard);
+    status = StartFullSyncInThread(&replica_ptr->flows[shard->shard_id()], &replica_ptr->exec_st,
+                                   shard);
   };
   shard_set->RunBlockingInParallel(std::move(cb));
 
@@ -370,11 +370,11 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r
     auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
       FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];
 
-      status = StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
+      status = StopFullSyncInThread(flow, &replica_ptr->exec_st, shard);
       if (*status != OpStatus::OK) {
         return;
       }
-      StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
+      StartStableSyncInThread(flow, &replica_ptr->exec_st, shard);
     };
 
     shard_set->RunBlockingInParallel(std::move(cb));
@@ -548,7 +548,8 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
   rb->SendOk();
 }
 
-OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
+OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* exec_st,
+                                        EngineShard* shard) {
   DCHECK(shard);
   DCHECK(flow->conn);
 
@@ -577,30 +578,31 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, En
     ec = saver->SaveHeader({});
   }
   if (ec) {
-    cntx->ReportError(ec);
+    exec_st->ReportError(ec);
     return OpStatus::CANCELLED;
   }
 
   if (flow->start_partial_sync_at.has_value())
-    saver->StartIncrementalSnapshotInShard(*flow->start_partial_sync_at, cntx, shard);
+    saver->StartIncrementalSnapshotInShard(*flow->start_partial_sync_at, exec_st, shard);
   else
-    saver->StartSnapshotInShard(true, cntx, shard);
+    saver->StartSnapshotInShard(true, exec_st, shard);
 
   return OpStatus::OK;
 }
 
-OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
+OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, ExecutionState* exec_st,
+                                       EngineShard* shard) {
   DCHECK(shard);
 
   error_code ec = flow->saver->StopFullSyncInShard(shard);
   if (ec) {
-    cntx->ReportError(ec);
+    exec_st->ReportError(ec);
     return OpStatus::CANCELLED;
   }
 
   ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
   if (ec) {
-    cntx->ReportError(ec);
+    exec_st->ReportError(ec);
     return OpStatus::CANCELLED;
   }
 
@@ -610,12 +612,12 @@ OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, Eng
   return OpStatus::OK;
 }
 
-void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
+void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, EngineShard* shard) {
   // Create streamer for shard flows.
   DCHECK(shard);
   DCHECK(flow->conn);
 
-  flow->streamer.reset(new JournalStreamer(sf_->journal(), cntx));
+  flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st));
   bool send_lsn = flow->version >= DflyVersion::VER4;
   flow->streamer->Start(flow->conn->socket(), send_lsn);
 
diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h
index 80ac570e151e..8230c0ca67e2 100644
--- a/src/server/dflycmd.h
+++ b/src/server/dflycmd.h
@@ -105,7 +105,7 @@ class DflyCmd {
     ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port,
                 ExecutionState::ErrHandler err_handler)
         : replica_state{SyncState::PREPARATION},
-          cntx{std::move(err_handler)},
+          exec_st{std::move(err_handler)},
           address{std::move(address)},
           listening_port(listening_port),
           flows{flow_count} {
@@ -115,7 +115,7 @@ class DflyCmd {
     void Cancel();
 
     SyncState replica_state;  // always guarded by shared_mu
-    ExecutionState cntx;
+    ExecutionState exec_st;
 
     std::string id;
     std::string address;
diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc
index f127c7c04ce6..3ddcdd158c3d 100644
--- a/src/server/protocol_client.cc
+++ b/src/server/protocol_client.cc
@@ -117,7 +117,7 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov
 }
 
 ProtocolClient::~ProtocolClient() {
-  cntx_.JoinErrorHandler();
+  exec_st_.JoinErrorHandler();
 
   // FIXME: We should close the socket explictly outside of the destructor. This currently
   // breaks test_cancel_replication_immediately.
diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h
index 281dc9538656..b4ef0f6bf7f7 100644
--- a/src/server/protocol_client.h
+++ b/src/server/protocol_client.h
@@ -122,7 +122,7 @@ class ProtocolClient {
   util::fb2::Mutex sock_mu_;
 
  protected:
-  ExecutionState cntx_;  // context for tasks in replica.
+  ExecutionState exec_st_;  // context for tasks in replica.
 
   std::string last_cmd_;
   std::string last_resp_;
diff --git a/src/server/replica.cc b/src/server/replica.cc
index 88bc09dcc91b..9df810804e83 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -92,11 +92,11 @@ GenericError Replica::Start() {
   CHECK(mythread);
 
   auto check_connection_error = [this](error_code ec, const char* msg) -> GenericError {
-    if (!cntx_.IsRunning()) {
+    if (!exec_st_.IsRunning()) {
       return {"replication cancelled"};
     }
     if (ec) {
-      cntx_.ReportCancelError();
+      exec_st_.ReportCancelError();
       return {absl::StrCat(msg, ec.message())};
     }
     return ec;
@@ -104,7 +104,7 @@ GenericError Replica::Start() {
 
   // 0. Set basic error handler that is reponsible for cleaning up on errors.
   // Can return an error only if replication was cancelled immediately.
-  auto err = cntx_.SwitchErrorHandler([this](const auto& ge) { this->DefaultErrorHandler(ge); });
+  auto err = exec_st_.SwitchErrorHandler([this](const auto& ge) { this->DefaultErrorHandler(ge); });
   RETURN_ON_GENERIC_ERR(check_connection_error(err, "replication cancelled"));
 
   // 1. Resolve dns.
@@ -114,7 +114,7 @@ GenericError Replica::Start() {
 
   // 2. Connect socket.
   VLOG(1) << "Connecting to master";
-  ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &cntx_);
+  ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &exec_st_);
   RETURN_ON_GENERIC_ERR(check_connection_error(ec, kConnErr));
 
   // 3. Greet.
@@ -142,8 +142,8 @@ void Replica::Stop() {
 
   // Stops the loop in MainReplicationFb.
   proactor_->Await([this] {
-    state_mask_.store(0);       // Specifically ~R_ENABLED.
-    cntx_.ReportCancelError();  // Context is fully resposible for cleanup.
+    state_mask_.store(0);          // Specifically ~R_ENABLED.
+    exec_st_.ReportCancelError();  // Context is fully resposible for cleanup.
   });
 
   // Make sure the replica fully stopped and did all cleanup,
@@ -191,7 +191,7 @@ void Replica::MainReplicationFb() {
   error_code ec;
   while (state_mask_.load() & R_ENABLED) {
     // Discard all previous errors and set default error handler.
-    cntx_.Reset([this](const GenericError& ge) { this->DefaultErrorHandler(ge); });
+    exec_st_.Reset([this](const GenericError& ge) { this->DefaultErrorHandler(ge); });
     // 1. Connect socket.
     if ((state_mask_.load() & R_TCP_CONNECTED) == 0) {
       ThisFiber::SleepFor(500ms);
@@ -206,7 +206,7 @@ void Replica::MainReplicationFb() {
 
       // Give a lower timeout for connect, because we're
       reconnect_count_++;
-      ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_reconnect_timeout_ms) * 1ms, &cntx_);
+      ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_reconnect_timeout_ms) * 1ms, &exec_st_);
       if (ec) {
         LOG(WARNING) << "Error connecting to " << server().Description() << " " << ec;
         continue;
@@ -262,7 +262,7 @@ void Replica::MainReplicationFb() {
   }
 
   // Wait for unblocking cleanup to finish.
-  cntx_.JoinErrorHandler();
+  exec_st_.JoinErrorHandler();
 
   // Revert shard states to normal state.
   SetShardStates(false);
@@ -411,8 +411,8 @@ error_code Replica::InitiatePSync() {
 
   // Set LOADING state.
   if (!service_.RequestLoadingState()) {
-    return cntx_.ReportError(std::make_error_code(errc::state_not_recoverable),
-                             "Failed to enter LOADING state");
+    return exec_st_.ReportError(std::make_error_code(errc::state_not_recoverable),
+                                "Failed to enter LOADING state");
   }
 
   absl::Cleanup cleanup = [this]() { service_.RemoveLoadingState(); };
@@ -500,12 +500,12 @@ error_code Replica::InitiateDflySync() {
       flow->Cancel();
   };
 
-  RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
+  RETURN_ON_ERR(exec_st_.SwitchErrorHandler(std::move(err_handler)));
 
   // Make sure we're in LOADING state.
   if (!service_.RequestLoadingState()) {
-    return cntx_.ReportError(std::make_error_code(errc::state_not_recoverable),
-                             "Failed to enter LOADING state");
+    return exec_st_.ReportError(std::make_error_code(errc::state_not_recoverable),
+                                "Failed to enter LOADING state");
   }
 
   // Start full sync flows.
@@ -527,14 +527,14 @@ error_code Replica::InitiateDflySync() {
     DCHECK(!last_journal_LSNs_ || last_journal_LSNs_->size() == num_df_flows);
     auto shard_cb = [&](unsigned index, auto*) {
       for (auto id : thread_flow_map_[index]) {
-        auto ec = shard_flows_[id]->StartSyncFlow(sync_block, &cntx_,
+        auto ec = shard_flows_[id]->StartSyncFlow(sync_block, &exec_st_,
                                                   last_journal_LSNs_.has_value()
                                                       ? std::optional((*last_journal_LSNs_)[id])
                                                       : std::nullopt);
         if (ec.has_value())
           is_full_sync[id] = ec.value();
         else
-          cntx_.ReportError(ec.error());
+          exec_st_.ReportError(ec.error());
       }
     };
     // Lock to prevent the error handler from running instantly
@@ -559,16 +559,16 @@ error_code Replica::InitiateDflySync() {
       sync_type = "partial";
     } else {
       last_journal_LSNs_.reset();
-      cntx_.ReportError(std::make_error_code(errc::state_not_recoverable),
-                        "Won't do a partial sync: some flows must fully resync");
+      exec_st_.ReportError(std::make_error_code(errc::state_not_recoverable),
+                           "Won't do a partial sync: some flows must fully resync");
     }
   }
 
-  RETURN_ON_ERR(cntx_.GetError());
+  RETURN_ON_ERR(exec_st_.GetError());
 
   // Send DFLY SYNC.
   if (auto ec = SendNextPhaseRequest("SYNC"); ec) {
-    return cntx_.ReportError(ec);
+    return exec_st_.ReportError(ec);
   }
 
   LOG(INFO) << "Started " << sync_type << " sync with " << server().Description();
@@ -579,21 +579,21 @@ error_code Replica::InitiateDflySync() {
   sync_block->Wait();
 
   // Check if we woke up due to cancellation.
-  if (!cntx_.IsRunning())
-    return cntx_.GetError();
+  if (!exec_st_.IsRunning())
+    return exec_st_.GetError();
 
   RdbLoader::PerformPostLoad(&service_);
 
   // Send DFLY STARTSTABLE.
   if (auto ec = SendNextPhaseRequest("STARTSTABLE"); ec) {
-    return cntx_.ReportError(ec);
+    return exec_st_.ReportError(ec);
   }
 
   // Joining flows and resetting state is done by cleanup.
   double seconds = double(absl::ToInt64Milliseconds(absl::Now() - start_time)) / 1000;
   LOG(INFO) << sync_type << " sync finished in " << strings::HumanReadableElapsedTime(seconds);
 
-  return cntx_.GetError();
+  return exec_st_.GetError();
 }
 
 error_code Replica::ConsumeRedisStream() {
@@ -626,7 +626,7 @@ error_code Replica::ConsumeRedisStream() {
     replica_waker_.notifyAll();
     DefaultErrorHandler(ge);
   };
-  RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
+  RETURN_ON_ERR(exec_st_.SwitchErrorHandler(std::move(err_handler)));
 
   facade::CmdArgVec args_vector;
 
@@ -636,7 +636,7 @@ error_code Replica::ConsumeRedisStream() {
     auto response = ReadRespReply(&io_buf, /*copy_msg=*/false);
     if (!response.has_value()) {
       VLOG(1) << "ConsumeRedisStream finished";
-      cntx_.ReportError(response.error());
+      exec_st_.ReportError(response.error());
       acks_fb_.JoinIfNeeded();
       return response.error();
     }
@@ -679,7 +679,7 @@ error_code Replica::ConsumeDflyStream() {
     }
     multi_shard_exe_->CancelAllBlockingEntities();
   };
-  RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
+  RETURN_ON_ERR(exec_st_.SwitchErrorHandler(std::move(err_handler)));
 
   LOG(INFO) << "Transitioned into stable sync";
   // Transition flows into stable sync.
@@ -687,9 +687,9 @@ error_code Replica::ConsumeDflyStream() {
   auto shard_cb = [&](unsigned index, auto*) {
     const auto& local_ids = thread_flow_map_[index];
     for (unsigned id : local_ids) {
-      auto ec = shard_flows_[id]->StartStableSyncFlow(&cntx_);
+      auto ec = shard_flows_[id]->StartStableSyncFlow(&exec_st_);
       if (ec)
-        cntx_.ReportError(ec);
+        exec_st_.ReportError(ec);
     }
   };
 
@@ -707,9 +707,9 @@ error_code Replica::ConsumeDflyStream() {
 
   LOG(INFO) << "Exit stable sync";
   // The only option to unblock is to cancel the context.
-  CHECK(cntx_.GetError());
+  CHECK(exec_st_.GetError());
 
-  return cntx_.GetError();
+  return exec_st_.GetError();
 }
 
 void Replica::JoinDflyFlows() {
@@ -741,7 +741,7 @@ io::Result DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionSt
   proactor_index_ = ProactorBase::me()->GetPoolIndex();
 
   RETURN_ON_ERR_T(make_unexpected,
-                  ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &cntx_));
+                  ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &exec_st_));
 
   VLOG(1) << "Sending on flow " << master_context_.master_repl_id << " "
           << master_context_.dfly_session_id << " " << flow_id_;
@@ -891,18 +891,18 @@ void Replica::RedisStreamAcksFb() {
   std::string ack_cmd;
   auto next_ack_tp = std::chrono::steady_clock::now();
 
-  while (cntx_.IsRunning()) {
+  while (exec_st_.IsRunning()) {
     VLOG(2) << "Sending an ACK with offset=" << repl_offs_;
     ack_cmd = absl::StrCat("REPLCONF ACK ", repl_offs_);
     next_ack_tp = std::chrono::steady_clock::now() + ack_time_max_interval;
     if (auto ec = SendCommand(ack_cmd); ec) {
-      cntx_.ReportError(ec);
+      exec_st_.ReportError(ec);
       break;
    }
 
     ack_offs_ = repl_offs_;
     replica_waker_.await_until(
-        [&]() { return repl_offs_ > ack_offs_ + kAckRecordMaxInterval || (!cntx_.IsRunning()); },
+        [&]() { return repl_offs_ > ack_offs_ + kAckRecordMaxInterval || (!exec_st_.IsRunning()); },
         next_ack_tp);
   }
 }
@@ -980,7 +980,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx
     // and replica recieved all the commands from all shards.
     multi_shard_data.block->Wait();
     // Check if we woke up due to cancellation.
-    if (!cntx_.IsRunning())
+    if (!exec_st_.IsRunning())
       return;
     VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished";
 
@@ -988,7 +988,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx
     // Wait until all shards flows get to execution step of this transaction.
     multi_shard_data.barrier.Wait();
     // Check if we woke up due to cancellation.
-    if (!cntx_.IsRunning())
+    if (!exec_st_.IsRunning())
      return;
     // Global command will be executed only from one flow fiber. This ensure corectness of data in
     // replica.
@@ -999,7 +999,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx
     // executed.
     multi_shard_data.barrier.Wait();
     // Check if we woke up due to cancellation.
-    if (!cntx_.IsRunning())
+    if (!exec_st_.IsRunning())
       return;
 
     // Erase from map can be done only after all flow fibers executed the transaction commands.
diff --git a/src/server/replica.h b/src/server/replica.h
index 47012f831bed..5c9fb9de834c 100644
--- a/src/server/replica.h
+++ b/src/server/replica.h
@@ -73,7 +73,7 @@ class Replica : ProtocolClient {
   std::error_code TakeOver(std::string_view timeout, bool save_flag);
 
   bool IsContextCancelled() const {
-    return !cntx_.IsRunning();
+    return !exec_st_.IsRunning();
  }
 
  private: /* Main standalone mode functions */
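
Note for readers: the patch only renames the member at its call sites; the ExecutionState type itself is not modified here (execution_state sources are absent from the diffstat). To make the lifecycle the renamed member drives easier to follow — install a handler with SwitchErrorHandler, signal failure with ReportError, poll IsRunning between steps, and join the handler with JoinErrorHandler before teardown — the sketch below mimics that flow with a simplified, self-contained stand-in class. Its internals (mutex, one-shot thread dispatch) are invented for illustration and are not Dragonfly's implementation.

#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>

// Simplified stand-in for the ExecutionState member renamed in this patch.
// Only the calls the patch exercises are mimicked; internals are invented.
class ExecutionState {
 public:
  using ErrHandler = std::function<void(const std::string&)>;

  // Install the handler that runs once, when the first error is reported.
  void SwitchErrorHandler(ErrHandler h) {
    std::lock_guard<std::mutex> lk(mu_);
    err_handler_ = std::move(h);
  }

  // Record the first error, stop "running", and dispatch the handler
  // asynchronously; callers treat this as a fire-and-forget cancel signal.
  void ReportError(std::string err) {
    std::lock_guard<std::mutex> lk(mu_);
    if (error_)
      return;  // keep only the first error
    error_ = std::move(err);
    if (err_handler_)
      handler_thread_ = std::thread(err_handler_, *error_);
  }

  bool IsRunning() const {
    std::lock_guard<std::mutex> lk(mu_);
    return !error_.has_value();
  }

  // Destructors in the patch join the handler before tearing down the
  // resources it may still touch.
  void JoinErrorHandler() {
    if (handler_thread_.joinable())
      handler_thread_.join();
  }

 private:
  mutable std::mutex mu_;
  std::optional<std::string> error_;
  ErrHandler err_handler_;
  std::thread handler_thread_;
};

int main() {
  ExecutionState exec_st;  // the renamed member: exec_st_ instead of cntx_
  exec_st.SwitchErrorHandler(
      [](const std::string& e) { std::cerr << "cleanup after: " << e << '\n'; });

  // A task loop checks IsRunning() between steps, as OutgoingMigration::SyncFb does.
  for (int step = 0; step < 3 && exec_st.IsRunning(); ++step) {
    if (step == 1)
      exec_st.ReportError("Couldn't connect to source.");
  }

  exec_st.JoinErrorHandler();  // always join before destruction
  return 0;
}

Build with -pthread. In the real code the same call sequence runs against the exec_st_ member that Replica and OutgoingMigration inherit from ProtocolClient.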