Skip to content

Commit

Permalink
Universal test for RangeTombstoneSnapshotMigrateFromLast, refactor (#…
Browse files Browse the repository at this point in the history
…13244)

Summary:
* Expand RangeTombstoneSnapshotMigrateFromLast in tiered_compaction_test (originally from #13124) to reproduce a failure in universal compaciton (as well as leveled), when a specific part of the test is uncommented.
* Small refactoring to eliminate unnecessary fields in SubcompactionState. Adding a bool parameter to SubcompactionState::AddToOutput here will make more sense in the next PR (which I'm trying to keep
from getting too big).
* Improve debuggability and performance of some other tests
* Remove accidentally committed test "BlahPrecludeLastLevel" which was a temporary copy of CompactionServiceTest.PrecludeLastLevel

Pull Request resolved: #13244

Test Plan: existing tests, updated/expanded tests

Reviewed By: cbi42

Differential Revision: D67605076

Pulled By: pdillinger

fbshipit-source-id: 9be83c2173f77545b5fe17ff9dc67db497c7afc9
  • Loading branch information
pdillinger authored and facebook-github-bot committed Dec 23, 2024
1 parent 18cecb9 commit 30d5162
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 89 deletions.
6 changes: 4 additions & 2 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// and `close_file_func`.
// TODO: it would be better to have the compaction file open/close moved
// into `CompactionOutputs` which has the output file information.
status = sub_compact->AddToOutput(*c_iter, open_file_func, close_file_func);
status =
sub_compact->AddToOutput(*c_iter, c_iter->output_to_penultimate_level(),
open_file_func, close_file_func);
if (!status.ok()) {
break;
}
Expand Down Expand Up @@ -1882,7 +1884,7 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
// enabled and applicable
if (last_level_temp != Temperature::kUnknown &&
sub_compact->compaction->is_last_level() &&
!sub_compact->IsCurrentPenultimateLevel()) {
!outputs.IsPenultimateLevel()) {
temperature = last_level_temp;
}
fo_copy.temperature = temperature;
Expand Down
21 changes: 8 additions & 13 deletions db/compaction/subcompaction_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace ROCKSDB_NAMESPACE {
void SubcompactionState::AggregateCompactionOutputStats(
InternalStats::CompactionStatsFull& compaction_stats) const {
compaction_stats.stats.Add(compaction_outputs_.stats_);
if (HasPenultimateLevelOutputs()) {
if (penultimate_level_outputs_.HasOutput() || HasRangeDel()) {
compaction_stats.has_penultimate_level_output = true;
compaction_stats.penultimate_level_stats.Add(
penultimate_level_outputs_.stats_);
Expand Down Expand Up @@ -52,7 +52,7 @@ void SubcompactionState::Cleanup(Cache* cache) {
}

Slice SubcompactionState::SmallestUserKey() const {
if (has_penultimate_level_outputs_) {
if (penultimate_level_outputs_.HasOutput()) {
Slice a = compaction_outputs_.SmallestUserKey();
Slice b = penultimate_level_outputs_.SmallestUserKey();
if (a.empty()) {
Expand All @@ -74,7 +74,7 @@ Slice SubcompactionState::SmallestUserKey() const {
}

Slice SubcompactionState::LargestUserKey() const {
if (has_penultimate_level_outputs_) {
if (penultimate_level_outputs_.HasOutput()) {
Slice a = compaction_outputs_.LargestUserKey();
Slice b = penultimate_level_outputs_.LargestUserKey();
if (a.empty()) {
Expand All @@ -96,18 +96,13 @@ Slice SubcompactionState::LargestUserKey() const {
}

Status SubcompactionState::AddToOutput(
const CompactionIterator& iter,
const CompactionIterator& iter, bool use_penultimate_output,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
// update target output first
is_current_penultimate_level_ = iter.output_to_penultimate_level();
current_outputs_ = is_current_penultimate_level_ ? &penultimate_level_outputs_
: &compaction_outputs_;
if (is_current_penultimate_level_) {
has_penultimate_level_outputs_ = true;
}

return Current().AddToOutput(iter, open_file_func, close_file_func);
// update target output
current_outputs_ = use_penultimate_output ? &penultimate_level_outputs_
: &compaction_outputs_;
return current_outputs_->AddToOutput(iter, open_file_func, close_file_func);
}

} // namespace ROCKSDB_NAMESPACE
22 changes: 5 additions & 17 deletions db/compaction/subcompaction_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,11 @@ class SubcompactionState {
sub_job_id(state.sub_job_id),
compaction_outputs_(std::move(state.compaction_outputs_)),
penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)),
is_current_penultimate_level_(state.is_current_penultimate_level_),
has_penultimate_level_outputs_(state.has_penultimate_level_outputs_),
range_del_agg_(std::move(state.range_del_agg_)) {
current_outputs_ = is_current_penultimate_level_
? &penultimate_level_outputs_
: &compaction_outputs_;
}

bool HasPenultimateLevelOutputs() const {
return has_penultimate_level_outputs_ || HasRangeDel();
}

bool IsCurrentPenultimateLevel() const {
return is_current_penultimate_level_;
current_outputs_ =
state.current_outputs_ == &state.penultimate_level_outputs_
? &penultimate_level_outputs_
: &compaction_outputs_;
}

// Add all the new files from this compaction to version_edit
Expand Down Expand Up @@ -195,6 +186,7 @@ class SubcompactionState {

// Add compaction_iterator key/value to the `Current` output group.
Status AddToOutput(const CompactionIterator& iter,
bool use_penultimate_output,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func);

Expand All @@ -207,11 +199,9 @@ class SubcompactionState {
// Call FinishCompactionOutputFile() even if status is not ok: it needs to
// close the output file.
// CloseOutput() may open new compaction output files.
is_current_penultimate_level_ = true;
Status s = penultimate_level_outputs_.CloseOutput(
curr_status, per_key ? range_del_agg_.get() : nullptr, open_file_func,
close_file_func);
is_current_penultimate_level_ = false;
s = compaction_outputs_.CloseOutput(
s, per_key ? nullptr : range_del_agg_.get(), open_file_func,
close_file_func);
Expand All @@ -223,8 +213,6 @@ class SubcompactionState {
CompactionOutputs compaction_outputs_;
CompactionOutputs penultimate_level_outputs_;
CompactionOutputs* current_outputs_ = &compaction_outputs_;
bool is_current_penultimate_level_ = false;
bool has_penultimate_level_outputs_ = false;
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
};

Expand Down
115 changes: 58 additions & 57 deletions db/compaction/tiered_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,41 +177,6 @@ class TieredCompactionTest : public DBTestBase {
}
};

TEST_F(TieredCompactionTest, BlahPrecludeLastLevel) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;

Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.last_level_temperature = Temperature::kCold;
options.level0_file_num_compaction_trigger = 4;
options.max_subcompactions = 10;
options.num_levels = kNumLevels;
DestroyAndReopen(options);
// ReopenWithCompactionService(&options);

// This is simpler than setting up mock time to make the user option work.
SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
[&](void* arg) { *static_cast<SequenceNumber*>(arg) = 100; });
SyncPoint::GetInstance()->EnableProcessing();

for (int i = 0; i < kNumTrigger - 1; i++) {
for (int j = 0; j < kNumKeys; j++) {
ASSERT_OK(Put(Key(i * 10 + j), "value" + std::to_string(i)));
}
ASSERT_OK(Flush());
}
// ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

// Data split between penultimate (kUnknown) and last (kCold) levels
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
}

TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
Expand Down Expand Up @@ -474,12 +439,12 @@ TEST_F(TieredCompactionTest, RangeBasedTieredStorageUniversal) {
{
MutexLock l(&mutex);
hot_start = Key(1);
hot_end = Key(1000);
hot_end = Key(300);
}

// generate files just enough to trigger compaction
for (int i = 0; i < kNumTrigger - 1; i++) {
for (int j = 0; j < 1000; j++) {
for (int j = 0; j < 300; j++) {
ASSERT_OK(Put(Key(j), "value" + std::to_string(j)));
}
ASSERT_OK(Flush());
Expand Down Expand Up @@ -1651,7 +1616,8 @@ TEST_P(PrecludeLastLevelTest, CheckInternalKeyRange) {

ASSERT_EQ("0,0,0,0,0,2,2", FilesPerLevel());

auto VerifyLogicalState = [&]() {
auto VerifyLogicalState = [&](int line) {
SCOPED_TRACE("Called from line " + std::to_string(line));
// First with snapshot
ASSERT_EQ("val2", Get(Key(2), snapshot));
ASSERT_EQ("val3", Get(Key(3), snapshot));
Expand All @@ -1665,7 +1631,7 @@ TEST_P(PrecludeLastLevelTest, CheckInternalKeyRange) {
ASSERT_EQ("val100", Get(Key(100)));
};

VerifyLogicalState();
VerifyLogicalState(__LINE__);

// Try to compact keys up
CompactRangeOptions cro;
Expand All @@ -1676,24 +1642,40 @@ TEST_P(PrecludeLastLevelTest, CheckInternalKeyRange) {
// type:1 vs. file #15 smallest key: '6B6579303030303033' seq:104, type:1
ASSERT_OK(CompactRange(cro, Key(1), Key(2)));

VerifyLogicalState();
VerifyLogicalState(__LINE__);

db_->ReleaseSnapshot(snapshot);
Close();
}

TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) {
INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTest, PrecludeLastLevelTest,
::testing::Bool());

class PrecludeWithCompactStyleTest : public PrecludeLastLevelTestBase,
public testing::WithParamInterface<bool> {
public:
void ApplyConfigChange(
Options* options,
const std::unordered_map<std::string, std::string>& config_change,
const std::unordered_map<std::string, std::string>& db_config_change =
{}) {
// Depends on dynamic config change while holding a snapshot
ApplyConfigChangeImpl(true /*dynamic*/, options, config_change,
db_config_change);
}
};

TEST_P(PrecludeWithCompactStyleTest, RangeTombstoneSnapshotMigrateFromLast) {
// Reproducer for issue originally described in
// https://github.com/facebook/rocksdb/pull/9964/files#r1024449523
if (!UseDynamicConfig()) {
// Depends on config change while holding a snapshot
return;
}
const bool universal = GetParam();
const int kNumLevels = 7;
auto options = CurrentOptions();
options.env = mock_env_.get();
options.last_level_temperature = Temperature::kCold;
options.level_compaction_dynamic_level_bytes = true;
options.compaction_style =
universal ? kCompactionStyleUniversal : kCompactionStyleLevel;
options.compaction_options_universal.allow_trivial_move = true;
options.num_levels = kNumLevels;
options.statistics = CreateDBStatistics();
options.max_subcompactions = 10;
Expand All @@ -1713,7 +1695,13 @@ TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) {
ASSERT_OK(db_->DeleteRange({}, db_->DefaultColumnFamily(), Key(1), Key(5)));
ASSERT_OK(Put(Key(1), "val1"));
ASSERT_OK(Flush());
MoveFilesToLevel(6);

// Send to last level
if (universal) {
ASSERT_OK(CompactRange({}, {}, {}));
} else {
MoveFilesToLevel(6);
}
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());

ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"}});
Expand All @@ -1732,10 +1720,18 @@ TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) {
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); });

ASSERT_OK(Flush());
MoveFilesToLevel(5);

// Send three files to next-to-last level (if explicitly needed)
if (universal) {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
} else {
MoveFilesToLevel(5);
}

ASSERT_EQ("0,0,0,0,0,3,1", FilesPerLevel());

auto VerifyLogicalState = [&]() {
auto VerifyLogicalState = [&](int line) {
SCOPED_TRACE("Called from line " + std::to_string(line));
// First with snapshot
if (snapshot) {
ASSERT_EQ("NOT_FOUND", Get(Key(0), snapshot));
Expand All @@ -1755,21 +1751,26 @@ TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) {
ASSERT_EQ("val7", Get(Key(7)));
};

VerifyLogicalState();
VerifyLogicalState(__LINE__);

// Try a limited range compaction
// FIXME: this currently hits the "Unsafe to store Seq later than snapshot"
// FIXME: these currently hit the "Unsafe to store Seq later than snapshot"
// error. Needs to work safely for preclude option to be user mutable.
// ASSERT_OK(CompactRange({}, Key(3), Key(4)));
if (universal) {
// uint64_t middle_l5 = GetLevelFileMetadatas(5)[1]->fd.GetNumber();
// ASSERT_OK(db_->CompactFiles({}, {MakeTableFileName(middle_l5)}, 6));
} else {
// ASSERT_OK(CompactRange({}, Key(3), Key(4)));
}
EXPECT_EQ("0,0,0,0,0,3,1", FilesPerLevel());
VerifyLogicalState();
VerifyLogicalState(__LINE__);

// Compact everything, but some data still goes to both penultimate and last
// levels. A full-range compaction should be safe to "migrate" data from the
// last level to penultimate (because of preclude setting change).
ASSERT_OK(CompactRange({}, {}, {}));
EXPECT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
VerifyLogicalState();
VerifyLogicalState(__LINE__);
// Key1 should have been migrated out of the last level
auto& meta = *GetLevelFileMetadatas(6)[0];
ASSERT_LT(Key(1), meta.smallest.user_key().ToString());
Expand All @@ -1781,13 +1782,13 @@ TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) {

ASSERT_OK(CompactRange({}, {}, {}));
EXPECT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
VerifyLogicalState();
VerifyLogicalState(__LINE__);

Close();
}

INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTest, PrecludeLastLevelTest,
::testing::Bool());
INSTANTIATE_TEST_CASE_P(PrecludeWithCompactStyleTest,
PrecludeWithCompactStyleTest, ::testing::Bool());

class TimedPutPrecludeLastLevelTest
: public PrecludeLastLevelTestBase,
Expand Down

0 comments on commit 30d5162

Please sign in to comment.