Skip to content

Commit b74d8cb

Browse files
committed
Use stream pool for gather and scatter
1 parent 6d453df commit b74d8cb

File tree

3 files changed

+75
-40
lines changed

3 files changed

+75
-40
lines changed

cpp/include/cudf/detail/gather.cuh

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
2+
* SPDX-FileCopyrightText: Copyright (c) 2019-2026, NVIDIA CORPORATION.
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55
#pragma once
@@ -10,6 +10,7 @@
1010
#include <cudf/detail/utilities/assert.cuh>
1111
#include <cudf/detail/utilities/cuda.cuh>
1212
#include <cudf/detail/utilities/grid_1d.cuh>
13+
#include <cudf/detail/utilities/stream_pool.hpp>
1314
#include <cudf/detail/utilities/vector_factories.hpp>
1415
#include <cudf/detail/valid_if.cuh>
1516
#include <cudf/dictionary/dictionary_column_view.hpp>
@@ -644,23 +645,33 @@ std::unique_ptr<table> gather(table_view const& source_table,
644645
rmm::cuda_stream_view stream,
645646
rmm::device_async_resource_ref mr)
646647
{
647-
std::vector<std::unique_ptr<column>> destination_columns;
648-
649-
// TODO: Could be beneficial to use streams internally here
650-
651-
for (auto const& source_column : source_table) {
652-
// The data gather for n columns will be put on the first n streams
653-
destination_columns.push_back(
654-
cudf::type_dispatcher<dispatch_storage_type>(source_column.type(),
655-
column_gatherer{},
656-
source_column,
657-
gather_map_begin,
658-
gather_map_end,
659-
bounds_policy == out_of_bounds_policy::NULLIFY,
660-
stream,
661-
mr));
648+
auto const num_columns = source_table.num_columns();
649+
auto result = std::vector<std::unique_ptr<column>>(num_columns);
650+
651+
// The data gather for n columns will be executed over n streams. If there is
652+
// only a single column, the fork/join overhead should be avoided.
653+
auto streams = std::vector<rmm::cuda_stream_view>{};
654+
if (num_columns > 1) {
655+
streams = cudf::detail::fork_streams(stream, num_columns);
656+
} else {
657+
streams.push_back(stream);
662658
}
663659

660+
auto it = thrust::make_counting_iterator<size_type>(0);
661+
662+
std::transform(it, it + num_columns, result.begin(), [&](size_type i) {
663+
auto const& source_column = source_table.column(i);
664+
return cudf::type_dispatcher<dispatch_storage_type>(
665+
source_column.type(),
666+
column_gatherer{},
667+
source_column,
668+
gather_map_begin,
669+
gather_map_end,
670+
bounds_policy == out_of_bounds_policy::NULLIFY,
671+
streams[i],
672+
mr);
673+
});
674+
664675
auto needs_new_bitmask = bounds_policy == out_of_bounds_policy::NULLIFY ||
665676
cudf::has_nested_nullable_columns(source_table);
666677
if (needs_new_bitmask) {
@@ -669,15 +680,20 @@ std::unique_ptr<table> gather(table_view const& source_table,
669680
auto const op = bounds_policy == out_of_bounds_policy::NULLIFY
670681
? gather_bitmask_op::NULLIFY
671682
: gather_bitmask_op::DONT_CHECK;
672-
gather_bitmask(source_table, gather_map_begin, destination_columns, op, stream, mr);
683+
gather_bitmask(source_table, gather_map_begin, result, op, stream, mr);
673684
} else {
674685
for (size_type i = 0; i < source_table.num_columns(); ++i) {
675-
set_all_valid_null_masks(source_table.column(i), *destination_columns[i], stream, mr);
686+
set_all_valid_null_masks(source_table.column(i), *result[i], streams[i], mr);
676687
}
677688
}
678689
}
679690

680-
return std::make_unique<table>(std::move(destination_columns));
691+
// Join streams as late as possible so that null mask computations can run on
692+
// the passed in stream while other streams are gathering. Skip joining if
693+
// only one column, since it used the passed in stream rather than forking.
694+
if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }
695+
696+
return std::make_unique<table>(std::move(result));
681697
}
682698

683699
} // namespace detail

cpp/include/cudf/detail/scatter.cuh

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* SPDX-FileCopyrightText: Copyright (c) 2020-2025, NVIDIA CORPORATION.
2+
* SPDX-FileCopyrightText: Copyright (c) 2020-2026, NVIDIA CORPORATION.
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

@@ -10,6 +10,7 @@
1010
#include <cudf/detail/gather.cuh>
1111
#include <cudf/detail/indexalator.cuh>
1212
#include <cudf/detail/nvtx/ranges.hpp>
13+
#include <cudf/detail/utilities/stream_pool.hpp>
1314
#include <cudf/dictionary/detail/update_keys.hpp>
1415
#include <cudf/dictionary/dictionary_column_view.hpp>
1516
#include <cudf/dictionary/dictionary_factories.hpp>
@@ -397,22 +398,32 @@ std::unique_ptr<table> scatter(table_view const& source,
397398
thrust::make_transform_iterator(scatter_map_begin, index_converter<MapType>{target.num_rows()});
398399
auto updated_scatter_map_end =
399400
thrust::make_transform_iterator(scatter_map_end, index_converter<MapType>{target.num_rows()});
400-
auto result = std::vector<std::unique_ptr<column>>(target.num_columns());
401-
402-
std::transform(source.begin(),
403-
source.end(),
404-
target.begin(),
405-
result.begin(),
406-
[=](auto const& source_col, auto const& target_col) {
407-
return type_dispatcher<dispatch_storage_type>(source_col.type(),
408-
column_scatterer{},
409-
source_col,
410-
updated_scatter_map_begin,
411-
updated_scatter_map_end,
412-
target_col,
413-
stream,
414-
mr);
415-
});
401+
402+
auto const num_columns = target.num_columns();
403+
auto result = std::vector<std::unique_ptr<column>>(num_columns);
404+
405+
// The data scatter for n columns will be executed over n streams. If there is
406+
// only a single column, the fork/join overhead should be avoided.
407+
auto streams = std::vector<rmm::cuda_stream_view>{};
408+
if (num_columns > 1) {
409+
streams = cudf::detail::fork_streams(stream, num_columns);
410+
} else {
411+
streams.push_back(stream);
412+
}
413+
414+
auto it = thrust::make_counting_iterator<size_type>(0);
415+
416+
std::transform(it, it + num_columns, result.begin(), [&](size_type i) {
417+
auto const& source_col = source.column(i);
418+
return type_dispatcher<dispatch_storage_type>(source_col.type(),
419+
column_scatterer{},
420+
source_col,
421+
updated_scatter_map_begin,
422+
updated_scatter_map_end,
423+
target.column(i),
424+
streams[i],
425+
mr);
426+
});
416427

417428
// We still need to call `gather_bitmask` even when the source columns are not nullable,
418429
// as if the target has null_mask, that null_mask needs to be updated after scattering.
@@ -426,7 +437,9 @@ std::unique_ptr<table> scatter(table_view const& source,
426437

427438
// For struct columns, we need to superimpose the null_mask of the parent over the null_mask of
428439
// the children.
429-
std::for_each(result.begin(), result.end(), [=](auto& col) {
440+
auto it = thrust::make_counting_iterator<size_type>(0);
441+
std::for_each(it, it + num_columns, [&](size_type i) {
442+
auto& col = result[i];
430443
auto const col_view = col->view();
431444
if (col_view.type().id() == type_id::STRUCT and col_view.nullable()) {
432445
auto const num_rows = col_view.size();
@@ -438,11 +451,17 @@ std::unique_ptr<table> scatter(table_view const& source,
438451
std::move(contents.children),
439452
null_count,
440453
std::move(*contents.null_mask),
441-
stream,
454+
streams[i],
442455
mr);
443456
}
444457
});
445458
}
459+
460+
// Join streams as late as possible so that null mask computations can run on
461+
// the passed in stream while other streams are scattering. Skip joining if
462+
// only one column, since it used the passed in stream rather than forking.
463+
if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }
464+
446465
return std::make_unique<table>(std::move(result));
447466
}
448467
} // namespace detail

cpp/include/cudf/detail/utilities/stream_pool.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION.
2+
* SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION.
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

@@ -99,7 +99,7 @@ cuda_stream_pool& global_cuda_stream_pool();
9999
* auto const num_streams = 2;
100100
* // do work on stream
101101
* // allocate streams and wait for an event on stream before executing on any of streams
102-
* auto streams = cudf::detail::fork_stream(stream, num_streams);
102+
* auto streams = cudf::detail::fork_streams(stream, num_streams);
103103
* // do work on streams[0] and streams[1]
104104
* // wait for event on streams before continuing to do work on stream
105105
* cudf::detail::join_streams(streams, stream);

0 commit comments

Comments
 (0)