Skip to content

Commit

Permalink
Add decoder for DELTA_BYTE_ARRAY to Parquet reader (#14101)
Browse files Browse the repository at this point in the history
Part of #13501. Adds ability to decode DELTA_BYTE_ARRAY encoded pages.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - https://github.com/nvdbaranec
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #14101
  • Loading branch information
etseidl authored Nov 16, 2023
1 parent 8e1ef05 commit bf63d10
Show file tree
Hide file tree
Showing 11 changed files with 1,044 additions and 114 deletions.
58 changes: 43 additions & 15 deletions cpp/src/io/parquet/delta_binary.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ namespace cudf::io::parquet::detail {
// per mini-block. While encoding, the lowest delta value is subtracted from all the deltas in the
// block to ensure that all encoded values are positive. The deltas for each mini-block are bit
// packed using the same encoding as the RLE/Bit-Packing Hybrid encoder.
//
// DELTA_BYTE_ARRAY encoding (incremental encoding or front compression), is used for BYTE_ARRAY
// columns. For each element in a sequence of strings, a prefix length from the preceding string
// and a suffix is stored. The prefix lengths are DELTA_BINARY_PACKED encoded. The suffixes are
// encoded with DELTA_LENGTH_BYTE_ARRAY encoding, which is a DELTA_BINARY_PACKED list of suffix
// lengths, followed by the concatenated suffix data.

// we decode one mini-block at a time. max mini-block size seen is 64.
constexpr int delta_rolling_buf_size = 128;
// The largest mini-block size we can currently support.
constexpr int max_delta_mini_block_size = 64;

// The first pass decodes `values_per_mb` values, and then the second pass does another
// batch of size `values_per_mb`. The largest value for values_per_miniblock among the
// major writers seems to be 64, so 2 * 64 should be good. We save the first value separately
// since it is not encoded in the first mini-block.
constexpr int delta_rolling_buf_size = 2 * max_delta_mini_block_size;

/**
* @brief Read a ULEB128 varint integer
Expand Down Expand Up @@ -90,7 +90,8 @@ struct delta_binary_decoder {
uleb128_t mini_block_count; // usually 4, chosen such that block_size/mini_block_count is a
// multiple of 32
uleb128_t value_count; // total values encoded in the block
zigzag128_t last_value; // last value decoded, initialized to first_value from header
zigzag128_t first_value; // initial value, stored in the header
zigzag128_t last_value; // last value decoded

uint32_t values_per_mb; // block_size / mini_block_count, must be multiple of 32
uint32_t current_value_idx; // current value index, initialized to 0 at start of block
Expand All @@ -102,6 +103,13 @@ struct delta_binary_decoder {

uleb128_t value[delta_rolling_buf_size]; // circular buffer of delta values

// returns the value stored in the `value` array at index
// `rolling_index<delta_rolling_buf_size>(idx)`. If `idx` is `0`, then return `first_value`.
constexpr zigzag128_t value_at(size_type idx)
{
return idx == 0 ? first_value : value[rolling_index<delta_rolling_buf_size>(idx)];
}

// returns the number of values encoded in the block data. when all_values is true,
// account for the first value in the header. otherwise just count the values encoded
// in the mini-block data.
Expand Down Expand Up @@ -145,7 +153,8 @@ struct delta_binary_decoder {
block_size = get_uleb128(d_start, d_end);
mini_block_count = get_uleb128(d_start, d_end);
value_count = get_uleb128(d_start, d_end);
last_value = get_zz128(d_start, d_end);
first_value = get_zz128(d_start, d_end);
last_value = first_value;

current_value_idx = 0;
values_per_mb = block_size / mini_block_count;
Expand Down Expand Up @@ -179,19 +188,38 @@ struct delta_binary_decoder {
}
}

// given start/end pointers in the data, find the end of the binary encoded block. when done,
// `this` will be initialized with the correct start and end positions. returns the end, which is
// start of data/next block. should only be called from thread 0.
inline __device__ uint8_t const* find_end_of_block(uint8_t const* start, uint8_t const* end)
{
// read block header
init_binary_block(start, end);

// test for no encoded values. a single value will be in the block header.
if (value_count <= 1) { return block_start; }

// read mini-block headers and skip over data
while (current_value_idx < num_encoded_values(false)) {
setup_next_mini_block(false);
}
// calculate the correct end of the block
auto const* const new_end = cur_mb == 0 ? block_start : cur_mb_start;
// re-init block with correct end
init_binary_block(start, new_end);
return new_end;
}

// decode the current mini-batch of deltas, and convert to values.
// called by all threads in a warp, currently only one warp supported.
inline __device__ void calc_mini_block_values(int lane_id)
{
using cudf::detail::warp_size;
if (current_value_idx >= value_count) { return; }

// need to save first value from header on first pass
// need to account for the first value from header on first pass
if (current_value_idx == 0) {
if (lane_id == 0) {
current_value_idx++;
value[0] = last_value;
}
if (lane_id == 0) { current_value_idx++; }
__syncwarp();
if (current_value_idx >= value_count) { return; }
}
Expand Down
12 changes: 9 additions & 3 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,13 @@ __global__ void __launch_bounds__(decode_block_size)
int out_thread0;
[[maybe_unused]] null_count_back_copier _{s, t};

if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{KERNEL_MASK_GENERAL}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::GENERAL},
true)) {
return;
}

Expand Down Expand Up @@ -486,6 +491,7 @@ __global__ void __launch_bounds__(decode_block_size)
target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
}
// TODO(ets): see if this sync can be removed
__syncthreads();
if (t < 32) {
// decode repetition and definition levels.
Expand Down Expand Up @@ -603,7 +609,7 @@ __global__ void __launch_bounds__(decode_block_size)
}

struct mask_tform {
__device__ uint32_t operator()(PageInfo const& p) { return p.kernel_mask; }
__device__ uint32_t operator()(PageInfo const& p) { return static_cast<uint32_t>(p.kernel_mask); }
};

} // anonymous namespace
Expand Down
12 changes: 10 additions & 2 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -991,8 +991,15 @@ struct all_types_filter {
* @brief Functor for setupLocalPageInfo that takes a mask of allowed types.
*/
struct mask_filter {
int mask;
__device__ inline bool operator()(PageInfo const& page) { return (page.kernel_mask & mask) != 0; }
uint32_t mask;

__device__ mask_filter(uint32_t m) : mask(m) {}
__device__ mask_filter(decode_kernel_mask m) : mask(static_cast<uint32_t>(m)) {}

__device__ inline bool operator()(PageInfo const& page)
{
return BitAnd(mask, page.kernel_mask) != 0;
}
};

/**
Expand Down Expand Up @@ -1306,6 +1313,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_run = 0;
} break;
case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_BYTE_ARRAY:
// nothing to do, just don't error
break;
default: {
Expand Down
Loading

0 comments on commit bf63d10

Please sign in to comment.