Browse Source

Add `wasi_http_double_echo` test (#7412)

This tests streaming outgoing request bodies, which we didn't previously have
coverage for.

Signed-off-by: Joel Dice <joel.dice@fermyon.com>
pull/7443/head
Joel Dice 1 year ago
committed by GitHub
parent
commit
8aec021480
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 147
      crates/test-programs/src/bin/api_proxy_streaming.rs
  2. 88
      crates/wasi-http/tests/all/main.rs

147
crates/test-programs/src/bin/api_proxy_streaming.rs

@ -1,9 +1,9 @@
use anyhow::{bail, Result};
use bindings::wasi::http::types::{
Fields, IncomingRequest, Method, OutgoingBody, OutgoingRequest, OutgoingResponse,
ResponseOutparam, Scheme,
Fields, IncomingRequest, IncomingResponse, Method, OutgoingBody, OutgoingRequest,
OutgoingResponse, ResponseOutparam, Scheme,
};
use futures::{stream, SinkExt, StreamExt, TryStreamExt};
use futures::{future, stream, Future, SinkExt, StreamExt, TryStreamExt};
use url::Url;
mod bindings {
@ -35,6 +35,9 @@ async fn handle_request(request: IncomingRequest, response_out: ResponseOutparam
match (request.method(), request.path_with_query().as_deref()) {
(Method::Get, Some("/hash-all")) => {
// Send outgoing GET requests to the specified URLs and stream the hashes of the response bodies as
// they arrive.
let urls = headers.iter().filter_map(|(k, v)| {
(k == "url")
.then_some(v)
@ -73,6 +76,8 @@ async fn handle_request(request: IncomingRequest, response_out: ResponseOutparam
}
(Method::Post, Some("/echo")) => {
// Echo the request body without buffering it.
let response = OutgoingResponse::new(
200,
Fields::from_list(
@ -108,16 +113,142 @@ async fn handle_request(request: IncomingRequest, response_out: ResponseOutparam
}
}
_ => {
let response = OutgoingResponse::new(405, Fields::new());
(Method::Post, Some("/double-echo")) => {
// Pipe the request body to an outgoing request and stream the response back to the client.
let body = response.body().expect("response should be writable");
if let Some(url) = headers.iter().find_map(|(k, v)| {
(k == "url")
.then_some(v)
.and_then(|v| std::str::from_utf8(v).ok())
.and_then(|v| Url::parse(v).ok())
}) {
match double_echo(request, &url).await {
Ok((request_copy, response)) => {
let mut stream = executor::incoming_body(
response.consume().expect("response should be consumable"),
);
let response = OutgoingResponse::new(
200,
Fields::from_list(
&headers
.into_iter()
.filter_map(|(k, v)| (k == "content-type").then_some((k, v)))
.collect::<Vec<_>>(),
)
.unwrap(),
);
let mut body = executor::outgoing_body(
response.body().expect("response should be writable"),
);
ResponseOutparam::set(response_out, Ok(response));
let response_copy = async move {
while let Some(chunk) = stream.next().await {
body.send(chunk?).await?;
}
Ok::<_, anyhow::Error>(())
};
ResponseOutparam::set(response_out, Ok(response));
let (request_copy, response_copy) =
future::join(request_copy, response_copy).await;
if let Err(e) = request_copy.and(response_copy) {
eprintln!("error piping to and from {url}: {e}");
}
}
Err(e) => {
eprintln!("Error sending outgoing request to {url}: {e}");
server_error(response_out);
}
}
} else {
bad_request(response_out);
}
}
_ => method_not_allowed(response_out),
}
}
async fn double_echo(
incoming_request: IncomingRequest,
url: &Url,
) -> Result<(impl Future<Output = Result<()>>, IncomingResponse)> {
let outgoing_request = OutgoingRequest::new(
&Method::Post,
Some(url.path()),
Some(&match url.scheme() {
"http" => Scheme::Http,
"https" => Scheme::Https,
scheme => Scheme::Other(scheme.into()),
}),
Some(&format!(
"{}{}",
url.host_str().unwrap_or(""),
if let Some(port) = url.port() {
format!(":{port}")
} else {
String::new()
}
)),
Fields::new(),
);
let mut body = executor::outgoing_body(
outgoing_request
.body()
.expect("request body should be writable"),
);
let response = executor::outgoing_request_send(outgoing_request);
let mut stream = executor::incoming_body(
incoming_request
.consume()
.expect("request should be consumable"),
);
OutgoingBody::finish(body, None);
let copy = async move {
while let Some(chunk) = stream.next().await {
body.send(chunk?).await?;
}
Ok::<_, anyhow::Error>(())
};
let response = response.await?;
let status = response.status();
if !(200..300).contains(&status) {
bail!("unexpected status: {status}");
}
Ok((copy, response))
}
fn server_error(response_out: ResponseOutparam) {
respond(500, response_out)
}
fn bad_request(response_out: ResponseOutparam) {
respond(400, response_out)
}
fn method_not_allowed(response_out: ResponseOutparam) {
respond(405, response_out)
}
fn respond(status: u16, response_out: ResponseOutparam) {
let response = OutgoingResponse::new(status, Fields::new());
let body = response.body().expect("response should be writable");
ResponseOutparam::set(response_out, Ok(response));
OutgoingBody::finish(body, None);
}
async fn hash(url: &Url) -> Result<String> {

88
crates/wasi-http/tests/all/main.rs

@ -2,7 +2,7 @@ use crate::http_server::Server;
use anyhow::{anyhow, Context, Result};
use futures::{channel::oneshot, future, stream, FutureExt};
use http_body::Frame;
use http_body_util::{combinators::BoxBody, Collected, StreamBody};
use http_body_util::{combinators::BoxBody, Collected, Empty, StreamBody};
use hyper::{body::Bytes, server::conn::http1, service::service_fn, Method, StatusCode};
use sha2::{Digest, Sha256};
use std::{collections::HashMap, iter, net::Ipv4Addr, str, sync::Arc};
@ -360,6 +360,73 @@ async fn do_wasi_http_hash_all(override_send_request: bool) -> Result<()> {
#[test_log::test(tokio::test)]
async fn wasi_http_echo() -> Result<()> {
do_wasi_http_echo("echo", None).await
}
#[test_log::test(tokio::test)]
async fn wasi_http_double_echo() -> Result<()> {
let listener = tokio::net::TcpListener::bind((Ipv4Addr::new(127, 0, 0, 1), 0)).await?;
let prefix = format!("http://{}", listener.local_addr()?);
let (_tx, rx) = oneshot::channel::<()>();
let server = async move {
loop {
let (stream, _) = listener.accept().await?;
task::spawn(async move {
if let Err(e) = http1::Builder::new()
.keep_alive(true)
.serve_connection(
stream,
service_fn(
move |request: hyper::Request<hyper::body::Incoming>| async move {
use http_body_util::BodyExt;
if let (&Method::POST, "/echo") =
(request.method(), request.uri().path())
{
Ok::<_, anyhow::Error>(hyper::Response::new(
request.into_body().boxed(),
))
} else {
Ok(hyper::Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(BoxBody::new(
Empty::new().map_err(|_| unreachable!()),
))?)
}
},
),
)
.await
{
eprintln!("error serving connection: {e:?}");
}
});
// Help rustc with type inference:
if false {
return Ok::<_, anyhow::Error>(());
}
}
}
.then(|result| {
if let Err(e) = result {
eprintln!("error listening for connections: {e:?}");
}
future::ready(())
})
.boxed();
task::spawn(async move {
drop(future::select(server, rx).await);
});
do_wasi_http_echo("double-echo", Some(&format!("{prefix}/echo"))).await
}
async fn do_wasi_http_echo(uri: &str, url_header: Option<&str>) -> Result<()> {
let body = {
// A sorta-random-ish megabyte
let mut n = 0_u8;
@ -371,13 +438,18 @@ async fn wasi_http_echo() -> Result<()> {
.collect::<Vec<_>>()
};
let request = hyper::Request::post("/echo")
.header("content-type", "application/octet-stream")
.body(BoxBody::new(StreamBody::new(stream::iter(
body.chunks(16 * 1024)
.map(|chunk| Ok::<_, anyhow::Error>(Frame::data(Bytes::copy_from_slice(chunk))))
.collect::<Vec<_>>(),
))))?;
let mut request =
hyper::Request::post(&format!("/{uri}")).header("content-type", "application/octet-stream");
if let Some(url_header) = url_header {
request = request.header("url", url_header);
}
let request = request.body(BoxBody::new(StreamBody::new(stream::iter(
body.chunks(16 * 1024)
.map(|chunk| Ok::<_, anyhow::Error>(Frame::data(Bytes::copy_from_slice(chunk))))
.collect::<Vec<_>>(),
))))?;
let response = run_wasi_http(
test_programs_artifacts::API_PROXY_STREAMING_COMPONENT,

Loading…
Cancel
Save