@ -6,7 +6,11 @@ use crate::bindings::io::streams;
use crate ::pipe ;
use crate ::{ HostInputStream , HostOutputStream , StreamError , StreamResult , Subscribe , WasiView } ;
use bytes ::Bytes ;
use std ::future ::Future ;
use std ::io ::IsTerminal ;
use std ::pin ::Pin ;
use std ::sync ::{ Arc , Mutex } ;
use std ::task ::{ Context , Poll } ;
use wasmtime ::component ::Resource ;
/// A trait used to represent the standard input to a guest program.
@ -54,6 +58,52 @@ impl StdinStream for pipe::ClosedInputStream {
}
}
/// An impl of [`StdinStream`] built on top of [`crate::pipe::AsyncReadStream`].
pub struct AsyncStdinStream ( Arc < Mutex < crate ::pipe ::AsyncReadStream > > ) ;
impl AsyncStdinStream {
pub fn new ( s : crate ::pipe ::AsyncReadStream ) -> Self {
Self ( Arc ::new ( Mutex ::new ( s ) ) )
}
}
impl StdinStream for AsyncStdinStream {
fn stream ( & self ) -> Box < dyn HostInputStream > {
Box ::new ( Self ( self . 0. clone ( ) ) )
}
fn isatty ( & self ) -> bool {
false
}
}
impl HostInputStream for AsyncStdinStream {
fn read ( & mut self , size : usize ) -> Result < bytes ::Bytes , crate ::StreamError > {
self . 0. lock ( ) . unwrap ( ) . read ( size )
}
fn skip ( & mut self , size : usize ) -> Result < usize , crate ::StreamError > {
self . 0. lock ( ) . unwrap ( ) . skip ( size )
}
}
impl Subscribe for AsyncStdinStream {
fn ready < 'a , 'b > ( & 'a mut self ) -> Pin < Box < dyn Future < Output = ( ) > + Send + 'b > >
where
Self : 'b ,
'a : 'b ,
{
struct F ( AsyncStdinStream ) ;
impl Future for F {
type Output = ( ) ;
fn poll ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self ::Output > {
let mut inner = self . 0 . 0. lock ( ) . unwrap ( ) ;
let mut fut = inner . ready ( ) ;
fut . as_mut ( ) . poll ( cx )
}
}
Box ::pin ( F ( Self ( self . 0. clone ( ) ) ) )
}
}
mod worker_thread_stdin ;
pub use self ::worker_thread_stdin ::{ stdin , Stdin } ;
@ -181,6 +231,72 @@ impl Subscribe for OutputStream {
async fn ready ( & mut self ) { }
}
/// A wrapper of [`crate::pipe::AsyncWriteStream`] that implements
/// [`StdoutStream`]. Note that the [`HostOutputStream`] impl for this is not
/// correct when used for interleaved async IO.
pub struct AsyncStdoutStream ( Arc < Mutex < crate ::pipe ::AsyncWriteStream > > ) ;
impl AsyncStdoutStream {
pub fn new ( s : crate ::pipe ::AsyncWriteStream ) -> Self {
Self ( Arc ::new ( Mutex ::new ( s ) ) )
}
}
impl StdoutStream for AsyncStdoutStream {
fn stream ( & self ) -> Box < dyn HostOutputStream > {
Box ::new ( Self ( self . 0. clone ( ) ) )
}
fn isatty ( & self ) -> bool {
false
}
}
// This implementation is known to be bogus. All check-writes and writes are
// directed at the same underlying stream. The check-write/write protocol does
// require the size returned by a check-write to be accepted by write, even if
// other side-effects happen between those calls, and this implementation
// permits another view (created by StdoutStream::stream()) of the same
// underlying stream to accept a write which will invalidate a prior
// check-write of another view.
// Ultimately, the Std{in,out}Stream::stream() methods exist because many
// different places in a linked component (which may itself contain many
// modules) may need to access stdio without any coordination to keep those
// accesses all using pointing to the same resource. So, we allow many
// resources to be created. We have the reasonable expectation that programs
// won't attempt to interleave async IO from these disparate uses of stdio.
// If that expectation doesn't turn out to be true, and you find yourself at
// this comment to correct it: sorry about that.
impl HostOutputStream for AsyncStdoutStream {
fn check_write ( & mut self ) -> Result < usize , StreamError > {
self . 0. lock ( ) . unwrap ( ) . check_write ( )
}
fn write ( & mut self , bytes : Bytes ) -> Result < ( ) , StreamError > {
self . 0. lock ( ) . unwrap ( ) . write ( bytes )
}
fn flush ( & mut self ) -> Result < ( ) , StreamError > {
self . 0. lock ( ) . unwrap ( ) . flush ( )
}
}
impl Subscribe for AsyncStdoutStream {
fn ready < 'a , 'b > ( & 'a mut self ) -> Pin < Box < dyn Future < Output = ( ) > + Send + 'b > >
where
Self : 'b ,
'a : 'b ,
{
struct F ( AsyncStdoutStream ) ;
impl Future for F {
type Output = ( ) ;
fn poll ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self ::Output > {
let mut inner = self . 0 . 0. lock ( ) . unwrap ( ) ;
let mut fut = inner . ready ( ) ;
fut . as_mut ( ) . poll ( cx )
}
}
Box ::pin ( F ( Self ( self . 0. clone ( ) ) ) )
}
}
#[ derive(Debug, Clone, Copy, PartialEq, Eq) ]
pub enum IsATTY {
Yes ,
@ -255,3 +371,73 @@ impl<T: WasiView> terminal_stderr::Host for T {
}
}
}
#[ cfg(test) ]
mod test {
#[ test ]
fn memory_stdin_stream ( ) {
// A StdinStream has the property that there are multiple
// HostInputStreams created, using the stream() method which are each
// views on the same shared state underneath. Consuming input on one
// stream results in consuming that input on all streams.
//
// The simplest way to measure this is to check if the MemoryInputPipe
// impl of StdinStream follows this property.
let pipe = super ::pipe ::MemoryInputPipe ::new (
"the quick brown fox jumped over the three lazy dogs" ,
) ;
use super ::StdinStream ;
let mut view1 = pipe . stream ( ) ;
let mut view2 = pipe . stream ( ) ;
let read1 = view1 . read ( 10 ) . expect ( "read first 10 bytes" ) ;
assert_eq ! ( read1 , "the quick " . as_bytes ( ) , "first 10 bytes" ) ;
let read2 = view2 . read ( 10 ) . expect ( "read second 10 bytes" ) ;
assert_eq ! ( read2 , "brown fox " . as_bytes ( ) , "second 10 bytes" ) ;
let read3 = view1 . read ( 10 ) . expect ( "read third 10 bytes" ) ;
assert_eq ! ( read3 , "jumped ove" . as_bytes ( ) , "third 10 bytes" ) ;
let read4 = view2 . read ( 10 ) . expect ( "read fourth 10 bytes" ) ;
assert_eq ! ( read4 , "r the thre" . as_bytes ( ) , "fourth 10 bytes" ) ;
}
#[ tokio::test ]
async fn async_stdin_stream ( ) {
// A StdinStream has the property that there are multiple
// HostInputStreams created, using the stream() method which are each
// views on the same shared state underneath. Consuming input on one
// stream results in consuming that input on all streams.
//
// AsyncStdinStream is a slightly more complex impl of StdinStream
// than the MemoryInputPipe above. We can create an AsyncReadStream
// from a file on the disk, and an AsyncStdinStream from that common
// stream, then check that the same property holds as above.
let dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let mut path = std ::path ::PathBuf ::from ( dir . path ( ) ) ;
path . push ( "file" ) ;
std ::fs ::write ( & path , "the quick brown fox jumped over the three lazy dogs" ) . unwrap ( ) ;
let file = tokio ::fs ::File ::open ( & path )
. await
. expect ( "open created file" ) ;
let stdin_stream = super ::AsyncStdinStream ::new ( crate ::pipe ::AsyncReadStream ::new ( file ) ) ;
use super ::StdinStream ;
let mut view1 = stdin_stream . stream ( ) ;
let mut view2 = stdin_stream . stream ( ) ;
view1 . ready ( ) . await ;
let read1 = view1 . read ( 10 ) . expect ( "read first 10 bytes" ) ;
assert_eq ! ( read1 , "the quick " . as_bytes ( ) , "first 10 bytes" ) ;
let read2 = view2 . read ( 10 ) . expect ( "read second 10 bytes" ) ;
assert_eq ! ( read2 , "brown fox " . as_bytes ( ) , "second 10 bytes" ) ;
let read3 = view1 . read ( 10 ) . expect ( "read third 10 bytes" ) ;
assert_eq ! ( read3 , "jumped ove" . as_bytes ( ) , "third 10 bytes" ) ;
let read4 = view2 . read ( 10 ) . expect ( "read fourth 10 bytes" ) ;
assert_eq ! ( read4 , "r the thre" . as_bytes ( ) , "fourth 10 bytes" ) ;
}
}