diff --git a/crates/wasi/src/preview2/command.rs b/crates/wasi/src/preview2/command.rs index d44b4026a4..76120de6a7 100644 --- a/crates/wasi/src/preview2/command.rs +++ b/crates/wasi/src/preview2/command.rs @@ -6,7 +6,6 @@ wasmtime::component::bindgen!({ async: true, trappable_error_type: { "wasi:filesystem/types"::"error-code": Error, - "wasi:io/streams"::"stream-error": Error, }, with: { "wasi:filesystem/types": crate::preview2::bindings::filesystem::types, @@ -61,7 +60,6 @@ pub mod sync { async: false, trappable_error_type: { "wasi:filesystem/types"::"error-code": Error, - "wasi:io/streams"::"stream-error": Error, }, with: { "wasi:filesystem/types": crate::preview2::bindings::sync_io::filesystem::types, diff --git a/crates/wasi/src/preview2/filesystem.rs b/crates/wasi/src/preview2/filesystem.rs index d13ee68e2d..c9224ce33f 100644 --- a/crates/wasi/src/preview2/filesystem.rs +++ b/crates/wasi/src/preview2/filesystem.rs @@ -1,4 +1,4 @@ -use crate::preview2::{StreamState, Table, TableError}; +use crate::preview2::{StreamRuntimeError, StreamState, Table, TableError}; use bytes::{Bytes, BytesMut}; use std::sync::Arc; @@ -152,24 +152,20 @@ impl FileInputStream { } } -pub(crate) fn read_result( - r: Result, -) -> Result<(usize, StreamState), std::io::Error> { +fn read_result(r: Result) -> Result<(usize, StreamState), anyhow::Error> { match r { Ok(0) => Ok((0, StreamState::Closed)), Ok(n) => Ok((n, StreamState::Open)), Err(e) if e.kind() == std::io::ErrorKind::Interrupted => Ok((0, StreamState::Open)), - Err(e) => Err(e), + Err(e) => Err(StreamRuntimeError::from(anyhow::anyhow!(e)).into()), } } -pub(crate) fn write_result( - r: Result, -) -> Result<(usize, StreamState), std::io::Error> { +fn write_result(r: Result) -> Result<(usize, StreamState), anyhow::Error> { match r { Ok(0) => Ok((0, StreamState::Closed)), Ok(n) => Ok((n, StreamState::Open)), - Err(e) => Err(e), + Err(e) => Err(StreamRuntimeError::from(anyhow::anyhow!(e)).into()), } } diff --git a/crates/wasi/src/preview2/host/filesystem.rs b/crates/wasi/src/preview2/host/filesystem.rs index 1fc1e17e22..189f5ca34a 100644 --- a/crates/wasi/src/preview2/host/filesystem.rs +++ b/crates/wasi/src/preview2/host/filesystem.rs @@ -209,7 +209,10 @@ impl types::Host for T { }) .await; - let (bytes_read, state) = crate::preview2::filesystem::read_result(r)?; + let (bytes_read, state) = match r? { + 0 => (0, true), + n => (n, false), + }; buffer.truncate( bytes_read @@ -217,7 +220,7 @@ impl types::Host for T { .expect("bytes read into memory as u64 fits in usize"), ); - Ok((buffer, state.is_closed())) + Ok((buffer, state)) } async fn write( diff --git a/crates/wasi/src/preview2/host/io.rs b/crates/wasi/src/preview2/host/io.rs index 04fb8ed81f..7dc52a027e 100644 --- a/crates/wasi/src/preview2/host/io.rs +++ b/crates/wasi/src/preview2/host/io.rs @@ -1,32 +1,16 @@ use crate::preview2::{ - bindings::io::streams::{self, InputStream, OutputStream, StreamError}, + bindings::io::streams::{self, InputStream, OutputStream}, bindings::poll::poll::Pollable, filesystem::{FileInputStream, FileOutputStream}, poll::PollableFuture, stream::{ HostInputStream, HostOutputStream, InternalInputStream, InternalOutputStream, - InternalTableStreamExt, StreamState, + InternalTableStreamExt, StreamRuntimeError, StreamState, }, - HostPollable, TableError, TablePollableExt, WasiView, + HostPollable, TablePollableExt, WasiView, }; -use anyhow::anyhow; use std::any::Any; -impl From for streams::Error { - fn from(error: anyhow::Error) -> streams::Error { - tracing::trace!( - "turning anyhow::Error in the streams interface into the empty error result: {error:?}" - ); - StreamError { dummy: 0 }.into() - } -} - -impl From for streams::Error { - fn from(error: TableError) -> streams::Error { - streams::Error::trap(anyhow!(error)) - } -} - impl From for streams::StreamStatus { fn from(state: StreamState) -> Self { match state { @@ -54,17 +38,37 @@ impl streams::Host for T { &mut self, stream: InputStream, len: u64, - ) -> Result<(Vec, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result, streams::StreamStatus), ()>> { match self.table_mut().get_internal_input_stream_mut(stream)? { InternalInputStream::Host(s) => { - let (bytes, state) = HostInputStream::read(s.as_mut(), len as usize)?; + let (bytes, state) = match HostInputStream::read(s.as_mut(), len as usize) { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; debug_assert!(bytes.len() <= len as usize); - Ok((bytes.into(), state.into())) + Ok(Ok((bytes.into(), state.into()))) } InternalInputStream::File(s) => { - let (bytes, state) = FileInputStream::read(s, len as usize).await?; - Ok((bytes.into(), state.into())) + let (bytes, state) = match FileInputStream::read(s, len as usize).await { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; + Ok(Ok((bytes.into(), state.into()))) } } } @@ -73,17 +77,37 @@ impl streams::Host for T { &mut self, stream: InputStream, len: u64, - ) -> Result<(Vec, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result, streams::StreamStatus), ()>> { match self.table_mut().get_internal_input_stream_mut(stream)? { InternalInputStream::Host(s) => { s.ready().await?; - let (bytes, state) = HostInputStream::read(s.as_mut(), len as usize)?; + let (bytes, state) = match HostInputStream::read(s.as_mut(), len as usize) { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; debug_assert!(bytes.len() <= len as usize); - Ok((bytes.into(), state.into())) + Ok(Ok((bytes.into(), state.into()))) } InternalInputStream::File(s) => { - let (bytes, state) = FileInputStream::read(s, len as usize).await?; - Ok((bytes.into(), state.into())) + let (bytes, state) = match FileInputStream::read(s, len as usize).await { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; + Ok(Ok((bytes.into(), state.into()))) } } } @@ -92,15 +116,26 @@ impl streams::Host for T { &mut self, stream: OutputStream, bytes: Vec, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { match self.table_mut().get_internal_output_stream_mut(stream)? { InternalOutputStream::Host(s) => { - let (bytes_written, status) = HostOutputStream::write(s.as_mut(), bytes.into())?; - Ok((u64::try_from(bytes_written).unwrap(), status.into())) + let (bytes_written, status) = + match HostOutputStream::write(s.as_mut(), bytes.into()) { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; + Ok(Ok((u64::try_from(bytes_written).unwrap(), status.into()))) } InternalOutputStream::File(s) => { let (nwritten, state) = FileOutputStream::write(s, bytes.into()).await?; - Ok((nwritten as u64, state.into())) + Ok(Ok((nwritten as u64, state.into()))) } } } @@ -109,24 +144,35 @@ impl streams::Host for T { &mut self, stream: OutputStream, bytes: Vec, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { match self.table_mut().get_internal_output_stream_mut(stream)? { InternalOutputStream::Host(s) => { let mut bytes = bytes::Bytes::from(bytes); let mut nwritten: usize = 0; loop { s.ready().await?; - let (written, state) = HostOutputStream::write(s.as_mut(), bytes.clone())?; + let (written, state) = match HostOutputStream::write(s.as_mut(), bytes.clone()) + { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; let _ = bytes.split_to(written); nwritten += written; if bytes.is_empty() || state == StreamState::Closed { - return Ok((nwritten as u64, state.into())); + return Ok(Ok((nwritten as u64, state.into()))); } } } InternalOutputStream::File(s) => { let (written, state) = FileOutputStream::write(s, bytes.into()).await?; - Ok((written as u64, state.into())) + Ok(Ok((written as u64, state.into()))) } } } @@ -135,17 +181,37 @@ impl streams::Host for T { &mut self, stream: InputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { match self.table_mut().get_internal_input_stream_mut(stream)? { InternalInputStream::Host(s) => { // TODO: the cast to usize should be fallible, use `.try_into()?` - let (bytes_skipped, state) = HostInputStream::skip(s.as_mut(), len as usize)?; + let (bytes_skipped, state) = match HostInputStream::skip(s.as_mut(), len as usize) { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; - Ok((bytes_skipped as u64, state.into())) + Ok(Ok((bytes_skipped as u64, state.into()))) } InternalInputStream::File(s) => { - let (bytes_skipped, state) = FileInputStream::skip(s, len as usize).await?; - Ok((bytes_skipped as u64, state.into())) + let (bytes_skipped, state) = match FileInputStream::skip(s, len as usize).await { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; + Ok(Ok((bytes_skipped as u64, state.into()))) } } } @@ -154,18 +220,38 @@ impl streams::Host for T { &mut self, stream: InputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { match self.table_mut().get_internal_input_stream_mut(stream)? { InternalInputStream::Host(s) => { s.ready().await?; // TODO: the cast to usize should be fallible, use `.try_into()?` - let (bytes_skipped, state) = HostInputStream::skip(s.as_mut(), len as usize)?; + let (bytes_skipped, state) = match HostInputStream::skip(s.as_mut(), len as usize) { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; - Ok((bytes_skipped as u64, state.into())) + Ok(Ok((bytes_skipped as u64, state.into()))) } InternalInputStream::File(s) => { - let (bytes_skipped, state) = FileInputStream::skip(s, len as usize).await?; - Ok((bytes_skipped as u64, state.into())) + let (bytes_skipped, state) = match FileInputStream::skip(s, len as usize).await { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }; + Ok(Ok((bytes_skipped as u64, state.into()))) } } } @@ -174,22 +260,42 @@ impl streams::Host for T { &mut self, stream: OutputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { let s = self.table_mut().get_internal_output_stream_mut(stream)?; let mut bytes = bytes::Bytes::from_static(ZEROS); bytes.truncate((len as usize).min(bytes.len())); let (written, state) = match s { - InternalOutputStream::Host(s) => HostOutputStream::write(s.as_mut(), bytes)?, - InternalOutputStream::File(s) => FileOutputStream::write(s, bytes).await?, + InternalOutputStream::Host(s) => match HostOutputStream::write(s.as_mut(), bytes) { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }, + InternalOutputStream::File(s) => match FileOutputStream::write(s, bytes).await { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }, }; - Ok((written as u64, state.into())) + Ok(Ok((written as u64, state.into()))) } async fn blocking_write_zeroes( &mut self, stream: OutputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { let mut remaining = len as usize; let s = self.table_mut().get_internal_output_stream_mut(stream)?; loop { @@ -199,12 +305,32 @@ impl streams::Host for T { let mut bytes = bytes::Bytes::from_static(ZEROS); bytes.truncate(remaining.min(bytes.len())); let (written, state) = match s { - InternalOutputStream::Host(s) => HostOutputStream::write(s.as_mut(), bytes)?, - InternalOutputStream::File(s) => FileOutputStream::write(s, bytes).await?, + InternalOutputStream::Host(s) => match HostOutputStream::write(s.as_mut(), bytes) { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }, + InternalOutputStream::File(s) => match FileOutputStream::write(s, bytes).await { + Ok(a) => a, + Err(e) => { + if let Some(e) = e.downcast_ref::() { + tracing::debug!("stream runtime error: {e:?}"); + return Ok(Err(())); + } else { + return Err(e); + } + } + }, }; remaining -= written; if remaining == 0 || state == StreamState::Closed { - return Ok((len - remaining as u64, state.into())); + return Ok(Ok((len - remaining as u64, state.into()))); } } } @@ -214,7 +340,7 @@ impl streams::Host for T { _src: InputStream, _dst: OutputStream, _len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { // TODO: We can't get two streams at the same time because they both // carry the exclusive lifetime of `ctx`. When [`get_many_mut`] is // stabilized, that could allow us to add a `get_many_stream_mut` or @@ -243,7 +369,7 @@ impl streams::Host for T { _src: InputStream, _dst: OutputStream, _len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { // TODO: once splice is implemented, figure out what the blocking semantics are for waiting // on src and dest here. todo!("stream splice is not implemented") @@ -253,7 +379,7 @@ impl streams::Host for T { &mut self, _src: InputStream, _dst: OutputStream, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { + ) -> anyhow::Result> { // TODO: We can't get two streams at the same time because they both // carry the exclusive lifetime of `ctx`. When [`get_many_mut`] is // stabilized, that could allow us to add a `get_many_stream_mut` or @@ -283,10 +409,6 @@ impl streams::Host for T { let pollable = match self.table_mut().get_internal_input_stream_mut(stream)? { InternalInputStream::Host(_) => { fn input_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> { - // FIXME: This downcast and match should be guaranteed by the checks above, - // however, the table element at index could be changed which would make this - // panic! This is a known problem with referring to other resources in the - // table which must be fixed. let stream = stream .downcast_mut::() .expect("downcast to InternalInputStream failed"); @@ -318,10 +440,6 @@ impl streams::Host for T { let pollable = match self.table_mut().get_internal_output_stream_mut(stream)? { InternalOutputStream::Host(_) => { fn output_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> { - // FIXME: This downcast and match should be guaranteed by the checks above, - // however, the table element at index could be changed which would make this - // panic! This is a known problem with referring to other resources in the - // table which must be fixed. let stream = stream .downcast_mut::() .expect("downcast to HostOutputStream failed"); @@ -353,6 +471,12 @@ pub mod sync { in_tokio, WasiView, }; + // same boilerplate everywhere, converting between two identical types with different + // definition sites. one day wasmtime-wit-bindgen will make all this unnecessary + fn xform(r: Result<(A, AsyncStreamStatus), ()>) -> Result<(A, streams::StreamStatus), ()> { + r.map(|(a, b)| (a, b.into())) + } + impl From for streams::StreamStatus { fn from(other: AsyncStreamStatus) -> Self { match other { @@ -375,80 +499,64 @@ pub mod sync { &mut self, stream: InputStream, len: u64, - ) -> Result<(Vec, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::read(self, stream, len).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result, streams::StreamStatus), ()>> { + in_tokio(async { AsyncHost::read(self, stream, len).await }).map(xform) } fn blocking_read( &mut self, stream: InputStream, len: u64, - ) -> Result<(Vec, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::blocking_read(self, stream, len).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result, streams::StreamStatus), ()>> { + in_tokio(async { AsyncHost::blocking_read(self, stream, len).await }).map(xform) } fn write( &mut self, stream: OutputStream, bytes: Vec, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::write(self, stream, bytes).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::write(self, stream, bytes).await }).map(xform) } fn blocking_write( &mut self, stream: OutputStream, bytes: Vec, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::blocking_write(self, stream, bytes).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::blocking_write(self, stream, bytes).await }).map(xform) } fn skip( &mut self, stream: InputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::skip(self, stream, len).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::skip(self, stream, len).await }).map(xform) } fn blocking_skip( &mut self, stream: InputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::blocking_skip(self, stream, len).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::blocking_skip(self, stream, len).await }).map(xform) } fn write_zeroes( &mut self, stream: OutputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::write_zeroes(self, stream, len).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::write_zeroes(self, stream, len).await }).map(xform) } fn blocking_write_zeroes( &mut self, stream: OutputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::blocking_write_zeroes(self, stream, len).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::blocking_write_zeroes(self, stream, len).await }).map(xform) } fn splice( @@ -456,10 +564,8 @@ pub mod sync { src: InputStream, dst: OutputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::splice(self, src, dst, len).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::splice(self, src, dst, len).await }).map(xform) } fn blocking_splice( @@ -467,20 +573,16 @@ pub mod sync { src: InputStream, dst: OutputStream, len: u64, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::blocking_splice(self, src, dst, len).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::blocking_splice(self, src, dst, len).await }).map(xform) } fn forward( &mut self, src: InputStream, dst: OutputStream, - ) -> Result<(u64, streams::StreamStatus), streams::Error> { - in_tokio(async { AsyncHost::forward(self, src, dst).await }) - .map(|(a, b)| (a, b.into())) - .map_err(streams::Error::from) + ) -> anyhow::Result> { + in_tokio(async { AsyncHost::forward(self, src, dst).await }).map(xform) } fn subscribe_to_input_stream(&mut self, stream: InputStream) -> anyhow::Result { diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index f0144562ee..ac3a6576ec 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -37,7 +37,9 @@ pub use self::filesystem::{DirPerms, FilePerms}; pub use self::poll::{ClosureFuture, HostPollable, MakeFuture, PollableFuture, TablePollableExt}; pub use self::random::{thread_rng, Deterministic}; pub use self::stdio::{stderr, stdin, stdout, IsATTY, Stderr, Stdin, Stdout}; -pub use self::stream::{HostInputStream, HostOutputStream, StreamState, TableStreamExt}; +pub use self::stream::{ + HostInputStream, HostOutputStream, StreamRuntimeError, StreamState, TableStreamExt, +}; pub use self::table::{OccupiedEntry, Table, TableError}; pub use cap_fs_ext::SystemTimeSpec; pub use cap_rand::RngCore; @@ -54,7 +56,6 @@ pub mod bindings { ", tracing: true, trappable_error_type: { - "wasi:io/streams"::"stream-error": Error, "wasi:filesystem/types"::"error-code": Error, }, with: { @@ -63,22 +64,6 @@ pub mod bindings { }); } pub use self::_internal::wasi::{filesystem, io, poll}; - - impl From for io::streams::StreamError { - fn from(other: super::io::streams::StreamError) -> Self { - let super::io::streams::StreamError { dummy } = other; - Self { dummy } - } - } - - impl From for io::streams::Error { - fn from(other: super::io::streams::Error) -> Self { - match other.downcast() { - Ok(se) => io::streams::Error::from(io::streams::StreamError::from(se)), - Err(e) => io::streams::Error::trap(e), - } - } - } } pub(crate) mod _internal_clocks { @@ -105,7 +90,6 @@ pub mod bindings { tracing: true, async: true, trappable_error_type: { - "wasi:io/streams"::"stream-error": Error, "wasi:filesystem/types"::"error-code": Error, }, with: { @@ -137,7 +121,6 @@ pub mod bindings { tracing: true, trappable_error_type: { "wasi:filesystem/types"::"error-code": Error, - "wasi:io/streams"::"stream-error": Error, }, with: { "wasi:clocks/wall-clock": crate::preview2::bindings::clocks::wall_clock, diff --git a/crates/wasi/src/preview2/preview1.rs b/crates/wasi/src/preview2/preview1.rs index a2f957aa18..1503522075 100644 --- a/crates/wasi/src/preview2/preview1.rs +++ b/crates/wasi/src/preview2/preview1.rs @@ -431,6 +431,14 @@ impl wiggle::GuestErrorType for types::Errno { } } +fn stream_res(r: anyhow::Result>) -> Result { + match r { + Ok(Ok(a)) => Ok(a), + Ok(Err(_)) => Err(types::Errno::Io.into()), + Err(trap) => Err(types::Error::trap(trap)), + } +} + fn systimespec(set: bool, ts: types::Timestamp, now: bool) -> Result { if set && now { Err(types::Errno::Inval.into()) @@ -1201,11 +1209,10 @@ impl< })?; let max = buf.len().try_into().unwrap_or(u64::MAX); let (read, state) = if blocking { - streams::Host::blocking_read(self, stream, max).await + stream_res(streams::Host::blocking_read(self, stream, max).await)? } else { - streams::Host::read(self, stream, max).await - } - .map_err(|_| types::Errno::Io)?; + stream_res(streams::Host::read(self, stream, max).await)? + }; let n = read.len().try_into()?; let pos = pos.checked_add(n).ok_or(types::Errno::Overflow)?; @@ -1217,13 +1224,14 @@ impl< let Some(buf) = first_non_empty_iovec(iovs)? else { return Ok(0) }; - let (read, state) = streams::Host::read( - self, - input_stream, - buf.len().try_into().unwrap_or(u64::MAX), - ) - .await - .map_err(|_| types::Errno::Io)?; + let (read, state) = stream_res( + streams::Host::read( + self, + input_stream, + buf.len().try_into().unwrap_or(u64::MAX), + ) + .await, + )?; (buf, read, state) } _ => return Err(types::Errno::Badf.into()), @@ -1263,11 +1271,10 @@ impl< })?; let max = buf.len().try_into().unwrap_or(u64::MAX); let (read, state) = if blocking { - streams::Host::blocking_read(self, stream, max).await + stream_res(streams::Host::blocking_read(self, stream, max).await)? } else { - streams::Host::read(self, stream, max).await - } - .map_err(|_| types::Errno::Io)?; + stream_res(streams::Host::read(self, stream, max).await)? + }; (buf, read, state) } @@ -1325,11 +1332,10 @@ impl< (stream, position) }; let (n, _stat) = if blocking { - streams::Host::blocking_write(self, stream, buf).await + stream_res(streams::Host::blocking_write(self, stream, buf).await)? } else { - streams::Host::write(self, stream, buf).await - } - .map_err(|_| types::Errno::Io)?; + stream_res(streams::Host::write(self, stream, buf).await)? + }; if !append { let pos = pos.checked_add(n).ok_or(types::Errno::Overflow)?; position.store(pos, Ordering::Relaxed); @@ -1340,9 +1346,8 @@ impl< let Some(buf) = first_non_empty_ciovec(ciovs)? else { return Ok(0) }; - let (n, _stat) = streams::Host::blocking_write(self, output_stream, buf) - .await - .map_err(|_| types::Errno::Io)?; + let (n, _stat) = + stream_res(streams::Host::blocking_write(self, output_stream, buf).await)?; n } _ => return Err(types::Errno::Badf.into()), @@ -1372,11 +1377,10 @@ impl< .unwrap_or_else(types::Error::trap) })?; if blocking { - streams::Host::blocking_write(self, stream, buf).await + stream_res(streams::Host::blocking_write(self, stream, buf).await)? } else { - streams::Host::write(self, stream, buf).await + stream_res(streams::Host::write(self, stream, buf).await)? } - .map_err(|_| types::Errno::Io)? } Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => { // NOTE: legacy implementation returns SPIPE here diff --git a/crates/wasi/src/preview2/stream.rs b/crates/wasi/src/preview2/stream.rs index 625d4a3a1d..827c8cbe60 100644 --- a/crates/wasi/src/preview2/stream.rs +++ b/crates/wasi/src/preview2/stream.rs @@ -2,6 +2,33 @@ use crate::preview2::filesystem::{FileInputStream, FileOutputStream}; use crate::preview2::{Table, TableError}; use anyhow::Error; use bytes::Bytes; +use std::fmt; + +/// An error which should be reported to Wasm as a runtime error, rather than +/// an error which should trap Wasm execution. The definition for runtime +/// stream errors is the empty type, so the contents of this error will only +/// be available via a `tracing`::event` at `Level::DEBUG`. +pub struct StreamRuntimeError(anyhow::Error); +impl From for StreamRuntimeError { + fn from(e: anyhow::Error) -> Self { + StreamRuntimeError(e) + } +} +impl fmt::Debug for StreamRuntimeError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "Stream runtime error: {:?}", self.0) + } +} +impl fmt::Display for StreamRuntimeError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "Stream runtime error") + } +} +impl std::error::Error for StreamRuntimeError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.0.source() + } +} #[derive(Clone, Copy, Debug, PartialEq)] pub enum StreamState { @@ -22,10 +49,14 @@ pub trait HostInputStream: Send + Sync { /// Read bytes. On success, returns a pair holding the number of bytes /// read and a flag indicating whether the end of the stream was reached. /// Important: this read must be non-blocking! + /// Returning an Err which downcasts to a [`StreamRuntimeError`] will be + /// reported to Wasm as the empty error result. Otherwise, errors will trap. fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error>; /// Read bytes from a stream and discard them. Important: this method must /// be non-blocking! + /// Returning an Error which downcasts to a StreamRuntimeError will be + /// reported to Wasm as the empty error result. Otherwise, errors will trap. fn skip(&mut self, nelem: usize) -> Result<(usize, StreamState), Error> { let mut nread = 0; let mut state = StreamState::Open; @@ -42,6 +73,7 @@ pub trait HostInputStream: Send + Sync { /// Check for read readiness: this method blocks until the stream is ready /// for reading. + /// Returning an error will trap execution. async fn ready(&mut self) -> Result<(), Error>; } @@ -51,10 +83,14 @@ pub trait HostInputStream: Send + Sync { pub trait HostOutputStream: Send + Sync { /// Write bytes. On success, returns the number of bytes written. /// Important: this write must be non-blocking! + /// Returning an Err which downcasts to a [`StreamRuntimeError`] will be + /// reported to Wasm as the empty error result. Otherwise, errors will trap. fn write(&mut self, bytes: Bytes) -> Result<(usize, StreamState), Error>; /// Transfer bytes directly from an input stream to an output stream. /// Important: this splice must be non-blocking! + /// Returning an Err which downcasts to a [`StreamRuntimeError`] will be + /// reported to Wasm as the empty error result. Otherwise, errors will trap. fn splice( &mut self, src: &mut dyn HostInputStream, @@ -77,6 +113,8 @@ pub trait HostOutputStream: Send + Sync { /// Repeatedly write a byte to a stream. Important: this write must be /// non-blocking! + /// Returning an Err which downcasts to a [`StreamRuntimeError`] will be + /// reported to Wasm as the empty error result. Otherwise, errors will trap. fn write_zeroes(&mut self, nelem: usize) -> Result<(usize, StreamState), Error> { // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write // repeatedly from a 'static buffer of zeros. @@ -87,6 +125,7 @@ pub trait HostOutputStream: Send + Sync { /// Check for write readiness: this method blocks until the stream is /// ready for writing. + /// Returning an error will trap execution. async fn ready(&mut self) -> Result<(), Error>; } diff --git a/crates/wasi/wit/deps/io/streams.wit b/crates/wasi/wit/deps/io/streams.wit index 011bde0d6a..98df181c1e 100644 --- a/crates/wasi/wit/deps/io/streams.wit +++ b/crates/wasi/wit/deps/io/streams.wit @@ -8,16 +8,6 @@ package wasi:io interface streams { use wasi:poll/poll.{pollable} - /// An error type returned from a stream operation. - /// - /// TODO: need to figure out the actual contents of this error. Used to be - /// an empty record but that's no longer allowed. The `dummy` field is - /// only here to have this be a valid in the component model by being - /// non-empty. - record stream-error { - dummy: u32, - } - /// Streams provide a sequence of data and then end; once they end, they /// no longer provide any further data. /// @@ -37,15 +27,12 @@ interface streams { /// An input bytestream. In the future, this will be replaced by handle /// types. /// - /// This conceptually represents a `stream`. It's temporary - /// scaffolding until component-model's async features are ready. - /// /// `input-stream`s are *non-blocking* to the extent practical on underlying /// platforms. I/O operations always return promptly; if fewer bytes are /// promptly available than requested, they return the number of bytes promptly /// available, which could even be zero. To wait for data to be available, /// use the `subscribe-to-input-stream` function to obtain a `pollable` which - /// can be polled for using `wasi_poll`. + /// can be polled for using `wasi:poll/poll.poll_oneoff`. /// /// And at present, it is a `u32` instead of being an actual handle, until /// the wit-bindgen implementation of handles and resources is ready. @@ -58,40 +45,36 @@ interface streams { /// This function returns a list of bytes containing the data that was /// read, along with a `stream-status` which, indicates whether further /// reads are expected to produce data. The returned list will contain up to - /// `len` bytes; it may return fewer than requested, but not more. - /// - /// Once a stream has reached the end, subsequent calls to read or - /// `skip` will always report end-of-stream rather than producing more + /// `len` bytes; it may return fewer than requested, but not more. An + /// empty list and `stream-status:open` indicates no more data is + /// available at this time, and that the pollable given by + /// `subscribe-to-input-stream` will be ready when more data is available. + /// + /// Once a stream has reached the end, subsequent calls to `read` or + /// `skip` will always report `stream-status:ended` rather than producing more /// data. /// - /// If `len` is 0, it represents a request to read 0 bytes, which should - /// always succeed, assuming the stream hasn't reached its end yet, and - /// return an empty list. - /// - /// The len here is a `u64`, but some callees may not be able to allocate - /// a buffer as large as that would imply. - /// FIXME: describe what happens if allocation fails. + /// When the caller gives a `len` of 0, it represents a request to read 0 + /// bytes. This read should always succeed and return an empty list and + /// the current `stream-status`. /// - /// When the returned `stream-status` is `open`, the length of the returned - /// value may be less than `len`. When an empty list is returned, this - /// indicates that no more bytes were available from the stream at that - /// time. In that case the subscribe-to-input-stream pollable will indicate - /// when additional bytes are available for reading. + /// The `len` parameter is a `u64`, which could represent a list of u8 which + /// is not possible to allocate in wasm32, or not desirable to allocate as + /// as a return value by the callee. The callee may return a list of bytes + /// less than `len` in size while more bytes are available for reading. read: func( this: input-stream, /// The maximum number of bytes to read len: u64 - ) -> result, stream-status>, stream-error> + ) -> result, stream-status>> - /// Read bytes from a stream, with blocking. - /// - /// This is similar to `read`, except that it blocks until at least one - /// byte can be read. + /// Read bytes from a stream, after blocking until at least one byte can + /// be read. Except for blocking, identical to `read`. blocking-read: func( this: input-stream, /// The maximum number of bytes to read len: u64 - ) -> result, stream-status>, stream-error> + ) -> result, stream-status>> /// Skip bytes from a stream. /// @@ -102,40 +85,42 @@ interface streams { /// `skip` will always report end-of-stream rather than producing more /// data. /// - /// This function returns the number of bytes skipped, along with a bool - /// indicating whether the end of the stream was reached. The returned - /// value will be at most `len`; it may be less. + /// This function returns the number of bytes skipped, along with a + /// `stream-status` indicating whether the end of the stream was + /// reached. The returned value will be at most `len`; it may be less. skip: func( this: input-stream, /// The maximum number of bytes to skip. len: u64, - ) -> result, stream-error> + ) -> result> - /// Skip bytes from a stream, with blocking. - /// - /// This is similar to `skip`, except that it blocks until at least one - /// byte can be consumed. + /// Skip bytes from a stream, after blocking until at least one byte + /// can be skipped. Except for blocking behavior, identical to `skip`. blocking-skip: func( this: input-stream, /// The maximum number of bytes to skip. len: u64, - ) -> result, stream-error> + ) -> result> /// Create a `pollable` which will resolve once either the specified stream /// has bytes available to read or the other end of the stream has been /// closed. + /// The created `pollable` is a child resource of the `input-stream`. + /// Implementations may trap if the `input-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. subscribe-to-input-stream: func(this: input-stream) -> pollable /// Dispose of the specified `input-stream`, after which it may no longer /// be used. + /// Implementations may trap if this `input-stream` is dropped while child + /// `pollable` resources are still alive. + /// After this `input-stream` is dropped, implementations may report any + /// corresponding `output-stream` has `stream-state.closed`. drop-input-stream: func(this: input-stream) /// An output bytestream. In the future, this will be replaced by handle /// types. /// - /// This conceptually represents a `stream`. It's temporary - /// scaffolding until component-model's async features are ready. - /// /// `output-stream`s are *non-blocking* to the extent practical on /// underlying platforms. Except where specified otherwise, I/O operations also /// always return promptly, after the number of bytes that can be written @@ -159,17 +144,18 @@ interface streams { /// When the returned `stream-status` is `open`, the `u64` return value may /// be less than the length of `buf`. This indicates that no more bytes may /// be written to the stream promptly. In that case the - /// subscribe-to-output-stream pollable will indicate when additional bytes + /// `subscribe-to-output-stream` pollable will indicate when additional bytes /// may be promptly written. /// - /// TODO: document what happens when an empty list is written + /// Writing an empty list must return a non-error result with `0` for the + /// `u64` return value, and the current `stream-status`. write: func( this: output-stream, /// Data to write buf: list - ) -> result, stream-error> + ) -> result> - /// Write bytes to a stream, with blocking. + /// Blocking write of bytes to a stream. /// /// This is similar to `write`, except that it blocks until at least one /// byte can be written. @@ -177,27 +163,29 @@ interface streams { this: output-stream, /// Data to write buf: list - ) -> result, stream-error> + ) -> result> - /// Write multiple zero bytes to a stream. + /// Write multiple zero-bytes to a stream. /// - /// This function returns a `u64` indicating the number of zero bytes - /// that were written; it may be less than `len`. + /// This function returns a `u64` indicating the number of zero-bytes + /// that were written; it may be less than `len`. Equivelant to a call to + /// `write` with a list of zeroes of the given length. write-zeroes: func( this: output-stream, - /// The number of zero bytes to write + /// The number of zero-bytes to write len: u64 - ) -> result, stream-error> + ) -> result> /// Write multiple zero bytes to a stream, with blocking. /// /// This is similar to `write-zeroes`, except that it blocks until at least - /// one byte can be written. + /// one byte can be written. Equivelant to a call to `blocking-write` with + /// a list of zeroes of the given length. blocking-write-zeroes: func( this: output-stream, /// The number of zero bytes to write len: u64 - ) -> result, stream-error> + ) -> result> /// Read from one stream and write to another. /// @@ -212,7 +200,7 @@ interface streams { src: input-stream, /// The number of bytes to splice len: u64, - ) -> result, stream-error> + ) -> result> /// Read from one stream and write to another, with blocking. /// @@ -224,7 +212,7 @@ interface streams { src: input-stream, /// The number of bytes to splice len: u64, - ) -> result, stream-error> + ) -> result> /// Forward the entire contents of an input stream to an output stream. /// @@ -242,13 +230,24 @@ interface streams { this: output-stream, /// The stream to read from src: input-stream - ) -> result, stream-error> + ) -> result> /// Create a `pollable` which will resolve once either the specified stream - /// is ready to accept bytes or the other end of the stream has been closed. + /// is ready to accept bytes or the `stream-state` has become closed. + /// + /// Once the stream-state is closed, this pollable is always ready + /// immediately. + /// + /// The created `pollable` is a child resource of the `output-stream`. + /// Implementations may trap if the `output-stream` is dropped before + /// all derived `pollable`s created with this function are dropped. subscribe-to-output-stream: func(this: output-stream) -> pollable /// Dispose of the specified `output-stream`, after which it may no longer /// be used. + /// Implementations may trap if this `output-stream` is dropped while + /// child `pollable` resources are still alive. + /// After this `output-stream` is dropped, implementations may report any + /// corresponding `input-stream` has `stream-state.closed`. drop-output-stream: func(this: output-stream) }