Skip to content

Commit 2d82f10

Browse files
committed
fix CR
Signed-off-by: adi_holden <[email protected]>
1 parent 3625e18 commit 2d82f10

File tree

11 files changed

+106
-77
lines changed

11 files changed

+106
-77
lines changed

src/server/cluster/cluster_config.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,24 @@ SlotId ClusterConfig::KeySlot(string_view key) {
7070
return crc16(tag.data(), tag.length()) & kMaxSlotNum;
7171
}
7272

73+
OpResult<SlotRange> ClusterConfig::SlotRangeFromStr(std::string_view start, std::string_view end) {
74+
uint32_t slot_id_start, slot_id_end;
75+
if (!absl::SimpleAtoi(start, &slot_id_start)) {
76+
return facade::OpStatus::INVALID_INT;
77+
}
78+
if (slot_id_start > ClusterConfig::kMaxSlotNum) {
79+
return facade::OpStatus::INVALID_VALUE;
80+
}
81+
if (!absl::SimpleAtoi(end, &slot_id_end)) {
82+
return facade::OpStatus::INVALID_INT;
83+
}
84+
if ((slot_id_end > ClusterConfig::kMaxSlotNum) || (slot_id_end < slot_id_start)) {
85+
return facade::OpStatus::INVALID_VALUE;
86+
}
87+
return SlotRange{.start = static_cast<uint16_t>(slot_id_start),
88+
.end = static_cast<uint16_t>(slot_id_end)};
89+
}
90+
7391
namespace {
7492
bool HasValidNodeIds(const ClusterConfig::ClusterShards& new_config) {
7593
absl::flat_hash_set<string_view> nodes;

src/server/cluster/cluster_config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@
1010
#include <vector>
1111

1212
#include "core/json/json_object.h"
13+
#include "src/facade/op_status.h"
1314
#include "src/server/cluster/slot_set.h"
1415
#include "src/server/common.h"
1516

1617
namespace dfly {
1718

19+
using facade::OpResult;
20+
1821
// MigrationState constants are ordered in state changing order
1922
enum class MigrationState : uint8_t {
2023
C_NO_STATE,
@@ -57,6 +60,7 @@ class ClusterConfig {
5760
using ClusterShards = std::vector<ClusterShard>;
5861

5962
static SlotId KeySlot(std::string_view key);
63+
static OpResult<SlotRange> SlotRangeFromStr(std::string_view start, std::string_view end);
6064

6165
static void Initialize();
6266
static bool IsEnabled();

src/server/cluster/slot_set.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class SlotSet {
9999
std::string slots_str;
100100
for (SlotId i = 0; i < kSlotsNumber; ++i) {
101101
if (slots_->test(i)) {
102-
absl::StrAppend(&slots_str, absl::StrCat(i));
102+
absl::StrAppend(&slots_str, absl::StrCat(i, " "));
103103
}
104104
}
105105
return slots_str;

src/server/common.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,4 +395,17 @@ std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag) {
395395
return strings::HumanReadableNumBytes(flag.value);
396396
}
397397

398+
std::ostream& operator<<(std::ostream& os, const GlobalState& state) {
399+
return os << GlobalStateName(state);
400+
}
401+
402+
std::ostream& operator<<(std::ostream& os, ArgSlice list) {
403+
os << "[";
404+
if (!list.empty()) {
405+
std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; });
406+
os << (*(list.end() - 1));
407+
}
408+
return os << "]";
409+
}
410+
398411
} // namespace dfly

src/server/common.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@ enum class GlobalState : uint8_t {
160160
TAKEN_OVER,
161161
};
162162

163+
const char* GlobalStateName(GlobalState gs);
164+
165+
std::ostream& operator<<(std::ostream& os, const GlobalState& state);
166+
167+
std::ostream& operator<<(std::ostream& os, ArgSlice list);
168+
163169
enum class TimeUnit : uint8_t { SEC, MSEC };
164170

165171
inline void ToUpper(const MutableSlice* val) {
@@ -195,8 +201,6 @@ int64_t GetMallocCurrentCommitted();
195201
// set upon server start.
196202
extern unsigned kernel_version;
197203

198-
const char* GlobalStateName(GlobalState gs);
199-
200204
template <typename RandGen> std::string GetRandomHex(RandGen& gen, size_t len) {
201205
static_assert(std::is_same<uint64_t, decltype(gen())>::value);
202206
std::string res(len, '\0');

src/server/main_service.cc

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2405,35 +2405,38 @@ VarzValue::Map Service::GetVarzStats() {
24052405

24062406
GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
24072407
lock_guard lk(mu_);
2408-
// Counting switchs to loading state enables us to do several loading processes (replications)
2409-
// at the same time. Only when all loading processes finish we exit loading state.
2410-
// When server is in loading state, calling SwitchState to loading will increase
2411-
// loading_state_counter_. When server is in loading state, calling SwitchState to active will
2412-
// decrease loading_state_counter_, if loading_state_counter_ is 0 switch to active.
2413-
if (global_state_ == to && global_state_ == GlobalState::LOADING) {
2414-
++loading_state_counter_;
2415-
return global_state_;
2416-
}
2417-
if (global_state_ == from && global_state_ == GlobalState::LOADING &&
2418-
loading_state_counter_ > 0) {
2419-
--loading_state_counter_;
2420-
if (loading_state_counter_ > 0) {
2421-
return global_state_;
2422-
}
2423-
}
2424-
24252408
if (global_state_ != from) {
24262409
return global_state_;
24272410
}
24282411

24292412
VLOG(1) << "Switching state from " << GlobalStateName(from) << " to " << GlobalStateName(to);
2430-
24312413
global_state_ = to;
24322414

24332415
pp_.Await([&](ProactorBase*) { ServerState::tlocal()->set_gstate(to); });
24342416
return to;
24352417
}
24362418

2419+
void Service::RequestLoadingState() {
2420+
unique_lock lk(mu_);
2421+
++loading_state_counter_;
2422+
if (global_state_ != GlobalState::LOADING) {
2423+
DCHECK_EQ(global_state_, GlobalState::ACTIVE);
2424+
lk.unlock();
2425+
SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
2426+
}
2427+
}
2428+
2429+
void Service::RemoveLoadingState() {
2430+
unique_lock lk(mu_);
2431+
DCHECK_EQ(global_state_, GlobalState::LOADING);
2432+
DCHECK_GT(loading_state_counter_, 0u);
2433+
--loading_state_counter_;
2434+
if (loading_state_counter_ == 0) {
2435+
lk.unlock();
2436+
SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
2437+
}
2438+
}
2439+
24372440
GlobalState Service::GetGlobalState() const {
24382441
lock_guard lk(mu_);
24392442
return global_state_;

src/server/main_service.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class Service : public facade::ServiceInterface {
8787
// Upon switch, updates cached global state in threadlocal ServerState struct.
8888
GlobalState SwitchState(GlobalState from, GlobalState to);
8989

90+
void RequestLoadingState();
91+
void RemoveLoadingState();
92+
9093
GlobalState GetGlobalState() const;
9194

9295
void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;

src/server/replica.cc

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -376,10 +376,8 @@ error_code Replica::InitiatePSync() {
376376
io::PrefixSource ps{io_buf.InputBuffer(), Sock()};
377377

378378
// Set LOADING state.
379-
CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING);
380-
absl::Cleanup cleanup = [this]() {
381-
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
382-
};
379+
service_.RequestLoadingState();
380+
absl::Cleanup cleanup = [this]() { service_.RemoveLoadingState(); };
383381

384382
if (slot_range_.has_value()) {
385383
JournalExecutor{&service_}.FlushSlots(slot_range_.value());
@@ -434,14 +432,6 @@ error_code Replica::InitiatePSync() {
434432
error_code Replica::InitiateDflySync() {
435433
auto start_time = absl::Now();
436434

437-
absl::Cleanup cleanup = [this]() {
438-
// We do the following operations regardless of outcome.
439-
JoinDflyFlows();
440-
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
441-
state_mask_.fetch_and(~R_SYNCING);
442-
last_journal_LSNs_.reset();
443-
};
444-
445435
// Initialize MultiShardExecution.
446436
multi_shard_exe_.reset(new MultiShardExecution());
447437

@@ -471,10 +461,19 @@ error_code Replica::InitiateDflySync() {
471461
RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
472462

473463
// Make sure we're in LOADING state.
474-
CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING);
464+
service_.RequestLoadingState();
475465

476466
// Start full sync flows.
477467
state_mask_.fetch_or(R_SYNCING);
468+
469+
absl::Cleanup cleanup = [this]() {
470+
// We do the following operations regardless of outcome.
471+
JoinDflyFlows();
472+
service_.RemoveLoadingState();
473+
state_mask_.fetch_and(~R_SYNCING);
474+
last_journal_LSNs_.reset();
475+
};
476+
478477
std::string_view sync_type = "full";
479478
{
480479
// Going out of the way to avoid using std::vector<bool>...

src/server/server_family.cc

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -551,53 +551,49 @@ string_view GetRedisMode() {
551551
}
552552

553553
struct ReplicaOfArgs {
554-
bool no_one = false;
555554
string_view host;
556555
string_view port_sv;
557-
uint32_t port;
556+
uint16_t port;
558557
std::optional<SlotRange> slot_range;
559558
static OpResult<ReplicaOfArgs> FromParams(string_view host, string_view port,
560559
std::optional<string_view> slot_start,
561560
std::optional<string_view> slot_end);
561+
bool IsReplicaOfNoOne();
562562
};
563563

564-
OpResult<ReplicaOfArgs> ReplicaOfArgs::FromParams(string_view host, string_view port,
564+
bool ReplicaOfArgs::IsReplicaOfNoOne() {
565+
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_sv, "one")) {
566+
return true;
567+
}
568+
return false;
569+
}
570+
571+
OpResult<ReplicaOfArgs> ReplicaOfArgs::FromParams(string_view host, string_view port_sv,
565572
std::optional<string_view> slot_start,
566573
std::optional<string_view> slot_end) {
567574
ReplicaOfArgs replicaof_args;
568575
replicaof_args.host = host;
569-
replicaof_args.port_sv = port;
570-
if (absl::EqualsIgnoreCase(replicaof_args.host, "no") &&
571-
absl::EqualsIgnoreCase(replicaof_args.port_sv, "one")) {
572-
replicaof_args.no_one = true;
576+
replicaof_args.port_sv = port_sv;
577+
if (replicaof_args.IsReplicaOfNoOne()) {
573578
return replicaof_args;
574579
}
575580

576-
if (!absl::SimpleAtoi(replicaof_args.port_sv, &replicaof_args.port) || replicaof_args.port < 1 ||
577-
replicaof_args.port > 65535) {
581+
uint32_t port;
582+
if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) {
578583
return facade::OpStatus::INVALID_INT;
579584
}
585+
replicaof_args.port = static_cast<uint16_t>(port);
580586

581587
if (slot_start.has_value()) {
582588
if (!slot_end.has_value()) {
583589
return facade::OpStatus::SYNTAX_ERR;
584590
}
585591

586-
uint32_t slot_id_start, slot_id_end;
587-
if (!absl::SimpleAtoi(*slot_start, &slot_id_start)) {
588-
return facade::OpStatus::INVALID_INT;
589-
}
590-
if (slot_id_start > ClusterConfig::kMaxSlotNum) {
591-
return facade::OpStatus::INVALID_VALUE;
592+
OpResult<SlotRange> slot_range = ClusterConfig::SlotRangeFromStr(*slot_start, *slot_end);
593+
if (!slot_range) {
594+
return slot_range.status();
592595
}
593-
if (!absl::SimpleAtoi(*slot_end, &slot_id_end)) {
594-
return facade::OpStatus::INVALID_INT;
595-
}
596-
if (slot_id_end > ClusterConfig::kMaxSlotNum) {
597-
return facade::OpStatus::INVALID_VALUE;
598-
}
599-
replicaof_args.slot_range = SlotRange{.start = static_cast<uint16_t>(slot_id_start),
600-
.end = static_cast<uint16_t>(slot_id_end)};
596+
replicaof_args.slot_range = slot_range.value();
601597
}
602598
return replicaof_args;
603599
}
@@ -846,9 +842,9 @@ void ServerFamily::Shutdown() {
846842
unique_lock lk(replicaof_mu_);
847843
if (replica_) {
848844
replica_->Stop();
849-
for (auto& replica : cluster_replicas_) {
850-
replica->Stop();
851-
}
845+
}
846+
for (auto& replica : cluster_replicas_) {
847+
replica->Stop();
852848
}
853849

854850
dfly_cmd_->Shutdown();
@@ -2344,7 +2340,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, ConnectionContext* cntx) {
23442340
if (!replicaof_args) {
23452341
return cntx->SendError(replicaof_args.status());
23462342
}
2347-
if (replicaof_args->no_one) {
2343+
if (replicaof_args->IsReplicaOfNoOne()) {
23482344
return cntx->SendError("ADDREPLICAOF does not supprot no one");
23492345
}
23502346
LOG(INFO) << "Add Replica " << replicaof_args->host << ":" << replicaof_args->port_sv;
@@ -2377,7 +2373,7 @@ void ServerFamily::ReplicaOfInternal(std::string_view host, std::string_view por
23772373
}
23782374

23792375
// If NO ONE was supplied, just stop the current replica (if it exists)
2380-
if (replicaof_args->no_one) {
2376+
if (replicaof_args->IsReplicaOfNoOne()) {
23812377
if (!ServerState::tlocal()->is_master) {
23822378
CHECK(replica_);
23832379

@@ -2618,9 +2614,7 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
26182614

26192615
} else {
26202616
unique_lock lk{replicaof_mu_};
2621-
2622-
size_t additional_replication = cluster_replicas_.size();
2623-
rb->StartArray(4 + additional_replication * 3);
2617+
rb->StartArray(4 + cluster_replicas_.size() * 3);
26242618
rb->SendBulkString("replica");
26252619

26262620
auto send_replica_info = [rb](Replica::Info rinfo) {

src/server/test_utils.cc

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,6 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to
3131

3232
namespace dfly {
3333

34-
std::ostream& operator<<(std::ostream& os, ArgSlice list) {
35-
os << "[";
36-
if (!list.empty()) {
37-
std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; });
38-
os << (*(list.end() - 1));
39-
}
40-
return os << "]";
41-
}
42-
4334
std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
4435
os << "keycount: " << stats.key_count << ", tiered_size: " << stats.tiered_size
4536
<< ", tiered_entries: " << stats.tiered_entries << "\n";

0 commit comments

Comments
 (0)