
bug: no back-pressure on concurrent reads with chunks #6440

@satlank

Description


Describe the bug

When enabling concurrent reads, no back-pressure is applied, leading to runaway creation of read tasks. When reading a sufficiently large file, this will either exhaust memory (observed with an S3 back-end and a 15 GB file) or run out of file handles (observed with a 'large' local file).

It appears that chunk must also be set on the reader for this to trigger (not sure why that makes a difference).
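For context, back-pressure here simply means bounding how many chunks can be in flight at once. A minimal std-only sketch of a bounded pipeline (my own illustration of the principle, not opendal's internals): the producer blocks on `send` once the channel is full, instead of spawning tasks without limit.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Push `total` chunks through a channel bounded at `cap` and count what the
/// consumer receives. The bound is the back-pressure: the producer blocks
/// once `cap` chunks are queued, so resource usage stays O(cap), not O(total).
fn bounded_consume(cap: usize, total: usize) -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(cap);
    let producer = thread::spawn(move || {
        for i in 0..total {
            // In the bug, the equivalent step (spawning a read task /
            // opening a file handle) runs unbounded; here `send` blocks.
            tx.send(vec![(i % 256) as u8; 1024]).unwrap();
        }
    });
    let count = rx.into_iter().count();
    producer.join().unwrap();
    count
}

fn main() {
    let n = bounded_consume(4, 32);
    println!("consumed {} chunks with at most ~4 in flight", n);
}
```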

Steps to Reproduce

Below is an example program (using Fs) that will fail by running out of file handles (depending on how many are allowed to be created; tested on macOS). In any case, I would not expect more than on the order of 4 handles to be required for the reading here, given that concurrent is set to 4.

Using these dependencies:

[package]
name = "missing-back-pressure"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] }
bytes = "1.10"
futures-util = "0.3"
opendal = { version = "0.54", features = ["services-fs"] }

And this main.rs:

use std::{error::Error, time::Instant};

use bytes::Buf;
use futures_util::StreamExt;
use opendal::Operator;

fn main() -> Result<(), Box<dyn Error>> {
    let root = std::env::current_dir()?;
    let root_str = root.to_str().ok_or("Invalid path")?;
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create Tokio runtime");

    let builder = opendal::services::Fs::default().root(root_str);

    let op = Operator::new(builder)?.finish();

    let join_handle = runtime.spawn(async move {
        let mut stream = op
            .reader_with("large_file.dat")
            .concurrent(4)
            .chunk(4 * 1024 * 1024)
            .await
            .expect("Failed to create reader")
            .into_stream(..)
            .await
            .expect("Failed to create stream");

        let mut byte_count = 0;
        let mut read_count = 0;

        let start_time = Instant::now();
        while let Some(chunk) = stream.next().await {
            let b = chunk
                .inspect_err(|e| {
                    eprintln!("Error reading chunk: {}", e);
                })
                .unwrap();
            byte_count += b.remaining();
            read_count += 1;
        }
        let end_time = Instant::now();
        let duration = end_time.duration_since(start_time);
        let duration_secs = duration.as_secs_f64();
        let throughput = byte_count as f64 / duration_secs / (1024.0 * 1024.0); // MB/s

        println!("Read {} bytes in {} chunks", byte_count, read_count);
        println!("Total time: {:.2} seconds", duration_secs);
        println!("Throughput: {:.2} MB/s", throughput);
    });

    // Wait for the async task to complete
    runtime.block_on(join_handle)?;

    Ok(())
}

The large_file.dat was created with:

dd if=/dev/urandom of=large_file.dat bs=1G count=100

The observed output is:

cargo run --release
    Finished `release` profile [optimized] target(s) in 0.05s
     Running `target/release/missing-back-pressure`
Error reading chunk: Unexpected (temporary) at read, context: { service: fs, path: large_file.dat, range: 11714691072-11718885375 } => uncategorized error, source: Too many open files (os error 24)

thread 'tokio-runtime-worker' panicked at src/main.rs:39:18:
called `Result::unwrap()` on an `Err` value: Unexpected (temporary) at read => uncategorized error

Context:
   service: fs
   path: large_file.dat
   range: 11714691072-11718885375

Source:
   Too many open files (os error 24)

note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Error: JoinError::Panic(Id(13), "called `Result::unwrap()` on an `Err` value: Unexpected (temporary) at read => uncategorized error\n\nContext:\n   service: fs\n   path: large_file.dat\n   range: 11714691072-11718885375\n\nSource:\n   Too many open files (os error 24)\n", ...)

Expected Behavior

With the fix applied in my branch (I will create the PR shortly; it seemed best to file this bug report first), i.e. using

[patch.crates-io]
opendal = { git = "https://github.com/satlank/opendal", branch = "fixNoBackpressureOnConcurrent" }

The expected output is (i.e. no error):

cargo run --release
    Finished `release` profile [optimized] target(s) in 0.17s
     Running `target/release/missing-back-pressure`
Read 107374182400 bytes in 25600 chunks
Total time: 24.80 seconds
Throughput: 4129.41 MB/s
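As a sanity check on what "back-pressure applied" should mean, the peak number of in-flight chunks can be measured. In this std-only model (again my own sketch, not opendal code), the peak never exceeds the channel bound plus a couple of chunks in hand-off (one pending in the blocked producer, one held by the consumer), regardless of how many chunks are read in total:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;

/// Push `total` chunks through a channel bounded at `cap` and return the
/// peak number of chunks in flight (produced but not yet consumed).
fn peak_in_flight(cap: usize, total: usize) -> usize {
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = sync_channel::<Vec<u8>>(cap);

    let (fi, fp) = (in_flight.clone(), peak.clone());
    let producer = thread::spawn(move || {
        for _ in 0..total {
            let now = fi.fetch_add(1, Ordering::SeqCst) + 1;
            fp.fetch_max(now, Ordering::SeqCst);
            // Blocks once `cap` chunks are queued -- the back-pressure.
            tx.send(vec![0u8; 16]).unwrap();
        }
    });

    for _chunk in rx {
        in_flight.fetch_sub(1, Ordering::SeqCst);
    }
    producer.join().unwrap();
    peak.load(Ordering::SeqCst)
}

fn main() {
    let cap = 4;
    let peak = peak_in_flight(cap, 100);
    println!("peak in-flight: {} (bound: {})", peak, cap + 2);
    assert!(peak <= cap + 2);
}
```

Without the bound (the buggy behavior), the equivalent peak would grow toward the total chunk count, which for a 100 GB file at 4 MiB chunks is what exhausts descriptors or memory.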

Additional Context

No response

Are you willing to submit a PR to fix this bug?

  • Yes, I would like to submit a PR.


Labels: bug (Something isn't working), releases-note/fix (The PR fixes a bug or has a title that begins with "fix"), services/fs
