Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor distinct hash join to handle multiple probes with the same build table #17609

Merged
merged 19 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions cpp/benchmarks/join/distinct_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ void distinct_inner_join(nvbench::state& state,
cudf::has_nested_nulls(build_input) || cudf::has_nested_nulls(probe_input)
? cudf::nullable_join::YES
: cudf::nullable_join::NO;
auto hj_obj = cudf::distinct_hash_join<cudf::has_nested::NO>{
build_input, probe_input, has_nulls, compare_nulls};
return hj_obj.inner_join();
auto hj_obj = cudf::distinct_hash_join{build_input, has_nulls, compare_nulls};
return hj_obj.inner_join(probe_input);
};

BM_join<Key, Nullable>(state, join);
Expand All @@ -46,9 +45,8 @@ void distinct_left_join(nvbench::state& state,
cudf::has_nested_nulls(build_input) || cudf::has_nested_nulls(probe_input)
? cudf::nullable_join::YES
: cudf::nullable_join::NO;
auto hj_obj = cudf::distinct_hash_join<cudf::has_nested::NO>{
build_input, probe_input, has_nulls, compare_nulls};
return hj_obj.left_join();
auto hj_obj = cudf::distinct_hash_join{build_input, has_nulls, compare_nulls};
return hj_obj.left_join(probe_input);
};

BM_join<Key, Nullable>(state, join);
Expand Down
108 changes: 53 additions & 55 deletions cpp/include/cudf/detail/distinct_hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,24 @@ using cudf::experimental::row::lhs_index_type;
using cudf::experimental::row::rhs_index_type;

/**
* @brief A comparator adapter wrapping both the self comparator and the two-table comparator
* @brief A custom comparator used for the build table insertion
*/
template <typename Equal>
struct comparator_adapter {
comparator_adapter(Equal const& d_equal) : _d_equal{d_equal} {}

__device__ constexpr auto operator()(
// Binary predicate over two `cuco::pair<hash_value_type, rhs_index_type>` entries that
// unconditionally reports them as not equal. It is passed as a template argument to the
// `cuco::static_set` hash table type of `distinct_hash_join`.
// NOTE(review): presumably `rhs_index_type` wraps a build-table row index — confirm.
struct always_not_equal {
// Both arguments are intentionally unnamed: the result never depends on them.
__device__ constexpr bool operator()(
cuco::pair<hash_value_type, rhs_index_type> const&,
cuco::pair<hash_value_type, rhs_index_type> const&) const noexcept
{
// All build table keys are distinct thus `false` no matter what
return false;
}
};

/**
* @brief A comparator adapter wrapping the two-table comparator
*/
template <typename Equal>
struct comparator_adapter {
comparator_adapter(Equal const& d_equal) : _d_equal{d_equal} {}

__device__ constexpr auto operator()(
cuco::pair<hash_value_type, lhs_index_type> const& lhs,
Expand All @@ -62,56 +67,14 @@ struct comparator_adapter {
Equal _d_equal;
};

template <typename Hasher>
struct hasher_adapter {
hasher_adapter(Hasher const& d_hasher = {}) : _d_hasher{d_hasher} {}

template <typename T>
__device__ constexpr auto operator()(cuco::pair<hash_value_type, T> const& key) const noexcept
{
return _d_hasher(key.first);
}

private:
Hasher _d_hasher;
};

/**
* @brief Distinct hash join that builds hash table in creation and probes results in subsequent
* `*_join` member functions.
*
* @tparam HasNested Flag indicating whether there are nested columns in build/probe table
* This class enables the distinct hash join scheme that builds hash table once, and probes as many
* times as needed (possibly in parallel).
*/
template <cudf::has_nested HasNested>
struct distinct_hash_join {
private:
/// Device row equal type
using d_equal_type = cudf::experimental::row::equality::strong_index_comparator_adapter<
cudf::experimental::row::equality::device_row_comparator<HasNested == cudf::has_nested::YES,
cudf::nullate::DYNAMIC>>;
using hasher = hasher_adapter<thrust::identity<hash_value_type>>;
using probing_scheme_type = cuco::linear_probing<1, hasher>;
using cuco_storage_type = cuco::storage<1>;

/// Hash table type
using hash_table_type = cuco::static_set<cuco::pair<hash_value_type, rhs_index_type>,
cuco::extent<size_type>,
cuda::thread_scope_device,
comparator_adapter<d_equal_type>,
probing_scheme_type,
cudf::detail::cuco_allocator<char>,
cuco_storage_type>;

bool _has_nulls; ///< true if nulls are present in either build table or probe table
cudf::null_equality _nulls_equal; ///< whether to consider nulls as equal
cudf::table_view _build; ///< input table to build the hash map
cudf::table_view _probe; ///< input table to probe the hash map
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table>
_preprocessed_build; ///< input table preprocessed for row operators
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table>
_preprocessed_probe; ///< input table preprocessed for row operators
hash_table_type _hash_table; ///< hash table built on `_build`

class distinct_hash_join {
public:
distinct_hash_join() = delete;
~distinct_hash_join() = default;
Expand All @@ -120,20 +83,30 @@ struct distinct_hash_join {
distinct_hash_join& operator=(distinct_hash_join const&) = delete;
distinct_hash_join& operator=(distinct_hash_join&&) = delete;

/**
* @brief Hasher adapter used by distinct hash join
*/
// Hash functor for `cuco::pair<hash_value_type, T>` keys: the pair's first element is
// returned unchanged as the hash value, so no hashing work is done here.
// NOTE(review): presumably `key.first` holds a row hash computed earlier — confirm.
struct hasher {
// Templated on `T` so the same functor accepts pairs with any second-element type.
template <typename T>
__device__ constexpr hash_value_type operator()(
cuco::pair<hash_value_type, T> const& key) const noexcept
{
// The second element of the pair is ignored.
return key.first;
}
};

/**
* @brief Constructor that internally builds the hash table based on the given `build` table.
*
* @throw cudf::logic_error if the number of columns in `build` table is 0.
*
* @param build The build table, from which the hash table is built
* @param probe The probe table
* @param has_nulls Flag to indicate if any nulls exist in the `build` table or
* any `probe` table that will be used later for join.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
distinct_hash_join(cudf::table_view const& build,
cudf::table_view const& probe,
bool has_nulls,
cudf::null_equality compare_nulls,
rmm::cuda_stream_view stream);
Expand All @@ -143,12 +116,37 @@ struct distinct_hash_join {
*/
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
inner_join(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const;
inner_join(cudf::table_view const& probe,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

/**
* @copydoc cudf::distinct_hash_join::left_join
*/
std::unique_ptr<rmm::device_uvector<size_type>> left_join(
rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const;
cudf::table_view const& probe,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

private:
using probing_scheme_type = cuco::linear_probing<1, hasher>;
using cuco_storage_type = cuco::storage<1>;

/// Hash table type
using hash_table_type = cuco::static_set<cuco::pair<hash_value_type, rhs_index_type>,
cuco::extent<size_type>,
cuda::thread_scope_device,
always_not_equal,
probing_scheme_type,
cudf::detail::cuco_allocator<char>,
cuco_storage_type>;

bool _has_nulls; ///< True if nulls are present in either build table or probe table
Copy link
Contributor

@ttnghia ttnghia Jan 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we do not specify the probe table, and we may not know anything about the probe tables in the future, should we always set this to true? Otherwise, if false is specified but the probe table has nulls, the result may be incorrect.

I remember that we've dealt with this same issue before, with some previous version of this join code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm following the same null check pattern used in hash_join, so I assume downstream users like Spark are handling this correctly. If needed, I'm happy to open a separate PR to update both distinct_hash_join and hash_join to improve this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh actually you reviewed that PR. Here it is: #13120.

The bug was fixed by checking has_nulls on both tables, like in https://github.com/rapidsai/cudf/pull/13120/files#diff-734dd746efe774e94501b6aa986e7824656182f85bf9af8bf30036c002ca1b82R82.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether Spark handles this correctly, but given that we stumbled upon the same issue before, I strongly suspect that it does not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Spark handles this properly, e.g.:

auto has_nulls = cudf::has_nested_nulls(left) || cudf::has_nested_nulls(right)
? cudf::nullable_join::YES
: cudf::nullable_join::NO;

Are you suggesting that we should remove _has_nulls as a data member, as it's always true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so. Previously we had both tables, so we could compute has_nulls; now we only have one table, so this variable should be removed and the corresponding nullate value should always be true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

bool _has_nested_columns; ///< True if the table has nested columns
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
cudf::null_equality _nulls_equal; ///< Whether to consider nulls as equal
cudf::table_view _build; ///< Input table to build the hash map
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table>
_preprocessed_build; ///< Input table preprocessed for row operators
hash_table_type _hash_table; ///< Hash table built on `_build`
};
} // namespace cudf::detail
30 changes: 12 additions & 18 deletions cpp/include/cudf/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@

namespace CUDF_EXPORT cudf {

/**
* @brief Enum to indicate whether the distinct join table has nested columns or not
*
* @ingroup column_join
*/
enum class has_nested : bool { YES, NO };

// forward declaration
namespace hashing::detail {

Expand All @@ -61,7 +54,6 @@ class hash_join;
/**
* @brief Forward declaration for our distinct hash join
*/
template <cudf::has_nested HasNested>
class distinct_hash_join;
} // namespace detail

Expand Down Expand Up @@ -469,20 +461,19 @@ class hash_join {
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

private:
const std::unique_ptr<impl_type const> _impl;
std::unique_ptr<impl_type const> _impl;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

};

/**
* @brief Distinct hash join that builds hash table in creation and probes results in subsequent
* `*_join` member functions
*
* This class enables the distinct hash join scheme that builds hash table once, and probes as many
* times as needed (possibly in parallel).
*
* @note Behavior is undefined if the build table contains duplicates.
* @note All NaNs are considered as equal
*
* @tparam HasNested Flag indicating whether there are nested columns in build/probe table
*/
// TODO: `HasNested` to be removed via dispatching
template <cudf::has_nested HasNested>
class distinct_hash_join {
public:
distinct_hash_join() = delete;
Expand All @@ -496,14 +487,12 @@ class distinct_hash_join {
* @brief Constructs a distinct hash join object for subsequent probe calls
*
* @param build The build table that contains distinct elements
* @param probe The probe table, from which the keys are probed
* @param has_nulls Flag to indicate if there exist any nulls in the `build` table or
* any `probe` table that will be used later for join
* @param compare_nulls Controls whether null join-key values should match or not
* @param stream CUDA stream used for device memory operations and kernel launches
*/
distinct_hash_join(cudf::table_view const& build,
cudf::table_view const& probe,
nullable_join has_nulls = nullable_join::YES,
null_equality compare_nulls = null_equality::EQUAL,
rmm::cuda_stream_view stream = cudf::get_default_stream());
Expand All @@ -512,16 +501,18 @@ class distinct_hash_join {
* @brief Returns the row indices that can be used to construct the result of performing
* an inner join between two tables. @see cudf::inner_join().
*
* @param probe The probe table, from which the keys are probed
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned indices' device memory.
*
* @return A pair of columns [`build_indices`, `probe_indices`] that can be used to
* @return A pair of columns [`probe_indices`, `build_indices`] that can be used to
* construct the result of performing an inner join between two tables
* with `build` and `probe` as the join keys.
*/
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
inner_join(rmm::cuda_stream_view stream = cudf::get_default_stream(),
inner_join(cudf::table_view const& probe,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

/**
Expand All @@ -532,19 +523,22 @@ class distinct_hash_join {
* the row index of the matched row from the build table if there is a match. Otherwise, contains
* `JoinNoneValue`.
*
* @param probe The probe table, from which the keys are probed
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned table and columns' device
* memory.
*
* @return A `build_indices` column that can be used to construct the result of
* performing a left join between two tables with `build` and `probe` as the join
* keys.
*/
[[nodiscard]] std::unique_ptr<rmm::device_uvector<size_type>> left_join(
cudf::table_view const& probe,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

private:
using impl_type = typename cudf::detail::distinct_hash_join<HasNested>; ///< Implementation type
using impl_type = cudf::detail::distinct_hash_join; ///< Implementation type

std::unique_ptr<impl_type> _impl; ///< Distinct hash join implementation
};
Expand Down
Loading
Loading