Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ extern "C" {

ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);

namespace dfly {

Expand Down Expand Up @@ -99,6 +100,11 @@ class DflyRenameCommandTest : public DflyEngineTest {
absl::SetFlag(&FLAGS_rename_command,
std::vector<std::string>({"flushall=myflushall", "flushdb="}));
}

void TearDown() {
absl::SetFlag(&FLAGS_rename_command, std::vector<std::string>({""}));
DflyEngineTest::TearDown();
}
};

TEST_F(DflyRenameCommandTest, RenameCommand) {
Expand Down Expand Up @@ -335,10 +341,10 @@ TEST_F(DflyEngineTest, FlushAll) {

TEST_F(DflyEngineTest, OOM) {
shard_set->TEST_EnableHeartBeat();
max_memory_limit = 0;
max_memory_limit = 300000;
size_t i = 0;
RespExpr resp;
for (; i < 5000; i += 3) {
for (; i < 10000; i += 3) {
resp = Run({"mset", StrCat("key", i), "bar", StrCat("key", i + 1), "bar", StrCat("key", i + 2),
"bar"});
if (resp != "OK")
Expand Down Expand Up @@ -376,25 +382,41 @@ TEST_F(DflyEngineTest, OOM) {
TEST_F(DflyEngineTest, Bug207) {
shard_set->TEST_EnableHeartBeat();
shard_set->TEST_EnableCacheMode();
absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);

max_memory_limit = 0;
max_memory_limit = 300000;

ssize_t i = 0;
RespExpr resp;
for (; i < 5000; ++i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
// we evict some items because 5000 is too much when max_memory_limit is zero.
// we evict some items because 5000 is too much when max_memory_limit is 300000.
ASSERT_EQ(resp, "OK");
}

auto evicted_count = [](const string& str) -> size_t {
const string matcher = "evicted_keys:";
const auto pos = str.find(matcher) + matcher.size();
const auto sub = str.substr(pos, 1);
return atoi(sub.c_str());
};

resp = Run({"info", "stats"});
EXPECT_GT(evicted_count(resp.GetString()), 0);

for (; i > 0; --i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
ASSERT_EQ(resp, "OK");
}
}

TEST_F(DflyEngineTest, StickyEviction) {
shard_set->TEST_EnableHeartBeat();
shard_set->TEST_EnableCacheMode();
absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);

max_memory_limit = 300000;

string tmp_val(100, '.');
Expand Down
11 changes: 11 additions & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ ABSL_FLAG(MaxMemoryFlag, maxmemory, MaxMemoryFlag{},
"Limit on maximum-memory that is used by the database. "
"0 - means the program will automatically determine its maximum memory usage. "
"default: 0");
ABSL_FLAG(double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");

bool AbslParseFlag(std::string_view in, MaxMemoryFlag* flag, std::string* err) {
int64_t val;
Expand Down Expand Up @@ -893,6 +896,14 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)

uint64_t start_ns = ProactorBase::GetMonotonicTimeNs(), end_ns;

if (cid->opt_mask() & CO::DENYOOM) {
int64_t used_memory = etl.GetUsedMemory(start_ns);
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
return (*cntx)->SendError(kOutOfMemory);
}
}

// Create command transaction
intrusive_ptr<Transaction> dist_trans;

Expand Down
9 changes: 9 additions & 0 deletions src/server/server_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ void ServerState::Destroy() {
state_ = nullptr;
}

uint64_t ServerState::GetUsedMemory(uint64_t now_ns) {
static constexpr uint64_t kCacheEveryNs = 1000;
if (now_ns > used_mem_last_update_ + kCacheEveryNs) {
used_mem_last_update_ = now_ns;
used_mem_cached_ = used_mem_current.load(std::memory_order_relaxed);
}
return used_mem_cached_;
}

bool ServerState::AllowInlineScheduling() const {
// We can't allow inline scheduling during a full sync, because then journaling transactions
// will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular
Expand Down
4 changes: 4 additions & 0 deletions src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class ServerState { // public struct - to allow initialization.
gstate_ = s;
}

uint64_t GetUsedMemory(uint64_t now_ns);

bool AllowInlineScheduling() const;

// Borrow interpreter from internal manager. Return int with ReturnInterpreter.
Expand Down Expand Up @@ -226,6 +228,8 @@ class ServerState { // public struct - to allow initialization.

absl::flat_hash_map<std::string, base::Histogram> call_latency_histos_;
uint32_t thread_index_ = 0;
uint64_t used_mem_cached_ = 0; // thread local cache of used_mem_current
uint64_t used_mem_last_update_ = 0;

static __thread ServerState* state_;
};
Expand Down
1 change: 1 addition & 0 deletions src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ void BaseFamilyTest::ResetService() {
Service::InitOpts opts;
opts.disable_time_update = true;
service_->Init(nullptr, {}, opts);
used_mem_current = 0;

TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
auto cb = [&](EngineShard* s) { s->db_slice().UpdateExpireBase(TEST_current_time_ms - 1000, 0); };
Expand Down