From 432dbf0e74c4bdf3b7dd9f3ac00ca9a1172c3d75 Mon Sep 17 00:00:00 2001 From: Marcin Mielniczuk Date: Tue, 14 Jan 2020 16:33:35 +0100 Subject: [PATCH] More WIP --- crates/wasi-common/Cargo.toml | 1 + .../src/sys/windows/hostcalls_impl/misc.rs | 260 ++++++++++++------ 2 files changed, 175 insertions(+), 86 deletions(-) diff --git a/crates/wasi-common/Cargo.toml b/crates/wasi-common/Cargo.toml index f9d0f08ec2..370e615397 100644 --- a/crates/wasi-common/Cargo.toml +++ b/crates/wasi-common/Cargo.toml @@ -22,6 +22,7 @@ filetime = "0.2.7" lazy_static = "1.4.0" num = { version = "0.2.0", default-features = false } wig = { path = "wig" } +crossbeam = "0.7.3" [target.'cfg(unix)'.dependencies] yanix = { path = "yanix" } diff --git a/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs b/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs index b45fb638f3..dc58d6de3d 100644 --- a/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs +++ b/crates/wasi-common/src/sys/windows/hostcalls_impl/misc.rs @@ -1,19 +1,79 @@ #![allow(non_camel_case_types)] #![allow(unused_unsafe)] #![allow(unused)] +use crate::fdentry::Descriptor; use crate::hostcalls_impl::{ClockEventData, FdEventData}; use crate::memory::*; use crate::sys::host_impl; use crate::{wasi, wasi32, Error, Result}; use cpu_time::{ProcessTime, ThreadTime}; +use crossbeam::channel::{self, Receiver, Sender}; use lazy_static::lazy_static; -use log::trace; +use log::{error, trace}; use std::convert::TryInto; +use std::io; +use std::os::windows::io::AsRawHandle; +use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +type StdinPayload = io::Result; +struct StdinPoll { + request_tx: Sender<()>, + notify_rx: Receiver, +} + +enum PollState { + Ready, + Closed, + TimedOut, + Error(Error), +} + +impl StdinPoll { + fn poll(&self, timeout: Option) -> PollState { + use crossbeam::channel::{RecvTimeoutError, TryRecvError}; + // Clean up possible unread result from previous poll + match self.notify_rx.try_recv() { + Ok(_) | Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => panic!("FIXME"), + } + self.request_tx.send(()).expect("FIXME"); + let pollret = match timeout { + Some(timeout) => self.notify_rx.recv_timeout(timeout), + None => Ok(self.notify_rx.recv().expect("FIXME")), + }; + match pollret { + Ok(Ok(true)) => PollState::Ready, + Ok(Ok(false)) => PollState::Closed, + Ok(Err(e)) => PollState::Error(e.into()), + Err(RecvTimeoutError::Timeout) => PollState::TimedOut, + Err(RecvTimeoutError::Disconnected) => panic!("FIXME"), + } + } + + fn event_loop(request_rx: Receiver<()>, notify_tx: Sender) -> ! { + use std::io::BufRead; + loop { + request_rx.recv().expect("FIXME"); + let buf = std::io::stdin().lock().fill_buf().map(|s| !s.is_empty()); + notify_tx.send(buf).expect("FIXME"); + } + } +} + lazy_static! { static ref START_MONOTONIC: Instant = Instant::now(); static ref PERF_COUNTER_RES: u64 = get_perf_counter_resolution_ns(); + static ref STDIN_POLL: StdinPoll = { + let channel_size = 1; + let (request_tx, request_rx) = channel::bounded(channel_size); + let (notify_tx, notify_rx) = channel::bounded(channel_size); + thread::spawn(move || StdinPoll::event_loop(request_rx, notify_tx)); + StdinPoll { + request_tx, + notify_rx, + } + }; } // Timer resolution on Windows is really hard. We may consider exposing the resolution of the respective @@ -110,34 +170,80 @@ fn make_timeout_event(timeout: &ClockEventData) -> wasi::__wasi_event_t { } } +fn handle_timeout( + timeout_event: ClockEventData, + timeout: Duration, + events: &mut Vec, +) { + thread::sleep(timeout); + handle_timeout_event(timeout_event, events); +} + +fn handle_timeout_event(timeout_event: ClockEventData, events: &mut Vec) { + let new_event = make_timeout_event(&timeout_event); + events.push(new_event); +} + +fn handle_rw_event(event: FdEventData, out_events: &mut Vec) { + let size = match event.descriptor { + Descriptor::OsHandle(os_handle) => { + if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ { + os_handle.metadata().map(|m| m.len()).map_err(Into::into) + } else { + // The spec is unclear what nbytes should actually be for __WASI_EVENTTYPE_FD_WRITE and + // the implementation on Unix just returns 0 here, so it's probably fine + // to do the same on Windows for now. + // cf. https://github.com/WebAssembly/WASI/issues/148 + Ok(0) + } + } + // We return the only universally correct lower bound, see the comment later in the function. + Descriptor::Stdin => Ok(1), + // On Unix, ioctl(FIONREAD) will return 0 for stdout/stderr. Emulate the same behavior on Windows. + Descriptor::Stdout | Descriptor::Stderr => Ok(0), + }; + + let new_event = make_rw_event(&event, size); + out_events.push(new_event); +} + +fn handle_error_event( + event: FdEventData, + error: Error, + out_events: &mut Vec, +) { + let new_event = make_rw_event(&event, Err(error)); + out_events.push(new_event); +} + pub(crate) fn poll_oneoff( timeout: Option, fd_events: Vec, events: &mut Vec, ) -> Result<()> { - use crate::fdentry::Descriptor; use std::fs::Metadata; use std::thread; - let timeout_duration = timeout - .map(|t| t.delay.try_into().map(Duration::from_nanos)) + let timeout = timeout + .map(|event| { + event + .delay + .try_into() + .map(Duration::from_nanos) + .map(|dur| (event, dur)) + }) .transpose()?; // With no events to listen, poll_oneoff just becomes a sleep. if fd_events.is_empty() { - match timeout_duration { - Some(t) => { - thread::sleep(t); - let timeout_event = timeout.expect("timeout should be Some"); - let new_event = make_timeout_event(&timeout_event); - events.push(new_event); - } - // `poll` invoked with nfds = 0, timeout = -1 appears to be an infinite sleep - // Even though the thread is not guanteed to remain parked forever, `poll(2)` - // mentions that spurious readiness notifications may occur, so it's probably fine - None => thread::park(), + match timeout { + Some((event, dur)) => return Ok(handle_timeout(event, dur, events)), + // `poll` invoked with nfds = 0, timeout = -1 appears to be an infinite sleep on Unix + // usually meant to be interrupted by a signal. Unfortunately, WASI doesn't currently + // support signals and there is no way to interrupt this infinite sleep, so we + // return `ENOTSUP` + None => return Err(Error::ENOTSUP), } - return Ok(()); } // Currently WASI file support is only (a) regular files (b) directories (c) symlinks on Windows, @@ -149,6 +255,7 @@ pub(crate) fn poll_oneoff( // Therefore, we only poll the stdin. let mut stdin_events = vec![]; let mut immediate_events = vec![]; + let mut pipe_events = vec![]; let mut stdin_ready = None; for event in fd_events { @@ -162,7 +269,22 @@ pub(crate) fn poll_oneoff( stdin_events.push(event) } } - _ => immediate_events.push(event), + Descriptor::Stdin | Descriptor::Stderr | Descriptor::Stdout => { + immediate_events.push(event) + } + Descriptor::OsHandle(os_handle) => { + let ftype = unsafe { winx::file::get_file_type(os_handle.as_raw_handle()) }?; + if ftype.is_unknown() || ftype.is_char() { + error!("poll_oneoff: unsupported file type: {:?}", ftype); + handle_error_event(event, Error::ENOTSUP, events); + } else if ftype.is_disk() { + immediate_events.push(event); + } else if ftype.is_pipe() { + pipe_events.push(event); + } else { + unreachable!(); + } + } } } @@ -170,30 +292,43 @@ pub(crate) fn poll_oneoff( if !immediate_events.is_empty() { trace!(" | have immediate events, will return immediately"); for mut event in immediate_events { - let size = match event.descriptor { - Descriptor::OsHandle(os_handle) => { - if event.r#type == wasi::__WASI_EVENTTYPE_FD_READ { - os_handle.metadata().map(|m| m.len()).map_err(Into::into) - } else { - // The spec is unclear what nbytes should actually be for __WASI_EVENTTYPE_FD_WRITE and - // the implementation on Unix just returns 0 here, so it's probably fine - // to do the same on Windows for now. - // cf. https://github.com/WebAssembly/WASI/issues/148 - Ok(0) - } - } - // We return the only universally correct lower bound, see the comment later in the function. - Descriptor::Stdin => Ok(1), - // On Unix, ioctl(FIONREAD) will return 0 for stdout/stderr. Emulate the same behavior on Windows. - Descriptor::Stdout | Descriptor::Stderr => Ok(0), - }; - - let new_event = make_rw_event(&event, size); - events.push(new_event) + handle_rw_event(event, events); } } else if !stdin_events.is_empty() { - trace!(" | actively polling stdin"); - // There are some stdin poll requests and there's no data available immediately + // REVIEW: is there a better place to document this? Perhaps in + // `struct PollStdin`? + // + // If there's a request to poll stdin, we spin up a separate thread to + // waiting for data to arrive on stdin. This thread will not terminate. + // + // TODO more explain why this way + trace!(" | passively waiting on stdin"); + let dur = timeout.map(|t| t.1); + let state = STDIN_POLL.poll(dur); + for event in stdin_events { + match state { + PollState::Ready => handle_rw_event(event, events), + PollState::Closed => { /* error? FIXME */ } + PollState::TimedOut => { /* FIXME */ } + PollState::Error(ref e) => { + handle_error_event(event, Error::ENOTSUP /*FIXME*/, events); + } + } + } + } else if !pipe_events.is_empty() { + trace!(" | actively polling stdin or pipes"); + match timeout { + Some((event, dur)) => { + error!("Polling pipes not supported on Windows, will just time out."); + return Ok(handle_timeout(event, dur, events)); + } + None => { + error!("Polling only pipes with no timeout not supported on Windows."); + return Err(Error::ENOTSUP); + } + } + // TODO remove these old comments!!! + // There are some stdin or pipe poll requests and there's no data available immediately // We are busy-polling the stdin with delay, unfortunately. // @@ -216,55 +351,8 @@ pub(crate) fn poll_oneoff( // // However, polling stdin is a relatively infrequent use case, so this hopefully won't be // a major issue. - let timeout_duration = timeout - .map(|t| t.delay.try_into().map(Duration::from_nanos)) - .transpose()?; // avoid issuing more syscalls if we're requested to return immediately - if timeout_duration == Some(Duration::from_nanos(0)) { - return Ok(()); - } - - let poll_interval = Duration::from_millis(10); - let poll_start = Instant::now(); - - let timeout_occurred: Option = loop { - // Even though we assume that stdin is not ready, it's better to check it - // sooner than later, as we're going to wait anyway if it's the case. - if stdin_nonempty() { - break None; - } - if let Some(timeout_duration) = timeout_duration { - if poll_start.elapsed() >= timeout_duration { - break timeout; - } - } - thread::sleep(poll_interval); - }; - - match timeout_occurred { - Some(timeout_info) => { - let new_event = make_timeout_event(&timeout_info); - events.push(new_event); - } - None => { - // stdin became ready for reading - for event in stdin_events { - assert_eq!( - event.r#type, - wasi::__WASI_EVENTTYPE_FD_READ, - "stdin was expected to be polled for reading" - ); - - // Another limitation is that `std::io::BufRead` doesn't allow us - // to find out the number bytes available in the buffer, - // so we return the only universally correct lower bound, - // which is 1 byte. - let new_event = make_rw_event(&event, Ok(1)); - events.push(new_event); - } - } - } } Ok(())