Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add compression support in GetMsgPackedValueAsString #327

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
add compression support in GetMsgPackedValueAsString
hendrikmuhs committed Mar 14, 2025
commit e5937a9f0cbd183580d3c4b39377f9f62c64e11a
14 changes: 9 additions & 5 deletions keyvi/include/keyvi/compression/compression_selector.h
Original file line number Diff line number Diff line change
@@ -62,8 +62,8 @@ inline CompressionStrategy* compression_strategy(const std::string& name = "") {
typedef std::string (*decompress_func_t)(const std::string&);
typedef void (CompressionStrategy::*compress_mem_fn_t)(buffer_t*, const char*, size_t);

inline decompress_func_t decompressor_by_code(const std::string& s) {
switch (s[0]) {
inline decompress_func_t decompressor_by_code(const char code) {
switch (code) {
case NO_COMPRESSION:
TRACE("unpack uncompressed string");
return RawCompressionStrategy::DoDecompress;
@@ -75,12 +75,16 @@ inline decompress_func_t decompressor_by_code(const std::string& s) {
return SnappyCompressionStrategy::DoDecompress;
default:
throw std::invalid_argument("Invalid compression code " +
boost::lexical_cast<std::string>(static_cast<int>(s[0])));
boost::lexical_cast<std::string>(static_cast<int>(code)));
}
}

/** Returns an instance of a compression strategy by name. */
inline compression_strategy_t compression_strategy_by_code(const CompressionAlgorithm algorithm) {
/**
 * Picks the decompression function for an encoded value.
 *
 * The first byte of `s` is forwarded to decompressor_by_code() as the compression code.
 *
 * @param s the encoded value, starting with the compression code byte
 * @return the decompression function selected by decompressor_by_code()
 */
inline decompress_func_t decompressor_from_string(const std::string& s) {
  const char compression_code = s[0];
  return decompressor_by_code(compression_code);
}

/** Returns an instance of a compression strategy by enum. */
inline compression_strategy_t compression_strategy_by_enum(const CompressionAlgorithm algorithm) {
switch (algorithm) {
case NO_COMPRESSION:
return std::make_unique<RawCompressionStrategy>();
6 changes: 6 additions & 0 deletions keyvi/include/keyvi/compression/compression_strategy.h
Original file line number Diff line number Diff line change
@@ -62,6 +62,12 @@ struct CompressionStrategy {
return std::string(buf.data(), buf.size());
}

inline std::string CompressWithoutHeader(const std::string& raw) {
buffer_t buf;
Compress(&buf, raw.data(), raw.size());
return std::string(buf.data() + 1, buf.size() - 1);
}

/**
* By the time this function is called, the length field added in Compress()
* will have been removed.
6 changes: 6 additions & 0 deletions keyvi/include/keyvi/dictionary/fsa/automata.h
Original file line number Diff line number Diff line change
@@ -394,6 +394,12 @@ class Automata final {
return value_store_reader_->GetRawValueAsString(state_value);
}

/**
 * Get the value for the given state value as msgpack string.
 *
 * Delegates to the value store reader, which must be set.
 *
 * @param state_value the state value to look up
 * @param compression_algorithm compression requested for the returned value, passed on to the reader
 * @return the string produced by the value store reader
 */
std::string GetMsgPackedValueAsString(
    uint64_t state_value,
    const compression::CompressionAlgorithm compression_algorithm =
        compression::CompressionAlgorithm::NO_COMPRESSION) const {
  assert(value_store_reader_);

  std::string packed_value = value_store_reader_->GetMsgPackedValueAsString(state_value, compression_algorithm);
  return packed_value;
}

std::string GetStatistics() const {
return dictionary_properties_->GetStatistics();
}
29 changes: 27 additions & 2 deletions keyvi/include/keyvi/dictionary/fsa/internal/ivalue_store.h
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
#include <boost/interprocess/mapped_region.hpp>
#include <boost/variant.hpp>

#include "keyvi/compression/compression_selector.h"
#include "keyvi/dictionary/dictionary_merger_fwd.h"
#include "keyvi/dictionary/fsa/internal/value_store_properties.h"
#include "keyvi/dictionary/fsa/internal/value_store_types.h"
@@ -109,15 +110,39 @@ class IValueStoreReader {
* Get Value as string in raw format
*
* Note: The raw format is an implementation detail of keyvi, not an official binary interface.
* Value store implementers can override this method for performance reasons.
* Value store implementers can override this method with an optimized version.
*
* @param fsa_value
* @return the value as string without any decompression
* @return the value as binary encoded string
*/
virtual std::string GetRawValueAsString(uint64_t fsa_value) const {
  // generic fallback: take the string representation of the value and encode it
  const std::string value_as_string = GetValueAsString(fsa_value);
  return keyvi::util::EncodeJsonValue(value_as_string);
}

/**
 * Get Value as msgpack string
 *
 * Generic implementation: converts the string representation of the value to msgpack
 * and, if requested, compresses the result.
 *
 * Value store implementers can override this method with an optimized version.
 *
 * @param fsa_value
 * @param compression_algorithm the compression to apply to the msgpack string,
 *        NO_COMPRESSION returns the plain msgpack string
 * @return the value as msgpack string, compressed with the requested algorithm;
 *         the result does not carry the leading compression header byte
 */
virtual std::string GetMsgPackedValueAsString(uint64_t fsa_value,
                                              const compression::CompressionAlgorithm compression_algorithm =
                                                  compression::CompressionAlgorithm::NO_COMPRESSION) const {
  const std::string msgpacked_value = keyvi::util::JsonStringToMsgPack(GetValueAsString(fsa_value));

  if (compression_algorithm == compression::CompressionAlgorithm::NO_COMPRESSION) {
    return msgpacked_value;
  }

  // compress the value
  const compression::compression_strategy_t compressor =
      compression::compression_strategy_by_enum(compression_algorithm);

  // Strip the header byte so the result has the same layout as the one returned by
  // value stores that hand out their stored payload directly (header removed).
  return compressor->CompressWithoutHeader(msgpacked_value);
}

/**
* Get Value as string (for dumping or communication)
*
24 changes: 24 additions & 0 deletions keyvi/include/keyvi/dictionary/fsa/internal/json_value_store.h
Original file line number Diff line number Diff line change
@@ -353,6 +353,30 @@ class JsonValueStoreReader final : public IValueStoreReader {
return keyvi::util::decodeVarIntString(strings_ + fsa_value);
}

/**
 * Get Value as msgpack string, optionally compressed.
 *
 * The stored value starts with one byte holding the compression code, followed by the
 * payload. If the stored compression already matches the requested one, the payload is
 * returned as is; otherwise the value is decompressed and, unless NO_COMPRESSION was
 * requested, compressed again with the requested algorithm.
 *
 * @param fsa_value offset of the value in the strings buffer
 * @param compression_algorithm the compression the returned value should have
 * @return the (optionally compressed) msgpack string without the compression header byte,
 *         an empty string if the stored value is empty
 */
std::string GetMsgPackedValueAsString(uint64_t fsa_value,
                                      const compression::CompressionAlgorithm compression_algorithm =
                                          compression::CompressionAlgorithm::NO_COMPRESSION) const override {
  size_t value_size;
  const char* value_ptr = keyvi::util::decodeVarIntString(strings_ + fsa_value, &value_size);

  // nothing stored: there is no compression code to inspect
  if (value_size == 0) {
    return std::string();
  }

  if (value_ptr[0] == compression_algorithm) {
    // already in the requested format: copy the payload that follows the header byte
    // (pointer + length, not the (count, char) constructor)
    return std::string(value_ptr + 1, value_size - 1);
  }

  // decompress
  const compression::decompress_func_t decompressor = compression::decompressor_by_code(value_ptr[0]);
  std::string msgpacked_value = decompressor(std::string(value_ptr, value_size));

  if (compression_algorithm == compression::CompressionAlgorithm::NO_COMPRESSION) {
    return msgpacked_value;
  }

  // compress
  const compression::compression_strategy_t compressor =
      compression::compression_strategy_by_enum(compression_algorithm);

  return compressor->CompressWithoutHeader(msgpacked_value);
}

std::string GetValueAsString(uint64_t fsa_value) const override {
TRACE("JsonValueStoreReader GetValueAsString");
std::string packed_string = keyvi::util::decodeVarIntString(strings_ + fsa_value);
17 changes: 1 addition & 16 deletions keyvi/include/keyvi/dictionary/match.h
Original file line number Diff line number Diff line change
@@ -199,22 +199,7 @@ struct Match {

std::string GetMsgPackedValueAsString(const compression::CompressionAlgorithm compression_algorithm =
compression::CompressionAlgorithm::NO_COMPRESSION) const {
const std::string raw_value = GetRawValueAsString();
if (raw_value.empty()) {
return raw_value;
}

if (raw_value[0] == compression_algorithm) {
return raw_value.substr(1);
} else if (compression_algorithm == compression::CompressionAlgorithm::NO_COMPRESSION) {
const compression::decompress_func_t decompressor = compression::decompressor_by_code(raw_value);
return decompressor(raw_value);
}

// todo: recompress
const compression::decompress_func_t decompressor = compression::decompressor_by_code(raw_value);

return decompressor(raw_value);
return fsa_->GetMsgPackedValueAsString(state_, compression_algorithm);
}

/**
2 changes: 1 addition & 1 deletion keyvi/include/keyvi/util/float_vector_value.h
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ namespace keyvi {
namespace util {

inline std::vector<float> DecodeFloatVector(const std::string& encoded_value) {
compression::decompress_func_t decompressor = compression::decompressor_by_code(encoded_value);
compression::decompress_func_t decompressor = compression::decompressor_from_string(encoded_value);
std::string unompressed_string_value = decompressor(encoded_value);

const size_t vector_size = unompressed_string_value.size() / sizeof(uint32_t);
14 changes: 2 additions & 12 deletions keyvi/include/keyvi/util/json_value.h
Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ namespace util {

/** Decompresses (if needed) and decodes a json value stored in a JsonValueStore. */
inline std::string DecodeJsonValue(const std::string& encoded_value) {
compression::decompress_func_t decompressor = compression::decompressor_by_code(encoded_value);
compression::decompress_func_t decompressor = compression::decompressor_from_string(encoded_value);
std::string packed_string = decompressor(encoded_value);
TRACE("unpacking %s", packed_string.c_str());

@@ -64,17 +64,7 @@ inline void EncodeJsonValue(std::function<void(compression::buffer_t*, const cha
size_t compression_threshold = 32) {
msgpack_buffer->clear();

rapidjson::Document json_document;
json_document.Parse<rapidjson::kParseNanAndInfFlag>(raw_value.c_str());

if (!json_document.HasParseError()) {
TRACE("Got json");
msgpack::packer<msgpack::sbuffer> packer(msgpack_buffer);
JsonToMsgPack(json_document, &packer, single_precision_float);
} else {
TRACE("Got a normal string");
msgpack::pack(msgpack_buffer, raw_value);
}
JsonStringToMsgPack(raw_value, msgpack_buffer, single_precision_float);
// compression
if (msgpack_buffer->size() > compression_threshold) {
long_compress(buffer, msgpack_buffer->data(), msgpack_buffer->size());
24 changes: 24 additions & 0 deletions keyvi/include/keyvi/util/msgpack_util.h
Original file line number Diff line number Diff line change
@@ -147,6 +147,30 @@ inline void MsgPackDump(Writer* writer, const msgpack::object& o) {
}
}

inline void JsonStringToMsgPack(const std::string& raw_value, msgpack::v1::sbuffer* msgpack_buffer,
bool single_precision_float) {
rapidjson::Document json_document;
json_document.Parse<rapidjson::kParseNanAndInfFlag>(raw_value.c_str());

if (!json_document.HasParseError()) {
TRACE("Got json");
msgpack::packer<msgpack::sbuffer> packer(msgpack_buffer);
JsonToMsgPack(json_document, &packer, single_precision_float);
} else {
TRACE("Got a normal string");
msgpack::pack(msgpack_buffer, raw_value);
}
}

/**
 * Converts `raw_value` to msgpack and returns the packed bytes as string.
 *
 * Convenience overload of the buffer-based JsonStringToMsgPack().
 *
 * @param raw_value the input, either a JSON document or an arbitrary string
 * @param single_precision_float forwarded to the buffer-based overload
 * @return the msgpack encoded value
 */
inline std::string JsonStringToMsgPack(const std::string& raw_value, bool single_precision_float = false) {
  msgpack::sbuffer msgpack_buffer;

  JsonStringToMsgPack(raw_value, &msgpack_buffer, single_precision_float);

  // return the bytes that were actually packed (msgpack_buffer), not an unrelated empty buffer
  return std::string(msgpack_buffer.data(), msgpack_buffer.size());
}

} /* namespace util */
} /* namespace keyvi */