Synchronize asynchronous pipe reads and writes
This commit is contained in:
parent
05142a7e44
commit
547504795c
2 changed files with 138 additions and 2 deletions
|
@ -326,6 +326,12 @@ impl Default for IO_STATUS_BLOCK {
|
|||
}
|
||||
}
|
||||
|
||||
pub type LPOVERLAPPED_COMPLETION_ROUTINE = unsafe extern "C" fn(
|
||||
dwErrorCode: DWORD,
|
||||
dwNumberOfBytesTransfered: DWORD,
|
||||
lpOverlapped: *mut OVERLAPPED,
|
||||
);
|
||||
|
||||
#[repr(C)]
|
||||
#[cfg(not(target_pointer_width = "64"))]
|
||||
pub struct WSADATA {
|
||||
|
@ -891,6 +897,7 @@ extern "system" {
|
|||
pub fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
|
||||
pub fn SwitchToThread() -> BOOL;
|
||||
pub fn Sleep(dwMilliseconds: DWORD);
|
||||
pub fn SleepEx(dwMilliseconds: DWORD, bAlertable: BOOL) -> DWORD;
|
||||
pub fn GetProcessId(handle: HANDLE) -> DWORD;
|
||||
pub fn CopyFileExW(
|
||||
lpExistingFileName: LPCWSTR,
|
||||
|
@ -957,6 +964,13 @@ extern "system" {
|
|||
lpNumberOfBytesRead: LPDWORD,
|
||||
lpOverlapped: LPOVERLAPPED,
|
||||
) -> BOOL;
|
||||
pub fn ReadFileEx(
|
||||
hFile: BorrowedHandle<'_>,
|
||||
lpBuffer: LPVOID,
|
||||
nNumberOfBytesToRead: DWORD,
|
||||
lpOverlapped: LPOVERLAPPED,
|
||||
lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE,
|
||||
) -> BOOL;
|
||||
pub fn WriteFile(
|
||||
hFile: BorrowedHandle<'_>,
|
||||
lpBuffer: LPVOID,
|
||||
|
@ -964,6 +978,13 @@ extern "system" {
|
|||
lpNumberOfBytesWritten: LPDWORD,
|
||||
lpOverlapped: LPOVERLAPPED,
|
||||
) -> BOOL;
|
||||
pub fn WriteFileEx(
|
||||
hFile: BorrowedHandle<'_>,
|
||||
lpBuffer: LPVOID,
|
||||
nNumberOfBytesToWrite: DWORD,
|
||||
lpOverlapped: LPOVERLAPPED,
|
||||
lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE,
|
||||
) -> BOOL;
|
||||
pub fn CloseHandle(hObject: HANDLE) -> BOOL;
|
||||
pub fn MoveFileExW(lpExistingFileName: LPCWSTR, lpNewFileName: LPCWSTR, dwFlags: DWORD)
|
||||
-> BOOL;
|
||||
|
|
|
@ -173,6 +173,15 @@ fn random_number() -> usize {
|
|||
}
|
||||
}
|
||||
|
||||
// Abstracts over `ReadFileEx` and `WriteFileEx`
|
||||
type AlertableIoFn = unsafe extern "system" fn(
|
||||
BorrowedHandle<'_>,
|
||||
c::LPVOID,
|
||||
c::DWORD,
|
||||
c::LPOVERLAPPED,
|
||||
c::LPOVERLAPPED_COMPLETION_ROUTINE,
|
||||
) -> c::BOOL;
|
||||
|
||||
impl AnonPipe {
|
||||
pub fn handle(&self) -> &Handle {
|
||||
&self.inner
|
||||
|
@ -182,7 +191,19 @@ impl AnonPipe {
|
|||
}
|
||||
|
||||
pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.inner.read(buf)
|
||||
let result = unsafe {
|
||||
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
|
||||
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
|
||||
};
|
||||
|
||||
match result {
|
||||
// The special treatment of BrokenPipe is to deal with Windows
|
||||
// pipe semantics, which yields this error when *reading* from
|
||||
// a pipe after the other end has closed; we interpret that as
|
||||
// EOF on the pipe.
|
||||
Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
|
||||
_ => result,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
|
||||
|
@ -195,7 +216,10 @@ impl AnonPipe {
|
|||
}
|
||||
|
||||
pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.inner.write(buf)
|
||||
unsafe {
|
||||
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
|
||||
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
|
||||
|
@ -206,6 +230,97 @@ impl AnonPipe {
|
|||
pub fn is_write_vectored(&self) -> bool {
|
||||
self.inner.is_write_vectored()
|
||||
}
|
||||
|
||||
/// Synchronizes asynchronous reads or writes using our anonymous pipe.
|
||||
///
|
||||
/// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
|
||||
/// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
|
||||
///
|
||||
/// Note: This should not be used for handles we don't create.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// `buf` must be a pointer to a buffer that's valid for reads or writes
|
||||
/// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
|
||||
///
|
||||
/// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
|
||||
/// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
|
||||
/// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
|
||||
unsafe fn alertable_io_internal(
|
||||
&self,
|
||||
io: AlertableIoFn,
|
||||
buf: c::LPVOID,
|
||||
len: c::DWORD,
|
||||
) -> io::Result<usize> {
|
||||
// Use "alertable I/O" to synchronize the pipe I/O.
|
||||
// This has four steps.
|
||||
//
|
||||
// STEP 1: Start the asynchronous I/O operation.
|
||||
// This simply calls either `ReadFileEx` or `WriteFileEx`,
|
||||
// giving it a pointer to the buffer and callback function.
|
||||
//
|
||||
// STEP 2: Enter an alertable state.
|
||||
// The callback set in step 1 will not be called until the thread
|
||||
// enters an "alertable" state. This can be done using `SleepEx`.
|
||||
//
|
||||
// STEP 3: The callback
|
||||
// Once the I/O is complete and the thread is in an alertable state,
|
||||
// the callback will be run on the same thread as the call to
|
||||
// `ReadFileEx` or `WriteFileEx` done in step 1.
|
||||
// In the callback we simply set the result of the async operation.
|
||||
//
|
||||
// STEP 4: Return the result.
|
||||
// At this point we'll have a result from the callback function
|
||||
// and can simply return it. Note that we must not return earlier,
|
||||
// while the I/O is still in progress.
|
||||
|
||||
// The result that will be set from the asynchronous callback.
|
||||
let mut async_result: Option<AsyncResult> = None;
|
||||
struct AsyncResult {
|
||||
error: u32,
|
||||
transfered: u32,
|
||||
}
|
||||
|
||||
// STEP 3: The callback.
|
||||
unsafe extern "C" fn callback(
|
||||
dwErrorCode: u32,
|
||||
dwNumberOfBytesTransfered: u32,
|
||||
lpOverlapped: *mut c::OVERLAPPED,
|
||||
) {
|
||||
// Set `async_result` using a pointer smuggled through `hEvent`.
|
||||
let result = AsyncResult { error: dwErrorCode, transfered: dwNumberOfBytesTransfered };
|
||||
*(*lpOverlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
|
||||
}
|
||||
|
||||
// STEP 1: Start the I/O operation.
|
||||
let mut overlapped: c::OVERLAPPED = crate::mem::zeroed();
|
||||
// `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
|
||||
// Therefore the documentation suggests using it to smuggle a pointer to the callback.
|
||||
overlapped.hEvent = &mut async_result as *mut _ as *mut _;
|
||||
|
||||
// Asynchronous read of the pipe.
|
||||
// If successful, `callback` will be called once it completes.
|
||||
let result = io(self.inner.as_handle(), buf, len, &mut overlapped, callback);
|
||||
if result == c::FALSE {
|
||||
// We can return here because the call failed.
|
||||
// After this we must not return until the I/O completes.
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
|
||||
// Wait indefinitely for the result.
|
||||
while async_result.is_none() {
|
||||
// STEP 2: Enter an alertable state.
|
||||
// The second parameter of `SleepEx` is used to make this sleep alertable.
|
||||
c::SleepEx(c::INFINITE, c::TRUE);
|
||||
}
|
||||
// STEP 4: Return the result.
|
||||
// `async_result` is always `Some` at this point.
|
||||
let result = async_result.unwrap();
|
||||
match result.error {
|
||||
c::ERROR_SUCCESS => Ok(result.transfered as usize),
|
||||
error => Err(io::Error::from_raw_os_error(error as _)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()> {
|
||||
|
|
Loading…
Add table
Reference in a new issue