-
Notifications
You must be signed in to change notification settings - Fork 6.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Compressor interface #7650
base: main
Are you sure you want to change the base?
Add Compressor interface #7650
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally I like where this is heading. I have some questions on CompressionInfo (whether it is turned upside down enough) and on the CompressorRegistry but I think this is a very good thing to have.
bfcc7c6
to
c81fd0e
Compare
Addressed some of the comments above and rebased branch. There are two additional changes from the previous version.
|
c81fd0e
to
af7a4ea
Compare
2982813
to
004b920
Compare
Keep in mind that for us to accept this, the performance penalty would have to be minimal. |
Absolutely. Performance is a key requirement for this feature. From the data I've collected so far with db_bench (readrandom and readrandomwriterandom workloads), the performance difference is in general <1% (throughput, p99 latency). I'm collecting more data covering different conditions and additional metrics, so we can have a more accurate picture. |
004b920
to
55ca263
Compare
db/column_family.cc
Outdated
" is not linked with the binary."); | ||
} else if (!moptions.compressor_per_level.empty()) { | ||
for (const auto& compressor : moptions.compressor_per_level) { | ||
if (!compressor->Supported()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any possibility that this compressor is null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- compressor_per_level is initialized as an empty vector.
- MutableCFOptions::RefreshDerivedOptions populates compressor_per_level using the content of compression_per_level.
- Compressors are obtained from BuiltinCompressor::GetCompressor, which could return nullptr in case of an invalid selection in compression_per_level.
I thought an invalid selection in compression_per_level would be caught before reaching this method, but that's incorrect. I added a null check and a test in compression_test (to verify the expected error is returned in case of invalid selection in compression_per_level).
db/db_test.cc
Outdated
@@ -5010,7 +5010,8 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { | |||
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { | |||
Compaction* compaction = reinterpret_cast<Compaction*>(arg); | |||
if (compaction->output_level() == 4) { | |||
ASSERT_TRUE(compaction->output_compression() == kLZ4Compression); | |||
ASSERT_EQ(compaction->output_compression()->GetCompressionType(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I wonder if output_compression should be renamed to compressor or output_compressor?
- Does it make sense to have a compaction->GetCompressionType shortcut method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed output_compression to output_compressor.
I did not add the shortcut method for compression type. I see both GetCompressionType and GetId called on the compressor returned by output_compressor in different places.
db/db_test2.cc
Outdated
ASSERT_LE(TestGetTickerCount(options, | ||
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT), | ||
2 * kBlockLen); | ||
2 * kBlockLen + sizeof(UncompressionDict)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this meant to be the sizeof the Uncompression Dictionary itself or the struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a previous version of the code, I added a Compressor* member to UncompressionDict, and these tests started failing due to the size being slightly larger than 2 blocks. With the current code, these changes are actually not needed, and they were removed.
fillseq and readrandom with -readonly should suffice, with low block cache hit rate on data blocks. You can do readrandom on the same DB before-and-after to eliminate LSM structure variances. But I would consider 1% overall ops/sec as a significant regression for Meta to absorb for a feature that Meta doesn't need. |
55ca263
to
7bd8334
Compare
I ran fillseq and readrandom benchmarks, comparing the PR against the previous commit on the main branch. The relative ops/s differences (PR vs main, mean of 5 runs) are listed below, although they are not statistically significant:
The instruction path length is also verified to be matched within 0.2%. |
7bd8334
to
871e767
Compare
871e767
to
8348e5b
Compare
8348e5b
to
d239489
Compare
@mrambacher has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
d239489
to
558dc2e
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
558dc2e
to
44fe2ad
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
44fe2ad
to
5d6b3d7
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
5d6b3d7
to
c45c959
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
virtual Status Compress(const CompressionInfo& info, const Slice& input, | ||
std::string* output) = 0; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking out loud, I wonder if the names of the Compress and Uncompress functions should be different. I can see there eventually being (at least) three signatures:
- This method that takes a block (Slice) and returns a block (output)
- An aysnchronous method that does something similar, invoking some sort of callback when the Compression is complete
- A method for the compression of streams
With this in mind, I wonder if these methods should be named "CompressBlock" (or something similar) instead of just Compress
Status Compressor::CreateDictionary( | ||
std::vector<std::string>& data_block_buffers, | ||
std::unique_ptr<CompressionDict>* compression_dict) { | ||
uint32_t max_dict_bytes = GetMaxDictBytes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert(compression_dict)
and maybe assert that this Compressor supports a Dictionary? Or return an error if it does not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert added for compression_dict (in BlockBasedTableBuilder::EnterUnbuffered, where CreateDictionary is called). Added check for DictCompressionSupported in CreateDictionary. Also added comment for IsDictionaryEnabled to return false if DictCompressionSupported is false.
util/compressor.cc
Outdated
if (use_dict_trainer) { | ||
dict = ZSTD_TrainDictionary(compression_dict_samples, | ||
compression_dict_sample_lens, max_dict_bytes); | ||
} else { | ||
dict = ZSTD_FinalizeDictionary(compression_dict_samples, | ||
compression_dict_sample_lens, | ||
max_dict_bytes, level); | ||
} | ||
compression_dict->reset(NewCompressionDict(dict)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is referring to ZSTD methods, this seems like it should be done in a derived (not the base) class, even if multiple compressors might use the same dictionary trainer/finalize methods.
Can we add a virtual method to build the dictionary from the samples?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR #6717 already introduced a separate SampleAndTrainDictionary method to make it easier for plugins to override only parts of CreateDictionary (and use the default sampling/training methods inside a customized implementation of CreateDictionary). I now split sampling and training into two virtual methods (SampleDict and TrainDict). TrainDict's default implementation still uses ZSTD_TrainDictionary and ZSTD_FinalizeDictionary, so plugins are able to use those as default.
I also renamed CreateDictionary as CreateDict and IsDictionaryEnabled as IsDictEnabled for consistency with other methods.
Some more tweaks may be needed to finalize the interface.
util/compressor.cc
Outdated
uint32_t Compressor::GetMaxDictBytes() const { | ||
uint32_t max_dict_bytes = 0; | ||
#ifndef ROCKSDB_LITE | ||
std::string value; | ||
ConfigOptions config_options; | ||
Status s = GetOption(config_options, "max_dict_bytes", &value); | ||
if (s.ok()) { | ||
max_dict_bytes = static_cast<uint32_t>(ParseUint64(value)); | ||
} | ||
#endif // ROCKSDB_LITE | ||
return max_dict_bytes; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not in favor of this mechanism. Is there a reason not to just make these virtual and have the derived class do the right thing?
Also, are dictionaries not supported in LITE mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. All compressors so far override these methods. The methods are now changed to simply return a default value. This also eliminates the different behavior for the LITE version, which doesn't support GetOption.
util/compressor.h
Outdated
|
||
// Returns a new compression dictionary from the input dict. | ||
// Classes which have a ProcessedDict should override this method. | ||
virtual CompressionDict* NewCompressionDict(const std::string& dict) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There has been an effort to not return raw pointers as much in the code. Can you tell me if these can easily be made into std::unique_ptr ? That would satisfy the "not raw" and can help prevent leaks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simple change. CreateDictionary was already placing the raw pointer in a std::unique_ptr. We cannot do the same for UncompressionDict without further changes, as the Create method of BlocklikeTraitsreturns a raw pointer and that's aligned with other specializations.
tools/db_bench_tool.cc
Outdated
CompressionInfo info(CompressionDict::GetEmptyDict(), | ||
compress_format_version, | ||
FLAGS_sample_for_compression); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be cleaner as:
CompressionInfo info;
info.sample_for_compression = FLAGS_sample_for_compression
(and eliminate the empty dictionary and format version)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified CompressionDict constructor to take sample_for_compression as parameter with default value. This made the code in db_bench_tool.cc more concise.
tools/db_bench_tool.cc
Outdated
CompressionInfo info(CompressionDict::GetEmptyDict(), | ||
compress_format_version, FLAGS_sample_for_compression); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above on cleanup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above
tools/db_bench_tool.cc
Outdated
CompressionInfo compression_info(CompressionDict::GetEmptyDict(), | ||
compress_format_version, | ||
FLAGS_sample_for_compression); | ||
UncompressionInfo uncompression_info(UncompressionDict::GetEmptyDict(), | ||
compress_format_version); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above on cleanup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above
utilities/blob_db/blob_db_impl.cc
Outdated
UncompressionInfo info( | ||
UncompressionDict::GetEmptyDict(), | ||
GetCompressFormatForVersion(kBlockBasedTableVersionFormat)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this just UncompressionInfo info; (like CompressionInfo above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is. Updated
@@ -191,7 +193,7 @@ std::string RandomName(Random* rnd, const size_t len) { | |||
|
|||
CompressionType RandomCompressionType(Random* rnd) { | |||
auto ret = static_cast<CompressionType>(rnd->Uniform(6)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that this is your bug, but 6 seems wrong here. The builtin types go from 0x0 to 0x7 (so 8 of them)
c45c959
to
6927ed5
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
ea966fb
to
59ea03f
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
59ea03f
to
85818a2
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
85818a2
to
8f4a3e6
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
8f4a3e6
to
b738894
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
b738894
to
6acf396
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
6acf396
to
7727d48
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
7727d48
to
374251f
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
374251f
to
89fc8cc
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
89fc8cc
to
72af189
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
72af189
to
e267425
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
rep_->table_properties->compression_name != | ||
CompressionTypeToString(kNoCompression); | ||
ConfigOptions config_options; | ||
s = Compressor::CreateFromString(config_options, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like that this implementation requires dependency injection (ObjectRegistry) (also known as "spooky action at a distance") to use a custom compressor. Reliable dependency injection can be a pain point for some people integrating RocksDB, e.g. through a shared object file.
Using prefix_extractor as an example, when the SST file uses the same prefix extractor as what is currently configured (for new SST files), dependency injection is not required to use it for reads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compressor::CreateFromString will reuse an existing instance if an equivalent one is already available. The existing instances are tracked by the Compressor class rather than passed to the table like prefix_extractor.
To follow a similar approach as for prefix_extractor, it should be possible to pass a custom compressor instance to RocksDB via options, without adding a factory function to ObjectRegistry. Would this address the concern with dependency injection? I will update PR #6717 (which adds support for custom compressors) to include and test this scenario.
@@ -725,6 +732,14 @@ struct BlockBasedTable::Rep { | |||
#endif // ROCKSDB_MALLOC_USABLE_SIZE | |||
return usage; | |||
} | |||
|
|||
std::shared_ptr<Compressor> GetCompressor(CompressionType compression_type) { | |||
if (compression_type == compressor->GetCompressionType()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Important to note that only one custom compression type can be used per file. Allowing various custom compressors in a single file would be a substantial schema+code change. Yes someone could add their own layer within the custom compressor, but that would be extra overhead.
We don't currently foresee needing various compressions within a file (except for some blocks not compressed) but we want to be sure everyone is on board with this decision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that is a limitation for plugin compressors. For built-in compressors, it's still possible to have different compressors per file. Their CompressionType id is still stored in each block as before. Plugins, on the other hand, all have the same id and rely on the compressor name in the properties block.
If various custom compressors per file are needed, we may be able to handle it by assigning "dynamic" numeric ids to custom compressors and recording them in blocks as usual. I can look into the implementation details if there is interest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we may be able to handle it by assigning "dynamic" numeric ids to custom compressors and recording them in blocks as usual
Right. Might be worth keeping this in mind as a possible future feature, especially for schema purposes. I think the ideal state right now would be to reserve what that would look like in the schema, detect on read if that future feature is in use and provide a good error message.
s = UncompressSerializedBlock( | ||
info, req.result.data() + req_offset, handle.size(), &contents, | ||
footer.format_version(), rep_->ioptions, memory_allocator); | ||
std::shared_ptr<Compressor> compressor = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating temporary copies of shared_ptrs on the read "hot path" is a performance hazard.
In the past we have had to re-engineer some new customizable integrations because of excessive shared_ptr ops.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The copy of shared_ptr is needed here in case the block is compressed by a different compressor than the one specified for the table (in rep_). I'll try to remove the copy at least for the most common case (block compressed with the table compressor).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think that's an over-simplified interface. There should be immortal (de-)compressors for built-in algorithms. And the needed custom compressors should be known and kept alive by the table reader (shared_ptr). (Are the compressors thread-safe by the way?)
While it might be nice to have a "GetCompressor" that copies a shared_ptr for arbitrary lifetime extension, the one used most would be to get a (de-)compressor reference or pointer that we promise not to use beyond the life of the table reader (which is guaranteed by the table cache to survive pending read operations).
@@ -152,15 +152,16 @@ Status ReadAndParseBlockFromFile( | |||
BlockCreateContext& create_context, bool maybe_compressed, | |||
const UncompressionDict& uncompression_dict, | |||
const PersistentCacheOptions& cache_options, | |||
MemoryAllocator* memory_allocator, bool for_compaction, bool async_read) { | |||
MemoryAllocator* memory_allocator, bool for_compaction, | |||
const std::shared_ptr<Compressor>& compressor, bool async_read) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I consider const shared_ptr<X>&
parameters to be an anti-pattern. It prevents move semantics where it might be useful, and is non-committal about intention to save a reference. I suggest deciding between const X&
and plain shared_ptr<X>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the "mixed message" of passing const shared_ptr&. I'll review all occurrences and verify if they're justified (e.g., if a copy of the shared_ptr is eventually made).
UncompressionDict& operator=(const CompressionDict&) = delete; | ||
}; | ||
|
||
// Interface for each compression algorithm to implement. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the thread safety expectations?
By the way, we are finally very interested in this line of features. Thanks for working on it! |
That's great, thank you! I'll work on the items above and let's continue the review. I'll also rebase, as I see some new conflicts. |
Hi. We are planning to resume the pluggable compression work covered by this PR and PR #6717. We'd like to confirm there is still interest in driving this forward. If so, we will rebase on the latest version, address the feedback above, and coordinate to review when ready. Please let us know. Looking forward to completing this feature and making it available in RocksDB! |
e267425
to
494b2ac
Compare
@lucagiac81 has updated the pull request. You must reimport the pull request before landing. |
This PR refactors compression by introducing a Compressor interface.
This is a first step towards supporting compressors as plugins. PR #6717 covers the next step.
Compressor class
The Compressor class defines an interface for each compression algorithm to implement. It is a Customizable class, like other extensible components in RocksDB.
A Compressor has
Streaming compression is not included in the Compressor class yet. The plan is to cover that in a separate PR.
Built-in compressors
The existing compression algorithms (referred to as "built-in") are encapsulated in Compressor classes. The classes inherit from BuiltinCompressor, which includes functionality shared by all built-in compressors.
Built-in compressors can be referenced by their numeric id (as defined by the CompressionType enum) to ensure backward compatibility. BuiltinCompressor uses the existing CompressionOptions struct as its configurable options.
Compressor lifecycle
For this PR, compression options exposed in the public API are unchanged (exposing Compressor in the public API and options is the scope of PR #6717).