Rollup merge of #108326 - tmiasko:read-buf, r=thomcc

Implement read_buf for a few more types

Implement read_buf for TcpStream, Stdin, StdinLock, ChildStdout,
ChildStderr (and internally for AnonPipe, Handle, Socket), so
that it skips buffer initialization.

The other provided methods like read_to_string and read_to_end are
implemented in terms of read_buf and so benefit from the optimization
as well.

This commit also implements read_vectored and is_read_vectored where
applicable.
This commit is contained in:
nils 2023-03-21 13:00:21 +01:00 committed by GitHub
commit 82dc127d7b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 250 additions and 31 deletions

View file

@ -2,7 +2,8 @@ use crate::io::prelude::*;
use crate::env;
use crate::fs::{self, File, OpenOptions};
use crate::io::{ErrorKind, SeekFrom};
use crate::io::{BorrowedBuf, ErrorKind, SeekFrom};
use crate::mem::MaybeUninit;
use crate::path::Path;
use crate::str;
use crate::sync::Arc;
@ -401,6 +402,23 @@ fn file_test_io_seek_read_write() {
check!(fs::remove_file(&filename));
}
#[test]
fn file_test_read_buf() {
let tmpdir = tmpdir();
let filename = &tmpdir.join("test");
check!(fs::write(filename, &[1, 2, 3, 4]));
let mut buf: [MaybeUninit<u8>; 128] = MaybeUninit::uninit_array();
let mut buf = BorrowedBuf::from(buf.as_mut_slice());
let mut file = check!(File::open(filename));
check!(file.read_buf(buf.unfilled()));
assert_eq!(buf.filled(), &[1, 2, 3, 4]);
// File::read_buf should omit buffer initialization.
assert_eq!(buf.init_len(), 4);
check!(fs::remove_file(filename));
}
#[test]
fn file_test_stat_is_correct_on_is_file() {
let tmpdir = tmpdir();

View file

@ -8,7 +8,7 @@ use crate::io::prelude::*;
use crate::cell::{Cell, RefCell};
use crate::fmt;
use crate::fs::File;
use crate::io::{self, BufReader, IoSlice, IoSliceMut, LineWriter, Lines};
use crate::io::{self, BorrowedCursor, BufReader, IoSlice, IoSliceMut, LineWriter, Lines};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::{Arc, Mutex, MutexGuard, OnceLock, ReentrantMutex, ReentrantMutexGuard};
use crate::sys::stdio;
@ -97,6 +97,10 @@ impl Read for StdinRaw {
handle_ebadf(self.0.read(buf), 0)
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
handle_ebadf(self.0.read_buf(buf), ())
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
handle_ebadf(self.0.read_vectored(bufs), 0)
}
@ -418,6 +422,9 @@ impl Read for Stdin {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.lock().read(buf)
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.lock().read_buf(buf)
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.lock().read_vectored(bufs)
}
@ -450,6 +457,10 @@ impl Read for StdinLock<'_> {
self.inner.read(buf)
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.inner.read_buf(buf)
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.inner.read_vectored(bufs)
}

View file

@ -6,7 +6,7 @@ mod tests;
use crate::io::prelude::*;
use crate::fmt;
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::iter::FusedIterator;
use crate::net::{Shutdown, SocketAddr, ToSocketAddrs};
use crate::sys_common::net as net_imp;
@ -619,6 +619,10 @@ impl Read for TcpStream {
self.0.read(buf)
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.0.read_buf(buf)
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0.read_vectored(bufs)
}
@ -653,6 +657,10 @@ impl Read for &TcpStream {
self.0.read(buf)
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.0.read_buf(buf)
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0.read_vectored(bufs)
}

View file

@ -1,6 +1,7 @@
use crate::fmt;
use crate::io::prelude::*;
use crate::io::{ErrorKind, IoSlice, IoSliceMut};
use crate::io::{BorrowedBuf, ErrorKind, IoSlice, IoSliceMut};
use crate::mem::MaybeUninit;
use crate::net::test::{next_test_ip4, next_test_ip6};
use crate::net::*;
use crate::sync::mpsc::channel;
@ -279,6 +280,31 @@ fn partial_read() {
})
}
#[test]
fn read_buf() {
each_ip(&mut |addr| {
let srv = t!(TcpListener::bind(&addr));
let t = thread::spawn(move || {
let mut s = t!(TcpStream::connect(&addr));
s.write_all(&[1, 2, 3, 4]).unwrap();
});
let mut s = t!(srv.accept()).0;
let mut buf: [MaybeUninit<u8>; 128] = MaybeUninit::uninit_array();
let mut buf = BorrowedBuf::from(buf.as_mut_slice());
t!(s.read_buf(buf.unfilled()));
assert_eq!(buf.filled(), &[1, 2, 3, 4]);
// FIXME: sgx uses default_read_buf that initializes the buffer.
if cfg!(not(target_env = "sgx")) {
// TcpStream::read_buf should omit buffer initialization.
assert_eq!(buf.init_len(), 4);
}
t.join().ok().expect("thread panicked");
})
}
#[test]
fn read_vectored() {
each_ip(&mut |addr| {

View file

@ -110,7 +110,7 @@ use crate::convert::Infallible;
use crate::ffi::OsStr;
use crate::fmt;
use crate::fs;
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::num::NonZeroI32;
use crate::path::Path;
use crate::str;
@ -354,6 +354,10 @@ impl Read for ChildStdout {
self.inner.read(buf)
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.inner.read_buf(buf)
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.inner.read_vectored(bufs)
}
@ -419,6 +423,10 @@ impl Read for ChildStderr {
self.inner.read(buf)
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.inner.read_buf(buf)
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.inner.read_vectored(bufs)
}

View file

@ -1,7 +1,8 @@
use crate::io::prelude::*;
use super::{Command, Output, Stdio};
use crate::io::ErrorKind;
use crate::io::{BorrowedBuf, ErrorKind};
use crate::mem::MaybeUninit;
use crate::str;
fn known_command() -> Command {
@ -119,6 +120,37 @@ fn stdin_works() {
assert_eq!(out, "foobar\n");
}
#[test]
#[cfg_attr(any(target_os = "vxworks"), ignore)]
fn child_stdout_read_buf() {
let mut cmd = if cfg!(target_os = "windows") {
let mut cmd = Command::new("cmd");
cmd.arg("/C").arg("echo abc");
cmd
} else {
let mut cmd = shell_cmd();
cmd.arg("-c").arg("echo abc");
cmd
};
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
let child = cmd.spawn().unwrap();
let mut stdout = child.stdout.unwrap();
let mut buf: [MaybeUninit<u8>; 128] = MaybeUninit::uninit_array();
let mut buf = BorrowedBuf::from(buf.as_mut_slice());
stdout.read_buf(buf.unfilled()).unwrap();
// ChildStdout::read_buf should omit buffer initialization.
if cfg!(target_os = "windows") {
assert_eq!(buf.filled(), b"abc\r\n");
assert_eq!(buf.init_len(), 5);
} else {
assert_eq!(buf.filled(), b"abc\n");
assert_eq!(buf.init_len(), 4);
};
}
#[test]
#[cfg_attr(any(target_os = "vxworks"), ignore)]
fn test_process_status() {

View file

@ -1,7 +1,7 @@
use fortanix_sgx_abi::Fd;
use super::abi::usercalls;
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::mem;
use crate::sys::{AsInner, FromInner, IntoInner};
@ -30,6 +30,10 @@ impl FileDesc {
usercalls::read(self.fd, &mut [IoSliceMut::new(buf)])
}
pub fn read_buf(&self, buf: BorrowedCursor<'_>) -> io::Result<()> {
crate::io::default_read_buf(|b| self.read(b), buf)
}
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
usercalls::read(self.fd, bufs)
}

View file

@ -1,6 +1,6 @@
use crate::error;
use crate::fmt;
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::net::{Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr, ToSocketAddrs};
use crate::sync::Arc;
use crate::sys::fd::FileDesc;
@ -144,6 +144,10 @@ impl TcpStream {
self.inner.inner.read(buf)
}
pub fn read_buf(&self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.inner.inner.read_buf(buf)
}
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.inner.inner.read_vectored(bufs)
}

View file

@ -469,6 +469,15 @@ impl<'a> Read for &'a FileDesc {
fn read_buf(&mut self, cursor: BorrowedCursor<'_>) -> io::Result<()> {
(**self).read_buf(cursor)
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
(**self).read_vectored(bufs)
}
#[inline]
fn is_read_vectored(&self) -> bool {
(**self).is_read_vectored()
}
}
impl AsInner<OwnedFd> for FileDesc {

View file

@ -1,6 +1,6 @@
use crate::cmp;
use crate::ffi::CStr;
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedBuf, BorrowedCursor, IoSlice, IoSliceMut};
use crate::mem;
use crate::net::{Shutdown, SocketAddr};
use crate::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
@ -242,19 +242,35 @@ impl Socket {
self.0.duplicate().map(Socket)
}
fn recv_with_flags(&self, buf: &mut [u8], flags: c_int) -> io::Result<usize> {
fn recv_with_flags(&self, mut buf: BorrowedCursor<'_>, flags: c_int) -> io::Result<()> {
let ret = cvt(unsafe {
libc::recv(self.as_raw_fd(), buf.as_mut_ptr() as *mut c_void, buf.len(), flags)
libc::recv(
self.as_raw_fd(),
buf.as_mut().as_mut_ptr() as *mut c_void,
buf.capacity(),
flags,
)
})?;
Ok(ret as usize)
unsafe {
buf.advance(ret as usize);
}
Ok(())
}
pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.recv_with_flags(buf, 0)
let mut buf = BorrowedBuf::from(buf);
self.recv_with_flags(buf.unfilled(), 0)?;
Ok(buf.len())
}
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.recv_with_flags(buf, MSG_PEEK)
let mut buf = BorrowedBuf::from(buf);
self.recv_with_flags(buf.unfilled(), MSG_PEEK)?;
Ok(buf.len())
}
pub fn read_buf(&self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.recv_with_flags(buf, 0)
}
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {

View file

@ -1,4 +1,4 @@
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::mem;
use crate::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use crate::sys::fd::FileDesc;
@ -49,6 +49,10 @@ impl AnonPipe {
self.0.read(buf)
}
pub fn read_buf(&self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.0.read_buf(buf)
}
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0.read_vectored(bufs)
}

View file

@ -1,4 +1,4 @@
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::mem::ManuallyDrop;
use crate::os::unix::io::FromRawFd;
use crate::sys::fd::FileDesc;
@ -18,6 +18,10 @@ impl io::Read for Stdin {
unsafe { ManuallyDrop::new(FileDesc::from_raw_fd(libc::STDIN_FILENO)).read(buf) }
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
unsafe { ManuallyDrop::new(FileDesc::from_raw_fd(libc::STDIN_FILENO)).read_buf(buf) }
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
unsafe { ManuallyDrop::new(FileDesc::from_raw_fd(libc::STDIN_FILENO)).read_vectored(bufs) }
}

View file

@ -1,5 +1,5 @@
use crate::fmt;
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::net::{Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr};
use crate::sys::unsupported;
use crate::time::Duration;
@ -39,6 +39,10 @@ impl TcpStream {
self.0
}
pub fn read_buf(&self, _buf: BorrowedCursor<'_>) -> io::Result<()> {
self.0
}
pub fn read_vectored(&self, _: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0
}

View file

@ -1,4 +1,4 @@
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
pub struct AnonPipe(!);
@ -7,6 +7,10 @@ impl AnonPipe {
self.0
}
pub fn read_buf(&self, _buf: BorrowedCursor<'_>) -> io::Result<()> {
self.0
}
pub fn read_vectored(&self, _bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0
}

View file

@ -2,7 +2,7 @@
#![allow(dead_code)]
use super::err2io;
use crate::io::{self, IoSlice, IoSliceMut, SeekFrom};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut, SeekFrom};
use crate::mem;
use crate::net::Shutdown;
use crate::os::wasi::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
@ -46,6 +46,22 @@ impl WasiFd {
unsafe { wasi::fd_read(self.as_raw_fd() as wasi::Fd, iovec(bufs)).map_err(err2io) }
}
pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> {
unsafe {
let bufs = [wasi::Iovec {
buf: buf.as_mut().as_mut_ptr() as *mut u8,
buf_len: buf.capacity(),
}];
match wasi::fd_read(self.as_raw_fd() as wasi::Fd, &bufs) {
Ok(n) => {
buf.advance(n);
Ok(())
}
Err(e) => Err(err2io(e)),
}
}
}
pub fn write(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
unsafe { wasi::fd_write(self.as_raw_fd() as wasi::Fd, ciovec(bufs)).map_err(err2io) }
}

View file

@ -441,7 +441,7 @@ impl File {
}
pub fn read_buf(&self, cursor: BorrowedCursor<'_>) -> io::Result<()> {
crate::io::default_read_buf(|buf| self.read(buf), cursor)
self.fd.read_buf(cursor)
}
pub fn write(&self, buf: &[u8]) -> io::Result<usize> {

View file

@ -3,7 +3,7 @@
use super::err2io;
use super::fd::WasiFd;
use crate::fmt;
use crate::io::{self, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::net::{Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr};
use crate::os::wasi::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use crate::sys::unsupported;
@ -91,6 +91,10 @@ impl TcpStream {
self.read_vectored(&mut [IoSliceMut::new(buf)])
}
pub fn read_buf(&self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.socket().as_inner().read_buf(buf)
}
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.socket().as_inner().read(bufs)
}

View file

@ -327,7 +327,16 @@ impl<'a> Read for &'a Handle {
(**self).read(buf)
}
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> io::Result<()> {
(**self).read_buf(buf)
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
(**self).read_vectored(bufs)
}
#[inline]
fn is_read_vectored(&self) -> bool {
(**self).is_read_vectored()
}
}

View file

@ -1,7 +1,7 @@
#![unstable(issue = "none", feature = "windows_net")]
use crate::cmp;
use crate::io::{self, IoSlice, IoSliceMut, Read};
use crate::io::{self, BorrowedBuf, BorrowedCursor, IoSlice, IoSliceMut, Read};
use crate::mem;
use crate::net::{Shutdown, SocketAddr};
use crate::os::windows::io::{
@ -214,28 +214,38 @@ impl Socket {
Ok(Self(self.0.try_clone()?))
}
fn recv_with_flags(&self, buf: &mut [u8], flags: c_int) -> io::Result<usize> {
fn recv_with_flags(&self, mut buf: BorrowedCursor<'_>, flags: c_int) -> io::Result<()> {
// On unix when a socket is shut down all further reads return 0, so we
// do the same on windows to map a shut down socket to returning EOF.
let length = cmp::min(buf.len(), i32::MAX as usize) as i32;
let result =
unsafe { c::recv(self.as_raw_socket(), buf.as_mut_ptr() as *mut _, length, flags) };
let length = cmp::min(buf.capacity(), i32::MAX as usize) as i32;
let result = unsafe {
c::recv(self.as_raw_socket(), buf.as_mut().as_mut_ptr() as *mut _, length, flags)
};
match result {
c::SOCKET_ERROR => {
let error = unsafe { c::WSAGetLastError() };
if error == c::WSAESHUTDOWN {
Ok(0)
Ok(())
} else {
Err(io::Error::from_raw_os_error(error))
}
}
_ => Ok(result as usize),
_ => {
unsafe { buf.advance(result as usize) };
Ok(())
}
}
}
pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
let mut buf = BorrowedBuf::from(buf);
self.recv_with_flags(buf.unfilled(), 0)?;
Ok(buf.len())
}
pub fn read_buf(&self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.recv_with_flags(buf, 0)
}
@ -277,7 +287,9 @@ impl Socket {
}
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.recv_with_flags(buf, c::MSG_PEEK)
let mut buf = BorrowedBuf::from(buf);
self.recv_with_flags(buf.unfilled(), c::MSG_PEEK)?;
Ok(buf.len())
}
fn recv_from_with_flags(

View file

@ -1,7 +1,7 @@
use crate::os::windows::prelude::*;
use crate::ffi::OsStr;
use crate::io::{self, IoSlice, IoSliceMut, Read};
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut, Read};
use crate::mem;
use crate::path::Path;
use crate::ptr;
@ -252,6 +252,28 @@ impl AnonPipe {
}
}
pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> {
let result = unsafe {
let len = crate::cmp::min(buf.capacity(), c::DWORD::MAX as usize) as c::DWORD;
self.alertable_io_internal(c::ReadFileEx, buf.as_mut().as_mut_ptr() as _, len)
};
match result {
// The special treatment of BrokenPipe is to deal with Windows
// pipe semantics, which yields this error when *reading* from
// a pipe after the other end has closed; we interpret that as
// EOF on the pipe.
Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()),
Err(e) => Err(e),
Ok(n) => {
unsafe {
buf.advance(n);
}
Ok(())
}
}
}
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.inner.read_vectored(bufs)
}

View file

@ -4,7 +4,7 @@ mod tests;
use crate::cmp;
use crate::convert::{TryFrom, TryInto};
use crate::fmt;
use crate::io::{self, ErrorKind, IoSlice, IoSliceMut};
use crate::io::{self, BorrowedCursor, ErrorKind, IoSlice, IoSliceMut};
use crate::mem;
use crate::net::{Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr};
use crate::ptr;
@ -272,6 +272,10 @@ impl TcpStream {
self.inner.read(buf)
}
pub fn read_buf(&self, buf: BorrowedCursor<'_>) -> io::Result<()> {
self.inner.read_buf(buf)
}
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.inner.read_vectored(bufs)
}