-
Notifications
You must be signed in to change notification settings - Fork 1k
Global stream pool #13922
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Global stream pool #13922
Changes from 101 commits
5e9cf26
9326321
ee7511d
2cafe62
219ff0b
0e181a8
129d9ab
2b5a25d
b58e55c
c63e503
2c5e087
996893e
f1f74dc
639b8ab
02cd2be
9871d66
7debd29
c1bbb84
8e66a08
7b09c4f
5e05872
6576aa3
1d6efbc
334fb53
d18dabf
05eb40f
a957ecc
9b636c7
6a53d43
ceb22ab
c00be0b
e30aa11
7ef4be2
2410e47
4aa783d
94afb8d
12b9bab
a6f7957
b7dbf47
ea49a23
3a9f186
d7671d7
7084d89
f60bc1c
9ebeca9
07e73ac
410ab67
f779455
761393f
ced688d
4846032
d615625
c1ce34c
835e866
6353a4a
9ffea01
0904418
616d3fc
e924a97
d0bf0cd
d3b0c09
3c83b89
5ac20a0
9914e1f
7d077f7
62e0493
5725c61
0ab8c15
b404de7
1d137ec
6cd3e00
a774ac1
60b45b3
fb44e80
16532e4
d7112a5
637019e
6b48f80
6384f89
71ece73
db1d08d
f233870
384c7ee
301e596
e3cfa89
e74149f
0d946e8
0c1faed
a31056c
576230a
21b4443
3f3c5b5
40d53e7
10cb2b4
e0ba2a9
83d8710
b8fddcc
3059d94
1ad44db
bc62b92
4c2c17b
06d2a75
ddfc118
4b00031
ac55b7e
10643c4
4455230
274d4f3
13cd266
b5c55bb
f5ff4d0
1dc75d6
9774635
1ceaedf
8b316ba
d538be9
f8af2b6
05edba5
2346cb1
d7d30f9
64b4e42
9826366
2a20067
4ed8082
272d883
8b4f15d
457c6fb
04dbba5
ded5900
1c5ae32
5ffc75a
0a7035f
9fb0958
6644b48
113e66f
2629a79
87626b6
f81d13e
614f352
560c03c
80ab1f5
46c98f4
57bd1b3
afd71f9
a449ab5
d107e32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| /* | ||
| * Copyright (c) 2023, NVIDIA CORPORATION. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #include <mutex> | ||
|
|
||
| #include "stream_pool.hpp" | ||
|
|
||
| #include <cudf/detail/utilities/logger.hpp> | ||
| #include <cudf/utilities/default_stream.hpp> | ||
| #include <cudf/utilities/error.hpp> | ||
|
|
||
| namespace cudf::io::detail::parquet { | ||
|
|
||
| namespace { | ||
|
|
||
| // TODO: what is a good number here. what's the penalty for making it larger? | ||
|
||
| // Dave Baranec rule of thumb was max_streams_needed * num_concurrent_threads, | ||
| // where num_concurrent_threads was estimated to be 4. so using 32 will allow | ||
| // for 8 streams per thread, which should be plenty (decoding will be up to 4 | ||
| // kernels when delta_byte_array decoding is added). rmm::cuda_stream_pool | ||
| // defaults to 16. | ||
| std::size_t constexpr STREAM_POOL_SIZE = 32; | ||
|
|
||
| class rmm_cuda_stream_pool : public cuda_stream_pool { | ||
| rmm::cuda_stream_pool _pool; | ||
|
|
||
| public: | ||
| rmm_cuda_stream_pool() : _pool{STREAM_POOL_SIZE} {} | ||
| rmm::cuda_stream_view get_stream() override { return _pool.get_stream(); } | ||
| rmm::cuda_stream_view get_stream(std::size_t stream_id) override | ||
| { | ||
| return _pool.get_stream(stream_id); | ||
| } | ||
|
|
||
| std::vector<rmm::cuda_stream_view> get_streams(uint32_t count) override | ||
| { | ||
| static std::mutex stream_pool_mutex; | ||
|
|
||
| if (count > STREAM_POOL_SIZE) { | ||
| CUDF_LOG_WARN("get_streams called with count ({}) > pool size ({})", count, STREAM_POOL_SIZE); | ||
| } | ||
| auto streams = std::vector<rmm::cuda_stream_view>(); | ||
| std::lock_guard<std::mutex> lock(stream_pool_mutex); | ||
| for (uint32_t i = 0; i < count; i++) { | ||
| streams.emplace_back(_pool.get_stream()); | ||
| } | ||
| return streams; | ||
etseidl marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| std::size_t get_stream_pool_size() const override { return STREAM_POOL_SIZE; } | ||
| }; | ||
|
|
||
| class debug_cuda_stream_pool : public cuda_stream_pool { | ||
| public: | ||
| rmm::cuda_stream_view get_stream() override { return cudf::get_default_stream(); } | ||
| rmm::cuda_stream_view get_stream(std::size_t stream_id) override | ||
| { | ||
| return cudf::get_default_stream(); | ||
| } | ||
|
|
||
| std::vector<rmm::cuda_stream_view> get_streams(uint32_t count) override | ||
| { | ||
| return std::vector<rmm::cuda_stream_view>(count, cudf::get_default_stream()); | ||
| } | ||
|
|
||
| std::size_t get_stream_pool_size() const override { return 1UL; } | ||
| }; | ||
|
|
||
| cuda_stream_pool* create_global_cuda_stream_pool() | ||
| { | ||
| if (getenv("LIBCUDF_USE_DEBUG_STREAM_POOL")) return new debug_cuda_stream_pool(); | ||
|
|
||
| return new rmm_cuda_stream_pool(); | ||
| } | ||
|
|
||
| // implementation of per-thread-default-event. | ||
| class cuda_event_map { | ||
| public: | ||
| cuda_event_map() {} | ||
|
|
||
| cudaEvent_t find(std::thread::id thread_id) | ||
| { | ||
| std::lock_guard<std::mutex> lock(map_mutex_); | ||
| auto it = event_map_.find(thread_id); | ||
| if (it != event_map_.end()) { | ||
| return it->second; | ||
| } else { | ||
| cudaEvent_t event; | ||
| CUDF_CUDA_TRY(cudaEventCreateWithFlags(&event, cudaEventDisableTiming)); | ||
| event_map_[thread_id] = event; | ||
| return event; | ||
| } | ||
| } | ||
|
|
||
| cuda_event_map(cuda_event_map const&) = delete; | ||
| void operator=(cuda_event_map const&) = delete; | ||
|
|
||
| private: | ||
| std::unordered_map<std::thread::id, cudaEvent_t> event_map_; | ||
| std::mutex map_mutex_; | ||
| }; | ||
|
|
||
| cudaEvent_t event_for_thread() | ||
| { | ||
| static cuda_event_map instance; | ||
| return instance.find(std::this_thread::get_id()); | ||
| } | ||
vuule marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| } // anonymous namespace | ||
|
|
||
| cuda_stream_pool& global_cuda_stream_pool() | ||
| { | ||
| static cuda_stream_pool* pool = create_global_cuda_stream_pool(); | ||
| return *pool; | ||
| } | ||
|
|
||
| void fork_streams(host_span<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream) | ||
| { | ||
| cudaEvent_t event = event_for_thread(); | ||
| CUDF_CUDA_TRY(cudaEventRecord(event, stream)); | ||
| for (auto& strm : streams) { | ||
| CUDF_CUDA_TRY(cudaStreamWaitEvent(strm, event, 0)); | ||
| } | ||
| } | ||
|
|
||
| void join_streams(host_span<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream) | ||
| { | ||
| cudaEvent_t event = event_for_thread(); | ||
| for (auto& strm : streams) { | ||
| CUDF_CUDA_TRY(cudaEventRecord(event, strm)); | ||
| CUDF_CUDA_TRY(cudaStreamWaitEvent(stream, event, 0)); | ||
| } | ||
| } | ||
|
|
||
| } // namespace cudf::io::detail::parquet | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| /* | ||
| * Copyright (c) 2023, NVIDIA CORPORATION. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <cudf/utilities/span.hpp> | ||
|
|
||
| #include <rmm/cuda_stream_pool.hpp> | ||
|
|
||
| namespace cudf::io::detail::parquet { | ||
|
|
||
| /** | ||
| * @brief A pool of CUDA stream objects | ||
| * | ||
| * Meant to provide efficient on-demand access to CUDA streams. | ||
| * | ||
| * TODO: better docs! | ||
| */ | ||
| class cuda_stream_pool { | ||
| public: | ||
| virtual ~cuda_stream_pool() = default; | ||
|
|
||
| /** | ||
| * @brief Get a `cuda_stream_view` of a stream in the pool. | ||
| * | ||
| * This function is thread safe with respect to other calls to the same function. | ||
| * | ||
| * @return Stream view. | ||
| */ | ||
| virtual rmm::cuda_stream_view get_stream() = 0; | ||
|
|
||
| /** | ||
| * @brief Get a `cuda_stream_view` of the stream associated with `stream_id`. | ||
| * | ||
| * Equivalent values of `stream_id` return a stream_view to the same underlying stream. | ||
| * This function is thread safe with respect to other calls to the same function. | ||
| * | ||
| * @param stream_id Unique identifier for the desired stream | ||
| * @return Requested stream view. | ||
| */ | ||
| virtual rmm::cuda_stream_view get_stream(std::size_t stream_id) = 0; | ||
|
|
||
| /** | ||
| * @brief Get a set of `cuda_stream_view` objects from the pool. | ||
| * | ||
| * This function is thread safe with respect to other calls to the same function. | ||
| * | ||
| * @param count The number of stream views to return. | ||
| * @return Vector containing `count` stream views. | ||
| */ | ||
| virtual std::vector<rmm::cuda_stream_view> get_streams(uint32_t count) = 0; | ||
|
|
||
| /** | ||
| * @brief Get the number of streams in the pool. | ||
| * | ||
| * This function is thread safe with respect to other calls to the same function. | ||
| * | ||
| * @return the number of streams in the pool | ||
| */ | ||
| virtual std::size_t get_stream_pool_size() const = 0; | ||
| }; | ||
|
|
||
| /** | ||
| * @brief Return the global cuda_stream_pool object. | ||
| * | ||
| * TODO: document how to control the implementation | ||
| * | ||
| * @return The cuda_stream_pool singleton. | ||
| */ | ||
| cuda_stream_pool& global_cuda_stream_pool(); | ||
|
|
||
| /** | ||
| * @brief Synchronize a set of streams to an event on another stream. | ||
| * | ||
| * @param streams Vector of streams to synchronize on. | ||
| * @param stream Stream to synchronize the other streams to, usually the default stream. | ||
| */ | ||
| void fork_streams(host_span<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream); | ||
|
|
||
| /** | ||
| * @brief Synchronize a stream to an event on a set of streams. | ||
| * | ||
| * @param streams Vector of streams to synchronize on. | ||
| * @param stream Stream to synchronize the other streams to, usually the default stream. | ||
| */ | ||
| void join_streams(host_span<rmm::cuda_stream_view> streams, rmm::cuda_stream_view stream); | ||
|
|
||
| } // namespace cudf::io::detail::parquet |
Uh oh!
There was an error while loading. Please reload this page.