Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/facade/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ auto RedisParser::ParseNum(Buffer str, int64_t* res) -> Result {
if (str.size() < 4) {
return INPUT_PENDING;
}
DCHECK(str[0] == '$' || str[0] == '*' || str[0] == '%');
DCHECK(str[0] == '$' || str[0] == '*' || str[0] == '%' || str[0] == '~');

char* s = reinterpret_cast<char*>(str.data() + 1);
char* pos = reinterpret_cast<char*>(memchr(s, '\n', str.size() - 1));
Expand Down
1 change: 1 addition & 0 deletions src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum class MigrationState : uint8_t {
C_CONNECTING,
C_SYNC,
C_FINISHED,
C_CANCELLED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need it? If we cancel migration we should remove it at all

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the usage near calling FinishMigration(), although I'm open to suggestions here :)

C_MAX_INVALID = std::numeric_limits<uint8_t>::max()
};

Expand Down
36 changes: 36 additions & 0 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ static string_view StateToStr(MigrationState state) {
return "SYNC"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
case MigrationState::C_CANCELLED:
return "CANCELLED"sv;
case MigrationState::C_MAX_INVALID:
break;
}
Expand Down Expand Up @@ -736,8 +738,18 @@ void ClusterFamily::RemoveOutgoingMigrations(const std::vector<MigrationInfo>& m
auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(),
[&m](const auto& om) { return m == om->GetMigrationInfo(); });
DCHECK(it != outgoing_migration_jobs_.end());
DCHECK(it->get() != nullptr);
OutgoingMigration& migration = *it->get();
if (migration.GetState() != MigrationState::C_FINISHED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can cancel even finished migration

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state can be changed in the different thread

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can cancel even finished migration

Not really, it requires unregistering from db slice, for which we may have not registered to (like in the case of the first config assignment)

state can be changed in the different thread

Do you mean that there could be a race here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to say that we shouldn't check for FINISH or should do it under lock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both checks are done under migration_mu_ - is there another lock I should use?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

migration_mu_ is used to create migrations, but state is changed inside migration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There doesn't seem to be a lock which locks the inner fiber's state changes at all, right?

LOG(INFO) << "Outgoing migration cancelled: slots "
<< SlotRange::ToString(migration.GetSlots()) << " to " << migration.GetHostIp()
<< ":" << migration.GetPort();
migration.CancelAll();
}
outgoing_migration_jobs_.erase(it);
}

// Flushing of removed slots is done outside this function.
}

void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations) {
Expand All @@ -748,6 +760,30 @@ void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& m
return m.node_id == im->GetSourceID() && m.slot_ranges == im->GetSlots();
});
DCHECK(it != incoming_migrations_jobs_.end());
DCHECK(it->get() != nullptr);
IncomingSlotMigration& migration = *it->get();

// Flush non-owned migrations
SlotSet migration_slots(migration.GetSlots());
SlotSet removed = migration_slots.GetRemovedSlots(tl_cluster_config->GetOwnedSlots());

// First cancel socket, then flush slots, so that new entries won't arrive after we flush.
if (migration.GetState() != MigrationState::C_FINISHED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state can be changed in the different thread so we don't know is it finished or not when you do cancel

migration.Cancel();
}

if (!removed.Empty()) {
auto removed_ranges = make_shared<SlotRanges>(removed.ToSlotRanges());
LOG_IF(WARNING, migration.GetState() == MigrationState::C_FINISHED)
<< "Flushing slots of removed FINISHED migration " << migration.GetSourceID()
<< ", slots: " << SlotRange::ToString(*removed_ranges);
shard_set->pool()->DispatchOnAll([removed_ranges](unsigned, ProactorBase*) {
if (EngineShard* shard = EngineShard::tlocal(); shard) {
shard->db_slice().FlushSlots(*removed_ranges);
}
});
}

incoming_migrations_jobs_.erase(it);
}
}
Expand Down
29 changes: 26 additions & 3 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class ClusterShardMigration {
executor_ = std::make_unique<JournalExecutor>(service);
}

void Start(Context* cntx, io::Source* source) {
void Start(Context* cntx, util::FiberSocketBase* source) {
socket_ = source;
JournalReader reader{source, 0};
TransactionReader tx_reader{false};

Expand All @@ -50,6 +51,14 @@ class ClusterShardMigration {
}
}

void Cancel() {
if (socket_ != nullptr) {
socket_->proactor()->Dispatch([s = socket_, sid = source_shard_id_]() {
s->Shutdown(SHUT_RDWR); // Does not Close(), only forbids further I/O.
});
}
}

private:
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
if (cntx->IsCancelled()) {
Expand All @@ -68,6 +77,7 @@ class ClusterShardMigration {

private:
uint32_t source_shard_id_;
util::FiberSocketBase* socket_ = nullptr;
std::unique_ptr<JournalExecutor> executor_;
};

Expand All @@ -85,15 +95,28 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
}

IncomingSlotMigration::~IncomingSlotMigration() {
sync_fb_.JoinIfNeeded();
}

void IncomingSlotMigration::Join() {
bc_->Wait();
state_ = MigrationState::C_FINISHED;
}

void IncomingSlotMigration::StartFlow(uint32_t shard, io::Source* source) {
void IncomingSlotMigration::Cancel() {
LOG(INFO) << "Cancelling incoming migration of slots " << SlotRange::ToString(slots_);
cntx_.Cancel();

auto cb = [this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
if (auto& flow = shard_flows_[shard->shard_id()]; flow) {
flow->Cancel();
}
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can not use shard_id for incoming migrations, we don't know where every flow is executed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right! good catch
furthermore, this isn't even needed, inside the inner Cancel() I forward to the correct proactor.

shard_set->pool()->AwaitFiberOnAll(std::move(cb));
}

void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
VLOG(1) << "Start flow for shard: " << shard;

shard_flows_[shard]->Start(&cntx_, source);
Expand Down
6 changes: 4 additions & 2 deletions src/server/cluster/incoming_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include "helio/io/io.h"
#include "helio/util/fiber_socket_base.h"
#include "server/cluster/cluster_config.h"

namespace dfly {
Expand All @@ -20,10 +21,12 @@ class IncomingSlotMigration {
~IncomingSlotMigration();

// process data from FDLYMIGRATE FLOW cmd
void StartFlow(uint32_t shard, io::Source* source);
void StartFlow(uint32_t shard, util::FiberSocketBase* source);
// wait untill all flows are got FIN opcode
void Join();

void Cancel();

MigrationState GetState() const {
return state_;
}
Expand All @@ -45,7 +48,6 @@ class IncomingSlotMigration {
Context cntx_;

util::fb2::BlockingCounter bc_;
util::fb2::Fiber sync_fb_;
};

} // namespace dfly
18 changes: 15 additions & 3 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}

void OutgoingMigration::CancelAll() {
state_.store(MigrationState::C_CANCELLED);

auto start_cb = [this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
slot_migrations_[shard->shard_id()]->Cancel();
}
};
shard_set->pool()->AwaitFiberOnAll(std::move(start_cb));
}

MigrationState OutgoingMigration::GetState() const {
return state_.load();
}
Expand All @@ -103,12 +114,13 @@ void OutgoingMigration::SyncFb() {
for (auto& migration : slot_migrations_) {
migration->WaitForSnapshotFinished();
}
VLOG(1) << "Migrations snapshot is finihed";

VLOG(1) << "Migrations snapshot is finished";

// TODO implement blocking on migrated slots only

long attempt = 0;
while (!FinishMigration(++attempt)) {
while (state_.load() != MigrationState::C_CANCELLED && !FinishMigration(++attempt)) {
// process commands that were on pause and try again
ThisFiber::SleepFor(500ms);
}
Expand All @@ -124,7 +136,7 @@ bool OutgoingMigration::FinishMigration(long attempt) {
LOG(WARNING) << "Cluster migration finalization time out";
}

absl::Cleanup cleanup([&is_block_active, &pause_fb_opt] {
absl::Cleanup cleanup([&is_block_active, &pause_fb_opt]() {
is_block_active = false;
pause_fb_opt->JoinIfNeeded();
});
Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster/outgoing_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class OutgoingMigration : private ProtocolClient {
// should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest);

void CancelAll();

MigrationState GetState() const;

const std::string& GetHostIp() const {
Expand Down
34 changes: 25 additions & 9 deletions src/server/cluster/slot_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

#include <bitset>
#include <memory>
#include <string>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"

namespace dfly {

using SlotId = uint16_t;
Expand All @@ -23,24 +27,39 @@ struct SlotRange {
bool IsValid() {
return start <= end && start <= kMaxSlotId && end <= kMaxSlotId;
}

std::string ToString() const {
return absl::StrCat("[", start, ", ", end, "]");
}

static std::string ToString(const std::vector<SlotRange>& ranges) {
return absl::StrJoin(ranges, ", ", [](std::string* out, SlotRange range) {
absl::StrAppend(out, range.ToString());
});
}
};

using SlotRanges = std::vector<SlotRange>;

class SlotSet {
public:
static constexpr SlotId kSlotsNumber = SlotRange::kMaxSlotId + 1;
using TBitSet = std::bitset<kSlotsNumber>;

SlotSet(bool full_house = false) : slots_(std::make_unique<BitsetType>()) {
SlotSet(bool full_house = false) {
if (full_house)
slots_->flip();
}

SlotSet(const SlotRanges& slot_ranges) : SlotSet() {
SlotSet(const SlotRanges& slot_ranges) {
Set(slot_ranges, true);
}

SlotSet(const SlotSet& s) : SlotSet() {
SlotSet(const TBitSet& s) {
*slots_ = s;
}

SlotSet(const SlotSet& s) {
*slots_ = *s.slots_;
}

Expand Down Expand Up @@ -73,10 +92,8 @@ class SlotSet {
}

// Get SlotSet that are absent in the slots
SlotSet GetRemovedSlots(SlotSet slots) {
slots.slots_->flip();
*slots.slots_ &= *slots_;
return slots;
SlotSet GetRemovedSlots(const SlotSet& slots) const {
return *slots_ & ~*slots.slots_;
}

SlotRanges ToSlotRanges() const {
Expand All @@ -97,8 +114,7 @@ class SlotSet {
}

private:
using BitsetType = std::bitset<kSlotsNumber>;
std::unique_ptr<BitsetType> slots_;
std::unique_ptr<TBitSet> slots_{std::make_unique<TBitSet>()};
};

} // namespace dfly
Loading