diff --git a/cpp/include/cudf/detail/gather.cuh b/cpp/include/cudf/detail/gather.cuh index 878d3d41114..6b1ad01c241 100644 --- a/cpp/include/cudf/detail/gather.cuh +++ b/cpp/include/cudf/detail/gather.cuh @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2019-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -644,23 +645,33 @@ std::unique_ptr gather(table_view const& source_table, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - std::vector> destination_columns; - - // TODO: Could be beneficial to use streams internally here - - for (auto const& source_column : source_table) { - // The data gather for n columns will be put on the first n streams - destination_columns.push_back( - cudf::type_dispatcher(source_column.type(), - column_gatherer{}, - source_column, - gather_map_begin, - gather_map_end, - bounds_policy == out_of_bounds_policy::NULLIFY, - stream, - mr)); + auto const num_columns = source_table.num_columns(); + auto result = std::vector>(num_columns); + + // The data gather for n columns will be executed over n streams. If there is + // only a single column, the fork/join overhead should be avoided. 
+ auto streams = std::vector{}; + if (num_columns > 1) { + streams = cudf::detail::fork_streams(stream, num_columns); + } else { + streams.push_back(stream); } + auto it = thrust::make_counting_iterator(0); + + std::transform(it, it + num_columns, result.begin(), [&](size_type i) { + auto const& source_column = source_table.column(i); + return cudf::type_dispatcher( + source_column.type(), + column_gatherer{}, + source_column, + gather_map_begin, + gather_map_end, + bounds_policy == out_of_bounds_policy::NULLIFY, + streams[i], + mr); + }); + auto needs_new_bitmask = bounds_policy == out_of_bounds_policy::NULLIFY || cudf::has_nested_nullable_columns(source_table); if (needs_new_bitmask) { @@ -669,15 +680,20 @@ std::unique_ptr
<table> gather(table_view const& source_table, auto const op = bounds_policy == out_of_bounds_policy::NULLIFY ? gather_bitmask_op::NULLIFY : gather_bitmask_op::DONT_CHECK; - gather_bitmask(source_table, gather_map_begin, destination_columns, op, stream, mr); + gather_bitmask(source_table, gather_map_begin, result, op, stream, mr); } else { for (size_type i = 0; i < source_table.num_columns(); ++i) { - set_all_valid_null_masks(source_table.column(i), *destination_columns[i], stream, mr); + set_all_valid_null_masks(source_table.column(i), *result[i], streams[i], mr); } } } - return std::make_unique<table>
(std::move(destination_columns)); + // Join streams as late as possible so that null mask computations can run on + // the passed in stream while other streams are gathering. Skip joining if + // only one column, since it used the passed in stream rather than forking. + if (num_columns > 1) { cudf::detail::join_streams(streams, stream); } + + return std::make_unique<table>
(std::move(result)); } } // namespace detail diff --git a/cpp/include/cudf/detail/scatter.cuh b/cpp/include/cudf/detail/scatter.cuh index b35f2ff66e2..bce88c5b124 100644 --- a/cpp/include/cudf/detail/scatter.cuh +++ b/cpp/include/cudf/detail/scatter.cuh @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2020-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2020-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -397,22 +398,32 @@ std::unique_ptr
scatter(table_view const& source, thrust::make_transform_iterator(scatter_map_begin, index_converter{target.num_rows()}); auto updated_scatter_map_end = thrust::make_transform_iterator(scatter_map_end, index_converter{target.num_rows()}); - auto result = std::vector>(target.num_columns()); - - std::transform(source.begin(), - source.end(), - target.begin(), - result.begin(), - [=](auto const& source_col, auto const& target_col) { - return type_dispatcher(source_col.type(), - column_scatterer{}, - source_col, - updated_scatter_map_begin, - updated_scatter_map_end, - target_col, - stream, - mr); - }); + + auto const num_columns = target.num_columns(); + auto result = std::vector>(num_columns); + + // The data scatter for n columns will be executed over n streams. If there is + // only a single column, the fork/join overhead should be avoided. + auto streams = std::vector{}; + if (num_columns > 1) { + streams = cudf::detail::fork_streams(stream, num_columns); + } else { + streams.push_back(stream); + } + + auto it = thrust::make_counting_iterator(0); + + std::transform(it, it + num_columns, result.begin(), [&](size_type i) { + auto const& source_col = source.column(i); + return type_dispatcher(source_col.type(), + column_scatterer{}, + source_col, + updated_scatter_map_begin, + updated_scatter_map_end, + target.column(i), + streams[i], + mr); + }); // We still need to call `gather_bitmask` even when the source columns are not nullable, // as if the target has null_mask, that null_mask needs to be updated after scattering. @@ -426,7 +437,9 @@ std::unique_ptr
<table> scatter(table_view const& source, // For struct columns, we need to superimpose the null_mask of the parent over the null_mask of // the children. - std::for_each(result.begin(), result.end(), [=](auto& col) { + auto it = thrust::make_counting_iterator(0); + std::for_each(it, it + num_columns, [&](size_type i) { + auto& col = result[i]; auto const col_view = col->view(); if (col_view.type().id() == type_id::STRUCT and col_view.nullable()) { auto const num_rows = col_view.size(); @@ -438,11 +451,17 @@ std::unique_ptr<table>
scatter(table_view const& source, std::move(contents.children), null_count, std::move(*contents.null_mask), - stream, + streams[i], mr); } }); } + + // Join streams as late as possible so that null mask computations can run on + // the passed in stream while other streams are scattering. Skip joining if + // only one column, since it used the passed in stream rather than forking. + if (num_columns > 1) { cudf::detail::join_streams(streams, stream); } + return std::make_unique<table>
(std::move(result)); } } // namespace detail diff --git a/cpp/include/cudf/detail/utilities/stream_pool.hpp b/cpp/include/cudf/detail/utilities/stream_pool.hpp index d68527d4c6e..f03045aa4cf 100644 --- a/cpp/include/cudf/detail/utilities/stream_pool.hpp +++ b/cpp/include/cudf/detail/utilities/stream_pool.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -99,7 +99,7 @@ cuda_stream_pool& global_cuda_stream_pool(); * auto const num_streams = 2; * // do work on stream * // allocate streams and wait for an event on stream before executing on any of streams - * auto streams = cudf::detail::fork_stream(stream, num_streams); + * auto streams = cudf::detail::fork_streams(stream, num_streams); * // do work on streams[0] and streams[1] * // wait for event on streams before continuing to do work on stream * cudf::detail::join_streams(streams, stream);