Skip to content

Commit 2d5b249

Browse files
romangekostasrim
authored andcommitted
fix: fix memcache bugs (#1745)
1. If the first request sent to the connection is large (2kb or more) Dragonfly was closing the connection. 2. Changed server side error reporting according to memcache protocol: https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L172 3. Fixed the wrong casting in DispatchCommand. 4. Remove practically unused code that translated opstatus to strings. Signed-off-by: Roman Gershman <[email protected]>
1 parent cc16036 commit 2d5b249

File tree

8 files changed

+46
-71
lines changed

8 files changed

+46
-71
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ bool MatchHttp11Line(string_view line) {
7878
}
7979

8080
constexpr size_t kMinReadSize = 256;
81-
constexpr size_t kMaxReadSize = 32_KB;
81+
constexpr size_t kMaxReadSize = 64_KB;
8282

8383
constexpr size_t kMaxDispatchQMemory = 5_MB;
8484

@@ -467,6 +467,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
467467

468468
// Main loop.
469469
if (parse_status != ERROR && !ec) {
470+
if (io_buf_.AppendLen() < 64) {
471+
io_buf_.EnsureCapacity(io_buf_.Capacity() * 2);
472+
}
470473
auto res = IoLoop(peer, orig_builder);
471474

472475
if (holds_alternative<error_code>(res)) {
@@ -646,8 +649,8 @@ auto Connection::ParseMemcache() -> ParserStatus {
646649
return NEED_MORE;
647650
}
648651

649-
if (result == MemcacheParser::PARSE_ERROR) {
650-
builder->SendError(""); // ERROR.
652+
if (result == MemcacheParser::PARSE_ERROR || result == MemcacheParser::UNKNOWN_CMD) {
653+
builder->SendSimpleString("ERROR");
651654
} else if (result == MemcacheParser::BAD_DELTA) {
652655
builder->SendClientError("invalid numeric delta argument");
653656
} else if (result != MemcacheParser::OK) {
@@ -679,6 +682,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
679682
FetchBuilderStats(stats_, orig_builder);
680683

681684
io::MutableBytes append_buf = io_buf_.AppendBuffer();
685+
DCHECK(!append_buf.empty());
686+
682687
phase_ = READ_SOCKET;
683688

684689
::io::Result<size_t> recv_sz = peer->Recv(append_buf);
@@ -713,7 +718,9 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
713718

714719
if (parser_hint > capacity) {
715720
io_buf_.Reserve(std::min(kMaxReadSize, parser_hint));
716-
} else if (append_buf.size() == *recv_sz && append_buf.size() > capacity / 2) {
721+
}
722+
723+
if (io_buf_.AppendLen() < 64u) {
717724
// Last io used most of the io_buf to the end.
718725
io_buf_.Reserve(capacity * 2); // Valid growth range.
719726
}
@@ -722,6 +729,13 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
722729
VLOG(1) << "Growing io_buf to " << io_buf_.Capacity();
723730
stats_->read_buf_capacity += (io_buf_.Capacity() - capacity);
724731
}
732+
DCHECK_GT(io_buf_.AppendLen(), 0U);
733+
} else if (io_buf_.AppendLen() == 0) {
734+
// We have a full buffer and we can not progress with parsing.
735+
// This means that we have request too large.
736+
LOG(ERROR) << "Request is too large, closing connection";
737+
parse_status = ERROR;
738+
break;
725739
}
726740
} else if (parse_status != OK) {
727741
break;

src/facade/op_status.cc

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,3 @@
11
#include "facade/op_status.h"
22

3-
namespace facade {
4-
5-
const char* DebugString(OpStatus op) {
6-
switch (op) {
7-
case OpStatus::OK:
8-
return "OK";
9-
case OpStatus::KEY_EXISTS:
10-
return "KEY EXISTS";
11-
case OpStatus::KEY_NOTFOUND:
12-
return "KEY NOTFOUND";
13-
case OpStatus::SKIPPED:
14-
return "SKIPPED";
15-
case OpStatus::INVALID_VALUE:
16-
return "INVALID VALUE";
17-
case OpStatus::OUT_OF_RANGE:
18-
return "OUT OF RANGE";
19-
case OpStatus::WRONG_TYPE:
20-
return "WRONG TYPE";
21-
case OpStatus::TIMED_OUT:
22-
return "TIMED OUT";
23-
case OpStatus::OUT_OF_MEMORY:
24-
return "OUT OF MEMORY";
25-
case OpStatus::INVALID_FLOAT:
26-
return "INVALID FLOAT";
27-
case OpStatus::INVALID_INT:
28-
return "INVALID INT";
29-
case OpStatus::SYNTAX_ERR:
30-
return "INVALID SYNTAX";
31-
case OpStatus::BUSY_GROUP:
32-
return "BUSY GROUP";
33-
case OpStatus::STREAM_ID_SMALL:
34-
return "STREAM ID TO SMALL";
35-
case OpStatus::ENTRIES_ADDED_SMALL:
36-
return "ENTRIES ADDED IS TO SMALL";
37-
case OpStatus::INVALID_NUMERIC_RESULT:
38-
return "INVALID NUMERIC RESULT";
39-
case OpStatus::CANCELLED:
40-
return "CANCELLED";
41-
}
42-
return "Unknown Error Code"; // we should not be here, but this is how enums works in c++
43-
}
44-
const char* OpResultBase::DebugFormat() const {
45-
return DebugString(st_);
46-
}
47-
48-
} // namespace facade
3+
namespace facade {} // namespace facade

src/facade/op_status.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ enum class OpStatus : uint16_t {
2929
CANCELLED,
3030
};
3131

32-
const char* DebugString(OpStatus op);
33-
3432
class OpResultBase {
3533
public:
3634
OpResultBase(OpStatus st = OpStatus::OK) : st_(st) {

src/facade/reply_builder.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ void MCReplyBuilder::SendMGetResponse(absl::Span<const OptResp> arr) {
162162
}
163163

164164
void MCReplyBuilder::SendError(string_view str, std::string_view type) {
165-
SendSimpleString("ERROR");
165+
SendSimpleString(absl::StrCat("SERVER_ERROR ", str));
166166
}
167167

168168
void MCReplyBuilder::SendProtocolError(std::string_view str) {

src/facade/reply_builder_test.cc

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -232,25 +232,22 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
232232
OpStatus::OUT_OF_MEMORY, OpStatus::INVALID_FLOAT, OpStatus::INVALID_INT,
233233
OpStatus::SYNTAX_ERR, OpStatus::BUSY_GROUP, OpStatus::INVALID_NUMERIC_RESULT};
234234
for (const auto& err : error_codes) {
235-
const std::string_view error_code_name = DebugString(err);
236235
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
237236
const std::string_view error_type = GetErrorType(error_name);
238237

239238
sink_.Clear();
240239
builder_->SendError(err);
241-
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart))
242-
<< " invalid start char for " << error_code_name;
243-
ASSERT_TRUE(absl::EndsWith(str(), kCRLF))
244-
<< " failed to find correct termination at " << error_code_name;
240+
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
241+
ASSERT_TRUE(absl::EndsWith(str(), kCRLF)) << " failed to find correct termination at " << err;
245242
ASSERT_EQ(builder_->err_count().at(error_type), 1)
246-
<< " number of error count is invalid for " << error_code_name;
243+
<< " number of error count is invalid for " << err;
247244
ASSERT_EQ(str(), BuildExpectedErrorString(error_name))
248245
<< " error different from expected - '" << str() << "'";
249246

250247
auto parsing_output = Parse();
251248
ASSERT_TRUE(parsing_output.Verify(SinkSize()))
252-
<< " verify for the result is invalid for " << error_code_name;
253-
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << error_code_name;
249+
<< " verify for the result is invalid for " << err;
250+
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << err;
254251
}
255252
}
256253

@@ -261,24 +258,21 @@ TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
261258
OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
262259
uint64_t error_count = 0;
263260
for (const auto& err : none_unique_codes) {
264-
const std::string_view error_code_name = DebugString(err);
265261
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
266262
const std::string_view error_type = GetErrorType(error_name);
267263

268264
sink_.Clear();
269265
builder_->SendError(err);
270-
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart))
271-
<< " invalid start char for " << error_code_name;
266+
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
272267
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
273268
auto current_error_count = builder_->err_count().at(error_type);
274269
error_count++;
275-
ASSERT_EQ(current_error_count, error_count)
276-
<< " number of error count is invalid for " << error_code_name;
270+
ASSERT_EQ(current_error_count, error_count) << " number of error count is invalid for " << err;
277271
auto parsing_output = Parse();
278272
ASSERT_TRUE(parsing_output.Verify(SinkSize()))
279-
<< " verify for the result is invalid for " << error_code_name;
273+
<< " verify for the result is invalid for " << err;
280274

281-
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << error_code_name;
275+
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << err;
282276
}
283277
}
284278

src/server/generic_family.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1182,7 +1182,6 @@ void GenericFamily::Dump(CmdArgList args, ConnectionContext* cntx) {
11821182
<< result.value().size();
11831183
(*cntx)->SendBulkString(*result);
11841184
} else {
1185-
DVLOG(1) << "Dump failed: " << result.DebugFormat() << key << " nil";
11861185
(*cntx)->SendNull();
11871186
}
11881187
}

src/server/main_service.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
917917
int64_t used_memory = etl.GetUsedMemory(start_ns);
918918
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
919919
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
920-
return (*cntx)->SendError(kOutOfMemory);
920+
return cntx->reply_builder()->SendError(kOutOfMemory);
921921
}
922922
}
923923

tests/dragonfly/connection_test.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from redis import asyncio as aioredis
55
from redis.exceptions import ConnectionError as redis_conn_error
66
import async_timeout
7-
7+
import pymemcache
88
from dataclasses import dataclass
99

1010
from . import DflyInstance, dfly_args
@@ -516,3 +516,18 @@ async def test_squashed_pipeline(async_client: aioredis.Redis):
516516
async def test_squashed_pipeline_seeder(df_server, df_seeder_factory):
517517
seeder = df_seeder_factory.create(port=df_server.port, keys=10_000)
518518
await seeder.run(target_deviation=0.1)
519+
520+
521+
@pytest.mark.asyncio
522+
async def test_memcached_large_request(df_local_factory):
523+
server = df_local_factory.create(
524+
port=BASE_PORT,
525+
memcached_port=11211,
526+
proactor_threads=2,
527+
)
528+
529+
server.start()
530+
531+
memcached_client = pymemcache.Client(("localhost", server.mc_port), default_noreply=False)
532+
533+
assert memcached_client.set(b"key", b"d" * 4096, noreply=False)

0 commit comments

Comments
 (0)