Skip to content

Commit

Permalink
frame_write: Use tokio_util::io::framed_write
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Jun 16, 2022
1 parent e4cf88c commit a4188e9
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ members = [
futures-core = { version = "0.3", default-features = false }
futures-sink = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false }
tokio-util = { version = "0.7.1", features = ["codec"] }
tokio-util = { version = "0.7.3", features = ["io", "codec"] }
tokio = { version = "1", features = ["io-util"] }
bytes = "1"
http = "0.2"
Expand Down
48 changes: 7 additions & 41 deletions src/codec/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use bytes::{Buf, BufMut, BytesMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_util::io::poll_write_buf;

use std::io::{self, Cursor, IoSlice};
use std::io::{self, Cursor};

// A macro to get around a method needing to borrow &mut self
macro_rules! limited_write_buf {
Expand Down Expand Up @@ -44,9 +45,6 @@ struct Encoder<B> {

/// Max frame size, this is specified by the peer
max_frame_size: FrameSize,

/// Whether or not the wrapped `AsyncWrite` supports vectored IO.
is_write_vectored: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -76,7 +74,6 @@ where
B: Buf,
{
pub fn new(inner: T) -> FramedWrite<T, B> {
let is_write_vectored = inner.is_write_vectored();
FramedWrite {
inner,
encoder: Encoder {
Expand All @@ -85,7 +82,6 @@ where
next: None,
last_data_frame: None,
max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
is_write_vectored,
},
}
}
Expand Down Expand Up @@ -126,21 +122,15 @@ where
Some(Next::Data(ref mut frame)) => {
tracing::trace!(queued_data_frame = true);
let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut());
ready!(write(
&mut self.inner,
self.encoder.is_write_vectored,
&mut buf,
cx,
))?
ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?;
}
_ => {
tracing::trace!(queued_data_frame = false);
ready!(write(
&mut self.inner,
self.encoder.is_write_vectored,
&mut self.encoder.buf,
ready!(poll_write_buf(
Pin::new(&mut self.inner),
cx,
))?
&mut self.encoder.buf
))?;
}
}
}
Expand All @@ -165,30 +155,6 @@ where
}
}

fn write<T, B>(
writer: &mut T,
is_write_vectored: bool,
buf: &mut B,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>>
where
T: AsyncWrite + Unpin,
B: Buf,
{
// TODO(eliza): when tokio-util 0.5.1 is released, this
// could just use `poll_write_buf`...
const MAX_IOVS: usize = 64;
let n = if is_write_vectored {
let mut bufs = [IoSlice::new(&[]); MAX_IOVS];
let cnt = buf.chunks_vectored(&mut bufs);
ready!(Pin::new(writer).poll_write_vectored(cx, &bufs[..cnt]))?
} else {
ready!(Pin::new(writer).poll_write(cx, buf.chunk()))?
};
buf.advance(n);
Ok(()).into()
}

#[must_use]
enum ControlFlow {
Continue,
Expand Down

0 comments on commit a4188e9

Please sign in to comment.