Skip to content

Commit

Permalink
Save progress
Browse files Browse the repository at this point in the history
Signed-off-by: v01dstar <[email protected]>
  • Loading branch information
v01dstar committed Apr 12, 2024
1 parent 05aa72b commit 850f50c
Show file tree
Hide file tree
Showing 22 changed files with 637 additions and 204 deletions.
15 changes: 13 additions & 2 deletions src/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ BlobFileBuilder::BlobFileBuilder(const TitanDBOptions& db_options,
return;
#endif
}
// alignment_size_ = cf_options_.alignment_size;
alignment_size_ = 4 * 1024;
WriteHeader();
}

Expand Down Expand Up @@ -68,6 +70,7 @@ void BlobFileBuilder::Add(const BlobRecord& record,
} else {
encoder_.EncodeRecord(record);
WriteEncoderData(&ctx->new_blob_index.blob_handle);
FillBlockWithPadding();
out_ctx->emplace_back(std::move(ctx));
}

Expand Down Expand Up @@ -143,15 +146,22 @@ void BlobFileBuilder::WriteEncoderData(BlobHandle* handle) {
handle->offset = file_->GetFileSize();
handle->size = encoder_.GetEncodedSize();
live_data_size_ += handle->size;
if (alignment_size_ > 0) {
live_blocks_ += handle->size / alignment_size_ +
(handle->size % alignment_size_ ? 1 : 0);
}

status_ = file_->Append(encoder_.GetHeader());
if (ok()) {
status_ = file_->Append(encoder_.GetRecord());
num_entries_++;
if (ok()) {
FillBlockWithPadding();
}
}
}

void BlobFileBuilder::FillFSBlockWithPadding() {
void BlobFileBuilder::FillBlockWithPadding() {
if (alignment_size_ == 0) {
return;
}
Expand Down Expand Up @@ -211,13 +221,14 @@ Status BlobFileBuilder::Finish(OutContexts* out_ctx) {
BlobFileFooter footer;
// if has compression dictionary, encode it into meta blocks
if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) {
assert(blob_file_version_ == BlobFileHeader::kVersion2);
assert(blob_file_version_ >= BlobFileHeader::kVersion2);
BlockHandle meta_index_handle;
MetaIndexBuilder meta_index_builder;
WriteCompressionDictBlock(&meta_index_builder);
WriteRawBlock(meta_index_builder.Finish(), &meta_index_handle);
footer.meta_index_handle = meta_index_handle;
}
footer.alignment_size = alignment_size_;

std::string buffer;
footer.EncodeTo(&buffer);
Expand Down
5 changes: 3 additions & 2 deletions src/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class BlobFileBuilder {
// caller to sync and close the file after calling Finish().
BlobFileBuilder(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options, WritableFileWriter* file,
uint32_t blob_file_version = BlobFileHeader::kVersion2);
uint32_t blob_file_version = BlobFileHeader::kVersion3);

// Tries to add the record to the file
// Notice:
Expand Down Expand Up @@ -123,7 +123,7 @@ class BlobFileBuilder {
void WriteCompressionDictBlock(MetaIndexBuilder* meta_index_builder);
void FlushSampleRecords(OutContexts* out_ctx);
void WriteEncoderData(BlobHandle* handle);
void FillFSBlockWithPadding();
void FillBlockWithPadding();

TitanCFOptions cf_options_;
WritableFileWriter* file_;
Expand All @@ -143,6 +143,7 @@ class BlobFileBuilder {
std::string smallest_key_;
std::string largest_key_;
uint64_t live_data_size_ = 0;
uint64_t live_blocks_ = 0;

uint64_t alignment_size_ = 0;
};
Expand Down
1 change: 1 addition & 0 deletions src/blob_file_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ void BlobFileIterator::PrefetchAndGet() {
// If the record is valid (not punch-holed), we can return. Otherwise,
// continue iterating until we find a valid record.
if (live) return;
iterate_offset_ += alignment_size_;
}
valid_ = false;
}
Expand Down
6 changes: 6 additions & 0 deletions src/blob_file_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class BlobFileManager {
(void)handles;
return Status::OK();
}

virtual Status BatchUpdateFiles(
const std::vector<std::shared_ptr<BlobFileMeta>>& files) {
(void)files;
return Status::OK();
}
};

} // namespace titandb
Expand Down
37 changes: 36 additions & 1 deletion src/blob_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ void BlobFileMeta::EncodeTo(std::string* dst) const {
PutVarint32(dst, file_level_);
PutLengthPrefixedSlice(dst, smallest_key_);
PutLengthPrefixedSlice(dst, largest_key_);
PutVarint64(dst, alignment_size_);
PutVarint64(dst, live_blocks_);
PutVarint64(dst, hole_punchable_blocks_);
}

Status BlobFileMeta::DecodeFromLegacy(Slice* src) {
Expand Down Expand Up @@ -171,11 +174,39 @@ Status BlobFileMeta::DecodeFrom(Slice* src) {
return Status::OK();
}

Status BlobFileMeta::DecodeFromV3(Slice* src) {
if (!GetVarint64(src, &file_number_) || !GetVarint64(src, &file_size_) ||
!GetVarint64(src, &file_entries_) || !GetVarint32(src, &file_level_)) {
return Status::Corruption("BlobFileMeta decode failed");
}
Slice str;
if (GetLengthPrefixedSlice(src, &str)) {
smallest_key_.assign(str.data(), str.size());
} else {
return Status::Corruption("BlobSmallestKey Decode failed");
}
if (GetLengthPrefixedSlice(src, &str)) {
largest_key_.assign(str.data(), str.size());
} else {
return Status::Corruption("BlobLargestKey decode failed");
}
uint64_t alignment_size, live_blocks, hole_punchable_blocks;
if (!GetVarint64(src, &alignment_size) || !GetVarint64(src, &live_blocks) ||
!GetVarint64(src, &hole_punchable_blocks)) {
return Status::Corruption("BlobFileMeta decode failed");
}
alignment_size_ = alignment_size;
live_blocks_.store(live_blocks);
hole_punchable_blocks_.store(hole_punchable_blocks);
return Status::OK();
}

bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs) {
return (lhs.file_number_ == rhs.file_number_ &&
lhs.file_size_ == rhs.file_size_ &&
lhs.file_entries_ == rhs.file_entries_ &&
lhs.file_level_ == rhs.file_level_);
lhs.file_level_ == rhs.file_level_ &&
lhs.live_blocks_.load() == rhs.live_blocks_.load());
}

void BlobFileMeta::FileStateTransit(const FileEvent& event) {
Expand Down Expand Up @@ -234,6 +265,10 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) {
assert(state_ == FileState::kNormal);
state_ = FileState::kToMerge;
break;
case FileEvent::kPunchHoleOutput:
assert(state_ == FileState::kBeingGC);
state_ = FileState::kNormal;
break;
case FileEvent::kReset:
state_ = FileState::kNormal;
break;
Expand Down
50 changes: 42 additions & 8 deletions src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "rocksdb/status.h"
#include "table/format.h"

#include "blob_gc.h"
#include "util.h"

namespace rocksdb {
Expand Down Expand Up @@ -38,7 +39,6 @@ namespace titandb {
const uint64_t kBlobMaxHeaderSize = 12;
const uint64_t kRecordHeaderSize = 9;
const uint64_t kBlobFooterSize = 8 + BlockHandle::kMaxEncodedLength + 8 + 4;
const std::string kAlignmentSizeBlockName = "titan.alignment_size";

// Format of blob record (not fixed size):
//
Expand Down Expand Up @@ -210,6 +210,7 @@ class BlobFileMeta {
kFlushOrCompactionOutput,
kDelete,
kNeedMerge,
kPunchHoleOutput,
kReset, // reset file to normal for test
};

Expand All @@ -229,28 +230,41 @@ class BlobFileMeta {
BlobFileMeta(uint64_t _file_number, uint64_t _file_size,
uint64_t _file_entries, uint32_t _file_level,
const std::string& _smallest_key,
const std::string& _largest_key)
const std::string& _largest_key, uint64_t _alignment_size,
uint64_t _live_blocks)
: file_number_(_file_number),
file_size_(_file_size),
file_entries_(_file_entries),
file_level_(_file_level),
smallest_key_(_smallest_key),
largest_key_(_largest_key) {}
largest_key_(_largest_key),
alignment_size_(_alignment_size),
live_blocks_(_live_blocks),
hole_punchable_blocks_(0) {}

friend bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs);

void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
Status DecodeFromLegacy(Slice* src);
Status DecodeFromV3(Slice* src);

void set_live_data_size(uint64_t size) { live_data_size_ = size; }
void set_live_blocks(uint64_t size) { live_blocks_ = size; }

uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; }
uint64_t live_data_size() const { return live_data_size_; }
uint32_t file_level() const { return file_level_; }
const std::string& smallest_key() const { return smallest_key_; }
const std::string& largest_key() const { return largest_key_; }
const uint64_t live_blocks() const { return live_blocks_; }
const uint64_t hole_punchable_blocks() const {
return hole_punchable_blocks_;
}

const uint64_t alignment_size() const { return alignment_size_; }

void set_live_data_size(int64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }
Expand All @@ -273,6 +287,22 @@ class BlobFileMeta {
return 1 - (static_cast<double>(live_data_size_) /
(file_size_ - kBlobMaxHeaderSize - kBlobFooterSize));
}

double GetPunchHoleScore() const {
// Only hole-punch a file if we can at least reclaim 256 blocks and
// the remaining live data is more than 20% of the file size.
if (hole_punchable_blocks_ > 256 &&
double((live_blocks_ - hole_punchable_blocks_)) * 1024 * 4 /
file_size_ >
0.2) {
return hole_punchable_blocks_ * 1024 * 4 / file_size_;
}
return 0.0;
}

void set_hole_punchable_blocks(uint64_t size) {
hole_punchable_blocks_ = size;
}
TitanInternalStats::StatsType GetDiscardableRatioLevel() const;
void Dump(bool with_keys) const;

Expand All @@ -294,14 +324,18 @@ class BlobFileMeta {
// Size of data with reference from SST files.
//
// Because the new generated SST is added to superversion before
// `OnFlushCompleted()`/`OnCompactionCompleted()` is called, so if there is a
// later compaction trigger by the new generated SST, the later
// `OnFlushCompleted()`/`OnCompactionCompleted()` is called, so if there is
// a later compaction trigger by the new generated SST, the later
// `OnCompactionCompleted()` maybe called before the previous events'
// `OnFlushCompleted()`/`OnCompactionCompleted()` is called.
// So when state_ == kPendingLSM, it uses this to record the delta as a
// positive number if any later compaction is trigger before previous
// `OnCompactionCompleted()` is called.
std::atomic<int64_t> live_data_size_{0};

uint64_t alignment_size_{0};
std::atomic<uint64_t> live_blocks_{0};
std::atomic<uint64_t> hole_punchable_blocks_{0};
std::atomic<FileState> state_{FileState::kNone};
};

Expand All @@ -321,8 +355,8 @@ class BlobFileMeta {
// | Fixed32 | Fixed32 | Fixed32 |
// +--------------+---------+---------+
//
// The header is mean to be compatible with header of BlobDB blob files, except
// we use a different magic number.
// The header is mean to be compatible with header of BlobDB blob files,
// except we use a different magic number.
struct BlobFileHeader {
// The first 32bits from $(echo titandb/blob | sha1sum).
static const uint32_t kHeaderMagicNumber = 0x2be0a614ul;
Expand Down
2 changes: 1 addition & 1 deletion src/blob_format_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TEST(BlobFormatTest, BlobIndex) {
}

TEST(BlobFormatTest, BlobFileMeta) {
BlobFileMeta input(2, 3, 0, 0, "0", "9");
BlobFileMeta input(2, 3, 0, 0, "0", "9", 0, 0);
CheckCodec(input);
}

Expand Down
21 changes: 18 additions & 3 deletions src/blob_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ namespace rocksdb {
namespace titandb {

BlobGC::BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next)
TitanCFOptions&& _titan_cf_options, bool need_trigger_next,
uint64_t cf_id, bool punch_hole)
: inputs_(blob_files),
titan_cf_options_(std::move(_titan_cf_options)),
trigger_next_(need_trigger_next) {
trigger_next_(need_trigger_next),
cf_id_(cf_id),
use_punch_hole_(punch_hole) {
MarkFilesBeingGC();
}

BlobGC::~BlobGC() {}
BlobGC::~BlobGC() {
// Release snapshot requires db pointer, so we can't release it internally.
// In case the caller forgets to release the snapshot, we assert here, prefer
// to crash in the runtime than leak.
assert(snapshot_ == nullptr);
}

void BlobGC::SetColumnFamily(ColumnFamilyHandle* cfh) { cfh_ = cfh; }

Expand Down Expand Up @@ -40,5 +48,12 @@ void BlobGC::ReleaseGcFiles() {
}
}

void BlobGC::ReleaseSnapshot(DB* db) {
if (snapshot_ != nullptr) {
db->ReleaseSnapshot(snapshot_);
snapshot_ = nullptr;
}
}

} // namespace titandb
} // namespace rocksdb
25 changes: 24 additions & 1 deletion src/blob_gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace titandb {
class BlobGC {
public:
BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next);
TitanCFOptions&& _titan_cf_options, bool need_trigger_next,
uint64_t cf_id, bool punch_hole = false);

// No copying allowed
BlobGC(const BlobGC&) = delete;
Expand All @@ -38,21 +39,43 @@ class BlobGC {

void ReleaseGcFiles();

uint64_t cf_id() { return cf_id_; }

const Snapshot* snapshot() {
assert(use_punch_hole_);
assert(snapshot_ != nullptr);
return snapshot_;
}
void SetSnapshot(const Snapshot* snapshot) { snapshot_ = snapshot; }
void ReleaseSnapshot(DB* db);

bool use_punch_hole() { return use_punch_hole_; }

bool trigger_next() { return trigger_next_; }

private:
std::vector<std::shared_ptr<BlobFileMeta>> inputs_;
std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_;
uint64_t cf_id_;
ColumnFamilyHandle* cfh_{nullptr};
// Whether need to trigger gc after this gc or not
const bool trigger_next_;
const bool use_punch_hole_;
const Snapshot* snapshot_{nullptr};
};

struct GCScore {
uint64_t file_number;
double score;
};

struct PunchHoleScore {
uint64_t file_number;
uint64_t file_size;
uint64_t live_blocks;
uint64_t hole_punchable_blocks;
};

} // namespace titandb
} // namespace rocksdb
Loading

0 comments on commit 850f50c

Please sign in to comment.