Skip to content

Commit

Permalink
cudf::merge public API now support passing a user stream (#16124)
Browse files Browse the repository at this point in the history
Expands the `cudf::merge` function to support a user stream

Found as part of #15982 when building benchmarks

Authors:
  - Robert Maynard (https://github.com/robertmaynard)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #16124
  • Loading branch information
robertmaynard authored Jul 2, 2024
1 parent 3bd9975 commit f534e20
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 3 deletions.
1 change: 1 addition & 0 deletions cpp/include/cudf/detail/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ using index_vector = rmm::device_uvector<index_type>;
* std::vector<cudf::size_type> const& key_cols,
* std::vector<cudf::order> const& column_order,
* std::vector<cudf::null_order> const& null_precedence,
* rmm::cuda_stream_view stream,
* rmm::device_async_resource_ref mr)
*
* @param stream CUDA stream used for device memory operations and kernel launches
Expand Down
3 changes: 2 additions & 1 deletion cpp/include/cudf/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ namespace cudf {
* @param[in] column_order Sort order types of columns indexed by key_cols
* @param[in] null_precedence Array indicating the order of nulls with respect
* to non-nulls for the indexing columns (key_cols)
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned table's device memory
*
* @returns A table containing sorted data from all input tables
Expand All @@ -106,7 +107,7 @@ std::unique_ptr<cudf::table> merge(
std::vector<cudf::size_type> const& key_cols,
std::vector<cudf::order> const& column_order,
std::vector<cudf::null_order> const& null_precedence = {},
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
} // namespace cudf
4 changes: 2 additions & 2 deletions cpp/src/merge/merge.cu
Original file line number Diff line number Diff line change
Expand Up @@ -694,11 +694,11 @@ std::unique_ptr<cudf::table> merge(std::vector<table_view> const& tables_to_merg
std::vector<cudf::size_type> const& key_cols,
std::vector<cudf::order> const& column_order,
std::vector<cudf::null_order> const& null_precedence,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::merge(
tables_to_merge, key_cols, column_order, null_precedence, cudf::get_default_stream(), mr);
return detail::merge(tables_to_merge, key_cols, column_order, null_precedence, stream, mr);
}

} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LABELING_BINS_TEST streams/labeling_bins_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_MERGE_TEST streams/merge_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_MULTIBYTE_SPLIT_TEST streams/io/multibyte_split_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_ORCIO_TEST streams/io/orc_test.cpp STREAM_MODE testing)
Expand Down
137 changes: 137 additions & 0 deletions cpp/tests/streams/merge_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/table_utilities.hpp>
#include <cudf_test/testing_main.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/merge.hpp>
#include <cudf/sorting.hpp>
#include <cudf/table/table.hpp>
#include <cudf/types.hpp>

#include <vector>

template <typename T>
class MergeTest_ : public cudf::test::BaseFixture {};

TYPED_TEST_SUITE(MergeTest_, cudf::test::FixedWidthTypes);

TYPED_TEST(MergeTest_, MergeIsZeroWhenShouldNotBeZero)
{
using columnFactoryT = cudf::test::fixed_width_column_wrapper<TypeParam, int32_t>;

columnFactoryT leftColWrap1({1, 2, 3, 4, 5});
cudf::test::fixed_width_column_wrapper<TypeParam> rightColWrap1{};

std::vector<cudf::size_type> key_cols{0};
std::vector<cudf::order> column_order;
column_order.push_back(cudf::order::ASCENDING);
std::vector<cudf::null_order> null_precedence(column_order.size(), cudf::null_order::AFTER);

cudf::table_view left_view{{leftColWrap1}};
cudf::table_view right_view{{rightColWrap1}};
cudf::table_view expected{{leftColWrap1}};

auto result = cudf::merge({left_view, right_view},
key_cols,
column_order,
null_precedence,
cudf::test::get_default_stream());

int expected_len = 5;
ASSERT_EQ(result->num_rows(), expected_len);
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result->view());
}

TYPED_TEST(MergeTest_, SingleTableInput)
{
cudf::size_type inputRows = 40;

auto sequence = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i; });
cudf::test::fixed_width_column_wrapper<TypeParam, typename decltype(sequence)::value_type>
colWrap1(sequence, sequence + inputRows);

std::vector<cudf::size_type> key_cols{0};
std::vector<cudf::order> column_order{cudf::order::ASCENDING};
std::vector<cudf::null_order> null_precedence{};

cudf::table_view left_view{{colWrap1}};

std::unique_ptr<cudf::table> p_outputTable;
CUDF_EXPECT_NO_THROW(
p_outputTable = cudf::merge(
{left_view}, key_cols, column_order, null_precedence, cudf::test::get_default_stream()));

auto input_column_view{left_view.column(0)};
auto output_column_view{p_outputTable->view().column(0)};

CUDF_TEST_EXPECT_COLUMNS_EQUAL(input_column_view, output_column_view);
}

class MergeTest : public cudf::test::BaseFixture {};

TEST_F(MergeTest, KeysWithNulls)
{
cudf::size_type nrows = 13200; // Ensures that thrust::merge uses more than one tile/block
auto data_iter = thrust::make_counting_iterator<int32_t>(0);
auto valids1 =
cudf::detail::make_counting_transform_iterator(0, [](auto row) { return row % 10 != 0; });
cudf::test::fixed_width_column_wrapper<int32_t> data1(data_iter, data_iter + nrows, valids1);
auto valids2 =
cudf::detail::make_counting_transform_iterator(0, [](auto row) { return row % 15 != 0; });
cudf::test::fixed_width_column_wrapper<int32_t> data2(data_iter, data_iter + nrows, valids2);
auto all_data = cudf::concatenate(std::vector<cudf::column_view>{{data1, data2}},
cudf::test::get_default_stream());

std::vector<cudf::order> column_orders{cudf::order::ASCENDING, cudf::order::DESCENDING};
std::vector<cudf::null_order> null_precedences{cudf::null_order::AFTER, cudf::null_order::BEFORE};

for (auto co : column_orders)
for (auto np : null_precedences) {
std::vector<cudf::order> column_order{co};
std::vector<cudf::null_order> null_precedence{np};
auto sorted1 = cudf::sort(cudf::table_view({data1}),
column_order,
null_precedence,
cudf::test::get_default_stream())
->release();
auto col1 = sorted1.front()->view();
auto sorted2 = cudf::sort(cudf::table_view({data2}),
column_order,
null_precedence,
cudf::test::get_default_stream())
->release();
auto col2 = sorted2.front()->view();

auto result = cudf::merge({cudf::table_view({col1}), cudf::table_view({col2})},
{0},
column_order,
null_precedence,
cudf::test::get_default_stream());
auto sorted_all = cudf::sort(cudf::table_view({all_data->view()}),
column_order,
null_precedence,
cudf::test::get_default_stream());
CUDF_TEST_EXPECT_COLUMNS_EQUAL(sorted_all->view().column(0), result->view().column(0));
}
}

CUDF_TEST_PROGRAM_MAIN()

0 comments on commit f534e20

Please sign in to comment.