Skip to content

Commit 84e923d

Browse files
committed
chore: improve reply latency of SendScoredArray
Use SendStringArrInternal that reduces dramatically number of socket calls for long arrays. Signed-off-by: Roman Gershman <[email protected]>
1 parent 9289f12 commit 84e923d

File tree

3 files changed

+71
-19
lines changed

3 files changed

+71
-19
lines changed

src/facade/reply_builder.cc

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -380,21 +380,34 @@ void RedisReplyBuilder::SendScoredArray(const std::vector<std::pair<std::string,
380380
bool with_scores) {
381381
ReplyAggregator agg(this);
382382
if (!with_scores) {
383-
StartArray(arr.size());
384-
for (const auto& p : arr) {
385-
SendBulkString(p.first);
386-
}
383+
auto cb = [&](size_t indx) -> string_view { return arr[indx].first; };
384+
385+
SendStringArrInternal(arr.size(), std::move(cb), CollectionType::ARRAY);
387386
return;
388387
}
388+
389+
// DoubleToStringConverter::kBase10MaximalLength is 17.
390+
char buf[64];
391+
389392
if (!is_resp3_) { // RESP2 formats withscores as a flat array.
390-
StartArray(arr.size() * 2);
391-
for (const auto& p : arr) {
392-
SendBulkString(p.first);
393-
SendDouble(p.second);
394-
}
393+
auto cb = [&](size_t indx) -> string_view {
394+
if (indx % 2 == 0)
395+
return arr[indx / 2].first;
396+
397+
// NOTE: we reuse the same buffer, assuming that SendStringArrInternal does not reference
398+
// previous string_views. The assumption holds for small strings like
399+
// doubles because SendStringArrInternal employs small string optimization.
400+
// It's a bit hacky but saves allocations.
401+
return FormatDouble(arr[indx / 2].second, buf, sizeof(buf));
402+
};
403+
404+
SendStringArrInternal(arr.size() * 2, std::move(cb), CollectionType::ARRAY);
395405
return;
396406
}
407+
397408
// Resp3 formats withscores as array of (key, score) pairs.
409+
// TODO: to implement efficient serializing by extending SendStringArrInternal to support
410+
// 2-level arrays.
398411
StartArray(arr.size());
399412
for (const auto& p : arr) {
400413
StartArray(2);
@@ -406,15 +419,13 @@ void RedisReplyBuilder::SendScoredArray(const std::vector<std::pair<std::string,
406419
void RedisReplyBuilder::SendDouble(double val) {
407420
char buf[64];
408421

409-
StringBuilder sb(buf, sizeof(buf));
410-
CHECK(dfly_conv.ToShortest(val, &sb));
422+
char* start = FormatDouble(val, buf, sizeof(buf));
411423

412424
if (!is_resp3_) {
413-
SendBulkString(sb.Finalize());
425+
SendBulkString(start);
414426
} else {
415427
// RESP3
416-
string str = absl::StrCat(",", sb.Finalize(), kCRLF);
417-
SendRaw(str);
428+
SendRaw(absl::StrCat(",", start, kCRLF));
418429
}
419430
}
420431

@@ -576,10 +587,11 @@ void RedisReplyBuilder::SendStringArrInternal(
576587
}
577588

578589
// When vector length is too long, Send returns EMSGSIZE.
579-
size_t vec_len = std::min<size_t>(128u, size);
590+
size_t vec_len = std::min<size_t>(124u, size);
580591

581592
absl::FixedArray<iovec, 16> vec(vec_len * 2 + 2);
582-
absl::FixedArray<char, 64> meta((vec_len + 1) * 16); // 16 bytes per length.
593+
absl::FixedArray<char, 128> meta(vec_len * 32 + 64); // 32 bytes per element + spare space
594+
583595
char* next = meta.data();
584596

585597
auto serialize_len = [&](char prefix, size_t len) {
@@ -598,18 +610,27 @@ void RedisReplyBuilder::SendStringArrInternal(
598610
for (unsigned i = 0; i < size; ++i) {
599611
src = producer(i);
600612
serialize_len('$', src.size());
613+
601614
// add serialized len blob
602615
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
603616
DCHECK_GT(next - start, 0);
604617

605618
start = next;
606619

607-
vec[vec_indx++] = IoVec(src);
608-
620+
// copy data either by referencing via an iovec or copying inline into meta buf.
621+
if (src.size() >= 30) {
622+
vec[vec_indx++] = IoVec(src);
623+
} else if (src.size() > 0) {
624+
memcpy(next, src.data(), src.size());
625+
vec[vec_indx - 1].iov_len += src.size(); // extend the reference
626+
next += src.size();
627+
start = next;
628+
}
609629
*next++ = '\r';
610630
*next++ = '\n';
611631

612-
if (vec_indx + 1 >= vec.size()) {
632+
// we keep at least 40 bytes to have enough place for a small string as well as its length.
633+
if (vec_indx + 1 >= vec.size() || (meta.end() - next < 40)) {
613634
// Flush the iovec array.
614635
if (i < size - 1 || vec_indx == vec.size()) {
615636
Send(vec.data(), vec_indx);

src/facade/reply_builder.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,10 @@ class RedisReplyBuilder : public SinkReplyBuilder {
246246
std::string_view operator[](size_t index) const;
247247
};
248248

249+
bool is_resp3() const {
250+
return is_resp3_;
251+
}
252+
249253
private:
250254
void SendStringArrInternal(size_t size, absl::FunctionRef<std::string_view(unsigned)> producer,
251255
CollectionType type);

src/server/main_service.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,8 @@ class InterpreterReplier : public RedisReplyBuilder {
281281
void SendBulkString(std::string_view str) final;
282282

283283
void StartCollection(unsigned len, CollectionType type) final;
284+
void SendScoredArray(const std::vector<std::pair<std::string, double>>& arr,
285+
bool with_scores) final;
284286

285287
private:
286288
void PostItem();
@@ -457,6 +459,31 @@ void InterpreterReplier::StartCollection(unsigned len, CollectionType) {
457459
}
458460
}
459461

462+
void InterpreterReplier::SendScoredArray(const std::vector<std::pair<std::string, double>>& arr,
463+
bool with_scores) {
464+
if (with_scores) {
465+
if (is_resp3()) {
466+
StartCollection(arr.size(), CollectionType::ARRAY);
467+
for (size_t i = 0; i < arr.size(); ++i) {
468+
StartArray(2);
469+
SendBulkString(arr[i].first);
470+
SendDouble(arr[i].second);
471+
}
472+
} else {
473+
StartCollection(arr.size() * 2, CollectionType::ARRAY);
474+
for (size_t i = 0; i < arr.size(); ++i) {
475+
SendBulkString(arr[i].first);
476+
SendDouble(arr[i].second);
477+
}
478+
}
479+
} else {
480+
StartCollection(arr.size(), CollectionType::ARRAY);
481+
for (size_t i = 0; i < arr.size(); ++i) {
482+
SendBulkString(arr[i].first);
483+
}
484+
}
485+
}
486+
460487
bool IsSHA(string_view str) {
461488
for (auto c : str) {
462489
if (!absl::ascii_isxdigit(c))

0 commit comments

Comments
 (0)