Skip to content

Commit 5f7bdf2

Browse files
committed
feat: introduce configurable flags for I/O limits
Introduced `max_multi_bulk_len` as a max limit when parsing RESP arrays as well as `max_client_iobuf_len` as a max limit on the iobuf used to read from a socket. Signed-off-by: Roman Gershman <[email protected]>
1 parent f99ed70 commit 5f7bdf2

File tree

6 files changed

+36
-30
lines changed

6 files changed

+36
-30
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,30 +21,38 @@
2121
#include "util/tls/tls_socket.h"
2222
#endif
2323

24+
using namespace std;
25+
2426
ABSL_FLAG(bool, tcp_nodelay, false,
2527
"Configures dragonfly connections with socket option TCP_NODELAY");
2628
ABSL_FLAG(bool, primary_port_http_enabled, true,
2729
"If true allows accessing http console on main TCP port");
2830

29-
ABSL_FLAG(
30-
std::uint16_t, admin_port, 0,
31-
"If set, would enable admin access to console on the assigned port. This supports both HTTP "
32-
"and RESP protocols");
33-
ABSL_FLAG(std::string, admin_bind, "",
31+
ABSL_FLAG(uint16_t, admin_port, 0,
32+
"If set, would enable admin access to console on the assigned port. "
33+
"This supports both HTTP and RESP protocols");
34+
35+
ABSL_FLAG(string, admin_bind, "",
3436
"If set, the admin consol TCP connection would be bind the given address. "
35-
"This supports both HTTP and RESP "
36-
"protocols");
37+
"This supports both HTTP and RESP protocols");
3738

38-
ABSL_FLAG(std::uint64_t, request_cache_limit, 1ULL << 26, // 64MB
39+
ABSL_FLAG(uint64_t, request_cache_limit, 1ULL << 26, // 64MB
3940
"Amount of memory to use for request cache in bytes - per IO thread.");
4041

4142
ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");
4243

43-
ABSL_FLAG(std::uint64_t, pipeline_squash, 0,
44+
ABSL_FLAG(uint64_t, pipeline_squash, 0,
4445
"Number of queued pipelined commands above which squashing is enabled, 0 means disabled");
4546

47+
// When changing this constant, also update `test_large_cmd` test in connection_test.py.
48+
ABSL_FLAG(uint32_t, max_multi_bulk_len, 1u << 16,
49+
"Maximum multi-bulk (array) length that is "
50+
"allowed to be accepted when parsing RESP protocol");
51+
52+
ABSL_FLAG(size_t, max_client_iobuf_len, 1u << 16,
53+
"Maximum io buffer length that is used to read client requests.");
54+
4655
using namespace util;
47-
using namespace std;
4856
using nonstd::make_unexpected;
4957

5058
namespace facade {
@@ -79,8 +87,6 @@ bool MatchHttp11Line(string_view line) {
7987
}
8088

8189
constexpr size_t kMinReadSize = 256;
82-
constexpr size_t kMaxReadSize = 64_KB;
83-
8490
constexpr size_t kMaxDispatchQMemory = 5_MB;
8591

8692
thread_local uint32_t free_req_release_weight = 0;
@@ -235,7 +241,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
235241

236242
switch (protocol) {
237243
case Protocol::REDIS:
238-
redis_parser_.reset(new RedisParser);
244+
redis_parser_.reset(new RedisParser(absl::GetFlag(FLAGS_max_multi_bulk_len)));
239245
break;
240246
case Protocol::MEMCACHE:
241247
memcache_parser_.reset(new MemcacheParser);
@@ -676,6 +682,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
676682
error_code ec;
677683
ParserStatus parse_status = OK;
678684

685+
size_t max_iobfuf_len = absl::GetFlag(FLAGS_max_client_iobuf_len);
686+
679687
do {
680688
FetchBuilderStats(stats_, orig_builder);
681689

@@ -709,13 +717,13 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
709717
parse_status = OK;
710718

711719
size_t capacity = io_buf_.Capacity();
712-
if (capacity < kMaxReadSize) {
720+
if (capacity < max_iobfuf_len) {
713721
size_t parser_hint = 0;
714722
if (redis_parser_)
715723
parser_hint = redis_parser_->parselen_hint(); // Could be done for MC as well.
716724

717725
if (parser_hint > capacity) {
718-
io_buf_.Reserve(std::min(kMaxReadSize, parser_hint));
726+
io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint));
719727
}
720728

721729
if (io_buf_.AppendLen() < 64u) {

src/facade/memcache_parser.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
154154
return UNKNOWN_CMD;
155155
}
156156

157-
if (cmd->type <= CAS) { // Store command
158-
if (num_tokens < 5 || tokens[1].size() > 250) {
157+
if (cmd->type <= CAS) { // Store command
158+
if (num_tokens < 5 || tokens[1].size() > 250) { // key length limit
159159
return MP::PARSE_ERROR;
160160
}
161161

src/facade/redis_parser.cc

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,6 @@ namespace facade {
1111

1212
using namespace std;
1313

14-
namespace {
15-
16-
// When changing this constant, also update `test_large_cmd` test in connection_test.py.
17-
constexpr int kMaxArrayLen = 65536;
18-
constexpr int64_t kMaxBulkLen = 256 * (1ul << 20); // 256MB.
19-
20-
} // namespace
21-
2214
auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> Result {
2315
*consumed = 0;
2416
res->clear();
@@ -251,8 +243,8 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> Result {
251243
case BAD_INT:
252244
return BAD_ARRAYLEN;
253245
case OK:
254-
if (len < -1 || len > kMaxArrayLen) {
255-
LOG_IF(WARNING, len > kMaxArrayLen) << "Multi bulk len is too big " << len;
246+
if (len < -1 || len > max_arr_len_) {
247+
LOG_IF(WARNING, len > max_arr_len_) << "Multibulk len is too large " << len;
256248

257249
return BAD_ARRAYLEN;
258250
}

src/facade/redis_parser.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@ namespace facade {
2222
*/
2323
class RedisParser {
2424
public:
25+
constexpr static long kMaxBulkLen = 256 * (1ul << 20); // 256MB.
26+
2527
enum Result { OK, INPUT_PENDING, BAD_ARRAYLEN, BAD_BULKLEN, BAD_STRING, BAD_INT, BAD_DOUBLE };
2628
using Buffer = RespExpr::Buffer;
2729

28-
explicit RedisParser(bool server_mode = true) : server_mode_(server_mode) {
30+
explicit RedisParser(uint32_t max_arr_len = UINT32_MAX, bool server_mode = true)
31+
: max_arr_len_(max_arr_len), server_mode_(server_mode) {
2932
}
3033

3134
/**
@@ -98,6 +101,8 @@ class RedisParser {
98101
using BlobPtr = std::unique_ptr<uint8_t[]>;
99102
std::vector<BlobPtr> buf_stash_;
100103
RespVec* cached_expr_ = nullptr;
104+
uint32_t max_arr_len_;
105+
101106
bool is_broken_token_ = false;
102107
bool server_mode_ = true;
103108
};

src/server/protocol_client.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ error_code ProtocolClient::SendCommandAndReadResponse(string_view command) {
425425
}
426426

427427
void ProtocolClient::ResetParser(bool server_mode) {
428-
parser_.reset(new RedisParser(server_mode));
428+
// We accept any length for the parser because it has been approved by the master.
429+
parser_.reset(new RedisParser(UINT32_MAX, server_mode));
429430
}
430431

431432
uint64_t ProtocolClient::LastIoTime() const {

src/server/test_utils.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ RespVec BaseFamilyTest::TestConnWrapper::ParseResponse(bool fully_consumed) {
423423
auto s_copy = s;
424424

425425
uint32_t consumed = 0;
426-
parser_.reset(new RedisParser{false}); // Client mode.
426+
parser_.reset(new RedisParser{UINT32_MAX, false}); // Client mode.
427427
RespVec res;
428428
RedisParser::Result st = parser_->Parse(buf, &consumed, &res);
429429

0 commit comments

Comments
 (0)