Skip to content

Commit 7226024

Browse files
authored
chore: improve reply latency of SendScoredArray (#2929)
* chore: improve reply latency of SendScoredArray Use SendStringArrInternal that reduces dramatically number of socket calls for long arrays. --------- Signed-off-by: Roman Gershman <[email protected]>
1 parent 9289f12 commit 7226024

File tree

3 files changed

+69
-24
lines changed

3 files changed

+69
-24
lines changed

src/facade/reply_builder.cc

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,6 @@ void RedisReplyBuilder::SetResp3(bool is_resp3) {
285285
is_resp3_ = is_resp3;
286286
}
287287

288-
bool RedisReplyBuilder::IsResp3() const {
289-
return is_resp3_;
290-
}
291-
292288
void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
293289
VLOG(1) << "Error: " << str;
294290

@@ -380,21 +376,33 @@ void RedisReplyBuilder::SendScoredArray(const std::vector<std::pair<std::string,
380376
bool with_scores) {
381377
ReplyAggregator agg(this);
382378
if (!with_scores) {
383-
StartArray(arr.size());
384-
for (const auto& p : arr) {
385-
SendBulkString(p.first);
386-
}
379+
auto cb = [&](size_t indx) -> string_view { return arr[indx].first; };
380+
381+
SendStringArrInternal(arr.size(), std::move(cb), CollectionType::ARRAY);
387382
return;
388383
}
384+
385+
char buf[DoubleToStringConverter::kBase10MaximalLength * 3]; // to be on the safe side.
386+
389387
if (!is_resp3_) { // RESP2 formats withscores as a flat array.
390-
StartArray(arr.size() * 2);
391-
for (const auto& p : arr) {
392-
SendBulkString(p.first);
393-
SendDouble(p.second);
394-
}
388+
auto cb = [&](size_t indx) -> string_view {
389+
if (indx % 2 == 0)
390+
return arr[indx / 2].first;
391+
392+
// NOTE: we reuse the same buffer, assuming that SendStringArrInternal does not reference
393+
// previous string_views. The assumption holds for small strings like
394+
// doubles because SendStringArrInternal employs small string optimization.
395+
// It's a bit hacky but saves allocations.
396+
return FormatDouble(arr[indx / 2].second, buf, sizeof(buf));
397+
};
398+
399+
SendStringArrInternal(arr.size() * 2, std::move(cb), CollectionType::ARRAY);
395400
return;
396401
}
402+
397403
// Resp3 formats withscores as array of (key, score) pairs.
404+
// TODO: to implement efficient serializing by extending SendStringArrInternal to support
405+
// 2-level arrays.
398406
StartArray(arr.size());
399407
for (const auto& p : arr) {
400408
StartArray(2);
@@ -406,15 +414,13 @@ void RedisReplyBuilder::SendScoredArray(const std::vector<std::pair<std::string,
406414
void RedisReplyBuilder::SendDouble(double val) {
407415
char buf[64];
408416

409-
StringBuilder sb(buf, sizeof(buf));
410-
CHECK(dfly_conv.ToShortest(val, &sb));
417+
char* start = FormatDouble(val, buf, sizeof(buf));
411418

412419
if (!is_resp3_) {
413-
SendBulkString(sb.Finalize());
420+
SendBulkString(start);
414421
} else {
415422
// RESP3
416-
string str = absl::StrCat(",", sb.Finalize(), kCRLF);
417-
SendRaw(str);
423+
SendRaw(absl::StrCat(",", start, kCRLF));
418424
}
419425
}
420426

@@ -576,10 +582,11 @@ void RedisReplyBuilder::SendStringArrInternal(
576582
}
577583

578584
// When vector length is too long, Send returns EMSGSIZE.
579-
size_t vec_len = std::min<size_t>(128u, size);
585+
size_t vec_len = std::min<size_t>(124u, size);
580586

581587
absl::FixedArray<iovec, 16> vec(vec_len * 2 + 2);
582-
absl::FixedArray<char, 64> meta((vec_len + 1) * 16); // 16 bytes per length.
588+
absl::FixedArray<char, 128> meta(vec_len * 32 + 64); // 32 bytes per element + spare space
589+
583590
char* next = meta.data();
584591

585592
auto serialize_len = [&](char prefix, size_t len) {
@@ -598,18 +605,27 @@ void RedisReplyBuilder::SendStringArrInternal(
598605
for (unsigned i = 0; i < size; ++i) {
599606
src = producer(i);
600607
serialize_len('$', src.size());
608+
601609
// add serialized len blob
602610
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
603611
DCHECK_GT(next - start, 0);
604612

605613
start = next;
606614

607-
vec[vec_indx++] = IoVec(src);
608-
615+
// copy data either by referencing via an iovec or copying inline into meta buf.
616+
if (src.size() >= 30) {
617+
vec[vec_indx++] = IoVec(src);
618+
} else if (src.size() > 0) {
619+
memcpy(next, src.data(), src.size());
620+
vec[vec_indx - 1].iov_len += src.size(); // extend the reference
621+
next += src.size();
622+
start = next;
623+
}
609624
*next++ = '\r';
610625
*next++ = '\n';
611626

612-
if (vec_indx + 1 >= vec.size()) {
627+
// we keep at least 40 bytes to have enough place for a small string as well as its length.
628+
if (vec_indx + 1 >= vec.size() || (meta.end() - next < 40)) {
613629
// Flush the iovec array.
614630
if (i < size - 1 || vec_indx == vec.size()) {
615631
Send(vec.data(), vec_indx);

src/facade/reply_builder.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,9 @@ class RedisReplyBuilder : public SinkReplyBuilder {
208208
RedisReplyBuilder(::io::Sink* stream);
209209

210210
void SetResp3(bool is_resp3);
211-
bool IsResp3() const;
211+
bool IsResp3() const {
212+
return is_resp3_;
213+
}
212214

213215
void SendError(std::string_view str, std::string_view type = {}) override;
214216
using SinkReplyBuilder::SendError;

src/server/main_service.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,8 @@ class InterpreterReplier : public RedisReplyBuilder {
281281
void SendBulkString(std::string_view str) final;
282282

283283
void StartCollection(unsigned len, CollectionType type) final;
284+
void SendScoredArray(const std::vector<std::pair<std::string, double>>& arr,
285+
bool with_scores) final;
284286

285287
private:
286288
void PostItem();
@@ -457,6 +459,31 @@ void InterpreterReplier::StartCollection(unsigned len, CollectionType) {
457459
}
458460
}
459461

462+
void InterpreterReplier::SendScoredArray(const std::vector<std::pair<std::string, double>>& arr,
463+
bool with_scores) {
464+
if (with_scores) {
465+
if (IsResp3()) {
466+
StartCollection(arr.size(), CollectionType::ARRAY);
467+
for (size_t i = 0; i < arr.size(); ++i) {
468+
StartArray(2);
469+
SendBulkString(arr[i].first);
470+
SendDouble(arr[i].second);
471+
}
472+
} else {
473+
StartCollection(arr.size() * 2, CollectionType::ARRAY);
474+
for (size_t i = 0; i < arr.size(); ++i) {
475+
SendBulkString(arr[i].first);
476+
SendDouble(arr[i].second);
477+
}
478+
}
479+
} else {
480+
StartCollection(arr.size(), CollectionType::ARRAY);
481+
for (size_t i = 0; i < arr.size(); ++i) {
482+
SendBulkString(arr[i].first);
483+
}
484+
}
485+
}
486+
460487
bool IsSHA(string_view str) {
461488
for (auto c : str) {
462489
if (!absl::ascii_isxdigit(c))

0 commit comments

Comments
 (0)