Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/facade/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ auto RedisParser::ParseNum(Buffer str, int64_t* res) -> Result {
if (str.size() < 4) {
return INPUT_PENDING;
}
DCHECK(str[0] == '$' || str[0] == '*' || str[0] == '%');
DCHECK(str[0] == '$' || str[0] == '*' || str[0] == '%' || str[0] == '~');

char* s = reinterpret_cast<char*>(str.data() + 1);
char* pos = reinterpret_cast<char*>(memchr(s, '\n', str.size() - 1));
Expand Down
1 change: 1 addition & 0 deletions src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum class MigrationState : uint8_t {
C_CONNECTING,
C_SYNC,
C_FINISHED,
C_CANCELLED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need it? If we cancel migration we should remove it at all

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the usage near calling FinishMigration(), although I'm open to suggestions here :)

C_MAX_INVALID = std::numeric_limits<uint8_t>::max()
};

Expand Down
31 changes: 31 additions & 0 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ static string_view StateToStr(MigrationState state) {
return "SYNC"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
case MigrationState::C_CANCELLED:
return "CANCELLED"sv;
case MigrationState::C_MAX_INVALID:
break;
}
Expand Down Expand Up @@ -736,8 +738,15 @@ void ClusterFamily::RemoveOutgoingMigrations(const std::vector<MigrationInfo>& m
auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(),
[&m](const auto& om) { return m == om->GetMigrationInfo(); });
DCHECK(it != outgoing_migration_jobs_.end());
DCHECK(it->get() != nullptr);
OutgoingMigration& migration = *it->get();
LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(migration.GetSlots())
<< " to " << migration.GetHostIp() << ":" << migration.GetPort();
migration.Cancel();
outgoing_migration_jobs_.erase(it);
}

// Flushing of removed slots is done outside this function.
}

void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations) {
Expand All @@ -748,6 +757,28 @@ void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& m
return m.node_id == im->GetSourceID() && m.slot_ranges == im->GetSlots();
});
DCHECK(it != incoming_migrations_jobs_.end());
DCHECK(it->get() != nullptr);
IncomingSlotMigration& migration = *it->get();

// Flush non-owned migrations
SlotSet migration_slots(migration.GetSlots());
SlotSet removed = migration_slots.GetRemovedSlots(tl_cluster_config->GetOwnedSlots());

// First cancel socket, then flush slots, so that new entries won't arrive after we flush.
migration.Cancel();

if (!removed.Empty()) {
auto removed_ranges = make_shared<SlotRanges>(removed.ToSlotRanges());
LOG_IF(WARNING, migration.GetState() == MigrationState::C_FINISHED)
<< "Flushing slots of removed FINISHED migration " << migration.GetSourceID()
<< ", slots: " << SlotRange::ToString(*removed_ranges);
shard_set->pool()->DispatchOnAll([removed_ranges](unsigned, ProactorBase*) {
if (EngineShard* shard = EngineShard::tlocal(); shard) {
shard->db_slice().FlushSlots(*removed_ranges);
}
});
}

incoming_migrations_jobs_.erase(it);
}
}
Expand Down
28 changes: 25 additions & 3 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class ClusterShardMigration {
executor_ = std::make_unique<JournalExecutor>(service);
}

void Start(Context* cntx, io::Source* source) {
void Start(Context* cntx, util::FiberSocketBase* source) {
socket_ = source;
JournalReader reader{source, 0};
TransactionReader tx_reader{false};

Expand All @@ -48,6 +49,18 @@ class ClusterShardMigration {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
}

socket_ = nullptr;
}

// Aborts an active flow by shutting down its source socket, which unblocks
// the blocking reader loop in Start(). No-op when no flow is running
// (socket_ == nullptr).
// NOTE(review): socket_ is written by Start() from another fiber; this
// presumably relies on external serialization of Start()/Cancel() — confirm.
void Cancel() {
  if (socket_ != nullptr) {
    // Hop to the socket's proactor thread: socket I/O/shutdown must run there.
    socket_->proactor()->Await([s = socket_]() {
      if (s->IsOpen()) {
        s->Shutdown(SHUT_RDWR);  // Does not Close(), only forbids further I/O.
      }
    });
  }
}

private:
Expand All @@ -68,6 +81,7 @@ class ClusterShardMigration {

private:
uint32_t source_shard_id_;
util::FiberSocketBase* socket_ = nullptr;
std::unique_ptr<JournalExecutor> executor_;
};

Expand All @@ -85,15 +99,23 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
}

IncomingSlotMigration::~IncomingSlotMigration() {
sync_fb_.JoinIfNeeded();
}

// Blocks until the blocking counter bc_ is fully released, then marks the
// migration as FINISHED.
// NOTE(review): presumably each shard flow releases bc_ once it sees the FIN
// opcode, and Join() is only called on the success path (not after Cancel())
// — confirm against callers.
void IncomingSlotMigration::Join() {
  bc_->Wait();
  state_ = MigrationState::C_FINISHED;
}

void IncomingSlotMigration::StartFlow(uint32_t shard, io::Source* source) {
void IncomingSlotMigration::Cancel() {
LOG(INFO) << "Cancelling incoming migration of slots " << SlotRange::ToString(slots_);
cntx_.Cancel();

for (auto& flow : shard_flows_) {
flow->Cancel();
}
}

void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
VLOG(1) << "Start flow for shard: " << shard;

shard_flows_[shard]->Start(&cntx_, source);
Expand Down
6 changes: 4 additions & 2 deletions src/server/cluster/incoming_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include "helio/io/io.h"
#include "helio/util/fiber_socket_base.h"
#include "server/cluster/cluster_config.h"

namespace dfly {
Expand All @@ -20,10 +21,12 @@ class IncomingSlotMigration {
~IncomingSlotMigration();

// Process data arriving from the DFLYMIGRATE FLOW command.
void StartFlow(uint32_t shard, io::Source* source);
void StartFlow(uint32_t shard, util::FiberSocketBase* source);
// Wait until all flows have received the FIN opcode.
void Join();

void Cancel();

MigrationState GetState() const {
return state_;
}
Expand All @@ -45,7 +48,6 @@ class IncomingSlotMigration {
Context cntx_;

util::fb2::BlockingCounter bc_;
util::fb2::Fiber sync_fb_;
};

} // namespace dfly
18 changes: 15 additions & 3 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}

// Cancels an outgoing migration: flips the state first so SyncFb() stops
// retrying FinishMigration(), then cancels every per-shard migration flow on
// its owning shard thread.
void OutgoingMigration::Cancel() {
  state_.store(MigrationState::C_CANCELLED);

  // Renamed from `start_cb` — this callback cancels, it does not start.
  // The ProactorBase* argument is required by AwaitFiberOnAll but unused.
  auto cancel_cb = [this](util::ProactorBase*) {
    if (auto* shard = EngineShard::tlocal(); shard) {
      slot_migrations_[shard->shard_id()]->Cancel();
    }
  };
  shard_set->pool()->AwaitFiberOnAll(std::move(cancel_cb));
}

// Returns the current migration state. state_ is read with an atomic load,
// so this accessor is safe to call concurrently with Cancel()'s store.
MigrationState OutgoingMigration::GetState() const {
  return state_.load();
}
Expand All @@ -103,12 +114,13 @@ void OutgoingMigration::SyncFb() {
for (auto& migration : slot_migrations_) {
migration->WaitForSnapshotFinished();
}
VLOG(1) << "Migrations snapshot is finihed";

VLOG(1) << "Migrations snapshot is finished";

// TODO implement blocking on migrated slots only

long attempt = 0;
while (!FinishMigration(++attempt)) {
while (state_.load() != MigrationState::C_CANCELLED && !FinishMigration(++attempt)) {
// process commands that were on pause and try again
ThisFiber::SleepFor(500ms);
}
Expand All @@ -124,7 +136,7 @@ bool OutgoingMigration::FinishMigration(long attempt) {
LOG(WARNING) << "Cluster migration finalization time out";
}

absl::Cleanup cleanup([&is_block_active, &pause_fb_opt] {
absl::Cleanup cleanup([&is_block_active, &pause_fb_opt]() {
is_block_active = false;
pause_fb_opt->JoinIfNeeded();
});
Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster/outgoing_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class OutgoingMigration : private ProtocolClient {
// should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest);

void Cancel();

MigrationState GetState() const;

const std::string& GetHostIp() const {
Expand Down
34 changes: 25 additions & 9 deletions src/server/cluster/slot_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

#include <bitset>
#include <memory>
#include <string>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"

namespace dfly {

using SlotId = uint16_t;
Expand All @@ -23,24 +27,39 @@ struct SlotRange {
bool IsValid() {
return start <= end && start <= kMaxSlotId && end <= kMaxSlotId;
}

// Renders the range as "[start, end]", e.g. "[0, 5460]".
std::string ToString() const {
  return "[" + std::to_string(start) + ", " + std::to_string(end) + "]";
}

static std::string ToString(const std::vector<SlotRange>& ranges) {
return absl::StrJoin(ranges, ", ", [](std::string* out, SlotRange range) {
absl::StrAppend(out, range.ToString());
});
}
};

using SlotRanges = std::vector<SlotRange>;

class SlotSet {
public:
static constexpr SlotId kSlotsNumber = SlotRange::kMaxSlotId + 1;
using TBitSet = std::bitset<kSlotsNumber>;

SlotSet(bool full_house = false) : slots_(std::make_unique<BitsetType>()) {
SlotSet(bool full_house = false) {
if (full_house)
slots_->flip();
}

SlotSet(const SlotRanges& slot_ranges) : SlotSet() {
SlotSet(const SlotRanges& slot_ranges) {
Set(slot_ranges, true);
}

SlotSet(const SlotSet& s) : SlotSet() {
SlotSet(const TBitSet& s) {
*slots_ = s;
}

SlotSet(const SlotSet& s) {
*slots_ = *s.slots_;
}

Expand Down Expand Up @@ -73,10 +92,8 @@ class SlotSet {
}

// Returns the slots that are present in this set but absent from `slots`.
SlotSet GetRemovedSlots(SlotSet slots) {
slots.slots_->flip();
*slots.slots_ &= *slots_;
return slots;
SlotSet GetRemovedSlots(const SlotSet& slots) const {
return *slots_ & ~*slots.slots_;
}

SlotRanges ToSlotRanges() const {
Expand All @@ -97,8 +114,7 @@ class SlotSet {
}

private:
using BitsetType = std::bitset<kSlotsNumber>;
std::unique_ptr<BitsetType> slots_;
std::unique_ptr<TBitSet> slots_{std::make_unique<TBitSet>()};
};

} // namespace dfly
9 changes: 6 additions & 3 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@ RestoreStreamer::~RestoreStreamer() {
}

// Tears down the streamer: cancels the fiber, unregisters the on-change
// callback and cancels the underlying JournalStreamer.
// Idempotent: snapshot_version_ == 0 marks "already cancelled / never
// started", so repeated calls (presumably Cancel() followed by the
// destructor — confirm) do not unregister or cancel twice. The span as
// rendered contained an unguarded duplicate of the teardown calls before
// the guard; only the guarded version is kept.
void RestoreStreamer::Cancel() {
  if (snapshot_version_ != 0) {
    fiber_cancellation_.Cancel();
    db_slice_->UnregisterOnChange(snapshot_version_);
    JournalStreamer::Cancel();
    snapshot_version_ = 0;  // Mark as cancelled so subsequent calls are no-ops.
  }
}

bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
Expand Down
Loading