diff --git a/cpp/include/cudf/io/orc_metadata.hpp b/cpp/include/cudf/io/orc_metadata.hpp index 3c6194bb721..a9045d460b3 100644 --- a/cpp/include/cudf/io/orc_metadata.hpp +++ b/cpp/include/cudf/io/orc_metadata.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -171,12 +171,12 @@ using statistics_type = std::variant; //! Orc I/O interfaces -namespace orc { +namespace orc::detail { // forward declare the type that ProtobufReader uses. The `cudf::io::column_statistics` objects, // returned from `read_parsed_orc_statistics`, are constructed from -// `cudf::io::orc::column_statistics` objects that `ProtobufReader` initializes. +// `cudf::io::orc::detail::column_statistics` objects that `ProtobufReader` initializes. struct column_statistics; -} // namespace orc +} // namespace orc::detail /** * @brief Contains per-column ORC statistics. @@ -194,7 +194,7 @@ struct column_statistics { * * @param detail_statistics The statistics to initialize the object with */ - column_statistics(orc::column_statistics&& detail_statistics); + column_statistics(orc::detail::column_statistics&& detail_statistics); }; /** diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 69fd4068712..0d5bb8ac191 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -292,7 +292,7 @@ raw_orc_statistics read_raw_orc_statistics(source_info const& src_info, CUDF_FAIL("Unsupported source type"); } - orc::metadata const metadata(source.get(), stream); + orc::detail::metadata const metadata(source.get(), stream); // Initialize statistics to return raw_orc_statistics result; @@ -318,7 +318,7 @@ raw_orc_statistics read_raw_orc_statistics(source_info const& src_info, return result; } -column_statistics::column_statistics(orc::column_statistics&& cs) +column_statistics::column_statistics(orc::detail::column_statistics&& cs) { number_of_values = cs.number_of_values; has_null = cs.has_null; @@ -350,9 +350,9 @@ parsed_orc_statistics read_parsed_orc_statistics(source_info const& src_info, result.column_names = raw_stats.column_names; auto parse_column_statistics = [](auto const& raw_col_stats) { - orc::column_statistics stats_internal; - orc::ProtobufReader(reinterpret_cast(raw_col_stats.c_str()), - raw_col_stats.size()) + orc::detail::column_statistics stats_internal; + orc::detail::ProtobufReader(reinterpret_cast(raw_col_stats.c_str()), + raw_col_stats.size()) .read(stats_internal); return column_statistics(std::move(stats_internal)); }; @@ -373,7 +373,7 @@ parsed_orc_statistics read_parsed_orc_statistics(source_info const& src_info, return result; } namespace { -orc_column_schema make_orc_column_schema(host_span orc_schema, +orc_column_schema make_orc_column_schema(host_span orc_schema, uint32_t column_id, std::string column_name) { @@ -400,7 +400,7 @@ orc_metadata read_orc_metadata(source_info const& src_info, rmm::cuda_stream_vie auto sources = make_datasources(src_info); CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported."); - auto const footer = orc::metadata(sources.front().get(), stream).ff; + auto const footer = orc::detail::metadata(sources.front().get(), stream).ff; return {{make_orc_column_schema(footer.types, 0, "")}, footer.numberOfRows, diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index aeaa87e2202..be3c90a3e24 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -24,7 +24,7 @@ #include -namespace cudf::io::orc::gpu { +namespace cudf::io::orc::detail { /** * @brief Counts the number of characters in each rowgroup of each string column. @@ -266,4 +266,4 @@ void get_dictionary_indices(device_2dspan dictionaries, <<>>(dictionaries, columns); } -} // namespace cudf::io::orc::gpu +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 7046b3b3f91..7ae32f3e8f8 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ #include "orc.hpp" +#include "io/comp/io_uncomp.hpp" #include "orc_field_reader.hpp" #include "orc_field_writer.hpp" @@ -25,7 +26,7 @@ #include -namespace cudf::io::orc { +namespace cudf::io::orc::detail { namespace { [[nodiscard]] constexpr uint32_t varint_size(uint64_t val) @@ -496,7 +497,7 @@ metadata::metadata(datasource* const src, rmm::cuda_stream_view stream) : source buffer = source->host_read(len - ps_length - 1 - ps.footerLength - ps.metadataLength, ps.metadataLength); auto const md_data = decompressor->decompress_blocks({buffer->data(), buffer->size()}, stream); - orc::ProtobufReader(md_data.data(), md_data.size()).read(md); + ProtobufReader(md_data.data(), md_data.size()).read(md); init_parent_descriptors(); init_column_names(); @@ -546,4 +547,4 @@ void metadata::init_parent_descriptors() } } -} // namespace cudf::io::orc +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index 8dccf65ef10..49652c9a0d2 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -16,8 +16,6 @@ #pragma once -#include "io/comp/io_uncomp.hpp" - #include #include #include @@ -34,9 +32,7 @@ #include #include -namespace cudf { -namespace io { -namespace orc { +namespace cudf::io::orc::detail { static constexpr uint32_t block_header_size = 3; // Seconds from January 1st, 1970 to January 1st, 2015 @@ -710,6 +706,4 @@ struct rowgroup_rows { [[nodiscard]] CUDF_HOST_DEVICE constexpr auto size() const noexcept { return end - begin; } }; -} // namespace orc -} // namespace io -} // namespace cudf +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/orc_field_reader.hpp b/cpp/src/io/orc/orc_field_reader.hpp index 3689e4d958b..797db239538 100644 --- a/cpp/src/io/orc/orc_field_reader.hpp +++ b/cpp/src/io/orc/orc_field_reader.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * Copyright (c) 2020-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ #include "orc.hpp" -#include +#include /** * @file orc_field_reader.hpp @@ -25,9 +25,7 @@ * ProtobufReader::read(...) functions */ -namespace cudf { -namespace io { -namespace orc { +namespace cudf::io::orc::detail { /** * @brief Functor to run an operator for a specified field. @@ -90,6 +88,4 @@ inline void ProtobufReader::function_builder(T& s, size_t maxlen, std::tuple @@ -33,10 +32,7 @@ #include -namespace cudf { -namespace io { -namespace orc { -namespace gpu { +namespace cudf::io::orc::detail { using cudf::detail::device_2dspan; using cudf::detail::host_2dspan; @@ -65,9 +61,7 @@ auto constexpr VALUE_SENTINEL = size_type{-1}; struct CompressedStreamInfo { CompressedStreamInfo() = default; explicit constexpr CompressedStreamInfo(uint8_t const* compressed_data_, size_t compressed_size_) - : compressed_data(compressed_data_), - uncompressed_data(nullptr), - compressed_data_size(compressed_size_) + : compressed_data(compressed_data_), compressed_data_size(compressed_size_) { } uint8_t const* compressed_data{}; // [in] base ptr to compressed stream data @@ -500,7 +494,4 @@ void reduce_pushdown_masks(device_span orc_columns device_2dspan set_counts, rmm::cuda_stream_view stream); -} // namespace gpu -} // namespace orc -} // namespace io -} // namespace cudf +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/reader_impl_chunking.cu b/cpp/src/io/orc/reader_impl_chunking.cu index 726c79bd004..f19fb3c81d8 100644 --- a/cpp/src/io/orc/reader_impl_chunking.cu +++ b/cpp/src/io/orc/reader_impl_chunking.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ * limitations under the License. */ -#include "io/comp/gpuinflate.hpp" #include "io/orc/reader_impl.hpp" #include "io/orc/reader_impl_chunking.hpp" #include "io/orc/reader_impl_helpers.hpp" @@ -40,16 +39,16 @@ namespace cudf::io::orc::detail { std::size_t gather_stream_info_and_column_desc( std::size_t stripe_id, std::size_t level, - orc::StripeInformation const* stripeinfo, - orc::StripeFooter const* stripefooter, + StripeInformation const* stripeinfo, + StripeFooter const* stripefooter, host_span orc2gdf, - host_span types, + host_span types, bool use_index, bool apply_struct_map, int64_t* num_dictionary_entries, std::size_t* local_stream_order, std::vector* stream_info, - cudf::detail::hostdevice_2dvector* chunks) + cudf::detail::hostdevice_2dvector* chunks) { CUDF_EXPECTS((stream_info == nullptr) ^ (chunks == nullptr), "Either stream_info or chunks must be provided, but not both."); @@ -57,17 +56,17 @@ std::size_t gather_stream_info_and_column_desc( std::size_t src_offset = 0; std::size_t dst_offset = 0; - auto const get_stream_index_type = [](orc::StreamKind kind) { + auto const get_stream_index_type = [](StreamKind kind) { switch (kind) { - case orc::DATA: return gpu::CI_DATA; - case orc::LENGTH: - case orc::SECONDARY: return gpu::CI_DATA2; - case orc::DICTIONARY_DATA: return gpu::CI_DICTIONARY; - case orc::PRESENT: return gpu::CI_PRESENT; - case orc::ROW_INDEX: return gpu::CI_INDEX; + case DATA: return CI_DATA; + case LENGTH: + case SECONDARY: return CI_DATA2; + case DICTIONARY_DATA: return CI_DICTIONARY; + case PRESENT: return CI_PRESENT; + case ROW_INDEX: return CI_INDEX; default: // Skip this stream as it's not strictly required - return gpu::CI_NUM_STREAMS; + return CI_NUM_STREAMS; } }; @@ -87,16 +86,15 @@ std::size_t gather_stream_info_and_column_desc( // for each of its fields. There is only a PRESENT stream, which // needs to be included for the reader. auto const schema_type = types[column_id]; - if (!schema_type.subtypes.empty() && schema_type.kind == orc::STRUCT && - stream.kind == orc::PRESENT) { + if (!schema_type.subtypes.empty() && schema_type.kind == STRUCT && stream.kind == PRESENT) { for (auto const& idx : schema_type.subtypes) { auto const child_idx = (idx < orc2gdf.size()) ? orc2gdf[idx] : -1; if (child_idx >= 0) { col = child_idx; if (chunks) { - auto& chunk = (*chunks)[stripe_id][col]; - chunk.strm_id[gpu::CI_PRESENT] = *local_stream_order; - chunk.strm_len[gpu::CI_PRESENT] = stream.length; + auto& chunk = (*chunks)[stripe_id][col]; + chunk.strm_id[CI_PRESENT] = *local_stream_order; + chunk.strm_len[CI_PRESENT] = stream.length; } } } @@ -105,14 +103,14 @@ std::size_t gather_stream_info_and_column_desc( if (chunks) { if (src_offset >= stripeinfo->indexLength || use_index) { auto const index_type = get_stream_index_type(stream.kind); - if (index_type < gpu::CI_NUM_STREAMS) { + if (index_type < CI_NUM_STREAMS) { auto& chunk = (*chunks)[stripe_id][col]; chunk.strm_id[index_type] = *local_stream_order; chunk.strm_len[index_type] = stream.length; // NOTE: skip_count field is temporarily used to track the presence of index streams chunk.skip_count |= 1 << index_type; - if (index_type == gpu::CI_DICTIONARY) { + if (index_type == CI_DICTIONARY) { chunk.dictionary_start = *num_dictionary_entries; chunk.dict_len = stripefooter->columns[column_id].dictionarySize; *num_dictionary_entries += @@ -643,7 +641,7 @@ void reader_impl::load_next_stripe_data(read_mode mode) // memory once. auto hd_compinfo = [&] { std::size_t max_num_streams{0}; - if (_metadata.per_file_metadata[0].ps.compression != orc::NONE) { + if (_metadata.per_file_metadata[0].ps.compression != NONE) { // Find the maximum number of streams in all levels of the loaded stripes. for (std::size_t level = 0; level < num_levels; ++level) { auto const stream_range = @@ -651,7 +649,7 @@ void reader_impl::load_next_stripe_data(read_mode mode) max_num_streams = std::max(max_num_streams, stream_range.size()); } } - return cudf::detail::hostdevice_vector(max_num_streams, _stream); + return cudf::detail::hostdevice_vector(max_num_streams, _stream); }(); for (std::size_t level = 0; level < num_levels; ++level) { @@ -665,26 +663,26 @@ void reader_impl::load_next_stripe_data(read_mode mode) auto const stream_range = merge_selected_ranges(_file_itm_data.lvl_stripe_stream_ranges[level], load_stripe_range); - if (_metadata.per_file_metadata[0].ps.compression != orc::NONE) { + if (_metadata.per_file_metadata[0].ps.compression != NONE) { auto const& decompressor = *_metadata.per_file_metadata[0].decompressor; - auto compinfo = cudf::detail::hostdevice_span{hd_compinfo}.subspan( + auto compinfo = cudf::detail::hostdevice_span{hd_compinfo}.subspan( 0, stream_range.size()); for (auto stream_idx = stream_range.begin; stream_idx < stream_range.end; ++stream_idx) { auto const& info = stream_info[stream_idx]; auto const dst_base = static_cast(stripe_data[info.source.stripe_idx - stripe_start].data()); compinfo[stream_idx - stream_range.begin] = - gpu::CompressedStreamInfo(dst_base + info.dst_pos, info.length); + CompressedStreamInfo(dst_base + info.dst_pos, info.length); } // Estimate the uncompressed data. compinfo.host_to_device_async(_stream); - gpu::ParseCompressedStripeData(compinfo.device_ptr(), - compinfo.size(), - decompressor.GetBlockSize(), - decompressor.GetLog2MaxCompressionRatio(), - _stream); + ParseCompressedStripeData(compinfo.device_ptr(), + compinfo.size(), + decompressor.GetBlockSize(), + decompressor.GetLog2MaxCompressionRatio(), + _stream); compinfo.device_to_host_sync(_stream); for (auto stream_idx = stream_range.begin; stream_idx < stream_range.end; ++stream_idx) { diff --git a/cpp/src/io/orc/reader_impl_chunking.hpp b/cpp/src/io/orc/reader_impl_chunking.hpp index 4ef68ee8d86..cb66edf3c98 100644 --- a/cpp/src/io/orc/reader_impl_chunking.hpp +++ b/cpp/src/io/orc/reader_impl_chunking.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -307,15 +307,15 @@ std::vector find_splits(host_span cumulative_sizes, std::size_t gather_stream_info_and_column_desc( std::size_t stripe_id, std::size_t level, - orc::StripeInformation const* stripeinfo, - orc::StripeFooter const* stripefooter, + StripeInformation const* stripeinfo, + StripeFooter const* stripefooter, host_span orc2gdf, - host_span types, + host_span types, bool use_index, bool apply_struct_map, int64_t* num_dictionary_entries, std::size_t* local_stream_order, std::vector* stream_info, - cudf::detail::hostdevice_2dvector* chunks); + cudf::detail::hostdevice_2dvector* chunks); } // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/reader_impl_decode.cu b/cpp/src/io/orc/reader_impl_decode.cu index b661bb4ff90..586c07cbc16 100644 --- a/cpp/src/io/orc/reader_impl_decode.cu +++ b/cpp/src/io/orc/reader_impl_decode.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,13 +77,13 @@ rmm::device_buffer decompress_stripe_data( range const& loaded_stripe_range, range const& stream_range, std::size_t num_decode_stripes, - cudf::detail::hostdevice_span compinfo, + cudf::detail::hostdevice_span compinfo, stream_source_map const& compinfo_map, OrcDecompressor const& decompressor, host_span stripe_data, host_span stream_info, - cudf::detail::hostdevice_2dvector& chunks, - cudf::detail::hostdevice_2dvector& row_groups, + cudf::detail::hostdevice_2dvector& chunks, + cudf::detail::hostdevice_2dvector& row_groups, size_type row_index_stride, bool use_base_stride, rmm::cuda_stream_view stream) @@ -100,7 +100,7 @@ rmm::device_buffer decompress_stripe_data( auto const& info = stream_info[stream_idx]; auto& stream_comp_info = compinfo[stream_idx - stream_range.begin]; - stream_comp_info = gpu::CompressedStreamInfo( + stream_comp_info = CompressedStreamInfo( static_cast( stripe_data[info.source.stripe_idx - loaded_stripe_range.begin].data()) + info.dst_pos, @@ -120,11 +120,11 @@ rmm::device_buffer decompress_stripe_data( if (!compinfo_ready) { compinfo.host_to_device_async(stream); - gpu::ParseCompressedStripeData(compinfo.device_ptr(), - compinfo.size(), - decompressor.GetBlockSize(), - decompressor.GetLog2MaxCompressionRatio(), - stream); + ParseCompressedStripeData(compinfo.device_ptr(), + compinfo.size(), + decompressor.GetBlockSize(), + decompressor.GetLog2MaxCompressionRatio(), + stream); compinfo.device_to_host_sync(stream); for (std::size_t i = 0; i < compinfo.size(); ++i) { @@ -178,11 +178,11 @@ rmm::device_buffer decompress_stripe_data( } compinfo.host_to_device_async(stream); - gpu::ParseCompressedStripeData(compinfo.device_ptr(), - compinfo.size(), - decompressor.GetBlockSize(), - decompressor.GetLog2MaxCompressionRatio(), - stream); + ParseCompressedStripeData(compinfo.device_ptr(), + compinfo.size(), + decompressor.GetBlockSize(), + decompressor.GetLog2MaxCompressionRatio(), + stream); // Value for checking whether we decompress successfully. // It doesn't need to be atomic as there is no race condition: we only write `true` if needed. @@ -275,7 +275,7 @@ rmm::device_buffer decompress_stripe_data( // Copy without stream sync, thus need to wait for stream sync below to access. any_block_failure.device_to_host_async(stream); - gpu::PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream); + PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream); compinfo.device_to_host_sync(stream); // This also sync stream for `any_block_failure`. // We can check on host after stream synchronize @@ -291,7 +291,7 @@ rmm::device_buffer decompress_stripe_data( for (std::size_t i = 0; i < num_decode_stripes; ++i) { for (std::size_t j = 0; j < num_columns; ++j) { auto& chunk = chunks[i][j]; - for (int k = 0; k < gpu::CI_NUM_STREAMS; ++k) { + for (int k = 0; k < CI_NUM_STREAMS; ++k) { if (chunk.strm_len[k] > 0 && chunk.strm_id[k] < compinfo.size()) { chunk.streams[k] = compinfo[chunk.strm_id[k]].uncompressed_data; chunk.strm_len[k] = compinfo[chunk.strm_id[k]].max_uncompressed_size; @@ -303,14 +303,14 @@ rmm::device_buffer decompress_stripe_data( if (row_groups.size().first) { chunks.host_to_device_async(stream); row_groups.host_to_device_async(stream); - gpu::ParseRowGroupIndex(row_groups.base_device_ptr(), - compinfo.device_ptr(), - chunks.base_device_ptr(), - num_columns, - num_decode_stripes, - row_index_stride, - use_base_stride, - stream); + ParseRowGroupIndex(row_groups.base_device_ptr(), + compinfo.device_ptr(), + chunks.base_device_ptr(), + num_columns, + num_decode_stripes, + row_index_stride, + use_base_stride, + stream); } return decomp_data; @@ -329,7 +329,7 @@ rmm::device_buffer decompress_stripe_data( * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource to use for device memory allocation */ -void update_null_mask(cudf::detail::hostdevice_2dvector& chunks, +void update_null_mask(cudf::detail::hostdevice_2dvector& chunks, host_span out_buffers, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) @@ -419,8 +419,8 @@ void decode_stream_data(int64_t num_dicts, size_type row_index_stride, std::size_t level, table_device_view const& d_tz_table, - cudf::detail::hostdevice_2dvector& chunks, - cudf::detail::device_2dspan row_groups, + cudf::detail::hostdevice_2dvector& chunks, + cudf::detail::device_2dspan row_groups, std::vector& out_buffers, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) @@ -441,10 +441,10 @@ void decode_stream_data(int64_t num_dicts, }); // Allocate global dictionary for deserializing - rmm::device_uvector global_dict(num_dicts, stream); + rmm::device_uvector global_dict(num_dicts, stream); chunks.host_to_device_async(stream); - gpu::DecodeNullsAndStringDictionaries( + DecodeNullsAndStringDictionaries( chunks.base_device_ptr(), global_dict.data(), num_columns, num_stripes, skip_rows, stream); if (level > 0) { @@ -453,18 +453,18 @@ void decode_stream_data(int64_t num_dicts, } cudf::detail::device_scalar error_count(0, stream); - gpu::DecodeOrcColumnData(chunks.base_device_ptr(), - global_dict.data(), - row_groups, - num_columns, - num_stripes, - skip_rows, - d_tz_table, - row_groups.size().first, - row_index_stride, - level, - error_count.data(), - stream); + DecodeOrcColumnData(chunks.base_device_ptr(), + global_dict.data(), + row_groups, + num_columns, + num_stripes, + skip_rows, + d_tz_table, + row_groups.size().first, + row_index_stride, + level, + error_count.data(), + stream); chunks.device_to_host_async(stream); // `value` synchronizes auto const num_errors = error_count.value(stream); @@ -485,7 +485,7 @@ void decode_stream_data(int64_t num_dicts, * @brief Compute the per-stripe prefix sum of null count, for each struct column in the current * layer. */ -void scan_null_counts(cudf::detail::hostdevice_2dvector const& chunks, +void scan_null_counts(cudf::detail::hostdevice_2dvector const& chunks, uint32_t* d_prefix_sums, rmm::cuda_stream_view stream) { @@ -531,9 +531,9 @@ void scan_null_counts(cudf::detail::hostdevice_2dvector const& * @brief Aggregate child metadata from parent column chunks. */ void aggregate_child_meta(std::size_t level, - cudf::io::orc::detail::column_hierarchy const& selected_columns, - cudf::detail::host_2dspan chunks, - cudf::detail::host_2dspan row_groups, + column_hierarchy const& selected_columns, + cudf::detail::host_2dspan chunks, + cudf::detail::host_2dspan row_groups, host_span nested_cols, host_span out_buffers, reader_column_meta& col_meta) @@ -766,7 +766,7 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) // Each 'chunk' of data here corresponds to an orc column, in a stripe, at a nested level. // Unfortunately we cannot create one hostdevice_vector to use for all levels because // currently we do not have a hostdevice_2dspan class. - std::vector> lvl_chunks(num_levels); + std::vector> lvl_chunks(num_levels); // For computing null count. auto null_count_prefix_sums = [&] { @@ -787,7 +787,7 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) // thus only need to allocate memory once. auto hd_compinfo = [&] { std::size_t max_num_streams{0}; - if (_metadata.per_file_metadata[0].ps.compression != orc::NONE) { + if (_metadata.per_file_metadata[0].ps.compression != NONE) { // Find the maximum number of streams in all levels of the decoding stripes. for (std::size_t level = 0; level < num_levels; ++level) { auto const stream_range = @@ -795,7 +795,7 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) max_num_streams = std::max(max_num_streams, stream_range.size()); } } - return cudf::detail::hostdevice_vector{max_num_streams, _stream}; + return cudf::detail::hostdevice_vector{max_num_streams, _stream}; }(); auto& col_meta = *_col_meta; @@ -812,8 +812,7 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) auto& chunks = lvl_chunks[level]; auto const num_lvl_columns = columns_level.size(); - chunks = - cudf::detail::hostdevice_2dvector(stripe_count, num_lvl_columns, _stream); + chunks = cudf::detail::hostdevice_2dvector(stripe_count, num_lvl_columns, _stream); memset(chunks.base_host_ptr(), 0, chunks.size_bytes()); bool const use_index = @@ -897,7 +896,7 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) // num_child_rows for a struct column will be same, for other nested types it will be // calculated. - chunk.num_child_rows = (chunk.type_kind != orc::STRUCT) ? 0 : chunk.num_rows; + chunk.num_child_rows = (chunk.type_kind != STRUCT) ? 0 : chunk.num_rows; chunk.dtype_id = column_types[col_idx].id(); chunk.decimal_scale = _metadata.per_file_metadata[stripe.source_idx] .ff.types[columns_level[col_idx].id] @@ -912,11 +911,11 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) : cudf::size_of(column_types[col_idx]); chunk.num_rowgroups = stripe_num_rowgroups; - if (chunk.type_kind == orc::TIMESTAMP) { + if (chunk.type_kind == TIMESTAMP) { chunk.timestamp_type_id = _options.timestamp_type.id(); } if (not is_stripe_data_empty) { - for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) { + for (int k = 0; k < CI_NUM_STREAMS; k++) { chunk.streams[k] = dst_base + stream_info[chunk.strm_id[k] + stream_range.begin].dst_pos; } @@ -931,10 +930,10 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) // Process dataset chunks into output columns. auto row_groups = - cudf::detail::hostdevice_2dvector(num_rowgroups, num_lvl_columns, _stream); + cudf::detail::hostdevice_2dvector(num_rowgroups, num_lvl_columns, _stream); if (level > 0 and row_groups.size().first) { - cudf::host_span row_groups_span(row_groups.base_host_ptr(), - num_rowgroups * num_lvl_columns); + cudf::host_span row_groups_span(row_groups.base_host_ptr(), + num_rowgroups * num_lvl_columns); auto& rw_grp_meta = col_meta.rwgrp_meta; // Update start row and num rows per row group @@ -950,9 +949,9 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) } // Setup row group descriptors if using indexes. - if (_metadata.per_file_metadata[0].ps.compression != orc::NONE) { + if (_metadata.per_file_metadata[0].ps.compression != NONE) { auto const compinfo = - cudf::detail::hostdevice_span{hd_compinfo}.subspan( + cudf::detail::hostdevice_span{hd_compinfo}.subspan( 0, stream_range.size()); auto decomp_data = decompress_stripe_data(load_stripe_range, stream_range, @@ -979,14 +978,14 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) chunks.host_to_device_async(_stream); row_groups.host_to_device_async(_stream); row_groups.host_to_device_async(_stream); - gpu::ParseRowGroupIndex(row_groups.base_device_ptr(), - nullptr, - chunks.base_device_ptr(), - num_lvl_columns, - stripe_count, - _metadata.get_row_index_stride(), - level == 0, - _stream); + ParseRowGroupIndex(row_groups.base_device_ptr(), + nullptr, + chunks.base_device_ptr(), + num_lvl_columns, + stripe_count, + _metadata.get_row_index_stride(), + level == 0, + _stream); } } @@ -995,7 +994,7 @@ void reader_impl::decompress_and_decode_stripes(read_mode mode) for (std::size_t i = 0; i < column_types.size(); ++i) { bool is_nullable = false; for (std::size_t j = 0; j < stripe_count; ++j) { - if (chunks[j][i].strm_len[gpu::CI_PRESENT] != 0) { + if (chunks[j][i].strm_len[CI_PRESENT] != 0) { is_nullable = true; break; } diff --git a/cpp/src/io/orc/reader_impl_helpers.cpp b/cpp/src/io/orc/reader_impl_helpers.cpp index 7e5db4b7617..1d4aaaf51ef 100644 --- a/cpp/src/io/orc/reader_impl_helpers.cpp +++ b/cpp/src/io/orc/reader_impl_helpers.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ std::unique_ptr create_empty_column(size_type orc_col_id, to_cudf_decimal_type(decimal128_columns, metadata, orc_col_id)); switch (kind) { - case orc::LIST: { + case LIST: { schema_info.children.emplace_back("offsets"); schema_info.children.emplace_back(""); return make_lists_column(0, @@ -50,7 +50,7 @@ std::unique_ptr create_empty_column(size_type orc_col_id, rmm::device_buffer{0, stream}, stream); } - case orc::MAP: { + case MAP: { schema_info.children.emplace_back("offsets"); schema_info.children.emplace_back("struct"); auto const child_column_ids = metadata.get_col_type(orc_col_id).subtypes; @@ -76,7 +76,7 @@ std::unique_ptr create_empty_column(size_type orc_col_id, stream); } - case orc::STRUCT: { + case STRUCT: { std::vector> child_columns; for (auto const col : metadata.get_col_type(orc_col_id).subtypes) { schema_info.children.emplace_back(""); @@ -92,7 +92,7 @@ std::unique_ptr create_empty_column(size_type orc_col_id, 0, std::move(child_columns), 0, rmm::device_buffer{0, stream}, stream); } - case orc::DECIMAL: { + case DECIMAL: { int32_t scale = 0; if (type == type_id::DECIMAL32 or type == type_id::DECIMAL64 or type == type_id::DECIMAL128) { scale = -static_cast(metadata.get_types()[orc_col_id].scale.value_or(0)); @@ -119,8 +119,8 @@ column_buffer assemble_buffer(size_type orc_col_id, col_buffer.name = metadata.column_name(0, orc_col_id); auto kind = metadata.get_col_type(orc_col_id).kind; switch (kind) { - case orc::LIST: - case orc::STRUCT: { + case LIST: + case STRUCT: { auto const& children_indices = selected_columns.children.at(orc_col_id); for (auto const child_id : children_indices) { col_buffer.children.emplace_back(assemble_buffer( @@ -128,7 +128,7 @@ column_buffer assemble_buffer(size_type orc_col_id, } } break; - case orc::MAP: { + case MAP: { std::vector child_col_buffers; // Get child buffers auto const& children_indices = selected_columns.children.at(orc_col_id); diff --git a/cpp/src/io/orc/reader_impl_helpers.hpp b/cpp/src/io/orc/reader_impl_helpers.hpp index 4cded30d89b..f2e746b312f 100644 --- a/cpp/src/io/orc/reader_impl_helpers.hpp +++ b/cpp/src/io/orc/reader_impl_helpers.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,36 +61,36 @@ struct reader_column_meta { /** * @brief Function that translates ORC data kind to cuDF type enum */ -inline constexpr type_id to_cudf_type(orc::TypeKind kind, +inline constexpr type_id to_cudf_type(TypeKind kind, bool use_np_dtypes, type_id timestamp_type_id, type_id decimal_type_id) { switch (kind) { - case orc::BOOLEAN: return type_id::BOOL8; - case orc::BYTE: return type_id::INT8; - case orc::SHORT: return type_id::INT16; - case orc::INT: return type_id::INT32; - case orc::LONG: return type_id::INT64; - case orc::FLOAT: return type_id::FLOAT32; - case orc::DOUBLE: return type_id::FLOAT64; - case orc::STRING: - case orc::BINARY: - case orc::VARCHAR: - case orc::CHAR: + case BOOLEAN: return type_id::BOOL8; + case BYTE: return type_id::INT8; + case SHORT: return type_id::INT16; + case INT: return type_id::INT32; + case LONG: return type_id::INT64; + case FLOAT: return type_id::FLOAT32; + case DOUBLE: return type_id::FLOAT64; + case STRING: + case BINARY: + case VARCHAR: + case CHAR: // Variable-length types can all be mapped to STRING return type_id::STRING; - case orc::TIMESTAMP: + case TIMESTAMP: return (timestamp_type_id != type_id::EMPTY) ? timestamp_type_id : type_id::TIMESTAMP_NANOSECONDS; - case orc::DATE: + case DATE: // There isn't a (DAYS -> np.dtype) mapping return (use_np_dtypes) ? type_id::TIMESTAMP_MILLISECONDS : type_id::TIMESTAMP_DAYS; - case orc::DECIMAL: return decimal_type_id; + case DECIMAL: return decimal_type_id; // Need to update once cuDF plans to support map type - case orc::MAP: - case orc::LIST: return type_id::LIST; - case orc::STRUCT: return type_id::STRUCT; + case MAP: + case LIST: return type_id::LIST; + case STRUCT: return type_id::STRUCT; default: break; } diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu index 5f4c1e0696d..e81c74ae1a6 100644 --- a/cpp/src/io/orc/stats_enc.cu +++ b/cpp/src/io/orc/stats_enc.cu @@ -24,7 +24,7 @@ #include -namespace cudf::io::orc::gpu { +namespace cudf::io::orc::detail { using strings::detail::fixed_point_string_size; @@ -502,4 +502,4 @@ void orc_encode_statistics(uint8_t* blob_bfr, blob_bfr, groups, chunks, statistics_count); } -} // namespace cudf::io::orc::gpu +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index f560b806894..c7947b0e4c9 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -15,6 +15,7 @@ */ #include "io/utilities/block_utils.cuh" +#include "io/utilities/column_buffer.hpp" #include "orc_gpu.hpp" #include @@ -23,10 +24,7 @@ #include -namespace cudf { -namespace io { -namespace orc { -namespace gpu { +namespace cudf::io::orc::detail { using cudf::io::detail::string_index_pair; @@ -2096,7 +2094,4 @@ void __host__ DecodeOrcColumnData(ColumnDesc* chunks, chunks, global_dictionary, tz_table, row_groups, first_row, rowidx_stride, level, error_count); } -} // namespace gpu -} // namespace orc -} // namespace io -} // namespace cudf +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 857daeb5856..15ce1aadb17 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -14,7 +14,6 @@ * limitations under the License. */ -#include "io/comp/gpuinflate.hpp" #include "io/utilities/block_utils.cuh" #include "io/utilities/time_utils.cuh" #include "orc_gpu.hpp" @@ -40,10 +39,7 @@ #include #include -namespace cudf { -namespace io { -namespace orc { -namespace gpu { +namespace cudf::io::orc::detail { using cudf::detail::device_2dspan; using cudf::io::detail::compression_result; @@ -1421,7 +1417,4 @@ void decimal_sizes_to_offsets(device_2dspan rg_bounds, <<>>(rg_bounds, d_sizes); } -} // namespace gpu -} // namespace orc -} // namespace io -} // namespace cudf +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 5e23bc5adcc..a72b71a83ca 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -26,10 +26,7 @@ #include #include -namespace cudf { -namespace io { -namespace orc { -namespace gpu { +namespace cudf::io::orc::detail { struct comp_in_out { uint8_t const* in_ptr{}; @@ -605,7 +602,4 @@ void __host__ reduce_pushdown_masks(device_span co <<>>(columns, rowgroups, valid_counts); } -} // namespace gpu -} // namespace orc -} // namespace io -} // namespace cudf +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 5c3377a1aeb..ed900105968 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -21,7 +21,6 @@ #include "io/orc/orc_gpu.hpp" #include "io/statistics/column_statistics.cuh" -#include "io/utilities/column_utils.cuh" #include "writer_impl.hpp" #include @@ -40,6 +39,7 @@ #include #include #include +#include #include #include @@ -92,15 +92,15 @@ namespace { /** * @brief Translates cuDF compression to ORC compression. */ -orc::CompressionKind to_orc_compression(compression_type compression) +CompressionKind to_orc_compression(compression_type compression) { switch (compression) { case compression_type::AUTO: - case compression_type::SNAPPY: return orc::CompressionKind::SNAPPY; - case compression_type::ZLIB: return orc::CompressionKind::ZLIB; - case compression_type::ZSTD: return orc::CompressionKind::ZSTD; - case compression_type::LZ4: return orc::CompressionKind::LZ4; - case compression_type::NONE: return orc::CompressionKind::NONE; + case compression_type::SNAPPY: return CompressionKind::SNAPPY; + case compression_type::ZLIB: return CompressionKind::ZLIB; + case compression_type::ZSTD: return CompressionKind::ZSTD; + case compression_type::LZ4: return CompressionKind::LZ4; + case compression_type::NONE: return CompressionKind::NONE; default: CUDF_FAIL("Unsupported compression type"); } } @@ -119,7 +119,7 @@ size_t compression_block_size(compression_type compression) /** * @brief Translates cuDF dtype to ORC datatype */ -constexpr orc::TypeKind to_orc_type(cudf::type_id id, bool list_column_as_map) +constexpr TypeKind to_orc_type(cudf::type_id id, bool list_column_as_map) { switch (id) { case cudf::type_id::INT8: return TypeKind::BYTE; @@ -237,8 +237,8 @@ class orc_column_view { [[nodiscard]] auto const& decimal_offsets() const { return d_decimal_offsets; } void attach_decimal_offsets(uint32_t* sizes_ptr) { d_decimal_offsets = sizes_ptr; } - void attach_stripe_dicts(host_span host_stripe_dicts, - device_span dev_stripe_dicts) + void attach_stripe_dicts(host_span host_stripe_dicts, + device_span dev_stripe_dicts) { stripe_dicts = host_stripe_dicts; d_stripe_dicts = dev_stripe_dicts; @@ -301,8 +301,8 @@ class orc_column_view { host_span rowgroup_char_counts; - host_span stripe_dicts; - device_span d_stripe_dicts; + host_span stripe_dicts; + device_span d_stripe_dicts; // Offsets for encoded decimal elements. Used to enable direct writing of encoded decimal elements // into the output stream. @@ -498,17 +498,17 @@ size_t RLE_stream_size(TypeKind kind, size_t count) case TypeKind::BYTE: return div_rounding_up_unsafe(count, byte_rle_max_len) * (byte_rle_max_len + 1); case TypeKind::SHORT: - return div_rounding_up_unsafe(count, gpu::encode_block_size) * - (gpu::encode_block_size * max_varint_size() + 2); + return div_rounding_up_unsafe(count, encode_block_size) * + (encode_block_size * max_varint_size() + 2); case TypeKind::FLOAT: case TypeKind::INT: case TypeKind::DATE: - return div_rounding_up_unsafe(count, gpu::encode_block_size) * - (gpu::encode_block_size * max_varint_size() + 2); + return div_rounding_up_unsafe(count, encode_block_size) * + (encode_block_size * max_varint_size() + 2); case TypeKind::LONG: case TypeKind::DOUBLE: - return div_rounding_up_unsafe(count, gpu::encode_block_size) * - (gpu::encode_block_size * max_varint_size() + 2); + return div_rounding_up_unsafe(count, encode_block_size) * + (encode_block_size * max_varint_size() + 2); default: CUDF_FAIL("Unsupported ORC type for RLE stream size: " + std::to_string(kind)); } } @@ -536,7 +536,7 @@ orc_streams create_streams(host_span columns, return Stream{ROW_INDEX, col.id()}; }); - std::vector ids(columns.size() * gpu::CI_NUM_STREAMS, -1); + std::vector ids(columns.size() * CI_NUM_STREAMS, -1); std::vector types(streams.size(), INVALID_TYPE_KIND); for (auto& column : columns) { @@ -568,41 +568,39 @@ orc_streams create_streams(host_span columns, auto const kind = column.orc_kind(); auto add_stream = - [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { + [&](StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { auto const max_alignment_padding = compress_required_chunk_alignment(compression) - 1; - const auto base = column.index() * gpu::CI_NUM_STREAMS; + const auto base = column.index() * CI_NUM_STREAMS; ids[base + index_type] = streams.size(); - streams.push_back(orc::Stream{ - kind, - column.id(), - (size == 0) ? 0 : size + max_alignment_padding * segmentation.num_rowgroups()}); + streams.push_back( + Stream{kind, + column.id(), + (size == 0) ? 0 : size + max_alignment_padding * segmentation.num_rowgroups()}); types.push_back(type_kind); }; - auto add_RLE_stream = [&]( - gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind) { + auto add_RLE_stream = [&](StreamIndexType index_type, StreamKind kind, TypeKind type_kind) { add_stream(index_type, kind, type_kind, RLE_column_size(type_kind)); }; - if (is_nullable) { add_RLE_stream(gpu::CI_PRESENT, PRESENT, TypeKind::BOOLEAN); } + if (is_nullable) { add_RLE_stream(CI_PRESENT, PRESENT, TypeKind::BOOLEAN); } switch (kind) { case TypeKind::BOOLEAN: case TypeKind::BYTE: - add_RLE_stream(gpu::CI_DATA, DATA, kind); + add_RLE_stream(CI_DATA, DATA, kind); column.set_orc_encoding(DIRECT); break; case TypeKind::SHORT: case TypeKind::INT: case TypeKind::LONG: case TypeKind::DATE: - add_RLE_stream(gpu::CI_DATA, DATA, kind); + add_RLE_stream(CI_DATA, DATA, kind); column.set_orc_encoding(DIRECT_V2); break; case TypeKind::FLOAT: case TypeKind::DOUBLE: // Pass through if no nulls (no RLE encoding for floating point) - add_stream( - gpu::CI_DATA, DATA, kind, (column.null_count() != 0) ? RLE_column_size(kind) : 0); + add_stream(CI_DATA, DATA, kind, (column.null_count() != 0) ? RLE_column_size(kind) : 0); column.set_orc_encoding(DIRECT); break; case TypeKind::STRING: { @@ -632,35 +630,34 @@ orc_streams create_streams(host_span columns, // Decide between direct or dictionary encoding if (enable_dict && dict_data_size < direct_data_size) { - add_RLE_stream(gpu::CI_DATA, DATA, TypeKind::INT); - add_stream(gpu::CI_DATA2, LENGTH, TypeKind::INT, dict_lengths_div512 * (512 * 4 + 2)); - add_stream( - gpu::CI_DICTIONARY, DICTIONARY_DATA, TypeKind::CHAR, std::max(dict_data_size, 1ul)); + add_RLE_stream(CI_DATA, DATA, TypeKind::INT); + add_stream(CI_DATA2, LENGTH, TypeKind::INT, dict_lengths_div512 * (512 * 4 + 2)); + add_stream(CI_DICTIONARY, DICTIONARY_DATA, TypeKind::CHAR, std::max(dict_data_size, 1ul)); column.set_orc_encoding(DICTIONARY_V2); } else { - add_stream(gpu::CI_DATA, DATA, TypeKind::CHAR, std::max(direct_data_size, 1)); - add_RLE_stream(gpu::CI_DATA2, LENGTH, TypeKind::INT); + add_stream(CI_DATA, DATA, TypeKind::CHAR, std::max(direct_data_size, 1)); + add_RLE_stream(CI_DATA2, LENGTH, TypeKind::INT); column.set_orc_encoding(DIRECT_V2); } break; } case TypeKind::TIMESTAMP: - add_RLE_stream(gpu::CI_DATA, DATA, TypeKind::LONG); - add_RLE_stream(gpu::CI_DATA2, SECONDARY, TypeKind::LONG); + add_RLE_stream(CI_DATA, DATA, TypeKind::LONG); + add_RLE_stream(CI_DATA2, SECONDARY, TypeKind::LONG); column.set_orc_encoding(DIRECT_V2); break; case TypeKind::DECIMAL: // varint values (NO RLE) // data_stream_size = decimal_column_sizes.at(column.index()); - add_stream(gpu::CI_DATA, DATA, TypeKind::DECIMAL, decimal_column_sizes.at(column.index())); + add_stream(CI_DATA, DATA, TypeKind::DECIMAL, decimal_column_sizes.at(column.index())); // scale stream TODO: compute exact size since all elems are equal - add_RLE_stream(gpu::CI_DATA2, SECONDARY, TypeKind::INT); + add_RLE_stream(CI_DATA2, SECONDARY, TypeKind::INT); column.set_orc_encoding(DIRECT_V2); break; case TypeKind::LIST: case TypeKind::MAP: // no data stream, only lengths - add_RLE_stream(gpu::CI_DATA2, LENGTH, TypeKind::INT); + add_RLE_stream(CI_DATA2, LENGTH, TypeKind::INT); column.set_orc_encoding(DIRECT_V2); break; case TypeKind::STRUCT: @@ -683,7 +680,7 @@ std::vector> calculate_aligned_rowgroup_bounds( orc_table.num_columns() * segmentation.num_rowgroups(), stream); auto const d_pd_set_counts = device_2dspan{d_pd_set_counts_data, orc_table.num_columns()}; - gpu::reduce_pushdown_masks(orc_table.d_columns, segmentation.rowgroups, d_pd_set_counts, stream); + reduce_pushdown_masks(orc_table.d_columns, segmentation.rowgroups, d_pd_set_counts, stream); auto aligned_rgs = hostdevice_2dvector( segmentation.num_rowgroups(), orc_table.num_columns(), stream); @@ -838,7 +835,7 @@ encoded_data encode_columns(orc_table_view const& orc_table, rmm::cuda_stream_view stream) { auto const num_columns = orc_table.num_columns(); - hostdevice_2dvector chunks(num_columns, segmentation.num_rowgroups(), stream); + hostdevice_2dvector chunks(num_columns, segmentation.num_rowgroups(), stream); auto const aligned_rowgroups = calculate_aligned_rowgroup_bounds(orc_table, segmentation, stream); @@ -911,7 +908,7 @@ encoded_data encode_columns(orc_table_view const& orc_table, " Please see https://github.com/rapidsai/cudf/issues/6763 for more information."); } - hostdevice_2dvector chunk_streams( + hostdevice_2dvector chunk_streams( num_columns, segmentation.num_rowgroups(), stream); // per-stripe, per-stream owning buffers std::vector>> encoded_data(segmentation.num_stripes()); @@ -921,10 +918,10 @@ encoded_data encode_columns(orc_table_view const& orc_table, }); for (size_t col_idx = 0; col_idx < num_columns; col_idx++) { - for (int strm_type = 0; strm_type < gpu::CI_NUM_STREAMS; ++strm_type) { + for (int strm_type = 0; strm_type < CI_NUM_STREAMS; ++strm_type) { auto const& column = orc_table.column(col_idx); auto col_streams = chunk_streams[col_idx]; - auto const strm_id = streams.id(col_idx * gpu::CI_NUM_STREAMS + strm_type); + auto const strm_id = streams.id(col_idx * CI_NUM_STREAMS + strm_type); std::for_each(stripe.cbegin(), stripe.cend(), [&](auto rg_idx) { col_streams[rg_idx].ids[strm_type] = strm_id; @@ -938,25 +935,25 @@ encoded_data encode_columns(orc_table_view const& orc_table, auto const& ck = chunks[col_idx][rg_idx]; auto& strm = col_streams[rg_idx]; - if ((strm_type == gpu::CI_DICTIONARY) || - (strm_type == gpu::CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)) { + if ((strm_type == CI_DICTIONARY) || + (strm_type == CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)) { if (rg_idx == *stripe.cbegin()) { auto const stripe_dict = column.host_stripe_dict(stripe.id); strm.lengths[strm_type] = - (strm_type == gpu::CI_DICTIONARY) + (strm_type == CI_DICTIONARY) ? stripe_dict.char_count : (((stripe_dict.entry_count + 0x1ff) >> 9) * (512 * 4 + 2)); } else { strm.lengths[strm_type] = 0; } - } else if (strm_type == gpu::CI_DATA && ck.type_kind == TypeKind::STRING && + } else if (strm_type == CI_DATA && ck.type_kind == TypeKind::STRING && ck.encoding_kind == DIRECT_V2) { strm.lengths[strm_type] = std::max(column.rowgroup_char_count(rg_idx), 1); - } else if (strm_type == gpu::CI_DATA && streams[strm_id].length == 0 && + } else if (strm_type == CI_DATA && streams[strm_id].length == 0 && (ck.type_kind == DOUBLE || ck.type_kind == FLOAT)) { // Pass-through strm.lengths[strm_type] = ck.num_rows * ck.dtype_len; - } else if (ck.type_kind == DECIMAL && strm_type == gpu::CI_DATA) { + } else if (ck.type_kind == DECIMAL && strm_type == CI_DATA) { strm.lengths[strm_type] = dec_chunk_sizes.rg_sizes.at(col_idx)[rg_idx]; } else { strm.lengths[strm_type] = RLE_stream_size(streams.type(strm_id), ck.num_rows); @@ -974,12 +971,12 @@ encoded_data encode_columns(orc_table_view const& orc_table, auto const& ck = chunks[col_idx][rg_idx]; auto& strm = col_streams[rg_idx]; - if (strm_id < 0 or (strm_type == gpu::CI_DATA && streams[strm_id].length == 0 && + if (strm_id < 0 or (strm_type == CI_DATA && streams[strm_id].length == 0 && (ck.type_kind == DOUBLE || ck.type_kind == FLOAT))) { strm.data_ptrs[strm_type] = nullptr; } else { - if ((strm_type == gpu::CI_DICTIONARY) || - (strm_type == gpu::CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)) { + if ((strm_type == CI_DICTIONARY) || + (strm_type == CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)) { strm.data_ptrs[strm_type] = encoded_data[stripe.id][strm_id].data(); } else { strm.data_ptrs[strm_type] = (rg_idx_it == stripe.cbegin()) @@ -1003,16 +1000,16 @@ encoded_data encode_columns(orc_table_view const& orc_table, if (orc_table.num_rows() > 0) { if (orc_table.num_string_columns() != 0) { auto d_stripe_dict = orc_table.string_column(0).device_stripe_dicts(); - gpu::EncodeStripeDictionaries(d_stripe_dict.data(), - orc_table.d_columns, - chunks, - orc_table.num_string_columns(), - segmentation.num_stripes(), - chunk_streams, - stream); + EncodeStripeDictionaries(d_stripe_dict.data(), + orc_table.d_columns, + chunks, + orc_table.num_string_columns(), + segmentation.num_stripes(), + chunk_streams, + stream); } - gpu::EncodeOrcColumnData(chunks, chunk_streams, stream); + EncodeOrcColumnData(chunks, chunk_streams, stream); } chunk_streams.device_to_host_sync(stream); @@ -1034,7 +1031,7 @@ encoded_data encode_columns(orc_table_view const& orc_table, std::vector gather_stripes(size_t num_index_streams, file_segmentation const& segmentation, encoded_data* enc_data, - hostdevice_2dvector* strm_desc, + hostdevice_2dvector* strm_desc, rmm::cuda_stream_view stream) { if (segmentation.num_stripes() == 0) { return {}; } @@ -1051,7 +1048,7 @@ std::vector gather_stripes(size_t num_index_streams, for (size_t col_idx = 0; col_idx < enc_data->streams.size().first; col_idx++) { auto const& col_streams = (enc_data->streams)[col_idx]; // Assign stream data of column data stream(s) - for (int k = 0; k < gpu::CI_INDEX; k++) { + for (int k = 0; k < CI_INDEX; k++) { auto const stream_id = col_streams[0].ids[k]; if (stream_id != -1) { auto const actual_stripe_size = std::accumulate( @@ -1091,7 +1088,7 @@ std::vector gather_stripes(size_t num_index_streams, strm_desc->host_to_device_async(stream); // TODO: use cub::DeviceMemcpy::Batched - gpu::CompactOrcDataStreams(*strm_desc, enc_data->streams, stream); + CompactOrcDataStreams(*strm_desc, enc_data->streams, stream); strm_desc->device_to_host_async(stream); enc_data->streams.device_to_host_sync(stream); @@ -1123,17 +1120,17 @@ cudf::detail::hostdevice_vector allocate_and_encode_blobs( rmm::cuda_stream_view stream) { // figure out the buffer size needed for protobuf format - gpu::orc_init_statistics_buffersize( + orc_init_statistics_buffersize( stats_merge_groups.device_ptr(), stat_chunks.data(), num_stat_blobs, stream); auto max_blobs = stats_merge_groups.element(num_stat_blobs - 1, stream); cudf::detail::hostdevice_vector blobs(max_blobs.start_chunk + max_blobs.num_chunks, stream); - gpu::orc_encode_statistics(blobs.device_ptr(), - stats_merge_groups.device_ptr(), - stat_chunks.data(), - num_stat_blobs, - stream); + orc_encode_statistics(blobs.device_ptr(), + stats_merge_groups.device_ptr(), + stat_chunks.data(), + num_stat_blobs, + stream); stats_merge_groups.device_to_host_async(stream); blobs.device_to_host_sync(stream); return blobs; @@ -1238,7 +1235,7 @@ intermediate_statistics gather_statistic_blobs(statistics_freq const stats_freq, auto stripe_stat_chunks = stripe_chunks.data(); rmm::device_uvector rowgroup_groups(num_rowgroup_blobs, stream); - gpu::orc_init_statistics_groups( + orc_init_statistics_groups( rowgroup_groups.data(), stat_desc.device_ptr(), segmentation.rowgroups, stream); detail::calculate_group_statistics( @@ -1440,8 +1437,8 @@ void write_index_stream(int32_t stripe_id, int32_t stream_id, host_span columns, file_segmentation const& segmentation, - host_2dspan enc_streams, - host_2dspan strm_desc, + host_2dspan enc_streams, + host_2dspan strm_desc, host_span comp_res, host_span rg_stats, StripeInformation* stripe, @@ -1455,8 +1452,7 @@ void write_index_stream(int32_t stripe_id, row_group_index_info data2; auto const column_id = stream_id - 1; - auto find_record = [=, &strm_desc](gpu::encoder_chunk_streams const& stream, - gpu::StreamIndexType type) { + auto find_record = [=, &strm_desc](encoder_chunk_streams const& stream, StreamIndexType type) { row_group_index_info record; if (stream.ids[type] > 0) { record.pos = 0; @@ -1469,8 +1465,8 @@ void write_index_stream(int32_t stripe_id, } return record; }; - auto scan_record = [=, &comp_res](gpu::encoder_chunk_streams const& stream, - gpu::StreamIndexType type, + auto scan_record = [=, &comp_res](encoder_chunk_streams const& stream, + StreamIndexType type, row_group_index_info& record) { if (record.pos >= 0) { record.pos += stream.lengths[type]; @@ -1489,9 +1485,9 @@ void write_index_stream(int32_t stripe_id, // TBD: Not sure we need an empty index stream for column 0 if (stream_id != 0) { auto const& strm = enc_streams[column_id][0]; - present = find_record(strm, gpu::CI_PRESENT); - data = find_record(strm, gpu::CI_DATA); - data2 = find_record(strm, gpu::CI_DATA2); + present = find_record(strm, CI_PRESENT); + data = find_record(strm, CI_DATA); + data2 = find_record(strm, CI_DATA2); // Change string dictionary to int from index point of view kind = columns[column_id].orc_kind(); @@ -1518,9 +1514,9 @@ void write_index_stream(int32_t stripe_id, if (stream_id != 0) { const auto& strm = enc_streams[column_id][rowgroup]; - scan_record(strm, gpu::CI_PRESENT, present); - scan_record(strm, gpu::CI_DATA, data); - scan_record(strm, gpu::CI_DATA2, data2); + scan_record(strm, CI_PRESENT, present); + scan_record(strm, CI_DATA, data); + scan_record(strm, CI_DATA2, data2); } }); @@ -1549,8 +1545,8 @@ void write_index_stream(int32_t stripe_id, * @param[in] stream CUDA stream used for device memory operations and kernel launches * @return An std::future that should be synchronized to ensure the writing is complete */ -std::future write_data_stream(gpu::StripeStream const& strm_desc, - gpu::encoder_chunk_streams const& enc_stream, +std::future write_data_stream(StripeStream const& strm_desc, + encoder_chunk_streams const& enc_stream, uint8_t const* compressed_data, host_span bounce_buffer, StripeInformation* stripe, @@ -1944,7 +1940,7 @@ encoder_decimal_info decimal_chunk_sizes(orc_table_view& orc_table, if (elem_sizes.empty()) return {}; // Compute element offsets within each row group - gpu::decimal_sizes_to_offsets(segmentation.rowgroups, elem_sizes, stream); + decimal_sizes_to_offsets(segmentation.rowgroups, elem_sizes, stream); // Gather the row group sizes and copy to host auto d_tmp_rowgroup_sizes = rmm::device_uvector(segmentation.num_rowgroups(), stream); @@ -2011,11 +2007,11 @@ auto set_rowgroup_char_counts(orc_table_view& orc_table, auto counts = rmm::device_uvector(num_str_cols * num_rowgroups, stream); auto counts_2d_view = device_2dspan(counts, num_rowgroups); - gpu::rowgroup_char_counts(counts_2d_view, - orc_table.d_columns, - rowgroup_bounds, - orc_table.d_string_column_indices, - stream); + rowgroup_char_counts(counts_2d_view, + orc_table.d_columns, + rowgroup_bounds, + orc_table.d_string_column_indices, + stream); auto const h_counts = cudf::detail::make_host_vector_sync(counts, stream); @@ -2030,7 +2026,7 @@ auto set_rowgroup_char_counts(orc_table_view& orc_table, // Holds the stripe dictionary descriptors and dictionary buffers. struct stripe_dictionaries { - hostdevice_2dvector views; // descriptors [string_column][stripe] + hostdevice_2dvector views; // descriptors [string_column][stripe] std::vector> data_owner; // dictionary data owner, per stripe std::vector> index_owner; // dictionary index owner, per stripe std::vector> order_owner; // dictionary order owner, per stripe @@ -2082,17 +2078,17 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, : segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - segmentation.rowgroups[stripe.first][col_idx].begin; hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size); - total_map_storage_size += stripe_num_rows * gpu::occupancy_factor; + total_map_storage_size += stripe_num_rows * occupancy_factor; } hash_maps_storage_offsets[str_column.str_index()].emplace_back(total_map_storage_size); } - hostdevice_2dvector stripe_dicts( + hostdevice_2dvector stripe_dicts( orc_table.num_string_columns(), segmentation.num_stripes(), stream); if (stripe_dicts.count() == 0) return {std::move(stripe_dicts), {}, {}}; // Create a single bulk storage to use for all sub-dictionaries - auto map_storage = std::make_unique( + auto map_storage = std::make_unique( total_map_storage_size, cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}); @@ -2121,8 +2117,8 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, } stripe_dicts.host_to_device_async(stream); - map_storage->initialize_async({gpu::KEY_SENTINEL, gpu::VALUE_SENTINEL}, {stream.value()}); - gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); + map_storage->initialize_async({KEY_SENTINEL, VALUE_SENTINEL}, {stream.value()}); + populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); // Copy the entry counts and char counts from the device to the host stripe_dicts.device_to_host_sync(stream); @@ -2169,8 +2165,8 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, // Synchronize to ensure the copy is complete before we clear `map_slots` stripe_dicts.host_to_device_sync(stream); - gpu::collect_map_entries(stripe_dicts, stream); - gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); + collect_map_entries(stripe_dicts, stream); + get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); // deallocate hash map storage, unused after this point map_storage.reset(); @@ -2299,7 +2295,7 @@ auto convert_table_to_orc_data(table_view const& input, // Assemble individual disparate column chunks into contiguous data streams size_type const num_index_streams = (orc_table.num_columns() + 1); auto const num_data_streams = streams.size() - num_index_streams; - hostdevice_2dvector strm_descs( + hostdevice_2dvector strm_descs( segmentation.num_stripes(), num_data_streams, stream); auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data, &strm_descs, stream); @@ -2353,17 +2349,17 @@ auto convert_table_to_orc_data(table_view const& input, compression_result{0, compression_status::FAILURE}); if (compression != compression_type::NONE) { strm_descs.host_to_device_async(stream); - compression_stats = gpu::CompressOrcDataStreams(compressed_data, - num_compressed_blocks, - compression, - compression_blocksize, - max_compressed_block_size, - block_align, - collect_compression_stats, - strm_descs, - enc_data.streams, - comp_results, - stream); + compression_stats = CompressOrcDataStreams(compressed_data, + num_compressed_blocks, + compression, + compression_blocksize, + max_compressed_block_size, + block_align, + collect_compression_stats, + strm_descs, + enc_data.streams, + comp_results, + stream); // deallocate encoded data as it is not needed anymore enc_data.data.clear(); @@ -2535,7 +2531,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, orc_table_view const& orc_table, device_span compressed_data, host_span comp_results, - host_2dspan strm_descs, + host_2dspan strm_descs, host_span rg_stats, orc_streams& streams, host_span stripes, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 7d23482cb17..b6a27d5a6c5 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -136,7 +136,7 @@ struct file_segmentation { */ struct encoded_data { std::vector>> data; // Owning array of the encoded data - hostdevice_2dvector streams; // streams of encoded data, per chunk + hostdevice_2dvector streams; // streams of encoded data, per chunk }; /** @@ -309,7 +309,7 @@ class writer::impl { orc_table_view const& orc_table, device_span compressed_data, host_span comp_results, - host_2dspan strm_descs, + host_2dspan strm_descs, host_span rg_stats, orc_streams& streams, host_span stripes,