Skip to content

Commit d99b0ed

Browse files
authored
feat: retry ACK if the configs are different #2833 (#2906)
* feat: retry ACK if the configs are different #2833
1 parent f58ded4 commit d99b0ed

File tree

7 files changed

+69
-16
lines changed

7 files changed

+69
-16
lines changed

src/server/cluster/cluster_config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ class ClusterConfig {
102102
std::vector<MigrationInfo> GetFinishedIncomingMigrations(
103103
const std::shared_ptr<ClusterConfig>& prev) const;
104104

105+
std::vector<MigrationInfo> GetIncomingMigrations() const {
106+
return my_incoming_migrations_;
107+
}
108+
105109
private:
106110
struct SlotEntry {
107111
const ClusterShardInfo* shard = nullptr;

src/server/cluster/cluster_family.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,14 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
834834
}
835835

836836
VLOG(1) << "DFLYMIGRATE ACK" << args;
837+
auto in_migrations = tl_cluster_config->GetIncomingMigrations();
838+
auto m_it = std::find_if(in_migrations.begin(), in_migrations.end(),
839+
[source_id](const auto& m) { return m.node_id == source_id; });
840+
if (m_it == in_migrations.end()) {
841+
LOG(WARNING) << "migration isn't in config";
842+
return cntx->SendLong(OutgoingMigration::kInvalidAttempt);
843+
}
844+
837845
auto migration = GetIncomingMigration(source_id);
838846
if (!migration)
839847
return cntx->SendError(kIdNotFound);
@@ -847,8 +855,9 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
847855
}
848856

849857
UpdateConfig(migration->GetSlots(), true);
858+
VLOG(1) << "Config is updated for " << MyID();
850859

851-
cntx->SendLong(attempt);
860+
return cntx->SendLong(attempt);
852861
}
853862

854863
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);

src/server/cluster/incoming_slot_migration.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ void IncomingSlotMigration::Join() {
9595

9696
void IncomingSlotMigration::StartFlow(uint32_t shard, io::Source* source) {
9797
VLOG(1) << "Start flow for shard: " << shard;
98+
state_.store(MigrationState::C_SYNC);
9899

99100
shard_flows_[shard]->Start(&cntx_, source);
100101
bc_->Dec();

src/server/cluster/incoming_slot_migration.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class IncomingSlotMigration {
2525
void Join();
2626

2727
MigrationState GetState() const {
28-
return state_;
28+
return state_.load();
2929
}
3030

3131
const SlotRanges& GetSlots() const {
@@ -41,7 +41,7 @@ class IncomingSlotMigration {
4141
Service& service_;
4242
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
4343
SlotRanges slots_;
44-
MigrationState state_ = MigrationState::C_NO_STATE;
44+
std::atomic<MigrationState> state_ = MigrationState::C_NO_STATE;
4545
Context cntx_;
4646

4747
util::fb2::BlockingCounter bc_;

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
#include "server/journal/streamer.h"
1818
#include "server/server_family.h"
1919

20-
ABSL_FLAG(int, source_connect_timeout_ms, 20000,
21-
"Timeout for establishing connection to a source node");
20+
ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations");
2221

2322
using namespace std;
2423
using namespace facade;
@@ -37,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
3736
}
3837

3938
std::error_code Start(const std::string& node_id, uint32_t shard_id) {
40-
RETURN_ON_ERR(ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_));
39+
RETURN_ON_ERR(
40+
ConnectAndAuth(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms, &cntx_));
4141
ResetParser(/*server_mode=*/false);
4242

4343
std::string cmd = absl::StrCat("DFLYMIGRATE FLOW ", node_id, " ", shard_id);
@@ -86,6 +86,7 @@ MigrationState OutgoingMigration::GetState() const {
8686
}
8787

8888
void OutgoingMigration::SyncFb() {
89+
state_.store(MigrationState::C_SYNC);
8990
auto start_cb = [this](util::ProactorBase* pb) {
9091
if (auto* shard = EngineShard::tlocal(); shard) {
9192
server_family_->journal()->StartInThread();
@@ -146,10 +147,10 @@ bool OutgoingMigration::FinishMigration(long attempt) {
146147
LOG_IF(WARNING, err) << err;
147148

148149
if (!err) {
149-
long attempt_res = -1;
150+
long attempt_res = kInvalidAttempt;
150151
do { // we can have a response from a previous attempt, so we need to read until we get the response for the
151152
// last attempt
152-
auto resp = ReadRespReply(absl::GetFlag(FLAGS_source_connect_timeout_ms));
153+
auto resp = ReadRespReply(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms));
153154

154155
if (!resp) {
155156
LOG(WARNING) << resp.error();
@@ -163,6 +164,9 @@ bool OutgoingMigration::FinishMigration(long attempt) {
163164
return false;
164165
}
165166
attempt_res = get<int64_t>(LastResponseArgs().front().u);
167+
if (attempt_res == kInvalidAttempt) {
168+
return false;
169+
}
166170
} while (attempt_res != attempt);
167171

168172
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
@@ -172,6 +176,7 @@ bool OutgoingMigration::FinishMigration(long attempt) {
172176

173177
state_.store(MigrationState::C_FINISHED);
174178
cf_->UpdateConfig(migration_info_.slot_ranges, false);
179+
VLOG(1) << "Config is updated for " << cf_->MyID();
175180
return true;
176181
} else {
177182
// TODO implement connection issue error processing
@@ -196,7 +201,7 @@ std::error_code OutgoingMigration::Start(ConnectionContext* cntx) {
196201
RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns"));
197202

198203
VLOG(1) << "Connecting to source";
199-
ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
204+
ec = ConnectAndAuth(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms, &cntx_);
200205
RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source"));
201206

202207
VLOG(1) << "Migration initiating";

src/server/cluster/outgoing_slot_migration.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ class OutgoingMigration : private ProtocolClient {
4949
return migration_info_;
5050
}
5151

52+
static constexpr long kInvalidAttempt = -1;
53+
5254
private:
5355
MigrationState GetStateImpl() const;
5456
// SliceSlotMigration manages state and data transferring for the corresponding shard

tests/dragonfly/cluster_test.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -892,9 +892,8 @@ async def test_random_keys():
892892
await close_clients(client, *c_masters, *c_masters_admin, *c_replicas, *c_replicas_admin)
893893

894894

895-
@pytest.mark.skip(reason="Test needs refactoring because of cluster design change")
896895
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
897-
async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
896+
async def test_config_consistency(df_local_factory: DflyInstanceFactory):
898897
# Check slot migration from one node to another
899898
nodes = [
900899
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
@@ -928,10 +927,8 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
928927
c_nodes_admin,
929928
)
930929

931-
status = await c_nodes_admin[1].execute_command(
932-
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
933-
)
934-
assert "NO_STATE" == status
930+
for node in c_nodes_admin:
931+
assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
935932

936933
migation_config = f"""
937934
[
@@ -950,12 +947,47 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
950947
]
951948
"""
952949

950+
# push config only to source node
951+
await push_config(
952+
migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
953+
[c_nodes_admin[0]],
954+
)
955+
956+
assert await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [
957+
f"""out {node_ids[1]} SYNC keys:0"""
958+
]
959+
960+
assert await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [
961+
f"""in {node_ids[0]} SYNC keys:0"""
962+
]
963+
964+
# migration shouldn't be finished until we set the same config to target node
965+
await asyncio.sleep(0.5)
966+
967+
# push config to target node
953968
await push_config(
954969
migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
970+
[c_nodes_admin[1]],
971+
)
972+
973+
while "FINISHED" not in await c_nodes_admin[1].execute_command(
974+
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0]
975+
):
976+
logging.debug("SLOT-MIGRATION-STATUS is not FINISHED")
977+
await asyncio.sleep(0.05)
978+
979+
assert await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [
980+
f"""out {node_ids[1]} FINISHED keys:0"""
981+
]
982+
983+
# remove finished migrations
984+
await push_config(
985+
config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"),
955986
c_nodes_admin,
956987
)
957988

958-
# TODO add a check for correct results after the same config apply
989+
for node in c_nodes_admin:
990+
assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
959991

960992
await close_clients(*c_nodes, *c_nodes_admin)
961993

0 commit comments

Comments
 (0)