Check blob cache before getting blob file reader (#312)
Signed-off-by: Connor1996 <[email protected]>
Connor1996 authored Mar 1, 2024
1 parent 5a9cee8 commit ff85fd1
Showing 18 changed files with 157 additions and 130 deletions.
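
At a glance: the blob-record cache lookup moves out of BlobFileReader::Get and into BlobStorage::Get, with the cache key built from a cache_prefix (supplied via BlobFileSet::Open) plus the blob file number and record offset (see src/blob_storage.cc below). A cache hit is therefore served before the blob file cache, and its blob file reader handle, is ever touched. Below is a hedged end-to-end sketch of the behavior this enables, using only the public Titan/RocksDB API; the path and cache size are illustrative, not part of the change.

#include <cassert>
#include <string>

#include "rocksdb/cache.h"
#include "titan/db.h"

int main() {
  rocksdb::titandb::TitanOptions options;
  options.create_if_missing = true;
  options.min_blob_size = 0;                             // store every value as a blob
  options.blob_cache = rocksdb::NewLRUCache(64 << 20);   // 64 MiB shared blob cache

  rocksdb::titandb::TitanDB* db = nullptr;
  auto s = rocksdb::titandb::TitanDB::Open(options, "/tmp/titan_blob_cache_demo", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "key", std::string(4096, 'v'));
  assert(s.ok());
  s = db->Flush(rocksdb::FlushOptions());                // move the value into a blob file
  assert(s.ok());

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key", &value);    // miss: reads the blob file, fills blob_cache
  assert(s.ok());
  s = db->Get(rocksdb::ReadOptions(), "key", &value);    // hit: served from blob_cache, no file reader needed
  assert(s.ok());

  delete db;
  return 0;
}
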
2 changes: 1 addition & 1 deletion include/titan/options.h
@@ -98,7 +98,7 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// If non-NULL use the specified cache for blob records.
//
// Default: nullptr
std::shared_ptr<Cache> blob_cache;
std::shared_ptr<Cache> blob_cache{nullptr};

// Max batch size for GC.
//
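
This hunk is cosmetic: blob_cache now spells out its nullptr default (a default-constructed shared_ptr is already null). A minimal sketch of configuring the option per column family follows; the helper name is hypothetical, while blob_cache and NewLRUCache are the real option and API.

#include <memory>

#include "rocksdb/cache.h"
#include "titan/options.h"

// Hypothetical helper: share one LRU cache across column families so their
// blob records compete for a single memory budget.
rocksdb::titandb::TitanCFOptions MakeBlobCachedCFOptions(
    const std::shared_ptr<rocksdb::Cache>& blob_cache) {
  rocksdb::titandb::TitanCFOptions cf_opts;
  cf_opts.blob_cache = blob_cache;  // leaving the default nullptr keeps the blob cache disabled
  return cf_opts;
}
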
4 changes: 2 additions & 2 deletions src/blob_file_cache.cc
@@ -27,13 +27,13 @@ BlobFileCache::BlobFileCache(const TitanDBOptions& db_options,

Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number,
const BlobHandle& handle, BlobRecord* record,
PinnableSlice* value) {
OwnedSlice* buffer) {
Cache::Handle* cache_handle = nullptr;
Status s = GetBlobFileReaderHandle(file_number, &cache_handle);
if (!s.ok()) return s;

auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle));
s = reader->Get(options, handle, record, value);
s = reader->Get(options, handle, record, buffer);
cache_->Release(cache_handle);
return s;
}
3 changes: 1 addition & 2 deletions src/blob_file_cache.h
@@ -22,8 +22,7 @@ class BlobFileCache {
// bytes. The provided buffer is used to store the record data, so
// the buffer must be valid when the record is used.
Status Get(const ReadOptions& options, uint64_t file_number,
const BlobHandle& handle, BlobRecord* record,
PinnableSlice* value);
const BlobHandle& handle, BlobRecord* record, OwnedSlice* buffer);

// Creates a prefetcher for the specified file number.
Status NewPrefetcher(uint64_t file_number,
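
Since BlobFileCache::Get now fills a caller-owned OwnedSlice instead of pinning a PinnableSlice, the lifetime rule in the comment above falls on the caller. A hedged sketch of the updated contract, modeled on blob_file_test.cc further down; ReadOneBlob is a hypothetical wrapper and the include assumes Titan's internal headers.

#include <cstdint>
#include <string>

#include "blob_file_cache.h"

namespace rocksdb {
namespace titandb {

Status ReadOneBlob(BlobFileCache* cache, uint64_t file_number,
                   const BlobHandle& handle, std::string* out) {
  ReadOptions ro;
  BlobRecord record;
  OwnedSlice buffer;  // owns the bytes that record.key/record.value point into
  Status s = cache->Get(ro, file_number, handle, &record, &buffer);
  if (!s.ok()) return s;
  // Copy out before `buffer` goes out of scope; the record is only valid
  // while the buffer is alive.
  out->assign(record.value.data(), record.value.size());
  return Status::OK();
}

}  // namespace titandb
}  // namespace rocksdb
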
76 changes: 6 additions & 70 deletions src/blob_file_reader.cc
@@ -8,6 +8,7 @@

#include "file/filename.h"
#include "file/readahead_raf.h"
#include "rocksdb/cache.h"
#include "table/block_based/block.h"
#include "table/internal_iterator.h"
#include "table/meta_blocks.h"
@@ -44,22 +45,6 @@ const uint64_t kMaxReadaheadSize = 256 << 10;

namespace {

void GenerateCachePrefix(std::string* dst, Cache* cc,
FSRandomAccessFile* file) {
char buffer[kMaxVarint64Length * 3 + 1];
auto size = file->GetUniqueId(buffer, sizeof(buffer));
if (size == 0) {
auto end = EncodeVarint64(buffer, cc->NewId());
size = end - buffer;
}
dst->assign(buffer, size);
}

void EncodeBlobCache(std::string* dst, const Slice& prefix, uint64_t offset) {
dst->assign(prefix.data(), prefix.size());
PutVarint64(dst, offset);
}

// Seek to the specified meta block.
// Return true if it successfully seeks to that block.
Status SeekToMetaBlock(InternalIterator* meta_iter,
@@ -140,62 +125,13 @@ Status BlobFileReader::ReadHeader(std::unique_ptr<RandomAccessFileReader>& file,

BlobFileReader::BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file,
TitanStats* stats)
: options_(options),
file_(std::move(file)),
cache_(options.blob_cache),
stats_(stats) {
if (cache_) {
GenerateCachePrefix(&cache_prefix_, cache_.get(), file_->file());
}
}
TitanStats* _stats)
: options_(options), file_(std::move(file)) {}

Status BlobFileReader::Get(const ReadOptions& /*options*/,
Status BlobFileReader::Get(const ReadOptions& _options,
const BlobHandle& handle, BlobRecord* record,
PinnableSlice* value) {
OwnedSlice* buffer) {
TEST_SYNC_POINT("BlobFileReader::Get");

std::string cache_key;
Cache::Handle* cache_handle = nullptr;
if (cache_) {
EncodeBlobCache(&cache_key, cache_prefix_, handle.offset);
cache_handle = cache_->Lookup(cache_key);
if (cache_handle) {
RecordTick(statistics(stats_), TITAN_BLOB_CACHE_HIT);
auto blob = reinterpret_cast<OwnedSlice*>(cache_->Value(cache_handle));
Status s = DecodeInto(*blob, record);
if (!s.ok()) return s;
value->PinSlice(record->value, UnrefCacheHandle, cache_.get(),
cache_handle);
return s;
}
}
RecordTick(statistics(stats_), TITAN_BLOB_CACHE_MISS);

OwnedSlice blob;
Status s = ReadRecord(handle, record, &blob);
if (!s.ok()) {
return s;
}

if (cache_) {
auto cache_value = new OwnedSlice(std::move(blob));
auto cache_size = cache_value->size() + sizeof(*cache_value);
cache_->Insert(cache_key, cache_value, cache_size,
&DeleteCacheValue<OwnedSlice>, &cache_handle,
Cache::Priority::BOTTOM);
value->PinSlice(record->value, UnrefCacheHandle, cache_.get(),
cache_handle);
} else {
value->PinSlice(record->value, OwnedSlice::CleanupFunc, blob.release(),
nullptr);
}

return Status::OK();
}

Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer) {
Slice blob;
CacheAllocationPtr ubuf =
AllocateBlock(handle.size, options_.memory_allocator());
@@ -224,7 +160,7 @@ Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,

Status BlobFilePrefetcher::Get(const ReadOptions& options,
const BlobHandle& handle, BlobRecord* record,
PinnableSlice* buffer) {
OwnedSlice* buffer) {
if (handle.offset == last_offset_) {
last_offset_ = handle.offset + handle.size;
if (handle.offset + handle.size > readahead_limit_) {
11 changes: 3 additions & 8 deletions src/blob_file_reader.h
@@ -28,7 +28,7 @@ class BlobFileReader {
// of the record is stored in the value slice underlying, so the value slice
// must be valid when the record is used.
Status Get(const ReadOptions& options, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* value);
BlobRecord* record, OwnedSlice* buffer);

private:
friend class BlobFilePrefetcher;
@@ -37,23 +37,18 @@
std::unique_ptr<RandomAccessFileReader> file,
TitanStats* stats);

Status ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer);
static Status ReadHeader(std::unique_ptr<RandomAccessFileReader>& file,
BlobFileHeader* header);

TitanCFOptions options_;
std::unique_ptr<RandomAccessFileReader> file_;

std::shared_ptr<Cache> cache_;
std::string cache_prefix_;

// Information read from the file.
BlobFileFooter footer_;

std::unique_ptr<UncompressionDict> uncompression_dict_ = nullptr;

TitanStats* stats_;
// TitanStats* stats_;
};

// Performs readahead on continuous reads.
@@ -64,7 +59,7 @@ class BlobFilePrefetcher : public Cleanable {
BlobFilePrefetcher(BlobFileReader* reader) : reader_(reader) {}

Status Get(const ReadOptions& options, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* buffer);
BlobRecord* record, OwnedSlice* buffer);

private:
BlobFileReader* reader_;
11 changes: 7 additions & 4 deletions src/blob_file_set.cc
@@ -27,9 +27,10 @@ BlobFileSet::BlobFileSet(const TitanDBOptions& options, TitanStats* stats,
}

Status BlobFileSet::Open(
const std::map<uint32_t, TitanCFOptions>& column_families) {
const std::map<uint32_t, TitanCFOptions>& column_families,
const std::string& cache_prefix) {
// Sets up initial column families.
AddColumnFamilies(column_families);
AddColumnFamilies(column_families, cache_prefix);

Status s = env_->FileExists(CurrentFileName(dirname_));
if (s.ok()) {
@@ -314,12 +315,14 @@ Status BlobFileSet::LogAndApply(VersionEdit& edit) {
}

void BlobFileSet::AddColumnFamilies(
const std::map<uint32_t, TitanCFOptions>& column_families) {
const std::map<uint32_t, TitanCFOptions>& column_families,
const std::string& cache_prefix) {
for (auto& cf : column_families) {
auto file_cache = std::make_shared<BlobFileCache>(db_options_, cf.second,
file_cache_, stats_);
auto blob_storage = std::make_shared<BlobStorage>(
db_options_, cf.second, cf.first, file_cache, stats_, initialized_);
db_options_, cf.second, cf.first, cache_prefix, file_cache, stats_,
initialized_);
if (stats_ != nullptr) {
stats_->InitializeCF(cf.first, blob_storage);
}
6 changes: 4 additions & 2 deletions src/blob_file_set.h
@@ -41,7 +41,8 @@ class BlobFileSet {
// If the manifest exists, it will recover from the latest one.
// It is a corruption if the persistent storage contains data
// outside of the provided column families.
Status Open(const std::map<uint32_t, TitanCFOptions>& column_families);
Status Open(const std::map<uint32_t, TitanCFOptions>& column_families,
const std::string& cache_prefix);

// Applies *edit and saved to the manifest.
// REQUIRES: mutex is held
@@ -50,7 +51,8 @@
// Adds some column families with the specified options.
// REQUIRES: mutex is held
void AddColumnFamilies(
const std::map<uint32_t, TitanCFOptions>& column_families);
const std::map<uint32_t, TitanCFOptions>& column_families,
const std::string& cache_prefix);

// Drops some column families. The obsolete files will be deleted in
// background when they will not be accessed anymore.
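
The cache_prefix threaded through Open() and AddColumnFamilies() ends up as the leading bytes of every blob cache key; where the prefix itself is generated is not part of the hunks shown on this page. A sketch of the resulting key layout, mirroring BlobStorage::EncodeBlobCache further down; MakeBlobCacheKey is a hypothetical stand-in and the coding header is RocksDB-internal.

#include <cstdint>
#include <string>

#include "util/coding.h"  // RocksDB's PutVarint64

// Hypothetical helper mirroring BlobStorage::EncodeBlobCache below.
std::string MakeBlobCacheKey(const std::string& cache_prefix,
                             uint64_t file_number, uint64_t offset) {
  std::string key(cache_prefix);            // per-instance prefix
  rocksdb::PutVarint64(&key, file_number);  // which blob file
  rocksdb::PutVarint64(&key, offset);       // record offset within that file
  return key;
}
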
10 changes: 2 additions & 8 deletions src/blob_file_test.cc
@@ -107,17 +107,14 @@ class BlobFileTest : public testing::Test {
expect.key = key;
expect.value = value;
BlobRecord record;
PinnableSlice buffer;
OwnedSlice buffer;
BlobHandle blob_handle = contexts[i]->new_blob_index.blob_handle;
ASSERT_OK(cache.Get(ro, file_number_, blob_handle, &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(cache.Get(ro, file_number_, blob_handle, &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(prefetcher->Get(ro, blob_handle, &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(prefetcher->Get(ro, blob_handle, &record, &buffer));
ASSERT_EQ(record, expect);
}
@@ -187,17 +184,14 @@ class BlobFileTest : public testing::Test {
expect.key = key;
expect.value = value;
BlobRecord record;
PinnableSlice buffer;
OwnedSlice buffer;
BlobHandle blob_handle = contexts[i]->new_blob_index.blob_handle;
ASSERT_OK(cache.Get(ro, file_number_, blob_handle, &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(cache.Get(ro, file_number_, blob_handle, &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(blob_file_reader->Get(ro, blob_handle, &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(blob_file_reader->Get(ro, blob_handle, &record, &buffer));
ASSERT_EQ(record, expect);
}
2 changes: 1 addition & 1 deletion src/blob_gc_picker_test.cc
@@ -24,7 +24,7 @@ class BlobGCPickerTest : public testing::Test {
auto blob_file_cache = std::make_shared<BlobFileCache>(
titan_db_options, titan_cf_options, NewLRUCache(128), nullptr);
blob_storage_.reset(new BlobStorage(titan_db_options, titan_cf_options, 0,
blob_file_cache, nullptr, nullptr));
"", blob_file_cache, nullptr, nullptr));
basic_blob_gc_picker_.reset(
new BasicBlobGCPicker(titan_db_options, titan_cf_options, nullptr));
}
60 changes: 58 additions & 2 deletions src/blob_storage.cc
@@ -6,10 +6,66 @@
namespace rocksdb {
namespace titandb {

std::string BlobStorage::EncodeBlobCache(const BlobIndex& index) {
std::string cache_key(cache_prefix_);
PutVarint64(&cache_key, index.file_number);
PutVarint64(&cache_key, index.blob_handle.offset);
return cache_key;
}

Status BlobStorage::TryGetBlobCache(const std::string& cache_key,
BlobRecord* record, PinnableSlice* value,
bool* cache_hit) {
Status s;
Cache::Handle* cache_handle = blob_cache_->Lookup(cache_key);
if (cache_handle) {
*cache_hit = true;
RecordTick(statistics(stats_), TITAN_BLOB_CACHE_HIT);
auto blob = reinterpret_cast<OwnedSlice*>(blob_cache_->Value(cache_handle));
s = DecodeInto(*blob, record);
if (!s.ok()) return s;

value->PinSlice(record->value, UnrefCacheHandle, blob_cache_.get(),
cache_handle);
return s;
}
*cache_hit = false;
RecordTick(statistics(stats_), TITAN_BLOB_CACHE_MISS);
return s;
}

Status BlobStorage::Get(const ReadOptions& options, const BlobIndex& index,
BlobRecord* record, PinnableSlice* value) {
return file_cache_->Get(options, index.file_number, index.blob_handle, record,
value);
std::string cache_key;

if (blob_cache_) {
cache_key = EncodeBlobCache(index);
bool cache_hit;
Status s = TryGetBlobCache(cache_key, record, value, &cache_hit);
if (!s.ok()) return s;
if (cache_hit) return s;
}

OwnedSlice blob;
Status s = file_cache_->Get(options, index.file_number, index.blob_handle,
record, &blob);
if (!s.ok()) {
return s;
}

if (blob_cache_ && options.fill_cache) {
Cache::Handle* cache_handle = nullptr;
auto cache_value = new OwnedSlice(std::move(blob));
blob_cache_->Insert(
cache_key, cache_value, cache_value->size() + sizeof(*cache_value),
&DeleteCacheValue<OwnedSlice>, &cache_handle, Cache::Priority::BOTTOM);
value->PinSlice(record->value, UnrefCacheHandle, blob_cache_.get(),
cache_handle);
} else {
value->PinSlice(record->value, OwnedSlice::CleanupFunc, blob.release(),
nullptr);
}
return s;
}

Status BlobStorage::NewPrefetcher(uint64_t file_number,
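
One behavioral detail in the new BlobStorage::Get: the blob cache is only populated when ReadOptions::fill_cache is set, which is its default. A small hedged sketch of opting out for one-off scans; the helper name is hypothetical, fill_cache is the real RocksDB option.

#include "rocksdb/options.h"

// Hypothetical helper: read options for a scan whose blobs should not
// displace hot entries in the blob cache.
rocksdb::ReadOptions OneShotScanOptions() {
  rocksdb::ReadOptions ro;
  ro.fill_cache = false;  // the read still succeeds; the blob cache is simply not filled
  return ro;
}
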