56 changes: 47 additions & 9 deletions include/rocksdb/options.h
@@ -1776,30 +1776,61 @@ struct ScanOptions {
: range(_start, _upper_bound) {}
};

class BlockBasedTable;
class PrefetchRateLimiter {
public:
PrefetchRateLimiter() = default;
virtual ~PrefetchRateLimiter() = default;

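// Try to reserve `bytes` of prefetch budget for `table`. Returns the number
// of bytes actually granted; with all_or_nothing=true the grant is either
// `bytes` or 0 (see DefaultPrefetchRateLimiter below for the default
// semantics).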
virtual size_t acquire(const BlockBasedTable* table, size_t bytes,
bool all_or_nothing) = 0;
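// Return `bytes` of previously acquired budget back to the limiter.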
virtual bool release(size_t bytes) = 0;
};

class DefaultPrefetchRateLimiter : public PrefetchRateLimiter {
Contributor comment: This need not be declared in a public header file. RocksDB typically just exposes an allocator, like NewDefaultPrefetchRateLimiter().

public:
explicit DefaultPrefetchRateLimiter(size_t max_bytes)
: max_bytes_(max_bytes), cur_bytes_(max_bytes) {}
virtual ~DefaultPrefetchRateLimiter() override = default;

virtual size_t acquire(const BlockBasedTable* table, size_t bytes,
bool all_or_nothing) override;
virtual bool release(size_t bytes) override;

private:
const size_t max_bytes_;
std::atomic<size_t> cur_bytes_;
};
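
Following the contributor's suggestion above, a minimal sketch of how the concrete limiter could be kept out of the public header and exposed through a factory instead. NewDefaultPrefetchRateLimiter is the reviewer's suggested name, not something this PR defines, and the shared_ptr return type is an assumption chosen to match the prefetch_rate_limiter member below:

// include/rocksdb/options.h: declare only the abstract interface and a factory.
std::shared_ptr<PrefetchRateLimiter> NewDefaultPrefetchRateLimiter(
    size_t max_bytes);

// In an implementation .cc file: the concrete class stays internal.
std::shared_ptr<PrefetchRateLimiter> NewDefaultPrefetchRateLimiter(
    size_t max_bytes) {
  return std::make_shared<DefaultPrefetchRateLimiter>(max_bytes);
}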

// Container for multiple scan ranges that can be used with MultiScan.
// This replaces std::vector<ScanOptions> with a more efficient implementation
// that can merge overlapping ranges.
class MultiScanArgs {
public:
// Constructor that takes a comparator
explicit MultiScanArgs(const Comparator* comparator = BytewiseComparator())
: comp_(comparator) {}
: prefetch_rate_limiter(nullptr), comp_(comparator) {}

// Copy Constructor
MultiScanArgs(const MultiScanArgs& other) {
comp_ = other.comp_;
original_ranges_ = other.original_ranges_;
io_coalesce_threshold = other.io_coalesce_threshold;
}
MultiScanArgs(const MultiScanArgs& other)
: io_coalesce_threshold(other.io_coalesce_threshold),
prefetch_rate_limiter(other.prefetch_rate_limiter),
comp_(other.comp_),
original_ranges_(other.original_ranges_) {}

MultiScanArgs(MultiScanArgs&& other) noexcept
: io_coalesce_threshold(other.io_coalesce_threshold),
prefetch_rate_limiter(std::move(other.prefetch_rate_limiter)),
comp_(other.comp_),
original_ranges_(std::move(other.original_ranges_)) {}

MultiScanArgs& operator=(const MultiScanArgs& other) {
comp_ = other.comp_;
original_ranges_ = other.original_ranges_;
io_coalesce_threshold = other.io_coalesce_threshold;
if (this != &other) {
comp_ = other.comp_;
original_ranges_ = other.original_ranges_;
io_coalesce_threshold = other.io_coalesce_threshold;
prefetch_rate_limiter = other.prefetch_rate_limiter;
}
return *this;
}

@@ -1808,6 +1839,7 @@ class MultiScanArgs {
comp_ = other.comp_;
original_ranges_ = std::move(other.original_ranges_);
io_coalesce_threshold = other.io_coalesce_threshold;
prefetch_rate_limiter = std::move(other.prefetch_rate_limiter);
}
return *this;
}
@@ -1849,6 +1881,12 @@ class MultiScanArgs {

uint64_t io_coalesce_threshold = 16 << 10; // 16KB by default

std::shared_ptr<PrefetchRateLimiter> prefetch_rate_limiter;

PrefetchRateLimiter& GetMutablePrefetchRateLimiter() const {
return *prefetch_rate_limiter.get();
}

private:
// The comparator used for ordering ranges
const Comparator* comp_;
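
To show how the new member is intended to be wired up by a caller, a hypothetical sketch (the 4 MB budget and 32 KB threshold are arbitrary example values; how scan ranges are added to MultiScanArgs is not part of this diff and is omitted):

MultiScanArgs scan_args(BytewiseComparator());
scan_args.io_coalesce_threshold = 32 << 10;  // coalesce reads within 32KB
// Bound the total bytes of data blocks that Prepare() may pin at once.
scan_args.prefetch_rate_limiter =
    std::make_shared<DefaultPrefetchRateLimiter>(4 << 20);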
73 changes: 73 additions & 0 deletions table/block_based/block_based_table_iterator.cc
@@ -10,6 +10,56 @@

namespace ROCKSDB_NAMESPACE {

size_t DefaultPrefetchRateLimiter::acquire(const BlockBasedTable* /*unused*/,
size_t bytes, bool all_or_nothing) {
bool done = false;
size_t amount = 0;
// Fast path: if no budget is left, grant nothing.
if (cur_bytes_ == 0) {
return amount;
}
while (!done) {
// Check again here.
size_t current = cur_bytes_.load();
if (current == 0) {
amount = 0;
return amount;
}
if (all_or_nothing) {
if (current >= bytes) {
done = cur_bytes_.compare_exchange_weak(current, current - bytes);
amount = bytes;
} else {
amount = 0;
return amount;
}
} else {
if (current > bytes) {
done = cur_bytes_.compare_exchange_weak(current, current - bytes);
amount = bytes;
} else {
done = cur_bytes_.compare_exchange_weak(current, 0);
amount = current;
}
}
}
return amount;
}

bool DefaultPrefetchRateLimiter::release(size_t bytes) {
bool done = false;
while (!done) {
// Check again here.
size_t current = cur_bytes_.load();
if (current + bytes >= max_bytes_) {
done = cur_bytes_.compare_exchange_weak(current, max_bytes_);
} else {
done = cur_bytes_.compare_exchange_weak(current, current + bytes);
}
}
return true;
}
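
For illustration, the budget semantics these two functions implement, assuming an instance created with an arbitrary 1024-byte cap (the table pointer is ignored by the default implementation, so nullptr is passed here):

DefaultPrefetchRateLimiter limiter(/*max_bytes=*/1024);
size_t got = limiter.acquire(nullptr, 512, /*all_or_nothing=*/true);   // 512 granted, 512 left
got = limiter.acquire(nullptr, 1024, /*all_or_nothing=*/true);         // 0 granted: not enough left
got = limiter.acquire(nullptr, 1024, /*all_or_nothing=*/false);        // 512 granted (partial), 0 left
limiter.release(1024);  // budget is clamped back to max_bytes (1024)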

void BlockBasedTableIterator::SeekToFirst() { SeekImpl(nullptr, false); }

void BlockBasedTableIterator::Seek(const Slice& target) {
@@ -984,6 +1034,7 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) {
std::vector<BlockHandle> blocks_to_prepare;
Status s;
std::vector<std::tuple<size_t, size_t>> block_ranges_per_scan;
total_acquired_ = 0;
for (const auto& scan_opt : *scan_opts) {
size_t num_blocks = 0;
// Current scan overlaps the last block of the previous scan.
@@ -1000,6 +1051,16 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) {
index_iter_->user_key(),
/*a_has_ts*/ true, *scan_opt.range.limit,
/*b_has_ts=*/false) <= 0)) {
// Let's make sure we are rate-limited on how many blocks we prepare.
if (multiscan_opts->prefetch_rate_limiter) {
auto blocks = multiscan_opts->GetMutablePrefetchRateLimiter().acquire(
table_, index_iter_->value().handle.size(), true);
Contributor comment: Write last parameter as /*all_or_nothing=*/true (Google C++ style guide - https://google.github.io/styleguide/cppguide.html#Function_Argument_Comments).

total_acquired_ += blocks;
if (blocks == 0) {
break;
}
}

if (check_overlap &&
blocks_to_prepare.back() == index_iter_->value().handle) {
// Skip the current block since it's already in the list
@@ -1162,6 +1223,10 @@ void BlockBasedTableIterator::Prepare(const MultiScanArgs* multiscan_opts) {
}
}

if (read_reqs.size() == 0) {
return;
}

AlignedBuf aligned_buf;
s = table_->get_rep()->file.get()->MultiRead(
io_opts, read_reqs.data(), read_reqs.size(),
@@ -1345,6 +1410,14 @@ void BlockBasedTableIterator::FindBlockForwardInMultiScan() {
}
// Move to the next pinned data block
ResetDataIter();
if (multi_scan_->prefetch_rate_limiter) {
size_t releasing =
multi_scan_->pinned_data_blocks[multi_scan_->cur_data_block_idx]
Contributor comment: pinned_data_blocks[multi_scan_->cur_data_block_idx] would not be valid at this point I think, since it would've been transferred to the data block iter. Or am I missing something?

.GetValue()
->size();
multi_scan_->prefetch_rate_limiter->release(releasing);
Contributor comment: Ideally we'd do this in a cleanup function registered with block_iter_ (which is derived from Cleanable) so that the release happens whenever block_iter_ is reset.

total_acquired_ -= releasing;
}
++multi_scan_->cur_data_block_idx;
table_->NewDataBlockIterator<DataBlockIter>(
read_options_,
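
A minimal sketch of the contributor's Cleanable suggestion above, under the assumption that the release should follow the block iterator's lifetime rather than happen eagerly in FindBlockForwardInMultiScan(). PrefetchReleaseArg and ReleasePrefetchBytes are hypothetical helpers, not part of this PR; Cleanable::RegisterCleanup(function, arg1, arg2) is the existing hook the reviewer refers to:

// Hypothetical helper that returns acquired bytes to the limiter when the
// block iterator is reset or destroyed.
struct PrefetchReleaseArg {
  std::shared_ptr<PrefetchRateLimiter> limiter;
  size_t bytes;
};

static void ReleasePrefetchBytes(void* arg1, void* /*arg2*/) {
  auto* release_arg = static_cast<PrefetchReleaseArg*>(arg1);
  release_arg->limiter->release(release_arg->bytes);
  delete release_arg;
}

// After handing the pinned block to block_iter_, tie the release to its
// cleanup instead of releasing here:
block_iter_.RegisterCleanup(
    &ReleasePrefetchBytes,
    new PrefetchReleaseArg{multi_scan_->prefetch_rate_limiter, releasing},
    /*arg2=*/nullptr);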
23 changes: 21 additions & 2 deletions table/block_based/block_based_table_iterator.h
@@ -16,6 +16,7 @@
#include "table/block_based/reader_common.h"

namespace ROCKSDB_NAMESPACE {

// Iterates over the contents of BlockBasedTable.
class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
// compaction_readahead_size: its value will only be used if for_compaction =
@@ -47,7 +48,17 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
is_last_level_(table->IsLastLevel()),
block_iter_points_to_real_block_(false) {}

~BlockBasedTableIterator() override { ClearBlockHandles(); }
~BlockBasedTableIterator() override {
ClearBlockHandles();

// Release any acquired bytes from the rate limiter if we have a multi_scan_.
// Use the stored rate limiter copy to avoid accessing a potentially invalid
// scan_opts.
if (multi_scan_ && multi_scan_->prefetch_rate_limiter &&
total_acquired_ > 0) {
multi_scan_->prefetch_rate_limiter->release(total_acquired_);
}
}

void Seek(const Slice& target) override;
void SeekForPrev(const Slice& target) override;
@@ -373,6 +384,8 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
// *** END States used by both regular scan and multiscan

// *** BEGIN MultiScan related states ***
size_t total_acquired_ = 0;

struct MultiScanState {
// bool prepared_ = false;
const MultiScanArgs* scan_opts;
@@ -385,6 +398,10 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
size_t next_scan_idx;
size_t cur_data_block_idx;

// Store the rate limiter separately to avoid accessing a potentially invalid
// scan_opts.
std::shared_ptr<PrefetchRateLimiter> prefetch_rate_limiter;

MultiScanState(
const MultiScanArgs* _scan_opts,
std::vector<CachableEntry<Block>>&& _pinned_data_blocks,
@@ -393,7 +410,9 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
pinned_data_blocks(std::move(_pinned_data_blocks)),
block_ranges_per_scan(std::move(_block_ranges_per_scan)),
next_scan_idx(0),
cur_data_block_idx(0) {}
cur_data_block_idx(0),
prefetch_rate_limiter(_scan_opts ? _scan_opts->prefetch_rate_limiter
: nullptr) {}
};

std::unique_ptr<MultiScanState> multi_scan_;