Skip to content

Commit

Permalink
Refactor distinct hash join to handle multiple probes with the same b…
Browse files Browse the repository at this point in the history
…uild table (#17609)

This PR updates the distinct join implementation to allow the same build table to be reused for multiple probe operations. It also introduces several breaking changes, including removing the need for users to specify whether the input data contains nested columns. Additionally, the output order has been updated to align with the hash join behavior, with probe indices now appearing on the left and build indices on the right.

The PR leverages the new conditional query API in the cuco hash set, enabling more efficient handling of nullable data. While this optimization improves performance, it is not currently reflected in benchmarks due to the absence of a dedicated test case for this scenario.

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Jason Lowe (https://github.com/jlowe)
  - Bradley Dice (https://github.com/bdice)
  - Nghia Truong (https://github.com/ttnghia)

URL: #17609
  • Loading branch information
PointKernel authored Jan 4, 2025
1 parent 756d66b commit 62d72df
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 260 deletions.
20 changes: 5 additions & 15 deletions cpp/benchmarks/join/distinct_join.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,13 +23,8 @@ void distinct_inner_join(nvbench::state& state,
auto join = [](cudf::table_view const& probe_input,
cudf::table_view const& build_input,
cudf::null_equality compare_nulls) {
auto const has_nulls =
cudf::has_nested_nulls(build_input) || cudf::has_nested_nulls(probe_input)
? cudf::nullable_join::YES
: cudf::nullable_join::NO;
auto hj_obj = cudf::distinct_hash_join<cudf::has_nested::NO>{
build_input, probe_input, has_nulls, compare_nulls};
return hj_obj.inner_join();
auto hj_obj = cudf::distinct_hash_join{build_input, compare_nulls};
return hj_obj.inner_join(probe_input);
};

BM_join<Key, Nullable>(state, join);
Expand All @@ -42,13 +37,8 @@ void distinct_left_join(nvbench::state& state,
auto join = [](cudf::table_view const& probe_input,
cudf::table_view const& build_input,
cudf::null_equality compare_nulls) {
auto const has_nulls =
cudf::has_nested_nulls(build_input) || cudf::has_nested_nulls(probe_input)
? cudf::nullable_join::YES
: cudf::nullable_join::NO;
auto hj_obj = cudf::distinct_hash_join<cudf::has_nested::NO>{
build_input, probe_input, has_nulls, compare_nulls};
return hj_obj.left_join();
auto hj_obj = cudf::distinct_hash_join{build_input, compare_nulls};
return hj_obj.left_join(probe_input);
};

BM_join<Key, Nullable>(state, join);
Expand Down
112 changes: 53 additions & 59 deletions cpp/include/cudf/detail/distinct_hash_join.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,19 +36,24 @@ using cudf::experimental::row::lhs_index_type;
using cudf::experimental::row::rhs_index_type;

/**
* @brief An comparator adapter wrapping both self comparator and two table comparator
* @brief A custom comparator used for the build table insertion
*/
template <typename Equal>
struct comparator_adapter {
comparator_adapter(Equal const& d_equal) : _d_equal{d_equal} {}

__device__ constexpr auto operator()(
struct always_not_equal {
// Key-equality functor over two (hash value, build-row index) pairs.
// Both arguments are ignored and the result is unconditionally `false`,
// so no pair of keys ever compares equal through this functor.
__device__ constexpr bool operator()(
cuco::pair<hash_value_type, rhs_index_type> const&,
cuco::pair<hash_value_type, rhs_index_type> const&) const noexcept
{
// All build table keys are distinct thus `false` no matter what
return false;
}
};

/**
* @brief A comparator adapter wrapping the two-table comparator
*/
template <typename Equal>
struct comparator_adapter {
comparator_adapter(Equal const& d_equal) : _d_equal{d_equal} {}

__device__ constexpr auto operator()(
cuco::pair<hash_value_type, lhs_index_type> const& lhs,
Expand All @@ -62,56 +67,14 @@ struct comparator_adapter {
Equal _d_equal;
};

template <typename Hasher>
struct hasher_adapter {
hasher_adapter(Hasher const& d_hasher = {}) : _d_hasher{d_hasher} {}

template <typename T>
__device__ constexpr auto operator()(cuco::pair<hash_value_type, T> const& key) const noexcept
{
return _d_hasher(key.first);
}

private:
Hasher _d_hasher;
};

/**
* @brief Distinct hash join that builds the hash table on construction and probes it in subsequent
* `*_join` member functions.
*
* @tparam HasNested Flag indicating whether there are nested columns in build/probe table
* This class enables the distinct hash join scheme that builds the hash table once and probes it
* as many times as needed (possibly in parallel).
*/
template <cudf::has_nested HasNested>
struct distinct_hash_join {
private:
/// Device row equal type
using d_equal_type = cudf::experimental::row::equality::strong_index_comparator_adapter<
cudf::experimental::row::equality::device_row_comparator<HasNested == cudf::has_nested::YES,
cudf::nullate::DYNAMIC>>;
using hasher = hasher_adapter<thrust::identity<hash_value_type>>;
using probing_scheme_type = cuco::linear_probing<1, hasher>;
using cuco_storage_type = cuco::storage<1>;

/// Hash table type
using hash_table_type = cuco::static_set<cuco::pair<hash_value_type, rhs_index_type>,
cuco::extent<size_type>,
cuda::thread_scope_device,
comparator_adapter<d_equal_type>,
probing_scheme_type,
cudf::detail::cuco_allocator<char>,
cuco_storage_type>;

bool _has_nulls; ///< true if nulls are present in either build table or probe table
cudf::null_equality _nulls_equal; ///< whether to consider nulls as equal
cudf::table_view _build; ///< input table to build the hash map
cudf::table_view _probe; ///< input table to probe the hash map
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table>
_preprocessed_build; ///< input table preprocssed for row operators
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table>
_preprocessed_probe; ///< input table preprocssed for row operators
hash_table_type _hash_table; ///< hash table built on `_build`

class distinct_hash_join {
public:
distinct_hash_join() = delete;
~distinct_hash_join() = default;
Expand All @@ -120,21 +83,28 @@ struct distinct_hash_join {
distinct_hash_join& operator=(distinct_hash_join const&) = delete;
distinct_hash_join& operator=(distinct_hash_join&&) = delete;

/**
* @brief Hasher adapter used by distinct hash join
*/
struct hasher {
// Accepts a (hash value, T) pair and returns its first element unchanged as
// the hash; nothing is recomputed from the second element.
// NOTE(review): presumably `key.first` is a row hash computed before the pair
// is built — confirm against the code that constructs these pairs.
template <typename T>
__device__ constexpr hash_value_type operator()(
cuco::pair<hash_value_type, T> const& key) const noexcept
{
return key.first;
}
};

/**
* @brief Constructor that internally builds the hash table based on the given `build` table.
*
* @throw cudf::logic_error if the number of columns in `build` table is 0.
*
* @param build The build table, from which the hash table is built
* @param probe The probe table
* @param has_nulls Flag to indicate if any nulls exist in the `build` table or
* any `probe` table that will be used later for join.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
distinct_hash_join(cudf::table_view const& build,
cudf::table_view const& probe,
bool has_nulls,
cudf::null_equality compare_nulls,
rmm::cuda_stream_view stream);

Expand All @@ -143,12 +113,36 @@ struct distinct_hash_join {
*/
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
inner_join(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const;
inner_join(cudf::table_view const& probe,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

/**
* @copydoc cudf::distinct_hash_join::left_join
*/
std::unique_ptr<rmm::device_uvector<size_type>> left_join(
rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const;
cudf::table_view const& probe,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

private:
using probing_scheme_type = cuco::linear_probing<1, hasher>;
using cuco_storage_type = cuco::storage<1>;

/// Hash table type
using hash_table_type = cuco::static_set<cuco::pair<hash_value_type, rhs_index_type>,
cuco::extent<size_type>,
cuda::thread_scope_device,
always_not_equal,
probing_scheme_type,
cudf::detail::cuco_allocator<char>,
cuco_storage_type>;

bool _has_nested_columns; ///< True if nested columns are present in build and probe tables
cudf::null_equality _nulls_equal; ///< Whether to consider nulls as equal
cudf::table_view _build; ///< Input table to build the hash map
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table>
_preprocessed_build; ///< Input table preprocessed for row operators
hash_table_type _hash_table; ///< Hash table built on `_build`
};
} // namespace cudf::detail
35 changes: 13 additions & 22 deletions cpp/include/cudf/join.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,13 +34,6 @@

namespace CUDF_EXPORT cudf {

/**
* @brief Enum to indicate whether the distinct join table has nested columns or not
*
* @ingroup column_join
*/
enum class has_nested : bool { YES, NO };

// forward declaration
namespace hashing::detail {

Expand All @@ -61,7 +54,6 @@ class hash_join;
/**
* @brief Forward declaration for our distinct hash join
*/
template <cudf::has_nested HasNested>
class distinct_hash_join;
} // namespace detail

Expand Down Expand Up @@ -469,20 +461,19 @@ class hash_join {
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

private:
const std::unique_ptr<impl_type const> _impl;
std::unique_ptr<impl_type const> _impl;
};

/**
* @brief Distinct hash join that builds the hash table on construction and probes it in subsequent
* `*_join` member functions
*
* This class enables the distinct hash join scheme that builds the hash table once and probes it
* as many times as needed (possibly in parallel).
*
* @note Behavior is undefined if the build table contains duplicates.
* @note All NaNs are considered as equal
*
* @tparam HasNested Flag indicating whether there are nested columns in build/probe table
*/
// TODO: `HasNested` to be removed via dispatching
template <cudf::has_nested HasNested>
class distinct_hash_join {
public:
distinct_hash_join() = delete;
Expand All @@ -496,32 +487,29 @@ class distinct_hash_join {
* @brief Constructs a distinct hash join object for subsequent probe calls
*
* @param build The build table that contains distinct elements
* @param probe The probe table, from which the keys are probed
* @param has_nulls Flag to indicate if there exists any nulls in the `build` table or
* any `probe` table that will be used later for join
* @param compare_nulls Controls whether null join-key values should match or not
* @param stream CUDA stream used for device memory operations and kernel launches
*/
distinct_hash_join(cudf::table_view const& build,
cudf::table_view const& probe,
nullable_join has_nulls = nullable_join::YES,
null_equality compare_nulls = null_equality::EQUAL,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Returns the row indices that can be used to construct the result of performing
* an inner join between two tables. @see cudf::inner_join().
*
* @param probe The probe table, from which the keys are probed
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned indices' device memory.
*
* @return A pair of columns [`build_indices`, `probe_indices`] that can be used to
* @return A pair of columns [`probe_indices`, `build_indices`] that can be used to
* construct the result of performing an inner join between two tables
* with `build` and `probe` as the join keys.
*/
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
inner_join(rmm::cuda_stream_view stream = cudf::get_default_stream(),
inner_join(cudf::table_view const& probe,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

/**
Expand All @@ -532,19 +520,22 @@ class distinct_hash_join {
* the row index of the matched row from the build table if there is a match. Otherwise, contains
* `JoinNoneValue`.
*
* @param probe The probe table, from which the keys are probed
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned table and columns' device
* memory.
*
* @return A `build_indices` column that can be used to construct the result of
* performing a left join between two tables with `build` and `probe` as the join
* keys.
*/
[[nodiscard]] std::unique_ptr<rmm::device_uvector<size_type>> left_join(
cudf::table_view const& probe,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

private:
using impl_type = typename cudf::detail::distinct_hash_join<HasNested>; ///< Implementation type
using impl_type = cudf::detail::distinct_hash_join; ///< Implementation type

std::unique_ptr<impl_type> _impl; ///< Distinct hash join implementation
};
Expand Down
Loading

0 comments on commit 62d72df

Please sign in to comment.