Skip to content

Commit 1855c1c

Browse files
authored
fix: broken memcached error reporting (#1741)
* fix DispatchCommand error reporting when memcached protocol is used (one example is when we use SET command on the replica -- previously we crashed now we properly report an error) * SendError(ErrorReply) moved to SinkReplyBuilder from RedisReplyBuilder * SendError(OpStatus) moved to SinkReplyBuilder from RedisReplyBuilder * added tests for SendError(ErrorReply) in RedisReplyBuilder
1 parent 35a5433 commit 1855c1c

File tree

9 files changed

+137
-58
lines changed

9 files changed

+137
-58
lines changed

src/facade/conn_context.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class ConnectionContext {
3737
}
3838

3939
// A convenient proxy for redis interface.
40+
// Use with caution -- should only be used only
41+
// in execution paths that are Redis *only*
4042
RedisReplyBuilder* operator->();
4143

4244
SinkReplyBuilder* reply_builder() {
@@ -50,6 +52,18 @@ class ConnectionContext {
5052
return res;
5153
}
5254

55+
void SendError(std::string_view str, std::string_view type = std::string_view{}) {
56+
rbuilder_->SendError(str, type);
57+
}
58+
59+
void SendError(ErrorReply&& error) {
60+
rbuilder_->SendError(std::move(error));
61+
}
62+
63+
void SendSimpleString(std::string_view str) {
64+
rbuilder_->SendSimpleString(str);
65+
}
66+
5367
// connection state / properties.
5468
bool conn_closing : 1;
5569
bool req_auth : 1;

src/facade/op_status.cc

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,37 @@
11
#include "facade/op_status.h"
22

3-
namespace facade {} // namespace facade
3+
#include "base/logging.h"
4+
#include "facade/error.h"
5+
#include "facade/resp_expr.h"
6+
7+
namespace facade {
8+
9+
std::string_view StatusToMsg(OpStatus status) {
10+
switch (status) {
11+
case OpStatus::OK:
12+
return "OK";
13+
case OpStatus::KEY_NOTFOUND:
14+
return kKeyNotFoundErr;
15+
case OpStatus::WRONG_TYPE:
16+
return kWrongTypeErr;
17+
case OpStatus::OUT_OF_RANGE:
18+
return kIndexOutOfRange;
19+
case OpStatus::INVALID_FLOAT:
20+
return kInvalidFloatErr;
21+
case OpStatus::INVALID_INT:
22+
return kInvalidIntErr;
23+
case OpStatus::SYNTAX_ERR:
24+
return kSyntaxErr;
25+
case OpStatus::OUT_OF_MEMORY:
26+
return kOutOfMemory;
27+
case OpStatus::BUSY_GROUP:
28+
return "-BUSYGROUP Consumer Group name already exists";
29+
case OpStatus::INVALID_NUMERIC_RESULT:
30+
return kInvalidNumericResult;
31+
default:
32+
LOG(ERROR) << "Unsupported status " << status;
33+
return "Internal error";
34+
}
35+
}
36+
37+
} // namespace facade

src/facade/op_status.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ inline bool operator==(OpStatus st, const OpResultBase& ob) {
124124
return ob.operator==(st);
125125
}
126126

127+
std::string_view StatusToMsg(OpStatus status);
128+
127129
} // namespace facade
128130

129131
namespace std {

src/facade/reply_builder.cc

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,22 @@ void SinkReplyBuilder::SendRaw(std::string_view raw) {
9494
Send(&v, 1);
9595
}
9696

97+
void SinkReplyBuilder::SendError(ErrorReply error) {
98+
if (error.status)
99+
return SendError(*error.status);
100+
101+
string_view message_sv = visit([](auto&& str) -> string_view { return str; }, error.message);
102+
SendError(message_sv, error.kind);
103+
}
104+
105+
void SinkReplyBuilder::SendError(OpStatus status) {
106+
if (status == OpStatus::OK) {
107+
SendOk();
108+
} else {
109+
SendError(StatusToMsg(status));
110+
}
111+
}
112+
97113
void SinkReplyBuilder::SendRawVec(absl::Span<const std::string_view> msg_vec) {
98114
absl::FixedArray<iovec, 16> arr(msg_vec.size());
99115

@@ -223,14 +239,6 @@ void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
223239
}
224240
}
225241

226-
void RedisReplyBuilder::SendError(ErrorReply error) {
227-
if (error.status)
228-
return SendError(*error.status);
229-
230-
string_view message_sv = visit([](auto&& str) -> string_view { return str; }, error.message);
231-
SendError(message_sv, error.kind);
232-
}
233-
234242
void RedisReplyBuilder::SendProtocolError(std::string_view str) {
235243
SendError(absl::StrCat("-ERR Protocol error: ", str), "protocol_error");
236244
}
@@ -277,42 +285,6 @@ void RedisReplyBuilder::SendBulkString(std::string_view str) {
277285
return Send(v, ABSL_ARRAYSIZE(v));
278286
}
279287

280-
std::string_view RedisReplyBuilder::StatusToMsg(OpStatus status) {
281-
switch (status) {
282-
case OpStatus::OK:
283-
return "OK";
284-
case OpStatus::KEY_NOTFOUND:
285-
return kKeyNotFoundErr;
286-
case OpStatus::WRONG_TYPE:
287-
return kWrongTypeErr;
288-
case OpStatus::OUT_OF_RANGE:
289-
return kIndexOutOfRange;
290-
case OpStatus::INVALID_FLOAT:
291-
return kInvalidFloatErr;
292-
case OpStatus::INVALID_INT:
293-
return kInvalidIntErr;
294-
case OpStatus::SYNTAX_ERR:
295-
return kSyntaxErr;
296-
case OpStatus::OUT_OF_MEMORY:
297-
return kOutOfMemory;
298-
case OpStatus::BUSY_GROUP:
299-
return "-BUSYGROUP Consumer Group name already exists";
300-
case OpStatus::INVALID_NUMERIC_RESULT:
301-
return kInvalidNumericResult;
302-
default:
303-
LOG(ERROR) << "Unsupported status " << status;
304-
return "Internal error";
305-
}
306-
}
307-
308-
void RedisReplyBuilder::SendError(OpStatus status) {
309-
if (status == OpStatus::OK) {
310-
SendOk();
311-
} else {
312-
SendError(StatusToMsg(status));
313-
}
314-
}
315-
316288
void RedisReplyBuilder::SendLong(long num) {
317289
string str = absl::StrCat(":", num, kCRLF);
318290
SendRaw(str);

src/facade/reply_builder.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class SinkReplyBuilder {
4242
}
4343

4444
virtual void SendError(std::string_view str, std::string_view type = {}) = 0; // MC and Redis
45+
virtual void SendError(ErrorReply error);
46+
virtual void SendError(OpStatus status);
4547

4648
virtual void SendStored() = 0; // Reply for set commands.
4749
virtual void SendSetSkipped() = 0;
@@ -177,13 +179,12 @@ class RedisReplyBuilder : public SinkReplyBuilder {
177179
void SetResp3(bool is_resp3);
178180

179181
void SendError(std::string_view str, std::string_view type = {}) override;
180-
virtual void SendError(ErrorReply error);
182+
using SinkReplyBuilder::SendError;
181183

182184
void SendMGetResponse(absl::Span<const OptResp>) override;
183185

184186
void SendStored() override;
185187
void SendSetSkipped() override;
186-
virtual void SendError(OpStatus status);
187188
void SendProtocolError(std::string_view str) override;
188189

189190
virtual void SendNullArray(); // Send *-1
@@ -206,10 +207,6 @@ class RedisReplyBuilder : public SinkReplyBuilder {
206207

207208
static char* FormatDouble(double val, char* dest, unsigned dest_len);
208209

209-
// You normally should not call this - maps the status
210-
// into the string that would be sent
211-
static std::string_view StatusToMsg(OpStatus status);
212-
213210
protected:
214211
struct WrappedStrSpan : public StrSpan {
215212
size_t Size() const;

src/facade/reply_builder_test.cc

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
232232
OpStatus::OUT_OF_MEMORY, OpStatus::INVALID_FLOAT, OpStatus::INVALID_INT,
233233
OpStatus::SYNTAX_ERR, OpStatus::BUSY_GROUP, OpStatus::INVALID_NUMERIC_RESULT};
234234
for (const auto& err : error_codes) {
235-
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
235+
const std::string_view error_name = StatusToMsg(err);
236236
const std::string_view error_type = GetErrorType(error_name);
237237

238238
sink_.Clear();
@@ -251,14 +251,39 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
251251
}
252252
}
253253

254+
TEST_F(RedisReplyBuilderTest, ErrorReplyBuiltInMessage) {
255+
ErrorReply err{OpStatus::OUT_OF_RANGE};
256+
builder_->SendError(err);
257+
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart));
258+
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
259+
ASSERT_EQ(builder_->err_count().at(kIndexOutOfRange), 1);
260+
ASSERT_EQ(str(), BuildExpectedErrorString(kIndexOutOfRange));
261+
262+
auto parsing_output = Parse();
263+
ASSERT_TRUE(parsing_output.Verify(SinkSize()));
264+
ASSERT_TRUE(parsing_output.IsError());
265+
sink_.Clear();
266+
267+
err = ErrorReply{"e1", "e2"};
268+
builder_->SendError(err);
269+
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart));
270+
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
271+
ASSERT_EQ(builder_->err_count().at("e2"), 1);
272+
ASSERT_EQ(str(), BuildExpectedErrorString("e1"));
273+
274+
parsing_output = Parse();
275+
ASSERT_TRUE(parsing_output.Verify(SinkSize()));
276+
ASSERT_TRUE(parsing_output.IsError());
277+
}
278+
254279
TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
255280
// All these op codes creating the same error message
256281
OpStatus none_unique_codes[] = {OpStatus::ENTRIES_ADDED_SMALL, OpStatus::SKIPPED,
257282
OpStatus::KEY_EXISTS, OpStatus::INVALID_VALUE,
258283
OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
259284
uint64_t error_count = 0;
260285
for (const auto& err : none_unique_codes) {
261-
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
286+
const std::string_view error_name = StatusToMsg(err);
262287
const std::string_view error_type = GetErrorType(error_name);
263288

264289
sink_.Clear();

src/server/main_service.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
879879
const auto [cid, args_no_cmd] = FindCmd(args);
880880

881881
if (cid == nullptr) {
882-
return (*cntx)->SendError(ReportUnknownCmd(ArgS(args, 0)));
882+
return cntx->SendError(ReportUnknownCmd(ArgS(args, 0)));
883883
}
884884

885885
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
@@ -899,7 +899,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
899899
if (auto& exec_info = dfly_cntx->conn_state.exec_info; exec_info.IsCollecting())
900900
exec_info.state = ConnectionState::ExecInfo::EXEC_ERROR;
901901

902-
(*dfly_cntx)->SendError(std::move(*err));
902+
dfly_cntx->SendError(std::move(*err));
903903
return;
904904
}
905905

@@ -909,13 +909,13 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
909909
StoredCmd stored_cmd{cid, args_no_cmd};
910910
dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd));
911911

912-
return (*cntx)->SendSimpleString("QUEUED");
912+
return cntx->SendSimpleString("QUEUED");
913913
}
914914

915915
uint64_t start_ns = absl::GetCurrentTimeNanos();
916916

917917
if (cid->opt_mask() & CO::DENYOOM) {
918-
int64_t used_memory = etl.GetUsedMemory(start_ns);
918+
uint64_t used_memory = etl.GetUsedMemory(start_ns);
919919
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
920920
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
921921
return cntx->reply_builder()->SendError(kOutOfMemory);

tests/dragonfly/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def admin_port(self) -> int:
106106
def mc_port(self) -> int:
107107
if self.params.existing_mc_port:
108108
return self.params.existing_mc_port
109-
return int(self.args.get("mc_port", "11211"))
109+
return int(self.args.get("memcached_port", "11211"))
110110

111111
@staticmethod
112112
def format_args(args):

tests/dragonfly/replication_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from redis import asyncio as aioredis
88
from .utility import *
99
from . import DflyInstanceFactory, dfly_args
10+
import pymemcache
1011
import logging
1112

1213
BASE_PORT = 1111
@@ -1506,3 +1507,37 @@ async def test_replicaof_flag_disconnect(df_local_factory):
15061507

15071508
role = await c_replica.role()
15081509
assert role[0] == b"master"
1510+
1511+
1512+
@pytest.mark.asyncio
1513+
async def test_df_crash_on_memcached_error(df_local_factory):
1514+
master = df_local_factory.create(
1515+
port=BASE_PORT,
1516+
memcached_port=11211,
1517+
proactor_threads=2,
1518+
)
1519+
1520+
replica = df_local_factory.create(
1521+
port=master.port + 1,
1522+
memcached_port=master.mc_port + 1,
1523+
proactor_threads=2,
1524+
)
1525+
1526+
master.start()
1527+
replica.start()
1528+
1529+
c_master = aioredis.Redis(port=master.port)
1530+
await wait_available_async(c_master)
1531+
1532+
c_replica = aioredis.Redis(port=replica.port)
1533+
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
1534+
await wait_available_async(c_replica)
1535+
await wait_for_replica_status(c_replica, status="up")
1536+
await c_replica.close()
1537+
1538+
memcached_client = pymemcache.Client(f"localhost:{replica.mc_port}")
1539+
1540+
with pytest.raises(pymemcache.exceptions.MemcacheClientError):
1541+
memcached_client.set(b"key", b"data", noreply=False)
1542+
1543+
await c_master.close()

0 commit comments

Comments
 (0)