Browse Source

Keep the worker handle in an Arc to fix 7413 (#7426)

pull/6594/merge
Trevor Elliott 1 year ago
committed by GitHub
parent
commit
f63350e06b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      crates/wasi-http/src/body.rs
  2. 8
      crates/wasi-http/src/types.rs
  3. 3
      crates/wasi-http/src/types_impl.rs
  4. 2
      crates/wasi-http/tests/all/main.rs

5
crates/wasi-http/src/body.rs

@ -20,6 +20,7 @@ pub type HyperIncomingBody = BoxBody<Bytes, anyhow::Error>;
/// want to do that unless the user asks to consume the body.
pub struct HostIncomingBodyBuilder {
pub body: HyperIncomingBody,
pub worker: Option<Arc<AbortOnDropJoinHandle<anyhow::Result<()>>>>,
pub between_bytes_timeout: Duration,
}
@ -100,6 +101,10 @@ impl HostIncomingBodyBuilder {
// loop running to relieve backpressure, so we get to the trailers.
let _ = body_writer.send(Ok(data)).await;
}
// Force the builder's worker to be owned by this task, ensuring that it's kept around
// until this task exits.
drop(self.worker);
});
HostIncomingBody {

8
crates/wasi-http/src/types.rs

@ -9,6 +9,7 @@ use anyhow::Context;
use http_body_util::BodyExt;
use hyper::header::HeaderName;
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;
@ -38,6 +39,7 @@ pub trait WasiHttpView: Send {
let (parts, body) = req.into_parts();
let body = HostIncomingBodyBuilder {
body,
worker: None,
// TODO: this needs to be plumbed through
between_bytes_timeout: std::time::Duration::from_millis(600 * 1000),
};
@ -162,7 +164,7 @@ pub fn default_send_request(
Ok(IncomingResponseInternal {
resp,
worker,
worker: Arc::new(worker),
between_bytes_timeout,
})
});
@ -273,7 +275,7 @@ pub struct HostIncomingResponse {
pub status: u16,
pub headers: FieldMap,
pub body: Option<HostIncomingBodyBuilder>,
pub worker: AbortOnDropJoinHandle<anyhow::Result<()>>,
pub worker: Arc<AbortOnDropJoinHandle<anyhow::Result<()>>>,
}
pub struct HostOutgoingResponse {
@ -324,7 +326,7 @@ pub enum HostFields {
pub struct IncomingResponseInternal {
pub resp: hyper::Response<HyperIncomingBody>,
pub worker: AbortOnDropJoinHandle<anyhow::Result<()>>,
pub worker: Arc<AbortOnDropJoinHandle<anyhow::Result<()>>>,
pub between_bytes_timeout: std::time::Duration,
}

3
crates/wasi-http/src/types_impl.rs

@ -13,7 +13,7 @@ use crate::{
};
use anyhow::Context;
use hyper::header::HeaderName;
use std::any::Any;
use std::{any::Any, sync::Arc};
use wasmtime::component::Resource;
use wasmtime_wasi::preview2::{
bindings::io::streams::{InputStream, OutputStream},
@ -744,6 +744,7 @@ impl<T: WasiHttpView> crate::bindings::http::types::HostFutureIncomingResponse f
headers: FieldMap::from(parts.headers),
body: Some(HostIncomingBodyBuilder {
body,
worker: Some(Arc::clone(&resp.worker)),
between_bytes_timeout: resp.between_bytes_timeout,
}),
worker: resp.worker,

2
crates/wasi-http/tests/all/main.rs

@ -268,7 +268,7 @@ async fn do_wasi_http_hash_all(override_send_request: bool) -> Result<()> {
Ok(view.table().push(HostFutureIncomingResponse::Ready(
handle(request.into_parts().0).map(|resp| IncomingResponseInternal {
resp,
worker: preview2::spawn(future::ready(Ok(()))),
worker: Arc::new(preview2::spawn(future::ready(Ok(())))),
between_bytes_timeout,
}),
))?)

Loading…
Cancel
Save