Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ cxx_test(stream_family_test dfly_test_lib LABELS DFLY)
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(bitops_family_test dfly_test_lib LABELS DFLY)
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb
testdata/redis6_stream.rdb testdata/hll.rdb testdata/redis7_small.rdb LABELS DFLY)
testdata/redis6_stream.rdb testdata/hll.rdb testdata/redis7_small.rdb
testdata/redis_json.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dfly_test_lib LABELS DFLY)
cxx_test(json_family_test dfly_test_lib LABELS DFLY)
Expand Down
90 changes: 88 additions & 2 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
iores = ReadGeneric(rdbtype);
}
break;
case RDB_TYPE_MODULE_2:
iores = ReadRedisJson();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;

Expand Down Expand Up @@ -1777,6 +1780,44 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS};
}

auto RdbLoaderBase::ReadRedisJson() -> io::Result<OpaqueObj> {
auto json_magic_number = LoadLen(nullptr);
if (!json_magic_number) {
return Unexpected(errc::rdb_file_corrupted);
}

constexpr string_view kJsonModule = "ReJSON-RL"sv;
string module_name = ModuleTypeName(*json_magic_number);
if (module_name != kJsonModule) {
LOG(ERROR) << "Unsupported module: " << module_name;
return Unexpected(errc::unsupported_operation);
}

int encver = *json_magic_number & 1023;
if (encver != 3) {
LOG(ERROR) << "Unsupported ReJSON version: " << encver;
return Unexpected(errc::unsupported_operation);
}

auto opcode = FetchInt<uint8_t>();
if (!opcode || *opcode != RDB_MODULE_OPCODE_STRING) {
return Unexpected(errc::rdb_file_corrupted);
}

RdbVariant dest;
error_code ec = ReadStringObj(&dest);
if (ec) {
return make_unexpected(ec);
}

opcode = FetchInt<uint8_t>();
if (!opcode || *opcode != RDB_MODULE_OPCODE_EOF) {
return Unexpected(errc::rdb_file_corrupted);
}

return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
}

auto RdbLoaderBase::ReadJson() -> io::Result<OpaqueObj> {
RdbVariant dest;
error_code ec = ReadStringObj(&dest);
Expand Down Expand Up @@ -2001,8 +2042,9 @@ error_code RdbLoader::Load(io::Source* src) {
SET_OR_RETURN(LoadLen(nullptr), module_id);
string module_name = ModuleTypeName(module_id);

LOG(ERROR) << "Modules are not supported, error loading module " << module_name;
return RdbError(errc::feature_not_supported);
LOG(WARNING) << "WARNING: Skipping data for module " << module_name;
RETURN_ON_ERR(SkipModuleData());
continue;
}

if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START ||
Expand Down Expand Up @@ -2142,6 +2184,50 @@ void RdbLoaderBase::AllocateDecompressOnce(int op_type) {
}
}

error_code RdbLoaderBase::SkipModuleData() {
uint64_t opcode;
SET_OR_RETURN(LoadLen(nullptr), opcode); // ignore field 'when_opcode'
if (opcode != RDB_MODULE_OPCODE_UINT)
return RdbError(errc::rdb_file_corrupted);
SET_OR_RETURN(LoadLen(nullptr), opcode); // ignore field 'when'

while (true) {
SET_OR_RETURN(LoadLen(nullptr), opcode);

switch (opcode) {
case RDB_MODULE_OPCODE_EOF:
return kOk; // Module data end

case RDB_MODULE_OPCODE_SINT:
case RDB_MODULE_OPCODE_UINT: {
[[maybe_unused]] uint64_t _;
SET_OR_RETURN(LoadLen(nullptr), _);
break;
}

case RDB_MODULE_OPCODE_STRING: {
RdbVariant dest;
error_code ec = ReadStringObj(&dest);
if (ec) {
return ec;
}
break;
}

case RDB_MODULE_OPCODE_DOUBLE: {
[[maybe_unused]] double _;
SET_OR_RETURN(FetchBinaryDouble(), _);
break;
}

default:
// TODO: handle RDB_MODULE_OPCODE_FLOAT
LOG(ERROR) << "Unsupported module section: " << opcode;
return RdbError(errc::rdb_file_corrupted);
}
}
}

error_code RdbLoaderBase::HandleCompressedBlob(int op_type) {
AllocateDecompressOnce(op_type);
// Fetch uncompress blob
Expand Down
2 changes: 2 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadZSetZL();
::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
::io::Result<OpaqueObj> ReadStreams();
::io::Result<OpaqueObj> ReadRedisJson();
::io::Result<OpaqueObj> ReadJson();

std::error_code SkipModuleData();
std::error_code HandleCompressedBlob(int op_type);
std::error_code HandleCompressedBlobFinish();
void AllocateDecompressOnce(int op_type);
Expand Down
25 changes: 25 additions & 0 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,29 @@ TEST_F(RdbTest, LoadSmall7) {
ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));
EXPECT_THAT(resp.GetVec(), ElementsAre("einstein", "schrodinger"));
}

TEST_F(RdbTest, RedisJson) {
// RDB file generated via:
// ./redis-server --save "" --appendonly no --loadmodule ../lib/rejson.so
// and then:
// JSON.SET json-str $ '"hello"'
// JSON.SET json-arr $ "[1, true, \"hello\", 3.14]"
// JSON.SET json-obj $
// '{"company":"DragonflyDB","product":"Dragonfly","website":"https://dragondlydb.io","years-active":[2021,2022,2023,2024,"and
// more!"]}'
io::FileSource fs = GetSource("redis_json.rdb");
RdbLoader loader{service_.get()};

// must run in proactor thread in order to avoid polluting the serverstate
// in the main, testing thread.
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });

ASSERT_FALSE(ec) << ec.message();

EXPECT_EQ(Run({"JSON.GET", "json-str"}), "\"hello\"");
EXPECT_EQ(Run({"JSON.GET", "json-arr"}), "[1,true,\"hello\",3.14]");
EXPECT_EQ(Run({"JSON.GET", "json-obj"}),
"{\"company\":\"DragonflyDB\",\"product\":\"Dragonfly\",\"website\":\"https://"
"dragondlydb.io\",\"years-active\":[2021,2022,2023,2024,\"and more!\"]}");
}
} // namespace dfly
Binary file added src/server/testdata/redis_json.rdb
Binary file not shown.