Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Compressor interface #584

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

lucagiac81
Copy link
Contributor

@lucagiac81 lucagiac81 commented Jun 26, 2023

This PR refactors compression by introducing a Compressor interface.
This is a first step towards supporting compressors as plugins. PR #585 covers the next step.
It is equivalent to PR7650 in RocksDB.

Compressor class

The Compressor class defines an interface for each compression algorithm to implement. It is a Customizable class, like other extensible components in Speedb.

A Compressor has

  • A unique name
  • Compress/Uncompress methods
  • CreateDictionary method (for algorithms supporting dictionaries): the logic to build/train a dictionary is moved here, so future compressors have the option to customize it if necessary
  • Methods to handle processed/digested dictionaries (implemented by zstd, for example)
  • Options: each Compressor can define the options it supports using the Configurable framework

Built-in compressors

The existing compression algorithms (referred to as "built-in") are encapsulated in Compressor classes. The classes inherit from BuiltinCompressor, which includes functionality shared by all built-in compressors.
Built-in compressors can be referenced by their numeric id (as defined by the CompressionType enum) to ensure backward compatibility. BuiltinCompressor uses the existing CompressionOptions struct as its configurable options.

Compressor lifecycle

For this PR, compression options exposed in the public API are unchanged (exposing Compressor in the public API and options is the scope of PR #585).

  • The CompressionType and CompressionOptions passed through ColumnFamilyOptions and DBOptions are used to instantiate suitable Compressor instances (this is done in MutableCFOptions and ImmutableDBOptions).
  • The Compressor class keeps track of the instances currently in use, so they can be retrieved and reused.
  • Such Compressor instances are then used in other parts of the code in place of CompressionType and CompressionOptions (e.g., in BlockBasedTableBuilder, BlockBasedTable, FlushJob, Compaction, BlobFileBuilder...).
  • The details of the Compressor used for a block-based table are serialized in the Properties block.
  • When opening the SST file, the info in the Properties block is used to instantiate/retrieve a suitable Compressor for the table. If the compression id for a block doesn't match the table-level Compressor, a suitable Compressor is obtained when reading the block.

TODO

Streaming compression is not included in the Compressor class yet. The plan is to cover that in a separate PR.
It could be offered through additional methods in the Compressor class. For example

  • CreateStreamingCompressContext(): returns a context object similar to StreamingCompress
  • StreamingCompress(context,...): compress method takes context and the usual input/output buffers
  • Similar methods for uncompress
  • StreamingSupported(): to query capability of a Compressor

@lucagiac81 lucagiac81 marked this pull request as ready for review June 27, 2023 14:06
Copy link
Contributor

@mrambacher mrambacher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to finish util/compressor class but have done the rest. Looks good so far!

@@ -150,7 +150,7 @@ Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
}

BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
blob_compression_type_);
blob_compressor_->GetCompressionType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking through the code, I am not sure if blob_compressor_ is always set. Do you know if that is true? Should there be an assert somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blob_compressor_ is initialized in BlobFileBuilder's constructor from mutable_cf_options.blob_compressor. The default constructor of MutableCFOptions sets blob_compressor_ to nullptr.
I'm not sure if we can ever run into a situation where blob_compressor_ is left null, but I'll initialize it to NoCompressor in the BlobFileBuilder's constructor to be safe. This better aligns with the previous code initializing to kNoCompression by default.

bool _manual_compaction, const std::string& _trim_ts, double _score,
bool _deletion_compaction, bool l0_files_might_overlap,
CompactionReason _compaction_reason,
uint32_t _output_path_id, std::shared_ptr<Compressor> _compressor,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const std::shared_ptr& ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in next push

@@ -1150,7 +1149,6 @@ TEST_F(DBOptionsTest, ChangeCompression) {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kSnappyCompression, compression_used);
ASSERT_EQ(6, compression_opt_used.level);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this line was removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion was added again in the second PR (with some changes). I'm moving it back here.

@@ -26,6 +26,7 @@
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "util/autovector.h"
#include "util/string_util.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that it is. Env::GenerateUniqueId calls PutBaseChars in string_util.h. Include changes in other files led to this.


// Abstract algebra teaches us that a finite cyclic group (such as the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did this code go? Was it moved or is it no longer necessary?

Copy link
Contributor Author

@lucagiac81 lucagiac81 Aug 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It moved to util/compressor.cc. It is part of the default implementation to sample and create a dictionary. The Compressor interface allows plugins to override the methods and customize dictionary creation if desired.

@@ -142,15 +142,16 @@ Status ReadAndParseBlockFromFile(
BlockCreateContext& create_context, bool maybe_compressed,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options,
MemoryAllocator* memory_allocator, bool for_compaction, bool async_read) {
MemoryAllocator* memory_allocator, bool for_compaction,
std::shared_ptr<Compressor>& compressor, bool async_read) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

///should this be a const?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should. Fixed.

&rep_->compressor);
if (!s.ok() || rep_->compressor == nullptr) {
ROCKS_LOG_ERROR(rep_->ioptions.logger,
"Compression type not supported");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also set "blocks_maybe_compressed=false"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compressor::CreateFromString failing means that the compressor needed by this table is missing. Instantiating NoCompressor (condition for blocks_maybe_compressed=false) should never fail. This scenario should eventually lead to decompression failing.

@@ -93,8 +93,7 @@ struct TableBuilderOptions {
const ImmutableOptions& _ioptions, const MutableCFOptions& _moptions,
const InternalKeyComparator& _internal_comparator,
const IntTblPropCollectorFactories* _int_tbl_prop_collector_factories,
CompressionType _compression_type,
const CompressionOptions& _compression_opts, uint32_t _column_family_id,
std::shared_ptr<Compressor> _compressor, uint32_t _column_family_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const & ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -56,8 +56,8 @@ Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key,
reader_.reset(new RandomAccessFileReader(std::move(file), filename));
uint64_t offset = 0;
uint64_t footer_offset = 0;
CompressionType compression = kNoCompression;
s = DumpBlobLogHeader(&offset, &compression);
std::shared_ptr<Compressor> compressor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be null or a NoCompression compressor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compressor is initialized in DumpBlobLogHeader. It takes compression from BlobLogHeader (kNoCompression by default).

@@ -1,11 +1,684 @@
// Copyright (c) 2022-present, Facebook, Inc. All rights reserved.
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The year here changed...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. We introduced compression.cc first in the PRs, but it got later created in RocksDB too, and the copyright date change was missed.

return true;
}
}
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should type be set to Unknown/None or something on error (similar to how result is cleared)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If StringToType returns false, the caller should not rely on the value of type. All uses of StringToType adhere to this expectation (except for a few tests, where the string is known to be a valid name).

@@ -666,55 +282,66 @@ inline std::string CompressionOptionsToString(
return result;
}

#ifdef SNAPPY
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the XXX_Compress/Uncompress functions still needed in a header file or can they be local to the source file now? If they cannot be moved to the source file, can you please add a TODO comment to make that move when it is available?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The functions were left in the header file for ease of review. They should be moved to the source file eventually. Adding a TODO comment.


// Returns a new uncompression dictionary from the input dict.
// Classes which have a ProcessedDict should override this method.
virtual UncompressionDict* NewUncompressionDict(const std::string& dict) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these return a std::unique_ptr? Maybe something that should happen in the next phase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pointer returned by NewUncompressionDict is used to reset a unique_ptr in the two usages I found in the code (BlockCreateContext::Create and BlockBasedTableBuilder::EnterUnbuffered). This is similar to the previous code, where the new operator was used.

const std::string& compression_dict_samples,
const std::vector<size_t>& compression_dict_sample_lens);

private:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I would rather these private functions were not in the interface if it can be avoided. Can you explain when/where/why they are used and if there is a way to move them to something local?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These private getters are used to access certain compression options: GetMaxDictBytes, GetMaxTrainBytes, UseDictTrainer, GetLevel. These options are used by some Compressor methods with default implementations (e.g., SampleDict and TrainDict). Different compressors could customize the names of the underlying options, and the getters provide a standard way to access them.
For example, SampleDict and TrainDict provide the default RocksDB dictionary creation logic. Compressors are free to add their custom logic, but I'd expect many compressors to rely on the default implementation.

RocksDB code outside Compressor does not need these options. There are other similar option getters that are public, as other RocksDB code needs to access them (GetMaxDictBufferBytes, GetParallelThreads).

Making these getters private may be a bit too restrictive. We could at least make them protected.

Comment on lines +244 to +246
void Compressor::SampleDict(std::vector<std::string>& data_block_buffers,
std::string* compression_dict_samples,
std::vector<size_t>* compression_dict_sample_lens) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be a static method or somehow otherwise denoted? It does not appear to use anything inside of the Compressor (other than dictionary sizes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SampleDict and TrainDict provide the default RocksDB dictionary creation logic. These methods are used in the default implementation of CreateDict. I split the two phases of dictionary creation as Compressors may want to use to customize only sampling or training (I ran into that scenario). Compressors are free to customize any of these methods if they need custom logic for one or both phases.

An alternative (also covering the private getters in the previous comment) would be to place the default dictionary creation code in a utility class. Compressors would then use the utility class in their CreateDict method to use the default logic.

Comment on lines 301 to 308
dict = ZSTD_TrainDictionary(compression_dict_samples,
compression_dict_sample_lens, max_dict_bytes);
} else {
dict = ZSTD_FinalizeDictionary(compression_dict_samples,
compression_dict_sample_lens,
max_dict_bytes, level);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the dictionary training the same for all dictionaries and not dependent on ZSTD?

What happens if built without ZSTD enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ZSTD dictionary trainer is used with other compression algorithms too.

  • ZSTD_TrainDictionary has conditional compilation based on ZSTD library version. If the trainer is not supported, it returns an empty string.
  • There is also a ZSTD_TrainDictionarySupported function with the same library version check. It is used in CheckCompressionSupported. That only checks CompressionOptions, not Compressor options (as a Compressor could define a custom trainer).

Similar comments apply to ZSTD_FinalizeDictionary.

I added a check for ZSTD_TrainDictionarySupported in TrainDict's default implementation. If the Compressor selects to use a trainer (based on GetMaxTrainBytes and UseDictTrainer), it checks if ZSTD_TrainDictionary is supported. If not, it checks if ZSTD_FinalizeDictionary is supported. If neither is supported, it will return back the original samples.

@lucagiac81 lucagiac81 force-pushed the compressor_interface branch from 241d920 to 1def930 Compare August 15, 2023 20:06
mrambacher
mrambacher previously approved these changes Sep 18, 2023
Copy link
Contributor

@mrambacher mrambacher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Thanks for the PR

@ofriedma
Copy link
Contributor

@lucagiac81 please rebase

@udi-speedb
Copy link
Contributor

@ofriedma, @speedbmike - Please review

@lucagiac81 lucagiac81 force-pushed the compressor_interface branch from 439bc30 to d88100c Compare January 11, 2024 16:06
@lucagiac81
Copy link
Contributor Author

lucagiac81 commented Jan 11, 2024

Code is rebased on latest main based RocksDB 8.6.7 (the previous code was based on RocksDB 8.1.1).

One note regarding ZSTDContext

  • ZSTDContext replaces CompressionContext, making it Compressor-specific. A Compressor can manage its own context as needed (only ZSTD currently needs this).
  • In the previous version of the PR, there was one ZSTDContext instance per thread.
  • After RocksDB PR 11666, ZSTDContext depends on the options selected for ZSTDCompressor. So, different ZSTDCompressor instances (with potentially different options) should not share ZSTDContext instances.
  • The latest code has one ZSTDContext instance per thread, but internally a context is created for each instance of ZSTDCompressor. The net effect is to have one context instance per thread per ZSTDCompressor instance.

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants