Skip to content

Commit

Permalink
Add compressor interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lucagiac81 committed Jan 16, 2024
1 parent c4228ab commit e267425
Show file tree
Hide file tree
Showing 90 changed files with 3,476 additions and 2,095 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ set(SOURCES
util/comparator.cc
util/compression.cc
util/compression_context_cache.cc
util/compressor.cc
util/concurrent_task_limiter_impl.cc
util/crc32c.cc
util/data_structure.cc
Expand Down Expand Up @@ -1410,6 +1411,7 @@ if(WITH_TESTS)
util/autovector_test.cc
util/bloom_test.cc
util/coding_test.cc
util/compression_test.cc
util/crc32c_test.cc
util/defer_test.cc
util/dynamic_bloom_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,9 @@ cache_test: $(OBJ_DIR)/cache/cache_test.o $(TEST_LIBRARY) $(LIBRARY)
coding_test: $(OBJ_DIR)/util/coding_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

compression_test: $(OBJ_DIR)/util/compression_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

hash_test: $(OBJ_DIR)/util/hash_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/comparator.cc",
"util/compression.cc",
"util/compression_context_cache.cc",
"util/compressor.cc",
"util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc",
"util/crc32c_arm64.cc",
Expand Down Expand Up @@ -4666,6 +4667,12 @@ cpp_unittest_wrapper(name="compressed_secondary_cache_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="compression_test",
srcs=["util/compression_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="configurable_test",
srcs=["options/configurable_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
31 changes: 14 additions & 17 deletions cache/compressed_secondary_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,16 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
kNoCompression, CacheTier::kVolatileTier,
create_context, allocator, &value, &charge);
} else {
UncompressionContext uncompression_context(
cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context,
UncompressionDict::GetEmptyDict(),
cache_options_.compression_type);
auto compressor =
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
UncompressionInfo uncompression_info(
UncompressionDict::GetEmptyDict(),
cache_options_.compress_format_version, allocator);

size_t uncompressed_size{0};
CacheAllocationPtr uncompressed =
UncompressData(uncompression_info, (char*)data_ptr,
handle_value_charge, &uncompressed_size,
cache_options_.compress_format_version, allocator);
CacheAllocationPtr uncompressed = uncompression_info.UncompressData(
compressor.get(), (char*)data_ptr, handle_value_charge,
&uncompressed_size);

if (!uncompressed) {
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
Expand Down Expand Up @@ -192,17 +191,15 @@ Status CompressedSecondaryCache::InsertInternal(
type == kNoCompression &&
!cache_options_.do_not_compress_roles.Contains(helper->role)) {
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, data_size);
CompressionContext compression_context(cache_options_.compression_type,
cache_options_.compression_opts);
auto compressor = BuiltinCompressor::GetCompressor(
cache_options_.compression_type, cache_options_.compression_opts);
uint64_t sample_for_compression{0};
CompressionInfo compression_info(
cache_options_.compression_opts, compression_context,
CompressionDict::GetEmptyDict(), cache_options_.compression_type,
sample_for_compression);
CompressionInfo compression_info(CompressionDict::GetEmptyDict(),
cache_options_.compress_format_version,
sample_for_compression);

bool success =
CompressData(val, compression_info,
cache_options_.compress_format_version, &compressed_val);
compression_info.CompressData(compressor.get(), val, &compressed_val);

if (!success) {
return Status::Corruption("Error compressing value.");
Expand Down
1 change: 1 addition & 0 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {
Expand Down
26 changes: 11 additions & 15 deletions db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
immutable_options_(immutable_options),
min_blob_size_(mutable_cf_options->min_blob_size),
blob_file_size_(mutable_cf_options->blob_file_size),
blob_compression_type_(mutable_cf_options->blob_compression_type),
blob_compressor_(mutable_cf_options->blob_compressor),
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
file_options_(file_options),
write_options_(write_options),
Expand All @@ -92,6 +92,10 @@ BlobFileBuilder::BlobFileBuilder(
assert(blob_file_paths_->empty());
assert(blob_file_additions_);
assert(blob_file_additions_->empty());

if (blob_compressor_ == nullptr) {
blob_compressor_ = BuiltinCompressor::GetCompressor(kNoCompression);
}
}

BlobFileBuilder::~BlobFileBuilder() = default;
Expand Down Expand Up @@ -151,7 +155,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
}

BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
blob_compression_type_);
blob_compressor_->GetCompressionType());

return Status::OK();
}
Expand Down Expand Up @@ -228,7 +232,8 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;

BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
BlobLogHeader header(column_family_id_,
blob_compressor_->GetCompressionType(), has_ttl,
expiration_range);

{
Expand Down Expand Up @@ -256,27 +261,18 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
assert(compressed_blob->empty());
assert(immutable_options_);

if (blob_compression_type_ == kNoCompression) {
if (blob_compressor_->GetCompressionType() == kNoCompression) {
return Status::OK();
}

// TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb
CompressionOptions opts;
CompressionContext context(blob_compression_type_, opts);
constexpr uint64_t sample_for_compression = 0;

CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
blob_compression_type_, sample_for_compression);

constexpr uint32_t compression_format_version = 2;
CompressionInfo info;

bool success = false;

{
StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
BLOB_DB_COMPRESSION_MICROS);
success =
CompressData(*blob, info, compression_format_version, compressed_blob);
success = info.CompressData(blob_compressor_.get(), *blob, compressed_blob);
}

if (!success) {
Expand Down
3 changes: 2 additions & 1 deletion db/blob/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rocksdb/options.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -90,7 +91,7 @@ class BlobFileBuilder {
const ImmutableOptions* immutable_options_;
uint64_t min_blob_size_;
uint64_t blob_file_size_;
CompressionType blob_compression_type_;
std::shared_ptr<Compressor> blob_compressor_;
PrepopulateBlobCache prepopulate_blob_cache_;
const FileOptions* file_options_;
const WriteOptions* write_options_;
Expand Down
12 changes: 4 additions & 8 deletions db/blob/blob_file_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,16 +403,12 @@ TEST_F(BlobFileBuilderTest, Compression) {
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);

CompressionOptions opts;
CompressionContext context(kSnappyCompression, opts);
constexpr uint64_t sample_for_compression = 0;

CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
kSnappyCompression, sample_for_compression);
auto compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
ASSERT_NE(compressor, nullptr);

std::string compressed_value;
ASSERT_TRUE(Snappy_Compress(info, uncompressed_value.data(),
uncompressed_value.size(), &compressed_value));
ASSERT_OK(compressor->Compress(CompressionInfo(), uncompressed_value,
&compressed_value));

ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
BlobLogRecord::kHeaderSize + key_size + compressed_value.size());
Expand Down
46 changes: 22 additions & 24 deletions db/blob/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ Status BlobFileReader::Create(

Statistics* const statistics = immutable_options.stats;

CompressionType compression_type = kNoCompression;
std::shared_ptr<Compressor> compressor;

{
const Status s =
ReadHeader(file_reader.get(), read_options, column_family_id,
statistics, &compression_type);
const Status s = ReadHeader(file_reader.get(), read_options,
column_family_id, statistics, &compressor);
if (!s.ok()) {
return s;
}
Expand All @@ -70,7 +69,7 @@ Status BlobFileReader::Create(
}

blob_file_reader->reset(
new BlobFileReader(std::move(file_reader), file_size, compression_type,
new BlobFileReader(std::move(file_reader), file_size, compressor,
immutable_options.clock, statistics));

return Status::OK();
Expand Down Expand Up @@ -140,9 +139,9 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint32_t column_family_id,
Statistics* statistics,
CompressionType* compression_type) {
std::shared_ptr<Compressor>* compressor) {
assert(file_reader);
assert(compression_type);
assert(compressor);

Slice header_slice;
Buffer buf;
Expand Down Expand Up @@ -184,7 +183,7 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
return Status::Corruption("Column family ID mismatch");
}

*compression_type = header.compression;
*compressor = BuiltinCompressor::GetCompressor(header.compression);

return Status::OK();
}
Expand Down Expand Up @@ -281,11 +280,11 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,

BlobFileReader::BlobFileReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
CompressionType compression_type, SystemClock* clock,
const std::shared_ptr<Compressor>& compressor, SystemClock* clock,
Statistics* statistics)
: file_reader_(std::move(file_reader)),
file_size_(file_size),
compression_type_(compression_type),
compressor_(compressor),
clock_(clock),
statistics_(statistics) {
assert(file_reader_);
Expand All @@ -295,7 +294,7 @@ BlobFileReader::~BlobFileReader() = default;

Status BlobFileReader::GetBlob(
const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
uint64_t value_size, CompressionType compression_type,
uint64_t value_size, const std::shared_ptr<Compressor>& compressor,
FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
assert(result);
Expand All @@ -306,7 +305,7 @@ Status BlobFileReader::GetBlob(
return Status::Corruption("Invalid blob offset");
}

if (compression_type != compression_type_) {
if (compressor->GetCompressionType() != compressor_->GetCompressionType()) {
return Status::Corruption("Compression type mismatch when reading blob");
}

Expand Down Expand Up @@ -374,7 +373,7 @@ Status BlobFileReader::GetBlob(

{
const Status s = UncompressBlobIfNeeded(
value_slice, compression_type, allocator, clock_, statistics_, result);
value_slice, compressor.get(), allocator, clock_, statistics_, result);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -420,7 +419,8 @@ void BlobFileReader::MultiGetBlob(
*req->status = Status::Corruption("Invalid blob offset");
continue;
}
if (req->compression != compression_type_) {
if (req->compressor->GetCompressionType() !=
compressor_->GetCompressionType()) {
*req->status =
Status::Corruption("Compression type mismatch when reading a blob");
continue;
Expand Down Expand Up @@ -522,7 +522,7 @@ void BlobFileReader::MultiGetBlob(
// Uncompress blob if needed
Slice value_slice(record_slice.data() + adjustments[i], req->len);
*req->status =
UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
UncompressBlobIfNeeded(value_slice, compressor_.get(), allocator,
clock_, statistics_, &blob_reqs[i].second);
if (req->status->ok()) {
total_bytes += record_slice.size();
Expand Down Expand Up @@ -579,32 +579,30 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice,
}

Status BlobFileReader::UncompressBlobIfNeeded(
const Slice& value_slice, CompressionType compression_type,
const Slice& value_slice, Compressor* compressor,
MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
std::unique_ptr<BlobContents>* result) {
assert(compressor);
assert(result);

if (compression_type == kNoCompression) {
if (compressor->GetCompressionType() == kNoCompression) {
BlobContentsCreator::Create(result, nullptr, value_slice, kNoCompression,
allocator);
return Status::OK();
}

UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);

size_t uncompressed_size = 0;
constexpr uint32_t compression_format_version = 2;
UncompressionInfo info(UncompressionDict::GetEmptyDict(),
compression_format_version, allocator);

CacheAllocationPtr output;

{
PERF_TIMER_GUARD(blob_decompress_time);
StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
output = UncompressData(info, value_slice.data(), value_slice.size(),
&uncompressed_size, compression_format_version,
allocator);
output = info.UncompressData(compressor, value_slice.data(),
value_slice.size(), &uncompressed_size);
}

TEST_SYNC_POINT_CALLBACK(
Expand Down
Loading

0 comments on commit e267425

Please sign in to comment.