diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 9f4c0ba943a..20993d12af8 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -47,12 +47,6 @@ namespace parquet { namespace gpu { namespace { -// Spark doesn't support RLE encoding for BOOLEANs -#ifdef ENABLE_BOOL_RLE -constexpr bool enable_bool_rle = true; -#else -constexpr bool enable_bool_rle = false; -#endif using ::cudf::detail::device_2dspan; @@ -70,6 +64,9 @@ constexpr uint32_t WARP_MASK = cudf::detail::warp_size - 1; // currently 64k - 1 constexpr uint32_t MAX_GRID_Y_SIZE = (1 << 16) - 1; +// space needed for RLE length field +constexpr int RLE_LENGTH_FIELD_LEN = 4; + struct frag_init_state_s { parquet_column_device_view col; PageFragment frag; @@ -78,6 +75,7 @@ struct frag_init_state_s { struct page_enc_state_s { uint8_t* cur; //!< current output ptr uint8_t* rle_out; //!< current RLE write ptr + uint8_t* rle_len_pos; //!< position to write RLE length (for V2 boolean data) uint32_t rle_run; //!< current RLE run uint32_t run_val; //!< current RLE run value uint32_t rle_pos; //!< RLE encoder positions @@ -210,6 +208,27 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t) } } +Encoding __device__ determine_encoding(PageType page_type, + Type physical_type, + bool use_dictionary, + bool write_v2_headers) +{ + // NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and + // RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and + // data pages (actual encoding is identical). + switch (page_type) { + case PageType::DATA_PAGE: return use_dictionary ? Encoding::PLAIN_DICTIONARY : Encoding::PLAIN; + case PageType::DATA_PAGE_V2: + // TODO need to work in delta encodings here when they're added + return physical_type == BOOLEAN ? Encoding::RLE + : use_dictionary ? Encoding::RLE_DICTIONARY + : Encoding::PLAIN; + case PageType::DICTIONARY_PAGE: + return write_v2_headers ? Encoding::PLAIN : Encoding::PLAIN_DICTIONARY; + default: CUDF_UNREACHABLE("unsupported page type"); + } +} + } // anonymous namespace // blockDim {512,1,1} @@ -384,6 +403,11 @@ __global__ void __launch_bounds__(128) num_pages = 1; } __syncwarp(); + + // page padding needed for RLE encoded boolean data + auto const rle_pad = + write_v2_headers && col_g.physical_type == BOOLEAN ? RLE_LENGTH_FIELD_LEN : 0; + // This loop goes over one page fragment at a time and adds it to page. // When page size crosses a particular limit, then it moves on to the next page and then next // page fragment gets added to that one. @@ -427,12 +451,12 @@ __global__ void __launch_bounds__(128) // override this_max_page_size if the requested size is smaller this_max_page_size = min(this_max_page_size, max_page_size_bytes); - // subtract size of rep and def level vectors - auto num_vals = values_in_page + frag_g.num_values; - this_max_page_size = - underflow_safe_subtract(this_max_page_size, - max_RLE_page_size(col_g.num_def_level_bits(), num_vals) + - max_RLE_page_size(col_g.num_rep_level_bits(), num_vals)); + // subtract size of rep and def level vectors and RLE length field + auto num_vals = values_in_page + frag_g.num_values; + this_max_page_size = underflow_safe_subtract( + this_max_page_size, + max_RLE_page_size(col_g.num_def_level_bits(), num_vals) + + max_RLE_page_size(col_g.num_rep_level_bits(), num_vals) + rle_pad); // checks to see when we need to close the current page and start a new one auto const is_last_chunk = num_rows >= ck_g.num_rows; @@ -474,7 +498,7 @@ __global__ void __launch_bounds__(128) page_g.num_values = values_in_page; auto const def_level_size = max_RLE_page_size(col_g.num_def_level_bits(), values_in_page); auto const rep_level_size = max_RLE_page_size(col_g.num_rep_level_bits(), values_in_page); - auto const max_data_size = page_size + def_level_size + rep_level_size; + auto const max_data_size = page_size + def_level_size + rep_level_size + rle_pad; // page size must fit in 32-bit signed integer if (max_data_size > std::numeric_limits::max()) { CUDF_UNREACHABLE("page size exceeds maximum for i32"); @@ -967,7 +991,8 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(device_span pages, device_span> comp_in, device_span> comp_out, - device_span comp_results) + device_span comp_results, + bool write_v2_headers) { __shared__ __align__(8) page_enc_state_s state_g; using block_reduce = cub::BlockReduce; @@ -990,6 +1015,7 @@ __global__ void __launch_bounds__(128, 8) s->page.def_lvl_bytes = 0; s->page.rep_lvl_bytes = 0; s->page.num_nulls = 0; + s->rle_len_pos = nullptr; } __syncthreads(); @@ -1132,9 +1158,15 @@ __global__ void __launch_bounds__(128, 8) s->rle_pos = 0; s->rle_numvals = 0; s->rle_out = dst; + s->page.encoding = + determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); if (dict_bits >= 0 && physical_type != BOOLEAN) { dst[0] = dict_bits; s->rle_out = dst + 1; + } else if (is_v2 && physical_type == BOOLEAN) { + // save space for RLE length. we don't know the total length yet. + s->rle_out = dst + RLE_LENGTH_FIELD_LEN; + s->rle_len_pos = dst; } s->page_start_val = row_to_value_idx(s->page.start_row, s->col); s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); @@ -1188,7 +1220,7 @@ __global__ void __launch_bounds__(128, 8) } rle_numvals += rle_numvals_in_block; __syncthreads(); - if ((!enable_bool_rle) && (physical_type == BOOLEAN)) { + if (!is_v2 && physical_type == BOOLEAN) { PlainBoolEncode(s, rle_numvals, (cur_val_idx == s->page.num_leaf_values), t); } else { RleEncode(s, rle_numvals, dict_bits, (cur_val_idx == s->page.num_leaf_values), t); @@ -1345,22 +1377,29 @@ __global__ void __launch_bounds__(128, 8) uint32_t const valid_count = block_reduce(temp_storage.reduce_storage).Sum(num_valid); + // save RLE length if necessary + if (s->rle_len_pos != nullptr && t < 32) { + // size doesn't include the 4 bytes for the length + auto const rle_size = static_cast(s->cur - s->rle_len_pos) - RLE_LENGTH_FIELD_LEN; + if (t < RLE_LENGTH_FIELD_LEN) { s->rle_len_pos[t] = rle_size >> (t * 8); } + __syncwarp(); + } + + // V2 does not compress rep and def level data + size_t const skip_comp_size = s->page.def_lvl_bytes + s->page.rep_lvl_bytes; + if (t == 0) { - s->page.num_nulls = s->page.num_values - valid_count; - uint8_t* base = s->page.page_data + s->page.max_hdr_size; - auto actual_data_size = static_cast(s->cur - base); + s->page.num_nulls = s->page.num_values - valid_count; + uint8_t* const base = s->page.page_data + s->page.max_hdr_size; + auto const actual_data_size = static_cast(s->cur - base); if (actual_data_size > s->page.max_data_size) { CUDF_UNREACHABLE("detected possible page data corruption"); } s->page.max_data_size = actual_data_size; if (not comp_in.empty()) { - // V2 does not compress rep and def level data - size_t const skip_comp_size = s->page.def_lvl_bytes + s->page.rep_lvl_bytes; - comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size}; + comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size}; comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + skip_comp_size, 0}; // size is unused - // copy uncompressed bytes over - memcpy(s->page.compressed_data + s->page.max_hdr_size, base, skip_comp_size); } pages[blockIdx.x] = s->page; if (not comp_results.empty()) { @@ -1368,6 +1407,15 @@ __global__ void __launch_bounds__(128, 8) pages[blockIdx.x].comp_res = &comp_results[blockIdx.x]; } } + + // copy over uncompressed data + if (skip_comp_size != 0 && not comp_in.empty()) { + uint8_t const* const src = s->page.page_data + s->page.max_hdr_size; + uint8_t* const dst = s->page.compressed_data + s->page.max_hdr_size; + for (int i = t; i < skip_comp_size; i += block_size) { + dst[i] = src[i]; + } + } } constexpr int decide_compression_warps_in_block = 4; @@ -1865,28 +1913,16 @@ __global__ void __launch_bounds__(128) } header_encoder encoder(hdr_start); PageType page_type = page_g.page_type; - // NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and - // RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and - // data pages (actual encoding is identical). - Encoding encoding; - if (enable_bool_rle) { - encoding = (col_g.physical_type == BOOLEAN) ? Encoding::RLE - : (page_type == PageType::DICTIONARY_PAGE || page_g.chunk->use_dictionary) - ? Encoding::PLAIN_DICTIONARY - : Encoding::PLAIN; - } else { - encoding = (page_type == PageType::DICTIONARY_PAGE || page_g.chunk->use_dictionary) - ? Encoding::PLAIN_DICTIONARY - : Encoding::PLAIN; - } + encoder.field_int32(1, page_type); encoder.field_int32(2, uncompressed_page_size); encoder.field_int32(3, compressed_page_size); + if (page_type == PageType::DATA_PAGE) { // DataPageHeader encoder.field_struct_begin(5); encoder.field_int32(1, page_g.num_values); // NOTE: num_values != num_rows for list types - encoder.field_int32(2, encoding); // encoding + encoder.field_int32(2, page_g.encoding); // encoding encoder.field_int32(3, Encoding::RLE); // definition_level_encoding encoder.field_int32(4, Encoding::RLE); // repetition_level_encoding // Optionally encode page-level statistics @@ -1898,11 +1934,12 @@ __global__ void __launch_bounds__(128) } encoder.field_struct_end(5); } else if (page_type == PageType::DATA_PAGE_V2) { + // DataPageHeaderV2 encoder.field_struct_begin(8); encoder.field_int32(1, page_g.num_values); encoder.field_int32(2, page_g.num_nulls); encoder.field_int32(3, page_g.num_rows); - encoder.field_int32(4, encoding); + encoder.field_int32(4, page_g.encoding); encoder.field_int32(5, page_g.def_lvl_bytes); encoder.field_int32(6, page_g.rep_lvl_bytes); encoder.field_bool(7, ck_g.is_compressed); // TODO can compress at page level now @@ -1918,7 +1955,7 @@ __global__ void __launch_bounds__(128) // DictionaryPageHeader encoder.field_struct_begin(7); encoder.field_int32(1, ck_g.num_dict_entries); // number of values in dictionary - encoder.field_int32(2, encoding); + encoder.field_int32(2, page_g.encoding); encoder.field_struct_end(7); } encoder.end(&hdr_end, false); @@ -2237,6 +2274,7 @@ void InitEncoderPages(device_2dspan chunks, } void EncodePages(device_span pages, + bool write_v2_headers, device_span> comp_in, device_span> comp_out, device_span comp_results, @@ -2245,8 +2283,8 @@ void EncodePages(device_span pages, auto num_pages = pages.size(); // A page is part of one column. This is launching 1 block per page. 1 block will exclusively // deal with one datatype. - gpuEncodePages<128> - <<>>(pages, comp_in, comp_out, comp_results); + gpuEncodePages<128><<>>( + pages, comp_in, comp_out, comp_results, write_v2_headers); } void DecideCompression(device_span chunks, rmm::cuda_stream_view stream) diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index b3c9d0765fa..fc4ad026b61 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -411,6 +411,7 @@ struct EncPage { uint8_t* compressed_data; //!< Ptr to compressed page uint16_t num_fragments; //!< Number of fragments in page PageType page_type; //!< Page type + Encoding encoding; //!< Encoding used for page data EncColumnChunk* chunk; //!< Chunk that this page belongs to uint32_t chunk_id; //!< Index in chunk array uint32_t hdr_size; //!< Size of page header @@ -672,13 +673,18 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, /** * @brief Launches kernel for packing column data into parquet pages * + * If compression is to be used, `comp_in`, `comp_out`, and `comp_res` will be initialized for + * use in subsequent compression operations. + * * @param[in,out] pages Device array of EncPages (unordered) + * @param[in] write_v2_headers True if V2 page headers should be written * @param[out] comp_in Compressor input buffers - * @param[out] comp_in Compressor output buffers - * @param[out] comp_stats Compressor results - * @param[in] stream CUDA stream to use, default 0 + * @param[out] comp_out Compressor output buffers + * @param[out] comp_res Compressor results + * @param[in] stream CUDA stream to use */ void EncodePages(device_span pages, + bool write_v2_headers, device_span> comp_in, device_span> comp_out, device_span comp_res, @@ -688,7 +694,7 @@ void EncodePages(device_span pages, * @brief Launches kernel to make the compressed vs uncompressed chunk-level decision * * @param[in,out] chunks Column chunks (updated with actual compressed/uncompressed sizes) - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void DecideCompression(device_span chunks, rmm::cuda_stream_view stream); @@ -696,10 +702,10 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view * @brief Launches kernel to encode page headers * * @param[in,out] pages Device array of EncPages - * @param[in] comp_stats Compressor status + * @param[in] comp_res Compressor status * @param[in] page_stats Optional page-level statistics to be included in page header * @param[in] chunk_stats Optional chunk-level statistics to be encoded - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void EncodePageHeaders(device_span pages, device_span comp_res, @@ -712,7 +718,7 @@ void EncodePageHeaders(device_span pages, * * @param[in,out] chunks Column chunks * @param[in] pages Device array of EncPages - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void GatherPages(device_span chunks, device_span pages, diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 06e7b6bfc8a..021b6cffa5a 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1266,6 +1266,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, * @param comp_stats optional compression statistics (nullopt if none) * @param compression compression format * @param column_index_truncate_length maximum length of min or max values in column index, in bytes + * @param write_v2_headers True if V2 page headers should be written * @param stream CUDA stream used for device memory operations and kernel launches */ void encode_pages(hostdevice_2dvector& chunks, @@ -1280,6 +1281,7 @@ void encode_pages(hostdevice_2dvector& chunks, std::optional& comp_stats, Compression compression, int32_t column_index_truncate_length, + bool write_v2_headers, rmm::cuda_stream_view stream) { auto batch_pages = pages.subspan(first_page_in_batch, pages_in_batch); @@ -1300,7 +1302,7 @@ void encode_pages(hostdevice_2dvector& chunks, comp_res.end(), compression_result{0, compression_status::FAILURE}); - gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, stream); + gpu::EncodePages(batch_pages, write_v2_headers, comp_in, comp_out, comp_res, stream); switch (compression) { case parquet::Compression::SNAPPY: if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { @@ -1926,6 +1928,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, comp_stats, compression, column_index_truncate_length, + write_v2_headers, stream); bool need_sync{false};