|
44 | 44 | #include <thrust/unique.h> |
45 | 45 |
|
46 | 46 | #include <bitset> |
| 47 | +#include <limits> |
47 | 48 | #include <numeric> |
48 | 49 |
|
49 | 50 | namespace cudf::io::parquet::detail { |
@@ -1592,36 +1593,68 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num |
1592 | 1593 | auto const d_cols_info = cudf::detail::make_device_uvector_async( |
1593 | 1594 | h_cols_info, _stream, cudf::get_current_device_resource_ref()); |
1594 | 1595 |
|
1595 | | - auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size(); |
1596 | | - // size iterator. indexes pages by sorted order |
1597 | | - rmm::device_uvector<size_type> size_input{num_keys, _stream}; |
1598 | | - thrust::transform( |
1599 | | - rmm::exec_policy(_stream), |
1600 | | - thrust::make_counting_iterator<size_type>(0), |
1601 | | - thrust::make_counting_iterator<size_type>(num_keys), |
1602 | | - size_input.begin(), |
1603 | | - get_page_nesting_size{ |
1604 | | - d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()}); |
1605 | | - auto const reduction_keys = |
1606 | | - cudf::detail::make_counting_transform_iterator(0, get_reduction_key{subpass.pages.size()}); |
| 1596 | + // Vector to store page sizes for each column at each depth |
1607 | 1597 | cudf::detail::hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream}; |
1608 | 1598 |
|
1609 | | - // find the size of each column |
1610 | | - thrust::reduce_by_key(rmm::exec_policy(_stream), |
1611 | | - reduction_keys, |
1612 | | - reduction_keys + num_keys, |
1613 | | - size_input.cbegin(), |
1614 | | - thrust::make_discard_iterator(), |
1615 | | - sizes.d_begin()); |
1616 | | - |
1617 | | - // for nested hierarchies, compute per-page start offset |
1618 | | - thrust::exclusive_scan_by_key( |
1619 | | - rmm::exec_policy(_stream), |
1620 | | - reduction_keys, |
1621 | | - reduction_keys + num_keys, |
1622 | | - size_input.cbegin(), |
1623 | | - start_offset_output_iterator{ |
1624 | | - subpass.pages.device_begin(), 0, d_cols_info.data(), max_depth, subpass.pages.size()}); |
| 1599 | + // Total number of keys to process |
| 1600 | + auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size(); |
| 1601 | + |
| 1602 | + // Maximum number of keys processed per iteration: half of the `size_type` maximum (about 1 billion) |
| 1603 | + auto constexpr max_keys_per_iter = |
| 1604 | + static_cast<size_t>(std::numeric_limits<size_type>::max() / 2); |
| 1605 | + |
| 1606 | + // Number of keys per column |
| 1607 | + auto const num_keys_per_col = max_depth * subpass.pages.size(); |
| 1608 | + |
| 1609 | + // All keys if they fit in one iteration; otherwise the largest multiple of `num_keys_per_col` that is <= `max_keys_per_iter` (at least one column's worth) |
| 1610 | + auto const num_keys_per_iter = |
| 1611 | + num_keys <= max_keys_per_iter |
| 1612 | + ? num_keys |
| 1613 | + : num_keys_per_col * std::max<size_t>(1, max_keys_per_iter / num_keys_per_col); |
| 1614 | + |
| 1615 | + // Size iterator. Indexes pages by sorted order |
| 1616 | + rmm::device_uvector<size_type> size_input{num_keys_per_iter, _stream}; |
| 1617 | + |
| 1618 | + // To keep track of the starting key of an iteration |
| 1619 | + size_t key_start = 0; |
| 1620 | + // Loop until all keys are processed |
| 1621 | + while (key_start < num_keys) { |
| 1622 | + // Number of keys processed in this iteration |
| 1623 | + auto const num_keys_this_iter = std::min<size_t>(num_keys_per_iter, num_keys - key_start); |
| 1624 | + thrust::transform( |
| 1625 | + rmm::exec_policy_nosync(_stream), |
| 1626 | + thrust::make_counting_iterator<size_t>(key_start), |
| 1627 | + thrust::make_counting_iterator<size_t>(key_start + num_keys_this_iter), |
| 1628 | + size_input.begin(), |
| 1629 | + get_page_nesting_size{ |
| 1630 | + d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()}); |
| 1631 | + |
| 1632 | + // Manually create a size_t `key_start` compatible counting_transform_iterator to avoid |
| 1633 | + // implicit casting to size_type. |
| 1634 | + auto const reduction_keys = thrust::make_transform_iterator( |
| 1635 | + thrust::make_counting_iterator<size_t>(key_start), get_reduction_key{subpass.pages.size()}); |
| 1636 | + |
| 1637 | + // Find the size of each column |
| 1638 | + thrust::reduce_by_key(rmm::exec_policy_nosync(_stream), |
| 1639 | + reduction_keys, |
| 1640 | + reduction_keys + num_keys_this_iter, |
| 1641 | + size_input.cbegin(), |
| 1642 | + thrust::make_discard_iterator(), |
| 1643 | + sizes.d_begin() + (key_start / subpass.pages.size())); |
| 1644 | + |
| 1645 | + // For nested hierarchies, compute per-page start offset |
| 1646 | + thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream), |
| 1647 | + reduction_keys, |
| 1648 | + reduction_keys + num_keys_this_iter, |
| 1649 | + size_input.cbegin(), |
| 1650 | + start_offset_output_iterator{subpass.pages.device_begin(), |
| 1651 | + key_start, |
| 1652 | + d_cols_info.data(), |
| 1653 | + max_depth, |
| 1654 | + subpass.pages.size()}); |
| 1655 | + // Increment the key_start |
| 1656 | + key_start += num_keys_this_iter; |
| 1657 | + } |
1625 | 1658 |
|
1626 | 1659 | sizes.device_to_host_sync(_stream); |
1627 | 1660 | for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) { |
|
0 commit comments