Skip to content

Commit 931c9df

Browse files
vjnadimpallifacebook-github-bot
authored andcommitted
Use separate status code for column family drop and db shutdown in progress (#5275)
Summary: Currently RocksDB uses Status::ShutdownInProgress to inform about column family drop. I would like to have a separate Status code for this event. https://github.com/facebook/rocksdb/blob/master/include/rocksdb/status.h#L55 Comment on this: https://github.com/facebook/rocksdb/blob/abc4202e47eb433dc731911af38f232d2148428c/db/version_set.cc#L2742:L2743 Pull Request resolved: #5275 Differential Revision: D15204583 Pulled By: vjnadimpalli fbshipit-source-id: 95e99e34b27bc165b554ecb8a48a7f8e60f21e2a
1 parent 5c0e304 commit 931c9df

File tree

11 files changed

+75
-34
lines changed

11 files changed

+75
-34
lines changed

HISTORY.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@
1414
* Merging iterator to avoid child iterator reseek for some cases
1515
* Reduce iterator key comparision for upper/lower bound check.
1616

17+
### General Improvements
18+
* Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress.
19+
1720
### Bug Fixes
1821

22+
1923
## 6.2.0 (4/30/2019)
2024
### New Features
2125
* Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`.

db/compaction_job.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,10 +1004,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
10041004
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
10051005
RecordCompactionIOStats();
10061006

1007-
if (status.ok() &&
1008-
(shutting_down_->load(std::memory_order_relaxed) || cfd->IsDropped())) {
1009-
status = Status::ShutdownInProgress(
1010-
"Database shutdown or Column family drop during compaction");
1007+
if (status.ok() && cfd->IsDropped()) {
1008+
status =
1009+
Status::ColumnFamilyDropped("Column family dropped during compaction");
1010+
}
1011+
if ((status.ok() || status.IsColumnFamilyDropped()) &&
1012+
shutting_down_->load(std::memory_order_relaxed)) {
1013+
status = Status::ShutdownInProgress("Database shutdown");
10111014
}
10121015
if (status.ok()) {
10131016
status = input->status();

db/db_compaction_test.cc

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3890,11 +3890,17 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
38903890
}
38913891
Flush(1);
38923892
}
3893-
auto manual_compaction_thread = port::Thread([this]() {
3893+
auto manual_compaction_thread = port::Thread([this, i]() {
38943894
CompactRangeOptions cro;
38953895
cro.allow_write_stall = false;
3896-
ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
3897-
.IsShutdownInProgress());
3896+
Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr);
3897+
if (i == 0) {
3898+
ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
3899+
.IsColumnFamilyDropped());
3900+
} else {
3901+
ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
3902+
.IsShutdownInProgress());
3903+
}
38983904
});
38993905

39003906
TEST_SYNC_POINT(

db/db_flush_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
431431
cf_ids.push_back(cf_id);
432432
}
433433
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
434-
ASSERT_TRUE(Flush(cf_ids).IsShutdownInProgress());
434+
ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped());
435435
Destroy(options);
436436
}
437437

db/db_impl_compaction_flush.cc

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ Status DBImpl::FlushMemTableToOutputFile(
201201
cfd->current()->storage_info()->LevelSummary(&tmp));
202202
}
203203

204-
if (!s.ok() && !s.IsShutdownInProgress()) {
204+
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
205205
Status new_bg_error = s;
206206
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
207207
}
@@ -254,7 +254,7 @@ Status DBImpl::FlushMemTablesToOutputFiles(
254254
snapshot_checker, log_buffer, thread_pri);
255255
if (!s.ok()) {
256256
status = s;
257-
if (!s.IsShutdownInProgress()) {
257+
if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
258258
// At this point, DB is not shutting down, nor is cfd dropped.
259259
// Something is wrong, thus we break out of the loop.
260260
break;
@@ -385,7 +385,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
385385
for (const auto& e : exec_status) {
386386
if (!e.second.ok()) {
387387
s = e.second;
388-
if (!e.second.IsShutdownInProgress()) {
388+
if (!e.second.IsShutdownInProgress() &&
389+
!e.second.IsColumnFamilyDropped()) {
389390
// If a flush job did not return OK, and the CF is not dropped, and
390391
// the DB is not shutting down, then we have to return this result to
391392
// caller later.
@@ -397,15 +398,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
397398
s = error_status.ok() ? s : error_status;
398399
}
399400

400-
// If db is NOT shutting down, and one or more column families have been
401-
// dropped.
402-
// TODO: use separate status code for db shutdown and column family dropped.
403-
if (s.IsShutdownInProgress() &&
404-
!shutting_down_.load(std::memory_order_acquire)) {
401+
if (s.IsColumnFamilyDropped()) {
405402
s = Status::OK();
406403
}
407404

408-
if (s.ok() || s.IsShutdownInProgress()) {
405+
if (s.ok() || s.IsShutdownInProgress() || s.IsColumnFamilyDropped()) {
409406
// Sync on all distinct output directories.
410407
for (auto dir : distinct_output_dirs) {
411408
if (dir != nullptr) {
@@ -523,7 +520,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
523520

524521
// Need to undo atomic flush if something went wrong, i.e. s is not OK and
525522
// it is not because of CF drop.
526-
if (!s.ok() && !s.IsShutdownInProgress()) {
523+
if (!s.ok() && !s.IsColumnFamilyDropped()) {
527524
// Have to cancel the flush jobs that have NOT executed because we need to
528525
// unref the versions.
529526
for (int i = 0; i != num_cfs; ++i) {
@@ -1052,7 +1049,7 @@ Status DBImpl::CompactFilesImpl(
10521049

10531050
if (status.ok()) {
10541051
// Done
1055-
} else if (status.IsShutdownInProgress()) {
1052+
} else if (status.IsColumnFamilyDropped()) {
10561053
// Ignore compaction errors found during shutting down
10571054
} else {
10581055
ROCKS_LOG_WARN(immutable_db_options_.info_log,
@@ -1697,7 +1694,10 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
16971694
cfd->GetName().c_str());
16981695
bg_cv_.Wait();
16991696
}
1700-
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
1697+
if (cfd->IsDropped()) {
1698+
return Status::ColumnFamilyDropped();
1699+
}
1700+
if (shutting_down_.load(std::memory_order_acquire)) {
17011701
return Status::ShutdownInProgress();
17021702
}
17031703

@@ -2159,7 +2159,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
21592159

21602160
Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
21612161
&reason, thread_pri);
2162-
if (!s.ok() && !s.IsShutdownInProgress() &&
2162+
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
21632163
reason != FlushReason::kErrorRecovery) {
21642164
// Wait a little bit before retrying background flush in
21652165
// case this is an environmental problem and we do not want to
@@ -2184,7 +2184,8 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
21842184

21852185
// If flush failed, we want to delete all temporary files that we might have
21862186
// created. Thus, we force full scan in FindObsoleteFiles()
2187-
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
2187+
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2188+
!s.IsColumnFamilyDropped());
21882189
// delete unnecessary files if any, this is done outside the mutex
21892190
if (job_context.HaveSomethingToClean() ||
21902191
job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
@@ -2248,7 +2249,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
22482249
mutex_.Unlock();
22492250
env_->SleepForMicroseconds(10000); // prevent hot loop
22502251
mutex_.Lock();
2251-
} else if (!s.ok() && !s.IsShutdownInProgress()) {
2252+
} else if (!s.ok() && !s.IsShutdownInProgress() &&
2253+
!s.IsColumnFamilyDropped()) {
22522254
// Wait a little bit before retrying background compaction in
22532255
// case this is an environmental problem and we do not want to
22542256
// chew up resources for failed compactions for the duration of
@@ -2272,7 +2274,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
22722274
// If compaction failed, we want to delete all temporary files that we might
22732275
// have created (they might not be all recorded in job_context in case of a
22742276
// failure). Thus, we force full scan in FindObsoleteFiles()
2275-
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
2277+
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2278+
!s.IsColumnFamilyDropped());
22762279
TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
22772280

22782281
// delete unnecessary files if any, this is done outside the mutex
@@ -2710,7 +2713,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
27102713

27112714
if (status.ok() || status.IsCompactionTooLarge()) {
27122715
// Done
2713-
} else if (status.IsShutdownInProgress()) {
2716+
} else if (status.IsColumnFamilyDropped()) {
27142717
// Ignore compaction errors found during shutting down
27152718
} else {
27162719
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",

db/flush_job.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,12 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
229229
// This will release and re-acquire the mutex.
230230
Status s = WriteLevel0Table();
231231

232-
if (s.ok() &&
233-
(shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
234-
s = Status::ShutdownInProgress(
235-
"Database shutdown or Column family drop during flush");
232+
if (s.ok() && cfd_->IsDropped()) {
233+
s = Status::ColumnFamilyDropped("Column family dropped during compaction");
234+
}
235+
if ((s.ok() || s.IsColumnFamilyDropped()) &&
236+
shutting_down_->load(std::memory_order_acquire)) {
237+
s = Status::ShutdownInProgress("Database shutdown");
236238
}
237239

238240
if (!s.ok()) {

db/memtable_list.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ Status InstallMemtableAtomicFlushResults(
598598
imm->InstallNewVersion();
599599
}
600600

601-
if (s.ok() || s.IsShutdownInProgress()) {
601+
if (s.ok() || s.IsColumnFamilyDropped()) {
602602
for (size_t i = 0; i != cfds.size(); ++i) {
603603
if (cfds[i]->IsDropped()) {
604604
continue;

db/version_set.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3842,16 +3842,14 @@ Status VersionSet::LogAndApply(
38423842
}
38433843
}
38443844
if (0 == num_undropped_cfds) {
3845-
// TODO (yanqin) maybe use a different status code to denote column family
3846-
// drop other than OK and ShutdownInProgress
38473845
for (int i = 0; i != num_cfds; ++i) {
38483846
manifest_writers_.pop_front();
38493847
}
38503848
// Notify new head of manifest write queue.
38513849
if (!manifest_writers_.empty()) {
38523850
manifest_writers_.front()->cv.Signal();
38533851
}
3854-
return Status::ShutdownInProgress();
3852+
return Status::ColumnFamilyDropped();
38553853
}
38563854

38573855
return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,

include/rocksdb/status.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ class Status {
5858
kBusy = 11,
5959
kExpired = 12,
6060
kTryAgain = 13,
61-
kCompactionTooLarge = 14
61+
kCompactionTooLarge = 14,
62+
kColumnFamilyDropped = 15,
63+
kMaxCode
6264
};
6365

6466
Code code() const { return code_; }
@@ -184,6 +186,15 @@ class Status {
184186
return Status(kCompactionTooLarge, msg, msg2);
185187
}
186188

189+
static Status ColumnFamilyDropped(SubCode msg = kNone) {
190+
return Status(kColumnFamilyDropped, msg);
191+
}
192+
193+
static Status ColumnFamilyDropped(const Slice& msg,
194+
const Slice& msg2 = Slice()) {
195+
return Status(kColumnFamilyDropped, msg, msg2);
196+
}
197+
187198
static Status NoSpace() { return Status(kIOError, kNoSpace); }
188199
static Status NoSpace(const Slice& msg, const Slice& msg2 = Slice()) {
189200
return Status(kIOError, kNoSpace, msg, msg2);
@@ -256,6 +267,9 @@ class Status {
256267
// Returns true iff the status indicates the proposed compaction is too large
257268
bool IsCompactionTooLarge() const { return code() == kCompactionTooLarge; }
258269

270+
// Returns true iff the status indicates Column Family Dropped
271+
bool IsColumnFamilyDropped() const { return code() == kColumnFamilyDropped; }
272+
259273
// Returns true iff the status indicates a NoSpace error
260274
// This is caused by an I/O error returning the specific "out of space"
261275
// error condition. Stricto sensu, an NoSpace error is an I/O error

java/rocksjni/portal.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,8 @@ class StatusJni : public RocksDBNativeClass<rocksdb::Status*, StatusJni> {
467467
return 0xC;
468468
case rocksdb::Status::Code::kTryAgain:
469469
return 0xD;
470+
case rocksdb::Status::Code::kColumnFamilyDropped:
471+
return 0xE;
470472
default:
471473
return 0x7F; // undefined
472474
}
@@ -584,6 +586,12 @@ class StatusJni : public RocksDBNativeClass<rocksdb::Status*, StatusJni> {
584586
new rocksdb::Status(rocksdb::Status::TryAgain(
585587
rocksdb::SubCodeJni::toCppSubCode(jsub_code_value))));
586588
break;
589+
case 0xE:
590+
// ColumnFamilyDropped
591+
status = std::unique_ptr<rocksdb::Status>(
592+
new rocksdb::Status(rocksdb::Status::ColumnFamilyDropped(
593+
rocksdb::SubCodeJni::toCppSubCode(jsub_code_value))));
594+
break;
587595
case 0x7F:
588596
default:
589597
return nullptr;

0 commit comments

Comments
 (0)