1
1
/*
2
- * Copyright (c) 2022-2024 , NVIDIA CORPORATION.
2
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
30
30
#include < cudf/utilities/memory_resource.hpp>
31
31
#include < cudf/utilities/span.hpp>
32
32
33
+ #include < rmm/cuda_stream_pool.hpp>
33
34
#include < rmm/device_uvector.hpp>
34
35
#include < rmm/exec_policy.hpp>
35
36
36
37
#include < thrust/distance.h>
37
38
#include < thrust/iterator/constant_iterator.h>
38
39
#include < thrust/scatter.h>
39
40
41
+ #include < BS_thread_pool.hpp>
42
+ #include < BS_thread_pool_utils.hpp>
43
+
40
44
#include < numeric>
41
45
42
46
namespace cudf ::io::json::detail {
43
47
44
48
namespace {
45
49
50
+ namespace pools {
51
+
52
+ BS::thread_pool& tpool ()
53
+ {
54
+ static BS::thread_pool _tpool (std::thread::hardware_concurrency ());
55
+ return _tpool;
56
+ }
57
+
58
+ } // namespace pools
59
+
46
60
class compressed_host_buffer_source final : public datasource {
47
61
public:
48
62
explicit compressed_host_buffer_source (std::unique_ptr<datasource> const & src,
@@ -51,8 +65,8 @@ class compressed_host_buffer_source final : public datasource {
51
65
{
52
66
auto ch_buffer = host_span<uint8_t const >(reinterpret_cast <uint8_t const *>(_dbuf_ptr->data ()),
53
67
_dbuf_ptr->size ());
54
- if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
55
- comptype == compression_type::SNAPPY) {
68
+ if (_comptype == compression_type::GZIP || _comptype == compression_type::ZIP ||
69
+ _comptype == compression_type::SNAPPY) {
56
70
_decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size (_comptype, ch_buffer);
57
71
} else {
58
72
_decompressed_buffer = cudf::io::detail::decompress (_comptype, ch_buffer);
@@ -96,7 +110,22 @@ class compressed_host_buffer_source final : public datasource {
96
110
return std::make_unique<non_owning_buffer>(_decompressed_buffer.data () + offset, count);
97
111
}
98
112
99
- [[nodiscard]] bool supports_device_read () const override { return false ; }
113
+ std::future<size_t > device_read_async (size_t offset,
114
+ size_t size,
115
+ uint8_t * dst,
116
+ rmm::cuda_stream_view stream) override
117
+ {
118
+ auto & thread_pool = pools::tpool ();
119
+ return thread_pool.submit_task ([this , offset, size, dst, stream] {
120
+ auto hbuf = host_read (offset, size);
121
+ CUDF_CUDA_TRY (
122
+ cudaMemcpyAsync (dst, hbuf->data (), hbuf->size (), cudaMemcpyHostToDevice, stream.value ()));
123
+ stream.synchronize ();
124
+ return hbuf->size ();
125
+ });
126
+ }
127
+
128
+ [[nodiscard]] bool supports_device_read () const override { return true ; }
100
129
101
130
[[nodiscard]] size_t size () const override { return _decompressed_ch_buffer_size; }
102
131
@@ -431,6 +460,8 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
431
460
// line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
432
461
// delimiter.
433
462
auto constexpr num_delimiter_chars = 1 ;
463
+ std::vector<std::future<size_t >> thread_tasks;
464
+ auto stream_pool = cudf::detail::fork_streams (stream, pools::tpool ().get_thread_count ());
434
465
435
466
auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t >(sources.size (), stream);
436
467
std::vector<std::size_t > prefsum_source_sizes (sources.size ());
@@ -447,13 +478,17 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
447
478
448
479
auto const total_bytes_to_read = std::min (range_size, prefsum_source_sizes.back () - range_offset);
449
480
range_offset -= start_source ? prefsum_source_sizes[start_source - 1 ] : 0 ;
450
- for (std::size_t i = start_source; i < sources.size () && bytes_read < total_bytes_to_read; i++) {
481
+ for (std::size_t i = start_source, cur_stream = 0 ;
482
+ i < sources.size () && bytes_read < total_bytes_to_read;
483
+ i++) {
451
484
if (sources[i]->is_empty ()) continue ;
452
485
auto data_size = std::min (sources[i]->size () - range_offset, total_bytes_to_read - bytes_read);
453
486
auto destination = reinterpret_cast <uint8_t *>(buffer.data ()) + bytes_read +
454
487
(num_delimiter_chars * delimiter_map.size ());
455
- if (sources[i]->is_device_read_preferred (data_size)) {
456
- bytes_read += sources[i]->device_read (range_offset, data_size, destination, stream);
488
+ if (sources[i]->supports_device_read ()) {
489
+ thread_tasks.emplace_back (sources[i]->device_read_async (
490
+ range_offset, data_size, destination, stream_pool[cur_stream++ % stream_pool.size ()]));
491
+ bytes_read += data_size;
457
492
} else {
458
493
h_buffers.emplace_back (sources[i]->host_read (range_offset, data_size));
459
494
auto const & h_buffer = h_buffers.back ();
@@ -481,6 +516,15 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
481
516
buffer.data ());
482
517
}
483
518
stream.synchronize ();
519
+
520
+ if (thread_tasks.size ()) {
521
+ auto const bytes_read = std::accumulate (
522
+ thread_tasks.begin (), thread_tasks.end (), std::size_t {0 }, [](std::size_t sum, auto & task) {
523
+ return sum + task.get ();
524
+ });
525
+ CUDF_EXPECTS (bytes_read == total_bytes_to_read, " something's fishy" );
526
+ }
527
+
484
528
return buffer.first (bytes_read + (delimiter_map.size () * num_delimiter_chars));
485
529
}
486
530
@@ -505,10 +549,17 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
505
549
return read_json_impl (sources, reader_opts, stream, mr);
506
550
507
551
std::vector<std::unique_ptr<datasource>> compressed_sources;
508
- for (size_t i = 0 ; i < sources.size (); i++) {
509
- compressed_sources.emplace_back (
510
- std::make_unique<compressed_host_buffer_source>(sources[i], reader_opts.get_compression ()));
552
+ std::vector<std::future<std::unique_ptr<compressed_host_buffer_source>>> thread_tasks;
553
+ auto & thread_pool = pools::tpool ();
554
+ for (auto & src : sources) {
555
+ thread_tasks.emplace_back (thread_pool.submit_task ([&reader_opts, &src] {
556
+ return std::make_unique<compressed_host_buffer_source>(src, reader_opts.get_compression ());
557
+ }));
511
558
}
559
+ std::transform (thread_tasks.begin (),
560
+ thread_tasks.end (),
561
+ std::back_inserter (compressed_sources),
562
+ [](auto & task) { return task.get (); });
512
563
// in read_json_impl, we need the compressed source size to actually be the
513
564
// uncompressed source size for correct batching
514
565
return read_json_impl (compressed_sources, reader_opts, stream, mr);
0 commit comments