From 3d548d86050604777419612eb00ed31903f41390 Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Wed, 27 Mar 2024 15:47:39 +0800 Subject: [PATCH 1/6] fs: read/write_at for unix & seek_read/write for win --- tokio/src/fs/file.rs | 168 +++++++++++++++++++++++++++++++++++++ tokio/src/fs/mocks.rs | 12 +++ tokio/tests/io_read_at.rs | 21 +++++ tokio/tests/io_write_at.rs | 26 ++++++ 4 files changed, 227 insertions(+) create mode 100644 tokio/tests/io_read_at.rs create mode 100644 tokio/tests/io_write_at.rs diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index efce9fda990..b127e91ea27 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -538,6 +538,174 @@ impl File { pub fn set_max_buf_size(&mut self, max_buf_size: usize) { self.max_buf_size = max_buf_size; } + + /// Reads a number of bytes starting from a given offset. + /// + /// Returns the number of bytes read. + /// + /// The offset is relative to the start of the file and thus independent + /// from the current cursor. + /// + /// The current file cursor is not affected by this function. + /// + /// It is not an error to return with a short read. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncSeekExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut buf = vec![0_u8; 10]; + /// let mut file = File::open("foo.txt").await?; + /// file.read_at(&mut buf, 5).await?; + /// + /// assert_eq!(file.stream_position().await.unwrap(), 0); + /// # Ok(()) + /// # } + /// ``` + #[cfg(unix)] + #[cfg_attr(docsrs, doc(cfg(unix)))] + pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result { + fn _read_at(std: &StdFile, n: usize, offset: u64) -> io::Result> { + use std::os::unix::fs::FileExt; + let mut buf: Vec = vec![0; n]; + let n_read = std.read_at(&mut buf, offset)?; + buf.truncate(n_read); + + Ok(buf) + } + + let std = self.std.clone(); + let n = buf.len(); + let bytes_read = asyncify(move || _read_at(&std, n, offset)).await?; + let len = bytes_read.len(); + buf[..len].copy_from_slice(&bytes_read); + + Ok(len) + } + + /// Writes a number of bytes starting from a given offset. + /// + /// Returns the number of bytes written. + /// + /// The offset is relative to the start of the file and thus independent from + /// the current cursor. + /// + /// The current file cursor is not affected by this function. + /// + /// When writing beyond the end of the file, the file is appropriately + /// extended and the intermediate bytes are initialized with the value 0. + /// + /// It is not an error to return a short write. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncSeekExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// file.write_at(b"foo", 5).await?; + /// + /// assert_eq!(file.stream_position().await.unwrap(), 0); + /// # Ok(()) + /// # } + /// ``` + #[cfg(unix)] + #[cfg_attr(docsrs, doc(cfg(unix)))] + pub async fn write_at(&self, buf: &[u8], offset: u64) -> io::Result { + use std::os::unix::fs::FileExt; + + let std = self.std.clone(); + let buf_clone = buf.to_vec(); + asyncify(move || std.write_at(&buf_clone, offset)).await + } + + /// Seeks to a given position and reads a number of bytes. + /// + /// Returns the number of bytes read. + /// + /// The offset is relative to the start of the file and thus independent from + /// the current cursor. The current cursor is affected by this function, it + /// is set to the end of the read. + /// + /// Reading beyond the end of the file will always return with a length of 0. + /// + /// It is not an error to return with a short read. When returning from + /// such a short read, the file pointer is still updated. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncSeekExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// let mut buf = vec![0_u8; 10]; + /// file.seek_read(&mut buf, 5).await?; + /// # Ok(()) + /// # } + /// ``` + #[cfg(windows)] + #[cfg_attr(docsrs, doc(cfg(windows)))] + pub async fn seek_read(&self, buf: &[u8], offset: u64) -> io::Result { + fn _read_at(std: &StdFile, n: usize, offset: u64) -> io::Result> { + use std::os::windows::fs::FileExt; + let mut buf: Vec = vec![0; n]; + let n_read = std.seek_read(&mut buf, offset)?; + buf.truncate(n_read); + + Ok(buf) + } + + let std = self.std.clone(); + let n = buf.len(); + let bytes_read = asyncify(move || _read_at(&std, n, offset)).await?; + let len = bytes_read.len(); + buf[..len].copy_from_slice(&bytes_read); + + Ok(len) + } + + /// Seeks to a given position and writes a number of bytes. + /// + /// Returns the number of bytes written. + /// + /// The offset is relative to the start of the file and thus independent from + /// the current cursor. The current cursor is affected by this function, it + /// is set to the end of the write. + /// + /// When writing beyond the end of the file, the file is appropriately + /// extended and the intermediate bytes are set to zero. + /// + /// It is not an error to return a short write. When returning from such a + /// short write, the file pointer is still updated. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::io::AsyncSeekExt; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// file.seek_write(b"foo", 5).await?; + /// # Ok(()) + /// # } + /// ``` + #[cfg(windows)] + #[cfg_attr(docsrs, doc(cfg(windows)))] + pub async fn seek_write(&self, buf: &[u8], offset: u64) -> io::Result { + use std::os::windows::fs::FileExt; + + let std = self.std.clone(); + let buf_clone = buf.to_vec(); + asyncify(move || std.seek_write(&buf_clone, offset)).await + } } impl AsyncRead for File { diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index a2ce1cd6ca3..235f314ed05 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -52,6 +52,18 @@ mock! { impl std::os::unix::io::FromRawFd for File { unsafe fn from_raw_fd(h: std::os::unix::io::RawFd) -> Self; } + + #[cfg(unix)] + impl std::os::unix::fs::FileExt for File { + fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result; + fn write_at(&self, buf: &[u8], offset: u64) -> io::Result; + } + + #[cfg(windows)] + impl std::os::windows::fs::FileExt for File { + fn seek_read(&self, buf: &mut [u8], offset: u64) -> io::Result; + fn seek_write(&self, buf: &[u8], offset: u64) -> io::Result; + } } impl Read for MockFile { diff --git a/tokio/tests/io_read_at.rs b/tokio/tests/io_read_at.rs new file mode 100644 index 00000000000..a1de9ad89db --- /dev/null +++ b/tokio/tests/io_read_at.rs @@ -0,0 +1,21 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", not(target_os = "wasi")))] // WASM does support this, but it is nightly + +use tempfile::tempdir; +use tokio::fs; +use tokio::io::AsyncSeekExt; + +#[tokio::test] +#[cfg(unix)] +async fn read_at() { + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("a.txt"); + fs::write(&file_path, b"HelloWorld").await.unwrap(); + let mut file = fs::File::open(file_path.as_path()).await.unwrap(); + + let mut buf = [0_u8; 10]; + assert_eq!(file.read_at(&mut buf, 5).await.unwrap(), 5); + assert_eq!(&buf[..5], b"World"); + + assert_eq!(file.stream_position().await.unwrap(), 0); +} diff --git a/tokio/tests/io_write_at.rs b/tokio/tests/io_write_at.rs new file mode 100644 index 00000000000..a87f9a2da8d --- /dev/null +++ b/tokio/tests/io_write_at.rs @@ -0,0 +1,26 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", not(target_os = "wasi")))] // WASM does support this, but it is nightly + +use tempfile::tempdir; +use tokio::fs; +use tokio::fs::OpenOptions; +use tokio::io::AsyncSeekExt; + +#[tokio::test] +#[cfg(unix)] +async fn write_at() { + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("a.txt"); + fs::write(&file_path, b"Hello File").await.unwrap(); + let mut file = OpenOptions::new() + .write(true) + .open(file_path.as_path()) + .await + .unwrap(); + + assert_eq!(file.write_at(b"World", 5).await.unwrap(), 5); + let contents = fs::read(file_path.as_path()).await.unwrap(); + assert_eq!(contents, b"HelloWorld"); + + assert_eq!(file.stream_position().await.unwrap(), 0); +} From 0274f3ac6892cf16451ec313e409902076c2b1ca Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Wed, 27 Mar 2024 15:56:30 +0800 Subject: [PATCH 2/6] test: add test for seek_read/seek_write --- tokio/src/fs/file.rs | 2 +- tokio/tests/io_read_at.rs | 10 +++++----- tokio/tests/io_seek_read.rs | 21 +++++++++++++++++++++ tokio/tests/io_seek_write.rs | 26 ++++++++++++++++++++++++++ tokio/tests/io_write_at.rs | 12 ++++++------ 5 files changed, 59 insertions(+), 12 deletions(-) create mode 100644 tokio/tests/io_seek_read.rs create mode 100644 tokio/tests/io_seek_write.rs diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index b127e91ea27..310077d721d 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -652,7 +652,7 @@ impl File { /// ``` #[cfg(windows)] #[cfg_attr(docsrs, doc(cfg(windows)))] - pub async fn seek_read(&self, buf: &[u8], offset: u64) -> io::Result { + pub async fn seek_read(&self, buf: &mut [u8], offset: u64) -> io::Result { fn _read_at(std: &StdFile, n: usize, offset: u64) -> io::Result> { use std::os::windows::fs::FileExt; let mut buf: Vec = vec![0; n]; diff --git a/tokio/tests/io_read_at.rs b/tokio/tests/io_read_at.rs index a1de9ad89db..3e3b5ff9b18 100644 --- a/tokio/tests/io_read_at.rs +++ b/tokio/tests/io_read_at.rs @@ -1,13 +1,13 @@ #![warn(rust_2018_idioms)] -#![cfg(all(feature = "full", not(target_os = "wasi")))] // WASM does support this, but it is nightly - -use tempfile::tempdir; -use tokio::fs; -use tokio::io::AsyncSeekExt; +#![cfg(all(feature = "full", not(target_os = "wasi")))] #[tokio::test] #[cfg(unix)] async fn read_at() { + use tempfile::tempdir; + use tokio::fs; + use tokio::io::AsyncSeekExt; + let temp_dir = tempdir().unwrap(); let file_path = temp_dir.path().join("a.txt"); fs::write(&file_path, b"HelloWorld").await.unwrap(); diff --git a/tokio/tests/io_seek_read.rs b/tokio/tests/io_seek_read.rs new file mode 100644 index 00000000000..cd81dc05145 --- /dev/null +++ b/tokio/tests/io_seek_read.rs @@ -0,0 +1,21 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", not(target_os = "wasi")))] + +#[tokio::test] +#[cfg(windows)] +async fn read_at() { + use tempfile::tempdir; + use tokio::fs; + use tokio::io::AsyncSeekExt; + + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("a.txt"); + fs::write(&file_path, b"HelloWorld").await.unwrap(); + let mut file = fs::File::open(file_path.as_path()).await.unwrap(); + + let mut buf = [0_u8; 10]; + assert_eq!(file.seek_read(&mut buf, 5).await.unwrap(), 5); + assert_eq!(&buf[..5], b"World"); + + assert_eq!(file.stream_position().await.unwrap(), 10); +} diff --git a/tokio/tests/io_seek_write.rs b/tokio/tests/io_seek_write.rs new file mode 100644 index 00000000000..b2d8e5f7340 --- /dev/null +++ b/tokio/tests/io_seek_write.rs @@ -0,0 +1,26 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", not(target_os = "wasi")))] + +#[tokio::test] +#[cfg(windows)] +async fn write_at() { + use tempfile::tempdir; + use tokio::fs; + use tokio::fs::OpenOptions; + use tokio::io::AsyncSeekExt; + + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("a.txt"); + fs::write(&file_path, b"Hello File").await.unwrap(); + let mut file = OpenOptions::new() + .write(true) + .open(file_path.as_path()) + .await + .unwrap(); + + assert_eq!(file.write_at(b"World", 5).await.unwrap(), 5); + let contents = fs::read(file_path.as_path()).await.unwrap(); + assert_eq!(contents, b"HelloWorld"); + + assert_eq!(file.stream_position().await.unwrap(), 10); +} diff --git a/tokio/tests/io_write_at.rs b/tokio/tests/io_write_at.rs index a87f9a2da8d..28ec23634bf 100644 --- a/tokio/tests/io_write_at.rs +++ b/tokio/tests/io_write_at.rs @@ -1,14 +1,14 @@ #![warn(rust_2018_idioms)] -#![cfg(all(feature = "full", not(target_os = "wasi")))] // WASM does support this, but it is nightly - -use tempfile::tempdir; -use tokio::fs; -use tokio::fs::OpenOptions; -use tokio::io::AsyncSeekExt; +#![cfg(all(feature = "full", not(target_os = "wasi")))] #[tokio::test] #[cfg(unix)] async fn write_at() { + use tempfile::tempdir; + use tokio::fs; + use tokio::fs::OpenOptions; + use tokio::io::AsyncSeekExt; + let temp_dir = tempdir().unwrap(); let file_path = temp_dir.path().join("a.txt"); fs::write(&file_path, b"Hello File").await.unwrap(); From ffe6451e38517b511789303b867c7b75a66e4f91 Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Wed, 27 Mar 2024 15:57:57 +0800 Subject: [PATCH 3/6] style: fmt --- tokio/tests/io_seek_read.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/io_seek_read.rs b/tokio/tests/io_seek_read.rs index cd81dc05145..f11c0993d71 100644 --- a/tokio/tests/io_seek_read.rs +++ b/tokio/tests/io_seek_read.rs @@ -1,5 +1,5 @@ #![warn(rust_2018_idioms)] -#![cfg(all(feature = "full", not(target_os = "wasi")))] +#![cfg(all(feature = "full", not(target_os = "wasi")))] #[tokio::test] #[cfg(windows)] @@ -7,7 +7,7 @@ async fn read_at() { use tempfile::tempdir; use tokio::fs; use tokio::io::AsyncSeekExt; - + let temp_dir = tempdir().unwrap(); let file_path = temp_dir.path().join("a.txt"); fs::write(&file_path, b"HelloWorld").await.unwrap(); From dbb1b7e87b5ae236d3b1fb01e7fe287c0831917f Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Wed, 27 Mar 2024 16:03:55 +0800 Subject: [PATCH 4/6] test: fix windows test --- tokio/tests/io_seek_write.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/io_seek_write.rs b/tokio/tests/io_seek_write.rs index b2d8e5f7340..d9b3e964d71 100644 --- a/tokio/tests/io_seek_write.rs +++ b/tokio/tests/io_seek_write.rs @@ -18,7 +18,7 @@ async fn write_at() { .await .unwrap(); - assert_eq!(file.write_at(b"World", 5).await.unwrap(), 5); + assert_eq!(file.seek_write(b"World", 5).await.unwrap(), 5); let contents = fs::read(file_path.as_path()).await.unwrap(); assert_eq!(contents, b"HelloWorld"); From 488d5f243013fa12abc9d2a047407d931056a640 Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Wed, 27 Mar 2024 16:13:15 +0800 Subject: [PATCH 5/6] test: fix windows test --- tokio/src/fs/file.rs | 6 +++--- tokio/tests/io_seek_read.rs | 2 +- tokio/tests/io_seek_write.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 310077d721d..f091e5409f7 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -607,7 +607,7 @@ impl File { /// use tokio::io::AsyncSeekExt; /// /// # async fn dox() -> std::io::Result<()> { - /// let mut file = File::open("foo.txt").await?; + /// let mut file = File::create("foo.txt").await?; /// file.write_at(b"foo", 5).await?; /// /// assert_eq!(file.stream_position().await.unwrap(), 0); @@ -644,7 +644,7 @@ impl File { /// use tokio::io::AsyncSeekExt; /// /// # async fn dox() -> std::io::Result<()> { - /// let mut file = File::open("foo.txt").await?; + /// let file = File::open("foo.txt").await?; /// let mut buf = vec![0_u8; 10]; /// file.seek_read(&mut buf, 5).await?; /// # Ok(()) @@ -692,7 +692,7 @@ impl File { /// use tokio::io::AsyncSeekExt; /// /// # async fn dox() -> std::io::Result<()> { - /// let mut file = File::open("foo.txt").await?; + /// let file = File::create("foo.txt").await?; /// file.seek_write(b"foo", 5).await?; /// # Ok(()) /// # } diff --git a/tokio/tests/io_seek_read.rs b/tokio/tests/io_seek_read.rs index f11c0993d71..dfb1afd0ded 100644 --- a/tokio/tests/io_seek_read.rs +++ b/tokio/tests/io_seek_read.rs @@ -3,7 +3,7 @@ #[tokio::test] #[cfg(windows)] -async fn read_at() { +async fn seek_read() { use tempfile::tempdir; use tokio::fs; use tokio::io::AsyncSeekExt; diff --git a/tokio/tests/io_seek_write.rs b/tokio/tests/io_seek_write.rs index d9b3e964d71..1eeb1c5aff0 100644 --- a/tokio/tests/io_seek_write.rs +++ b/tokio/tests/io_seek_write.rs @@ -3,7 +3,7 @@ #[tokio::test] #[cfg(windows)] -async fn write_at() { +async fn seek_write() { use tempfile::tempdir; use tokio::fs; use tokio::fs::OpenOptions; From c98e22fc5a275e083c3079424bfbf964fba0f7a9 Mon Sep 17 00:00:00 2001 From: Steve Lau Date: Wed, 27 Mar 2024 16:23:47 +0800 Subject: [PATCH 6/6] test: fix windows test --- tokio/src/fs/file.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index f091e5409f7..6096c2b7a41 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -561,7 +561,7 @@ impl File { /// let mut file = File::open("foo.txt").await?; /// file.read_at(&mut buf, 5).await?; /// - /// assert_eq!(file.stream_position().await.unwrap(), 0); + /// assert_eq!(file.stream_position().await?, 0); /// # Ok(()) /// # } /// ``` @@ -610,7 +610,7 @@ impl File { /// let mut file = File::create("foo.txt").await?; /// file.write_at(b"foo", 5).await?; /// - /// assert_eq!(file.stream_position().await.unwrap(), 0); + /// assert_eq!(file.stream_position().await?, 0); /// # Ok(()) /// # } /// ``` @@ -641,7 +641,6 @@ impl File { /// /// ```no_run /// use tokio::fs::File; - /// use tokio::io::AsyncSeekExt; /// /// # async fn dox() -> std::io::Result<()> { /// let file = File::open("foo.txt").await?; @@ -689,7 +688,6 @@ impl File { /// /// ```no_run /// use tokio::fs::File; - /// use tokio::io::AsyncSeekExt; /// /// # async fn dox() -> std::io::Result<()> { /// let file = File::create("foo.txt").await?;