Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Compressor interface #7650

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/comparator.cc",
"util/compression.cc",
"util/compression_context_cache.cc",
"util/compressor.cc",
"util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc",
"util/crc32c_arm64.cc",
Expand Down Expand Up @@ -4705,6 +4706,12 @@ cpp_unittest_wrapper(name="compressed_secondary_cache_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="compression_test",
srcs=["util/compression_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="configurable_test",
srcs=["options/configurable_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,7 @@ set(SOURCES
util/comparator.cc
util/compression.cc
util/compression_context_cache.cc
util/compressor.cc
util/concurrent_task_limiter_impl.cc
util/crc32c.cc
util/data_structure.cc
Expand Down Expand Up @@ -1453,6 +1454,7 @@ if(WITH_TESTS)
util/autovector_test.cc
util/bloom_test.cc
util/coding_test.cc
util/compression_test.cc
util/crc32c_test.cc
util/defer_test.cc
util/dynamic_bloom_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,9 @@ cache_test: $(OBJ_DIR)/cache/cache_test.o $(TEST_LIBRARY) $(LIBRARY)
coding_test: $(OBJ_DIR)/util/coding_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

compression_test: $(OBJ_DIR)/util/compression_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

hash_test: $(OBJ_DIR)/util/hash_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
31 changes: 14 additions & 17 deletions cache/compressed_secondary_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,16 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
kNoCompression, CacheTier::kVolatileTier,
create_context, allocator, &value, &charge);
} else {
UncompressionContext uncompression_context(
cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context,
UncompressionDict::GetEmptyDict(),
cache_options_.compression_type);
auto compressor =
BuiltinCompressor::GetCompressor(cache_options_.compression_type);
UncompressionInfo uncompression_info(
UncompressionDict::GetEmptyDict(),
cache_options_.compress_format_version, allocator);

size_t uncompressed_size{0};
CacheAllocationPtr uncompressed =
UncompressData(uncompression_info, (char*)data_ptr,
handle_value_charge, &uncompressed_size,
cache_options_.compress_format_version, allocator);
CacheAllocationPtr uncompressed = uncompression_info.UncompressData(
compressor.get(), (char*)data_ptr, handle_value_charge,
&uncompressed_size);

if (!uncompressed) {
cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
Expand Down Expand Up @@ -198,17 +197,15 @@ Status CompressedSecondaryCache::InsertInternal(
type == kNoCompression &&
!cache_options_.do_not_compress_roles.Contains(helper->role)) {
PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, data_size);
CompressionContext compression_context(cache_options_.compression_type,
cache_options_.compression_opts);
auto compressor = BuiltinCompressor::GetCompressor(
cache_options_.compression_type, cache_options_.compression_opts);
uint64_t sample_for_compression{0};
CompressionInfo compression_info(
cache_options_.compression_opts, compression_context,
CompressionDict::GetEmptyDict(), cache_options_.compression_type,
sample_for_compression);
CompressionInfo compression_info(CompressionDict::GetEmptyDict(),
cache_options_.compress_format_version,
sample_for_compression);

bool success =
CompressData(val, compression_info,
cache_options_.compress_format_version, &compressed_val);
compression_info.CompressData(compressor.get(), val, &compressed_val);

if (!success) {
return Status::Corruption("Error compressing value.");
Expand Down
1 change: 1 addition & 0 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {
Expand Down
26 changes: 11 additions & 15 deletions db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
immutable_options_(immutable_options),
min_blob_size_(mutable_cf_options->min_blob_size),
blob_file_size_(mutable_cf_options->blob_file_size),
blob_compression_type_(mutable_cf_options->blob_compression_type),
blob_compressor_(mutable_cf_options->blob_compressor),
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
file_options_(file_options),
write_options_(write_options),
Expand All @@ -92,6 +92,10 @@ BlobFileBuilder::BlobFileBuilder(
assert(blob_file_paths_->empty());
assert(blob_file_additions_);
assert(blob_file_additions_->empty());

if (blob_compressor_ == nullptr) {
blob_compressor_ = BuiltinCompressor::GetCompressor(kNoCompression);
}
}

BlobFileBuilder::~BlobFileBuilder() = default;
Expand Down Expand Up @@ -151,7 +155,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
}

BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
blob_compression_type_);
blob_compressor_->GetCompressionType());

return Status::OK();
}
Expand Down Expand Up @@ -228,7 +232,8 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;

BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
BlobLogHeader header(column_family_id_,
blob_compressor_->GetCompressionType(), has_ttl,
expiration_range);

{
Expand Down Expand Up @@ -256,27 +261,18 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
assert(compressed_blob->empty());
assert(immutable_options_);

if (blob_compression_type_ == kNoCompression) {
if (blob_compressor_->GetCompressionType() == kNoCompression) {
return Status::OK();
}

// TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb
CompressionOptions opts;
CompressionContext context(blob_compression_type_, opts);
constexpr uint64_t sample_for_compression = 0;

CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
blob_compression_type_, sample_for_compression);

constexpr uint32_t compression_format_version = 2;
CompressionInfo info;

bool success = false;

{
StopWatch stop_watch(immutable_options_->clock, immutable_options_->stats,
BLOB_DB_COMPRESSION_MICROS);
success =
CompressData(*blob, info, compression_format_version, compressed_blob);
success = info.CompressData(blob_compressor_.get(), *blob, compressed_blob);
}

if (!success) {
Expand Down
3 changes: 2 additions & 1 deletion db/blob/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rocksdb/options.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -90,7 +91,7 @@ class BlobFileBuilder {
const ImmutableOptions* immutable_options_;
uint64_t min_blob_size_;
uint64_t blob_file_size_;
CompressionType blob_compression_type_;
std::shared_ptr<Compressor> blob_compressor_;
PrepopulateBlobCache prepopulate_blob_cache_;
const FileOptions* file_options_;
const WriteOptions* write_options_;
Expand Down
12 changes: 4 additions & 8 deletions db/blob/blob_file_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,16 +403,12 @@ TEST_F(BlobFileBuilderTest, Compression) {
ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number);
ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);

CompressionOptions opts;
CompressionContext context(kSnappyCompression, opts);
constexpr uint64_t sample_for_compression = 0;

CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
kSnappyCompression, sample_for_compression);
auto compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
ASSERT_NE(compressor, nullptr);

std::string compressed_value;
ASSERT_TRUE(Snappy_Compress(info, uncompressed_value.data(),
uncompressed_value.size(), &compressed_value));
ASSERT_OK(compressor->Compress(CompressionInfo(), uncompressed_value,
&compressed_value));

ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(),
BlobLogRecord::kHeaderSize + key_size + compressed_value.size());
Expand Down
46 changes: 22 additions & 24 deletions db/blob/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ Status BlobFileReader::Create(

Statistics* const statistics = immutable_options.stats;

CompressionType compression_type = kNoCompression;
std::shared_ptr<Compressor> compressor;

{
const Status s =
ReadHeader(file_reader.get(), read_options, column_family_id,
statistics, &compression_type);
const Status s = ReadHeader(file_reader.get(), read_options,
column_family_id, statistics, &compressor);
if (!s.ok()) {
return s;
}
Expand All @@ -70,7 +69,7 @@ Status BlobFileReader::Create(
}

blob_file_reader->reset(
new BlobFileReader(std::move(file_reader), file_size, compression_type,
new BlobFileReader(std::move(file_reader), file_size, compressor,
immutable_options.clock, statistics));

return Status::OK();
Expand Down Expand Up @@ -140,9 +139,9 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint32_t column_family_id,
Statistics* statistics,
CompressionType* compression_type) {
std::shared_ptr<Compressor>* compressor) {
assert(file_reader);
assert(compression_type);
assert(compressor);

Slice header_slice;
Buffer buf;
Expand Down Expand Up @@ -184,7 +183,7 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
return Status::Corruption("Column family ID mismatch");
}

*compression_type = header.compression;
*compressor = BuiltinCompressor::GetCompressor(header.compression);
mrambacher marked this conversation as resolved.
Show resolved Hide resolved

return Status::OK();
}
Expand Down Expand Up @@ -281,11 +280,11 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,

BlobFileReader::BlobFileReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
CompressionType compression_type, SystemClock* clock,
const std::shared_ptr<Compressor>& compressor, SystemClock* clock,
Statistics* statistics)
: file_reader_(std::move(file_reader)),
file_size_(file_size),
compression_type_(compression_type),
compressor_(compressor),
clock_(clock),
statistics_(statistics) {
assert(file_reader_);
Expand All @@ -295,7 +294,7 @@ BlobFileReader::~BlobFileReader() = default;

Status BlobFileReader::GetBlob(
const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
uint64_t value_size, CompressionType compression_type,
uint64_t value_size, const std::shared_ptr<Compressor>& compressor,
FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
assert(result);
Expand All @@ -306,7 +305,7 @@ Status BlobFileReader::GetBlob(
return Status::Corruption("Invalid blob offset");
}

if (compression_type != compression_type_) {
if (compressor->GetCompressionType() != compressor_->GetCompressionType()) {
return Status::Corruption("Compression type mismatch when reading blob");
}

Expand Down Expand Up @@ -374,7 +373,7 @@ Status BlobFileReader::GetBlob(

{
const Status s = UncompressBlobIfNeeded(
value_slice, compression_type, allocator, clock_, statistics_, result);
value_slice, compressor.get(), allocator, clock_, statistics_, result);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -420,7 +419,8 @@ void BlobFileReader::MultiGetBlob(
*req->status = Status::Corruption("Invalid blob offset");
continue;
}
if (req->compression != compression_type_) {
if (req->compressor->GetCompressionType() !=
compressor_->GetCompressionType()) {
*req->status =
Status::Corruption("Compression type mismatch when reading a blob");
continue;
Expand Down Expand Up @@ -522,7 +522,7 @@ void BlobFileReader::MultiGetBlob(
// Uncompress blob if needed
Slice value_slice(record_slice.data() + adjustments[i], req->len);
*req->status =
UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
UncompressBlobIfNeeded(value_slice, compressor_.get(), allocator,
clock_, statistics_, &blob_reqs[i].second);
if (req->status->ok()) {
total_bytes += record_slice.size();
Expand Down Expand Up @@ -579,32 +579,30 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice,
}

Status BlobFileReader::UncompressBlobIfNeeded(
const Slice& value_slice, CompressionType compression_type,
const Slice& value_slice, Compressor* compressor,
MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
std::unique_ptr<BlobContents>* result) {
assert(compressor);
assert(result);

if (compression_type == kNoCompression) {
if (compressor->GetCompressionType() == kNoCompression) {
lucagiac81 marked this conversation as resolved.
Show resolved Hide resolved
BlobContentsCreator::Create(result, nullptr, value_slice, kNoCompression,
allocator);
return Status::OK();
}

UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);

size_t uncompressed_size = 0;
constexpr uint32_t compression_format_version = 2;
UncompressionInfo info(UncompressionDict::GetEmptyDict(),
compression_format_version, allocator);

CacheAllocationPtr output;

{
PERF_TIMER_GUARD(blob_decompress_time);
StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
output = UncompressData(info, value_slice.data(), value_slice.size(),
&uncompressed_size, compression_format_version,
allocator);
output = info.UncompressData(compressor, value_slice.data(),
value_slice.size(), &uncompressed_size);
}

TEST_SYNC_POINT_CALLBACK(
Expand Down
Loading
Loading