From 4005a813e81e7f1423598d873fdb9a07919e54e9 Mon Sep 17 00:00:00 2001 From: Nick Fitzgerald Date: Thu, 29 Aug 2024 11:40:13 -0700 Subject: [PATCH] Make the Wasmtime CLI use async (#9184) * Make the Wasmtime CLI use async This means that interrupting a running Wasm program will now work correctly, even when the program is blocked on I/O or waiting on a timeout or some such. This also involved making `wasi-threads` async-compatible. Co-Authored-By: Alex Crichton * rustfmt * Make `run` command enable the `tokio` feature * Add a test for CLI, timeouts, and sleeping forever * Fix warning --------- Co-authored-by: Alex Crichton --- Cargo.lock | 1 + Cargo.toml | 4 +- .../src/bin/cli_sleep_forever.rs | 5 + crates/wasi-threads/Cargo.toml | 1 + crates/wasi-threads/src/lib.rs | 19 ++- crates/wasmtime/src/engine.rs | 7 + src/commands/run.rs | 152 ++++++++++-------- src/common.rs | 6 +- tests/all/cli_tests.rs | 22 +++ 9 files changed, 149 insertions(+), 68 deletions(-) create mode 100644 crates/test-programs/src/bin/cli_sleep_forever.rs diff --git a/Cargo.lock b/Cargo.lock index 71e06994b7..7415893b26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3994,6 +3994,7 @@ dependencies = [ "rand", "wasi-common", "wasmtime", + "wasmtime-wasi", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c17af9ed92..8b7711f2ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ wasmtime-cranelift = { workspace = true, optional = true } wasmtime-environ = { workspace = true } wasmtime-explorer = { workspace = true, optional = true } wasmtime-wast = { workspace = true, optional = true } -wasi-common = { workspace = true, default-features = true, features = ["exit"], optional = true } +wasi-common = { workspace = true, default-features = true, features = ["exit", "tokio"], optional = true } wasmtime-wasi = { workspace = true, default-features = true, optional = true } wasmtime-wasi-nn = { workspace = true, optional = true } wasmtime-wasi-runtime-config = { workspace = true, optional = true } @@ -439,7 +439,7 @@ explore = ["dep:wasmtime-explorer", "dep:tempfile"] wast = ["dep:wasmtime-wast"] config = ["cache"] compile = ["cranelift"] -run = ["dep:wasmtime-wasi", "wasmtime/runtime", "dep:listenfd", "dep:wasi-common"] +run = ["dep:wasmtime-wasi", "wasmtime/runtime", "dep:listenfd", "dep:wasi-common", "dep:tokio"] [[test]] name = "host_segfault" diff --git a/crates/test-programs/src/bin/cli_sleep_forever.rs b/crates/test-programs/src/bin/cli_sleep_forever.rs new file mode 100644 index 0000000000..f2be9622ce --- /dev/null +++ b/crates/test-programs/src/bin/cli_sleep_forever.rs @@ -0,0 +1,5 @@ +use std::time::Duration; + +fn main() { + std::thread::sleep(Duration::from_nanos(u64::MAX)); +} diff --git a/crates/wasi-threads/Cargo.toml b/crates/wasi-threads/Cargo.toml index 0a5a681d70..8ec1c96164 100644 --- a/crates/wasi-threads/Cargo.toml +++ b/crates/wasi-threads/Cargo.toml @@ -21,3 +21,4 @@ log = { workspace = true } rand = "0.8" wasi-common = { workspace = true, features = ["exit"]} wasmtime = { workspace = true, features = ['threads'] } +wasmtime-wasi = { workspace = true } diff --git a/crates/wasi-threads/src/lib.rs b/crates/wasi-threads/src/lib.rs index 580be8c3e7..be775f5a4c 100644 --- a/crates/wasi-threads/src/lib.rs +++ b/crates/wasi-threads/src/lib.rs @@ -61,7 +61,14 @@ impl WasiThreadsCtx { let result = catch_unwind(AssertUnwindSafe(|| { // Each new instance is created in its own store. let mut store = Store::new(&instance_pre.module().engine(), host); - let instance = instance_pre.instantiate(&mut store).unwrap(); + + let instance = if instance_pre.module().engine().is_async() { + wasmtime_wasi::runtime::in_tokio(instance_pre.instantiate_async(&mut store)) + } else { + instance_pre.instantiate(&mut store) + } + .unwrap(); + let thread_entry_point = instance .get_typed_func::<(i32, i32), ()>(&mut store, WASI_ENTRY_POINT) .unwrap(); @@ -77,7 +84,15 @@ impl WasiThreadsCtx { WASI_ENTRY_POINT, thread_start_arg ); - match thread_entry_point.call(&mut store, (wasi_thread_id, thread_start_arg)) { + let res = if instance_pre.module().engine().is_async() { + wasmtime_wasi::runtime::in_tokio( + thread_entry_point + .call_async(&mut store, (wasi_thread_id, thread_start_arg)), + ) + } else { + thread_entry_point.call(&mut store, (wasi_thread_id, thread_start_arg)) + }; + match res { Ok(_) => log::trace!("exiting thread id = {} normally", wasi_thread_id), Err(e) => { log::trace!("exiting thread id = {} due to error", wasi_thread_id); diff --git a/crates/wasmtime/src/engine.rs b/crates/wasmtime/src/engine.rs index 8fa1797706..fbd6d8aac1 100644 --- a/crates/wasmtime/src/engine.rs +++ b/crates/wasmtime/src/engine.rs @@ -185,6 +185,13 @@ impl Engine { Arc::ptr_eq(&a.inner, &b.inner) } + /// Returns whether the engine is configured to support async functions. + #[cfg(feature = "async")] + #[inline] + pub fn is_async(&self) -> bool { + self.config().async_support + } + /// Detects whether the bytes provided are a precompiled object produced by /// Wasmtime. /// diff --git a/src/commands/run.rs b/src/commands/run.rs index 4aa504aed2..d061085763 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -88,6 +88,7 @@ impl RunCommand { self.run.common.init_logging()?; let mut config = self.run.common.config(None, None)?; + config.async_support(true); if self.run.common.wasm.timeout.is_some() { config.epoch_interruption(true); @@ -149,61 +150,78 @@ impl RunCommand { store.set_fuel(fuel)?; } - // Load the preload wasm modules. - let mut modules = Vec::new(); - if let RunTarget::Core(m) = &main { - modules.push((String::new(), m.clone())); - } - for (name, path) in self.preloads.iter() { - // Read the wasm module binary either as `*.wat` or a raw binary - let module = match self.run.load_module(&engine, path)? { - RunTarget::Core(m) => m, - #[cfg(feature = "component-model")] - RunTarget::Component(_) => bail!("components cannot be loaded with `--preload`"), - }; - modules.push((name.clone(), module.clone())); + // Always run the module asynchronously to ensure that the module can be + // interrupted, even if it is blocking on I/O or a timeout or something. + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build()?; - // Add the module's functions to the linker. - match &mut linker { - #[cfg(feature = "cranelift")] - CliLinker::Core(linker) => { - linker.module(&mut store, name, &module).context(format!( - "failed to process preload `{}` at `{}`", - name, - path.display() - ))?; - } - #[cfg(not(feature = "cranelift"))] - CliLinker::Core(_) => { - bail!("support for --preload disabled at compile time"); + let dur = self + .run + .common + .wasm + .timeout + .unwrap_or(std::time::Duration::MAX); + let result = runtime.block_on(async { + tokio::time::timeout(dur, async { + // Load the preload wasm modules. + let mut modules = Vec::new(); + if let RunTarget::Core(m) = &main { + modules.push((String::new(), m.clone())); } - #[cfg(feature = "component-model")] - CliLinker::Component(_) => { - bail!("--preload cannot be used with components"); + for (name, path) in self.preloads.iter() { + // Read the wasm module binary either as `*.wat` or a raw binary + let module = match self.run.load_module(&engine, path)? { + RunTarget::Core(m) => m, + #[cfg(feature = "component-model")] + RunTarget::Component(_) => { + bail!("components cannot be loaded with `--preload`") + } + }; + modules.push((name.clone(), module.clone())); + + // Add the module's functions to the linker. + match &mut linker { + #[cfg(feature = "cranelift")] + CliLinker::Core(linker) => { + linker + .module_async(&mut store, name, &module) + .await + .context(format!( + "failed to process preload `{}` at `{}`", + name, + path.display() + ))?; + } + #[cfg(not(feature = "cranelift"))] + CliLinker::Core(_) => { + bail!("support for --preload disabled at compile time"); + } + #[cfg(feature = "component-model")] + CliLinker::Component(_) => { + bail!("--preload cannot be used with components"); + } + } } - } - } - // Pre-emptively initialize and install a Tokio runtime ambiently in the - // environment when executing the module. Without this whenever a WASI - // call is made that needs to block on a future a Tokio runtime is - // configured and entered, and this appears to be slower than simply - // picking an existing runtime out of the environment and using that. - // The goal of this is to improve the performance of WASI-related - // operations that block in the CLI since the CLI doesn't use async to - // invoke WebAssembly. - let result = wasmtime_wasi::runtime::with_ambient_tokio_runtime(|| { - self.load_main_module(&mut store, &mut linker, &main, modules) - .with_context(|| { - format!( - "failed to run main module `{}`", - self.module_and_args[0].to_string_lossy() - ) - }) + self.load_main_module(&mut store, &mut linker, &main, modules) + .await + .with_context(|| { + format!( + "failed to run main module `{}`", + self.module_and_args[0].to_string_lossy() + ) + }) + }) + .await }); // Load the main wasm module. - match result { + match result.unwrap_or_else(|elapsed| { + Err(anyhow::Error::from(wasmtime::Trap::Interrupt)) + .with_context(|| format!("timed out after {elapsed}")) + }) { Ok(()) => (), Err(e) => { // Exit the process if Wasmtime understands the error; @@ -367,7 +385,7 @@ impl RunCommand { }); } - fn load_main_module( + async fn load_main_module( &self, store: &mut Store, linker: &mut CliLinker, @@ -403,15 +421,20 @@ impl RunCommand { let result = match linker { CliLinker::Core(linker) => { let module = module.unwrap_core(); - let instance = linker.instantiate(&mut *store, &module).context(format!( - "failed to instantiate {:?}", - self.module_and_args[0] - ))?; + let instance = linker + .instantiate_async(&mut *store, &module) + .await + .context(format!( + "failed to instantiate {:?}", + self.module_and_args[0] + ))?; // If `_initialize` is present, meaning a reactor, then invoke // the function. if let Some(func) = instance.get_func(&mut *store, "_initialize") { - func.typed::<(), ()>(&store)?.call(&mut *store, ())?; + func.typed::<(), ()>(&store)? + .call_async(&mut *store, ()) + .await?; } // Look for the specific function provided or otherwise look for @@ -429,7 +452,7 @@ impl RunCommand { }; match func { - Some(func) => self.invoke_func(store, func), + Some(func) => self.invoke_func(store, func).await, None => Ok(()), } } @@ -441,14 +464,16 @@ impl RunCommand { let component = module.unwrap_component(); - let command = wasmtime_wasi::bindings::sync::Command::instantiate( + let command = wasmtime_wasi::bindings::Command::instantiate_async( &mut *store, component, linker, - )?; + ) + .await?; let result = command .wasi_cli_run() .call_run(&mut *store) + .await .context("failed to invoke `run` function") .map_err(|e| self.handle_core_dump(&mut *store, e)); @@ -465,7 +490,7 @@ impl RunCommand { result } - fn invoke_func(&self, store: &mut Store, func: Func) -> Result<()> { + async fn invoke_func(&self, store: &mut Store, func: Func) -> Result<()> { let ty = func.ty(&store); if ty.params().len() > 0 { eprintln!( @@ -505,7 +530,8 @@ impl RunCommand { // out, if there are any. let mut results = vec![Val::null_func_ref(); ty.results().len()]; let invoke_res = func - .call(&mut *store, &values, &mut results) + .call_async(&mut *store, &values, &mut results) + .await .with_context(|| { if let Some(name) = &self.invoke { format!("failed to invoke `{name}`") @@ -600,7 +626,7 @@ impl RunCommand { // are enabled, then use the historical preview1 // implementation. (Some(false), _) | (None, Some(true)) => { - wasi_common::sync::add_to_linker(linker, |host| { + wasi_common::tokio::add_to_linker(linker, |host| { host.preview1_ctx.as_mut().unwrap() })?; self.set_preview1_ctx(store)?; @@ -613,11 +639,11 @@ impl RunCommand { // default-disabled in the future. (Some(true), _) | (None, Some(false) | None) => { if self.run.common.wasi.preview0 != Some(false) { - wasmtime_wasi::preview0::add_to_linker_sync(linker, |t| { + wasmtime_wasi::preview0::add_to_linker_async(linker, |t| { t.preview2_ctx() })?; } - wasmtime_wasi::preview1::add_to_linker_sync(linker, |t| { + wasmtime_wasi::preview1::add_to_linker_async(linker, |t| { t.preview2_ctx() })?; self.set_preview2_ctx(store)?; @@ -626,7 +652,7 @@ impl RunCommand { } #[cfg(feature = "component-model")] CliLinker::Component(linker) => { - wasmtime_wasi::add_to_linker_sync(linker)?; + wasmtime_wasi::add_to_linker_async(linker)?; self.set_preview2_ctx(store)?; } } diff --git a/src/common.rs b/src/common.rs index 79837d7907..a0599b3329 100644 --- a/src/common.rs +++ b/src/common.rs @@ -260,7 +260,11 @@ impl RunCommon { // the program as the CLI. This helps improve the performance of some // blocking operations in WASI, for example, by skipping the // back-and-forth between sync and async. - builder.allow_blocking_current_thread(true); + // + // However, do not set this if a timeout is configured, as that would + // cause the timeout to be ignored if the guest does, for example, + // something like `sleep(FOREVER)`. + builder.allow_blocking_current_thread(self.common.wasm.timeout.is_none()); if self.common.wasi.inherit_env == Some(true) { for (k, v) in std::env::vars() { diff --git a/tests/all/cli_tests.rs b/tests/all/cli_tests.rs index fce63cb0aa..dfb31842a9 100644 --- a/tests/all/cli_tests.rs +++ b/tests/all/cli_tests.rs @@ -1443,6 +1443,28 @@ mod test_programs { Ok(()) } + #[test] + fn cli_sleep_forever() -> Result<()> { + for timeout in [ + // Tests still pass when we race with going to sleep. + "-Wtimeout=1ns", + // Tests pass when we wait till the Wasm has (likely) gone to sleep. + "-Wtimeout=250ms", + ] { + let e = run_wasmtime(&["run", timeout, CLI_SLEEP_FOREVER]).unwrap_err(); + let e = e.to_string(); + println!("Got error: {e}"); + assert!(e.contains("interrupt")); + + let e = run_wasmtime(&["run", timeout, CLI_SLEEP_FOREVER_COMPONENT]).unwrap_err(); + let e = e.to_string(); + println!("Got error: {e}"); + assert!(e.contains("interrupt")); + } + + Ok(()) + } + /// Helper structure to manage an invocation of `wasmtime serve` struct WasmtimeServe { child: Option,