Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add compression support in GetMsgPackedValueAsString #327

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
28 changes: 24 additions & 4 deletions keyvi/include/keyvi/compression/compression_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#ifndef KEYVI_COMPRESSION_COMPRESSION_SELECTOR_H_
#define KEYVI_COMPRESSION_COMPRESSION_SELECTOR_H_

#include <memory>
#include <string>

#include <boost/algorithm/string.hpp>
Expand Down Expand Up @@ -61,8 +62,8 @@ inline CompressionStrategy* compression_strategy(const std::string& name = "") {
typedef std::string (*decompress_func_t)(const std::string&);
typedef void (CompressionStrategy::*compress_mem_fn_t)(buffer_t*, const char*, size_t);

inline decompress_func_t decompressor_by_code(const std::string& s) {
switch (s[0]) {
inline decompress_func_t decompressor_by_code(const CompressionAlgorithm algorithm) {
switch (algorithm) {
case NO_COMPRESSION:
TRACE("unpack uncompressed string");
return RawCompressionStrategy::DoDecompress;
Expand All @@ -73,8 +74,27 @@ inline decompress_func_t decompressor_by_code(const std::string& s) {
TRACE("unpack snappy compressed string");
return SnappyCompressionStrategy::DoDecompress;
default:
throw std::invalid_argument("Invalid compression code " +
boost::lexical_cast<std::string>(static_cast<int>(s[0])));
throw std::invalid_argument("Invalid compression algorithm " +
boost::lexical_cast<std::string>(static_cast<int>(algorithm)));
}
}

/**
 * Selects the decompression function for an encoded value.
 *
 * The first byte of the value is reinterpreted as a CompressionAlgorithm code
 * and handed to decompressor_by_code().
 *
 * @param s encoded value; only s[0] is inspected here
 * @return whatever decompressor_by_code() returns for that code
 */
inline decompress_func_t decompressor_from_string(const std::string& s) {
  const auto algorithm = static_cast<CompressionAlgorithm>(s[0]);
  return decompressor_by_code(algorithm);
}

/**
 * Creates a compression strategy for the given algorithm code.
 *
 * @param algorithm one of NO_COMPRESSION, ZLIB_COMPRESSION, SNAPPY_COMPRESSION
 * @return a newly created strategy owned by the caller
 * @throws std::invalid_argument for any other code
 */
inline compression_strategy_t compression_strategy_by_code(const CompressionAlgorithm algorithm) {
  if (algorithm == NO_COMPRESSION) {
    return std::make_unique<RawCompressionStrategy>();
  }
  if (algorithm == ZLIB_COMPRESSION) {
    return std::make_unique<ZlibCompressionStrategy>();
  }
  if (algorithm == SNAPPY_COMPRESSION) {
    return std::make_unique<SnappyCompressionStrategy>();
  }

  // unknown code: report its numeric value
  const std::string code_as_text = boost::lexical_cast<std::string>(static_cast<int>(algorithm));
  throw std::invalid_argument("Invalid compression algorithm " + code_as_text);
}

Expand Down
17 changes: 10 additions & 7 deletions keyvi/include/keyvi/compression/compression_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
#define KEYVI_COMPRESSION_COMPRESSION_STRATEGY_H_

#include <cstring>
#include <memory>
#include <string>
#include <vector>

namespace keyvi {
namespace compression {

enum CompressionCode {
enum CompressionAlgorithm {
NO_COMPRESSION = 0,
ZLIB_COMPRESSION = 1,
SNAPPY_COMPRESSION = 2,
Expand Down Expand Up @@ -61,6 +62,12 @@ struct CompressionStrategy {
return std::string(buf.data(), buf.size());
}

inline std::string CompressWithoutHeader(const std::string& raw) {
buffer_t buf;
Compress(&buf, raw.data(), raw.size());
return std::string(buf.data() + 1, buf.size() - 1);
}

/**
* By the time this function is called, the length field added in Compress()
* will have been removed.
Expand All @@ -71,6 +78,8 @@ struct CompressionStrategy {
virtual std::string name() const = 0;
};

using compression_strategy_t = std::unique_ptr<CompressionStrategy>;

/**
* A compression strategy that does almost nothing; i.e. it only adds
* the length field.
Expand All @@ -84,12 +93,6 @@ struct RawCompressionStrategy final : public CompressionStrategy {
std::memcpy(buffer->data() + 1, raw, raw_size);
}

static inline std::string DoCompress(const char* raw, size_t raw_size) {
buffer_t buf;
DoCompress(&buf, raw, raw_size);
return std::string(buf.data(), buf.size());
}

// Member-function entry point; forwards unchanged to the static DoDecompress().
inline std::string Decompress(const std::string& compressed) { return DoDecompress(compressed); }

/**
 * "Decompresses" an uncompressed value: returns `compressed` without its
 * first byte.
 *
 * @throws std::out_of_range (from std::string::substr) if `compressed` is empty
 */
static inline std::string DoDecompress(const std::string& compressed) {
  constexpr std::string::size_type kHeaderSize = 1;
  return compressed.substr(kHeaderSize);
}
Expand Down
6 changes: 0 additions & 6 deletions keyvi/include/keyvi/compression/snappy_compression_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ struct SnappyCompressionStrategy final : public CompressionStrategy {
buffer->resize(output_length + 1);
}

static inline std::string DoCompress(const char* raw, size_t raw_size) {
buffer_t buf;
DoCompress(&buf, raw, raw_size);
return std::string(buf.data(), buf.size());
}

// Member-function entry point; forwards unchanged to the static DoDecompress().
inline std::string Decompress(const std::string& compressed) { return DoDecompress(compressed); }

static std::string DoDecompress(const std::string& compressed) {
Expand Down
7 changes: 7 additions & 0 deletions keyvi/include/keyvi/dictionary/fsa/automata.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ class Automata final {
return value_store_reader_->GetRawValueAsString(state_value);
}

/**
 * Returns the value for `state_value` as reported by the value store reader's
 * GetMsgPackedValueAsString().
 *
 * @param state_value value handle passed through to the value store reader
 * @param compression_algorithm passed through unchanged; defaults to NO_COMPRESSION
 * @return the string produced by the value store reader
 */
std::string GetMsgPackedValueAsString(uint64_t state_value,
const compression::CompressionAlgorithm compression_algorithm =
compression::CompressionAlgorithm::NO_COMPRESSION) const {
// a value store reader must be present; only checked in builds with assertions enabled
assert(value_store_reader_);
return value_store_reader_->GetMsgPackedValueAsString(state_value, compression_algorithm);
}

// Returns the statistics string reported by dictionary_properties_.
std::string GetStatistics() const {
return dictionary_properties_->GetStatistics();
}
Expand Down
29 changes: 27 additions & 2 deletions keyvi/include/keyvi/dictionary/fsa/internal/ivalue_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <boost/interprocess/mapped_region.hpp>
#include <boost/variant.hpp>

#include "keyvi/compression/compression_selector.h"
#include "keyvi/dictionary/dictionary_merger_fwd.h"
#include "keyvi/dictionary/fsa/internal/value_store_properties.h"
#include "keyvi/dictionary/fsa/internal/value_store_types.h"
Expand Down Expand Up @@ -109,15 +110,39 @@ class IValueStoreReader {
* Get Value as string in raw format
*
* Note: The raw format is an implementation detail of keyvi, not an official binary interface.
* Value store implementers can override this method for performance reasons.
* Value store implementers can override this method with an optimized version.
*
* @param fsa_value
* @return the value as string without any decompression
* @return the value as binary encoded string
*/
virtual std::string GetRawValueAsString(uint64_t fsa_value) const {
// default implementation: take the string form of the value and run it through EncodeJsonValue()
return keyvi::util::EncodeJsonValue(GetValueAsString(fsa_value));
}

/**
 * Get Value as msgpack string, optionally compressed
 *
 * This default implementation converts the result of GetValueAsString() with
 * JsonStringToMsgPack(). Value store implementers can override this method
 * with an optimized version.
 *
 * @param fsa_value
 * @param compression_algorithm compression to apply to the msgpack bytes,
 *        defaults to NO_COMPRESSION
 * @return the msgpack bytes as-is for NO_COMPRESSION, otherwise the result of
 *         CompressWithoutHeader() of the selected strategy
 */
virtual std::string GetMsgPackedValueAsString(uint64_t fsa_value,
                                              const compression::CompressionAlgorithm compression_algorithm =
                                                  compression::CompressionAlgorithm::NO_COMPRESSION) const {
  const std::string packed = keyvi::util::JsonStringToMsgPack(GetValueAsString(fsa_value));

  if (compression_algorithm != compression::CompressionAlgorithm::NO_COMPRESSION) {
    // compress the value with the requested algorithm
    return compression::compression_strategy_by_code(compression_algorithm)->CompressWithoutHeader(packed);
  }

  return packed;
}

/**
* Get Value as string (for dumping or communication)
*
Expand Down
29 changes: 29 additions & 0 deletions keyvi/include/keyvi/dictionary/fsa/internal/json_value_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,35 @@ class JsonValueStoreReader final : public IValueStoreReader {
return keyvi::util::decodeVarIntString(strings_ + fsa_value);
}

/**
 * Get Value as msgpack string, optionally compressed.
 *
 * Works on the stored bytes directly. The first stored byte is compared with
 * the requested algorithm. If they are equal, the remaining bytes are returned
 * untouched. Otherwise the value is decompressed and, unless NO_COMPRESSION
 * was requested, compressed again with the requested algorithm.
 *
 * @param fsa_value offset of the value inside the strings buffer
 * @param compression_algorithm requested compression, defaults to NO_COMPRESSION
 * @return the value without its leading compression byte; an empty string if
 *         the stored value has size 0
 */
std::string GetMsgPackedValueAsString(uint64_t fsa_value,
                                      const compression::CompressionAlgorithm compression_algorithm =
                                          compression::CompressionAlgorithm::NO_COMPRESSION) const override {
  size_t stored_size;
  const char* stored = keyvi::util::decodeVarIntString(strings_ + fsa_value, &stored_size);

  if (stored_size == 0) {
    return std::string();
  }

  const char stored_code = stored[0];
  if (stored_code == compression_algorithm) {
    // already in the requested format: strip the leading byte only
    return std::string(stored + 1, stored_size - 1);
  }

  // decompress
  const compression::decompress_func_t decompress =
      compression::decompressor_by_code(static_cast<compression::CompressionAlgorithm>(stored_code));
  std::string unpacked = decompress(std::string(stored, stored_size));

  if (compression_algorithm != compression::CompressionAlgorithm::NO_COMPRESSION) {
    // compress
    return compression::compression_strategy_by_code(compression_algorithm)->CompressWithoutHeader(unpacked);
  }

  return unpacked;
}

std::string GetValueAsString(uint64_t fsa_value) const override {
TRACE("JsonValueStoreReader GetValueAsString");
std::string packed_string = keyvi::util::decodeVarIntString(strings_ + fsa_value);
Expand Down
33 changes: 27 additions & 6 deletions keyvi/include/keyvi/dictionary/match.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <boost/container/flat_map.hpp>
#include <boost/variant.hpp>

#include "keyvi/compression/compression_strategy.h"
#include "keyvi/dictionary/fsa/automata.h"
#include "keyvi/util/json_value.h"

Expand Down Expand Up @@ -196,13 +197,33 @@ struct Match {
return fsa_->GetRawValueAsString(state_);
}

std::string GetMsgPackedValueAsString() const {
const std::string raw_value = GetRawValueAsString();
if (raw_value.empty()) {
return raw_value;
std::string GetMsgPackedValueAsString(const compression::CompressionAlgorithm compression_algorithm =
compression::CompressionAlgorithm::NO_COMPRESSION) const {
if (!fsa_) {
if (raw_value_.empty()) {
return raw_value_;
}

if (raw_value_[0] == compression_algorithm) {
return raw_value_.substr(1);
}

// decompress
const compression::decompress_func_t decompressor =
compression::decompressor_by_code(static_cast<compression::CompressionAlgorithm>(raw_value_[0]));
std::string msgpacked_value = decompressor(raw_value_);

if (compression_algorithm == compression::CompressionAlgorithm::NO_COMPRESSION) {
return msgpacked_value;
}
// compress
const compression::compression_strategy_t compressor =
compression::compression_strategy_by_code(compression_algorithm);

return compressor->CompressWithoutHeader(msgpacked_value);
}
const compression::decompress_func_t decompressor = compression::decompressor_by_code(raw_value);
return decompressor(raw_value);

return fsa_->GetMsgPackedValueAsString(state_, compression_algorithm);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion keyvi/include/keyvi/util/float_vector_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace keyvi {
namespace util {

inline std::vector<float> DecodeFloatVector(const std::string& encoded_value) {
compression::decompress_func_t decompressor = compression::decompressor_by_code(encoded_value);
compression::decompress_func_t decompressor = compression::decompressor_from_string(encoded_value);
std::string unompressed_string_value = decompressor(encoded_value);

const size_t vector_size = unompressed_string_value.size() / sizeof(uint32_t);
Expand Down
14 changes: 2 additions & 12 deletions keyvi/include/keyvi/util/json_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace util {

/** Decompresses (if needed) and decodes a json value stored in a JsonValueStore. */
inline std::string DecodeJsonValue(const std::string& encoded_value) {
compression::decompress_func_t decompressor = compression::decompressor_by_code(encoded_value);
compression::decompress_func_t decompressor = compression::decompressor_from_string(encoded_value);
std::string packed_string = decompressor(encoded_value);
TRACE("unpacking %s", packed_string.c_str());

Expand All @@ -64,17 +64,7 @@ inline void EncodeJsonValue(std::function<void(compression::buffer_t*, const cha
size_t compression_threshold = 32) {
msgpack_buffer->clear();

rapidjson::Document json_document;
json_document.Parse<rapidjson::kParseNanAndInfFlag>(raw_value.c_str());

if (!json_document.HasParseError()) {
TRACE("Got json");
msgpack::packer<msgpack::sbuffer> packer(msgpack_buffer);
JsonToMsgPack(json_document, &packer, single_precision_float);
} else {
TRACE("Got a normal string");
msgpack::pack(msgpack_buffer, raw_value);
}
JsonStringToMsgPack(raw_value, msgpack_buffer, single_precision_float);
// compression
if (msgpack_buffer->size() > compression_threshold) {
long_compress(buffer, msgpack_buffer->data(), msgpack_buffer->size());
Expand Down
23 changes: 23 additions & 0 deletions keyvi/include/keyvi/util/msgpack_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#ifndef KEYVI_UTIL_MSGPACK_UTIL_H_
#define KEYVI_UTIL_MSGPACK_UTIL_H_
#include <limits>
#include <string>

#include "msgpack.hpp"
#include "rapidjson/document.h"
Expand Down Expand Up @@ -147,6 +148,28 @@ inline void MsgPackDump(Writer* writer, const msgpack::object& o) {
}
}

inline void JsonStringToMsgPack(const std::string& raw_value, msgpack::v1::sbuffer* msgpack_buffer,
bool single_precision_float) {
rapidjson::Document json_document;
json_document.Parse<rapidjson::kParseNanAndInfFlag>(raw_value.c_str());

if (!json_document.HasParseError()) {
TRACE("Got json");
msgpack::packer<msgpack::sbuffer> packer(msgpack_buffer);
JsonToMsgPack(json_document, &packer, single_precision_float);
} else {
TRACE("Got a normal string");
msgpack::pack(msgpack_buffer, raw_value);
}
}

inline std::string JsonStringToMsgPack(const std::string& raw_value, bool single_precision_float = false) {
msgpack::sbuffer msgpack_buffer;

JsonStringToMsgPack(raw_value, &msgpack_buffer, single_precision_float);
return std::string(reinterpret_cast<char*>(msgpack_buffer.data()), msgpack_buffer.size());
}

} /* namespace util */
} /* namespace keyvi */

Expand Down
6 changes: 5 additions & 1 deletion python/src/addons/Match.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,11 @@

def GetRawValueAsString(self, *args):
"""deprecated, use get_raw_value_as_string"""
return call_deprecated_method("GetRawValueAsString", "raw_value_as_string", self.raw_value_as_string, *args)
return call_deprecated_method("GetRawValueAsString", "dumps", self.dumps, *args)

def raw_value_as_string(self, *args):
"""deprecated, use dumps"""
return call_deprecated_method("raw_value_as_string", "dumps", self.dumps, *args)

# Truthiness of a match: True unless the wrapped instance reports IsEmpty().
def __bool__(self):
return not self.inst.get().IsEmpty()
Expand Down
5 changes: 5 additions & 0 deletions python/src/pxds/compression.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
cdef extern from "keyvi/compression/compression_strategy.h" namespace "keyvi::compression":
ctypedef enum CompressionAlgorithm:
NO_COMPRESSION,
ZLIB_COMPRESSION,
SNAPPY_COMPRESSION
4 changes: 3 additions & 1 deletion python/src/pxds/match.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ from libcpp.string cimport string as libcpp_utf8_string
from libcpp.string cimport string as libcpp_utf8_output_string
from libcpp cimport bool
from cpython.ref cimport PyObject
from compression cimport CompressionAlgorithm

cdef extern from "keyvi/dictionary/match.h" namespace "keyvi::dictionary":
cdef cppclass Match:
Expand All @@ -20,7 +21,8 @@ cdef extern from "keyvi/dictionary/match.h" namespace "keyvi::dictionary":
PyObject* GetAttributePy(libcpp_utf8_string) except + nogil # wrap-ignore
libcpp_utf8_output_string GetValueAsString() except + # wrap-as:value_as_string
libcpp_string GetRawValueAsString() except + # wrap-as:raw_value_as_string
libcpp_string GetMsgPackedValueAsString() except + # wrap-ignore
libcpp_string GetMsgPackedValueAsString() except + # wrap-as:msgpacked_value_as_string
libcpp_string GetMsgPackedValueAsString(CompressionAlgorithm) except + # wrap-as:msgpacked_value_as_string
void SetRawValue(libcpp_utf8_string) except + # wrap-ignore
void SetAttribute(libcpp_utf8_string, libcpp_utf8_string) except + # wrap-ignore
void SetAttribute(libcpp_utf8_string, float) except + # wrap-ignore
Expand Down
2 changes: 1 addition & 1 deletion python/src/py/keyvi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
from keyvi._version import __version__

# global keyvi concepts
from keyvi._core import MatchIterator, Match, loading_strategy_types
from keyvi._core import MatchIterator, Match, loading_strategy_types, CompressionAlgorithm
Loading
Loading