 /*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <cudf/utilities/memory_resource.hpp>
 #include <cudf/utilities/span.hpp>
 
+#include <rmm/cuda_stream_pool.hpp>
 #include <rmm/device_uvector.hpp>
 #include <rmm/exec_policy.hpp>
 
 #include <thrust/distance.h>
 #include <thrust/iterator/constant_iterator.h>
 #include <thrust/scatter.h>
 
+#include <BS_thread_pool.hpp>
+#include <BS_thread_pool_utils.hpp>
+
 #include <numeric>
 
 namespace cudf::io::json::detail {
 
 namespace {
 
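+// Process-wide host thread pool shared by the asynchronous device reads and the
+// parallel construction of compressed sources; lazily created and sized to the
+// hardware concurrency.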
+namespace pools {
+
+BS::thread_pool& tpool()
+{
+  static BS::thread_pool _tpool(std::thread::hardware_concurrency());
+  return _tpool;
+}
+
+}  // namespace pools
+
 class compressed_host_buffer_source final : public datasource {
  public:
   explicit compressed_host_buffer_source(std::unique_ptr<datasource> const& src,
@@ -51,8 +65,8 @@ class compressed_host_buffer_source final : public datasource {
   {
     auto ch_buffer = host_span<uint8_t const>(reinterpret_cast<uint8_t const*>(_dbuf_ptr->data()),
                                               _dbuf_ptr->size());
-    if (comptype == compression_type::GZIP || comptype == compression_type::ZIP ||
-        comptype == compression_type::SNAPPY) {
+    if (_comptype == compression_type::GZIP || _comptype == compression_type::ZIP ||
+        _comptype == compression_type::SNAPPY) {
       _decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer);
     } else {
       _decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer);
@@ -96,7 +110,22 @@ class compressed_host_buffer_source final : public datasource {
     return std::make_unique<non_owning_buffer>(_decompressed_buffer.data() + offset, count);
   }
 
-  [[nodiscard]] bool supports_device_read() const override { return false; }
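+  // Read `size` bytes starting at `offset` into device memory on a pool thread: the
+  // decompressed host data is copied to `dst` with cudaMemcpyAsync on `stream`, and the
+  // returned future yields the number of bytes read once the copy has completed.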
+  std::future<size_t> device_read_async(size_t offset,
+                                        size_t size,
+                                        uint8_t* dst,
+                                        rmm::cuda_stream_view stream) override
+  {
+    auto& thread_pool = pools::tpool();
+    return thread_pool.submit_task([this, offset, size, dst, stream] {
+      auto hbuf = host_read(offset, size);
+      CUDF_CUDA_TRY(
+        cudaMemcpyAsync(dst, hbuf->data(), hbuf->size(), cudaMemcpyHostToDevice, stream.value()));
+      stream.synchronize();
+      return hbuf->size();
+    });
+  }
+
+  [[nodiscard]] bool supports_device_read() const override { return true; }
 
   [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; }
 
@@ -431,6 +460,8 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
   // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line
   // delimiter.
   auto constexpr num_delimiter_chars = 1;
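+  // One future per asynchronous device read; reads are issued round-robin over a pool of
+  // streams forked from `stream`, one stream per thread-pool thread.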
+  std::vector<std::future<size_t>> thread_tasks;
+  auto stream_pool = cudf::detail::fork_streams(stream, pools::tpool().get_thread_count());
 
   auto delimiter_map = cudf::detail::make_empty_host_vector<std::size_t>(sources.size(), stream);
   std::vector<std::size_t> prefsum_source_sizes(sources.size());
@@ -447,13 +478,17 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
 
   auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset);
   range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0;
-  for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) {
+  for (std::size_t i = start_source, cur_stream = 0;
+       i < sources.size() && bytes_read < total_bytes_to_read;
+       i++) {
     if (sources[i]->is_empty()) continue;
     auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read);
     auto destination = reinterpret_cast<uint8_t*>(buffer.data()) + bytes_read +
                        (num_delimiter_chars * delimiter_map.size());
-    if (sources[i]->is_device_read_preferred(data_size)) {
-      bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream);
+    if (sources[i]->supports_device_read()) {
+      thread_tasks.emplace_back(sources[i]->device_read_async(
+        range_offset, data_size, destination, stream_pool[cur_stream++ % stream_pool.size()]));
+      bytes_read += data_size;
     } else {
       h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size));
       auto const& h_buffer = h_buffers.back();
@@ -481,6 +516,15 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
                     buffer.data());
   }
   stream.synchronize();
+
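+  // Wait for all asynchronous device reads to complete and verify that, together, they
+  // delivered the expected number of bytes.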
+  if (thread_tasks.size()) {
+    auto const bytes_read = std::accumulate(
+      thread_tasks.begin(), thread_tasks.end(), std::size_t{0}, [](std::size_t sum, auto& task) {
+        return sum + task.get();
+      });
+    CUDF_EXPECTS(bytes_read == total_bytes_to_read, "something's fishy");
+  }
+
   return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars));
 }
 
@@ -505,10 +549,17 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
     return read_json_impl(sources, reader_opts, stream, mr);
 
   std::vector<std::unique_ptr<datasource>> compressed_sources;
-  for (size_t i = 0; i < sources.size(); i++) {
-    compressed_sources.emplace_back(
-      std::make_unique<compressed_host_buffer_source>(sources[i], reader_opts.get_compression()));
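+  // Construct the compressed sources concurrently on the thread pool; each constructor
+  // reads its source up front and determines (or materializes) its uncompressed size.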
+  std::vector<std::future<std::unique_ptr<compressed_host_buffer_source>>> thread_tasks;
+  auto& thread_pool = pools::tpool();
+  for (auto& src : sources) {
+    thread_tasks.emplace_back(thread_pool.submit_task([&reader_opts, &src] {
+      return std::make_unique<compressed_host_buffer_source>(src, reader_opts.get_compression());
+    }));
   }
+  std::transform(thread_tasks.begin(),
+                 thread_tasks.end(),
+                 std::back_inserter(compressed_sources),
+                 [](auto& task) { return task.get(); });
   // in read_json_impl, we need the compressed source size to actually be the
   // uncompressed source size for correct batching
   return read_json_impl(compressed_sources, reader_opts, stream, mr);