52 changes: 26 additions & 26 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -35,13 +35,13 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
public:
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
journal::Journal* journal, OutgoingMigration* om)
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
cntx_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
}

~SliceSlotMigration() {
Cancel();
cntx_.JoinErrorHandler();
exec_st_.JoinErrorHandler();
}

// Send DFLYMIGRATE FLOW
@@ -50,8 +50,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {

VLOG(1) << "Connecting to source node_id " << node_id << " shard_id " << shard_id;
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
exec_st_.ReportError(GenericError(ec, "Couldn't connect to source."));
return;
}

@@ -61,13 +61,13 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
VLOG(1) << "cmd: " << cmd;

if (auto ec = SendCommandAndReadResponse(cmd); ec) {
cntx_.ReportError(GenericError(ec, cmd));
exec_st_.ReportError(GenericError(ec, cmd));
return;
}

if (!CheckRespIsSimpleReply("OK")) {
cntx_.ReportError(absl::StrCat("Incorrect response for FLOW cmd: ",
ToSV(LastResponseArgs().front().GetBuf())));
exec_st_.ReportError(absl::StrCat("Incorrect response for FLOW cmd: ",
ToSV(LastResponseArgs().front().GetBuf())));
return;
}
}
@@ -93,7 +93,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
}

const dfly::GenericError GetError() const {
return cntx_.GetError();
return exec_st_.GetError();
}

private:
@@ -113,7 +113,7 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();

cntx_.JoinErrorHandler();
exec_st_.JoinErrorHandler();
// Destroy each flow in its dedicated thread, because we could be the last
// owner of the db tables
OnAllShards([](auto& migration) { migration.reset(); });
@@ -144,7 +144,7 @@ void OutgoingMigration::Finish(GenericError error) {
next_state = MigrationState::C_ERROR;
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error.Format();
cntx_.ReportError(std::move(error));
exec_st_.ReportError(std::move(error));
} else {
LOG(INFO) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id;
@@ -175,7 +175,7 @@ void OutgoingMigration::Finish(GenericError error) {
CHECK(migration != nullptr);
migration->Cancel();
});
cntx_.JoinErrorHandler();
exec_st_.JoinErrorHandler();
}
}

@@ -195,16 +195,16 @@ void OutgoingMigration::SyncFb() {
break;
}

if (cntx_.IsError()) {
if (exec_st_.IsError()) {
ResetError();
ThisFiber::SleepFor(500ms); // wait some time before next retry
}

VLOG(1) << "Connecting to target node";
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
LOG(WARNING) << "Can't connect to target node";
cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
exec_st_.ReportError(GenericError(ec, "Couldn't connect to source."));
continue;
}

@@ -217,7 +217,7 @@ void OutgoingMigration::SyncFb() {

if (auto ec = SendCommandAndReadResponse(cmd); ec) {
LOG(WARNING) << "Can't connect to target node";
cntx_.ReportError(GenericError(ec, "Could not send INIT command."));
exec_st_.ReportError(GenericError(ec, "Could not send INIT command."));
continue;
}

@@ -227,12 +227,12 @@ void OutgoingMigration::SyncFb() {
// we provide 30 seconds to distribute the config to all nodes to avoid extra errors
// reporting
if (passed >= absl::Milliseconds(30000)) {
cntx_.ReportError(GenericError(LastResponseArgs().front().GetString()));
exec_st_.ReportError(GenericError(LastResponseArgs().front().GetString()));
} else {
ThisFiber::SleepFor(500ms); // to prevent too many attempts
}
} else {
cntx_.ReportError(GenericError(LastResponseArgs().front().GetString()));
exec_st_.ReportError(GenericError(LastResponseArgs().front().GetString()));
}
continue;
}
@@ -249,7 +249,7 @@ void OutgoingMigration::SyncFb() {
}

OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
if (!cntx_.IsRunning()) {
if (!exec_st_.IsRunning()) {
continue;
}

@@ -260,13 +260,13 @@ void OutgoingMigration::SyncFb() {
OnAllShards([](auto& migration) { migration->PrepareSync(); });
}

if (!cntx_.IsRunning()) {
if (!exec_st_.IsRunning()) {
continue;
}

OnAllShards([](auto& migration) { migration->RunSync(); });

if (!cntx_.IsRunning()) {
if (!exec_st_.IsRunning()) {
continue;
}

@@ -276,7 +276,7 @@ void OutgoingMigration::SyncFb() {
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
}
if (!cntx_.IsRunning()) {
if (!exec_st_.IsRunning()) {
continue;
}
break;
@@ -291,11 +291,11 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
LOG(INFO) << "Finalize migration for " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
if (attempt > 1) {
if (!cntx_.IsRunning()) {
if (!exec_st_.IsRunning()) {
return true;
}
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
LOG(WARNING) << "Couldn't connect " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
return false;
@@ -369,7 +369,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}
}

if (!cntx_.GetError()) {
if (!exec_st_.GetError()) {
Finish();
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
@@ -381,7 +381,7 @@ void OutgoingMigration::Start() {
void OutgoingMigration::Start() {
VLOG(1) << "Resolving host DNS for outgoing migration";
if (error_code ec = ResolveHostDns(); ec) {
cntx_.ReportError(GenericError(ec, "Could not resolve host dns."));
exec_st_.ReportError(GenericError(ec, "Could not resolve host dns."));
return;
}

6 changes: 3 additions & 3 deletions src/server/cluster/outgoing_slot_migration.h
@@ -51,9 +51,9 @@ class OutgoingMigration : private ProtocolClient {
}

void ResetError() {
if (cntx_.IsError()) {
SetLastError(cntx_.GetError());
cntx_.Reset(nullptr);
if (exec_st_.IsError()) {
SetLastError(exec_st_.GetError());
exec_st_.Reset(nullptr);
}
}

34 changes: 18 additions & 16 deletions src/server/dflycmd.cc
@@ -88,7 +88,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
<< ", expecting " << shard->journal()->GetLsn();
return false;
}
if (!replica->cntx.IsRunning()) {
if (!replica->exec_st.IsRunning()) {
return false;
}
VLOG(1) << "Replica lsn:" << flow->last_acked_lsn
@@ -111,7 +111,7 @@ void DflyCmd::ReplicaInfo::Cancel() {

// Update state and cancel context.
replica_state = SyncState::CANCELLED;
cntx.ReportCancelError();
exec_st.ReportCancelError();
// Wait for tasks to finish.
shard_set->RunBlockingInParallel([this](EngineShard* shard) {
VLOG(2) << "Disconnecting flow " << shard->shard_id();
@@ -124,7 +124,7 @@ void DflyCmd::ReplicaInfo::Cancel() {
flow->conn = nullptr;
});
// Wait for error handler to quit.
cntx.JoinErrorHandler();
exec_st.JoinErrorHandler();
VLOG(1) << "Disconnecting replica " << address << ":" << listening_port;
}

@@ -332,8 +332,8 @@ void DflyCmd::Sync(CmdArgList args, Transaction* tx, RedisReplyBuilder* rb) {

// Use explicit assignment for replica_ptr, because capturing structured bindings is C++20.
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
status =
StartFullSyncInThread(&replica_ptr->flows[shard->shard_id()], &replica_ptr->cntx, shard);
status = StartFullSyncInThread(&replica_ptr->flows[shard->shard_id()], &replica_ptr->exec_st,
shard);
};
shard_set->RunBlockingInParallel(std::move(cb));

@@ -370,11 +370,11 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];

status = StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
status = StopFullSyncInThread(flow, &replica_ptr->exec_st, shard);
if (*status != OpStatus::OK) {
return;
}
StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
StartStableSyncInThread(flow, &replica_ptr->exec_st, shard);
};
shard_set->RunBlockingInParallel(std::move(cb));

@@ -548,7 +548,8 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
rb->SendOk();
}

OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* exec_st,
EngineShard* shard) {
DCHECK(shard);
DCHECK(flow->conn);

@@ -577,30 +578,31 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, En
ec = saver->SaveHeader({});
}
if (ec) {
cntx->ReportError(ec);
exec_st->ReportError(ec);
return OpStatus::CANCELLED;
}

if (flow->start_partial_sync_at.has_value())
saver->StartIncrementalSnapshotInShard(*flow->start_partial_sync_at, cntx, shard);
saver->StartIncrementalSnapshotInShard(*flow->start_partial_sync_at, exec_st, shard);
else
saver->StartSnapshotInShard(true, cntx, shard);
saver->StartSnapshotInShard(true, exec_st, shard);

return OpStatus::OK;
}

OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, ExecutionState* exec_st,
EngineShard* shard) {
DCHECK(shard);

error_code ec = flow->saver->StopFullSyncInShard(shard);
if (ec) {
cntx->ReportError(ec);
exec_st->ReportError(ec);
return OpStatus::CANCELLED;
}

ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
cntx->ReportError(ec);
exec_st->ReportError(ec);
return OpStatus::CANCELLED;
}

@@ -610,12 +612,12 @@ OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, Eng
return OpStatus::OK;
}

void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, EngineShard* shard) {
// Create streamer for shard flows.
DCHECK(shard);
DCHECK(flow->conn);

flow->streamer.reset(new JournalStreamer(sf_->journal(), cntx));
flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st));
bool send_lsn = flow->version >= DflyVersion::VER4;
flow->streamer->Start(flow->conn->socket(), send_lsn);

4 changes: 2 additions & 2 deletions src/server/dflycmd.h
@@ -105,7 +105,7 @@ class DflyCmd {
ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port,
ExecutionState::ErrHandler err_handler)
: replica_state{SyncState::PREPARATION},
cntx{std::move(err_handler)},
exec_st{std::move(err_handler)},
address{std::move(address)},
listening_port(listening_port),
flows{flow_count} {
@@ -115,7 +115,7 @@ class DflyCmd {
void Cancel();

SyncState replica_state; // always guarded by shared_mu
ExecutionState cntx;
ExecutionState exec_st;

std::string id;
std::string address;
2 changes: 1 addition & 1 deletion src/server/protocol_client.cc
@@ -117,7 +117,7 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov
}

ProtocolClient::~ProtocolClient() {
cntx_.JoinErrorHandler();
exec_st_.JoinErrorHandler();

// FIXME: We should close the socket explictly outside of the destructor. This currently
// breaks test_cancel_replication_immediately.
2 changes: 1 addition & 1 deletion src/server/protocol_client.h
@@ -122,7 +122,7 @@ class ProtocolClient {
util::fb2::Mutex sock_mu_;

protected:
ExecutionState cntx_; // context for tasks in replica.
ExecutionState exec_st_; // context for tasks in replica.

std::string last_cmd_;
std::string last_resp_;
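For readers skimming the rename, a minimal, self-contained toy sketch of the error-state lifecycle these call sites revolve around (ReportError -> error handler -> JoinErrorHandler before destruction). Only the method names are taken from the diff; ToyExecutionState, its "first error wins" rule, and the thread-based handler are illustrative assumptions, not Dragonfly's actual ExecutionState implementation (which runs on fibers).

#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <thread>

// Toy stand-in for the ExecutionState member renamed from cntx_ to exec_st_.
class ToyExecutionState {
 public:
  using ErrHandler = std::function<void(const std::string&)>;

  explicit ToyExecutionState(ErrHandler handler = {}) : handler_(std::move(handler)) {}

  void SwitchErrorHandler(ErrHandler handler) { handler_ = std::move(handler); }

  // Records the error and launches the handler asynchronously.
  // Assumption for this sketch: only the first reported error is kept.
  void ReportError(std::string err) {
    std::lock_guard<std::mutex> lk(mu_);
    if (error_) return;
    error_ = std::move(err);
    if (handler_) handler_thread_ = std::thread(handler_, *error_);
  }

  bool IsRunning() const {
    std::lock_guard<std::mutex> lk(mu_);
    return !error_.has_value();
  }
  bool IsError() const { return !IsRunning(); }

  std::optional<std::string> GetError() const {
    std::lock_guard<std::mutex> lk(mu_);
    return error_;
  }

  // Join the handler before destroying the object, as the destructors in the
  // diff do (~SliceSlotMigration, ~OutgoingMigration, ~ProtocolClient).
  void JoinErrorHandler() {
    if (handler_thread_.joinable()) handler_thread_.join();
  }

 private:
  mutable std::mutex mu_;
  ErrHandler handler_;
  std::optional<std::string> error_;
  std::thread handler_thread_;
};

int main() {
  ToyExecutionState exec_st;
  exec_st.SwitchErrorHandler(
      [](const std::string& e) { std::cout << "handler: " << e << '\n'; });

  if (exec_st.IsRunning()) exec_st.ReportError("Couldn't connect to source.");
  exec_st.JoinErrorHandler();  // mirrors exec_st_.JoinErrorHandler() in the destructors
  std::cout << (exec_st.IsError() ? *exec_st.GetError() : std::string("ok")) << '\n';
}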