Skip to content

Commit

Permalink
Save progress
Browse files Browse the repository at this point in the history
Signed-off-by: v01dstar <[email protected]>
  • Loading branch information
v01dstar committed Apr 5, 2024
1 parent 05aa72b commit 94af934
Show file tree
Hide file tree
Showing 20 changed files with 587 additions and 199 deletions.
10 changes: 9 additions & 1 deletion src/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ void BlobFileBuilder::Add(const BlobRecord& record,
} else {
encoder_.EncodeRecord(record);
WriteEncoderData(&ctx->new_blob_index.blob_handle);
FillBlockWithPadding();
out_ctx->emplace_back(std::move(ctx));
}

Expand Down Expand Up @@ -143,15 +144,22 @@ void BlobFileBuilder::WriteEncoderData(BlobHandle* handle) {
handle->offset = file_->GetFileSize();
handle->size = encoder_.GetEncodedSize();
live_data_size_ += handle->size;
if (alignment_size_ > 0) {
live_blocks_ += handle->size / alignment_size_ +
(handle->size % alignment_size_ ? 1 : 0);
}

status_ = file_->Append(encoder_.GetHeader());
if (ok()) {
status_ = file_->Append(encoder_.GetRecord());
num_entries_++;
if (ok()) {
FillBlockWithPadding();
}
}
}

void BlobFileBuilder::FillFSBlockWithPadding() {
void BlobFileBuilder::FillBlockWithPadding() {
if (alignment_size_ == 0) {
return;
}
Expand Down
3 changes: 2 additions & 1 deletion src/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class BlobFileBuilder {
void WriteCompressionDictBlock(MetaIndexBuilder* meta_index_builder);
void FlushSampleRecords(OutContexts* out_ctx);
void WriteEncoderData(BlobHandle* handle);
void FillFSBlockWithPadding();
void FillBlockWithPadding();

TitanCFOptions cf_options_;
WritableFileWriter* file_;
Expand All @@ -143,6 +143,7 @@ class BlobFileBuilder {
std::string smallest_key_;
std::string largest_key_;
uint64_t live_data_size_ = 0;
uint64_t live_blocks_ = 0;

uint64_t alignment_size_ = 0;
};
Expand Down
6 changes: 6 additions & 0 deletions src/blob_file_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class BlobFileManager {
(void)handles;
return Status::OK();
}

virtual Status BatchUpdateFiles(
const std::vector<std::shared_ptr<BlobFileMeta>>& files) {
(void)files;
return Status::OK();
}
};

} // namespace titandb
Expand Down
37 changes: 36 additions & 1 deletion src/blob_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ void BlobFileMeta::EncodeTo(std::string* dst) const {
PutVarint32(dst, file_level_);
PutLengthPrefixedSlice(dst, smallest_key_);
PutLengthPrefixedSlice(dst, largest_key_);
PutVarint64(dst, alignment_size_);
PutVarint64(dst, live_blocks_);
PutVarint64(dst, hole_punchable_blocks_);
}

Status BlobFileMeta::DecodeFromLegacy(Slice* src) {
Expand Down Expand Up @@ -171,11 +174,39 @@ Status BlobFileMeta::DecodeFrom(Slice* src) {
return Status::OK();
}

Status BlobFileMeta::DecodeFromV3(Slice* src) {
if (!GetVarint64(src, &file_number_) || !GetVarint64(src, &file_size_) ||
!GetVarint64(src, &file_entries_) || !GetVarint32(src, &file_level_)) {
return Status::Corruption("BlobFileMeta decode failed");
}
Slice str;
if (GetLengthPrefixedSlice(src, &str)) {
smallest_key_.assign(str.data(), str.size());
} else {
return Status::Corruption("BlobSmallestKey Decode failed");
}
if (GetLengthPrefixedSlice(src, &str)) {
largest_key_.assign(str.data(), str.size());
} else {
return Status::Corruption("BlobLargestKey decode failed");
}
uint64_t alignment_size, live_blocks, hole_punchable_blocks;
if (!GetVarint64(src, &alignment_size) || !GetVarint64(src, &live_blocks) ||
!GetVarint64(src, &hole_punchable_blocks)) {
return Status::Corruption("BlobFileMeta decode failed");
}
alignment_size_ = alignment_size;
live_blocks_.store(live_blocks);
hole_punchable_blocks_.store(hole_punchable_blocks);
return Status::OK();
}

bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs) {
return (lhs.file_number_ == rhs.file_number_ &&
lhs.file_size_ == rhs.file_size_ &&
lhs.file_entries_ == rhs.file_entries_ &&
lhs.file_level_ == rhs.file_level_);
lhs.file_level_ == rhs.file_level_ &&
lhs.live_blocks_.load() == rhs.live_blocks_.load());
}

void BlobFileMeta::FileStateTransit(const FileEvent& event) {
Expand Down Expand Up @@ -234,6 +265,10 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) {
assert(state_ == FileState::kNormal);
state_ = FileState::kToMerge;
break;
case FileEvent::kPunchHoleOutput:
assert(state_ == FileState::kBeingGC);
state_ = FileState::kNormal;
break;
case FileEvent::kReset:
state_ = FileState::kNormal;
break;
Expand Down
39 changes: 36 additions & 3 deletions src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "rocksdb/status.h"
#include "table/format.h"

#include "blob_gc.h"
#include "util.h"

namespace rocksdb {
Expand Down Expand Up @@ -210,6 +211,7 @@ class BlobFileMeta {
kFlushOrCompactionOutput,
kDelete,
kNeedMerge,
kPunchHoleOutput,
kReset, // reset file to normal for test
};

Expand All @@ -229,29 +231,43 @@ class BlobFileMeta {
BlobFileMeta(uint64_t _file_number, uint64_t _file_size,
uint64_t _file_entries, uint32_t _file_level,
const std::string& _smallest_key,
const std::string& _largest_key)
const std::string& _largest_key, uint64_t _alignment_size,
uint64_t _live_blocks)
: file_number_(_file_number),
file_size_(_file_size),
file_entries_(_file_entries),
file_level_(_file_level),
smallest_key_(_smallest_key),
largest_key_(_largest_key) {}
largest_key_(_largest_key),
alignment_size_(_alignment_size),
live_blocks_(_live_blocks),
hole_punchable_blocks_(0) {}

friend bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs);

void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
Status DecodeFromLegacy(Slice* src);
Status DecodeFromV3(Slice* src);

void set_live_data_size(uint64_t size) { live_data_size_ = size; }
void set_live_blocks(uint64_t size) { live_blocks_ = size; }

uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; }
uint64_t live_data_size() const { return live_data_size_; }
uint32_t file_level() const { return file_level_; }
const std::string& smallest_key() const { return smallest_key_; }
const std::string& largest_key() const { return largest_key_; }
const uint64_t live_blocks() const { return live_blocks_; }
const uint64_t hole_punchable_blocks() const {
return hole_punchable_blocks_;
}

const uint64_t alignment_size() const { return alignment_size_; }

void set_live_data_size(int64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
uint64_t set_file_entries(uint64_t size) { file_entries_ = size; }
FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }

Expand All @@ -273,6 +289,19 @@ class BlobFileMeta {
return 1 - (static_cast<double>(live_data_size_) /
(file_size_ - kBlobMaxHeaderSize - kBlobFooterSize));
}

PunchHoleScore GetPunchHoleScore() const {
PunchHoleScore score;
score.file_number = file_number_;
score.file_size = file_size_;
score.live_blocks = live_blocks_;
score.hole_punchable_blocks = hole_punchable_blocks_;
return score;
}

void set_hole_punchable_blocks(uint64_t size) {
hole_punchable_blocks_ = size;
}
TitanInternalStats::StatsType GetDiscardableRatioLevel() const;
void Dump(bool with_keys) const;

Expand Down Expand Up @@ -302,6 +331,10 @@ class BlobFileMeta {
// positive number if any later compaction is trigger before previous
// `OnCompactionCompleted()` is called.
std::atomic<int64_t> live_data_size_{0};

uint64_t alignment_size_{0};
std::atomic<uint64_t> live_blocks_{0};
std::atomic<uint64_t> hole_punchable_blocks_{0};
std::atomic<FileState> state_{FileState::kNone};
};

Expand Down
2 changes: 1 addition & 1 deletion src/blob_format_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TEST(BlobFormatTest, BlobIndex) {
}

TEST(BlobFormatTest, BlobFileMeta) {
BlobFileMeta input(2, 3, 0, 0, "0", "9");
BlobFileMeta input(2, 3, 0, 0, "0", "9", 0, 0);
CheckCodec(input);
}

Expand Down
21 changes: 18 additions & 3 deletions src/blob_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ namespace rocksdb {
namespace titandb {

BlobGC::BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next)
TitanCFOptions&& _titan_cf_options, bool need_trigger_next,
uint64_t cf_id, bool punch_hole)
: inputs_(blob_files),
titan_cf_options_(std::move(_titan_cf_options)),
trigger_next_(need_trigger_next) {
trigger_next_(need_trigger_next),
cf_id_(cf_id),
use_punch_hole_(punch_hole) {
MarkFilesBeingGC();
}

BlobGC::~BlobGC() {}
BlobGC::~BlobGC() {
// Release snapshot requires db pointer, so we can't release it internally.
// In case the caller forgets to release the snapshot, we assert here, prefer
// to crash in the runtime than leak.
assert(snapshot_ == nullptr);
}

void BlobGC::SetColumnFamily(ColumnFamilyHandle* cfh) { cfh_ = cfh; }

Expand Down Expand Up @@ -40,5 +48,12 @@ void BlobGC::ReleaseGcFiles() {
}
}

void BlobGC::ReleaseSnapshot(DB* db) {
if (snapshot_ != nullptr) {
db->ReleaseSnapshot(snapshot_);
snapshot_ = nullptr;
}
}

} // namespace titandb
} // namespace rocksdb
25 changes: 24 additions & 1 deletion src/blob_gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace titandb {
class BlobGC {
public:
BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next);
TitanCFOptions&& _titan_cf_options, bool need_trigger_next,
uint64_t cf_id, bool punch_hole = false);

// No copying allowed
BlobGC(const BlobGC&) = delete;
Expand All @@ -38,21 +39,43 @@ class BlobGC {

void ReleaseGcFiles();

uint64_t cf_id() { return cf_id_; }

const Snapshot* snapshot() {
assert(use_punch_hole_);
assert(snapshot_ != nullptr);
return snapshot_;
}
void SetSnapshot(const Snapshot* snapshot) { snapshot_ = snapshot; }
void ReleaseSnapshot(DB* db);

bool use_punch_hole() { return use_punch_hole_; }

bool trigger_next() { return trigger_next_; }

private:
std::vector<std::shared_ptr<BlobFileMeta>> inputs_;
std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_;
uint64_t cf_id_;
ColumnFamilyHandle* cfh_{nullptr};
// Whether need to trigger gc after this gc or not
const bool trigger_next_;
const bool use_punch_hole_;
const Snapshot* snapshot_{nullptr};
};

struct GCScore {
uint64_t file_number;
double score;
};

struct PunchHoleScore {
uint64_t file_number;
uint64_t file_size;
uint64_t live_blocks;
uint64_t hole_punchable_blocks;
};

} // namespace titandb
} // namespace rocksdb
Loading

0 comments on commit 94af934

Please sign in to comment.