Browse Source

stream wit definition eliminates stream-error (#6846)

* stream wit definition eliminates stream-error

In https://github.com/WebAssembly/wasi-io/pull/38 we got review feedback
to eliminate the stream-error in favor of the empty error in wit
`result<a>`.

This means we cant use trappable error anymore, and therefore leads to
all this other unsightly transformation of the streams trait definition
and all its call sites.

We'll fix the wasmtime-wit-bindgen macro to support this case better in
the future, but rn we gotta stay synchronized with upstream

On the upside this showed us that the host stream trait design doesnt
differentiate between a runtime and a trapping error, so lets fix that
next

introduce a StreamRuntimeError, use it in filesystem streams

and fix an incorrect error transformation in the filesystem read impl

fill in fixmes for distinguishing a stream runtime error

delete outdated fixmes: downcast is now guaranteed by child resource tracking

* dont try to detect rustix io error - just call all read/write errors runtime

I don't think we should trap on any of the errors possible here,
reporting them as failures is sufficient
pull/6858/head
Pat Hickey 1 year ago
committed by GitHub
parent
commit
96e3372ec3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      crates/wasi/src/preview2/command.rs
  2. 14
      crates/wasi/src/preview2/filesystem.rs
  3. 7
      crates/wasi/src/preview2/host/filesystem.rs
  4. 322
      crates/wasi/src/preview2/host/io.rs
  5. 23
      crates/wasi/src/preview2/mod.rs
  6. 54
      crates/wasi/src/preview2/preview1.rs
  7. 39
      crates/wasi/src/preview2/stream.rs
  8. 127
      crates/wasi/wit/deps/io/streams.wit

2
crates/wasi/src/preview2/command.rs

@ -6,7 +6,6 @@ wasmtime::component::bindgen!({
async: true,
trappable_error_type: {
"wasi:filesystem/types"::"error-code": Error,
"wasi:io/streams"::"stream-error": Error,
},
with: {
"wasi:filesystem/types": crate::preview2::bindings::filesystem::types,
@ -61,7 +60,6 @@ pub mod sync {
async: false,
trappable_error_type: {
"wasi:filesystem/types"::"error-code": Error,
"wasi:io/streams"::"stream-error": Error,
},
with: {
"wasi:filesystem/types": crate::preview2::bindings::sync_io::filesystem::types,

14
crates/wasi/src/preview2/filesystem.rs

@ -1,4 +1,4 @@
use crate::preview2::{StreamState, Table, TableError};
use crate::preview2::{StreamRuntimeError, StreamState, Table, TableError};
use bytes::{Bytes, BytesMut};
use std::sync::Arc;
@ -152,24 +152,20 @@ impl FileInputStream {
}
}
pub(crate) fn read_result(
r: Result<usize, std::io::Error>,
) -> Result<(usize, StreamState), std::io::Error> {
fn read_result(r: Result<usize, std::io::Error>) -> Result<(usize, StreamState), anyhow::Error> {
match r {
Ok(0) => Ok((0, StreamState::Closed)),
Ok(n) => Ok((n, StreamState::Open)),
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => Ok((0, StreamState::Open)),
Err(e) => Err(e),
Err(e) => Err(StreamRuntimeError::from(anyhow::anyhow!(e)).into()),
}
}
pub(crate) fn write_result(
r: Result<usize, std::io::Error>,
) -> Result<(usize, StreamState), std::io::Error> {
fn write_result(r: Result<usize, std::io::Error>) -> Result<(usize, StreamState), anyhow::Error> {
match r {
Ok(0) => Ok((0, StreamState::Closed)),
Ok(n) => Ok((n, StreamState::Open)),
Err(e) => Err(e),
Err(e) => Err(StreamRuntimeError::from(anyhow::anyhow!(e)).into()),
}
}

7
crates/wasi/src/preview2/host/filesystem.rs

@ -209,7 +209,10 @@ impl<T: WasiView> types::Host for T {
})
.await;
let (bytes_read, state) = crate::preview2::filesystem::read_result(r)?;
let (bytes_read, state) = match r? {
0 => (0, true),
n => (n, false),
};
buffer.truncate(
bytes_read
@ -217,7 +220,7 @@ impl<T: WasiView> types::Host for T {
.expect("bytes read into memory as u64 fits in usize"),
);
Ok((buffer, state.is_closed()))
Ok((buffer, state))
}
async fn write(

322
crates/wasi/src/preview2/host/io.rs

@ -1,32 +1,16 @@
use crate::preview2::{
bindings::io::streams::{self, InputStream, OutputStream, StreamError},
bindings::io::streams::{self, InputStream, OutputStream},
bindings::poll::poll::Pollable,
filesystem::{FileInputStream, FileOutputStream},
poll::PollableFuture,
stream::{
HostInputStream, HostOutputStream, InternalInputStream, InternalOutputStream,
InternalTableStreamExt, StreamState,
InternalTableStreamExt, StreamRuntimeError, StreamState,
},
HostPollable, TableError, TablePollableExt, WasiView,
HostPollable, TablePollableExt, WasiView,
};
use anyhow::anyhow;
use std::any::Any;
impl From<anyhow::Error> for streams::Error {
fn from(error: anyhow::Error) -> streams::Error {
tracing::trace!(
"turning anyhow::Error in the streams interface into the empty error result: {error:?}"
);
StreamError { dummy: 0 }.into()
}
}
impl From<TableError> for streams::Error {
fn from(error: TableError) -> streams::Error {
streams::Error::trap(anyhow!(error))
}
}
impl From<StreamState> for streams::StreamStatus {
fn from(state: StreamState) -> Self {
match state {
@ -54,17 +38,37 @@ impl<T: WasiView> streams::Host for T {
&mut self,
stream: InputStream,
len: u64,
) -> Result<(Vec<u8>, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(Vec<u8>, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_input_stream_mut(stream)? {
InternalInputStream::Host(s) => {
let (bytes, state) = HostInputStream::read(s.as_mut(), len as usize)?;
let (bytes, state) = match HostInputStream::read(s.as_mut(), len as usize) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
debug_assert!(bytes.len() <= len as usize);
Ok((bytes.into(), state.into()))
Ok(Ok((bytes.into(), state.into())))
}
InternalInputStream::File(s) => {
let (bytes, state) = FileInputStream::read(s, len as usize).await?;
Ok((bytes.into(), state.into()))
let (bytes, state) = match FileInputStream::read(s, len as usize).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok(Ok((bytes.into(), state.into())))
}
}
}
@ -73,17 +77,37 @@ impl<T: WasiView> streams::Host for T {
&mut self,
stream: InputStream,
len: u64,
) -> Result<(Vec<u8>, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(Vec<u8>, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_input_stream_mut(stream)? {
InternalInputStream::Host(s) => {
s.ready().await?;
let (bytes, state) = HostInputStream::read(s.as_mut(), len as usize)?;
let (bytes, state) = match HostInputStream::read(s.as_mut(), len as usize) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
debug_assert!(bytes.len() <= len as usize);
Ok((bytes.into(), state.into()))
Ok(Ok((bytes.into(), state.into())))
}
InternalInputStream::File(s) => {
let (bytes, state) = FileInputStream::read(s, len as usize).await?;
Ok((bytes.into(), state.into()))
let (bytes, state) = match FileInputStream::read(s, len as usize).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok(Ok((bytes.into(), state.into())))
}
}
}
@ -92,15 +116,26 @@ impl<T: WasiView> streams::Host for T {
&mut self,
stream: OutputStream,
bytes: Vec<u8>,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_output_stream_mut(stream)? {
InternalOutputStream::Host(s) => {
let (bytes_written, status) = HostOutputStream::write(s.as_mut(), bytes.into())?;
Ok((u64::try_from(bytes_written).unwrap(), status.into()))
let (bytes_written, status) =
match HostOutputStream::write(s.as_mut(), bytes.into()) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok(Ok((u64::try_from(bytes_written).unwrap(), status.into())))
}
InternalOutputStream::File(s) => {
let (nwritten, state) = FileOutputStream::write(s, bytes.into()).await?;
Ok((nwritten as u64, state.into()))
Ok(Ok((nwritten as u64, state.into())))
}
}
}
@ -109,24 +144,35 @@ impl<T: WasiView> streams::Host for T {
&mut self,
stream: OutputStream,
bytes: Vec<u8>,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_output_stream_mut(stream)? {
InternalOutputStream::Host(s) => {
let mut bytes = bytes::Bytes::from(bytes);
let mut nwritten: usize = 0;
loop {
s.ready().await?;
let (written, state) = HostOutputStream::write(s.as_mut(), bytes.clone())?;
let (written, state) = match HostOutputStream::write(s.as_mut(), bytes.clone())
{
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
let _ = bytes.split_to(written);
nwritten += written;
if bytes.is_empty() || state == StreamState::Closed {
return Ok((nwritten as u64, state.into()));
return Ok(Ok((nwritten as u64, state.into())));
}
}
}
InternalOutputStream::File(s) => {
let (written, state) = FileOutputStream::write(s, bytes.into()).await?;
Ok((written as u64, state.into()))
Ok(Ok((written as u64, state.into())))
}
}
}
@ -135,17 +181,37 @@ impl<T: WasiView> streams::Host for T {
&mut self,
stream: InputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_input_stream_mut(stream)? {
InternalInputStream::Host(s) => {
// TODO: the cast to usize should be fallible, use `.try_into()?`
let (bytes_skipped, state) = HostInputStream::skip(s.as_mut(), len as usize)?;
let (bytes_skipped, state) = match HostInputStream::skip(s.as_mut(), len as usize) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok((bytes_skipped as u64, state.into()))
Ok(Ok((bytes_skipped as u64, state.into())))
}
InternalInputStream::File(s) => {
let (bytes_skipped, state) = FileInputStream::skip(s, len as usize).await?;
Ok((bytes_skipped as u64, state.into()))
let (bytes_skipped, state) = match FileInputStream::skip(s, len as usize).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok(Ok((bytes_skipped as u64, state.into())))
}
}
}
@ -154,18 +220,38 @@ impl<T: WasiView> streams::Host for T {
&mut self,
stream: InputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_input_stream_mut(stream)? {
InternalInputStream::Host(s) => {
s.ready().await?;
// TODO: the cast to usize should be fallible, use `.try_into()?`
let (bytes_skipped, state) = HostInputStream::skip(s.as_mut(), len as usize)?;
let (bytes_skipped, state) = match HostInputStream::skip(s.as_mut(), len as usize) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok((bytes_skipped as u64, state.into()))
Ok(Ok((bytes_skipped as u64, state.into())))
}
InternalInputStream::File(s) => {
let (bytes_skipped, state) = FileInputStream::skip(s, len as usize).await?;
Ok((bytes_skipped as u64, state.into()))
let (bytes_skipped, state) = match FileInputStream::skip(s, len as usize).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok(Ok((bytes_skipped as u64, state.into())))
}
}
}
@ -174,22 +260,42 @@ impl<T: WasiView> streams::Host for T {
&mut self,
stream: OutputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
let s = self.table_mut().get_internal_output_stream_mut(stream)?;
let mut bytes = bytes::Bytes::from_static(ZEROS);
bytes.truncate((len as usize).min(bytes.len()));
let (written, state) = match s {
InternalOutputStream::Host(s) => HostOutputStream::write(s.as_mut(), bytes)?,
InternalOutputStream::File(s) => FileOutputStream::write(s, bytes).await?,
InternalOutputStream::Host(s) => match HostOutputStream::write(s.as_mut(), bytes) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
},
InternalOutputStream::File(s) => match FileOutputStream::write(s, bytes).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
},
};
Ok((written as u64, state.into()))
Ok(Ok((written as u64, state.into())))
}
async fn blocking_write_zeroes(
&mut self,
stream: OutputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
let mut remaining = len as usize;
let s = self.table_mut().get_internal_output_stream_mut(stream)?;
loop {
@ -199,12 +305,32 @@ impl<T: WasiView> streams::Host for T {
let mut bytes = bytes::Bytes::from_static(ZEROS);
bytes.truncate(remaining.min(bytes.len()));
let (written, state) = match s {
InternalOutputStream::Host(s) => HostOutputStream::write(s.as_mut(), bytes)?,
InternalOutputStream::File(s) => FileOutputStream::write(s, bytes).await?,
InternalOutputStream::Host(s) => match HostOutputStream::write(s.as_mut(), bytes) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
},
InternalOutputStream::File(s) => match FileOutputStream::write(s, bytes).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
},
};
remaining -= written;
if remaining == 0 || state == StreamState::Closed {
return Ok((len - remaining as u64, state.into()));
return Ok(Ok((len - remaining as u64, state.into())));
}
}
}
@ -214,7 +340,7 @@ impl<T: WasiView> streams::Host for T {
_src: InputStream,
_dst: OutputStream,
_len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
// TODO: We can't get two streams at the same time because they both
// carry the exclusive lifetime of `ctx`. When [`get_many_mut`] is
// stabilized, that could allow us to add a `get_many_stream_mut` or
@ -243,7 +369,7 @@ impl<T: WasiView> streams::Host for T {
_src: InputStream,
_dst: OutputStream,
_len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
// TODO: once splice is implemented, figure out what the blocking semantics are for waiting
// on src and dest here.
todo!("stream splice is not implemented")
@ -253,7 +379,7 @@ impl<T: WasiView> streams::Host for T {
&mut self,
_src: InputStream,
_dst: OutputStream,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
// TODO: We can't get two streams at the same time because they both
// carry the exclusive lifetime of `ctx`. When [`get_many_mut`] is
// stabilized, that could allow us to add a `get_many_stream_mut` or
@ -283,10 +409,6 @@ impl<T: WasiView> streams::Host for T {
let pollable = match self.table_mut().get_internal_input_stream_mut(stream)? {
InternalInputStream::Host(_) => {
fn input_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> {
// FIXME: This downcast and match should be guaranteed by the checks above,
// however, the table element at index could be changed which would make this
// panic! This is a known problem with referring to other resources in the
// table which must be fixed.
let stream = stream
.downcast_mut::<InternalInputStream>()
.expect("downcast to InternalInputStream failed");
@ -318,10 +440,6 @@ impl<T: WasiView> streams::Host for T {
let pollable = match self.table_mut().get_internal_output_stream_mut(stream)? {
InternalOutputStream::Host(_) => {
fn output_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> {
// FIXME: This downcast and match should be guaranteed by the checks above,
// however, the table element at index could be changed which would make this
// panic! This is a known problem with referring to other resources in the
// table which must be fixed.
let stream = stream
.downcast_mut::<InternalOutputStream>()
.expect("downcast to HostOutputStream failed");
@ -353,6 +471,12 @@ pub mod sync {
in_tokio, WasiView,
};
// same boilerplate everywhere, converting between two identical types with different
// definition sites. one day wasmtime-wit-bindgen will make all this unnecessary
fn xform<A>(r: Result<(A, AsyncStreamStatus), ()>) -> Result<(A, streams::StreamStatus), ()> {
r.map(|(a, b)| (a, b.into()))
}
impl From<AsyncStreamStatus> for streams::StreamStatus {
fn from(other: AsyncStreamStatus) -> Self {
match other {
@ -375,80 +499,64 @@ pub mod sync {
&mut self,
stream: InputStream,
len: u64,
) -> Result<(Vec<u8>, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::read(self, stream, len).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(Vec<u8>, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::read(self, stream, len).await }).map(xform)
}
fn blocking_read(
&mut self,
stream: InputStream,
len: u64,
) -> Result<(Vec<u8>, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::blocking_read(self, stream, len).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(Vec<u8>, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::blocking_read(self, stream, len).await }).map(xform)
}
fn write(
&mut self,
stream: OutputStream,
bytes: Vec<u8>,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::write(self, stream, bytes).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::write(self, stream, bytes).await }).map(xform)
}
fn blocking_write(
&mut self,
stream: OutputStream,
bytes: Vec<u8>,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::blocking_write(self, stream, bytes).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::blocking_write(self, stream, bytes).await }).map(xform)
}
fn skip(
&mut self,
stream: InputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::skip(self, stream, len).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::skip(self, stream, len).await }).map(xform)
}
fn blocking_skip(
&mut self,
stream: InputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::blocking_skip(self, stream, len).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::blocking_skip(self, stream, len).await }).map(xform)
}
fn write_zeroes(
&mut self,
stream: OutputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::write_zeroes(self, stream, len).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::write_zeroes(self, stream, len).await }).map(xform)
}
fn blocking_write_zeroes(
&mut self,
stream: OutputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::blocking_write_zeroes(self, stream, len).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::blocking_write_zeroes(self, stream, len).await }).map(xform)
}
fn splice(
@ -456,10 +564,8 @@ pub mod sync {
src: InputStream,
dst: OutputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::splice(self, src, dst, len).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::splice(self, src, dst, len).await }).map(xform)
}
fn blocking_splice(
@ -467,20 +573,16 @@ pub mod sync {
src: InputStream,
dst: OutputStream,
len: u64,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::blocking_splice(self, src, dst, len).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::blocking_splice(self, src, dst, len).await }).map(xform)
}
fn forward(
&mut self,
src: InputStream,
dst: OutputStream,
) -> Result<(u64, streams::StreamStatus), streams::Error> {
in_tokio(async { AsyncHost::forward(self, src, dst).await })
.map(|(a, b)| (a, b.into()))
.map_err(streams::Error::from)
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
in_tokio(async { AsyncHost::forward(self, src, dst).await }).map(xform)
}
fn subscribe_to_input_stream(&mut self, stream: InputStream) -> anyhow::Result<Pollable> {

23
crates/wasi/src/preview2/mod.rs

@ -37,7 +37,9 @@ pub use self::filesystem::{DirPerms, FilePerms};
pub use self::poll::{ClosureFuture, HostPollable, MakeFuture, PollableFuture, TablePollableExt};
pub use self::random::{thread_rng, Deterministic};
pub use self::stdio::{stderr, stdin, stdout, IsATTY, Stderr, Stdin, Stdout};
pub use self::stream::{HostInputStream, HostOutputStream, StreamState, TableStreamExt};
pub use self::stream::{
HostInputStream, HostOutputStream, StreamRuntimeError, StreamState, TableStreamExt,
};
pub use self::table::{OccupiedEntry, Table, TableError};
pub use cap_fs_ext::SystemTimeSpec;
pub use cap_rand::RngCore;
@ -54,7 +56,6 @@ pub mod bindings {
",
tracing: true,
trappable_error_type: {
"wasi:io/streams"::"stream-error": Error,
"wasi:filesystem/types"::"error-code": Error,
},
with: {
@ -63,22 +64,6 @@ pub mod bindings {
});
}
pub use self::_internal::wasi::{filesystem, io, poll};
impl From<super::io::streams::StreamError> for io::streams::StreamError {
fn from(other: super::io::streams::StreamError) -> Self {
let super::io::streams::StreamError { dummy } = other;
Self { dummy }
}
}
impl From<super::io::streams::Error> for io::streams::Error {
fn from(other: super::io::streams::Error) -> Self {
match other.downcast() {
Ok(se) => io::streams::Error::from(io::streams::StreamError::from(se)),
Err(e) => io::streams::Error::trap(e),
}
}
}
}
pub(crate) mod _internal_clocks {
@ -105,7 +90,6 @@ pub mod bindings {
tracing: true,
async: true,
trappable_error_type: {
"wasi:io/streams"::"stream-error": Error,
"wasi:filesystem/types"::"error-code": Error,
},
with: {
@ -137,7 +121,6 @@ pub mod bindings {
tracing: true,
trappable_error_type: {
"wasi:filesystem/types"::"error-code": Error,
"wasi:io/streams"::"stream-error": Error,
},
with: {
"wasi:clocks/wall-clock": crate::preview2::bindings::clocks::wall_clock,

54
crates/wasi/src/preview2/preview1.rs

@ -431,6 +431,14 @@ impl wiggle::GuestErrorType for types::Errno {
}
}
fn stream_res<A>(r: anyhow::Result<Result<A, ()>>) -> Result<A, types::Error> {
match r {
Ok(Ok(a)) => Ok(a),
Ok(Err(_)) => Err(types::Errno::Io.into()),
Err(trap) => Err(types::Error::trap(trap)),
}
}
fn systimespec(set: bool, ts: types::Timestamp, now: bool) -> Result<filesystem::NewTimestamp> {
if set && now {
Err(types::Errno::Inval.into())
@ -1201,11 +1209,10 @@ impl<
})?;
let max = buf.len().try_into().unwrap_or(u64::MAX);
let (read, state) = if blocking {
streams::Host::blocking_read(self, stream, max).await
stream_res(streams::Host::blocking_read(self, stream, max).await)?
} else {
streams::Host::read(self, stream, max).await
}
.map_err(|_| types::Errno::Io)?;
stream_res(streams::Host::read(self, stream, max).await)?
};
let n = read.len().try_into()?;
let pos = pos.checked_add(n).ok_or(types::Errno::Overflow)?;
@ -1217,13 +1224,14 @@ impl<
let Some(buf) = first_non_empty_iovec(iovs)? else {
return Ok(0)
};
let (read, state) = streams::Host::read(
self,
input_stream,
buf.len().try_into().unwrap_or(u64::MAX),
)
.await
.map_err(|_| types::Errno::Io)?;
let (read, state) = stream_res(
streams::Host::read(
self,
input_stream,
buf.len().try_into().unwrap_or(u64::MAX),
)
.await,
)?;
(buf, read, state)
}
_ => return Err(types::Errno::Badf.into()),
@ -1263,11 +1271,10 @@ impl<
})?;
let max = buf.len().try_into().unwrap_or(u64::MAX);
let (read, state) = if blocking {
streams::Host::blocking_read(self, stream, max).await
stream_res(streams::Host::blocking_read(self, stream, max).await)?
} else {
streams::Host::read(self, stream, max).await
}
.map_err(|_| types::Errno::Io)?;
stream_res(streams::Host::read(self, stream, max).await)?
};
(buf, read, state)
}
@ -1325,11 +1332,10 @@ impl<
(stream, position)
};
let (n, _stat) = if blocking {
streams::Host::blocking_write(self, stream, buf).await
stream_res(streams::Host::blocking_write(self, stream, buf).await)?
} else {
streams::Host::write(self, stream, buf).await
}
.map_err(|_| types::Errno::Io)?;
stream_res(streams::Host::write(self, stream, buf).await)?
};
if !append {
let pos = pos.checked_add(n).ok_or(types::Errno::Overflow)?;
position.store(pos, Ordering::Relaxed);
@ -1340,9 +1346,8 @@ impl<
let Some(buf) = first_non_empty_ciovec(ciovs)? else {
return Ok(0)
};
let (n, _stat) = streams::Host::blocking_write(self, output_stream, buf)
.await
.map_err(|_| types::Errno::Io)?;
let (n, _stat) =
stream_res(streams::Host::blocking_write(self, output_stream, buf).await)?;
n
}
_ => return Err(types::Errno::Badf.into()),
@ -1372,11 +1377,10 @@ impl<
.unwrap_or_else(types::Error::trap)
})?;
if blocking {
streams::Host::blocking_write(self, stream, buf).await
stream_res(streams::Host::blocking_write(self, stream, buf).await)?
} else {
streams::Host::write(self, stream, buf).await
stream_res(streams::Host::write(self, stream, buf).await)?
}
.map_err(|_| types::Errno::Io)?
}
Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => {
// NOTE: legacy implementation returns SPIPE here

39
crates/wasi/src/preview2/stream.rs

@ -2,6 +2,33 @@ use crate::preview2::filesystem::{FileInputStream, FileOutputStream};
use crate::preview2::{Table, TableError};
use anyhow::Error;
use bytes::Bytes;
use std::fmt;
/// An error which should be reported to Wasm as a runtime error, rather than
/// an error which should trap Wasm execution. The definition for runtime
/// stream errors is the empty type, so the contents of this error will only
/// be available via a `tracing`::event` at `Level::DEBUG`.
pub struct StreamRuntimeError(anyhow::Error);
impl From<anyhow::Error> for StreamRuntimeError {
fn from(e: anyhow::Error) -> Self {
StreamRuntimeError(e)
}
}
impl fmt::Debug for StreamRuntimeError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Stream runtime error: {:?}", self.0)
}
}
impl fmt::Display for StreamRuntimeError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Stream runtime error")
}
}
impl std::error::Error for StreamRuntimeError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StreamState {
@ -22,10 +49,14 @@ pub trait HostInputStream: Send + Sync {
/// Read bytes. On success, returns a pair holding the number of bytes
/// read and a flag indicating whether the end of the stream was reached.
/// Important: this read must be non-blocking!
/// Returning an Err which downcasts to a [`StreamRuntimeError`] will be
/// reported to Wasm as the empty error result. Otherwise, errors will trap.
fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error>;
/// Read bytes from a stream and discard them. Important: this method must
/// be non-blocking!
/// Returning an Error which downcasts to a StreamRuntimeError will be
/// reported to Wasm as the empty error result. Otherwise, errors will trap.
fn skip(&mut self, nelem: usize) -> Result<(usize, StreamState), Error> {
let mut nread = 0;
let mut state = StreamState::Open;
@ -42,6 +73,7 @@ pub trait HostInputStream: Send + Sync {
/// Check for read readiness: this method blocks until the stream is ready
/// for reading.
/// Returning an error will trap execution.
async fn ready(&mut self) -> Result<(), Error>;
}
@ -51,10 +83,14 @@ pub trait HostInputStream: Send + Sync {
pub trait HostOutputStream: Send + Sync {
/// Write bytes. On success, returns the number of bytes written.
/// Important: this write must be non-blocking!
/// Returning an Err which downcasts to a [`StreamRuntimeError`] will be
/// reported to Wasm as the empty error result. Otherwise, errors will trap.
fn write(&mut self, bytes: Bytes) -> Result<(usize, StreamState), Error>;
/// Transfer bytes directly from an input stream to an output stream.
/// Important: this splice must be non-blocking!
/// Returning an Err which downcasts to a [`StreamRuntimeError`] will be
/// reported to Wasm as the empty error result. Otherwise, errors will trap.
fn splice(
&mut self,
src: &mut dyn HostInputStream,
@ -77,6 +113,8 @@ pub trait HostOutputStream: Send + Sync {
/// Repeatedly write a byte to a stream. Important: this write must be
/// non-blocking!
/// Returning an Err which downcasts to a [`StreamRuntimeError`] will be
/// reported to Wasm as the empty error result. Otherwise, errors will trap.
fn write_zeroes(&mut self, nelem: usize) -> Result<(usize, StreamState), Error> {
// TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
// repeatedly from a 'static buffer of zeros.
@ -87,6 +125,7 @@ pub trait HostOutputStream: Send + Sync {
/// Check for write readiness: this method blocks until the stream is
/// ready for writing.
/// Returning an error will trap execution.
async fn ready(&mut self) -> Result<(), Error>;
}

127
crates/wasi/wit/deps/io/streams.wit

@ -8,16 +8,6 @@ package wasi:io
interface streams {
use wasi:poll/poll.{pollable}
/// An error type returned from a stream operation.
///
/// TODO: need to figure out the actual contents of this error. Used to be
/// an empty record but that's no longer allowed. The `dummy` field is
/// only here to have this be a valid in the component model by being
/// non-empty.
record stream-error {
dummy: u32,
}
/// Streams provide a sequence of data and then end; once they end, they
/// no longer provide any further data.
///
@ -37,15 +27,12 @@ interface streams {
/// An input bytestream. In the future, this will be replaced by handle
/// types.
///
/// This conceptually represents a `stream<u8, _>`. It's temporary
/// scaffolding until component-model's async features are ready.
///
/// `input-stream`s are *non-blocking* to the extent practical on underlying
/// platforms. I/O operations always return promptly; if fewer bytes are
/// promptly available than requested, they return the number of bytes promptly
/// available, which could even be zero. To wait for data to be available,
/// use the `subscribe-to-input-stream` function to obtain a `pollable` which
/// can be polled for using `wasi_poll`.
/// can be polled for using `wasi:poll/poll.poll_oneoff`.
///
/// And at present, it is a `u32` instead of being an actual handle, until
/// the wit-bindgen implementation of handles and resources is ready.
@ -58,40 +45,36 @@ interface streams {
/// This function returns a list of bytes containing the data that was
/// read, along with a `stream-status` which, indicates whether further
/// reads are expected to produce data. The returned list will contain up to
/// `len` bytes; it may return fewer than requested, but not more.
///
/// Once a stream has reached the end, subsequent calls to read or
/// `skip` will always report end-of-stream rather than producing more
/// `len` bytes; it may return fewer than requested, but not more. An
/// empty list and `stream-status:open` indicates no more data is
/// available at this time, and that the pollable given by
/// `subscribe-to-input-stream` will be ready when more data is available.
///
/// Once a stream has reached the end, subsequent calls to `read` or
/// `skip` will always report `stream-status:ended` rather than producing more
/// data.
///
/// If `len` is 0, it represents a request to read 0 bytes, which should
/// always succeed, assuming the stream hasn't reached its end yet, and
/// return an empty list.
///
/// The len here is a `u64`, but some callees may not be able to allocate
/// a buffer as large as that would imply.
/// FIXME: describe what happens if allocation fails.
/// When the caller gives a `len` of 0, it represents a request to read 0
/// bytes. This read should always succeed and return an empty list and
/// the current `stream-status`.
///
/// When the returned `stream-status` is `open`, the length of the returned
/// value may be less than `len`. When an empty list is returned, this
/// indicates that no more bytes were available from the stream at that
/// time. In that case the subscribe-to-input-stream pollable will indicate
/// when additional bytes are available for reading.
/// The `len` parameter is a `u64`, which could represent a list of u8 which
/// is not possible to allocate in wasm32, or not desirable to allocate as
/// as a return value by the callee. The callee may return a list of bytes
/// less than `len` in size while more bytes are available for reading.
read: func(
this: input-stream,
/// The maximum number of bytes to read
len: u64
) -> result<tuple<list<u8>, stream-status>, stream-error>
) -> result<tuple<list<u8>, stream-status>>
/// Read bytes from a stream, with blocking.
///
/// This is similar to `read`, except that it blocks until at least one
/// byte can be read.
/// Read bytes from a stream, after blocking until at least one byte can
/// be read. Except for blocking, identical to `read`.
blocking-read: func(
this: input-stream,
/// The maximum number of bytes to read
len: u64
) -> result<tuple<list<u8>, stream-status>, stream-error>
) -> result<tuple<list<u8>, stream-status>>
/// Skip bytes from a stream.
///
@ -102,40 +85,42 @@ interface streams {
/// `skip` will always report end-of-stream rather than producing more
/// data.
///
/// This function returns the number of bytes skipped, along with a bool
/// indicating whether the end of the stream was reached. The returned
/// value will be at most `len`; it may be less.
/// This function returns the number of bytes skipped, along with a
/// `stream-status` indicating whether the end of the stream was
/// reached. The returned value will be at most `len`; it may be less.
skip: func(
this: input-stream,
/// The maximum number of bytes to skip.
len: u64,
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Skip bytes from a stream, with blocking.
///
/// This is similar to `skip`, except that it blocks until at least one
/// byte can be consumed.
/// Skip bytes from a stream, after blocking until at least one byte
/// can be skipped. Except for blocking behavior, identical to `skip`.
blocking-skip: func(
this: input-stream,
/// The maximum number of bytes to skip.
len: u64,
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Create a `pollable` which will resolve once either the specified stream
/// has bytes available to read or the other end of the stream has been
/// closed.
/// The created `pollable` is a child resource of the `input-stream`.
/// Implementations may trap if the `input-stream` is dropped before
/// all derived `pollable`s created with this function are dropped.
subscribe-to-input-stream: func(this: input-stream) -> pollable
/// Dispose of the specified `input-stream`, after which it may no longer
/// be used.
/// Implementations may trap if this `input-stream` is dropped while child
/// `pollable` resources are still alive.
/// After this `input-stream` is dropped, implementations may report any
/// corresponding `output-stream` has `stream-state.closed`.
drop-input-stream: func(this: input-stream)
/// An output bytestream. In the future, this will be replaced by handle
/// types.
///
/// This conceptually represents a `stream<u8, _>`. It's temporary
/// scaffolding until component-model's async features are ready.
///
/// `output-stream`s are *non-blocking* to the extent practical on
/// underlying platforms. Except where specified otherwise, I/O operations also
/// always return promptly, after the number of bytes that can be written
@ -159,17 +144,18 @@ interface streams {
/// When the returned `stream-status` is `open`, the `u64` return value may
/// be less than the length of `buf`. This indicates that no more bytes may
/// be written to the stream promptly. In that case the
/// subscribe-to-output-stream pollable will indicate when additional bytes
/// `subscribe-to-output-stream` pollable will indicate when additional bytes
/// may be promptly written.
///
/// TODO: document what happens when an empty list is written
/// Writing an empty list must return a non-error result with `0` for the
/// `u64` return value, and the current `stream-status`.
write: func(
this: output-stream,
/// Data to write
buf: list<u8>
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Write bytes to a stream, with blocking.
/// Blocking write of bytes to a stream.
///
/// This is similar to `write`, except that it blocks until at least one
/// byte can be written.
@ -177,27 +163,29 @@ interface streams {
this: output-stream,
/// Data to write
buf: list<u8>
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Write multiple zero bytes to a stream.
/// Write multiple zero-bytes to a stream.
///
/// This function returns a `u64` indicating the number of zero bytes
/// that were written; it may be less than `len`.
/// This function returns a `u64` indicating the number of zero-bytes
/// that were written; it may be less than `len`. Equivelant to a call to
/// `write` with a list of zeroes of the given length.
write-zeroes: func(
this: output-stream,
/// The number of zero bytes to write
/// The number of zero-bytes to write
len: u64
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Write multiple zero bytes to a stream, with blocking.
///
/// This is similar to `write-zeroes`, except that it blocks until at least
/// one byte can be written.
/// one byte can be written. Equivelant to a call to `blocking-write` with
/// a list of zeroes of the given length.
blocking-write-zeroes: func(
this: output-stream,
/// The number of zero bytes to write
len: u64
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Read from one stream and write to another.
///
@ -212,7 +200,7 @@ interface streams {
src: input-stream,
/// The number of bytes to splice
len: u64,
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Read from one stream and write to another, with blocking.
///
@ -224,7 +212,7 @@ interface streams {
src: input-stream,
/// The number of bytes to splice
len: u64,
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Forward the entire contents of an input stream to an output stream.
///
@ -242,13 +230,24 @@ interface streams {
this: output-stream,
/// The stream to read from
src: input-stream
) -> result<tuple<u64, stream-status>, stream-error>
) -> result<tuple<u64, stream-status>>
/// Create a `pollable` which will resolve once either the specified stream
/// is ready to accept bytes or the other end of the stream has been closed.
/// is ready to accept bytes or the `stream-state` has become closed.
///
/// Once the stream-state is closed, this pollable is always ready
/// immediately.
///
/// The created `pollable` is a child resource of the `output-stream`.
/// Implementations may trap if the `output-stream` is dropped before
/// all derived `pollable`s created with this function are dropped.
subscribe-to-output-stream: func(this: output-stream) -> pollable
/// Dispose of the specified `output-stream`, after which it may no longer
/// be used.
/// Implementations may trap if this `output-stream` is dropped while
/// child `pollable` resources are still alive.
/// After this `output-stream` is dropped, implementations may report any
/// corresponding `input-stream` has `stream-state.closed`.
drop-output-stream: func(this: output-stream)
}

Loading…
Cancel
Save