diff --git a/crates/test-programs/src/bin/cli_serve_with_print.rs b/crates/test-programs/src/bin/cli_serve_with_print.rs new file mode 100644 index 0000000000..424eb841cf --- /dev/null +++ b/crates/test-programs/src/bin/cli_serve_with_print.rs @@ -0,0 +1,30 @@ +use std::io::Write; +use test_programs::proxy; +use test_programs::wasi::http::types::{ + Fields, IncomingRequest, OutgoingResponse, ResponseOutparam, +}; + +struct T; + +proxy::export!(T); + +impl proxy::exports::wasi::http::incoming_handler::Guest for T { + fn handle(_request: IncomingRequest, outparam: ResponseOutparam) { + print!("this is half a print "); + std::io::stdout().flush().unwrap(); + println!("to stdout"); + println!(); // empty line + println!("after empty"); + + eprint!("this is half a print "); + std::io::stderr().flush().unwrap(); + eprintln!("to stderr"); + eprintln!(); // empty line + eprintln!("after empty"); + + let resp = OutgoingResponse::new(Fields::new()); + ResponseOutparam::set(outparam, Ok(resp)); + } +} + +fn main() {} diff --git a/src/commands/serve.rs b/src/commands/serve.rs index 8a200ee093..001688ce70 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -127,15 +127,15 @@ impl ServeCommand { builder.env("REQUEST_ID", req_id.to_string()); - builder.stdout(LogStream { - prefix: format!("stdout [{req_id}] :: "), - output: Output::Stdout, - }); + builder.stdout(LogStream::new( + format!("stdout [{req_id}] :: "), + Output::Stdout, + )); - builder.stderr(LogStream { - prefix: format!("stderr [{req_id}] :: "), - output: Output::Stderr, - }); + builder.stderr(LogStream::new( + format!("stderr [{req_id}] :: "), + Output::Stderr, + )); let mut host = Host { table: wasmtime::component::ResourceTable::new(), @@ -470,6 +470,17 @@ impl Output { struct LogStream { prefix: String, output: Output, + needs_prefix_on_next_write: bool, +} + +impl LogStream { + fn new(prefix: String, output: Output) -> LogStream { + LogStream { + prefix, + output, + needs_prefix_on_next_write: true, + } + } } impl wasmtime_wasi::StdoutStream for LogStream { @@ -489,19 +500,34 @@ impl wasmtime_wasi::StdoutStream for LogStream { impl wasmtime_wasi::HostOutputStream for LogStream { fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> { - let mut msg = Vec::new(); - - for line in bytes.split(|c| *c == b'\n') { - if !line.is_empty() { - msg.extend_from_slice(&self.prefix.as_bytes()); - msg.extend_from_slice(line); - msg.push(b'\n'); + let mut bytes = &bytes[..]; + + while !bytes.is_empty() { + if self.needs_prefix_on_next_write { + self.output + .write_all(self.prefix.as_bytes()) + .map_err(StreamError::LastOperationFailed)?; + self.needs_prefix_on_next_write = false; + } + match bytes.iter().position(|b| *b == b'\n') { + Some(i) => { + let (a, b) = bytes.split_at(i + 1); + bytes = b; + self.output + .write_all(a) + .map_err(StreamError::LastOperationFailed)?; + self.needs_prefix_on_next_write = true; + } + None => { + self.output + .write_all(bytes) + .map_err(StreamError::LastOperationFailed)?; + break; + } } } - self.output - .write_all(&msg) - .map_err(StreamError::LastOperationFailed) + Ok(()) } fn flush(&mut self) -> StreamResult<()> { diff --git a/tests/all/cli_tests.rs b/tests/all/cli_tests.rs index 20078eb983..069a0e532e 100644 --- a/tests/all/cli_tests.rs +++ b/tests/all/cli_tests.rs @@ -1507,7 +1507,7 @@ mod test_programs { } /// Completes this server gracefully by printing the output on failure. - fn finish(mut self) -> Result { + fn finish(mut self) -> Result<(String, String)> { let mut child = self.child.take().unwrap(); // If the child process has already exited then collect the output @@ -1525,7 +1525,10 @@ mod test_programs { bail!("child failed {output:?}"); } - Ok(String::from_utf8_lossy(&output.stderr).into_owned()) + Ok(( + String::from_utf8_lossy(&output.stdout).into_owned(), + String::from_utf8_lossy(&output.stderr).into_owned(), + )) } /// Send a request to this server and wait for the response. @@ -1660,7 +1663,7 @@ mod test_programs { ) .await; assert!(result.is_err()); - let stderr = server.finish()?; + let (_, stderr) = server.finish()?; assert!( stderr.contains("maximum concurrent memory limit of 0 reached"), "bad stderr: {stderr}", @@ -1766,6 +1769,51 @@ mod test_programs { Ok(()) } + + #[tokio::test] + async fn cli_serve_with_print() -> Result<()> { + let server = WasmtimeServe::new(CLI_SERVE_WITH_PRINT_COMPONENT, |cmd| { + cmd.arg("-Scli"); + })?; + + for _ in 0..2 { + let resp = server + .send_request( + hyper::Request::builder() + .uri("http://localhost/") + .body(String::new()) + .context("failed to make request")?, + ) + .await?; + assert!(resp.status().is_success()); + } + + let (out, err) = server.finish()?; + assert_eq!( + out, + "\ +stdout [0] :: this is half a print to stdout +stdout [0] :: \n\ +stdout [0] :: after empty +stdout [1] :: this is half a print to stdout +stdout [1] :: \n\ +stdout [1] :: after empty +" + ); + assert_eq!( + err, + "\ +stderr [0] :: this is half a print to stderr +stderr [0] :: \n\ +stderr [0] :: after empty +stderr [1] :: this is half a print to stderr +stderr [1] :: \n\ +stderr [1] :: after empty +" + ); + + Ok(()) + } } #[test]