Skip to content

Commit 77cc6ea

Browse files
authored
Unify the read/write arguments (#38)
Default to reading/writing chunks like web. This is also just a safer API, since partial reads are not the normal.
1 parent e2289e4 commit 77cc6ea

File tree

6 files changed

+63
-103
lines changed

6 files changed

+63
-103
lines changed

web-transport-wasm/build.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
fn main() {
2+
println!("cargo:rustc-cfg=web_sys_unstable_apis");
3+
}

web-transport-wasm/src/recv.rs

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,40 +24,10 @@ impl RecvStream {
2424
})
2525
}
2626

27-
/// Read some data into the provided buffer.
28-
///
29-
/// Returns the (non-zero) number of bytes read, or None if the stream is closed.
30-
pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
31-
let chunk = match self.read_chunk(buf.len()).await? {
32-
Some(chunk) => chunk,
33-
None => return Ok(None),
34-
};
35-
36-
let size = chunk.len();
37-
buf[..size].copy_from_slice(&chunk);
38-
Ok(Some(size))
39-
}
40-
41-
/// Read some data into the provided buffer.
42-
///
43-
/// Returns the (non-zero) number of bytes read, or None if the stream is closed.
44-
/// Advances the buffer by the number of bytes read.
45-
pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
46-
let chunk = match self.read_chunk(buf.remaining_mut()).await? {
47-
Some(chunk) => chunk,
48-
None => return Ok(None),
49-
};
50-
51-
let size = chunk.len();
52-
buf.put(chunk);
53-
54-
Ok(Some(size))
55-
}
56-
5727
/// Read the next chunk of data with the provided maximum size.
5828
///
5929
/// This returns a chunk of data instead of copying, which may be more efficient.
60-
pub async fn read_chunk(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
30+
pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
6131
if !self.buffer.is_empty() {
6232
let size = cmp::min(max, self.buffer.len());
6333
let data = self.buffer.split_to(size).freeze();
@@ -78,6 +48,22 @@ impl RecvStream {
7848
Ok(Some(data))
7949
}
8050

51+
/// Read some data into the provided buffer.
52+
///
53+
/// Returns the (non-zero) number of bytes read, or None if the stream is closed.
54+
/// Advances the buffer by the number of bytes read.
55+
pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
56+
let chunk = match self.read(buf.remaining_mut()).await? {
57+
Some(chunk) => chunk,
58+
None => return Ok(None),
59+
};
60+
61+
let size = chunk.len();
62+
buf.put(chunk);
63+
64+
Ok(Some(size))
65+
}
66+
8167
/// Abort reading from the stream with the given reason.
8268
pub fn stop(&mut self, reason: &str) {
8369
self.reader.abort(reason);

web-transport-wasm/src/send.rs

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,23 @@ impl SendStream {
1616
Ok(Self { stream, writer })
1717
}
1818

19-
/// Write some of the given buffer to the stream.
20-
///
21-
/// Returns the non-zero number of bytes written.
22-
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
23-
self.writer.write(&Uint8Array::from(buf)).await?;
24-
Ok(buf.len())
25-
}
26-
27-
/// Write all of the given buffer to the stream.
28-
pub async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
19+
/// Write *all* of the given bytes to the stream.
20+
pub async fn write(&mut self, buf: &[u8]) -> Result<(), Error> {
2921
let mut buf = std::io::Cursor::new(buf);
30-
self.write_all_buf(&mut buf).await
22+
self.write_buf(&mut buf).await
3123
}
3224

33-
/// Write some of the given buffer to the stream.
25+
/// Write the given buffer to the stream.
3426
///
35-
/// Returns the non-zero number of bytes written.
3627
/// Advances the buffer by the number of bytes written.
37-
pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
38-
let size = self.write(buf.chunk()).await?;
39-
buf.advance(size);
40-
41-
Ok(size)
42-
}
43-
44-
/// Write all of the given buffer to the stream.
45-
///
46-
/// Advances the buffer by the number of bytes written, including any partial writes.
47-
pub async fn write_all_buf<B: Buf>(&mut self, buf: &mut B) -> Result<(), Error> {
28+
/// May be polled/timed out to perform partial writes.
29+
pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<(), Error> {
4830
while buf.has_remaining() {
49-
self.write_buf(buf).await?;
31+
let chunk = buf.chunk();
32+
self.writer.write(&Uint8Array::from(chunk)).await?;
33+
buf.advance(chunk.len());
5034
}
35+
5136
Ok(())
5237
}
5338

web-transport-wasm/src/session.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bytes::Bytes;
12
use js_sys::{Object, Reflect, Uint8Array};
23
use url::Url;
34
use wasm_bindgen_futures::JsFuture;
@@ -81,17 +82,17 @@ impl Session {
8182
}
8283

8384
/// Send a datagram over the network.
84-
pub async fn send_datagram(&mut self, payload: &[u8]) -> Result<(), Error> {
85+
pub async fn send_datagram(&mut self, payload: Bytes) -> Result<(), Error> {
8586
let mut writer = Writer::new(&self.inner.datagrams().writable())?;
8687
writer.write(&Uint8Array::from(payload.as_ref())).await?;
8788
Ok(())
8889
}
8990

9091
/// Receive a datagram over the network.
91-
pub async fn recv_datagram(&mut self) -> Result<Vec<u8>, Error> {
92+
pub async fn recv_datagram(&mut self) -> Result<Bytes, Error> {
9293
let mut reader = Reader::new(&self.inner.datagrams().readable())?;
9394
let data: Uint8Array = reader.read().await?.unwrap_or_default();
94-
Ok(data.to_vec())
95+
Ok(data.to_vec().into())
9596
}
9697

9798
/// Close the session with the given error code and reason.

web-transport/src/quinn.rs

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -96,23 +96,22 @@ impl SendStream {
9696
Self { inner }
9797
}
9898

99-
/// Write some of the buffer to the stream, potentailly blocking on flow control.
100-
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
101-
Ok(self.inner.write(buf).await?)
99+
/// Write *all* of the buffer to the stream.
100+
pub async fn write(&mut self, buf: &[u8]) -> Result<(), Error> {
101+
self.inner.write_all(buf).await?;
102+
Ok(())
102103
}
103104

104-
/// Write some of the given buffer to the stream, potentially blocking on flow control.
105-
pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
106-
let size = self.inner.write(buf.chunk()).await?;
107-
buf.advance(size);
108-
Ok(size)
109-
}
110-
111-
/// Write the entire chunk of bytes to the stream.
105+
/// Write the given buffer to the stream, advancing the internal position.
112106
///
113-
/// More efficient for some implementations, as it avoids a copy
114-
pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), Error> {
115-
Ok(self.inner.write_chunk(buf).await?)
107+
/// This may be polled to perform partial writes.
108+
pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<(), Error> {
109+
while buf.has_remaining() {
110+
let size = self.inner.write(buf.chunk()).await?;
111+
buf.advance(size);
112+
}
113+
114+
Ok(())
116115
}
117116

118117
/// Set the stream's priority.
@@ -141,11 +140,15 @@ impl RecvStream {
141140
Self { inner }
142141
}
143142

144-
/// Read some data into the provided buffer.
143+
/// Read the next chunk of data with the provided maximum size.
145144
///
146-
/// The number of bytes read is returned, or None if the stream is closed.
147-
pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
148-
Ok(self.inner.read(buf).await?)
145+
/// This returns a chunk of data instead of copying, which may be more efficient.
146+
pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
147+
Ok(self
148+
.inner
149+
.read_chunk(max, true)
150+
.await?
151+
.map(|chunk| chunk.bytes))
149152
}
150153

151154
/// Read some data into the provided buffer.
@@ -166,17 +169,6 @@ impl RecvStream {
166169
Ok(Some(size))
167170
}
168171

169-
/// Read the next chunk of data with the provided maximum size.
170-
///
171-
/// This returns a chunk of data instead of copying, which may be more efficient.
172-
pub async fn read_chunk(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
173-
Ok(self
174-
.inner
175-
.read_chunk(max, true)
176-
.await?
177-
.map(|chunk| chunk.bytes))
178-
}
179-
180172
/// Send a `STOP_SENDING` QUIC code.
181173
pub fn stop(&mut self, code: u32) {
182174
self.inner.stop(code).ok();

web-transport/src/wasm.rs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,18 @@ impl From<web_transport_wasm::Session> for Session {
5454
pub struct SendStream(web_transport_wasm::SendStream);
5555

5656
impl SendStream {
57-
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
57+
/// Write all of the given data to the stream.
58+
pub async fn write(&mut self, buf: &[u8]) -> Result<(), Error> {
5859
self.0.write(buf).await
5960
}
6061

6162
/// Write some of the given buffer to the stream.
62-
pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
63+
///
64+
/// Advances the internal position by the number of bytes written.
65+
pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<(), Error> {
6366
self.0.write_buf(buf).await
6467
}
6568

66-
/// Write the entire chunk of bytes to the stream.
67-
/// More efficient for some implementations, as it avoids a copy
68-
pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), Error> {
69-
self.0.write_chunk(buf).await
70-
}
71-
7269
pub fn set_priority(&mut self, order: i32) {
7370
self.0.set_priority(order)
7471
}
@@ -82,20 +79,16 @@ impl SendStream {
8279
pub struct RecvStream(web_transport_wasm::RecvStream);
8380

8481
impl RecvStream {
85-
pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
86-
self.0.read(buf).await
82+
/// Attempt to read a chunk of unbuffered data.
83+
pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
84+
self.0.read(max).await
8785
}
8886

8987
/// Attempt to read from the stream into the given buffer.
9088
pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
9189
self.0.read_buf(buf).await
9290
}
9391

94-
/// Attempt to read a chunk of unbuffered data.
95-
pub async fn read_chunk(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
96-
self.0.read_chunk(max).await
97-
}
98-
9992
/// Send a `STOP_SENDING` QUIC code.
10093
pub fn stop(&mut self, code: u32) {
10194
self.0.stop(&code.to_string())

0 commit comments

Comments
 (0)