Skip to content

Commit 2284fb1

Browse files
author
Xingbo Wang
committed
Support Super Block Alignment
Summary: Add restart point in index builder when super block alignment padded the data block Test Plan: Unit Test Reviewers: Subscribers: Tasks: T227973776 Tags:
1 parent ac4d563 commit 2284fb1

10 files changed

+180
-27
lines changed

db/db_flush_test.cc

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3561,6 +3561,69 @@ TEST_F(DBFlushTest, VerifyOutputRecordCount) {
35613561
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
35623562
}
35633563
}
3564+
3565+
std::string formatKey(int i) {
3566+
int desired_length = 10;
3567+
char buffer[64];
3568+
sprintf(buffer, "%0*d", desired_length, i);
3569+
return buffer;
3570+
}
3571+
3572+
std::string formatValue(int i) { return std::string(1234, 'a' + (i % 26)); }
3573+
3574+
TEST_F(DBFlushTest, SuperBlock) {
3575+
constexpr int key_count = 12345;
3576+
Options options;
3577+
BlockBasedTableOptions block_options;
3578+
block_options.block_align = false;
3579+
block_options.super_block_align = true;
3580+
block_options.super_block_alignment_size = 64 * 1024;
3581+
block_options.super_block_alignment_max_padding_size = 4 * 1024;
3582+
options.table_factory.reset(NewBlockBasedTableFactory(block_options));
3583+
3584+
Reopen(options);
3585+
3586+
int super_block_pad_count = 0;
3587+
SyncPoint::GetInstance()->SetCallBack(
3588+
"BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
3589+
"SuperBlockAlignment",
3590+
[&super_block_pad_count](void* /*arg*/) { super_block_pad_count++; });
3591+
SyncPoint::GetInstance()->EnableProcessing();
3592+
3593+
// Add lots of keys
3594+
for (int i = 0; i < key_count; ++i) {
3595+
Put(formatKey(i), formatValue(i));
3596+
}
3597+
3598+
// flush the data in memory to disk to verify with super block alignment, the
3599+
// data could be read back properly
3600+
Reopen(options);
3601+
3602+
SyncPoint::GetInstance()->DisableProcessing();
3603+
SyncPoint::GetInstance()->ClearAllCallBacks();
3604+
3605+
ASSERT_GT(super_block_pad_count, 0);
3606+
3607+
// verify the values are correct
3608+
// Test GET
3609+
for (int i = 0; i < key_count; ++i) {
3610+
PinnableSlice value;
3611+
ASSERT_OK(Get(formatKey(i), &value));
3612+
ASSERT_EQ(value.ToString(), formatValue(i));
3613+
}
3614+
// Test iterator
3615+
{
3616+
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
3617+
int i = 0;
3618+
for (it->SeekToFirst(); it->Valid(); it->Next()) {
3619+
ASSERT_OK(it->status());
3620+
ASSERT_EQ((it->key()).ToString(), formatKey(i));
3621+
ASSERT_EQ((it->value()).ToString(), formatValue(i));
3622+
i++;
3623+
}
3624+
ASSERT_EQ(i, key_count);
3625+
}
3626+
}
35643627
} // namespace ROCKSDB_NAMESPACE
35653628

35663629
int main(int argc, char** argv) {

include/rocksdb/table.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,17 @@ struct BlockBasedTableOptions {
597597
// Align data blocks on lesser of page size and block size
598598
bool block_align = false;
599599

600+
// Align data blocks on super block alignment. Avoid a data block split across
601+
// super block boundaries.
602+
bool super_block_align = false;
603+
604+
// Super block alignment size. Default to 512 KB. It has to be a power of 2
605+
// and higher than block size.
606+
size_t super_block_alignment_size = 512 * 1024;
607+
608+
// Maximum number of bytes allowed to be padded for super block alignment.
609+
size_t super_block_alignment_max_padding_size = 32 * 1024;
610+
600611
// This enum allows trading off increased index size for improved iterator
601612
// seek performance in some situations, particularly when block cache is
602613
// disabled (ReadOptions::fill_cache = false) and direct IO is

table/block_based/block_based_table_builder.cc

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,7 +1319,9 @@ void BlockBasedTableBuilder::EmitBlock(std::string& uncompressed,
13191319
r->get_offset());
13201320
r->pc_rep->EmitBlock(block_rep);
13211321
} else {
1322-
WriteBlock(uncompressed, &r->pending_handle, BlockType::kData);
1322+
bool should_add_restart_point_on_index_block = false;
1323+
WriteBlock(uncompressed, &r->pending_handle, BlockType::kData,
1324+
&should_add_restart_point_on_index_block);
13231325
if (ok()) {
13241326
// We do not emit the index entry for a block until we have seen the
13251327
// first key for the next data block. This allows us to use shorter
@@ -1330,14 +1332,14 @@ void BlockBasedTableBuilder::EmitBlock(std::string& uncompressed,
13301332
// blocks.
13311333
r->index_builder->AddIndexEntry(
13321334
last_key_in_current_block, first_key_in_next_block, r->pending_handle,
1333-
&r->index_separator_scratch);
1335+
&r->index_separator_scratch, should_add_restart_point_on_index_block);
13341336
}
13351337
}
13361338
}
13371339

1338-
void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
1339-
BlockHandle* handle,
1340-
BlockType block_type) {
1340+
void BlockBasedTableBuilder::WriteBlock(
1341+
const Slice& uncompressed_block_data, BlockHandle* handle,
1342+
BlockType block_type, bool* should_add_restart_point_on_index_block) {
13411343
Rep* r = rep_;
13421344
assert(r->state == Rep::State::kUnbuffered);
13431345
CompressionType type;
@@ -1358,7 +1360,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
13581360
WriteMaybeCompressedBlock(type == kNoCompression
13591361
? uncompressed_block_data
13601362
: Slice(r->single_threaded_compressed_output),
1361-
type, handle, block_type, &uncompressed_block_data);
1363+
type, handle, block_type, &uncompressed_block_data,
1364+
should_add_restart_point_on_index_block);
13621365
r->single_threaded_compressed_output.Reset();
13631366
if (is_data_block) {
13641367
r->props.data_size = r->get_offset();
@@ -1489,13 +1492,17 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
14891492

14901493
void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
14911494
const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
1492-
BlockType block_type, const Slice* uncompressed_block_data) {
1495+
BlockType block_type, const Slice* uncompressed_block_data,
1496+
bool* should_add_restart_point_on_index_block) {
14931497
// File format contains a sequence of blocks where each block has:
14941498
// block_data: uint8[n]
14951499
// compression_type: uint8
14961500
// checksum: uint32
14971501
Rep* r = rep_;
14981502
bool is_data_block = block_type == BlockType::kData;
1503+
if (should_add_restart_point_on_index_block != nullptr) {
1504+
*should_add_restart_point_on_index_block = false;
1505+
}
14991506
IOOptions io_options;
15001507
IOStatus io_s =
15011508
WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
@@ -1505,7 +1512,37 @@ void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
15051512
}
15061513
// Old, misleading name of this function: WriteRawBlock
15071514
StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
1508-
const uint64_t offset = r->get_offset();
1515+
1516+
uint64_t offset = r->get_offset();
1517+
// try to align the page to the super alignment size, if enabled
1518+
if (r->table_options.super_block_align && is_data_block) {
1519+
auto super_block_alignment_mask =
1520+
r->table_options.super_block_alignment_size - 1;
1521+
if ((offset & (~super_block_alignment_mask)) !=
1522+
((offset + block_contents.size()) & (~super_block_alignment_mask))) {
1523+
// new block would cross the super block boundary
1524+
auto pad_bytes = r->table_options.super_block_alignment_size -
1525+
(offset & super_block_alignment_mask);
1526+
if (pad_bytes <=
1527+
r->table_options.super_block_alignment_max_padding_size) {
1528+
io_s = r->file->Pad(io_options, pad_bytes);
1529+
if (!io_s.ok()) {
1530+
r->SetIOStatus(io_s);
1531+
return;
1532+
}
1533+
r->pre_compression_size += pad_bytes;
1534+
offset += pad_bytes;
1535+
r->set_offset(offset);
1536+
if (should_add_restart_point_on_index_block != nullptr) {
1537+
*should_add_restart_point_on_index_block = true;
1538+
}
1539+
TEST_SYNC_POINT(
1540+
"BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
1541+
"SuperBlockAlignment");
1542+
}
1543+
}
1544+
}
1545+
15091546
handle->set_offset(offset);
15101547
handle->set_size(block_contents.size());
15111548
assert(status().ok());

table/block_based/block_based_table_builder.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,13 @@ class BlockBasedTableBuilder : public TableBuilder {
126126

127127
// Compress and write block content to the file.
128128
void WriteBlock(const Slice& block_contents, BlockHandle* handle,
129-
BlockType block_type);
129+
BlockType block_type,
130+
bool* should_add_restart_point_on_index_block = nullptr);
130131
// Directly write data to the file.
131132
void WriteMaybeCompressedBlock(
132133
const Slice& block_contents, CompressionType, BlockHandle* handle,
133-
BlockType block_type, const Slice* uncompressed_block_data = nullptr);
134+
BlockType block_type, const Slice* uncompressed_block_data = nullptr,
135+
bool* should_add_restart_point_on_index_block = nullptr);
134136

135137
void SetupCacheKeyPrefix(const TableBuilderOptions& tbo);
136138

table/block_based/block_based_table_factory.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,24 @@ Status BlockBasedTableFactory::ValidateOptions(
690690
return Status::InvalidArgument(
691691
"block size exceeds maximum number (4GiB) allowed");
692692
}
693+
if (table_options_.super_block_align &&
694+
(table_options_.super_block_alignment_size &
695+
(table_options_.super_block_alignment_size - 1))) {
696+
return Status::InvalidArgument(
697+
"Super Block alignment requested but super block alignment size is not "
698+
"a power of 2");
699+
}
700+
if (table_options_.super_block_alignment_size >
701+
std::numeric_limits<uint32_t>::max()) {
702+
return Status::InvalidArgument(
703+
"Super block alignment size exceeds maximum number (4GiB) allowed");
704+
}
705+
if (table_options_.super_block_alignment_max_padding_size >
706+
table_options_.super_block_alignment_size) {
707+
return Status::InvalidArgument(
708+
"Super block alignment max padding size exceeds super block alignment "
709+
"size");
710+
}
693711
if (table_options_.data_block_index_type ==
694712
BlockBasedTableOptions::kDataBlockBinaryAndHash &&
695713
table_options_.data_block_hash_table_util_ratio <= 0) {

table/block_based/block_builder.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ class BlockBuilder {
7474
: 0);
7575
}
7676

77+
// Add a restart point, if it is not at a restart point.
78+
void Restart() {
79+
if (counter_ != 0) {
80+
restarts_.push_back(static_cast<uint32_t>(buffer_.size()));
81+
estimate_ += sizeof(uint32_t);
82+
counter_ = 0;
83+
}
84+
}
85+
7786
// Returns an estimated block size after appending key and value.
7887
size_t EstimateSizeAfterKV(const Slice& key, const Slice& value) const;
7988

table/block_based/index_builder.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,16 +248,16 @@ void PartitionedIndexBuilder::FinishIndexEntry(const BlockHandle& block_handle,
248248
Slice PartitionedIndexBuilder::AddIndexEntry(
249249
const Slice& last_key_in_current_block,
250250
const Slice* first_key_in_next_block, const BlockHandle& block_handle,
251-
std::string* separator_scratch) {
251+
std::string* separator_scratch, bool add_restart_point) {
252252
// At least when running without parallel compression, maintain behavior of
253253
// avoiding a last index partition with just one entry
254254
if (first_key_in_next_block) {
255255
MaybeFlush(last_key_in_current_block, block_handle);
256256
}
257257

258-
auto sep = sub_index_builder_->AddIndexEntry(last_key_in_current_block,
259-
first_key_in_next_block,
260-
block_handle, separator_scratch);
258+
auto sep = sub_index_builder_->AddIndexEntry(
259+
last_key_in_current_block, first_key_in_next_block, block_handle,
260+
separator_scratch, add_restart_point);
261261
entries_.back().key.assign(sep.data(), sep.size());
262262

263263
if (!must_use_separator_with_seq_ &&

table/block_based/index_builder.h

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ class IndexBuilder {
7575
virtual Slice AddIndexEntry(const Slice& last_key_in_current_block,
7676
const Slice* first_key_in_next_block,
7777
const BlockHandle& block_handle,
78-
std::string* separator_scratch) = 0;
78+
std::string* separator_scratch,
79+
bool add_restart_point) = 0;
7980

8081
// An abstract (extensible) holder for passing data from PrepareIndexEntry to
8182
// FinishIndexEntry (see below).
@@ -286,12 +287,14 @@ class ShortenedIndexBuilder : public IndexBuilder {
286287
void AddIndexEntryImpl(const Slice& separator_with_seq,
287288
const Slice& first_internal_key,
288289
const BlockHandle& block_handle,
289-
bool must_use_separator_with_seq) {
290+
bool must_use_separator_with_seq,
291+
bool add_restart_point) {
290292
IndexValue entry(block_handle, first_internal_key);
291293
std::string encoded_entry;
292294
std::string delta_encoded_entry;
293295
entry.EncodeTo(&encoded_entry, include_first_key_, nullptr);
294-
if (use_value_delta_encoding_ && !last_encoded_handle_.IsNull()) {
296+
if (use_value_delta_encoding_ && !last_encoded_handle_.IsNull() &&
297+
!add_restart_point) {
295298
entry.EncodeTo(&delta_encoded_entry, include_first_key_,
296299
&last_encoded_handle_);
297300
} else {
@@ -301,6 +304,10 @@ class ShortenedIndexBuilder : public IndexBuilder {
301304
last_encoded_handle_ = block_handle;
302305
const Slice delta_encoded_entry_slice(delta_encoded_entry);
303306

307+
if (add_restart_point) {
308+
index_block_builder_.Restart();
309+
}
310+
304311
// TODO(yuzhangyu): fix this when "FindShortInternalKeySuccessor"
305312
// optimization is available.
306313
// Timestamp aware comparator currently doesn't provide override for
@@ -322,15 +329,16 @@ class ShortenedIndexBuilder : public IndexBuilder {
322329
Slice AddIndexEntry(const Slice& last_key_in_current_block,
323330
const Slice* first_key_in_next_block,
324331
const BlockHandle& block_handle,
325-
std::string* separator_scratch) override {
332+
std::string* separator_scratch,
333+
bool add_restart_point) override {
326334
Slice separator_with_seq = GetSeparatorWithSeq(
327335
last_key_in_current_block, first_key_in_next_block, separator_scratch);
328336

329337
std::string first_internal_key_buf;
330338
Slice first_internal_key = GetFirstInternalKey(&first_internal_key_buf);
331339

332340
AddIndexEntryImpl(separator_with_seq, first_internal_key, block_handle,
333-
must_use_separator_with_seq_);
341+
must_use_separator_with_seq_, add_restart_point);
334342
current_block_first_internal_key_.clear();
335343
return separator_with_seq;
336344
}
@@ -388,7 +396,7 @@ class ShortenedIndexBuilder : public IndexBuilder {
388396
ShortenedPreparedIndexEntry* entry =
389397
static_cast<ShortenedPreparedIndexEntry*>(base_entry);
390398
AddIndexEntryImpl(entry->separator_with_seq, entry->first_internal_key,
391-
block_handle, entry->must_use_separator_with_seq);
399+
block_handle, entry->must_use_separator_with_seq, false);
392400
}
393401

394402
using IndexBuilder::Finish;
@@ -483,11 +491,12 @@ class HashIndexBuilder : public IndexBuilder {
483491
Slice AddIndexEntry(const Slice& last_key_in_current_block,
484492
const Slice* first_key_in_next_block,
485493
const BlockHandle& block_handle,
486-
std::string* separator_scratch) override {
494+
std::string* separator_scratch,
495+
bool add_restart_point) override {
487496
++current_restart_index_;
488497
return primary_index_builder_.AddIndexEntry(
489498
last_key_in_current_block, first_key_in_next_block, block_handle,
490-
separator_scratch);
499+
separator_scratch, add_restart_point);
491500
}
492501

493502
std::unique_ptr<PreparedIndexEntry> CreatePreparedIndexEntry() override {
@@ -611,7 +620,8 @@ class PartitionedIndexBuilder : public IndexBuilder {
611620
Slice AddIndexEntry(const Slice& last_key_in_current_block,
612621
const Slice* first_key_in_next_block,
613622
const BlockHandle& block_handle,
614-
std::string* separator_scratch) override;
623+
std::string* separator_scratch,
624+
bool add_restart_point) override;
615625

616626
std::unique_ptr<PreparedIndexEntry> CreatePreparedIndexEntry() override;
617627
void PrepareIndexEntry(const Slice& last_key_in_current_block,

table/block_based/partitioned_filter_block_test.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,8 @@ class PartitionedFilterBlockTest
315315
std::string(*InternalKey(user_key, 0, ValueType::kTypeValue).rep());
316316
BlockHandle dont_care_block_handle(1, 1);
317317
std::string scratch;
318-
builder->AddIndexEntry(key, nullptr, dont_care_block_handle, &scratch);
318+
builder->AddIndexEntry(key, nullptr, dont_care_block_handle, &scratch,
319+
false);
319320
}
320321

321322
void CutABlock(PartitionedIndexBuilder* builder, const std::string& user_key,
@@ -327,7 +328,8 @@ class PartitionedFilterBlockTest
327328
BlockHandle dont_care_block_handle(1, 1);
328329
Slice slice = Slice(next_key.data(), next_key.size());
329330
std::string scratch;
330-
builder->AddIndexEntry(key, &slice, dont_care_block_handle, &scratch);
331+
builder->AddIndexEntry(key, &slice, dont_care_block_handle, &scratch,
332+
false);
331333
}
332334

333335
int CountNumOfIndexPartitions(PartitionedIndexBuilder* builder) {

table/block_based/user_defined_index_wrapper.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ class UserDefinedIndexBuilderWrapper : public IndexBuilder {
4141
Slice AddIndexEntry(const Slice& last_key_in_current_block,
4242
const Slice* first_key_in_next_block,
4343
const BlockHandle& block_handle,
44-
std::string* separator_scratch) override {
44+
std::string* separator_scratch,
45+
bool add_restart_point) override {
4546
UserDefinedIndexBuilder::BlockHandle handle;
4647
handle.offset = block_handle.offset();
4748
handle.size = block_handle.size();
@@ -66,7 +67,7 @@ class UserDefinedIndexBuilderWrapper : public IndexBuilder {
6667
}
6768
return internal_index_builder_->AddIndexEntry(
6869
last_key_in_current_block, first_key_in_next_block, block_handle,
69-
separator_scratch);
70+
separator_scratch, add_restart_point);
7071
}
7172

7273
// Not supported with parallel compression

0 commit comments

Comments
 (0)