diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 12cbcf20affa4..683a5ab735aed 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1468,42 +1468,43 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   // which case we call back to the dense write path)
   std::shared_ptr<::arrow::Array> preserved_dictionary_;
 
-  int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
+  int64_t WriteLevels(int64_t num_levels, const int16_t* def_levels,
                       const int16_t* rep_levels) {
+    // Update histograms first, before the levels are written, for cache efficiency.
+    UpdateLevelHistogram(num_levels, def_levels, rep_levels);
+
     int64_t values_to_write = 0;
     // If the field is required and non-repeated, there are no definition levels
     if (descr_->max_definition_level() > 0) {
-      for (int64_t i = 0; i < num_values; ++i) {
+      for (int64_t i = 0; i < num_levels; ++i) {
         if (def_levels[i] == descr_->max_definition_level()) {
           ++values_to_write;
         }
       }
 
-      WriteDefinitionLevels(num_values, def_levels);
+      WriteDefinitionLevels(num_levels, def_levels);
     } else {
       // Required field, write all values
-      values_to_write = num_values;
+      values_to_write = num_levels;
     }
 
     // Not present for non-repeated fields
     if (descr_->max_repetition_level() > 0) {
       // A row could include more than one value
       // Count the occasions where we start a new row
-      for (int64_t i = 0; i < num_values; ++i) {
+      for (int64_t i = 0; i < num_levels; ++i) {
         if (rep_levels[i] == 0) {
           rows_written_++;
           num_buffered_rows_++;
         }
       }
 
-      WriteRepetitionLevels(num_values, rep_levels);
+      WriteRepetitionLevels(num_levels, rep_levels);
     } else {
       // Each value is exactly one row
-      rows_written_ += num_values;
-      num_buffered_rows_ += num_values;
+      rows_written_ += num_levels;
+      num_buffered_rows_ += num_levels;
     }
-
-    UpdateLevelHistogram(num_values, def_levels, rep_levels);
     return values_to_write;
   }
 
@@ -1575,6 +1576,9 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
 
   void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
                          const int16_t* rep_levels) {
+    // Update histograms first, before the levels are written, for cache efficiency.
+    UpdateLevelHistogram(num_levels, def_levels, rep_levels);
+
     // If the field is required and non-repeated, there are no definition levels
     if (descr_->max_definition_level() > 0) {
       WriteDefinitionLevels(num_levels, def_levels);
@@ -1595,8 +1599,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
       rows_written_ += num_levels;
       num_buffered_rows_ += num_levels;
     }
-
-    UpdateLevelHistogram(num_levels, def_levels, rep_levels);
   }
 
   void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
@@ -1606,26 +1608,17 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     }
 
     auto add_levels = [](std::vector& level_histogram,
-                         ::arrow::util::span levels) {
-      for (int16_t level : levels) {
-        ARROW_DCHECK_LT(level, static_cast(level_histogram.size()));
-        ++level_histogram[level];
-      }
+                         ::arrow::util::span levels, int16_t max_level) {
+      ARROW_DCHECK_EQ(static_cast(max_level) + 1, level_histogram.size());
+      ::parquet::UpdateLevelHistogram(levels, level_histogram);
     };
 
-    if (descr_->max_definition_level() > 0) {
-      add_levels(page_size_statistics_->definition_level_histogram,
-                 {def_levels, static_cast(num_levels)});
-    } else {
-      page_size_statistics_->definition_level_histogram[0] += num_levels;
-    }
-
-    if (descr_->max_repetition_level() > 0) {
-      add_levels(page_size_statistics_->repetition_level_histogram,
-                 {rep_levels, static_cast(num_levels)});
-    } else {
-      page_size_statistics_->repetition_level_histogram[0] += num_levels;
-    }
+    add_levels(page_size_statistics_->definition_level_histogram,
+               {def_levels, static_cast(num_levels)},
+               descr_->max_definition_level());
+    add_levels(page_size_statistics_->repetition_level_histogram,
+               {rep_levels, static_cast(num_levels)},
+               descr_->max_repetition_level());
   }
 
   // Update the unencoded data bytes for ByteArray only per the specification.
diff --git a/cpp/src/parquet/size_statistics.cc b/cpp/src/parquet/size_statistics.cc
index a02cef7aba46f..7292f9222a684 100644
--- a/cpp/src/parquet/size_statistics.cc
+++ b/cpp/src/parquet/size_statistics.cc
@@ -18,6 +18,9 @@
 #include "parquet/size_statistics.h"
 
 #include
+#include
+#include
+#include
 
 #include "arrow/util/logging.h"
 #include "parquet/exception.h"
@@ -25,6 +28,17 @@
 
 namespace parquet {
 
+namespace {
+
+void MergeLevelHistogram(::arrow::util::span histogram,
+                         ::arrow::util::span other) {
+  ARROW_DCHECK_EQ(histogram.size(), other.size());
+  std::transform(histogram.begin(), histogram.end(), other.begin(), histogram.begin(),
+                 std::plus<>());
+}
+
+}  // namespace
+
 void SizeStatistics::Merge(const SizeStatistics& other) {
   if (repetition_level_histogram.size() != other.repetition_level_histogram.size()) {
     throw ParquetException("Repetition level histogram size mismatch");
@@ -36,12 +50,8 @@ void SizeStatistics::Merge(const SizeStatistics& other) {
       other.unencoded_byte_array_data_bytes.has_value()) {
     throw ParquetException("Unencoded byte array data bytes are not consistent");
   }
-  std::transform(repetition_level_histogram.begin(), repetition_level_histogram.end(),
-                 other.repetition_level_histogram.begin(),
-                 repetition_level_histogram.begin(), std::plus<>());
-  std::transform(definition_level_histogram.begin(), definition_level_histogram.end(),
-                 other.definition_level_histogram.begin(),
-                 definition_level_histogram.begin(), std::plus<>());
+  MergeLevelHistogram(repetition_level_histogram, other.repetition_level_histogram);
+  MergeLevelHistogram(definition_level_histogram, other.definition_level_histogram);
   if (unencoded_byte_array_data_bytes.has_value()) {
     unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() +
                                       other.unencoded_byte_array_data_bytes.value();
@@ -91,4 +101,88 @@ std::unique_ptr SizeStatistics::Make(const ColumnDescriptor* des
   return size_stats;
 }
 
+std::ostream& operator<<(std::ostream& os, const SizeStatistics& size_stats) {
+  constexpr std::string_view kComma = ", ";
+  os << "SizeStatistics{";
+  std::string_view sep = "";
+  if (size_stats.unencoded_byte_array_data_bytes.has_value()) {
+    os << "unencoded_byte_array_data_bytes="
+       << *size_stats.unencoded_byte_array_data_bytes;
+    sep = kComma;
+  }
+  auto print_histogram = [&](std::string_view name,
+                             const std::vector& histogram) {
+    if (!histogram.empty()) {
+      os << sep << name << "={";
+      sep = kComma;
+      std::string_view value_sep = "";
+      for (int64_t v : histogram) {
+        os << value_sep << v;
+        value_sep = kComma;
+      }
+      os << "}";
+    }
+  };
+  print_histogram("repetition_level_histogram", size_stats.repetition_level_histogram);
+  print_histogram("definition_level_histogram", size_stats.definition_level_histogram);
+  os << "}";
+  return os;
+}
+
+void UpdateLevelHistogram(::arrow::util::span levels,
+                          ::arrow::util::span histogram) {
+  const int64_t num_levels = static_cast(levels.size());
+  ARROW_DCHECK_GE(histogram.size(), 1);
+  const int16_t max_level = static_cast(histogram.size() - 1);
+  if (max_level == 0) {
+    histogram[0] += num_levels;
+    return;
+  }
+
+#ifndef NDEBUG
+  for (auto level : levels) {
+    ARROW_DCHECK_LE(level, max_level);
+  }
+#endif
+
+  if (max_level == 1) {
+    // Specialize the common case for non-repeated non-nested columns.
+    // Summing the levels gives us the number of 1s, and the number of 0s follows.
+    // We do repeated sums in the int16_t space, which the compiler is likely
+    // to vectorize efficiently.
+    constexpr int64_t kChunkSize = 1 << 14;  // to avoid int16_t overflows
+    int64_t hist1 = 0;
+    auto it = levels.begin();
+    while (it != levels.end()) {
+      const auto chunk_size = std::min(levels.end() - it, kChunkSize);
+      hist1 += std::accumulate(it, it + chunk_size, int16_t{0});
+      it += chunk_size;
+    }
+    histogram[0] += num_levels - hist1;
+    histogram[1] += hist1;
+    return;
+  }
+
+  // The generic implementation issues a series of histogram load-stores.
+  // However, it limits store-to-load dependencies by interleaving partial histogram
+  // updates.
+  constexpr int kUnroll = 4;
+  std::array, kUnroll> partial_hist;
+  for (auto& hist : partial_hist) {
+    hist.assign(histogram.size(), 0);
+  }
+  int64_t i = 0;
+  for (; i <= num_levels - kUnroll; i += kUnroll) {
+    for (int j = 0; j < kUnroll; ++j) {
+      ++partial_hist[j][levels[i + j]];
+    }
+  }
+  for (; i < num_levels; ++i) {
+    ++partial_hist[0][levels[i]];
+  }
+  for (const auto& hist : partial_hist) {
+    MergeLevelHistogram(histogram, hist);
+  }
+}
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/size_statistics.h b/cpp/src/parquet/size_statistics.h
index c25e70ee36d8a..ec79b8c4f8b8c 100644
--- a/cpp/src/parquet/size_statistics.h
+++ b/cpp/src/parquet/size_statistics.h
@@ -17,9 +17,12 @@
 
 #pragma once
 
+#include
+#include
 #include
 #include
 
+#include "arrow/util/span.h"
 #include "parquet/platform.h"
 #include "parquet/type_fwd.h"
 
@@ -89,4 +92,16 @@ struct PARQUET_EXPORT SizeStatistics {
   static std::unique_ptr Make(const ColumnDescriptor* descr);
 };
 
+PARQUET_EXPORT
+std::ostream& operator<<(std::ostream&, const SizeStatistics&);
+
+/// \brief Add the number of occurrences of each level value to `histogram`.
+///
+/// `histogram[i]` is incremented once per occurrence of level `i`. `histogram`
+/// must be non-empty and no level may exceed `histogram.size() - 1`; both
+/// conditions are only checked in debug builds.
+PARQUET_EXPORT
+void UpdateLevelHistogram(::arrow::util::span levels,
+                          ::arrow::util::span histogram);
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc
index cefd31dce285d..0958ae4dec2ca 100644
--- a/cpp/src/parquet/size_statistics_test.cc
+++ b/cpp/src/parquet/size_statistics_test.cc
@@ -19,16 +19,14 @@
 #include "gtest/gtest.h"
 
 #include
+#include
 #include
 
 #include "arrow/buffer.h"
 #include "arrow/table.h"
-#include "arrow/testing/builder.h"
 #include "arrow/testing/gtest_util.h"
-#include "arrow/util/bit_util.h"
 #include "arrow/util/span.h"
 #include "parquet/arrow/reader.h"
-#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/writer.h"
 #include "parquet/column_writer.h"
@@ -42,6 +40,38 @@
 
 namespace parquet {
 
+TEST(SizeStatistics, UpdateLevelHistogram) {
+  {
+    // max_level = 1 (exercises the level-summing fast path)
+    std::vector histogram(2, 0);
+    UpdateLevelHistogram(std::vector{0, 1, 1, 1, 0}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(2, 3));
+    UpdateLevelHistogram(std::vector{1, 1, 0}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(3, 5));
+    UpdateLevelHistogram(std::vector{}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(3, 5));
+  }
+  {
+    // max_level > 1 (exercises the generic unrolled path)
+    std::vector histogram(3, 0);
+    UpdateLevelHistogram(std::vector{0, 1, 2, 2, 0}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(2, 1, 2));
+    UpdateLevelHistogram(std::vector{1, 1, 0}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(3, 3, 2));
+    UpdateLevelHistogram(std::vector{}, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(3, 3, 2));
+  }
+  {
+    // max_level = 1 with more levels than the chunk size used when summing;
+    // all the 1s are in the tail, so every chunk must be read at its own offset
+    std::vector<int16_t> levels(20000, 0);
+    levels.resize(40000, 1);
+    std::vector<int64_t> histogram(2, 0);
+    UpdateLevelHistogram(levels, histogram);
+    EXPECT_THAT(histogram, ::testing::ElementsAre(20000, 20000));
+  }
+}
+
 TEST(SizeStatistics, ThriftSerDe) {
   const std::vector kDefLevels = {128, 64, 32, 16};
   const std::vector kRepLevels = {100, 80, 60, 40, 20};
@@ -88,13 +118,38 @@ struct PageSizeStatistics {
   }
 };
 
+std::ostream& operator<<(std::ostream& os, const PageSizeStatistics& page_stats) {
+  constexpr std::string_view kComma = ", ";
+  os << "PageSizeStatistics{";
+  std::string_view sep = "";
+  auto print_vector = [&](std::string_view name, const std::vector& values) {
+    if (!values.empty()) {
+      os << sep << name << "={";
+      sep = kComma;
+      std::string_view value_sep = "";
+      for (int64_t v : values) {
+        os << value_sep << v;
+        value_sep = kComma;
+      }
+      os << "}";
+    }
+  };
+  print_vector("def_levels", page_stats.def_levels);
+  print_vector("rep_levels", page_stats.rep_levels);
+  print_vector("byte_array_bytes", page_stats.byte_array_bytes);
+  os << "}";
+  return os;
+}
+
 class SizeStatisticsRoundTripTest : public ::testing::Test {
  public:
-  void WriteFile(SizeStatisticsLevel level,
-                 const std::shared_ptr<::arrow::Table>& table) {
+  void WriteFile(SizeStatisticsLevel level, const std::shared_ptr<::arrow::Table>& table,
+                 int max_row_group_length, int page_size,
+                 int write_batch_size = DEFAULT_WRITE_BATCH_SIZE) {
     auto writer_properties = WriterProperties::Builder()
-                                 .max_row_group_length(2) /* every row group has 2 rows */
-                                 ->data_pagesize(1)       /* every page has 1 row */
+                                 .max_row_group_length(max_row_group_length)
+                                 ->data_pagesize(page_size)
+                                 ->write_batch_size(write_batch_size)
                                  ->enable_write_page_index()
                                  ->enable_statistics()
                                  ->set_size_statistics_level(level)
@@ -127,6 +182,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
         ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));
 
     // Read row group size statistics in order.
+    row_group_stats_.clear();
     auto metadata = reader->metadata();
     for (int i = 0; i < metadata->num_row_groups(); ++i) {
       auto row_group_metadata = metadata->RowGroup(i);
@@ -138,6 +194,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
     }
 
     // Read page size statistics in order.
+    page_stats_.clear();
     auto page_index_reader = reader->GetPageIndexReader();
     ASSERT_NE(page_index_reader, nullptr);
 
@@ -168,11 +225,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
     }
   }
 
-  void Reset() {
-    buffer_.reset();
-    row_group_stats_.clear();
-    page_stats_.clear();
-  }
+  void Reset() { buffer_.reset(); }
 
  protected:
   std::shared_ptr buffer_;
@@ -187,7 +240,7 @@ TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
       ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))),
       ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))),
   });
-  // First two rows are in one row group, and the other two rows are in another row group.
+  // First two rows will be in one row group, and the other two rows in another row group.
   auto table = ::arrow::TableFromJSON(schema, {R"([
       [ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ],
       [ [[0,1,null]], [["foo","bar",null]] ],
@@ -198,7 +251,7 @@ TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
   for (auto size_stats_level :
        {SizeStatisticsLevel::None, SizeStatisticsLevel::ColumnChunk,
         SizeStatisticsLevel::PageAndColumnChunk}) {
-    WriteFile(size_stats_level, table);
+    WriteFile(size_stats_level, table, /*max_row_group_length=*/2, /*page_size=*/1);
     ReadSizeStatistics();
 
     if (size_stats_level == SizeStatisticsLevel::None) {
@@ -251,8 +304,8 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
       {::arrow::field("a", ::arrow::dictionary(::arrow::int16(), ::arrow::utf8()))});
   WriteFile(
       SizeStatisticsLevel::PageAndColumnChunk,
-      ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}));
-
+      ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}),
+      /*max_row_group_length=*/2, /*page_size=*/1);
   ReadSizeStatistics();
   EXPECT_THAT(row_group_stats_,
               ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
@@ -276,4 +329,60 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
                                                 /*byte_array_bytes=*/{4}}));
 }
 
+TEST_F(SizeStatisticsRoundTripTest, WritePageInBatches) {
+  // Rep/def level histograms are updated in batches of `write_batch_size` levels
+  // inside a single page. Exercise the logic with more than one batch per page.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::list(::arrow::utf8()))});
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      [ [null,"a","ab"] ],
+      [ null ],
+      [ [] ],
+      [ [null,"d","de"] ],
+      [ ["g","gh",null] ],
+      [ ["j","jk",null] ]
+    ])"});
+  for (int write_batch_size : {100, 5, 4, 3, 2, 1}) {
+    ARROW_SCOPED_TRACE("write_batch_size = ", write_batch_size);
+    WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+              /*max_row_group_length=*/1000, /*page_size=*/1000, write_batch_size);
+    ReadSizeStatistics();
+    EXPECT_THAT(row_group_stats_,
+                ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{1, 1, 4, 8},
+                                                      /*rep_levels=*/{6, 8},
+                                                      /*byte_array_bytes=*/12}));
+    EXPECT_THAT(page_stats_,
+                ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{1, 1, 4, 8},
+                                                          /*rep_levels=*/{6, 8},
+                                                          /*byte_array_bytes=*/{12}}));
+  }
+}
+
+TEST_F(SizeStatisticsRoundTripTest, LargePage) {
+  // When max_level is 1, the levels are summed in chunks of 2**14 levels; exercise
+  // this by testing with a 90000-row table written in batches larger than a chunk.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::utf8())});
+  auto seed_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [ "a" ],
+      [ "bc" ],
+      [ null ]
+    ])");
+  ASSERT_OK_AND_ASSIGN(auto table, ::arrow::Table::FromRecordBatches(
+                                       ::arrow::RecordBatchVector(30000, seed_batch)));
+  ASSERT_OK_AND_ASSIGN(table, table->CombineChunks());
+  ASSERT_EQ(table->num_rows(), 90000);
+
+  WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+            /*max_row_group_length=*/1 << 30, /*page_size=*/1 << 30,
+            /*write_batch_size=*/50000);
+  ReadSizeStatistics();
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{30000, 60000},
+                                                    /*rep_levels=*/{90000},
+                                                    /*byte_array_bytes=*/90000}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{30000, 60000},
+                                                        /*rep_levels=*/{90000},
+                                                        /*byte_array_bytes=*/{90000}}));
+}
+
 }  // namespace parquet