Streaming read write #38

Merged: 11 commits, Apr 23, 2025

Conversation

drbh
Contributor

@drbh drbh commented Apr 22, 2025

This PR adds a StreamReader and a StreamWriter. The reader processes records from any source that implements the Read trait, and the writer writes to any destination that implements the Write trait.

Additionally, this PR includes two small examples, examples/network_streaming.rs and examples/streaming.rs, that show simple uses of the streaming API.

Potential uses for streaming

  • Processing data as it arrives over a network
  • Handling arbitrarily large files regardless of available RAM (fixed memory allocation)
  • Pipeline processing where data flows continuously
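
To make the fixed-memory point concrete, here is a minimal sketch using only the standard library. It is not binseq's implementation: `RECORD_SIZE` and `stream_records` are hypothetical names, and the 8-byte record is an assumed layout chosen for illustration. The idea is that one reusable buffer is enough to move records between any `Read` source and any `Write` destination, so memory use does not grow with input size.

```rust
use std::io::{self, Read, Write};

/// Hypothetical fixed record size in bytes (not binseq's actual layout).
const RECORD_SIZE: usize = 8;

/// Copies fixed-size records from `src` to `dst`, reusing a single buffer,
/// and returns the number of complete records copied.
fn stream_records<R: Read, W: Write>(mut src: R, mut dst: W) -> io::Result<u64> {
    let mut buf = [0u8; RECORD_SIZE]; // the only record storage ever allocated
    let mut count = 0;
    loop {
        match src.read_exact(&mut buf) {
            Ok(()) => {
                dst.write_all(&buf)?;
                count += 1;
            }
            // End of input. A trailing partial record, if any, is dropped here;
            // a stricter reader would report it as an error instead.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
    }
    dst.flush()?;
    Ok(count)
}
```

Because the function is generic over `Read` and `Write`, the same code would accept a `File`, a `TcpStream`, standard input, or an in-memory `Cursor` without changes.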

Run example

cargo run --release --example network_streaming

outputs

   Compiling binseq v0.5.4 (/Users/drbh/Projects/binseq)
    Finished `release` profile [optimized] target(s) in 0.41s
     Running `target/release/examples/network_streaming`
Server listening on 127.0.0.1:3000
Connected to server
Client connected
Server: Sent record 0
Server: Sent record 1
Server: Sent record 2
Server: Sent record 3
Server: Sent record 4
Server: Sent record 5
Server: Sent record 6
Server: Sent record 7
Server: Sent record 8
Server: Sent record 9
Server: All records sent
Client: Received header with sequence length = 100
Client: Received record 0 with flag = 0
Client: Received record 1 with flag = 1
Client: Received record 2 with flag = 2
Client: Received record 3 with flag = 3
Client: Received record 4 with flag = 4
Client: Received record 5 with flag = 5
Client: Received record 6 with flag = 6
Client: Received record 7 with flag = 7
Client: Received record 8 with flag = 8
Client: Received record 9 with flag = 9
Client: Received 10 records total
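
For readers who want to reproduce an exchange like the one logged above without the crate, the following is a rough sketch built only on `std::net`. The wire format is made up for illustration (a 4-byte little-endian sequence length followed by records that carry only an 8-byte flag), and `send_records`, `receive_records`, and `loopback_demo` are hypothetical helpers, not functions from binseq or from examples/network_streaming.rs.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Sends a made-up wire format: a 4-byte little-endian sequence length,
/// then `n` records that each carry only an 8-byte little-endian flag.
fn send_records<W: Write>(mut dst: W, seq_len: u32, n: u64) -> io::Result<()> {
    dst.write_all(&seq_len.to_le_bytes())?;
    for flag in 0..n {
        dst.write_all(&flag.to_le_bytes())?;
    }
    dst.flush()
}

/// Reads the header, then reads flags until the peer closes the connection.
fn receive_records<R: Read>(mut src: R) -> io::Result<(u32, Vec<u64>)> {
    let mut header = [0u8; 4];
    src.read_exact(&mut header)?;
    let mut flags = Vec::new(); // collected only so the demo can return them
    let mut buf = [0u8; 8];
    loop {
        match src.read_exact(&mut buf) {
            Ok(()) => flags.push(u64::from_le_bytes(buf)),
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
    }
    Ok((u32::from_le_bytes(header), flags))
}

/// Runs a server thread and a client over loopback; returns what the client saw.
fn loopback_demo() -> io::Result<(u32, Vec<u64>)> {
    let listener = TcpListener::bind("127.0.0.1:0")?; // port 0: the OS picks a free port
    let addr = listener.local_addr()?;
    let server = thread::spawn(move || -> io::Result<()> {
        let (stream, _) = listener.accept()?;
        // `stream` is moved in and dropped on return, which closes the connection.
        send_records(stream, 100, 10)
    });
    let result = receive_records(TcpStream::connect(addr)?);
    server.join().expect("server thread panicked")?;
    result
}
```

Since both helpers are generic over `Write` and `Read`, they can also be exercised against an in-memory `Vec<u8>` and byte slice, which is convenient for testing without opening a socket.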

@noamteyssier
Collaborator

Thanks for the great addition! The network use case is very cool and not one I originally considered. I'll take a closer look at the implementation, but from a cursory glance it looks solid.

One small change I'd request: rename the next_record function to just next and return Option<Result<T>> instead of Result<Option<T>>, so users can use the while let Some(..) pattern found in other fallible lending iterators:

// lending iterator pattern

let mut reader = StreamReader::new(handle);
while let Some(record) = reader.next() {
    let record = record?;
    do_something(record);
}
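
To illustrate why the Option<Result<T>> shape fits this loop, here is a self-contained sketch with a toy reader. `ToyReader`, `RECORD_SIZE`, and `sum_all` are hypothetical and are not binseq's StreamReader; the 4-byte record is an assumption for the example. `None` signals a clean end of stream, `Some(Err(_))` a failure, and each record borrows the reader's internal buffer, which is what makes the iterator "lending".

```rust
use std::io::{self, Read};

/// Hypothetical fixed record size in bytes, for illustration only.
const RECORD_SIZE: usize = 4;

/// Toy stand-in for a streaming reader; this is not binseq's `StreamReader`.
struct ToyReader<R: Read> {
    inner: R,
    buf: [u8; RECORD_SIZE],
}

impl<R: Read> ToyReader<R> {
    fn new(inner: R) -> Self {
        Self { inner, buf: [0; RECORD_SIZE] }
    }

    /// Returns `None` at a clean end of stream, `Some(Err(_))` on failure,
    /// and `Some(Ok(record))` otherwise. The record borrows the internal
    /// buffer, so it must be dropped before `next` is called again.
    fn next(&mut self) -> Option<io::Result<&[u8]>> {
        let mut filled = 0;
        while filled < RECORD_SIZE {
            match self.inner.read(&mut self.buf[filled..]) {
                Ok(0) if filled == 0 => return None, // EOF on a record boundary
                Ok(0) => return Some(Err(io::ErrorKind::UnexpectedEof.into())),
                Ok(n) => filled += n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Some(Err(e)),
            }
        }
        Some(Ok(&self.buf[..]))
    }
}

/// Sums every byte of every record, using the loop shape from the comment above.
fn sum_all<R: Read>(src: R) -> io::Result<u64> {
    let mut reader = ToyReader::new(src);
    let mut total = 0u64;
    while let Some(record) = reader.next() {
        let record = record?; // propagate read errors to the caller
        total += record.iter().map(|&b| u64::from(b)).sum::<u64>();
    }
    Ok(total)
}
```

With Result<Option<T>>, the same loop would need `while let Some(record) = reader.next_record()?`, which works but does not match the convention of `Iterator`-like APIs where `None` alone ends the loop.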

I'll try to merge this in tomorrow. I'm working on ingesting vbinseq into this crate, which will incur some refactors. It's a bigger change, so I want to get it out of the way before I add new features. Once that's done I should be able to rebase this and drop it in.

@noamteyssier noamteyssier merged commit caebdd5 into ArcInstitute:main Apr 23, 2025
@drbh
Contributor Author

drbh commented Apr 24, 2025

@noamteyssier thanks for rebasing and merging!! 🙏
