Refactor aggregator set data, add statistics
willdealtry committed Jan 20, 2025
1 parent 7a641d7 commit f92a5ac
Showing 27 changed files with 1,139 additions and 234 deletions.

cpp/arcticdb/CMakeLists.txt (7 changes: 5 additions & 2 deletions)

@@ -417,6 +417,7 @@ set(arcticdb_srcs
      column_store/key_segment.cpp
      column_store/memory_segment_impl.cpp
      column_store/memory_segment_impl.cpp
+     column_store/statistics.hpp
      column_store/string_pool.cpp
      entity/data_error.cpp
      entity/field_collection.cpp
@@ -913,6 +914,7 @@ if(${TEST})
      column_store/test/test_column_data_random_accessor.cpp
      column_store/test/test_index_filtering.cpp
      column_store/test/test_memory_segment.cpp
+     column_store/test/test_statistics.cpp
      entity/test/test_atom_key.cpp
      entity/test/test_key_serialization.cpp
      entity/test/test_metrics.cpp
@@ -941,10 +943,12 @@ if(${TEST})
      storage/test/test_storage_factory.cpp
      storage/test/test_storage_exceptions.cpp
      storage/test/test_azure_storage.cpp
+     storage/test/common.hpp
      storage/test/test_storage_operations.cpp
      stream/test/stream_test_common.cpp
      stream/test/test_aggregator.cpp
      stream/test/test_append_map.cpp
+     stream/test/test_protobuf_mappings.cpp
      stream/test/test_row_builder.cpp
      stream/test/test_segment_aggregator.cpp
      stream/test/test_types.cpp
@@ -979,8 +983,7 @@
      version/test/test_version_map_batch.cpp
      version/test/test_version_store.cpp
      version/test/version_map_model.hpp
-     python/python_handlers.cpp
-     storage/test/common.hpp)
+     python/python_handlers.cpp)

  set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755


cpp/arcticdb/codec/codec.cpp (3 changes: 3 additions & 0 deletions)

@@ -467,6 +467,8 @@ void decode_v2(const Segment& segment,
      auto& col = res.column(static_cast<position_t>(*col_index));

      data += decode_field(res.field(*col_index).type(), *encoded_field, data, col, col.opt_sparse_map(), hdr.encoding_version());
+     col.set_statistics(encoded_field->get_statistics());
+
      seg_row_count = std::max(seg_row_count, calculate_last_row(col));
  } else {
      data += encoding_sizes::field_compressed_size(*encoded_field) + sizeof(ColumnMagic);
@@ -533,6 +535,7 @@ void decode_v1(const Segment& segment,
          hdr.encoding_version()
      );
      seg_row_count = std::max(seg_row_count, calculate_last_row(col));
+     col.set_statistics(field.get_statistics());
      ARCTICDB_TRACE(log::codec(), "Decoded column {} to position {}", i, data - begin);
  } else {
      data += encoding_sizes::field_compressed_size(field);
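
Both decode paths now copy the statistics carried on each encoded field back onto the in-memory column, so statistics computed at write time survive a storage round trip. Below is a self-contained sketch of that restore step, using hypothetical stand-ins for the real FieldStats, EncodedField and Column types (the actual ones live in statistics.hpp, encoded_field.hpp and column.hpp):

    #include <cstdint>
    #include <iostream>

    // Hypothetical stand-ins for illustration; not the ArcticDB definitions.
    struct FieldStats {
        uint64_t min_ = 0;
        uint64_t max_ = 0;
        uint32_t unique_count_ = 0;
        bool set_ = false;
    };

    struct EncodedField {
        FieldStats stats_;  // populated when the segment header is parsed
        FieldStats get_statistics() const { return stats_; }
    };

    struct Column {
        FieldStats stats_;
        void set_statistics(FieldStats stats) { stats_ = stats; }
        FieldStats get_statistics() const { return stats_; }
    };

    int main() {
        EncodedField field;
        field.stats_ = FieldStats{0, 9, 10, true};   // as read back from storage
        Column col;
        col.set_statistics(field.get_statistics());  // mirrors the new decode_v1/decode_v2 line
        std::cout << col.get_statistics().max_ << '\n';  // prints 9
    }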

cpp/arcticdb/codec/encode_v1.cpp (4 changes: 3 additions & 1 deletion)

@@ -137,7 +137,8 @@ namespace arcticdb {
      encoded_fields.reserve(encoded_buffer_size, in_mem_seg.num_columns());
      ARCTICDB_TRACE(log::codec(), "Encoding fields");
      for (std::size_t column_index = 0; column_index < in_mem_seg.num_columns(); ++column_index) {
-         auto column_data = in_mem_seg.column_data(column_index);
+         const auto& column = in_mem_seg.column(column_index);
+         auto column_data = column.data();
          auto* column_field = encoded_fields.add_field(column_data.num_blocks());
          if(column_data.num_blocks() > 0) {
              encoder.encode(codec_opts, column_data, *column_field, *out_buffer, pos);
@@ -147,6 +148,7 @@
              auto* ndarray = column_field->mutable_ndarray();
              ndarray->set_items_count(0);
          }
+         column_field->set_statistics(column.get_statistics());
      }
      encode_string_pool<EncodingPolicyV1>(in_mem_seg, segment_header, codec_opts, *out_buffer, pos);
  }

cpp/arcticdb/codec/encode_v2.cpp (6 changes: 5 additions & 1 deletion)

@@ -356,8 +356,12 @@ static void encode_encoded_fields(
      ARCTICDB_TRACE(log::codec(), "Encoding fields");
      for (std::size_t column_index = 0; column_index < in_mem_seg.num_columns(); ++column_index) {
          write_magic<ColumnMagic>(*out_buffer, pos);
-         auto column_data = in_mem_seg.column_data(column_index);
+         const auto& column = in_mem_seg.column(column_index);
+         auto column_data = column.data();
          auto* column_field = encoded_fields.add_field(column_data.num_blocks());
+         if(column.has_statistics())
+             column_field->set_statistics(column.get_statistics());
+
          ARCTICDB_TRACE(log::codec(),"Beginning encoding of column {}: ({}) to position {}", column_index, in_mem_seg.descriptor().field(column_index).name(), pos);

          if(column_data.num_blocks() > 0) {

cpp/arcticdb/codec/encoded_field.hpp (11 changes: 10 additions & 1 deletion)

@@ -163,7 +163,8 @@ struct EncodedFieldImpl : public EncodedField {
      sizeof(values_count_) +
      sizeof(sparse_map_bytes_) +
      sizeof(items_count_) +
-     sizeof(format_);
+     sizeof(format_) +
+     sizeof(stats_);

  EncodedFieldImpl() = default;

@@ -366,6 +367,14 @@
      sparse_map_bytes_ = bytes;
  }

+ void set_statistics(FieldStats stats) {
+     stats_ = stats;
+ }
+
+ FieldStats get_statistics() const {
+     return stats_;
+ }
+
  EncodedBlock *add_values(EncodingVersion encoding_version) {
      const bool old_style = encoding_version == EncodingVersion::V1;
      size_t pos;
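
sizeof(stats_) now contributes to EncodedFieldImpl's fixed header size, which only makes sense if FieldStats is a fixed-size, trivially copyable value written byte-for-byte alongside the other header members. A minimal sketch of that invariant, with an assumed layout (not the actual one):

    #include <cstddef>
    #include <cstdint>
    #include <type_traits>

    struct FieldStats {            // assumed layout, for illustration only
        uint64_t min_ = 0;
        uint64_t max_ = 0;
        uint32_t unique_count_ = 0;
        uint8_t  set_ = 0;
    };

    struct EncodedFieldHeader {    // assumed subset of EncodedFieldImpl's members
        uint32_t values_count_ = 0;
        uint32_t sparse_map_bytes_ = 0;
        uint32_t items_count_ = 0;
        uint8_t  format_ = 0;
        FieldStats stats_;

        static constexpr size_t Size =
            sizeof(values_count_) + sizeof(sparse_map_bytes_) +
            sizeof(items_count_) + sizeof(format_) + sizeof(stats_);
    };

    // The header is written into the segment buffer as raw bytes, so the
    // size arithmetic above is only sound for trivially copyable types.
    static_assert(std::is_trivially_copyable_v<FieldStats>);
    static_assert(std::is_trivially_copyable_v<EncodedFieldHeader>);

    int main() {}

Keeping the stats inline in the field header is presumably what lets the decode paths recover them without any extra storage reads.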

cpp/arcticdb/codec/protobuf_mappings.cpp (5 changes: 5 additions & 0 deletions)

@@ -10,6 +10,7 @@
  #include "arcticdb/storage/memory_layout.hpp"
  #include <arcticdb/codec/segment_header.hpp>
  #include <arcticdb/codec/protobuf_mappings.hpp>
+ #include <arcticdb/stream/protobuf_mappings.hpp>
  #include <folly/container/Enumerate.h>

  namespace arcticdb {
@@ -78,6 +79,8 @@ void encoded_field_from_proto(const arcticdb::proto::encoding::EncodedField& input,
          auto* value_block = output_ndarray->add_values(EncodingVersion::V1);
          block_from_proto(input_ndarray.values(i), *value_block, false);
      }
+
+     output.set_statistics(create_from_proto(input.stats()));
  }

  void copy_encoded_field_to_proto(const EncodedFieldImpl& input, arcticdb::proto::encoding::EncodedField& output) {
@@ -97,6 +100,8 @@ void copy_encoded_field_to_proto(const EncodedFieldImpl& input, arcticdb::proto::encoding::EncodedField& output) {
          auto* value_block = output_ndarray->add_values();
          proto_from_block(input_ndarray.values(i), *value_block);
      }
+
+     field_stats_to_proto(input.get_statistics(), *output.mutable_stats());
  }

  size_t num_blocks(const arcticdb::proto::encoding::EncodedField& field) {

cpp/arcticdb/codec/test/test_codec.cpp (80 changes: 80 additions & 0 deletions)

@@ -512,6 +512,86 @@ TEST(Segment, RoundtripTimeseriesDescriptorWriteToBufferV2) {
      ASSERT_EQ(decoded, copy);
  }

+ TEST(Segment, RoundtripStatisticsV1) {
+     ScopedConfig reload_interval("Statistics.GenerateOnWrite", 1);
+     const auto stream_desc = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
+         scalar_field(DataType::UINT8, "int8"),
+         scalar_field(DataType::FLOAT64, "doubles")
+     });
+
+     SegmentInMemory in_mem_seg{stream_desc.clone()};
+     constexpr size_t num_rows = 10;
+     for(auto i = 0UL; i < num_rows; ++i) {
+         in_mem_seg.set_scalar<uint8_t>(0, static_cast<uint8_t>(i));
+         in_mem_seg.set_scalar<double>(1, static_cast<double>(i * 2));
+         in_mem_seg.end_row();
+     }
+     in_mem_seg.calculate_statistics();
+     auto copy = in_mem_seg.clone();
+     auto seg = encode_v1(std::move(in_mem_seg), codec::default_lz4_codec());
+     std::vector<uint8_t> vec;
+     const auto bytes = seg.calculate_size();
+     vec.resize(bytes);
+     seg.write_to(vec.data());
+     auto unserialized = Segment::from_bytes(vec.data(), bytes);
+     SegmentInMemory decoded{stream_desc.clone()};
+     decode_v1(unserialized, unserialized.header(), decoded, unserialized.descriptor());
+     auto col1_stats = decoded.column(0).get_statistics();
+     ASSERT_TRUE(col1_stats.has_max());
+     ASSERT_EQ(col1_stats.get_max<uint8_t>(), 9);
+     ASSERT_TRUE(col1_stats.has_min());
+     ASSERT_EQ(col1_stats.get_min<uint8_t>(), 0);
+     ASSERT_TRUE(col1_stats.has_unique());
+     ASSERT_EQ(col1_stats.get_unique_count(), 10);
+     auto col2_stats = decoded.column(1).get_statistics();
+     ASSERT_TRUE(col2_stats.has_max());
+     ASSERT_EQ(col2_stats.get_max<double>(), 18.0);
+     ASSERT_TRUE(col2_stats.has_min());
+     ASSERT_EQ(col2_stats.get_min<double>(), 0.0);
+     ASSERT_TRUE(col2_stats.has_unique());
+     ASSERT_EQ(col2_stats.get_unique_count(), 10);
+ }
+
+ TEST(Segment, RoundtripStatisticsV2) {
+     ScopedConfig reload_interval("Statistics.GenerateOnWrite", 1);
+     const auto stream_desc = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
+         scalar_field(DataType::UINT8, "int8"),
+         scalar_field(DataType::FLOAT64, "doubles")
+     });
+
+     SegmentInMemory in_mem_seg{stream_desc.clone()};
+     constexpr size_t num_rows = 10;
+     for(auto i = 0UL; i < num_rows; ++i) {
+         in_mem_seg.set_scalar<uint8_t>(0, static_cast<uint8_t>(i));
+         in_mem_seg.set_scalar<double>(1, static_cast<double>(i * 2));
+         in_mem_seg.end_row();
+     }
+     in_mem_seg.calculate_statistics();
+     auto copy = in_mem_seg.clone();
+     auto seg = encode_v2(std::move(in_mem_seg), codec::default_lz4_codec());
+     std::vector<uint8_t> vec;
+     const auto bytes = seg.calculate_size();
+     vec.resize(bytes);
+     seg.write_to(vec.data());
+     auto unserialized = Segment::from_bytes(vec.data(), bytes);
+     SegmentInMemory decoded{stream_desc.clone()};
+     decode_v2(unserialized, unserialized.header(), decoded, unserialized.descriptor());
+     auto col1_stats = decoded.column(0).get_statistics();
+     ASSERT_TRUE(col1_stats.has_max());
+     ASSERT_EQ(col1_stats.get_max<uint8_t>(), 9);
+     ASSERT_TRUE(col1_stats.has_min());
+     ASSERT_EQ(col1_stats.get_min<uint8_t>(), 0);
+     ASSERT_TRUE(col1_stats.has_unique());
+     ASSERT_EQ(col1_stats.get_unique_count(), 10);
+     auto col2_stats = decoded.column(1).get_statistics();
+     ASSERT_TRUE(col2_stats.has_max());
+     ASSERT_EQ(col2_stats.get_max<double>(), 18.0);
+     ASSERT_TRUE(col2_stats.has_min());
+     ASSERT_EQ(col2_stats.get_min<double>(), 0.0);
+     ASSERT_TRUE(col2_stats.has_unique());
+     ASSERT_EQ(col2_stats.get_unique_count(), 10);
+ }
+
  TEST(Segment, ColumnNamesProduceDifferentHashes) {
      const auto stream_desc_1 = stream_descriptor(StreamId{"thing"}, RowCountIndex{}, {
          scalar_field(DataType::UINT8, "ints1"),

cpp/arcticdb/column_store/column.hpp (14 changes: 14 additions & 0 deletions)

@@ -9,6 +9,7 @@

  #include <arcticdb/column_store/chunked_buffer.hpp>
  #include <arcticdb/column_store/column_data.hpp>
+ #include <arcticdb/column_store/statistics.hpp>
  #include <arcticdb/column_store/column_data_random_accessor.hpp>
  #include <arcticdb/entity/native_tensor.hpp>
  #include <arcticdb/entity/performance_tracing.hpp>
@@ -251,6 +252,18 @@ class Column {

  bool sparse_permitted() const;

+ void set_statistics(FieldStatsImpl stats) {
+     stats_ = stats;
+ }
+
+ bool has_statistics() const {
+     return stats_.set_;
+ }
+
+ FieldStatsImpl get_statistics() const {
+     return stats_;
+ }
+
  void backfill_sparse_map(ssize_t to_row) {
      ARCTICDB_TRACE(log::version(), "Backfilling sparse map to position {}", to_row);
      // Initialise the optional to an empty bitset if it has not been created yet
@@ -936,6 +949,7 @@ class Column {
      Sparsity allow_sparse_ = Sparsity::NOT_PERMITTED;

      std::optional<util::BitMagic> sparse_map_;
+     FieldStatsImpl stats_;
      util::MagicNum<'D', 'C', 'o', 'l'> magic_;
  };

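
Column now owns a FieldStatsImpl by value, with has_statistics() testing a set_ flag rather than wrapping the stats in std::optional. The tests above read the extrema back as concrete types via get_min<T>()/get_max<T>(); one plausible way such typed accessors can sit on top of fixed-width storage (a sketch under assumptions, not the actual implementation) is to hold the extrema as raw 8-byte payloads and reinterpret them on read:

    #include <cassert>
    #include <cstdint>
    #include <cstring>

    // Sketch only: assumes numeric types of at most 8 bytes.
    class FieldStatsImpl {
        uint64_t min_ = 0;
        uint64_t max_ = 0;
    public:
        bool set_ = false;

        template<typename T>
        void set_minmax(T min, T max) {
            std::memcpy(&min_, &min, sizeof(T));
            std::memcpy(&max_, &max, sizeof(T));
            set_ = true;
        }
        template<typename T>
        T get_min() const { T v; std::memcpy(&v, &min_, sizeof(T)); return v; }
        template<typename T>
        T get_max() const { T v; std::memcpy(&v, &max_, sizeof(T)); return v; }
    };

    int main() {
        FieldStatsImpl stats;
        stats.set_minmax<double>(0.0, 18.0);   // the doubles column from the tests
        assert(stats.get_max<double>() == 18.0);
        assert(stats.set_);
    }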

cpp/arcticdb/column_store/memory_segment.hpp (4 changes: 4 additions & 0 deletions)

@@ -235,6 +235,10 @@ class SegmentInMemory {
      impl_->reset_timeseries_descriptor();
  }

+ void calculate_statistics() {
+     impl_->calculate_statistics();
+ }
+
  [[nodiscard]] size_t num_columns() const { return impl_->num_columns(); }

  [[nodiscard]] size_t row_count() const { return impl_->row_count(); }

cpp/arcticdb/column_store/memory_segment_impl.cpp (14 changes: 14 additions & 0 deletions)

@@ -700,6 +700,20 @@ void SegmentInMemoryImpl::reset_timeseries_descriptor() {
      tsd_.reset();
  }

+ void SegmentInMemoryImpl::calculate_statistics() {
+     for(auto& column : columns_) {
+         if(column->type().dimension() == Dimension::Dim0) {
+             const auto type = column->type();
+             if(is_numeric_type(type.data_type()) || is_sequence_type(type.data_type())) {
+                 type.visit_tag([&column] (auto tdt) {
+                     using TagType = std::decay_t<decltype(tdt)>;
+                     column->set_statistics(generate_column_statistics<TagType>(column->data()));
+                 });
+             }
+         }
+     }
+ }
+
  void SegmentInMemoryImpl::reset_metadata() {
      metadata_.reset();
  }
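
generate_column_statistics itself lives in the new column_store/statistics.hpp, whose diff is not shown on this page. Judging by what the round-trip tests assert, it produces a per-column min, max and unique count. A self-contained sketch of that computation over a plain vector (hypothetical names; the real version consumes ColumnData blocks via the type-tag dispatch above):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <unordered_set>
    #include <vector>

    template<typename T>
    struct Stats {
        T min_{};
        T max_{};
        size_t unique_count_ = 0;
    };

    // Sketch: min/max in one pass, unique count via a hash set.
    template<typename T>
    Stats<T> generate_statistics(const std::vector<T>& values) {
        if (values.empty())
            return {};
        auto [mn, mx] = std::minmax_element(values.begin(), values.end());
        std::unordered_set<T> uniques(values.begin(), values.end());
        return {*mn, *mx, uniques.size()};
    }

    int main() {
        std::vector<double> doubles{0, 2, 4, 6, 8, 10, 12, 14, 16, 18};
        const auto stats = generate_statistics(doubles);
        std::cout << stats.min_ << ' ' << stats.max_ << ' ' << stats.unique_count_ << '\n';
        // prints "0 18 10", matching the RoundtripStatistics tests above
    }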

cpp/arcticdb/column_store/memory_segment_impl.hpp (2 changes: 2 additions & 0 deletions)

@@ -763,6 +763,8 @@ class SegmentInMemoryImpl {

  void reset_timeseries_descriptor();

+ void calculate_statistics();
+
  bool has_user_metadata() {
      return tsd_.has_value() && !tsd_->proto_is_null() && tsd_->proto().has_user_meta();
  }

(Diffs shown for 11 of the 27 changed files; the remaining files were not loaded.)
