Browse Source

preview2: Use blocking io for stdout and stderr (#7388)

* Use blocking io for stdout and stderr

* Document `Stdout` and `Stderr` as blocking on writes
pull/7446/head
Trevor Elliott 1 year ago
committed by GitHub
parent
commit
76d35f9f8e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 65
      crates/wasi/src/preview2/stdio.rs

65
crates/wasi/src/preview2/stdio.rs

@ -3,8 +3,11 @@ use crate::preview2::bindings::cli::{
terminal_stdout,
};
use crate::preview2::bindings::io::streams;
use crate::preview2::pipe::{self, AsyncWriteStream};
use crate::preview2::{HostInputStream, HostOutputStream, WasiView};
use crate::preview2::pipe;
use crate::preview2::{
HostInputStream, HostOutputStream, StreamError, StreamResult, Subscribe, WasiView,
};
use bytes::Bytes;
use std::io::IsTerminal;
use wasmtime::component::Resource;
@ -56,12 +59,6 @@ impl StdinStream for pipe::ClosedInputStream {
mod worker_thread_stdin;
pub use self::worker_thread_stdin::{stdin, Stdin};
// blocking-write-and-flush must accept 4k. It doesn't seem likely that we need to
// buffer more than that to implement a wrapper on the host process's stdio. If users
// really need more, they can write their own implementation using AsyncWriteStream
// and tokio's stdout/err.
const STDIO_BUFFER_SIZE: usize = 4096;
/// Similar to [`StdinStream`], except for output.
pub trait StdoutStream: Send + Sync {
/// Returns a fresh new stream which can write to this output stream.
@ -112,6 +109,10 @@ impl StdoutStream for pipe::ClosedOutputStream {
}
}
/// This implementation will yield output streams that block on writes, as they
/// inherit the implementation directly from the rust std library. A different
/// implementation of [`StdoutStream`] will be necessary if truly async output
/// streams are required.
pub struct Stdout;
pub fn stdout() -> Stdout {
@ -120,10 +121,7 @@ pub fn stdout() -> Stdout {
impl StdoutStream for Stdout {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(AsyncWriteStream::new(
STDIO_BUFFER_SIZE,
tokio::io::stdout(),
))
Box::new(OutputStream::Stdout)
}
fn isatty(&self) -> bool {
@ -131,6 +129,10 @@ impl StdoutStream for Stdout {
}
}
/// This implementation will yield output streams that block on writes, as they
/// inherit the implementation directly from the rust std library. A different
/// implementation of [`StdoutStream`] will be necessary if truly async output
/// streams are required.
pub struct Stderr;
pub fn stderr() -> Stderr {
@ -139,10 +141,7 @@ pub fn stderr() -> Stderr {
impl StdoutStream for Stderr {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(AsyncWriteStream::new(
STDIO_BUFFER_SIZE,
tokio::io::stderr(),
))
Box::new(OutputStream::Stderr)
}
fn isatty(&self) -> bool {
@ -150,6 +149,40 @@ impl StdoutStream for Stderr {
}
}
enum OutputStream {
Stdout,
Stderr,
}
impl HostOutputStream for OutputStream {
fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
use std::io::Write;
match self {
OutputStream::Stdout => std::io::stdout().write_all(&bytes),
OutputStream::Stderr => std::io::stderr().write_all(&bytes),
}
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
}
fn flush(&mut self) -> StreamResult<()> {
use std::io::Write;
match self {
OutputStream::Stdout => std::io::stdout().flush(),
OutputStream::Stderr => std::io::stderr().flush(),
}
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
}
fn check_write(&mut self) -> StreamResult<usize> {
Ok(1024 * 1024)
}
}
#[async_trait::async_trait]
impl Subscribe for OutputStream {
async fn ready(&mut self) {}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsATTY {
Yes,

Loading…
Cancel
Save