Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 92 additions & 146 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
#include "base/logging.h"
#include "base/stl_util.h"
#include "facade/cmd_arg_parser.h"
#include "facade/op_status.h"
#include "redis/redis_aux.h"
#include "server/acl/acl_commands_def.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/table.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"

Expand Down Expand Up @@ -321,8 +324,7 @@ void OpMSet(const OpArgs& op_args, ArgSlice args, atomic_bool* success) {
size_t i = 0;
for (; i < args.size(); i += 2) {
DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1];
OpResult<optional<string>> res = sg.Set(params, args[i], args[i + 1]);
if (res.status() != OpStatus::OK) { // OOM for example.
if (sg.Set(params, args[i], args[i + 1]) != OpStatus::OK) { // OOM for example.
success->store(false);
break;
}
Expand All @@ -346,18 +348,6 @@ void OpMSet(const OpArgs& op_args, ArgSlice args, atomic_bool* success) {
}
}

// See comment for SetCmd::Set() for when and how OpResult's value (i.e. optional<string>) is set.
// Runs SetCmd::Set() for `key`/`value` through the transaction's ScheduleSingleHopT and returns
// whatever that call yields. `manual_journal` is forwarded unchanged to the SetCmd constructor.
OpResult<optional<string>> SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams,
string_view key, string_view value, bool manual_journal) {
// The connection context must already carry a transaction.
DCHECK(cntx->transaction);

// Callback receives the transaction and a shard; it builds the op args for that shard and
// performs the set with the parameters captured by reference from this frame.
auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd sg(t->GetOpArgs(shard), manual_journal);
return sg.Set(sparams, key, value);
};
return cntx->transaction->ScheduleSingleHopT(std::move(cb));
}

// emission_interval_ms assumed to be positive
// limit is assumed to be positive
OpResult<array<int64_t, 5>> OpThrottle(const OpArgs& op_args, const string_view key,
Expand Down Expand Up @@ -464,32 +454,6 @@ OpResult<array<int64_t, 5>> OpThrottle(const OpArgs& op_args, const string_view
return array<int64_t, 5>{limited ? 1 : 0, limit, remaining, retry_after_ms, reset_after_ms};
}

class SetResultBuilder {
public:
explicit SetResultBuilder(bool return_prev_value) : return_prev_value_(return_prev_value) {
}

void CachePrevValueIfNeeded(const PrimeValue& pv) {
if (return_prev_value_) {
Comment on lines -467 to -473
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we added this when we already had a solution to fetch the previous value

// We lazily call GetString() here to avoid copying the string when it is not needed.
prev_value_ = GetString(pv);
}
}

// Returns either the previous value or `status`, depending on return_prev_value_.
OpResult<optional<string>> Return(OpStatus status) && {
if (return_prev_value_) {
return std::move(prev_value_);
} else {
return status;
}
}

private:
bool return_prev_value_;
std::optional<string> prev_value_;
};

SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t,
EngineShard* shard) {
auto keys = t->GetShardArgs(shard->shard_id());
Expand Down Expand Up @@ -561,119 +525,52 @@ struct StringValue {

} // namespace

OpResult<optional<string>> SetCmd::Set(const SetParams& params, string_view key,
string_view value) {
bool fetch_val = params.flags & SET_GET;
SetResultBuilder result_builder(fetch_val);

EngineShard* shard = op_args_.shard;
auto& db_slice = shard->db_slice();
OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value) {
auto& db_slice = op_args_.shard->db_slice();

DCHECK(db_slice.IsDbValid(op_args_.db_cntx.db_index));

VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") ";

// if SET_GET is not set then prev_val is null.
DCHECK(fetch_val || params.prev_val == nullptr);

if (params.IsConditionalSet()) {
// We do not always set prev_val and we use result_builder for that.
bool fetch_value = params.prev_val || fetch_val;
DbSlice::ItAndUpdater find_res;
if (fetch_value) {
find_res = db_slice.FindAndFetchMutable(op_args_.db_cntx, key);
} else {
find_res = db_slice.FindMutable(op_args_.db_cntx, key);
}

if (IsValid(find_res.it)) {
if (find_res.it->second.ObjType() != OBJ_STRING) {
return OpStatus::WRONG_TYPE;
}
result_builder.CachePrevValueIfNeeded(find_res.it->second);
}
auto find_res = db_slice.FindMutable(op_args_.db_cntx, key);
if (auto status = CachePrevIfNeeded(params, find_res.it); status != OpStatus::OK)
return status;
Comment on lines -581 to +537
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we're breaking tiering, I'm keeping it simple


// Make sure that we have this key, and only set it if it already exists
if (params.flags & SET_IF_EXISTS) {
if (IsValid(find_res.it)) {
return std::move(result_builder)
.Return(SetExisting(params, find_res.it, find_res.exp_it, key, value));
return SetExisting(params, find_res.it, find_res.exp_it, key, value);
} else {
return std::move(result_builder).Return(OpStatus::SKIPPED);
return OpStatus::SKIPPED;
}
} else {
if (IsValid(find_res.it)) { // if the policy is not to overide and have the key, just return
return std::move(result_builder).Return(OpStatus::SKIPPED);
}
DCHECK(params.flags & SET_IF_NOTEXIST) << params.flags;
if (IsValid(find_res.it)) {
return OpStatus::SKIPPED;
} // else AddNew() is called below
}
}

// At this point we either need to add missing entry, or we
// will override an existing one
// Trying to add a new entry.
auto op_res = db_slice.AddOrFind(op_args_.db_cntx, key);
RETURN_ON_BAD_STATUS(op_res);
auto& add_res = *op_res;

auto it = add_res.it;
if (!add_res.is_new) {
if (fetch_val && it->second.ObjType() != OBJ_STRING) {
return OpStatus::WRONG_TYPE;
}
result_builder.CachePrevValueIfNeeded(it->second);
return std::move(result_builder).Return(SetExisting(params, it, add_res.exp_it, key, value));
}

// Adding new value.
PrimeValue tvalue{value};
tvalue.SetFlag(params.memcache_flags != 0);
it->second = std::move(tvalue);

if (params.expire_after_ms) {
db_slice.AddExpire(op_args_.db_cntx.db_index, it,
params.expire_after_ms + op_args_.db_cntx.time_now_ms);
}

if (params.memcache_flags)
db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);

if (params.flags & SET_STICK) {
it->first.SetSticky(true);
}

if (shard->tiered_storage() &&
TieredStorage::EligibleForOffload(value.size())) { // external storage enabled.
shard->tiered_storage()->ScheduleOffloadWithThrottle(op_args_.db_cntx.db_index, it.GetInnerIt(),
key);
}

if (shard->tiered_storage_v2()) { // external storage enabled
shard->tiered_storage_v2()->Stash(key, &it->second);
}
if (!op_res->is_new) {
if (auto status = CachePrevIfNeeded(params, op_res->it); status != OpStatus::OK)
return status;

if (manual_journal_ && op_args_.shard->journal()) {
RecordJournal(params, key, value);
return SetExisting(params, op_res->it, op_res->exp_it, key, value);
} else {
AddNew(params, op_res->it, op_res->exp_it, key, value);
return OpStatus::OK;
}

return std::move(result_builder).Return(OpStatus::OK);
}

OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it,
DbSlice::ExpIterator e_it, string_view key, string_view value) {
if (params.flags & SET_IF_NOTEXIST)
return OpStatus::SKIPPED;
DCHECK_EQ(params.flags & SET_IF_NOTEXIST, 0);

PrimeValue& prime_value = it->second;
EngineShard* shard = op_args_.shard;

if (params.prev_val) {
if (prime_value.ObjType() != OBJ_STRING)
return OpStatus::WRONG_TYPE;

string val = GetString(prime_value);
params.prev_val->emplace(std::move(val));
}

DbSlice& db_slice = shard->db_slice();
uint64_t at_ms =
params.expire_after_ms ? params.expire_after_ms + op_args_.db_cntx.time_now_ms : 0;
Expand All @@ -700,7 +597,6 @@ OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it,
prime_value.SetFlag(params.memcache_flags != 0);
db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);

db_slice.RemoveFromTiered(it, op_args_.db_cntx.db_index);
// overwrite existing entry.
prime_value.SetString(value);
DCHECK(!prime_value.HasIoPending());
Expand All @@ -712,6 +608,43 @@ OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it,
return OpStatus::OK;
}

void SetCmd::AddNew(const SetParams& params, DbSlice::Iterator it, DbSlice::ExpIterator e_it,
std::string_view key, std::string_view value) {
EngineShard* shard = op_args_.shard;
auto& db_slice = shard->db_slice();

// Adding new value.
PrimeValue tvalue{value};
tvalue.SetFlag(params.memcache_flags != 0);
it->second = std::move(tvalue);

if (params.expire_after_ms) {
db_slice.AddExpire(op_args_.db_cntx.db_index, it,
params.expire_after_ms + op_args_.db_cntx.time_now_ms);
}

if (params.memcache_flags)
db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);

if (params.flags & SET_STICK) {
it->first.SetSticky(true);
}
Comment on lines +625 to +631
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, this flag setting can be moved to a common function


if (shard->tiered_storage() &&
TieredStorage::EligibleForOffload(value.size())) { // external storage enabled.
shard->tiered_storage()->ScheduleOffloadWithThrottle(op_args_.db_cntx.db_index, it.GetInnerIt(),
key);
}

if (shard->tiered_storage_v2()) { // external storage enabled
shard->tiered_storage_v2()->Stash(key, &it->second);
}

if (manual_journal_ && op_args_.shard->journal()) {
RecordJournal(params, key, value);
}
}

void SetCmd::RecordJournal(const SetParams& params, string_view key, string_view value) {
absl::InlinedVector<string_view, 5> cmds({key, value}); // 5 is theoretical maximum;

Expand All @@ -737,6 +670,27 @@ void SetCmd::RecordJournal(const SetParams& params, string_view key, string_view
dfly::RecordJournal(op_args_, "SET", ArgSlice{cmds});
}

// Stores the current string value of `it` into *params.prev_val when a previous-value
// destination was supplied and `it` is valid.
// Returns WRONG_TYPE if such an entry is not of type OBJ_STRING, OK in every other case
// (including when nothing had to be cached).
OpStatus SetCmd::CachePrevIfNeeded(const SetCmd::SetParams& params, DbSlice::Iterator it) {
  const bool wants_prev = params.prev_val != nullptr;

  // IsValid() is only consulted when a destination exists, same as the guard order before.
  if (wants_prev && IsValid(it)) {
    const auto& stored = it->second;
    if (stored.ObjType() != OBJ_STRING) {
      return OpStatus::WRONG_TYPE;
    }
    *params.prev_val = GetString(stored);
  }

  return OpStatus::OK;
}

// Executes SetCmd::Set for `key`/`value` inside the context's transaction via ScheduleSingleHop
// and returns the resulting status. Journaling mode is derived from the command's option mask
// (CO::NO_AUTOJOURNAL) and handed to SetCmd.
OpStatus SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams, string_view key,
                    string_view value) {
  DCHECK(cntx->transaction);

  const bool manual_journal = cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;

  auto run_on_shard = [&](Transaction* tx, EngineShard* es) {
    SetCmd set_cmd(tx->GetOpArgs(es), manual_journal);
    return set_cmd.Set(sparams, key, value);
  };
  return cntx->transaction->ScheduleSingleHop(std::move(run_on_shard));
}

void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
facade::CmdArgParser parser{args};

Expand Down Expand Up @@ -812,7 +766,11 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
return builder->SendError(kSyntaxErr);
}

OpResult result{SetGeneric(cntx, sparams, key, value, true)};
optional<string> prev;
if (sparams.flags & SetCmd::SET_GET)
sparams.prev_val = &prev;

OpStatus result = SetGeneric(cntx, sparams, key, value);

if (result == OpStatus::WRONG_TYPE) {
return cntx->SendError(kWrongTypeErr);
Expand All @@ -821,8 +779,8 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
if (sparams.flags & SetCmd::SET_GET) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
// When SET_GET is used, the reply is not affected by whether anything was set.
if (result->has_value()) {
rb->SendBulkString(result->value());
if (prev.has_value()) {
rb->SendBulkString(*prev);
} else {
rb->SendNull();
}
Expand Down Expand Up @@ -861,7 +819,8 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
SetCmd::SetParams sparams;
sparams.flags |= SetCmd::SET_IF_NOTEXIST;
sparams.memcache_flags = cntx->conn_state.memcache_flag;
const auto results{SetGeneric(cntx, std::move(sparams), key, value, false)};
const auto results{SetGeneric(cntx, sparams, key, value)};

SinkReplyBuilder* builder = cntx->reply_builder();
if (results == OpStatus::OK) {
return builder->SendLong(1); // this means that we successfully set the value
Expand Down Expand Up @@ -939,13 +898,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {

SetCmd::SetParams sparams;
sparams.prev_val = &prev_val;

auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd cmd(t->GetOpArgs(shard), false);

return cmd.Set(sparams, key, value).status();
};
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
OpStatus status = SetGeneric(cntx, sparams, key, value);

if (status != OpStatus::OK) {
cntx->SendError(status);
Expand Down Expand Up @@ -1201,14 +1154,7 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext
sparams.expire_after_ms = unit_vals;
}

auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd sg(t->GetOpArgs(shard), true);
return sg.Set(sparams, key, value).status();
};

OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));

return cntx->SendError(result.status());
cntx->SendError(SetGeneric(cntx, sparams, key, value));
}

void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
Expand Down
Loading