Skip to content

Commit

Permalink
Add more debug info, fix update hole_punchable_block bug
Browse files Browse the repository at this point in the history
Signed-off-by: v01dstar <[email protected]>
  • Loading branch information
v01dstar committed May 8, 2024
1 parent 10e03c8 commit 75824b3
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 20 deletions.
9 changes: 8 additions & 1 deletion src/blob_aligned_blocks_collector.cc
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#include "blob_aligned_blocks_collector.h"

#include "base_db_listener.h"
#include "titan_logging.h"

namespace rocksdb {
namespace titandb {

TablePropertiesCollector*
BlobAlignedBlocksCollectorFactory::CreateTablePropertiesCollector(
rocksdb::TablePropertiesCollectorFactory::Context /* context */) {
return new BlobAlignedBlocksCollector();
return new BlobAlignedBlocksCollector(info_logger_);
}

const std::string BlobAlignedBlocksCollector::kPropertiesName =
Expand Down Expand Up @@ -74,6 +75,12 @@ Status BlobAlignedBlocksCollector::Finish(UserCollectedProperties* properties) {
if (aligned_blocks_.empty()) {
return Status::OK();
}
if (info_logger_ != nullptr) {
TITAN_LOG_INFO(
info_logger_,
"BlobAlignedBlocksCollector::Finish: aligned_blocks size %zu",
aligned_blocks_.size());
}

std::string res;
bool ok __attribute__((__unused__)) = Encode(aligned_blocks_, &res);
Expand Down
14 changes: 14 additions & 0 deletions src/blob_aligned_blocks_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ class BlobAlignedBlocksCollectorFactory final
TablePropertiesCollectorFactory::Context context) override;

const char* Name() const override { return "BlobAlignedBlocksCollector"; }

explicit BlobAlignedBlocksCollectorFactory(
std::shared_ptr<Logger> info_logger)
: info_logger_(info_logger) {}
BlobAlignedBlocksCollectorFactory(const BlobAlignedBlocksCollectorFactory&) =
delete;
BlobAlignedBlocksCollectorFactory& operator=(
const BlobAlignedBlocksCollectorFactory&) = delete;

std::shared_ptr<Logger> info_logger_;
};

class BlobAlignedBlocksCollector final : public TablePropertiesCollector {
Expand All @@ -43,8 +53,12 @@ class BlobAlignedBlocksCollector final : public TablePropertiesCollector {
}
const char* Name() const override { return "BlobAlignedBlocksCollector"; }

BlobAlignedBlocksCollector(std::shared_ptr<Logger> info_logger)
: info_logger_(info_logger) {}

private:
std::map<uint64_t, uint64_t> aligned_blocks_;
std::shared_ptr<Logger> info_logger_;
};

} // namespace titandb
Expand Down
2 changes: 1 addition & 1 deletion src/blob_gc_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ Status BlobGCJob::HolePunchSingleBlobFile(std::shared_ptr<BlobFileMeta> file) {
// to update the hole_punchable_blocks to reflect the actual value instead
// of resetting it to 0.
// TODO: test this case.
auto hole_punched_blocks = live_blocks - file->live_blocks();
auto hole_punched_blocks = file->live_blocks() - live_blocks;
auto new_blob_file = std::make_shared<BlobFileMeta>(
file->file_number(), file->file_size(), 0, 0, file->smallest_key(),
file->largest_key());
Expand Down
42 changes: 32 additions & 10 deletions src/blob_gc_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,15 @@ TEST_F(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); }
TEST_F(BlobGCJobTest, RunGC) { TestRunGC(); }

TEST_F(BlobGCJobTest, PunchHole) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"BlobGCJobTest::PunchHole:AfterCompact",
"TitanDBImpl::BackgroundCallGC:BeforeGCRunning"},
{"TitanDBImpl::BackgroundCallGC:AfterGCRunning",
"BlobGCJobTest::PunchHole:BeforeVerify"},
});
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"BlobGCJobTest::PunchHole:AfterCompact",
"TitanDBImpl::BackgroundCallGC:BeforeGCRunning"},
{"TitanDBImpl::BackgroundCallGC:AfterGCRunning",
"BlobGCJobTest::PunchHole:BeforeCheckPunchHoleGCIsQueued"},
{"BlobGCJobTest::PunchHole:AfterReleaseSnapshot",
"TitanDBImpl::BackgroundCallGC:BeforeRunScheduledPunchHoleGC"},
{"TitanDBImpl::BackgroundCallGC:AfterRunScheduledPunchHoleGC",
"BlobGCJobTest::PunchHole:BeforeCheckPunchHoleGCIsFinished"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

DisableMergeSmall();
Expand Down Expand Up @@ -322,17 +325,36 @@ TEST_F(BlobGCJobTest, PunchHole) {
}
Flush();
CompactAll();

files.clear();
b->ExportBlobFiles(files);
ASSERT_EQ(files.size(), 1);
ASSERT_EQ(files.begin()->second.lock()->hole_punchable_blocks(), 334);
ASSERT_EQ(files.begin()->second.lock()->live_blocks(), 1000);

auto snapshot = db_->GetSnapshot();
db_->Put(WriteOptions(), GenKey(100000), GenValue(1));

TEST_SYNC_POINT("BlobGCJobTest::PunchHole:AfterCompact");
TEST_SYNC_POINT("BlobGCJobTest::PunchHole:BeforeVerify");
TEST_SYNC_POINT("BlobGCJobTest::PunchHole:BeforeCheckPunchHoleGCIsQueued");

files.clear();
b->ExportBlobFiles(files);
ASSERT_EQ(files.size(), 1);
ASSERT_EQ(files.begin()->second.lock()->hole_punchable_blocks(), 334);
ASSERT_EQ(files.begin()->second.lock()->live_blocks(), 1000);

db_->ReleaseSnapshot(snapshot);
TEST_SYNC_POINT("BlobGCJobTest::PunchHole:AfterReleaseSnapshot");
TEST_SYNC_POINT("BlobGCJobTest::PunchHole:BeforeCheckPunchHoleGCIsFinished");

files.clear();
b->ExportBlobFiles(files);
ASSERT_EQ(files.size(), 1);
auto post_punch_hole_file_size = files.begin()->second.lock()->file_size();
auto post_punch_hole_live_blocks =
files.begin()->second.lock()->live_blocks();
ASSERT_EQ(post_punch_hole_file_size, file_size);
ASSERT_LT(post_punch_hole_live_blocks, live_blocks);
ASSERT_EQ(files.begin()->second.lock()->live_blocks(), 666);
ASSERT_EQ(files.begin()->second.lock()->hole_punchable_blocks(), 0);
for (int i = 0; i < MAX_KEY_NUM; i++) {
if (i % 3 == 0) {
std::string value;
Expand Down
15 changes: 13 additions & 2 deletions src/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,8 @@ Status TitanDBImpl::OpenImpl(const std::vector<TitanCFDescriptor>& descs,
cf_opts.table_properties_collector_factories.emplace_back(
std::make_shared<BlobFileSizeCollectorFactory>());
cf_opts.table_properties_collector_factories.emplace_back(
std::make_shared<BlobAlignedBlocksCollectorFactory>());
std::make_shared<BlobAlignedBlocksCollectorFactory>(
db_options_.info_log));
titan_table_factories.push_back(std::make_shared<TitanTableFactory>(
db_options_, desc.options, blob_manager_, &mutex_, blob_file_set_.get(),
stats_.get()));
Expand Down Expand Up @@ -481,7 +482,8 @@ Status TitanDBImpl::CreateColumnFamilies(
options.table_properties_collector_factories.emplace_back(
std::make_shared<BlobFileSizeCollectorFactory>());
options.table_properties_collector_factories.emplace_back(
std::make_shared<BlobAlignedBlocksCollectorFactory>());
std::make_shared<BlobAlignedBlocksCollectorFactory>(
db_options_.info_log));
if (options.compaction_filter != nullptr ||
options.compaction_filter_factory != nullptr) {
std::shared_ptr<TitanCompactionFilterFactory> titan_cf_factory =
Expand Down Expand Up @@ -1413,7 +1415,16 @@ void TitanDBImpl::OnCompactionCompleted(
cf_options.num_levels - 1 == compaction_job_info.output_level;
bool has_live_blocks_diff = !hole_punchable_blocks_diff.empty();
if (has_live_blocks_diff) {
TITAN_LOG_INFO(db_options_.info_log,
"OnCompactionCompleted[%d]: blob_file_size_diff.size=%zu, "
"hole_punchable_blocks_diff.size=%zu",
compaction_job_info.job_id, blob_file_size_diff.size(),
hole_punchable_blocks_diff.size());
assert(hole_punchable_blocks_diff.size() == blob_file_size_diff.size());
} else {
TITAN_LOG_INFO(db_options_.info_log,
"OnCompactionCompleted[%d]: blob_file_size_diff.size=%zu",
compaction_job_info.job_id, blob_file_size_diff.size());
}

for (const auto& file_diff : blob_file_size_diff) {
Expand Down
43 changes: 37 additions & 6 deletions src/db_impl_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ Status TitanDBImpl::AsyncInitializeGC(

std::map<uint64_t, int64_t> blob_file_size_diff;
std::map<uint64_t, int64_t>
_hole_punchable_blocks_diff; // Not used, this is not required while
hole_punchable_blocks_diffs; // Not used, this is not required while
// initializing GC. The initial state of
// punch hole GC is determined by
// BlobFileMeta (in MANIFEST).
for (auto& file : collection) {
s = ExtractGCStatsFromTableProperty(file.second, true /*to_add*/,
&blob_file_size_diff,
&_hole_punchable_blocks_diff);
&hole_punchable_blocks_diffs);
if (!s.ok()) {
MutexLock l(&mutex_);
this->SetBGError(s);
Expand All @@ -165,6 +165,18 @@ Status TitanDBImpl::AsyncInitializeGC(
file->UpdateLiveDataSize(file_size.second);
}
}
for (auto& file_blocks : hole_punchable_blocks_diffs) {
assert(file_blocks.second < 0);
std::shared_ptr<BlobFileMeta> file =
blob_storage->FindFile(file_blocks.first).lock();
if (file != nullptr) {
if (uint64_t(-file_blocks.second) <= file->live_blocks()) {
uint64_t hole_punchable_blocks_diff =
file->live_blocks() + file_blocks.second;
file->UpdateHolePunchableBlocks(hole_punchable_blocks_diff);
}
}
}
blob_storage->InitializeAllFiles();
TITAN_LOG_INFO(db_options_.info_log,
"Titan finish async GC initialization on cf [%s]",
Expand Down Expand Up @@ -205,6 +217,7 @@ void TitanDBImpl::MaybeScheduleGC() {
while ((!gc_queue_.empty() ||
(scheduled_punch_hole_gc_ != nullptr && !punch_hole_gc_running_)) &&
bg_gc_scheduled_ < db_options_.max_background_gc) {
TITAN_LOG_INFO(db_options_.info_log, "Titan schedule GC");
bg_gc_scheduled_++;
thread_pool_->SubmitJob(std::bind(&TitanDBImpl::BGWorkGC, this));
}
Expand All @@ -215,6 +228,10 @@ void TitanDBImpl::BGWorkGC(void* db) {
}

void TitanDBImpl::BackgroundCallGC() {
TITAN_LOG_INFO(
db_options_.info_log,
"Titan background GC thread start, is punch hole gc running %d",
punch_hole_gc_running_);
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning");
{
MutexLock l(&mutex_);
Expand All @@ -236,12 +253,20 @@ void TitanDBImpl::BackgroundCallGC() {
scheduled_punch_hole_gc_.reset();
} else if (scheduled_punch_hole_gc_->snapshot()->GetSequenceNumber() ==
GetOldestSnapshotSequence()) {
TEST_SYNC_POINT(
"TitanDBImpl::BackgroundCallGC:BeforeRunScheduledPunchHoleGC");
TITAN_LOG_INFO(db_options_.info_log,
"Titan start scheduled punch hole GC directly");
std::unique_ptr<BlobGC> blob_gc = std::move(scheduled_punch_hole_gc_);
auto cfh = db_impl_->GetColumnFamilyHandleUnlocked(blob_gc->cf_id());
blob_gc->SetColumnFamily(cfh.get());
punch_hole_gc_running_ = true;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
db_options_.info_log.get());
BackgroundGC(&log_buffer, blob_gc.get());
punch_hole_gc_running_ = false;
TEST_SYNC_POINT(
"TitanDBImpl::BackgroundCallGC:AfterRunScheduledPunchHoleGC");
{
mutex_.Unlock();
log_buffer.FlushBufferToLog();
Expand All @@ -267,32 +292,38 @@ void TitanDBImpl::BackgroundCallGC() {
}
}
if (found_non_obsolete_cf) {
std::unique_ptr<ColumnFamilyHandle> cfh;
std::shared_ptr<BlobStorage> blob_storage =
blob_file_set_->GetBlobStorage(cf_id).lock();
if (blob_storage != nullptr) {
const auto& cf_options = blob_storage->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options,
cf_id, stats_.get());
TITAN_LOG_INFO(db_options_.info_log,
"Titan picking candidate files for GC");
auto blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get(),
!punch_hole_gc_running_);
if (blob_gc != nullptr) {
assert(!blob_gc->use_punch_hole() || !punch_hole_gc_running_);
if (blob_gc->use_punch_hole()) {
TITAN_LOG_INFO(db_options_.info_log,
"Titan picked punch hole GC");
auto snapshot = db_->GetSnapshot();
blob_gc->SetSnapshot(snapshot);
}
cfh = db_impl_->GetColumnFamilyHandleUnlocked(cf_id);
blob_gc->SetColumnFamily(cfh.get());
if (blob_gc->use_punch_hole() &&
blob_gc->snapshot()->GetSequenceNumber() >
GetOldestSnapshotSequence()) {
TITAN_LOG_INFO(db_options_.info_log,
"Titan schedule punch hole GC");
scheduled_punch_hole_gc_ = std::move(blob_gc);
} else {
if (blob_gc->use_punch_hole()) {
punch_hole_gc_running_ = true;
}
auto cfh = db_impl_->GetColumnFamilyHandleUnlocked(cf_id);
blob_gc->SetColumnFamily(cfh.get());
TITAN_LOG_INFO(db_options_.info_log, "Titan start GC directly");
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
db_options_.info_log.get());
BackgroundGC(&log_buffer, blob_gc.get());
Expand Down Expand Up @@ -340,7 +371,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, BlobGC* blob_gc) {
TITAN_LOG_BUFFER(log_buffer, "Titan GC nothing to do");
} else {
TITAN_LOG_BUFFER(log_buffer, "Titan GC start, using punch hole: %s",
blob_gc->use_punch_hole());
blob_gc->use_punch_hole() ? "true" : "false");
StopWatch gc_sw(env_->GetSystemClock().get(), statistics(stats_.get()),
TITAN_GC_MICROS);
BlobGCJob blob_gc_job(blob_gc, db_, &mutex_, db_options_, env_,
Expand Down

0 comments on commit 75824b3

Please sign in to comment.