From 69ab9880eec438d106f6769cc8323e13fe2098b0 Mon Sep 17 00:00:00 2001 From: Basit Ayantunde Date: Fri, 20 Sep 2024 22:44:01 +0100 Subject: [PATCH] Exposed stream-ordering to join API (#16793) Adds stream ordering to the public join APIs: - `inner_join` - `left_join` - `full_join` - `left_semi_join` - `left_anti_join` - `cross_join` - `conditional_inner_join` - `conditional_left_join` - `conditional_full_join` - `conditional_left_semi_join` - `conditional_left_anti_join` - `mixed_inner_join` - `mixed_left_join` - `mixed_full_join` - `mixed_left_semi_join` - `mixed_left_anti_join` - `mixed_inner_join_size` - `mixed_left_join_size` - `conditional_inner_join_size` - `conditional_left_join_size` - `conditional_left_semi_join_size` - `conditional_left_anti_join_size` closes #16792 follows up https://github.com/rapidsai/cudf/issues/13744 Authors: - Basit Ayantunde (https://github.com/lamarrr) Approvers: - Paul Mattione (https://github.com/pmattione-nvidia) - Nghia Truong (https://github.com/ttnghia) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/16793 --- .../ndsh_data_generator/table_helpers.cpp | 2 +- cpp/benchmarks/ndsh/utilities.cpp | 15 +- cpp/examples/parquet_io/parquet_io.cpp | 9 +- cpp/include/cudf/join.hpp | 44 ++++ cpp/src/join/conditional_join.cu | 75 ++---- cpp/src/join/conditional_join.hpp | 1 - cpp/src/join/cross_join.cu | 4 +- cpp/src/join/join.cu | 10 +- cpp/src/join/mixed_join.cu | 16 +- cpp/src/join/mixed_join_semi.cu | 7 +- cpp/src/join/semi_join.cu | 7 +- cpp/tests/CMakeLists.txt | 1 + cpp/tests/join/join_tests.cpp | 12 +- cpp/tests/join/semi_anti_join_tests.cpp | 7 +- cpp/tests/streams/join_test.cpp | 219 ++++++++++++++++++ 15 files changed, 349 insertions(+), 80 deletions(-) create mode 100644 cpp/tests/streams/join_test.cpp diff --git a/cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp b/cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp index d4368906702..54d177df401 100644 --- a/cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp +++ b/cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp @@ -85,7 +85,7 @@ std::unique_ptr perform_left_join(cudf::table_view const& left_inpu auto const left_selected = left_input.select(left_on); auto const right_selected = right_input.select(right_on); auto const [left_join_indices, right_join_indices] = - cudf::left_join(left_selected, right_selected, cudf::null_equality::EQUAL, mr); + cudf::left_join(left_selected, right_selected, cudf::null_equality::EQUAL, stream, mr); auto const left_indices_span = cudf::device_span{*left_join_indices}; auto const right_indices_span = cudf::device_span{*right_join_indices}; diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index 2d514764fc2..62116ddf661 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -146,11 +147,15 @@ std::unique_ptr join_and_gather(cudf::table_view const& left_input, cudf::null_equality compare_nulls) { CUDF_FUNC_RANGE(); - constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; - auto const left_selected = left_input.select(left_on); - auto const right_selected = right_input.select(right_on); - auto const [left_join_indices, right_join_indices] = cudf::inner_join( - left_selected, right_selected, compare_nulls, cudf::get_current_device_resource_ref()); + constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; + auto const left_selected = left_input.select(left_on); + auto const right_selected = right_input.select(right_on); + auto const [left_join_indices, right_join_indices] = + cudf::inner_join(left_selected, + right_selected, + compare_nulls, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); auto const left_indices_span = cudf::device_span{*left_join_indices}; auto const right_indices_span = cudf::device_span{*right_join_indices}; diff --git a/cpp/examples/parquet_io/parquet_io.cpp b/cpp/examples/parquet_io/parquet_io.cpp index 442731694fa..9cda22d0695 100644 --- a/cpp/examples/parquet_io/parquet_io.cpp +++ b/cpp/examples/parquet_io/parquet_io.cpp @@ -18,6 +18,8 @@ #include "../utilities/timer.hpp" +#include + /** * @file parquet_io.cpp * @brief Demonstrates usage of the libcudf APIs to read and write @@ -159,8 +161,11 @@ int main(int argc, char const** argv) // Left anti-join the original and transcoded tables // identical tables should not throw an exception and // return an empty indices vector - auto const indices = cudf::left_anti_join( - input->view(), transcoded_input->view(), cudf::null_equality::EQUAL, resource.get()); + auto const indices = cudf::left_anti_join(input->view(), + transcoded_input->view(), + cudf::null_equality::EQUAL, + cudf::get_default_stream(), + resource.get()); // No exception thrown, check indices auto const valid = indices->size() == 0; diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index cc8912cb022..a590eb27511 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -97,6 +97,7 @@ class distinct_hash_join; * @param[in] right_keys The right table * @param[in] compare_nulls controls whether null join-key values * should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -108,6 +109,7 @@ std::pair>, inner_join(cudf::table_view const& left_keys, cudf::table_view const& right_keys, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -137,6 +139,7 @@ inner_join(cudf::table_view const& left_keys, * @param[in] right_keys The right table * @param[in] compare_nulls controls whether null join-key values * should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -148,6 +151,7 @@ std::pair>, left_join(cudf::table_view const& left_keys, cudf::table_view const& right_keys, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -176,6 +180,7 @@ left_join(cudf::table_view const& left_keys, * @param[in] right_keys The right table * @param[in] compare_nulls controls whether null join-key values * should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -187,6 +192,7 @@ std::pair>, full_join(cudf::table_view const& left_keys, cudf::table_view const& right_keys, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -205,6 +211,7 @@ full_join(cudf::table_view const& left_keys, * @param left_keys The left table * @param right_keys The right table * @param compare_nulls Controls whether null join-key values should match or not + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A vector `left_indices` that can be used to construct @@ -215,6 +222,7 @@ std::unique_ptr> left_semi_join( cudf::table_view const& left_keys, cudf::table_view const& right_keys, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -236,6 +244,7 @@ std::unique_ptr> left_semi_join( * @param[in] right_keys The right table * @param[in] compare_nulls controls whether null join-key values * should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A column `left_indices` that can be used to construct @@ -246,6 +255,7 @@ std::unique_ptr> left_anti_join( cudf::table_view const& left_keys, cudf::table_view const& right_keys, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -266,6 +276,7 @@ std::unique_ptr> left_anti_join( * * @param left The left table * @param right The right table + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table's device memory * * @return Result of cross joining `left` and `right` tables @@ -273,6 +284,7 @@ std::unique_ptr> left_anti_join( std::unique_ptr cross_join( cudf::table_view const& left, cudf::table_view const& right, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -567,6 +579,7 @@ class distinct_hash_join { * @param right The right table * @param binary_predicate The condition on which to join * @param output_size Optional value which allows users to specify the exact output size + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -578,6 +591,7 @@ conditional_inner_join(table_view const& left, table_view const& right, ast::expression const& binary_predicate, std::optional output_size = {}, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -612,6 +626,7 @@ conditional_inner_join(table_view const& left, * @param right The right table * @param binary_predicate The condition on which to join * @param output_size Optional value which allows users to specify the exact output size + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -623,6 +638,7 @@ conditional_left_join(table_view const& left, table_view const& right, ast::expression const& binary_predicate, std::optional output_size = {}, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -655,6 +671,7 @@ conditional_left_join(table_view const& left, * @param left The left table * @param right The right table * @param binary_predicate The condition on which to join + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -665,6 +682,7 @@ std::pair>, conditional_full_join(table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -693,6 +711,7 @@ conditional_full_join(table_view const& left, * @param right The right table * @param binary_predicate The condition on which to join * @param output_size Optional value which allows users to specify the exact output size + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A vector `left_indices` that can be used to construct the result of @@ -704,6 +723,7 @@ std::unique_ptr> conditional_left_semi_join( table_view const& right, ast::expression const& binary_predicate, std::optional output_size = {}, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -732,6 +752,7 @@ std::unique_ptr> conditional_left_semi_join( * @param right The right table * @param binary_predicate The condition on which to join * @param output_size Optional value which allows users to specify the exact output size + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A vector `left_indices` that can be used to construct the result of @@ -743,6 +764,7 @@ std::unique_ptr> conditional_left_anti_join( table_view const& right, ast::expression const& binary_predicate, std::optional output_size = {}, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -786,6 +808,7 @@ std::unique_ptr> conditional_left_anti_join( * @param output_size_data An optional pair of values indicating the exact output size and the * number of matches for each row in the larger of the two input tables, left or right (may be * precomputed using the corresponding mixed_inner_join_size API). + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -801,6 +824,7 @@ mixed_inner_join( ast::expression const& binary_predicate, null_equality compare_nulls = null_equality::EQUAL, std::optional>> output_size_data = {}, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -846,6 +870,7 @@ mixed_inner_join( * @param output_size_data An optional pair of values indicating the exact output size and the * number of matches for each row in the larger of the two input tables, left or right (may be * precomputed using the corresponding mixed_left_join_size API). + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -861,6 +886,7 @@ mixed_left_join( ast::expression const& binary_predicate, null_equality compare_nulls = null_equality::EQUAL, std::optional>> output_size_data = {}, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -906,6 +932,7 @@ mixed_left_join( * @param output_size_data An optional pair of values indicating the exact output size and the * number of matches for each row in the larger of the two input tables, left or right (may be * precomputed using the corresponding mixed_full_join_size API). + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -921,6 +948,7 @@ mixed_full_join( ast::expression const& binary_predicate, null_equality compare_nulls = null_equality::EQUAL, std::optional>> output_size_data = {}, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -956,6 +984,7 @@ mixed_full_join( * @param right_conditional The right table used for the conditional join * @param binary_predicate The condition on which to join * @param compare_nulls Whether or not null values join to each other or not + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -968,6 +997,7 @@ std::unique_ptr> mixed_left_semi_join( table_view const& right_conditional, ast::expression const& binary_predicate, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -1004,6 +1034,7 @@ std::unique_ptr> mixed_left_semi_join( * @param right_conditional The right table used for the conditional join * @param binary_predicate The condition on which to join * @param compare_nulls Whether or not null values join to each other or not + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair of vectors [`left_indices`, `right_indices`] that can be used to construct @@ -1016,6 +1047,7 @@ std::unique_ptr> mixed_left_anti_join( table_view const& right_conditional, ast::expression const& binary_predicate, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -1041,6 +1073,7 @@ std::unique_ptr> mixed_left_anti_join( * @param right_conditional The right table used for the conditional join * @param binary_predicate The condition on which to join * @param compare_nulls Whether or not null values join to each other or not + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair containing the size that would result from performing the @@ -1056,6 +1089,7 @@ std::pair>> mixed_in table_view const& right_conditional, ast::expression const& binary_predicate, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -1081,6 +1115,7 @@ std::pair>> mixed_in * @param right_conditional The right table used for the conditional join * @param binary_predicate The condition on which to join * @param compare_nulls Whether or not null values join to each other or not + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return A pair containing the size that would result from performing the @@ -1096,6 +1131,7 @@ std::pair>> mixed_le table_view const& right_conditional, ast::expression const& binary_predicate, null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -1111,6 +1147,7 @@ std::pair>> mixed_le * @param left The left table * @param right The right table * @param binary_predicate The condition on which to join + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return The size that would result from performing the requested join @@ -1119,6 +1156,7 @@ std::size_t conditional_inner_join_size( table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -1134,6 +1172,7 @@ std::size_t conditional_inner_join_size( * @param left The left table * @param right The right table * @param binary_predicate The condition on which to join + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return The size that would result from performing the requested join @@ -1142,6 +1181,7 @@ std::size_t conditional_left_join_size( table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -1157,6 +1197,7 @@ std::size_t conditional_left_join_size( * @param left The left table * @param right The right table * @param binary_predicate The condition on which to join + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return The size that would result from performing the requested join @@ -1165,6 +1206,7 @@ std::size_t conditional_left_semi_join_size( table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -1180,6 +1222,7 @@ std::size_t conditional_left_semi_join_size( * @param left The left table * @param right The right table * @param binary_predicate The condition on which to join + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device memory * * @return The size that would result from performing the requested join @@ -1188,6 +1231,7 @@ std::size_t conditional_left_anti_join_size( table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @} */ // end of group } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/join/conditional_join.cu b/cpp/src/join/conditional_join.cu index 748691fb7d1..2ec23e0dc6d 100644 --- a/cpp/src/join/conditional_join.cu +++ b/cpp/src/join/conditional_join.cu @@ -27,7 +27,6 @@ #include #include #include -#include #include #include @@ -377,16 +376,12 @@ conditional_inner_join(table_view const& left, table_view const& right, ast::expression const& binary_predicate, std::optional output_size, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::conditional_join(left, - right, - binary_predicate, - detail::join_kind::INNER_JOIN, - output_size, - cudf::get_default_stream(), - mr); + return detail::conditional_join( + left, right, binary_predicate, detail::join_kind::INNER_JOIN, output_size, stream, mr); } std::pair>, @@ -395,16 +390,12 @@ conditional_left_join(table_view const& left, table_view const& right, ast::expression const& binary_predicate, std::optional output_size, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::conditional_join(left, - right, - binary_predicate, - detail::join_kind::LEFT_JOIN, - output_size, - cudf::get_default_stream(), - mr); + return detail::conditional_join( + left, right, binary_predicate, detail::join_kind::LEFT_JOIN, output_size, stream, mr); } std::pair>, @@ -412,16 +403,12 @@ std::pair>, conditional_full_join(table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::conditional_join(left, - right, - binary_predicate, - detail::join_kind::FULL_JOIN, - {}, - cudf::get_default_stream(), - mr); + return detail::conditional_join( + left, right, binary_predicate, detail::join_kind::FULL_JOIN, {}, stream, mr); } std::unique_ptr> conditional_left_semi_join( @@ -429,16 +416,12 @@ std::unique_ptr> conditional_left_semi_join( table_view const& right, ast::expression const& binary_predicate, std::optional output_size, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::conditional_join_anti_semi(left, - right, - binary_predicate, - detail::join_kind::LEFT_SEMI_JOIN, - output_size, - cudf::get_default_stream(), - mr); + return detail::conditional_join_anti_semi( + left, right, binary_predicate, detail::join_kind::LEFT_SEMI_JOIN, output_size, stream, mr); } std::unique_ptr> conditional_left_anti_join( @@ -446,64 +429,56 @@ std::unique_ptr> conditional_left_anti_join( table_view const& right, ast::expression const& binary_predicate, std::optional output_size, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::conditional_join_anti_semi(left, - right, - binary_predicate, - detail::join_kind::LEFT_ANTI_JOIN, - output_size, - cudf::get_default_stream(), - mr); + return detail::conditional_join_anti_semi( + left, right, binary_predicate, detail::join_kind::LEFT_ANTI_JOIN, output_size, stream, mr); } std::size_t conditional_inner_join_size(table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); return detail::compute_conditional_join_output_size( - left, right, binary_predicate, detail::join_kind::INNER_JOIN, cudf::get_default_stream(), mr); + left, right, binary_predicate, detail::join_kind::INNER_JOIN, stream, mr); } std::size_t conditional_left_join_size(table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); return detail::compute_conditional_join_output_size( - left, right, binary_predicate, detail::join_kind::LEFT_JOIN, cudf::get_default_stream(), mr); + left, right, binary_predicate, detail::join_kind::LEFT_JOIN, stream, mr); } std::size_t conditional_left_semi_join_size(table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::compute_conditional_join_output_size(left, - right, - binary_predicate, - detail::join_kind::LEFT_SEMI_JOIN, - cudf::get_default_stream(), - mr); + return detail::compute_conditional_join_output_size( + left, right, binary_predicate, detail::join_kind::LEFT_SEMI_JOIN, stream, mr); } std::size_t conditional_left_anti_join_size(table_view const& left, table_view const& right, ast::expression const& binary_predicate, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::compute_conditional_join_output_size(left, - right, - binary_predicate, - detail::join_kind::LEFT_ANTI_JOIN, - cudf::get_default_stream(), - mr); + return detail::compute_conditional_join_output_size( + left, right, binary_predicate, detail::join_kind::LEFT_ANTI_JOIN, stream, mr); } } // namespace cudf diff --git a/cpp/src/join/conditional_join.hpp b/cpp/src/join/conditional_join.hpp index 4f6a9484e8c..303442e79ef 100644 --- a/cpp/src/join/conditional_join.hpp +++ b/cpp/src/join/conditional_join.hpp @@ -19,7 +19,6 @@ #include #include -#include #include #include diff --git a/cpp/src/join/cross_join.cu b/cpp/src/join/cross_join.cu index eeb49736bac..15594fb60e3 100644 --- a/cpp/src/join/cross_join.cu +++ b/cpp/src/join/cross_join.cu @@ -25,7 +25,6 @@ #include #include #include -#include #include #include @@ -75,10 +74,11 @@ std::unique_ptr cross_join(cudf::table_view const& left, std::unique_ptr cross_join(cudf::table_view const& left, cudf::table_view const& right, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::cross_join(left, right, cudf::get_default_stream(), mr); + return detail::cross_join(left, right, stream, mr); } } // namespace cudf diff --git a/cpp/src/join/join.cu b/cpp/src/join/join.cu index 0abff27667b..7b13c260364 100644 --- a/cpp/src/join/join.cu +++ b/cpp/src/join/join.cu @@ -20,7 +20,6 @@ #include #include #include -#include #include #include @@ -120,10 +119,11 @@ std::pair>, inner_join(table_view const& left, table_view const& right, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::inner_join(left, right, compare_nulls, cudf::get_default_stream(), mr); + return detail::inner_join(left, right, compare_nulls, stream, mr); } std::pair>, @@ -131,10 +131,11 @@ std::pair>, left_join(table_view const& left, table_view const& right, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::left_join(left, right, compare_nulls, cudf::get_default_stream(), mr); + return detail::left_join(left, right, compare_nulls, stream, mr); } std::pair>, @@ -142,10 +143,11 @@ std::pair>, full_join(table_view const& left, table_view const& right, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::full_join(left, right, compare_nulls, cudf::get_default_stream(), mr); + return detail::full_join(left, right, compare_nulls, stream, mr); } } // namespace cudf diff --git a/cpp/src/join/mixed_join.cu b/cpp/src/join/mixed_join.cu index 8ff78dd47f4..820b81ee309 100644 --- a/cpp/src/join/mixed_join.cu +++ b/cpp/src/join/mixed_join.cu @@ -28,7 +28,6 @@ #include #include #include -#include #include #include @@ -484,6 +483,7 @@ mixed_inner_join( ast::expression const& binary_predicate, null_equality compare_nulls, std::optional>> const output_size_data, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -495,7 +495,7 @@ mixed_inner_join( compare_nulls, detail::join_kind::INNER_JOIN, output_size_data, - cudf::get_default_stream(), + stream, mr); } @@ -506,6 +506,7 @@ std::pair>> mixed_in table_view const& right_conditional, ast::expression const& binary_predicate, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -516,7 +517,7 @@ std::pair>> mixed_in binary_predicate, compare_nulls, detail::join_kind::INNER_JOIN, - cudf::get_default_stream(), + stream, mr); } @@ -530,6 +531,7 @@ mixed_left_join( ast::expression const& binary_predicate, null_equality compare_nulls, std::optional>> const output_size_data, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -541,7 +543,7 @@ mixed_left_join( compare_nulls, detail::join_kind::LEFT_JOIN, output_size_data, - cudf::get_default_stream(), + stream, mr); } @@ -552,6 +554,7 @@ std::pair>> mixed_le table_view const& right_conditional, ast::expression const& binary_predicate, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -562,7 +565,7 @@ std::pair>> mixed_le binary_predicate, compare_nulls, detail::join_kind::LEFT_JOIN, - cudf::get_default_stream(), + stream, mr); } @@ -576,6 +579,7 @@ mixed_full_join( ast::expression const& binary_predicate, null_equality compare_nulls, std::optional>> const output_size_data, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -587,7 +591,7 @@ mixed_full_join( compare_nulls, detail::join_kind::FULL_JOIN, output_size_data, - cudf::get_default_stream(), + stream, mr); } diff --git a/cpp/src/join/mixed_join_semi.cu b/cpp/src/join/mixed_join_semi.cu index cfb785e242c..aa4fa281159 100644 --- a/cpp/src/join/mixed_join_semi.cu +++ b/cpp/src/join/mixed_join_semi.cu @@ -29,7 +29,6 @@ #include #include #include -#include #include #include @@ -267,6 +266,7 @@ std::unique_ptr> mixed_left_semi_join( table_view const& right_conditional, ast::expression const& binary_predicate, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -277,7 +277,7 @@ std::unique_ptr> mixed_left_semi_join( binary_predicate, compare_nulls, detail::join_kind::LEFT_SEMI_JOIN, - cudf::get_default_stream(), + stream, mr); } @@ -288,6 +288,7 @@ std::unique_ptr> mixed_left_anti_join( table_view const& right_conditional, ast::expression const& binary_predicate, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -298,7 +299,7 @@ std::unique_ptr> mixed_left_anti_join( binary_predicate, compare_nulls, detail::join_kind::LEFT_ANTI_JOIN, - cudf::get_default_stream(), + stream, mr); } diff --git a/cpp/src/join/semi_join.cu b/cpp/src/join/semi_join.cu index f69ded73e8d..d2ab2122c75 100644 --- a/cpp/src/join/semi_join.cu +++ b/cpp/src/join/semi_join.cu @@ -23,7 +23,6 @@ #include #include #include -#include #include #include @@ -98,22 +97,24 @@ std::unique_ptr> left_semi_join( cudf::table_view const& left, cudf::table_view const& right, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); return detail::left_semi_anti_join( - detail::join_kind::LEFT_SEMI_JOIN, left, right, compare_nulls, cudf::get_default_stream(), mr); + detail::join_kind::LEFT_SEMI_JOIN, left, right, compare_nulls, stream, mr); } std::unique_ptr> left_anti_join( cudf::table_view const& left, cudf::table_view const& right, null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); return detail::left_semi_anti_join( - detail::join_kind::LEFT_ANTI_JOIN, left, right, compare_nulls, cudf::get_default_stream(), mr); + detail::join_kind::LEFT_ANTI_JOIN, left, right, compare_nulls, stream, mr); } } // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 1bedb344a01..586bac97570 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -691,6 +691,7 @@ ConfigureTest(STREAM_DICTIONARY_TEST streams/dictionary_test.cpp STREAM_MODE tes ConfigureTest(STREAM_FILLING_TEST streams/filling_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_GROUPBY_TEST streams/groupby_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_HASHING_TEST streams/hash_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_JOIN_TEST streams/join_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_LABELING_BINS_TEST streams/labeling_bins_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing) diff --git a/cpp/tests/join/join_tests.cpp b/cpp/tests/join/join_tests.cpp index ab387a5c7f5..3431e941359 100644 --- a/cpp/tests/join/join_tests.cpp +++ b/cpp/tests/join/join_tests.cpp @@ -39,6 +39,8 @@ #include #include +#include + #include template @@ -60,6 +62,7 @@ template >, cudf::table_view const& left_keys, cudf::table_view const& right_keys, cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr), cudf::out_of_bounds_policy oob_policy = cudf::out_of_bounds_policy::DONT_CHECK> std::unique_ptr join_and_gather( @@ -68,12 +71,13 @@ std::unique_ptr join_and_gather( std::vector const& left_on, std::vector const& right_on, cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) { auto left_selected = left_input.select(left_on); auto right_selected = right_input.select(right_on); auto const [left_join_indices, right_join_indices] = - join_impl(left_selected, right_selected, compare_nulls, mr); + join_impl(left_selected, right_selected, compare_nulls, stream, mr); auto left_indices_span = cudf::device_span{*left_join_indices}; auto right_indices_span = cudf::device_span{*right_join_indices}; @@ -2027,7 +2031,11 @@ struct JoinTestLists : public cudf::test::BaseFixture { auto const probe_tv = cudf::table_view{{probe}}; auto const [left_result_map, right_result_map] = - join_func(build_tv, probe_tv, nulls_equal, cudf::get_current_device_resource_ref()); + join_func(build_tv, + probe_tv, + nulls_equal, + cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); auto const left_result_table = sort_and_gather(build_tv, column_view_from_device_uvector(*left_result_map), oob_policy); diff --git a/cpp/tests/join/semi_anti_join_tests.cpp b/cpp/tests/join/semi_anti_join_tests.cpp index 3e279260b99..554d5754e39 100644 --- a/cpp/tests/join/semi_anti_join_tests.cpp +++ b/cpp/tests/join/semi_anti_join_tests.cpp @@ -28,8 +28,11 @@ #include #include #include +#include #include +#include + #include template @@ -51,6 +54,7 @@ template > (*join_impl)( cudf::table_view const& left_keys, cudf::table_view const& right_keys, cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr)> std::unique_ptr join_and_gather( cudf::table_view const& left_input, @@ -58,11 +62,12 @@ std::unique_ptr join_and_gather( std::vector const& left_on, std::vector const& right_on, cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) { auto left_selected = left_input.select(left_on); auto right_selected = right_input.select(right_on); - auto const join_indices = join_impl(left_selected, right_selected, compare_nulls, mr); + auto const join_indices = join_impl(left_selected, right_selected, compare_nulls, stream, mr); auto left_indices_span = cudf::device_span{*join_indices}; auto left_indices_col = cudf::column_view{left_indices_span}; diff --git a/cpp/tests/streams/join_test.cpp b/cpp/tests/streams/join_test.cpp new file mode 100644 index 00000000000..2811bb676fa --- /dev/null +++ b/cpp/tests/streams/join_test.cpp @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +class JoinTest : public cudf::test::BaseFixture { + static inline cudf::table make_table() + { + cudf::test::fixed_width_column_wrapper col0{{3, 1, 2, 0, 3}}; + cudf::test::strings_column_wrapper col1{{"s0", "s1", "s2", "s4", "s1"}}; + cudf::test::fixed_width_column_wrapper col2{{0, 1, 2, 4, 1}}; + + std::vector> columns; + columns.push_back(col0.release()); + columns.push_back(col1.release()); + columns.push_back(col2.release()); + + return cudf::table{std::move(columns)}; + } + + public: + cudf::table table0{make_table()}; + cudf::table table1{make_table()}; + cudf::table conditional0{make_table()}; + cudf::table conditional1{make_table()}; + cudf::ast::column_reference col_ref_left_0{0}; + cudf::ast::column_reference col_ref_right_0{0, cudf::ast::table_reference::RIGHT}; + cudf::ast::operation left_zero_eq_right_zero{ + cudf::ast::ast_operator::EQUAL, col_ref_left_0, col_ref_right_0}; +}; + +TEST_F(JoinTest, InnerJoin) +{ + cudf::inner_join(table0, table1, cudf::null_equality::EQUAL, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, LeftJoin) +{ + cudf::left_join(table0, table1, cudf::null_equality::EQUAL, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, FullJoin) +{ + cudf::full_join(table0, table1, cudf::null_equality::EQUAL, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, LeftSemiJoin) +{ + cudf::left_semi_join( + table0, table1, cudf::null_equality::EQUAL, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, LeftAntiJoin) +{ + cudf::left_anti_join( + table0, table1, cudf::null_equality::EQUAL, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, CrossJoin) { cudf::cross_join(table0, table1, cudf::test::get_default_stream()); } + +TEST_F(JoinTest, ConditionalInnerJoin) +{ + cudf::conditional_inner_join( + table0, table1, left_zero_eq_right_zero, std::nullopt, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, ConditionalLeftJoin) +{ + cudf::conditional_left_join( + table0, table1, left_zero_eq_right_zero, std::nullopt, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, ConditionalFullJoin) +{ + cudf::conditional_full_join( + table0, table1, left_zero_eq_right_zero, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, ConditionalLeftSemiJoin) +{ + cudf::conditional_left_semi_join( + table0, table1, left_zero_eq_right_zero, std::nullopt, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, ConditionalLeftAntiJoin) +{ + cudf::conditional_left_anti_join( + table0, table1, left_zero_eq_right_zero, std::nullopt, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, MixedInnerJoin) +{ + cudf::mixed_inner_join(table0, + table1, + conditional0, + conditional1, + left_zero_eq_right_zero, + cudf::null_equality::EQUAL, + std::nullopt, + cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, MixedLeftJoin) +{ + cudf::mixed_left_join(table0, + table1, + conditional0, + conditional1, + left_zero_eq_right_zero, + cudf::null_equality::EQUAL, + std::nullopt, + cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, MixedFullJoin) +{ + cudf::mixed_full_join(table0, + table1, + conditional0, + conditional1, + left_zero_eq_right_zero, + cudf::null_equality::EQUAL, + std::nullopt, + cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, MixedLeftSemiJoin) +{ + cudf::mixed_left_semi_join(table0, + table1, + conditional0, + conditional1, + left_zero_eq_right_zero, + cudf::null_equality::EQUAL, + cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, MixedLeftAntiJoin) +{ + cudf::mixed_left_anti_join(table0, + table1, + conditional0, + conditional1, + left_zero_eq_right_zero, + cudf::null_equality::EQUAL, + cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, MixedInnerJoinSize) +{ + cudf::mixed_inner_join_size(table0, + table1, + conditional0, + conditional1, + left_zero_eq_right_zero, + cudf::null_equality::EQUAL, + cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, MixedLeftJoinSize) +{ + cudf::mixed_left_join_size(table0, + table1, + conditional0, + conditional1, + left_zero_eq_right_zero, + cudf::null_equality::EQUAL, + cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, ConditionalInnerJoinSize) +{ + cudf::conditional_inner_join_size( + table0, table1, left_zero_eq_right_zero, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, ConditionalLeftJoinSize) +{ + cudf::conditional_left_join_size( + table0, table1, left_zero_eq_right_zero, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, ConditionalLeftSemiJoinSize) +{ + cudf::conditional_left_semi_join_size( + table0, table1, left_zero_eq_right_zero, cudf::test::get_default_stream()); +} + +TEST_F(JoinTest, ConditionalLeftAntiJoinSize) +{ + cudf::conditional_left_anti_join_size( + table0, table1, left_zero_eq_right_zero, cudf::test::get_default_stream()); +}