Browse Source

wasmtime-wasi: move bindings to its own file, move runtime to its own mod (#8045)

* move bindings to its own file, move runtime to its own mod

* fix
pull/8051/head
Pat Hickey 8 months ago
committed by GitHub
parent
commit
6e064f4f21
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 5
      crates/wasi-http/src/body.rs
  2. 8
      crates/wasi-http/src/types.rs
  3. 4
      crates/wasi-http/tests/all/main.rs
  4. 120
      crates/wasi/src/bindings.rs
  5. 5
      crates/wasi/src/filesystem.rs
  6. 3
      crates/wasi/src/host/filesystem/sync.rs
  7. 3
      crates/wasi/src/host/io.rs
  8. 3
      crates/wasi/src/host/tcp.rs
  9. 15
      crates/wasi/src/ip_name_lookup.rs
  10. 275
      crates/wasi/src/lib.rs
  11. 4
      crates/wasi/src/pipe.rs
  12. 3
      crates/wasi/src/poll.rs
  13. 2
      crates/wasi/src/preview0.rs
  14. 2
      crates/wasi/src/preview1.rs
  15. 175
      crates/wasi/src/runtime.rs
  16. 12
      crates/wasi/src/tcp.rs
  17. 2
      crates/wasi/src/udp.rs
  18. 4
      crates/wasi/src/write_stream.rs

5
crates/wasi-http/src/body.rs

@ -10,7 +10,8 @@ use std::task::{Context, Poll};
use std::{pin::Pin, sync::Arc, time::Duration};
use tokio::sync::{mpsc, oneshot};
use wasmtime_wasi::{
poll_noop, AbortOnDropJoinHandle, HostInputStream, HostOutputStream, StreamError, Subscribe,
runtime::{poll_noop, AbortOnDropJoinHandle},
HostInputStream, HostOutputStream, StreamError, Subscribe,
};
pub type HyperIncomingBody = BoxBody<Bytes, types::ErrorCode>;
@ -35,7 +36,7 @@ impl BodyWithTimeout {
inner,
between_bytes_timeout,
reset_sleep: true,
timeout: Box::pin(wasmtime_wasi::with_ambient_tokio_runtime(|| {
timeout: Box::pin(wasmtime_wasi::runtime::with_ambient_tokio_runtime(|| {
tokio::time::sleep(Duration::new(0, 0))
})),
}

8
crates/wasi-http/src/types.rs

@ -15,7 +15,7 @@ use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;
use wasmtime::component::{Resource, ResourceTable};
use wasmtime_wasi::{AbortOnDropJoinHandle, Subscribe};
use wasmtime_wasi::{runtime::AbortOnDropJoinHandle, Subscribe};
/// Capture the state necessary for use in the wasi-http API implementation.
pub struct WasiHttpCtx;
@ -122,7 +122,7 @@ pub fn default_send_request(
between_bytes_timeout,
}: OutgoingRequest,
) -> wasmtime::Result<Resource<HostFutureIncomingResponse>> {
let handle = wasmtime_wasi::spawn(async move {
let handle = wasmtime_wasi::runtime::spawn(async move {
let resp = handler(
authority,
use_tls,
@ -212,7 +212,7 @@ async fn handler(
.map_err(|_| types::ErrorCode::ConnectionTimeout)?
.map_err(hyper_request_error)?;
let worker = wasmtime_wasi::spawn(async move {
let worker = wasmtime_wasi::runtime::spawn(async move {
match conn.await {
Ok(()) => {}
// TODO: shouldn't throw away this error and ideally should
@ -234,7 +234,7 @@ async fn handler(
.map_err(|_| types::ErrorCode::ConnectionTimeout)?
.map_err(hyper_request_error)?;
let worker = wasmtime_wasi::spawn(async move {
let worker = wasmtime_wasi::runtime::spawn(async move {
match conn.await {
Ok(()) => {}
// TODO: same as above, shouldn't throw this error away.

4
crates/wasi-http/tests/all/main.rs

@ -170,7 +170,7 @@ async fn run_wasi_http(
let (sender, receiver) = tokio::sync::oneshot::channel();
let out = store.data_mut().new_response_outparam(sender)?;
let handle = wasmtime_wasi::spawn(async move {
let handle = wasmtime_wasi::runtime::spawn(async move {
proxy
.wasi_http_incoming_handler()
.call_handle(&mut store, req, out)
@ -278,7 +278,7 @@ async fn do_wasi_http_hash_all(override_send_request: bool) -> Result<()> {
let response = handle(request.into_parts().0).map(|resp| {
Ok(IncomingResponseInternal {
resp,
worker: Arc::new(wasmtime_wasi::spawn(future::ready(()))),
worker: Arc::new(wasmtime_wasi::runtime::spawn(future::ready(()))),
between_bytes_timeout,
})
});

120
crates/wasi/src/bindings.rs

@ -0,0 +1,120 @@
// Generate traits for synchronous bindings.
//
// Note that this is only done for interfaces which can block, or those which
// have some functions in `only_imports` below for being async.
pub mod sync_io {
pub(crate) mod _internal {
use crate::{FsError, StreamError};
wasmtime::component::bindgen!({
path: "wit",
interfaces: "
import wasi:io/poll@0.2.0;
import wasi:io/streams@0.2.0;
import wasi:filesystem/types@0.2.0;
",
tracing: true,
trappable_error_type: {
"wasi:io/streams/stream-error" => StreamError,
"wasi:filesystem/types/error-code" => FsError,
},
with: {
"wasi:clocks/wall-clock": crate::bindings::clocks::wall_clock,
"wasi:filesystem/types/descriptor": super::super::filesystem::types::Descriptor,
"wasi:filesystem/types/directory-entry-stream": super::super::filesystem::types::DirectoryEntryStream,
"wasi:io/poll/pollable": super::super::io::poll::Pollable,
"wasi:io/streams/input-stream": super::super::io::streams::InputStream,
"wasi:io/streams/output-stream": super::super::io::streams::OutputStream,
"wasi:io/error/error": super::super::io::error::Error,
}
});
}
pub use self::_internal::wasi::{filesystem, io};
}
wasmtime::component::bindgen!({
path: "wit",
world: "wasi:cli/imports",
tracing: true,
async: {
// Only these functions are `async` and everything else is sync
// meaning that it basically doesn't need to block. These functions
// are the only ones that need to block.
//
// Note that at this time `only_imports` works on function names
// which in theory can be shared across interfaces, so this may
// need fancier syntax in the future.
only_imports: [
"[method]descriptor.access-at",
"[method]descriptor.advise",
"[method]descriptor.change-directory-permissions-at",
"[method]descriptor.change-file-permissions-at",
"[method]descriptor.create-directory-at",
"[method]descriptor.get-flags",
"[method]descriptor.get-type",
"[method]descriptor.is-same-object",
"[method]descriptor.link-at",
"[method]descriptor.lock-exclusive",
"[method]descriptor.lock-shared",
"[method]descriptor.metadata-hash",
"[method]descriptor.metadata-hash-at",
"[method]descriptor.open-at",
"[method]descriptor.read",
"[method]descriptor.read-directory",
"[method]descriptor.readlink-at",
"[method]descriptor.remove-directory-at",
"[method]descriptor.rename-at",
"[method]descriptor.set-size",
"[method]descriptor.set-times",
"[method]descriptor.set-times-at",
"[method]descriptor.stat",
"[method]descriptor.stat-at",
"[method]descriptor.symlink-at",
"[method]descriptor.sync",
"[method]descriptor.sync-data",
"[method]descriptor.try-lock-exclusive",
"[method]descriptor.try-lock-shared",
"[method]descriptor.unlink-file-at",
"[method]descriptor.unlock",
"[method]descriptor.write",
"[method]input-stream.read",
"[method]input-stream.blocking-read",
"[method]input-stream.blocking-skip",
"[method]input-stream.skip",
"[method]output-stream.forward",
"[method]output-stream.splice",
"[method]output-stream.blocking-splice",
"[method]output-stream.blocking-flush",
"[method]output-stream.blocking-write",
"[method]output-stream.blocking-write-and-flush",
"[method]output-stream.blocking-write-zeroes-and-flush",
"[method]directory-entry-stream.read-directory-entry",
"poll",
"[method]pollable.block",
"[method]pollable.ready",
],
},
trappable_error_type: {
"wasi:io/streams/stream-error" => crate::StreamError,
"wasi:filesystem/types/error-code" => crate::FsError,
"wasi:sockets/network/error-code" => crate::SocketError,
},
with: {
"wasi:sockets/network/network": super::network::Network,
"wasi:sockets/tcp/tcp-socket": super::tcp::TcpSocket,
"wasi:sockets/udp/udp-socket": super::udp::UdpSocket,
"wasi:sockets/udp/incoming-datagram-stream": super::udp::IncomingDatagramStream,
"wasi:sockets/udp/outgoing-datagram-stream": super::udp::OutgoingDatagramStream,
"wasi:sockets/ip-name-lookup/resolve-address-stream": super::ip_name_lookup::ResolveAddressStream,
"wasi:filesystem/types/directory-entry-stream": super::filesystem::ReaddirIterator,
"wasi:filesystem/types/descriptor": super::filesystem::Descriptor,
"wasi:io/streams/input-stream": super::stream::InputStream,
"wasi:io/streams/output-stream": super::stream::OutputStream,
"wasi:io/error/error": super::stream::Error,
"wasi:io/poll/pollable": super::poll::Pollable,
"wasi:cli/terminal-input/terminal-input": super::stdio::TerminalInput,
"wasi:cli/terminal-output/terminal-output": super::stdio::TerminalOutput,
},
});
pub use wasi::*;

5
crates/wasi/src/filesystem.rs

@ -1,7 +1,6 @@
use crate::bindings::filesystem::types;
use crate::{
spawn_blocking, AbortOnDropJoinHandle, HostOutputStream, StreamError, Subscribe, TrappableError,
};
use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle};
use crate::{HostOutputStream, StreamError, Subscribe, TrappableError};
use anyhow::anyhow;
use bytes::{Bytes, BytesMut};
use std::io;

3
crates/wasi/src/host/filesystem/sync.rs

@ -1,7 +1,8 @@
use crate::bindings::filesystem::types as async_filesystem;
use crate::bindings::sync_io::filesystem::types as sync_filesystem;
use crate::bindings::sync_io::io::streams;
use crate::{in_tokio, FsError, FsResult};
use crate::runtime::in_tokio;
use crate::{FsError, FsResult};
use wasmtime::component::Resource;
impl<T: async_filesystem::Host> sync_filesystem::Host for T {

3
crates/wasi/src/host/io.rs

@ -232,7 +232,8 @@ pub mod sync {
},
bindings::sync_io::io::poll::Pollable,
bindings::sync_io::io::streams::{self, InputStream, OutputStream},
in_tokio, StreamError, StreamResult, WasiView,
runtime::in_tokio,
StreamError, StreamResult, WasiView,
};
use wasmtime::component::Resource;

3
crates/wasi/src/host/tcp.rs

@ -1,5 +1,6 @@
use crate::host::network::util;
use crate::network::SocketAddrUse;
use crate::runtime::with_ambient_tokio_runtime;
use crate::tcp::{TcpReadStream, TcpSocket, TcpState, TcpWriteStream};
use crate::{
bindings::{
@ -9,7 +10,7 @@ use crate::{
},
network::SocketAddressFamily,
};
use crate::{with_ambient_tokio_runtime, Pollable, SocketResult, WasiView};
use crate::{Pollable, SocketResult, WasiView};
use io_lifetimes::AsSocketlike;
use rustix::io::Errno;
use rustix::net::sockopt;

15
crates/wasi/src/ip_name_lookup.rs

@ -2,7 +2,8 @@ use crate::bindings::sockets::ip_name_lookup::{Host, HostResolveAddressStream};
use crate::bindings::sockets::network::{ErrorCode, IpAddress, Network};
use crate::host::network::util;
use crate::poll::{subscribe, Pollable, Subscribe};
use crate::{spawn_blocking, AbortOnDropJoinHandle, SocketError, WasiView};
use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle};
use crate::{SocketError, WasiView};
use anyhow::Result;
use std::mem;
use std::net::{Ipv6Addr, ToSocketAddrs};
@ -48,12 +49,14 @@ impl<T: WasiView> HostResolveAddressStream for T {
let stream: &mut ResolveAddressStream = self.table().get_mut(&resource)?;
loop {
match stream {
ResolveAddressStream::Waiting(future) => match crate::poll_noop(Pin::new(future)) {
Some(result) => {
*stream = ResolveAddressStream::Done(result.map(|v| v.into_iter()));
ResolveAddressStream::Waiting(future) => {
match crate::runtime::poll_noop(Pin::new(future)) {
Some(result) => {
*stream = ResolveAddressStream::Done(result.map(|v| v.into_iter()));
}
None => return Err(ErrorCode::WouldBlock.into()),
}
None => return Err(ErrorCode::WouldBlock.into()),
},
}
ResolveAddressStream::Done(slot @ Err(_)) => {
mem::replace(slot, Ok(Vec::new().into_iter()))?;
unreachable!();

275
crates/wasi/src/lib.rs

@ -6,10 +6,7 @@
//!
//!
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub mod bindings;
mod clocks;
pub mod command;
mod ctx;
@ -25,6 +22,7 @@ pub mod preview0;
#[cfg(feature = "preview1")]
pub mod preview1;
mod random;
pub mod runtime;
mod stdio;
mod stream;
mod tcp;
@ -47,272 +45,3 @@ pub use self::stream::{
pub use cap_fs_ext::SystemTimeSpec;
pub use cap_rand::RngCore;
pub use wasmtime::component::{ResourceTable, ResourceTableError};
pub mod bindings {
// Generate traits for synchronous bindings.
//
// Note that this is only done for interfaces which can block, or those which
// have some functions in `only_imports` below for being async.
pub mod sync_io {
pub(crate) mod _internal {
use crate::{FsError, StreamError};
wasmtime::component::bindgen!({
path: "wit",
interfaces: "
import wasi:io/poll@0.2.0;
import wasi:io/streams@0.2.0;
import wasi:filesystem/types@0.2.0;
",
tracing: true,
trappable_error_type: {
"wasi:io/streams/stream-error" => StreamError,
"wasi:filesystem/types/error-code" => FsError,
},
with: {
"wasi:clocks/wall-clock": crate::bindings::clocks::wall_clock,
"wasi:filesystem/types/descriptor": super::super::filesystem::types::Descriptor,
"wasi:filesystem/types/directory-entry-stream": super::super::filesystem::types::DirectoryEntryStream,
"wasi:io/poll/pollable": super::super::io::poll::Pollable,
"wasi:io/streams/input-stream": super::super::io::streams::InputStream,
"wasi:io/streams/output-stream": super::super::io::streams::OutputStream,
"wasi:io/error/error": super::super::io::error::Error,
}
});
}
pub use self::_internal::wasi::{filesystem, io};
}
wasmtime::component::bindgen!({
path: "wit",
world: "wasi:cli/imports",
tracing: true,
async: {
// Only these functions are `async` and everything else is sync
// meaning that it basically doesn't need to block. These functions
// are the only ones that need to block.
//
// Note that at this time `only_imports` works on function names
// which in theory can be shared across interfaces, so this may
// need fancier syntax in the future.
only_imports: [
"[method]descriptor.access-at",
"[method]descriptor.advise",
"[method]descriptor.change-directory-permissions-at",
"[method]descriptor.change-file-permissions-at",
"[method]descriptor.create-directory-at",
"[method]descriptor.get-flags",
"[method]descriptor.get-type",
"[method]descriptor.is-same-object",
"[method]descriptor.link-at",
"[method]descriptor.lock-exclusive",
"[method]descriptor.lock-shared",
"[method]descriptor.metadata-hash",
"[method]descriptor.metadata-hash-at",
"[method]descriptor.open-at",
"[method]descriptor.read",
"[method]descriptor.read-directory",
"[method]descriptor.readlink-at",
"[method]descriptor.remove-directory-at",
"[method]descriptor.rename-at",
"[method]descriptor.set-size",
"[method]descriptor.set-times",
"[method]descriptor.set-times-at",
"[method]descriptor.stat",
"[method]descriptor.stat-at",
"[method]descriptor.symlink-at",
"[method]descriptor.sync",
"[method]descriptor.sync-data",
"[method]descriptor.try-lock-exclusive",
"[method]descriptor.try-lock-shared",
"[method]descriptor.unlink-file-at",
"[method]descriptor.unlock",
"[method]descriptor.write",
"[method]input-stream.read",
"[method]input-stream.blocking-read",
"[method]input-stream.blocking-skip",
"[method]input-stream.skip",
"[method]output-stream.forward",
"[method]output-stream.splice",
"[method]output-stream.blocking-splice",
"[method]output-stream.blocking-flush",
"[method]output-stream.blocking-write",
"[method]output-stream.blocking-write-and-flush",
"[method]output-stream.blocking-write-zeroes-and-flush",
"[method]directory-entry-stream.read-directory-entry",
"poll",
"[method]pollable.block",
"[method]pollable.ready",
],
},
trappable_error_type: {
"wasi:io/streams/stream-error" => crate::StreamError,
"wasi:filesystem/types/error-code" => crate::FsError,
"wasi:sockets/network/error-code" => crate::SocketError,
},
with: {
"wasi:sockets/network/network": super::network::Network,
"wasi:sockets/tcp/tcp-socket": super::tcp::TcpSocket,
"wasi:sockets/udp/udp-socket": super::udp::UdpSocket,
"wasi:sockets/udp/incoming-datagram-stream": super::udp::IncomingDatagramStream,
"wasi:sockets/udp/outgoing-datagram-stream": super::udp::OutgoingDatagramStream,
"wasi:sockets/ip-name-lookup/resolve-address-stream": super::ip_name_lookup::ResolveAddressStream,
"wasi:filesystem/types/directory-entry-stream": super::filesystem::ReaddirIterator,
"wasi:filesystem/types/descriptor": super::filesystem::Descriptor,
"wasi:io/streams/input-stream": super::stream::InputStream,
"wasi:io/streams/output-stream": super::stream::OutputStream,
"wasi:io/error/error": super::stream::Error,
"wasi:io/poll/pollable": super::poll::Pollable,
"wasi:cli/terminal-input/terminal-input": super::stdio::TerminalInput,
"wasi:cli/terminal-output/terminal-output": super::stdio::TerminalOutput,
},
});
pub use wasi::*;
}
pub(crate) static RUNTIME: once_cell::sync::Lazy<tokio::runtime::Runtime> =
once_cell::sync::Lazy::new(|| {
tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()
.unwrap()
});
pub struct AbortOnDropJoinHandle<T>(tokio::task::JoinHandle<T>);
impl<T> Drop for AbortOnDropJoinHandle<T> {
fn drop(&mut self) {
self.0.abort()
}
}
impl<T> std::ops::Deref for AbortOnDropJoinHandle<T> {
type Target = tokio::task::JoinHandle<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> std::ops::DerefMut for AbortOnDropJoinHandle<T> {
fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle<T> {
&mut self.0
}
}
impl<T> From<tokio::task::JoinHandle<T>> for AbortOnDropJoinHandle<T> {
fn from(jh: tokio::task::JoinHandle<T>) -> Self {
AbortOnDropJoinHandle(jh)
}
}
impl<T> Future for AbortOnDropJoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.as_mut().0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")),
}
}
}
pub fn spawn<F>(f: F) -> AbortOnDropJoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let j = with_ambient_tokio_runtime(|| tokio::task::spawn(f));
AbortOnDropJoinHandle(j)
}
pub fn spawn_blocking<F, R>(f: F) -> AbortOnDropJoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let j = with_ambient_tokio_runtime(|| tokio::task::spawn_blocking(f));
AbortOnDropJoinHandle(j)
}
pub fn in_tokio<F: Future>(f: F) -> F::Output {
match tokio::runtime::Handle::try_current() {
Ok(h) => {
let _enter = h.enter();
h.block_on(f)
}
// The `yield_now` here is non-obvious and if you're reading this
// you're likely curious about why it's here. This is currently required
// to get some features of "sync mode" working correctly, such as with
// the CLI. To illustrate why this is required, consider a program
// organized as:
//
// * A program has a `pollable` that it's waiting on.
// * This `pollable` is always ready .
// * Actually making the corresponding operation ready, however,
// requires some background work on Tokio's part.
// * The program is looping on "wait for readiness" coupled with
// performing the operation.
//
// In this situation this program ends up infinitely looping in waiting
// for pollables. The reason appears to be that when we enter the tokio
// runtime here it doesn't necessary yield to background work because
// the provided future `f` is ready immediately. The future `f` will run
// through the list of pollables and determine one of them is ready.
//
// Historically this happened with UDP sockets. A test send a datagram
// from one socket to another and the other socket infinitely didn't
// receive the data. This appeared to be because the server socket was
// waiting on `READABLE | WRITABLE` (which is itself a bug but ignore
// that) and the socket was currently in the "writable" state but never
// ended up receiving a notification for the "readable" state. Moving
// the socket to "readable" would require Tokio to perform some
// background work via epoll/kqueue/handle events but if the future
// provided here is always ready, then that never happened.
//
// Thus the `yield_now()` is an attempt to force Tokio to go do some
// background work eventually and look at new interest masks for
// example. This is a bit of a kludge but everything's already a bit
// wonky in synchronous mode anyway. Note that this is hypothesized to
// not be an issue in async mode because async mode typically has the
// Tokio runtime in a separate thread or otherwise participating in a
// larger application, it's only here in synchronous mode where we
// effectively own the runtime that we need some special care.
Err(_) => {
let _enter = RUNTIME.enter();
RUNTIME.block_on(async move {
tokio::task::yield_now().await;
f.await
})
}
}
}
/// Executes the closure `f` with an "ambient Tokio runtime" which basically
/// means that if code in `f` tries to get a runtime `Handle` it'll succeed.
///
/// If a `Handle` is already available, e.g. in async contexts, then `f` is run
/// immediately. Otherwise for synchronous contexts this crate's fallback
/// runtime is configured and then `f` is executed.
pub fn with_ambient_tokio_runtime<R>(f: impl FnOnce() -> R) -> R {
match tokio::runtime::Handle::try_current() {
Ok(_) => f(),
Err(_) => {
let _enter = RUNTIME.enter();
f()
}
}
}
/// Attempts to get the result of a `future`.
///
/// This function does not block and will poll the provided future once. If the
/// result is here then `Some` is returned, otherwise `None` is returned.
///
/// Note that by polling `future` this means that `future` must be re-polled
/// later if it's to wake up a task.
pub fn poll_noop<F>(future: Pin<&mut F>) -> Option<F::Output>
where
F: Future,
{
let mut task = Context::from_waker(futures::task::noop_waker_ref());
match future.poll(&mut task) {
Poll::Ready(result) => Some(result),
Poll::Pending => None,
}
}

4
crates/wasi/src/pipe.rs

@ -112,7 +112,7 @@ pub struct AsyncReadStream {
closed: bool,
buffer: Option<Result<Bytes, StreamError>>,
receiver: mpsc::Receiver<Result<Bytes, StreamError>>,
_join_handle: crate::AbortOnDropJoinHandle<()>,
_join_handle: crate::runtime::AbortOnDropJoinHandle<()>,
}
impl AsyncReadStream {
@ -120,7 +120,7 @@ impl AsyncReadStream {
/// provided by this struct, the argument must impl [`tokio::io::AsyncRead`].
pub fn new<T: tokio::io::AsyncRead + Send + Unpin + 'static>(mut reader: T) -> Self {
let (sender, receiver) = mpsc::channel(1);
let join_handle = crate::spawn(async move {
let join_handle = crate::runtime::spawn(async move {
loop {
use tokio::io::AsyncReadExt;
let mut buf = bytes::BytesMut::with_capacity(4096);

3
crates/wasi/src/poll.rs

@ -154,7 +154,8 @@ pub mod sync {
use crate::{
bindings::io::poll as async_poll,
bindings::sync_io::io::poll::{self, Pollable},
in_tokio, WasiView,
runtime::in_tokio,
WasiView,
};
use anyhow::Result;
use wasmtime::component::Resource;

2
crates/wasi/src/preview0.rs

@ -52,7 +52,7 @@ mod sync {
// Small wrapper around `in_tokio` to add a `Result` layer which is always
// `Ok`
fn in_tokio<F: Future>(future: F) -> Result<F::Output> {
Ok(crate::in_tokio(future))
Ok(crate::runtime::in_tokio(future))
}
}

2
crates/wasi/src/preview1.rs

@ -542,7 +542,7 @@ mod sync {
// Small wrapper around `in_tokio` to add a `Result` layer which is always
// `Ok`
fn in_tokio<F: Future>(future: F) -> Result<F::Output> {
Ok(crate::in_tokio(future))
Ok(crate::runtime::in_tokio(future))
}
}

175
crates/wasi/src/runtime.rs

@ -0,0 +1,175 @@
//! This module provides an "ambient Tokio runtime"
//! [`with_ambient_tokio_runtime`]. Embedders of wasmtime-wasi may do so from
//! synchronous Rust, and not use tokio directly. The implementation of
//! wasmtime-wasi requires a tokio executor in a way that is [deeply tied to
//! its
//! design](https://github.com/bytecodealliance/wasmtime/issues/7973#issuecomment-1960513214).
//! When used from a sychrnonous wasmtime context, this module provides the
//! wrapper function [`in_tokio`] used throughout the shim implementations of
//! synchronous component binding `Host` traits in terms of the async ones.
//!
//! This module also provides a thin wrapper on tokio's tasks.
//! [`AbortOnDropJoinHandle`], which is exactly like a
//! [`tokio::task::JoinHandle`] except for the obvious behavioral change. This
//! whole crate, and any child crates which spawn tasks as part of their
//! implementations, should please use this crate's [`spawn`] and
//! [`spawn_blocking`] over tokio's. so we wanted the type name to stick out
//! if someone misses it.
//!
//! Each of these facilities should be used by dependencies of wasmtime-wasi
//! which when implementing component bindings.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub(crate) static RUNTIME: once_cell::sync::Lazy<tokio::runtime::Runtime> =
once_cell::sync::Lazy::new(|| {
tokio::runtime::Builder::new_current_thread()
.enable_time()
.enable_io()
.build()
.unwrap()
});
/// Exactly like a [`tokio::task::JoinHandle`], except that it aborts the task when
/// the handle is dropped.
///
/// This behavior makes it easier to tie a worker task to the lifetime of a Resource
/// by keeping this handle owned by the Resource.
pub struct AbortOnDropJoinHandle<T>(tokio::task::JoinHandle<T>);
impl<T> Drop for AbortOnDropJoinHandle<T> {
fn drop(&mut self) {
self.0.abort()
}
}
impl<T> std::ops::Deref for AbortOnDropJoinHandle<T> {
type Target = tokio::task::JoinHandle<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> std::ops::DerefMut for AbortOnDropJoinHandle<T> {
fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle<T> {
&mut self.0
}
}
impl<T> From<tokio::task::JoinHandle<T>> for AbortOnDropJoinHandle<T> {
fn from(jh: tokio::task::JoinHandle<T>) -> Self {
AbortOnDropJoinHandle(jh)
}
}
impl<T> Future for AbortOnDropJoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.as_mut().0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")),
}
}
}
pub fn spawn<F>(f: F) -> AbortOnDropJoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let j = with_ambient_tokio_runtime(|| tokio::task::spawn(f));
AbortOnDropJoinHandle(j)
}
pub fn spawn_blocking<F, R>(f: F) -> AbortOnDropJoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let j = with_ambient_tokio_runtime(|| tokio::task::spawn_blocking(f));
AbortOnDropJoinHandle(j)
}
pub fn in_tokio<F: Future>(f: F) -> F::Output {
match tokio::runtime::Handle::try_current() {
Ok(h) => {
let _enter = h.enter();
h.block_on(f)
}
// The `yield_now` here is non-obvious and if you're reading this
// you're likely curious about why it's here. This is currently required
// to get some features of "sync mode" working correctly, such as with
// the CLI. To illustrate why this is required, consider a program
// organized as:
//
// * A program has a `pollable` that it's waiting on.
// * This `pollable` is always ready .
// * Actually making the corresponding operation ready, however,
// requires some background work on Tokio's part.
// * The program is looping on "wait for readiness" coupled with
// performing the operation.
//
// In this situation this program ends up infinitely looping in waiting
// for pollables. The reason appears to be that when we enter the tokio
// runtime here it doesn't necessary yield to background work because
// the provided future `f` is ready immediately. The future `f` will run
// through the list of pollables and determine one of them is ready.
//
// Historically this happened with UDP sockets. A test send a datagram
// from one socket to another and the other socket infinitely didn't
// receive the data. This appeared to be because the server socket was
// waiting on `READABLE | WRITABLE` (which is itself a bug but ignore
// that) and the socket was currently in the "writable" state but never
// ended up receiving a notification for the "readable" state. Moving
// the socket to "readable" would require Tokio to perform some
// background work via epoll/kqueue/handle events but if the future
// provided here is always ready, then that never happened.
//
// Thus the `yield_now()` is an attempt to force Tokio to go do some
// background work eventually and look at new interest masks for
// example. This is a bit of a kludge but everything's already a bit
// wonky in synchronous mode anyway. Note that this is hypothesized to
// not be an issue in async mode because async mode typically has the
// Tokio runtime in a separate thread or otherwise participating in a
// larger application, it's only here in synchronous mode where we
// effectively own the runtime that we need some special care.
Err(_) => {
let _enter = RUNTIME.enter();
RUNTIME.block_on(async move {
tokio::task::yield_now().await;
f.await
})
}
}
}
/// Executes the closure `f` with an "ambient Tokio runtime" which basically
/// means that if code in `f` tries to get a runtime `Handle` it'll succeed.
///
/// If a `Handle` is already available, e.g. in async contexts, then `f` is run
/// immediately. Otherwise for synchronous contexts this crate's fallback
/// runtime is configured and then `f` is executed.
pub fn with_ambient_tokio_runtime<R>(f: impl FnOnce() -> R) -> R {
match tokio::runtime::Handle::try_current() {
Ok(_) => f(),
Err(_) => {
let _enter = RUNTIME.enter();
f()
}
}
}
/// Attempts to get the result of a `future`.
///
/// This function does not block and will poll the provided future once. If the
/// result is here then `Some` is returned, otherwise `None` is returned.
///
/// Note that by polling `future` this means that `future` must be re-polled
/// later if it's to wake up a task.
pub fn poll_noop<F>(future: Pin<&mut F>) -> Option<F::Output>
where
F: Future,
{
let mut task = Context::from_waker(futures::task::noop_waker_ref());
match future.poll(&mut task) {
Poll::Ready(result) => Some(result),
Poll::Pending => None,
}
}

12
crates/wasi/src/tcp.rs

@ -1,8 +1,6 @@
use super::network::SocketAddressFamily;
use super::{
with_ambient_tokio_runtime, HostInputStream, HostOutputStream, SocketResult, StreamError,
};
use crate::{AbortOnDropJoinHandle, Subscribe};
use crate::network::SocketAddressFamily;
use crate::runtime::{with_ambient_tokio_runtime, AbortOnDropJoinHandle};
use crate::{HostInputStream, HostOutputStream, SocketResult, StreamError, Subscribe};
use anyhow::{Error, Result};
use cap_net_ext::AddressFamily;
use futures::Future;
@ -164,7 +162,7 @@ impl TcpWriteStream {
assert!(matches!(self.last_write, LastWrite::Done));
let stream = self.stream.clone();
self.last_write = LastWrite::Waiting(crate::spawn(async move {
self.last_write = LastWrite::Waiting(crate::runtime::spawn(async move {
// Note: we are not using the AsyncWrite impl here, and instead using the TcpStream
// primitive try_write, which goes directly to attempt a write with mio. This has
// two advantages: 1. this operation takes a &TcpStream instead of a &mut TcpStream
@ -236,7 +234,7 @@ impl HostOutputStream for TcpWriteStream {
let writable = self.stream.writable();
futures::pin_mut!(writable);
if super::poll_noop(writable).is_none() {
if crate::runtime::poll_noop(writable).is_none() {
return Ok(0);
}
Ok(SOCKET_READY_SIZE)

2
crates/wasi/src/udp.rs

@ -1,6 +1,6 @@
use crate::host::network::util;
use crate::poll::Subscribe;
use crate::with_ambient_tokio_runtime;
use crate::runtime::with_ambient_tokio_runtime;
use async_trait::async_trait;
use cap_net_ext::{AddressFamily, Blocking};
use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike};

4
crates/wasi/src/write_stream.rs

@ -139,7 +139,7 @@ impl Worker {
/// Provides a [`HostOutputStream`] impl from a [`tokio::io::AsyncWrite`] impl
pub struct AsyncWriteStream {
worker: Arc<Worker>,
_join_handle: crate::AbortOnDropJoinHandle<()>,
_join_handle: crate::runtime::AbortOnDropJoinHandle<()>,
}
impl AsyncWriteStream {
@ -152,7 +152,7 @@ impl AsyncWriteStream {
let worker = Arc::new(Worker::new(write_budget));
let w = Arc::clone(&worker);
let join_handle = crate::spawn(async move { w.work(writer).await });
let join_handle = crate::runtime::spawn(async move { w.work(writer).await });
AsyncWriteStream {
worker,

Loading…
Cancel
Save