Browse Source
* preview2: make everything but streams/io and poll/poll synchronous
* streams: get rid of as_any method, which is no longer used
* delete legacy sched and pollable concepts
* more code motion and renaming
* make tokio a workspace dep, because we need it directly in wasmtime-wasi
* HostPollable exists
* more fixes
* pollable can trap, and implement clock properly
* HostPollable is now a generator of futures
because we need to be able to poll a pollable many times
* explain various todo!s
* Synchronous version of the wasi-preview2-components tests
* Change with_tokio to accept the future as an argument
* Store futures in the PollOneoff struct instead, to avoid dropping them
* Remove TODO for HostOutputStream impl for WritePipe
* Implement pollable for ReadPipe
* Use a Notify when ReadPipe is ready
* wip
* wip
* Read/write pipe ends with tokio channels
* Empty reader/writer wrappers
* EmptyStream, and warning cleanup
* Wrapped reader/writer structs
* Rework stdio in terms of wrapped read/write
* Add MemoryOutputPipe and update tests
* Remove todo
* rewrite nearly everything
* implement the pipe stuff
* wibble
* fix MemoryOutputPipe just enough to make the tests compile
* Move the table iteration into a helper function
* AsyncFd stream implementation to fix stdin on unix
* Rename Wrapped{Read,Write} streams to Async{Read,Write}Stream
* Move async io wrappers into stream.rs
* Fix the sync tests
* fix test uses of pipes, juggle tokio context for stdin construction
* add some fixmes
* the future i named Never is defined in futures-util as pending
which is a better name
* i believe this is a correct implementation of one global stdin resource
* move unix stdin to its own file
* make most of the mods private
* fix build - we are skipping rust 1.70
due to llvm regressions in s390x and riscv64 which are fixed in 1.71 and
will not be backported
* preview1-in-preview2: use async funcs for io, and the async io interface
prtest:full
* windows stdin support
* done!
* table ext functions: fix tests
* tests: expect poll_oneoff_{files,stdio} to pass on all platforms
* export the bindings under wasmtime_wasi::preview2::bindings
rather than preview2::wasi.
and command moves to wasmtime_wasi::preview2::command as well.
* fix renaming of wasi to bindings in tests
* use block_in_place throughout filesystem
and move block_on and block_in_place to be pub crate at the root
* AsyncFdStream: ensure file is nonblocking
* tests: block_in_place requires multi-threaded runtime
* actually, use fcntl_setfl to make the asyncfd file nonblocking
* fix windows block_on
* docs, remove unnecessary methods
* more docs
* Add a workspace dependency on bytes-1.4
* Remove vectored stream operations
* Rework the read/write stream traits
* Add a size parameter to `read`, and switch to usize for traits
* Pipe through the bool -> stream-status change in wit
* Plumb stream-status through write operations in wit
* write host trait also gives streamstate
* hook new stream host read/write back up to the wit bindgen
* sketchy AsyncReadStream impl
* Fill out implementations for AsyncReadStream and AsyncWriteStream
* some reasonable read tests
* more
* first smoke test for AsyncWriteStream
* bunch of AsyncWriteStream tests
* half-baked idea that the output-stream interface will need a flush mechanism
* adapter: fixes for changes to stream wit
* fix new rust 1.71 warnings
* make stdin work on unix without using AsyncFdStream
inline the tokio docs example of how to impl AsyncRead for an AsyncFd,
except theres some "minor" changes because stdin doesnt impl Read on
&Stdin whereas tcpstream from the example does
* delete AsyncFdStream for now
it turns out to be kinda hard and we can always work on adding it back
in later.
* Implement some memory pipe operations, and move async wrappers to the pipe mod
* Make blocking_write actually block until everything is written
* Remove debug print
* Adapter stdio should use blocking write
Rust guests will panic if the write returns less than the number of
bytes sent with stdio.
* Clean up implementations of {blocking_}write_zeros and skip
* Remove debug macro usage
* Move EmptyStream to pipe, and split it into four variants
Use EmptyInputStream and SinkOutputStream as the defaults for stdin and
stdout/stderr respectively.
* Add a big warning about resource lifetime tracking in pollables
* Start working through changes to the filesystem implementation
* Remove todos in the filesystem implementation
* Avoid lifetime errors by moving blocking operations to File and Dir
* Fix more lifetime issues with `block`
* Finish filling out translation impl
* fix warnings
* we can likely eliminate block_in_place in the stdin implementations
* sync command uses sync filesystem, start of translation layer
* symc filesystem: all the trait boilerplate is in place
just need to finish the from impl boilerplate
* finish type conversion boilerplate
* Revert "half-baked idea that the output-stream interface will need a flush mechanism"
This reverts commit 3eb762e333
.
* cargo fmt
* test type fixes
* renames and comments
* refactor stream table internals so we can have a blocking variant...
* preview1 host adapter: stdout/stderr use blocking_write here too
* filesystem streams are blocking now
* fixes
* satisfy cargo doc
* cargo vet: dep upgrades taken care of by imports from mozilla
* unix stdio: eliminate block_in_place
* replace private in_tokio with spawn, since its only used for spawning
* comments
* worker thread stdin implementation can be tested on linux, i guess
and start outlining a test plan
* eliminate tokio boilerplate - no longer using tokios lock
* rename our private block_on to in_tokio
* fill in missing file input skip
* code review: fix MemoryInputPipe. Closed status is always available immediately.
* code review: empty input stream is not essential, closed input stream is a better fi for stdin
* code review: unreachable
* turn worker thread (windows) stdin off
* expect preview2-based poll_oneoff_stdio to fail on windows
* command directory_list test: no need to inherit stdin
* preview1 in preview2: turn off inherit_stdio except for poll_oneoff_stdio
* wasi-preview2-components: apparently inherit_stdio was on everywhere here as well. turn it off
except for poll_oneoff_stdio
* extend timeout for riscv64 i suppose
---------
Co-authored-by: Trevor Elliott <telliott@fastly.com>
pull/6751/head
Pat Hickey
1 year ago
committed by
GitHub
41 changed files with 3795 additions and 1783 deletions
@ -0,0 +1,297 @@ |
|||
#![cfg(feature = "test_programs")] |
|||
use anyhow::Result; |
|||
use tempfile::TempDir; |
|||
use wasmtime::{component::Linker, Config, Engine, Store}; |
|||
use wasmtime_wasi::preview2::{ |
|||
command::sync::{add_to_linker, Command}, |
|||
pipe::MemoryOutputPipe, |
|||
DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView, |
|||
}; |
|||
|
|||
lazy_static::lazy_static! { |
|||
static ref ENGINE: Engine = { |
|||
let mut config = Config::new(); |
|||
config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); |
|||
config.wasm_component_model(true); |
|||
config.async_support(false); |
|||
|
|||
let engine = Engine::new(&config).unwrap(); |
|||
engine |
|||
}; |
|||
} |
|||
// uses ENGINE, creates a fn get_component(&str) -> Component
|
|||
include!(concat!(env!("OUT_DIR"), "/wasi_tests_components.rs")); |
|||
|
|||
pub fn prepare_workspace(exe_name: &str) -> Result<TempDir> { |
|||
let prefix = format!("wasi_components_{}_", exe_name); |
|||
let tempdir = tempfile::Builder::new().prefix(&prefix).tempdir()?; |
|||
Ok(tempdir) |
|||
} |
|||
|
|||
fn run(name: &str, inherit_stdio: bool) -> Result<()> { |
|||
let workspace = prepare_workspace(name)?; |
|||
let stdout = MemoryOutputPipe::new(); |
|||
let stderr = MemoryOutputPipe::new(); |
|||
let r = { |
|||
let mut linker = Linker::new(&ENGINE); |
|||
add_to_linker(&mut linker)?; |
|||
|
|||
// Create our wasi context.
|
|||
// Additionally register any preopened directories if we have them.
|
|||
let mut builder = WasiCtxBuilder::new(); |
|||
|
|||
if inherit_stdio { |
|||
builder = builder.inherit_stdio(); |
|||
} else { |
|||
builder = builder |
|||
.set_stdout(stdout.clone()) |
|||
.set_stderr(stderr.clone()); |
|||
} |
|||
builder = builder.set_args(&[name, "."]); |
|||
println!("preopen: {:?}", workspace); |
|||
let preopen_dir = |
|||
cap_std::fs::Dir::open_ambient_dir(workspace.path(), cap_std::ambient_authority())?; |
|||
builder = builder.push_preopened_dir(preopen_dir, DirPerms::all(), FilePerms::all(), "."); |
|||
for (var, val) in test_programs::wasi_tests_environment() { |
|||
builder = builder.push_env(var, val); |
|||
} |
|||
|
|||
let mut table = Table::new(); |
|||
let wasi = builder.build(&mut table)?; |
|||
struct Ctx { |
|||
wasi: WasiCtx, |
|||
table: Table, |
|||
} |
|||
impl WasiView for Ctx { |
|||
fn ctx(&self) -> &WasiCtx { |
|||
&self.wasi |
|||
} |
|||
fn ctx_mut(&mut self) -> &mut WasiCtx { |
|||
&mut self.wasi |
|||
} |
|||
fn table(&self) -> &Table { |
|||
&self.table |
|||
} |
|||
fn table_mut(&mut self) -> &mut Table { |
|||
&mut self.table |
|||
} |
|||
} |
|||
|
|||
let ctx = Ctx { wasi, table }; |
|||
let mut store = Store::new(&ENGINE, ctx); |
|||
let (command, _instance) = Command::instantiate(&mut store, &get_component(name), &linker)?; |
|||
command |
|||
.call_run(&mut store)? |
|||
.map_err(|()| anyhow::anyhow!("run returned a failure"))?; |
|||
Ok(()) |
|||
}; |
|||
|
|||
r.map_err(move |trap: anyhow::Error| { |
|||
let stdout = stdout.try_into_inner().expect("single ref to stdout"); |
|||
if !stdout.is_empty() { |
|||
println!("guest stdout:\n{}\n===", String::from_utf8_lossy(&stdout)); |
|||
} |
|||
let stderr = stderr.try_into_inner().expect("single ref to stderr"); |
|||
if !stderr.is_empty() { |
|||
println!("guest stderr:\n{}\n===", String::from_utf8_lossy(&stderr)); |
|||
} |
|||
trap.context(format!( |
|||
"error while testing wasi-tests {} with cap-std-sync", |
|||
name |
|||
)) |
|||
})?; |
|||
Ok(()) |
|||
} |
|||
|
|||
// Below here is mechanical: there should be one test for every binary in
|
|||
// wasi-tests. The only differences should be should_panic annotations for
|
|||
// tests which fail.
|
|||
#[test_log::test] |
|||
fn big_random_buf() { |
|||
run("big_random_buf", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn clock_time_get() { |
|||
run("clock_time_get", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn close_preopen() { |
|||
run("close_preopen", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn dangling_fd() { |
|||
run("dangling_fd", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn dangling_symlink() { |
|||
run("dangling_symlink", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn directory_seek() { |
|||
run("directory_seek", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn dir_fd_op_failures() { |
|||
run("dir_fd_op_failures", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn fd_advise() { |
|||
run("fd_advise", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn fd_filestat_get() { |
|||
run("fd_filestat_get", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn fd_filestat_set() { |
|||
run("fd_filestat_set", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn fd_flags_set() { |
|||
run("fd_flags_set", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn fd_readdir() { |
|||
run("fd_readdir", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn file_allocate() { |
|||
run("file_allocate", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn file_pread_pwrite() { |
|||
run("file_pread_pwrite", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn file_seek_tell() { |
|||
run("file_seek_tell", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn file_truncation() { |
|||
run("file_truncation", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn file_unbuffered_write() { |
|||
run("file_unbuffered_write", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
#[cfg_attr(windows, should_panic)] |
|||
fn interesting_paths() { |
|||
run("interesting_paths", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn isatty() { |
|||
run("isatty", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn nofollow_errors() { |
|||
run("nofollow_errors", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn overwrite_preopen() { |
|||
run("overwrite_preopen", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_exists() { |
|||
run("path_exists", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_filestat() { |
|||
run("path_filestat", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_link() { |
|||
run("path_link", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_open_create_existing() { |
|||
run("path_open_create_existing", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_open_read_write() { |
|||
run("path_open_read_write", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_open_dirfd_not_dir() { |
|||
run("path_open_dirfd_not_dir", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_open_missing() { |
|||
run("path_open_missing", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_open_nonblock() { |
|||
run("path_open_nonblock", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_rename_dir_trailing_slashes() { |
|||
run("path_rename_dir_trailing_slashes", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
#[should_panic] |
|||
fn path_rename_file_trailing_slashes() { |
|||
run("path_rename_file_trailing_slashes", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_rename() { |
|||
run("path_rename", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_symlink_trailing_slashes() { |
|||
run("path_symlink_trailing_slashes", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn poll_oneoff_files() { |
|||
run("poll_oneoff_files", false).unwrap() |
|||
} |
|||
|
|||
#[cfg_attr(windows, should_panic)] |
|||
#[test_log::test] |
|||
fn poll_oneoff_stdio() { |
|||
run("poll_oneoff_stdio", true).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn readlink() { |
|||
run("readlink", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
#[should_panic] |
|||
fn remove_directory_trailing_slashes() { |
|||
run("remove_directory_trailing_slashes", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn remove_nonempty_directory() { |
|||
run("remove_nonempty_directory", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn renumber() { |
|||
run("renumber", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn sched_yield() { |
|||
run("sched_yield", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn stdio() { |
|||
run("stdio", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn symlink_create() { |
|||
run("symlink_create", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn symlink_filestat() { |
|||
run("symlink_filestat", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn symlink_loop() { |
|||
run("symlink_loop", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn unlink_file_trailing_slashes() { |
|||
run("unlink_file_trailing_slashes", false).unwrap() |
|||
} |
|||
#[test_log::test] |
|||
fn path_open_preopen() { |
|||
run("path_open_preopen", false).unwrap() |
|||
} |
@ -0,0 +1,91 @@ |
|||
use crate::preview2::WasiView; |
|||
|
|||
wasmtime::component::bindgen!({ |
|||
world: "wasi:preview/command", |
|||
tracing: true, |
|||
async: true, |
|||
trappable_error_type: { |
|||
"filesystem"::"error-code": Error, |
|||
"streams"::"stream-error": Error, |
|||
}, |
|||
with: { |
|||
"wasi:filesystem/filesystem": crate::preview2::bindings::filesystem::filesystem, |
|||
"wasi:clocks/monotonic_clock": crate::preview2::bindings::clocks::monotonic_clock, |
|||
"wasi:poll/poll": crate::preview2::bindings::poll::poll, |
|||
"wasi:io/streams": crate::preview2::bindings::io::streams, |
|||
"wasi:clocks/timezone": crate::preview2::bindings::clocks::timezone, |
|||
"wasi:clocks/wall_clock": crate::preview2::bindings::clocks::wall_clock, |
|||
"wasi:random/random": crate::preview2::bindings::random::random, |
|||
"wasi:cli_base/environment": crate::preview2::bindings::cli_base::environment, |
|||
"wasi:cli_base/exit": crate::preview2::bindings::cli_base::exit, |
|||
"wasi:cli_base/preopens": crate::preview2::bindings::cli_base::preopens, |
|||
"wasi:cli_base/stdin": crate::preview2::bindings::cli_base::stdin, |
|||
"wasi:cli_base/stdout": crate::preview2::bindings::cli_base::stdout, |
|||
"wasi:cli_base/stderr": crate::preview2::bindings::cli_base::stderr, |
|||
}, |
|||
}); |
|||
|
|||
pub fn add_to_linker<T: WasiView>(l: &mut wasmtime::component::Linker<T>) -> anyhow::Result<()> { |
|||
crate::preview2::bindings::clocks::wall_clock::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::clocks::monotonic_clock::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::clocks::timezone::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::filesystem::filesystem::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::poll::poll::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::io::streams::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::random::random::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::exit::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::environment::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::preopens::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::stdin::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::stdout::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::stderr::add_to_linker(l, |t| t)?; |
|||
Ok(()) |
|||
} |
|||
|
|||
pub mod sync { |
|||
use crate::preview2::WasiView; |
|||
|
|||
wasmtime::component::bindgen!({ |
|||
world: "wasi:preview/command", |
|||
tracing: true, |
|||
async: false, |
|||
trappable_error_type: { |
|||
"filesystem"::"error-code": Error, |
|||
"streams"::"stream-error": Error, |
|||
}, |
|||
with: { |
|||
"wasi:filesystem/filesystem": crate::preview2::bindings::sync_io::filesystem::filesystem, |
|||
"wasi:clocks/monotonic_clock": crate::preview2::bindings::clocks::monotonic_clock, |
|||
"wasi:poll/poll": crate::preview2::bindings::sync_io::poll::poll, |
|||
"wasi:io/streams": crate::preview2::bindings::sync_io::io::streams, |
|||
"wasi:clocks/timezone": crate::preview2::bindings::clocks::timezone, |
|||
"wasi:clocks/wall_clock": crate::preview2::bindings::clocks::wall_clock, |
|||
"wasi:random/random": crate::preview2::bindings::random::random, |
|||
"wasi:cli_base/environment": crate::preview2::bindings::cli_base::environment, |
|||
"wasi:cli_base/exit": crate::preview2::bindings::cli_base::exit, |
|||
"wasi:cli_base/preopens": crate::preview2::bindings::cli_base::preopens, |
|||
"wasi:cli_base/stdin": crate::preview2::bindings::cli_base::stdin, |
|||
"wasi:cli_base/stdout": crate::preview2::bindings::cli_base::stdout, |
|||
"wasi:cli_base/stderr": crate::preview2::bindings::cli_base::stderr, |
|||
}, |
|||
}); |
|||
|
|||
pub fn add_to_linker<T: WasiView>( |
|||
l: &mut wasmtime::component::Linker<T>, |
|||
) -> anyhow::Result<()> { |
|||
crate::preview2::bindings::clocks::wall_clock::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::clocks::monotonic_clock::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::clocks::timezone::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::sync_io::filesystem::filesystem::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::sync_io::poll::poll::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::sync_io::io::streams::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::random::random::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::exit::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::environment::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::preopens::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::stdin::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::stdout::add_to_linker(l, |t| t)?; |
|||
crate::preview2::bindings::cli_base::stderr::add_to_linker(l, |t| t)?; |
|||
Ok(()) |
|||
} |
|||
} |
@ -0,0 +1,157 @@ |
|||
use crate::preview2::{ |
|||
bindings::poll::poll::{self, Pollable}, |
|||
Table, TableError, WasiView, |
|||
}; |
|||
use anyhow::Result; |
|||
use std::any::Any; |
|||
use std::collections::{hash_map::Entry, HashMap}; |
|||
use std::future::Future; |
|||
use std::pin::Pin; |
|||
use std::task::{Context, Poll}; |
|||
|
|||
pub type PollableFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>; |
|||
pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> PollableFuture<'a>; |
|||
pub type ClosureFuture = Box<dyn Fn() -> PollableFuture<'static> + Send + Sync + 'static>; |
|||
|
|||
/// A host representation of the `wasi:poll/poll.pollable` resource.
|
|||
///
|
|||
/// A pollable is not the same thing as a Rust Future: the same pollable may be used to
|
|||
/// repeatedly check for readiness of a given condition, e.g. if a stream is readable
|
|||
/// or writable. So, rather than containing a Future, which can only become Ready once, a
|
|||
/// HostPollable contains a way to create a Future in each call to poll_oneoff.
|
|||
pub enum HostPollable { |
|||
/// Create a Future by calling a fn on another resource in the table. This
|
|||
/// indirection means the created Future can use a mut borrow of another
|
|||
/// resource in the Table (e.g. a stream)
|
|||
///
|
|||
/// FIXME: we currently aren't tracking the lifetime of the resource along
|
|||
/// with this entry, which means that this index could be occupied by something
|
|||
/// unrelated by the time we poll it again. This is a crash vector, because
|
|||
/// the [`MakeFuture`] would panic if the type of the index has changed, and
|
|||
/// would yield undefined behavior otherwise. We'll likely fix this by making
|
|||
/// the parent resources of a pollable clean up their pollable entries when
|
|||
/// they are destroyed (e.g. the HostInputStream would track the pollables it
|
|||
/// has created).
|
|||
///
|
|||
/// WARNING: do not deploy this library to production until the above issue has
|
|||
/// been fixed.
|
|||
TableEntry { index: u32, make_future: MakeFuture }, |
|||
/// Create a future by calling an owned, static closure. This is used for
|
|||
/// pollables which do not share state with another resource in the Table
|
|||
/// (e.g. a timer)
|
|||
Closure(ClosureFuture), |
|||
} |
|||
|
|||
pub trait TablePollableExt { |
|||
fn push_host_pollable(&mut self, p: HostPollable) -> Result<u32, TableError>; |
|||
fn get_host_pollable_mut(&mut self, fd: u32) -> Result<&mut HostPollable, TableError>; |
|||
fn delete_host_pollable(&mut self, fd: u32) -> Result<HostPollable, TableError>; |
|||
} |
|||
|
|||
impl TablePollableExt for Table { |
|||
fn push_host_pollable(&mut self, p: HostPollable) -> Result<u32, TableError> { |
|||
self.push(Box::new(p)) |
|||
} |
|||
fn get_host_pollable_mut(&mut self, fd: u32) -> Result<&mut HostPollable, TableError> { |
|||
self.get_mut::<HostPollable>(fd) |
|||
} |
|||
fn delete_host_pollable(&mut self, fd: u32) -> Result<HostPollable, TableError> { |
|||
self.delete::<HostPollable>(fd) |
|||
} |
|||
} |
|||
|
|||
#[async_trait::async_trait] |
|||
impl<T: WasiView> poll::Host for T { |
|||
async fn drop_pollable(&mut self, pollable: Pollable) -> Result<()> { |
|||
self.table_mut().delete_host_pollable(pollable)?; |
|||
Ok(()) |
|||
} |
|||
|
|||
async fn poll_oneoff(&mut self, pollables: Vec<Pollable>) -> Result<Vec<bool>> { |
|||
type ReadylistIndex = usize; |
|||
|
|||
let table = self.table_mut(); |
|||
|
|||
let mut table_futures: HashMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = HashMap::new(); |
|||
let mut closure_futures: Vec<(PollableFuture<'_>, Vec<ReadylistIndex>)> = Vec::new(); |
|||
|
|||
for (ix, p) in pollables.iter().enumerate() { |
|||
match table.get_host_pollable_mut(*p)? { |
|||
HostPollable::Closure(f) => closure_futures.push((f(), vec![ix])), |
|||
HostPollable::TableEntry { index, make_future } => { |
|||
match table_futures.entry(*index) { |
|||
Entry::Vacant(v) => { |
|||
v.insert((*make_future, vec![ix])); |
|||
} |
|||
Entry::Occupied(mut o) => { |
|||
let (_, v) = o.get_mut(); |
|||
v.push(ix); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
for (entry, (make_future, readylist_indices)) in table.iter_entries(table_futures) { |
|||
let entry = entry?; |
|||
closure_futures.push((make_future(entry), readylist_indices)); |
|||
} |
|||
|
|||
struct PollOneoff<'a> { |
|||
elems: Vec<(PollableFuture<'a>, Vec<ReadylistIndex>)>, |
|||
} |
|||
impl<'a> Future for PollOneoff<'a> { |
|||
type Output = Result<Vec<bool>>; |
|||
|
|||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
|||
let mut any_ready = false; |
|||
let mut results = vec![false; self.elems.len()]; |
|||
for (fut, readylist_indicies) in self.elems.iter_mut() { |
|||
match fut.as_mut().poll(cx) { |
|||
Poll::Ready(Ok(())) => { |
|||
for r in readylist_indicies { |
|||
results[*r] = true; |
|||
} |
|||
any_ready = true; |
|||
} |
|||
Poll::Ready(Err(e)) => { |
|||
return Poll::Ready(Err( |
|||
e.context(format!("poll_oneoff {readylist_indicies:?}")) |
|||
)); |
|||
} |
|||
Poll::Pending => {} |
|||
} |
|||
} |
|||
if any_ready { |
|||
Poll::Ready(Ok(results)) |
|||
} else { |
|||
Poll::Pending |
|||
} |
|||
} |
|||
} |
|||
|
|||
Ok(PollOneoff { |
|||
elems: closure_futures, |
|||
} |
|||
.await?) |
|||
} |
|||
} |
|||
|
|||
pub mod sync { |
|||
use crate::preview2::{ |
|||
bindings::poll::poll::Host as AsyncHost, |
|||
bindings::sync_io::poll::poll::{self, Pollable}, |
|||
in_tokio, WasiView, |
|||
}; |
|||
use anyhow::Result; |
|||
|
|||
impl<T: WasiView> poll::Host for T { |
|||
fn drop_pollable(&mut self, pollable: Pollable) -> Result<()> { |
|||
in_tokio(async { AsyncHost::drop_pollable(self, pollable).await }) |
|||
} |
|||
|
|||
fn poll_oneoff(&mut self, pollables: Vec<Pollable>) -> Result<Vec<bool>> { |
|||
in_tokio(async { AsyncHost::poll_oneoff(self, pollables).await }) |
|||
} |
|||
} |
|||
} |
@ -1,48 +1,37 @@ |
|||
use crate::preview2::wasi::cli_base::environment; |
|||
use crate::preview2::wasi::cli_base::preopens; |
|||
use crate::preview2::wasi::cli_base::stderr; |
|||
use crate::preview2::wasi::cli_base::stdin; |
|||
use crate::preview2::wasi::cli_base::stdout; |
|||
use crate::preview2::wasi::filesystem::filesystem; |
|||
use crate::preview2::wasi::io::streams; |
|||
use crate::preview2::bindings::cli_base::{environment, preopens, stderr, stdin, stdout}; |
|||
use crate::preview2::bindings::filesystem::filesystem; |
|||
use crate::preview2::bindings::io::streams; |
|||
use crate::preview2::WasiView; |
|||
|
|||
#[async_trait::async_trait] |
|||
impl<T: WasiView> environment::Host for T { |
|||
async fn get_environment(&mut self) -> anyhow::Result<Vec<(String, String)>> { |
|||
fn get_environment(&mut self) -> anyhow::Result<Vec<(String, String)>> { |
|||
Ok(self.ctx().env.clone()) |
|||
} |
|||
async fn get_arguments(&mut self) -> anyhow::Result<Vec<String>> { |
|||
fn get_arguments(&mut self) -> anyhow::Result<Vec<String>> { |
|||
Ok(self.ctx().args.clone()) |
|||
} |
|||
} |
|||
|
|||
#[async_trait::async_trait] |
|||
impl<T: WasiView> preopens::Host for T { |
|||
async fn get_directories( |
|||
&mut self, |
|||
) -> Result<Vec<(filesystem::Descriptor, String)>, anyhow::Error> { |
|||
fn get_directories(&mut self) -> Result<Vec<(filesystem::Descriptor, String)>, anyhow::Error> { |
|||
Ok(self.ctx().preopens.clone()) |
|||
} |
|||
} |
|||
|
|||
#[async_trait::async_trait] |
|||
impl<T: WasiView> stdin::Host for T { |
|||
async fn get_stdin(&mut self) -> Result<streams::InputStream, anyhow::Error> { |
|||
fn get_stdin(&mut self) -> Result<streams::InputStream, anyhow::Error> { |
|||
Ok(self.ctx().stdin) |
|||
} |
|||
} |
|||
|
|||
#[async_trait::async_trait] |
|||
impl<T: WasiView> stdout::Host for T { |
|||
async fn get_stdout(&mut self) -> Result<streams::OutputStream, anyhow::Error> { |
|||
fn get_stdout(&mut self) -> Result<streams::OutputStream, anyhow::Error> { |
|||
Ok(self.ctx().stdout) |
|||
} |
|||
} |
|||
|
|||
#[async_trait::async_trait] |
|||
impl<T: WasiView> stderr::Host for T { |
|||
async fn get_stderr(&mut self) -> Result<streams::OutputStream, anyhow::Error> { |
|||
fn get_stderr(&mut self) -> Result<streams::OutputStream, anyhow::Error> { |
|||
Ok(self.ctx().stderr) |
|||
} |
|||
} |
|||
|
@ -0,0 +1,613 @@ |
|||
use crate::preview2::bindings::filesystem::filesystem as async_filesystem; |
|||
use crate::preview2::bindings::sync_io::filesystem::filesystem as sync_filesystem; |
|||
use crate::preview2::bindings::sync_io::io::streams; |
|||
use crate::preview2::in_tokio; |
|||
|
|||
impl<T: async_filesystem::Host> sync_filesystem::Host for T { |
|||
fn advise( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
offset: sync_filesystem::Filesize, |
|||
len: sync_filesystem::Filesize, |
|||
advice: sync_filesystem::Advice, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::advise(self, fd, offset, len, advice.into()).await |
|||
})?) |
|||
} |
|||
|
|||
fn sync_data(&mut self, fd: sync_filesystem::Descriptor) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::sync_data(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn get_flags( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<sync_filesystem::DescriptorFlags, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { async_filesystem::Host::get_flags(self, fd).await })?.into()) |
|||
} |
|||
|
|||
fn get_type( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<sync_filesystem::DescriptorType, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { async_filesystem::Host::get_type(self, fd).await })?.into()) |
|||
} |
|||
|
|||
fn set_size( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
size: sync_filesystem::Filesize, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::set_size(self, fd, size).await |
|||
})?) |
|||
} |
|||
|
|||
fn set_times( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
atim: sync_filesystem::NewTimestamp, |
|||
mtim: sync_filesystem::NewTimestamp, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::set_times(self, fd, atim.into(), mtim.into()).await |
|||
})?) |
|||
} |
|||
|
|||
fn read( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
len: sync_filesystem::Filesize, |
|||
offset: sync_filesystem::Filesize, |
|||
) -> Result<(Vec<u8>, bool), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::read(self, fd, len, offset).await |
|||
})?) |
|||
} |
|||
|
|||
fn write( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
buf: Vec<u8>, |
|||
offset: sync_filesystem::Filesize, |
|||
) -> Result<sync_filesystem::Filesize, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::write(self, fd, buf, offset).await |
|||
})?) |
|||
} |
|||
|
|||
fn read_directory( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<sync_filesystem::DirectoryEntryStream, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::read_directory(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn read_directory_entry( |
|||
&mut self, |
|||
stream: sync_filesystem::DirectoryEntryStream, |
|||
) -> Result<Option<sync_filesystem::DirectoryEntry>, sync_filesystem::Error> { |
|||
Ok( |
|||
in_tokio(async { async_filesystem::Host::read_directory_entry(self, stream).await })? |
|||
.map(|e| e.into()), |
|||
) |
|||
} |
|||
|
|||
fn drop_directory_entry_stream( |
|||
&mut self, |
|||
stream: sync_filesystem::DirectoryEntryStream, |
|||
) -> anyhow::Result<()> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::drop_directory_entry_stream(self, stream).await |
|||
})?) |
|||
} |
|||
|
|||
fn sync(&mut self, fd: sync_filesystem::Descriptor) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::sync(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn create_directory_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path: String, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::create_directory_at(self, fd, path).await |
|||
})?) |
|||
} |
|||
|
|||
fn stat( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<sync_filesystem::DescriptorStat, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { async_filesystem::Host::stat(self, fd).await })?.into()) |
|||
} |
|||
|
|||
fn stat_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path_flags: sync_filesystem::PathFlags, |
|||
path: String, |
|||
) -> Result<sync_filesystem::DescriptorStat, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::stat_at(self, fd, path_flags.into(), path).await |
|||
})? |
|||
.into()) |
|||
} |
|||
|
|||
fn set_times_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path_flags: sync_filesystem::PathFlags, |
|||
path: String, |
|||
atim: sync_filesystem::NewTimestamp, |
|||
mtim: sync_filesystem::NewTimestamp, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::set_times_at( |
|||
self, |
|||
fd, |
|||
path_flags.into(), |
|||
path, |
|||
atim.into(), |
|||
mtim.into(), |
|||
) |
|||
.await |
|||
})?) |
|||
} |
|||
|
|||
fn link_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
// TODO delete the path flags from this function
|
|||
old_path_flags: sync_filesystem::PathFlags, |
|||
old_path: String, |
|||
new_descriptor: sync_filesystem::Descriptor, |
|||
new_path: String, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::link_at( |
|||
self, |
|||
fd, |
|||
old_path_flags.into(), |
|||
old_path, |
|||
new_descriptor, |
|||
new_path, |
|||
) |
|||
.await |
|||
})?) |
|||
} |
|||
|
|||
fn open_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path_flags: sync_filesystem::PathFlags, |
|||
path: String, |
|||
oflags: sync_filesystem::OpenFlags, |
|||
flags: sync_filesystem::DescriptorFlags, |
|||
mode: sync_filesystem::Modes, |
|||
) -> Result<sync_filesystem::Descriptor, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::open_at( |
|||
self, |
|||
fd, |
|||
path_flags.into(), |
|||
path, |
|||
oflags.into(), |
|||
flags.into(), |
|||
mode.into(), |
|||
) |
|||
.await |
|||
})?) |
|||
} |
|||
|
|||
fn drop_descriptor(&mut self, fd: sync_filesystem::Descriptor) -> anyhow::Result<()> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::drop_descriptor(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn readlink_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path: String, |
|||
) -> Result<String, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::readlink_at(self, fd, path).await |
|||
})?) |
|||
} |
|||
|
|||
fn remove_directory_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path: String, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::remove_directory_at(self, fd, path).await |
|||
})?) |
|||
} |
|||
|
|||
fn rename_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
old_path: String, |
|||
new_fd: sync_filesystem::Descriptor, |
|||
new_path: String, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::rename_at(self, fd, old_path, new_fd, new_path).await |
|||
})?) |
|||
} |
|||
|
|||
fn symlink_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
src_path: String, |
|||
dest_path: String, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::symlink_at(self, fd, src_path, dest_path).await |
|||
})?) |
|||
} |
|||
|
|||
fn unlink_file_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path: String, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::unlink_file_at(self, fd, path).await |
|||
})?) |
|||
} |
|||
|
|||
fn access_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path_flags: sync_filesystem::PathFlags, |
|||
path: String, |
|||
access: sync_filesystem::AccessType, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::access_at(self, fd, path_flags.into(), path, access.into()) |
|||
.await |
|||
})?) |
|||
} |
|||
|
|||
fn change_file_permissions_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path_flags: sync_filesystem::PathFlags, |
|||
path: String, |
|||
mode: sync_filesystem::Modes, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::change_file_permissions_at( |
|||
self, |
|||
fd, |
|||
path_flags.into(), |
|||
path, |
|||
mode.into(), |
|||
) |
|||
.await |
|||
})?) |
|||
} |
|||
|
|||
fn change_directory_permissions_at( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
path_flags: sync_filesystem::PathFlags, |
|||
path: String, |
|||
mode: sync_filesystem::Modes, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::change_directory_permissions_at( |
|||
self, |
|||
fd, |
|||
path_flags.into(), |
|||
path, |
|||
mode.into(), |
|||
) |
|||
.await |
|||
})?) |
|||
} |
|||
|
|||
fn lock_shared( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::lock_shared(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn lock_exclusive( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::lock_exclusive(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn try_lock_shared( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::try_lock_shared(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn try_lock_exclusive( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::try_lock_exclusive(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn unlock(&mut self, fd: sync_filesystem::Descriptor) -> Result<(), sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::unlock(self, fd).await |
|||
})?) |
|||
} |
|||
|
|||
fn read_via_stream( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
offset: sync_filesystem::Filesize, |
|||
) -> Result<streams::InputStream, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::read_via_stream(self, fd, offset).await |
|||
})?) |
|||
} |
|||
|
|||
fn write_via_stream( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
offset: sync_filesystem::Filesize, |
|||
) -> Result<streams::OutputStream, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::write_via_stream(self, fd, offset).await |
|||
})?) |
|||
} |
|||
|
|||
fn append_via_stream( |
|||
&mut self, |
|||
fd: sync_filesystem::Descriptor, |
|||
) -> Result<streams::OutputStream, sync_filesystem::Error> { |
|||
Ok(in_tokio(async { |
|||
async_filesystem::Host::append_via_stream(self, fd).await |
|||
})?) |
|||
} |
|||
} |
|||
|
|||
impl From<async_filesystem::ErrorCode> for sync_filesystem::ErrorCode { |
|||
fn from(other: async_filesystem::ErrorCode) -> Self { |
|||
use async_filesystem::ErrorCode; |
|||
match other { |
|||
ErrorCode::Access => Self::Access, |
|||
ErrorCode::WouldBlock => Self::WouldBlock, |
|||
ErrorCode::Already => Self::Already, |
|||
ErrorCode::BadDescriptor => Self::BadDescriptor, |
|||
ErrorCode::Busy => Self::Busy, |
|||
ErrorCode::Deadlock => Self::Deadlock, |
|||
ErrorCode::Quota => Self::Quota, |
|||
ErrorCode::Exist => Self::Exist, |
|||
ErrorCode::FileTooLarge => Self::FileTooLarge, |
|||
ErrorCode::IllegalByteSequence => Self::IllegalByteSequence, |
|||
ErrorCode::InProgress => Self::InProgress, |
|||
ErrorCode::Interrupted => Self::Interrupted, |
|||
ErrorCode::Invalid => Self::Invalid, |
|||
ErrorCode::Io => Self::Io, |
|||
ErrorCode::IsDirectory => Self::IsDirectory, |
|||
ErrorCode::Loop => Self::Loop, |
|||
ErrorCode::TooManyLinks => Self::TooManyLinks, |
|||
ErrorCode::MessageSize => Self::MessageSize, |
|||
ErrorCode::NameTooLong => Self::NameTooLong, |
|||
ErrorCode::NoDevice => Self::NoDevice, |
|||
ErrorCode::NoEntry => Self::NoEntry, |
|||
ErrorCode::NoLock => Self::NoLock, |
|||
ErrorCode::InsufficientMemory => Self::InsufficientMemory, |
|||
ErrorCode::InsufficientSpace => Self::InsufficientSpace, |
|||
ErrorCode::NotDirectory => Self::NotDirectory, |
|||
ErrorCode::NotEmpty => Self::NotEmpty, |
|||
ErrorCode::NotRecoverable => Self::NotRecoverable, |
|||
ErrorCode::Unsupported => Self::Unsupported, |
|||
ErrorCode::NoTty => Self::NoTty, |
|||
ErrorCode::NoSuchDevice => Self::NoSuchDevice, |
|||
ErrorCode::Overflow => Self::Overflow, |
|||
ErrorCode::NotPermitted => Self::NotPermitted, |
|||
ErrorCode::Pipe => Self::Pipe, |
|||
ErrorCode::ReadOnly => Self::ReadOnly, |
|||
ErrorCode::InvalidSeek => Self::InvalidSeek, |
|||
ErrorCode::TextFileBusy => Self::TextFileBusy, |
|||
ErrorCode::CrossDevice => Self::CrossDevice, |
|||
} |
|||
} |
|||
} |
|||
|
|||
impl From<async_filesystem::Error> for sync_filesystem::Error { |
|||
fn from(other: async_filesystem::Error) -> Self { |
|||
match other.downcast() { |
|||
Ok(errorcode) => Self::from(sync_filesystem::ErrorCode::from(errorcode)), |
|||
Err(other) => Self::trap(other), |
|||
} |
|||
} |
|||
} |
|||
|
|||
impl From<sync_filesystem::Advice> for async_filesystem::Advice { |
|||
fn from(other: sync_filesystem::Advice) -> Self { |
|||
use sync_filesystem::Advice; |
|||
match other { |
|||
Advice::Normal => Self::Normal, |
|||
Advice::Sequential => Self::Sequential, |
|||
Advice::Random => Self::Random, |
|||
Advice::WillNeed => Self::WillNeed, |
|||
Advice::DontNeed => Self::DontNeed, |
|||
Advice::NoReuse => Self::NoReuse, |
|||
} |
|||
} |
|||
} |
|||
|
|||
impl From<async_filesystem::DescriptorFlags> for sync_filesystem::DescriptorFlags { |
|||
fn from(other: async_filesystem::DescriptorFlags) -> Self { |
|||
let mut out = Self::empty(); |
|||
if other.contains(async_filesystem::DescriptorFlags::READ) { |
|||
out |= Self::READ; |
|||
} |
|||
if other.contains(async_filesystem::DescriptorFlags::WRITE) { |
|||
out |= Self::WRITE; |
|||
} |
|||
if other.contains(async_filesystem::DescriptorFlags::FILE_INTEGRITY_SYNC) { |
|||
out |= Self::FILE_INTEGRITY_SYNC; |
|||
} |
|||
if other.contains(async_filesystem::DescriptorFlags::DATA_INTEGRITY_SYNC) { |
|||
out |= Self::DATA_INTEGRITY_SYNC; |
|||
} |
|||
if other.contains(async_filesystem::DescriptorFlags::REQUESTED_WRITE_SYNC) { |
|||
out |= Self::REQUESTED_WRITE_SYNC; |
|||
} |
|||
if other.contains(async_filesystem::DescriptorFlags::MUTATE_DIRECTORY) { |
|||
out |= Self::MUTATE_DIRECTORY; |
|||
} |
|||
out |
|||
} |
|||
} |
|||
|
|||
impl From<async_filesystem::DescriptorType> for sync_filesystem::DescriptorType { |
|||
fn from(other: async_filesystem::DescriptorType) -> Self { |
|||
use async_filesystem::DescriptorType; |
|||
match other { |
|||
DescriptorType::RegularFile => Self::RegularFile, |
|||
DescriptorType::Directory => Self::Directory, |
|||
DescriptorType::BlockDevice => Self::BlockDevice, |
|||
DescriptorType::CharacterDevice => Self::CharacterDevice, |
|||
DescriptorType::Fifo => Self::Fifo, |
|||
DescriptorType::Socket => Self::Socket, |
|||
DescriptorType::SymbolicLink => Self::SymbolicLink, |
|||
DescriptorType::Unknown => Self::Unknown, |
|||
} |
|||
} |
|||
} |
|||
|
|||
impl From<async_filesystem::DirectoryEntry> for sync_filesystem::DirectoryEntry { |
|||
fn from(other: async_filesystem::DirectoryEntry) -> Self { |
|||
Self { |
|||
inode: other.inode, |
|||
type_: other.type_.into(), |
|||
name: other.name, |
|||
} |
|||
} |
|||
} |
|||
|
|||
impl From<async_filesystem::DescriptorStat> for sync_filesystem::DescriptorStat { |
|||
fn from(other: async_filesystem::DescriptorStat) -> Self { |
|||
Self { |
|||
device: other.device, |
|||
inode: other.inode, |
|||
type_: other.type_.into(), |
|||
link_count: other.link_count, |
|||
size: other.size, |
|||
data_access_timestamp: other.data_access_timestamp, |
|||
data_modification_timestamp: other.data_modification_timestamp, |
|||
status_change_timestamp: other.status_change_timestamp, |
|||
} |
|||
} |
|||
} |
|||
|
|||
impl From<sync_filesystem::PathFlags> for async_filesystem::PathFlags { |
|||
fn from(other: sync_filesystem::PathFlags) -> Self { |
|||
let mut out = Self::empty(); |
|||
if other.contains(sync_filesystem::PathFlags::SYMLINK_FOLLOW) { |
|||
out |= Self::SYMLINK_FOLLOW; |
|||
} |
|||
out |
|||
} |
|||
} |
|||
|
|||
impl From<sync_filesystem::NewTimestamp> for async_filesystem::NewTimestamp { |
|||
fn from(other: sync_filesystem::NewTimestamp) -> Self { |
|||
use sync_filesystem::NewTimestamp; |
|||
match other { |
|||
NewTimestamp::NoChange => Self::NoChange, |
|||
NewTimestamp::Now => Self::Now, |
|||
NewTimestamp::Timestamp(datetime) => Self::Timestamp(datetime), |
|||
} |
|||
} |
|||
} |
|||
|
|||
impl From<sync_filesystem::OpenFlags> for async_filesystem::OpenFlags { |
|||
fn from(other: sync_filesystem::OpenFlags) -> Self { |
|||
let mut out = Self::empty(); |
|||
if other.contains(sync_filesystem::OpenFlags::CREATE) { |
|||
out |= Self::CREATE; |
|||
} |
|||
if other.contains(sync_filesystem::OpenFlags::DIRECTORY) { |
|||
out |= Self::DIRECTORY; |
|||
} |
|||
if other.contains(sync_filesystem::OpenFlags::EXCLUSIVE) { |
|||
out |= Self::EXCLUSIVE; |
|||
} |
|||
if other.contains(sync_filesystem::OpenFlags::TRUNCATE) { |
|||
out |= Self::TRUNCATE; |
|||
} |
|||
out |
|||
} |
|||
} |
|||
impl From<sync_filesystem::DescriptorFlags> for async_filesystem::DescriptorFlags { |
|||
fn from(other: sync_filesystem::DescriptorFlags) -> Self { |
|||
let mut out = Self::empty(); |
|||
if other.contains(sync_filesystem::DescriptorFlags::READ) { |
|||
out |= Self::READ; |
|||
} |
|||
if other.contains(sync_filesystem::DescriptorFlags::WRITE) { |
|||
out |= Self::WRITE; |
|||
} |
|||
if other.contains(sync_filesystem::DescriptorFlags::FILE_INTEGRITY_SYNC) { |
|||
out |= Self::FILE_INTEGRITY_SYNC; |
|||
} |
|||
if other.contains(sync_filesystem::DescriptorFlags::DATA_INTEGRITY_SYNC) { |
|||
out |= Self::DATA_INTEGRITY_SYNC; |
|||
} |
|||
if other.contains(sync_filesystem::DescriptorFlags::REQUESTED_WRITE_SYNC) { |
|||
out |= Self::REQUESTED_WRITE_SYNC; |
|||
} |
|||
if other.contains(sync_filesystem::DescriptorFlags::MUTATE_DIRECTORY) { |
|||
out |= Self::MUTATE_DIRECTORY; |
|||
} |
|||
out |
|||
} |
|||
} |
|||
impl From<sync_filesystem::Modes> for async_filesystem::Modes { |
|||
fn from(other: sync_filesystem::Modes) -> Self { |
|||
let mut out = Self::empty(); |
|||
if other.contains(sync_filesystem::Modes::READABLE) { |
|||
out |= Self::READABLE; |
|||
} |
|||
if other.contains(sync_filesystem::Modes::WRITABLE) { |
|||
out |= Self::WRITABLE; |
|||
} |
|||
if other.contains(sync_filesystem::Modes::EXECUTABLE) { |
|||
out |= Self::EXECUTABLE; |
|||
} |
|||
out |
|||
} |
|||
} |
|||
impl From<sync_filesystem::AccessType> for async_filesystem::AccessType { |
|||
fn from(other: sync_filesystem::AccessType) -> Self { |
|||
use sync_filesystem::AccessType; |
|||
match other { |
|||
AccessType::Access(modes) => Self::Access(modes.into()), |
|||
AccessType::Exists => Self::Exists, |
|||
} |
|||
} |
|||
} |
@ -1,83 +0,0 @@ |
|||
use crate::preview2::{ |
|||
stream::TableStreamExt, |
|||
wasi::clocks::monotonic_clock::Instant, |
|||
wasi::io::streams::{InputStream, OutputStream}, |
|||
wasi::poll::poll::{self, Pollable}, |
|||
WasiView, |
|||
}; |
|||
|
|||
/// A pollable resource table entry.
|
|||
#[derive(Copy, Clone)] |
|||
pub(crate) enum PollableEntry { |
|||
/// Poll for read events.
|
|||
Read(InputStream), |
|||
/// Poll for write events.
|
|||
Write(OutputStream), |
|||
/// Poll for a monotonic-clock timer.
|
|||
MonotonicClock(Instant, bool), |
|||
/* FIXME: need to rebuild the poll interface to let pollables be created in different crates.
|
|||
/// Poll for a tcp-socket.
|
|||
TcpSocket(TcpSocket), |
|||
*/ |
|||
} |
|||
|
|||
// Implementatations of the interface. The bodies had been pulled out into
|
|||
// functions above to allow them to be shared between the two worlds, which
|
|||
// used to require different traits . Features have been added to facilitate
|
|||
// sharing between worlds, but I want to avoid the huge whitespace diff on
|
|||
// this PR.
|
|||
|
|||
#[async_trait::async_trait] |
|||
impl<T: WasiView> poll::Host for T { |
|||
async fn drop_pollable(&mut self, pollable: Pollable) -> anyhow::Result<()> { |
|||
self.table_mut().delete::<PollableEntry>(pollable)?; |
|||
Ok(()) |
|||
} |
|||
|
|||
async fn poll_oneoff(&mut self, futures: Vec<Pollable>) -> anyhow::Result<Vec<bool>> { |
|||
use crate::preview2::sched::{sync::SyncSched, Poll, Userdata, WasiSched}; |
|||
|
|||
// Convert `futures` into `Poll` subscriptions.
|
|||
let mut poll = Poll::new(); |
|||
let len = futures.len(); |
|||
for (index, future) in futures.into_iter().enumerate() { |
|||
let userdata = Userdata::from(index as u64); |
|||
|
|||
match *self.table().get(future)? { |
|||
PollableEntry::Read(stream) => { |
|||
let wasi_stream: &dyn crate::preview2::InputStream = |
|||
self.table().get_input_stream(stream)?; |
|||
poll.subscribe_read(wasi_stream, userdata); |
|||
} |
|||
PollableEntry::Write(stream) => { |
|||
let wasi_stream: &dyn crate::preview2::OutputStream = |
|||
self.table().get_output_stream(stream)?; |
|||
poll.subscribe_write(wasi_stream, userdata); |
|||
} |
|||
PollableEntry::MonotonicClock(when, absolute) => { |
|||
poll.subscribe_monotonic_clock( |
|||
&*self.ctx().monotonic_clock, |
|||
when, |
|||
absolute, |
|||
userdata, |
|||
); |
|||
} /* |
|||
PollableEntry::TcpSocket(tcp_socket) => { |
|||
let wasi_tcp_socket: &dyn crate::WasiTcpSocket = |
|||
self.table().get_tcp_socket(tcp_socket)?; |
|||
poll.subscribe_tcp_socket(wasi_tcp_socket, userdata); |
|||
} |
|||
*/ |
|||
} |
|||
} |
|||
|
|||
// Do the poll.
|
|||
SyncSched.poll_oneoff(&mut poll).await?; |
|||
|
|||
let mut results = vec![false; len]; |
|||
for (_result, data) in poll.results() { |
|||
results[u64::from(data) as usize] = true; |
|||
} |
|||
Ok(results) |
|||
} |
|||
} |
@ -1,110 +0,0 @@ |
|||
#![allow(dead_code)] |
|||
use crate::preview2::{ |
|||
clocks::HostMonotonicClock, |
|||
stream::{InputStream, OutputStream}, |
|||
}; |
|||
use anyhow::Error; |
|||
pub(crate) mod subscription; |
|||
pub(crate) mod sync; |
|||
pub use cap_std::time::Duration; |
|||
|
|||
pub(crate) use subscription::{ |
|||
MonotonicClockSubscription, RwSubscription, Subscription, SubscriptionResult, |
|||
}; |
|||
|
|||
#[async_trait::async_trait] |
|||
pub(crate) trait WasiSched: Send + Sync { |
|||
async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error>; |
|||
async fn sched_yield(&self) -> Result<(), Error>; |
|||
async fn sleep(&self, duration: Duration) -> Result<(), Error>; |
|||
} |
|||
|
|||
#[derive(Debug, Copy, Clone, PartialEq, Eq)] |
|||
pub(crate) struct Userdata(u64); |
|||
impl From<u64> for Userdata { |
|||
fn from(u: u64) -> Userdata { |
|||
Userdata(u) |
|||
} |
|||
} |
|||
|
|||
impl From<Userdata> for u64 { |
|||
fn from(u: Userdata) -> u64 { |
|||
u.0 |
|||
} |
|||
} |
|||
|
|||
pub(crate) struct Poll<'a> { |
|||
subs: Vec<(Subscription<'a>, Userdata)>, |
|||
} |
|||
|
|||
impl<'a> Poll<'a> { |
|||
pub fn new() -> Self { |
|||
Self { subs: Vec::new() } |
|||
} |
|||
pub fn subscribe_monotonic_clock( |
|||
&mut self, |
|||
clock: &'a dyn HostMonotonicClock, |
|||
deadline: u64, |
|||
absolute: bool, |
|||
ud: Userdata, |
|||
) { |
|||
let absolute_deadline: u64 = if absolute { |
|||
deadline |
|||
} else { |
|||
// Convert a relative deadline to an absolute one. Use a saturating
|
|||
// add because there are no meaningful timeouts after the monotonic
|
|||
// clock overflows.
|
|||
clock.now().saturating_add(deadline) |
|||
}; |
|||
self.subs.push(( |
|||
Subscription::MonotonicClock(MonotonicClockSubscription { |
|||
clock, |
|||
absolute_deadline, |
|||
}), |
|||
ud, |
|||
)); |
|||
} |
|||
pub fn subscribe_read(&mut self, stream: &'a dyn InputStream, ud: Userdata) { |
|||
self.subs.push(( |
|||
Subscription::ReadWrite(RwSubscription::new_input(stream)), |
|||
ud, |
|||
)); |
|||
} |
|||
pub fn subscribe_write(&mut self, stream: &'a dyn OutputStream, ud: Userdata) { |
|||
self.subs.push(( |
|||
Subscription::ReadWrite(RwSubscription::new_output(stream)), |
|||
ud, |
|||
)); |
|||
} |
|||
/* FIXME need to redo poll interface to support pollables defined in other crates
|
|||
pub fn subscribe_tcp_socket(&mut self, tcp_socket: &'a dyn WasiTcpSocket, ud: Userdata) { |
|||
self.subs.push(( |
|||
Subscription::ReadWrite(RwSubscription::new_tcp_socket(tcp_socket)), |
|||
ud, |
|||
)); |
|||
} |
|||
*/ |
|||
pub fn results(self) -> impl Iterator<Item = (SubscriptionResult, Userdata)> + 'a { |
|||
self.subs |
|||
.into_iter() |
|||
.filter_map(|(s, ud)| SubscriptionResult::from_subscription(s).map(|r| (r, ud))) |
|||
} |
|||
pub fn is_empty(&self) -> bool { |
|||
self.subs.is_empty() |
|||
} |
|||
pub fn earliest_clock_deadline(&self) -> Option<&MonotonicClockSubscription<'a>> { |
|||
self.subs |
|||
.iter() |
|||
.filter_map(|(s, _ud)| match s { |
|||
Subscription::MonotonicClock(t) => Some(t), |
|||
_ => None, |
|||
}) |
|||
.min_by(|a, b| a.absolute_deadline.cmp(&b.absolute_deadline)) |
|||
} |
|||
pub fn rw_subscriptions<'b>(&'b mut self) -> impl Iterator<Item = &'b mut RwSubscription<'a>> { |
|||
self.subs.iter_mut().filter_map(|sub| match &mut sub.0 { |
|||
Subscription::ReadWrite(rwsub) => Some(rwsub), |
|||
_ => None, |
|||
}) |
|||
} |
|||
} |
@ -1,105 +0,0 @@ |
|||
use crate::preview2::{ |
|||
clocks::HostMonotonicClock, |
|||
stream::{InputStream, OutputStream}, |
|||
}; |
|||
use anyhow::Error; |
|||
use bitflags::bitflags; |
|||
|
|||
bitflags! { |
|||
#[derive(Copy, Clone, Debug, PartialEq, Eq)] |
|||
pub struct RwEventFlags: u32 { |
|||
const HANGUP = 0b1; |
|||
} |
|||
} |
|||
|
|||
pub enum RwStream<'a> { |
|||
// fixme: rename?
|
|||
Read(&'a dyn InputStream), |
|||
Write(&'a dyn OutputStream), |
|||
/* |
|||
TcpSocket(&'a dyn WasiTcpSocket), |
|||
*/ |
|||
} |
|||
|
|||
pub struct RwSubscription<'a> { |
|||
pub stream: RwStream<'a>, |
|||
status: Option<Result<RwEventFlags, Error>>, |
|||
} |
|||
|
|||
impl<'a> RwSubscription<'a> { |
|||
pub fn new_input(stream: &'a dyn InputStream) -> Self { |
|||
Self { |
|||
stream: RwStream::Read(stream), |
|||
status: None, |
|||
} |
|||
} |
|||
pub fn new_output(stream: &'a dyn OutputStream) -> Self { |
|||
Self { |
|||
stream: RwStream::Write(stream), |
|||
status: None, |
|||
} |
|||
} |
|||
/* |
|||
pub fn new_tcp_socket(tcp_socket: &'a dyn WasiTcpSocket) -> Self { |
|||
Self { |
|||
stream: RwStream::TcpSocket(tcp_socket), |
|||
status: None, |
|||
} |
|||
} |
|||
*/ |
|||
pub fn complete(&mut self, flags: RwEventFlags) { |
|||
self.status = Some(Ok(flags)) |
|||
} |
|||
pub fn error(&mut self, error: Error) { |
|||
self.status = Some(Err(error)) |
|||
} |
|||
pub fn result(&mut self) -> Option<Result<RwEventFlags, Error>> { |
|||
self.status.take() |
|||
} |
|||
pub fn is_complete(&self) -> bool { |
|||
self.status.is_some() |
|||
} |
|||
} |
|||
|
|||
pub struct MonotonicClockSubscription<'a> { |
|||
pub clock: &'a dyn HostMonotonicClock, |
|||
pub absolute_deadline: u64, |
|||
} |
|||
|
|||
impl<'a> MonotonicClockSubscription<'a> { |
|||
pub fn now(&self) -> u64 { |
|||
self.clock.now() |
|||
} |
|||
pub fn duration_until(&self) -> Option<u64> { |
|||
self.absolute_deadline.checked_sub(self.now()) |
|||
} |
|||
pub fn result(&self) -> Option<Result<(), Error>> { |
|||
if self.now() >= self.absolute_deadline { |
|||
Some(Ok(())) |
|||
} else { |
|||
None |
|||
} |
|||
} |
|||
} |
|||
|
|||
pub enum Subscription<'a> { |
|||
ReadWrite(RwSubscription<'a>), |
|||
MonotonicClock(MonotonicClockSubscription<'a>), |
|||
} |
|||
|
|||
#[derive(Debug)] |
|||
pub enum SubscriptionResult { |
|||
ReadWrite(Result<RwEventFlags, Error>), |
|||
MonotonicClock(Result<(), Error>), |
|||
} |
|||
|
|||
impl SubscriptionResult { |
|||
pub fn from_subscription(s: Subscription) -> Option<SubscriptionResult> { |
|||
match s { |
|||
Subscription::ReadWrite(mut s) => { |
|||
s.result().map(|sub| SubscriptionResult::ReadWrite(sub)) |
|||
} |
|||
Subscription::MonotonicClock(s) => s.result().map(SubscriptionResult::MonotonicClock), |
|||
} |
|||
} |
|||
} |
@ -1,156 +0,0 @@ |
|||
use crate::preview2::sched::{ |
|||
subscription::{RwEventFlags, RwStream}, |
|||
Poll, WasiSched, |
|||
}; |
|||
use rustix::event::{PollFd, PollFlags}; |
|||
use std::thread; |
|||
use std::time::Duration; |
|||
|
|||
use anyhow::Error; |
|||
|
|||
pub(crate) async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> { |
|||
// Collect all stream I/O subscriptions. Clock subscriptions are handled
|
|||
// separately below.
|
|||
let mut ready = false; |
|||
let mut pollfds = Vec::new(); |
|||
for rwsub in poll.rw_subscriptions() { |
|||
match rwsub.stream { |
|||
RwStream::Read(stream) => { |
|||
// Poll things that can be polled.
|
|||
if let Some(fd) = stream.pollable_read() { |
|||
#[cfg(unix)] |
|||
{ |
|||
pollfds.push(PollFd::from_borrowed_fd(fd, PollFlags::IN)); |
|||
continue; |
|||
} |
|||
|
|||
#[cfg(windows)] |
|||
{ |
|||
if let Some(fd) = fd.as_socket() { |
|||
pollfds.push(PollFd::from_borrowed_fd(fd, PollFlags::IN)); |
|||
continue; |
|||
} |
|||
} |
|||
} |
|||
|
|||
// Allow in-memory buffers or other immediately-available
|
|||
// sources to complete successfully.
|
|||
if let Ok(nbytes) = stream.num_ready_bytes().await { |
|||
if nbytes != 0 { |
|||
rwsub.complete(RwEventFlags::empty()); |
|||
ready = true; |
|||
continue; |
|||
} |
|||
} |
|||
|
|||
return Err(anyhow::anyhow!("stream is not pollable for reading")); |
|||
} |
|||
|
|||
RwStream::Write(stream) => { |
|||
let fd = stream |
|||
.pollable_write() |
|||
.ok_or_else(|| anyhow::anyhow!("stream is not pollable for writing"))?; |
|||
|
|||
#[cfg(unix)] |
|||
{ |
|||
pollfds.push(PollFd::from_borrowed_fd(fd, PollFlags::OUT)); |
|||
} |
|||
|
|||
#[cfg(windows)] |
|||
{ |
|||
if let Some(fd) = fd.as_socket() { |
|||
pollfds.push(PollFd::from_borrowed_fd(fd, PollFlags::OUT)); |
|||
} else { |
|||
return Err(anyhow::anyhow!( |
|||
"unimplemented: polling for writing to non-OS resources" |
|||
)); |
|||
} |
|||
} |
|||
} /* FIXME redesign of sched to make it possible to define pollables out of crate
|
|||
RwStream::TcpSocket(tcp_socket) => { |
|||
let fd = tcp_socket.pollable(); |
|||
pollfds.push(PollFd::from_borrowed_fd(fd, PollFlags::IN | PollFlags::PRI)); |
|||
} |
|||
*/ |
|||
} |
|||
} |
|||
|
|||
// If we didn't have any streams that are immediately available, do an OS
|
|||
// `poll` to wait for streams to become available.
|
|||
if !ready { |
|||
loop { |
|||
let poll_timeout = if let Some(t) = poll.earliest_clock_deadline() { |
|||
// Convert the timeout to milliseconds for `poll`, rounding up.
|
|||
//
|
|||
// TODO: On Linux and FreeBSD, we could use `ppoll` instead
|
|||
// which takes a `timespec.`
|
|||
((t.absolute_deadline.saturating_sub(t.clock.now()) + 999_999) / 1_000_000) |
|||
.try_into() |
|||
.map_err(|_| anyhow::anyhow!("overflow: poll timeout"))? |
|||
} else { |
|||
// A negative value requests an infinite timeout.
|
|||
-1 |
|||
}; |
|||
tracing::debug!( |
|||
poll_timeout = tracing::field::debug(poll_timeout), |
|||
poll_fds = tracing::field::debug(&pollfds), |
|||
"poll" |
|||
); |
|||
match rustix::event::poll(&mut pollfds, poll_timeout) { |
|||
Ok(_num_ready) => { |
|||
ready = true; |
|||
break; |
|||
} |
|||
Err(rustix::io::Errno::INTR) => continue, |
|||
Err(err) => return Err(std::io::Error::from(err).into()), |
|||
} |
|||
} |
|||
|
|||
assert_eq!(poll.rw_subscriptions().count(), pollfds.len()); |
|||
|
|||
// If the OS `poll` returned events, record them.
|
|||
if ready { |
|||
// Iterate through the stream subscriptions, skipping those that
|
|||
// were already completed due to being immediately available.
|
|||
for (rwsub, pollfd) in poll.rw_subscriptions().zip(pollfds.into_iter()) { |
|||
let revents = pollfd.revents(); |
|||
if revents.contains(PollFlags::NVAL) { |
|||
rwsub.error(anyhow::anyhow!("rw subscription badf")); |
|||
} else if revents.contains(PollFlags::ERR) { |
|||
rwsub.error(anyhow::anyhow!("rw subscription io error")); |
|||
} else if revents.contains(PollFlags::HUP) { |
|||
rwsub.complete(RwEventFlags::HANGUP); |
|||
} else { |
|||
rwsub.complete(RwEventFlags::empty()); |
|||
}; |
|||
} |
|||
} |
|||
}; |
|||
|
|||
// If we had no immediately-available events and no events becoming
|
|||
// available in a `poll`, it means we timed out. Report that event.
|
|||
if !ready { |
|||
poll.earliest_clock_deadline() |
|||
.expect("timed out") |
|||
.result() |
|||
.expect("timer deadline is past") |
|||
.unwrap() |
|||
} |
|||
|
|||
Ok(()) |
|||
} |
|||
pub(crate) struct SyncSched; |
|||
#[async_trait::async_trait] |
|||
impl WasiSched for SyncSched { |
|||
async fn poll_oneoff<'a>(&self, poll: &mut Poll<'a>) -> Result<(), Error> { |
|||
poll_oneoff(poll).await |
|||
} |
|||
async fn sched_yield(&self) -> Result<(), Error> { |
|||
thread::yield_now(); |
|||
Ok(()) |
|||
} |
|||
async fn sleep(&self, duration: Duration) -> Result<(), Error> { |
|||
std::thread::sleep(duration); |
|||
Ok(()) |
|||
} |
|||
} |
@ -0,0 +1,130 @@ |
|||
use crate::preview2::{pipe::AsyncReadStream, HostInputStream, StreamState}; |
|||
use anyhow::Error; |
|||
use bytes::Bytes; |
|||
use futures::ready; |
|||
use std::future::Future; |
|||
use std::io::{self, Read}; |
|||
use std::pin::Pin; |
|||
use std::task::{Context, Poll}; |
|||
use tokio::io::unix::AsyncFd; |
|||
use tokio::io::{AsyncRead, ReadBuf}; |
|||
|
|||
// wasmtime cant use std::sync::OnceLock yet because of a llvm regression in
|
|||
// 1.70. when 1.71 is released, we can switch to using std here.
|
|||
use once_cell::sync::OnceCell as OnceLock; |
|||
|
|||
use std::sync::Mutex; |
|||
|
|||
// We need a single global instance of the AsyncFd<Stdin> because creating
|
|||
// this instance registers the process's stdin fd with epoll, which will
|
|||
// return an error if an fd is registered more than once.
|
|||
struct GlobalStdin(Mutex<AsyncReadStream>); |
|||
static STDIN: OnceLock<GlobalStdin> = OnceLock::new(); |
|||
|
|||
impl GlobalStdin { |
|||
fn new() -> anyhow::Result<Self> { |
|||
Ok(Self(Mutex::new(AsyncReadStream::new(InnerStdin::new()?)))) |
|||
} |
|||
fn read(&self, size: usize) -> Result<(Bytes, StreamState), Error> { |
|||
HostInputStream::read(&mut *self.0.lock().unwrap(), size) |
|||
} |
|||
fn ready<'a>(&'a self) -> impl Future<Output = Result<(), Error>> + 'a { |
|||
// Custom Future impl takes the std mutex in each invocation of poll.
|
|||
// Required so we don't have to use a tokio mutex, which we can't take from
|
|||
// inside a sync context in Self::read.
|
|||
//
|
|||
// Taking the lock, creating a fresh ready() future, polling it once, and
|
|||
// then releasing the lock is acceptable here because the ready() future
|
|||
// is only ever going to await on a single channel recv, plus some management
|
|||
// of a state machine (for buffering).
|
|||
struct Ready<'a>(&'a GlobalStdin); |
|||
impl<'a> Future for Ready<'a> { |
|||
type Output = Result<(), Error>; |
|||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { |
|||
let mut locked = self.as_mut().0 .0.lock().unwrap(); |
|||
let fut = locked.ready(); |
|||
tokio::pin!(fut); |
|||
fut.poll(cx) |
|||
} |
|||
} |
|||
Ready(self) |
|||
} |
|||
} |
|||
|
|||
pub struct Stdin; |
|||
impl Stdin { |
|||
fn get_global() -> &'static GlobalStdin { |
|||
// Creation must be running in a tokio context to succeed.
|
|||
match tokio::runtime::Handle::try_current() { |
|||
Ok(_) => STDIN.get_or_init(|| { |
|||
GlobalStdin::new().expect("creating AsyncFd for stdin in existing tokio context") |
|||
}), |
|||
Err(_) => STDIN.get_or_init(|| { |
|||
crate::preview2::in_tokio(async { |
|||
GlobalStdin::new() |
|||
.expect("creating AsyncFd for stdin in internal tokio context") |
|||
}) |
|||
}), |
|||
} |
|||
} |
|||
} |
|||
|
|||
pub fn stdin() -> Stdin { |
|||
Stdin |
|||
} |
|||
|
|||
#[async_trait::async_trait] |
|||
impl crate::preview2::HostInputStream for Stdin { |
|||
fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { |
|||
Self::get_global().read(size) |
|||
} |
|||
|
|||
async fn ready(&mut self) -> Result<(), Error> { |
|||
Self::get_global().ready().await |
|||
} |
|||
} |
|||
|
|||
struct InnerStdin { |
|||
inner: AsyncFd<std::io::Stdin>, |
|||
} |
|||
|
|||
impl InnerStdin { |
|||
pub fn new() -> anyhow::Result<Self> { |
|||
use rustix::fs::OFlags; |
|||
use std::os::fd::AsRawFd; |
|||
|
|||
let stdin = std::io::stdin(); |
|||
|
|||
let borrowed_fd = unsafe { rustix::fd::BorrowedFd::borrow_raw(stdin.as_raw_fd()) }; |
|||
let flags = rustix::fs::fcntl_getfl(borrowed_fd)?; |
|||
if !flags.contains(OFlags::NONBLOCK) { |
|||
rustix::fs::fcntl_setfl(borrowed_fd, flags.difference(OFlags::NONBLOCK))?; |
|||
} |
|||
|
|||
Ok(Self { |
|||
inner: AsyncFd::new(std::io::stdin())?, |
|||
}) |
|||
} |
|||
} |
|||
|
|||
impl AsyncRead for InnerStdin { |
|||
fn poll_read( |
|||
mut self: Pin<&mut Self>, |
|||
cx: &mut Context<'_>, |
|||
buf: &mut ReadBuf<'_>, |
|||
) -> Poll<io::Result<()>> { |
|||
loop { |
|||
let mut guard = ready!(self.inner.poll_read_ready_mut(cx))?; |
|||
|
|||
let unfilled = buf.initialize_unfilled(); |
|||
match guard.try_io(|inner| inner.get_mut().read(unfilled)) { |
|||
Ok(Ok(len)) => { |
|||
buf.advance(len); |
|||
return Poll::Ready(Ok(())); |
|||
} |
|||
Ok(Err(err)) => return Poll::Ready(Err(err)), |
|||
Err(_would_block) => continue, |
|||
} |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,117 @@ |
|||
use crate::preview2::{HostInputStream, StreamState}; |
|||
use anyhow::{Context, Error}; |
|||
use bytes::Bytes; |
|||
use tokio::sync::{mpsc, oneshot}; |
|||
|
|||
// wasmtime cant use std::sync::OnceLock yet because of a llvm regression in
|
|||
// 1.70. when 1.71 is released, we can switch to using std here.
|
|||
use once_cell::sync::OnceCell as OnceLock; |
|||
|
|||
use std::sync::Mutex; |
|||
|
|||
// We need a single global instance of the AsyncFd<Stdin> because creating
|
|||
// this instance registers the process's stdin fd with epoll, which will
|
|||
// return an error if an fd is registered more than once.
|
|||
struct GlobalStdin { |
|||
tx: mpsc::Sender<oneshot::Sender<anyhow::Result<()>>>, |
|||
// FIXME use a Watch to check for readiness instead of sending a oneshot sender
|
|||
} |
|||
static STDIN: OnceLock<Mutex<GlobalStdin>> = OnceLock::new(); |
|||
|
|||
fn create() -> Mutex<GlobalStdin> { |
|||
let (tx, mut rx) = mpsc::channel::<oneshot::Sender<anyhow::Result<()>>>(1); |
|||
std::thread::spawn(move || { |
|||
use std::io::BufRead; |
|||
// A client is interested in stdin's readiness.
|
|||
// Don't care about the None case - the GlobalStdin sender on the other
|
|||
// end of this pipe will live forever, because it lives inside the OnceLock.
|
|||
while let Some(msg) = rx.blocking_recv() { |
|||
// Fill buf - can we skip this if its
|
|||
// already filled?
|
|||
// also, this could block forever and the
|
|||
// client could give up. in that case,
|
|||
// another client may want to start waiting
|
|||
let r = std::io::stdin() |
|||
.lock() |
|||
.fill_buf() |
|||
.map(|_| ()) |
|||
.map_err(anyhow::Error::from); |
|||
// tell the client stdin is ready for reading.
|
|||
// don't care if the client happens to have died.
|
|||
let _ = msg.send(r); |
|||
} |
|||
}); |
|||
|
|||
Mutex::new(GlobalStdin { tx }) |
|||
} |
|||
|
|||
pub struct Stdin; |
|||
impl Stdin { |
|||
fn get_global() -> &'static Mutex<GlobalStdin> { |
|||
STDIN.get_or_init(|| create()) |
|||
} |
|||
} |
|||
|
|||
pub fn stdin() -> Stdin { |
|||
// This implementation still needs to be fixed, and we need better test coverage.
|
|||
// We are deferring that work to a future PR.
|
|||
// https://github.com/bytecodealliance/wasmtime/pull/6556#issuecomment-1646232646
|
|||
panic!("worker-thread based stdin is not yet implemented"); |
|||
// Stdin
|
|||
} |
|||
|
|||
#[async_trait::async_trait] |
|||
impl HostInputStream for Stdin { |
|||
fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { |
|||
use std::io::Read; |
|||
let mut buf = vec![0; size]; |
|||
// FIXME: this is actually blocking. This whole implementation is likely bogus as a result
|
|||
let nbytes = std::io::stdin().read(&mut buf)?; |
|||
buf.truncate(nbytes); |
|||
Ok(( |
|||
buf.into(), |
|||
if nbytes > 0 { |
|||
StreamState::Open |
|||
} else { |
|||
StreamState::Closed |
|||
}, |
|||
)) |
|||
} |
|||
|
|||
async fn ready(&mut self) -> Result<(), Error> { |
|||
use mpsc::error::TrySendError; |
|||
use std::future::Future; |
|||
use std::pin::Pin; |
|||
use std::task::{Context, Poll}; |
|||
|
|||
// Custom Future impl takes the std mutex in each invocation of poll.
|
|||
// Required so we don't have to use a tokio mutex, which we can't take from
|
|||
// inside a sync context in Self::read.
|
|||
//
|
|||
// Take the lock, attempt to
|
|||
struct Send(Option<oneshot::Sender<anyhow::Result<()>>>); |
|||
impl Future for Send { |
|||
type Output = anyhow::Result<()>; |
|||
fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> { |
|||
let locked = Stdin::get_global().lock().unwrap(); |
|||
let to_send = self.as_mut().0.take().expect("to_send should be some"); |
|||
match locked.tx.try_send(to_send) { |
|||
Ok(()) => Poll::Ready(Ok(())), |
|||
Err(TrySendError::Full(to_send)) => { |
|||
self.as_mut().0.replace(to_send); |
|||
Poll::Pending |
|||
} |
|||
Err(TrySendError::Closed(_)) => { |
|||
Poll::Ready(Err(anyhow::anyhow!("channel to GlobalStdin closed"))) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
let (result_tx, rx) = oneshot::channel::<anyhow::Result<()>>(); |
|||
Box::pin(Send(Some(result_tx))) |
|||
.await |
|||
.context("sending message to worker thread")?; |
|||
rx.await.expect("channel is always alive") |
|||
} |
|||
} |
@ -1,210 +1,302 @@ |
|||
use crate::preview2::filesystem::{FileInputStream, FileOutputStream}; |
|||
use crate::preview2::{Table, TableError}; |
|||
use anyhow::Error; |
|||
use std::any::Any; |
|||
use bytes::Bytes; |
|||
|
|||
/// An input bytestream.
|
|||
///
|
|||
/// This is "pseudo" because the real streams will be a type in wit, and
|
|||
/// built into the wit bindings, and will support async and type parameters.
|
|||
/// This pseudo-stream abstraction is synchronous and only supports bytes.
|
|||
#[async_trait::async_trait] |
|||
pub trait InputStream: Send + Sync { |
|||
fn as_any(&self) -> &dyn Any; |
|||
#[derive(Clone, Copy, Debug, PartialEq)] |
|||
pub enum StreamState { |
|||
Open, |
|||
Closed, |
|||
} |
|||
|
|||
/// If this stream is reading from a host file descriptor, return it so
|
|||
/// that it can be polled with a host poll.
|
|||
#[cfg(unix)] |
|||
fn pollable_read(&self) -> Option<rustix::fd::BorrowedFd> { |
|||
None |
|||
impl StreamState { |
|||
pub fn is_closed(&self) -> bool { |
|||
*self == Self::Closed |
|||
} |
|||
} |
|||
|
|||
/// If this stream is reading from a host file descriptor, return it so
|
|||
/// that it can be polled with a host poll.
|
|||
#[cfg(windows)] |
|||
fn pollable_read(&self) -> Option<io_extras::os::windows::BorrowedHandleOrSocket> { |
|||
None |
|||
} |
|||
/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
|
|||
/// bytestream which can be read from.
|
|||
#[async_trait::async_trait] |
|||
pub trait HostInputStream: Send + Sync { |
|||
/// Read bytes. On success, returns a pair holding the number of bytes
|
|||
/// read and a flag indicating whether the end of the stream was reached.
|
|||
/// Important: this read must be non-blocking!
|
|||
fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error>; |
|||
|
|||
/// Read bytes. On success, returns a pair holding the number of bytes read
|
|||
/// and a flag indicating whether the end of the stream was reached.
|
|||
async fn read(&mut self, _buf: &mut [u8]) -> Result<(u64, bool), Error> { |
|||
Err(anyhow::anyhow!("badf")) |
|||
} |
|||
/// Read bytes from a stream and discard them. Important: this method must
|
|||
/// be non-blocking!
|
|||
fn skip(&mut self, nelem: usize) -> Result<(usize, StreamState), Error> { |
|||
let mut nread = 0; |
|||
let mut state = StreamState::Open; |
|||
|
|||
/// Vectored-I/O form of `read`.
|
|||
async fn read_vectored<'a>( |
|||
&mut self, |
|||
_bufs: &mut [std::io::IoSliceMut<'a>], |
|||
) -> Result<(u64, bool), Error> { |
|||
Err(anyhow::anyhow!("badf")) |
|||
} |
|||
let (bs, read_state) = self.read(nelem)?; |
|||
// TODO: handle the case where `bs.len()` is less than `nelem`
|
|||
nread += bs.len(); |
|||
if read_state.is_closed() { |
|||
state = read_state; |
|||
} |
|||
|
|||
/// Test whether vectored I/O reads are known to be optimized in the
|
|||
/// underlying implementation.
|
|||
fn is_read_vectored(&self) -> bool { |
|||
false |
|||
Ok((nread, state)) |
|||
} |
|||
|
|||
/// Read bytes from a stream and discard them.
|
|||
async fn skip(&mut self, nelem: u64) -> Result<(u64, bool), Error> { |
|||
let mut nread = 0; |
|||
let mut saw_end = false; |
|||
|
|||
// TODO: Optimize by reading more than one byte at a time.
|
|||
for _ in 0..nelem { |
|||
let (num, end) = self.read(&mut [0]).await?; |
|||
nread += num; |
|||
if end { |
|||
saw_end = true; |
|||
break; |
|||
} |
|||
/// Check for read readiness: this method blocks until the stream is ready
|
|||
/// for reading.
|
|||
async fn ready(&mut self) -> Result<(), Error>; |
|||
} |
|||
|
|||
/// Host trait for implementing the `wasi:io/streams.output-stream` resource:
|
|||
/// A bytestream which can be written to.
|
|||
#[async_trait::async_trait] |
|||
pub trait HostOutputStream: Send + Sync { |
|||
/// Write bytes. On success, returns the number of bytes written.
|
|||
/// Important: this write must be non-blocking!
|
|||
fn write(&mut self, bytes: Bytes) -> Result<(usize, StreamState), Error>; |
|||
|
|||
/// Transfer bytes directly from an input stream to an output stream.
|
|||
/// Important: this splice must be non-blocking!
|
|||
fn splice( |
|||
&mut self, |
|||
src: &mut dyn HostInputStream, |
|||
nelem: usize, |
|||
) -> Result<(usize, StreamState), Error> { |
|||
let mut nspliced = 0; |
|||
let mut state = StreamState::Open; |
|||
|
|||
// TODO: handle the case where `bs.len()` is less than `nelem`
|
|||
let (bs, read_state) = src.read(nelem)?; |
|||
// TODO: handle the case where write returns less than `bs.len()`
|
|||
let (nwritten, _write_state) = self.write(bs)?; |
|||
nspliced += nwritten; |
|||
if read_state.is_closed() { |
|||
state = read_state; |
|||
} |
|||
|
|||
Ok((nread, saw_end)) |
|||
Ok((nspliced, state)) |
|||
} |
|||
|
|||
/// Return the number of bytes that may be read without blocking.
|
|||
async fn num_ready_bytes(&self) -> Result<u64, Error> { |
|||
Ok(0) |
|||
/// Repeatedly write a byte to a stream. Important: this write must be
|
|||
/// non-blocking!
|
|||
fn write_zeroes(&mut self, nelem: usize) -> Result<(usize, StreamState), Error> { |
|||
// TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
|
|||
// repeatedly from a 'static buffer of zeros.
|
|||
let bs = Bytes::from_iter(core::iter::repeat(0 as u8).take(nelem)); |
|||
let r = self.write(bs)?; |
|||
Ok(r) |
|||
} |
|||
|
|||
/// Test whether this stream is readable.
|
|||
async fn readable(&self) -> Result<(), Error>; |
|||
/// Check for write readiness: this method blocks until the stream is
|
|||
/// ready for writing.
|
|||
async fn ready(&mut self) -> Result<(), Error>; |
|||
} |
|||
|
|||
/// An output bytestream.
|
|||
///
|
|||
/// This is "pseudo" because the real streams will be a type in wit, and
|
|||
/// built into the wit bindings, and will support async and type parameters.
|
|||
/// This pseudo-stream abstraction is synchronous and only supports bytes.
|
|||
#[async_trait::async_trait] |
|||
pub trait OutputStream: Send + Sync { |
|||
fn as_any(&self) -> &dyn Any; |
|||
pub(crate) enum InternalInputStream { |
|||
Host(Box<dyn HostInputStream>), |
|||
File(FileInputStream), |
|||
} |
|||
|
|||
/// If this stream is writing from a host file descriptor, return it so
|
|||
/// that it can be polled with a host poll.
|
|||
#[cfg(unix)] |
|||
fn pollable_write(&self) -> Option<rustix::fd::BorrowedFd> { |
|||
None |
|||
} |
|||
pub(crate) enum InternalOutputStream { |
|||
Host(Box<dyn HostOutputStream>), |
|||
File(FileOutputStream), |
|||
} |
|||
|
|||
/// If this stream is writing from a host file descriptor, return it so
|
|||
/// that it can be polled with a host poll.
|
|||
#[cfg(windows)] |
|||
fn pollable_write(&self) -> Option<io_extras::os::windows::BorrowedHandleOrSocket> { |
|||
None |
|||
} |
|||
pub(crate) trait InternalTableStreamExt { |
|||
fn push_internal_input_stream( |
|||
&mut self, |
|||
istream: InternalInputStream, |
|||
) -> Result<u32, TableError>; |
|||
fn get_internal_input_stream_mut( |
|||
&mut self, |
|||
fd: u32, |
|||
) -> Result<&mut InternalInputStream, TableError>; |
|||
fn delete_internal_input_stream(&mut self, fd: u32) -> Result<InternalInputStream, TableError>; |
|||
|
|||
/// Write bytes. On success, returns the number of bytes written.
|
|||
async fn write(&mut self, _buf: &[u8]) -> Result<u64, Error> { |
|||
Err(anyhow::anyhow!("badf")) |
|||
fn push_internal_output_stream( |
|||
&mut self, |
|||
ostream: InternalOutputStream, |
|||
) -> Result<u32, TableError>; |
|||
fn get_internal_output_stream_mut( |
|||
&mut self, |
|||
fd: u32, |
|||
) -> Result<&mut InternalOutputStream, TableError>; |
|||
fn delete_internal_output_stream( |
|||
&mut self, |
|||
fd: u32, |
|||
) -> Result<InternalOutputStream, TableError>; |
|||
} |
|||
impl InternalTableStreamExt for Table { |
|||
fn push_internal_input_stream( |
|||
&mut self, |
|||
istream: InternalInputStream, |
|||
) -> Result<u32, TableError> { |
|||
self.push(Box::new(istream)) |
|||
} |
|||
|
|||
/// Vectored-I/O form of `write`.
|
|||
async fn write_vectored<'a>(&mut self, _bufs: &[std::io::IoSlice<'a>]) -> Result<u64, Error> { |
|||
Err(anyhow::anyhow!("badf")) |
|||
fn get_internal_input_stream_mut( |
|||
&mut self, |
|||
fd: u32, |
|||
) -> Result<&mut InternalInputStream, TableError> { |
|||
self.get_mut(fd) |
|||
} |
|||
|
|||
/// Test whether vectored I/O writes are known to be optimized in the
|
|||
/// underlying implementation.
|
|||
fn is_write_vectored(&self) -> bool { |
|||
false |
|||
fn delete_internal_input_stream(&mut self, fd: u32) -> Result<InternalInputStream, TableError> { |
|||
self.delete(fd) |
|||
} |
|||
|
|||
/// Transfer bytes directly from an input stream to an output stream.
|
|||
async fn splice( |
|||
fn push_internal_output_stream( |
|||
&mut self, |
|||
src: &mut dyn InputStream, |
|||
nelem: u64, |
|||
) -> Result<(u64, bool), Error> { |
|||
let mut nspliced = 0; |
|||
let mut saw_end = false; |
|||
|
|||
// TODO: Optimize by splicing more than one byte at a time.
|
|||
for _ in 0..nelem { |
|||
let mut buf = [0u8]; |
|||
let (num, end) = src.read(&mut buf).await?; |
|||
self.write(&buf).await?; |
|||
nspliced += num; |
|||
if end { |
|||
saw_end = true; |
|||
break; |
|||
} |
|||
} |
|||
|
|||
Ok((nspliced, saw_end)) |
|||
ostream: InternalOutputStream, |
|||
) -> Result<u32, TableError> { |
|||
self.push(Box::new(ostream)) |
|||
} |
|||
|
|||
/// Repeatedly write a byte to a stream.
|
|||
async fn write_zeroes(&mut self, nelem: u64) -> Result<u64, Error> { |
|||
let mut nwritten = 0; |
|||
|
|||
// TODO: Optimize by writing more than one byte at a time.
|
|||
for _ in 0..nelem { |
|||
let num = self.write(&[0]).await?; |
|||
if num == 0 { |
|||
break; |
|||
} |
|||
nwritten += num; |
|||
} |
|||
|
|||
Ok(nwritten) |
|||
fn get_internal_output_stream_mut( |
|||
&mut self, |
|||
fd: u32, |
|||
) -> Result<&mut InternalOutputStream, TableError> { |
|||
self.get_mut(fd) |
|||
} |
|||
fn delete_internal_output_stream( |
|||
&mut self, |
|||
fd: u32, |
|||
) -> Result<InternalOutputStream, TableError> { |
|||
self.delete(fd) |
|||
} |
|||
|
|||
/// Test whether this stream is writable.
|
|||
async fn writable(&self) -> Result<(), Error>; |
|||
} |
|||
|
|||
/// Extension trait for managing [`HostInputStream`]s and [`HostOutputStream`]s in the [`Table`].
|
|||
pub trait TableStreamExt { |
|||
fn push_input_stream(&mut self, istream: Box<dyn InputStream>) -> Result<u32, TableError>; |
|||
fn get_input_stream(&self, fd: u32) -> Result<&dyn InputStream, TableError>; |
|||
fn get_input_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn InputStream>, TableError>; |
|||
/// Push a [`HostInputStream`] into a [`Table`], returning the table index.
|
|||
fn push_input_stream(&mut self, istream: Box<dyn HostInputStream>) -> Result<u32, TableError>; |
|||
/// Get a mutable reference to a [`HostInputStream`] in a [`Table`].
|
|||
fn get_input_stream_mut(&mut self, fd: u32) -> Result<&mut dyn HostInputStream, TableError>; |
|||
/// Remove [`HostInputStream`] from table:
|
|||
fn delete_input_stream(&mut self, fd: u32) -> Result<Box<dyn HostInputStream>, TableError>; |
|||
|
|||
/// Push a [`HostOutputStream`] into a [`Table`], returning the table index.
|
|||
fn push_output_stream(&mut self, ostream: Box<dyn HostOutputStream>) |
|||
-> Result<u32, TableError>; |
|||
/// Get a mutable reference to a [`HostOutputStream`] in a [`Table`].
|
|||
fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut dyn HostOutputStream, TableError>; |
|||
|
|||
fn push_output_stream(&mut self, ostream: Box<dyn OutputStream>) -> Result<u32, TableError>; |
|||
fn get_output_stream(&self, fd: u32) -> Result<&dyn OutputStream, TableError>; |
|||
fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn OutputStream>, TableError>; |
|||
/// Remove [`HostOutputStream`] from table:
|
|||
fn delete_output_stream(&mut self, fd: u32) -> Result<Box<dyn HostOutputStream>, TableError>; |
|||
} |
|||
impl TableStreamExt for Table { |
|||
fn push_input_stream(&mut self, istream: Box<dyn InputStream>) -> Result<u32, TableError> { |
|||
self.push(Box::new(istream)) |
|||
fn push_input_stream(&mut self, istream: Box<dyn HostInputStream>) -> Result<u32, TableError> { |
|||
self.push_internal_input_stream(InternalInputStream::Host(istream)) |
|||
} |
|||
fn get_input_stream(&self, fd: u32) -> Result<&dyn InputStream, TableError> { |
|||
self.get::<Box<dyn InputStream>>(fd).map(|f| f.as_ref()) |
|||
fn get_input_stream_mut(&mut self, fd: u32) -> Result<&mut dyn HostInputStream, TableError> { |
|||
match self.get_internal_input_stream_mut(fd)? { |
|||
InternalInputStream::Host(ref mut h) => Ok(h.as_mut()), |
|||
_ => Err(TableError::WrongType), |
|||
} |
|||
} |
|||
fn get_input_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn InputStream>, TableError> { |
|||
self.get_mut::<Box<dyn InputStream>>(fd) |
|||
fn delete_input_stream(&mut self, fd: u32) -> Result<Box<dyn HostInputStream>, TableError> { |
|||
let occ = self.entry(fd)?; |
|||
match occ.get().downcast_ref::<InternalInputStream>() { |
|||
Some(InternalInputStream::Host(_)) => { |
|||
let (_, any) = occ.remove_entry(); |
|||
match *any.downcast().expect("downcast checked above") { |
|||
InternalInputStream::Host(h) => Ok(h), |
|||
_ => unreachable!("variant checked above"), |
|||
} |
|||
} |
|||
_ => Err(TableError::WrongType), |
|||
} |
|||
} |
|||
|
|||
fn push_output_stream(&mut self, ostream: Box<dyn OutputStream>) -> Result<u32, TableError> { |
|||
self.push(Box::new(ostream)) |
|||
fn push_output_stream( |
|||
&mut self, |
|||
ostream: Box<dyn HostOutputStream>, |
|||
) -> Result<u32, TableError> { |
|||
self.push_internal_output_stream(InternalOutputStream::Host(ostream)) |
|||
} |
|||
fn get_output_stream(&self, fd: u32) -> Result<&dyn OutputStream, TableError> { |
|||
self.get::<Box<dyn OutputStream>>(fd).map(|f| f.as_ref()) |
|||
fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut dyn HostOutputStream, TableError> { |
|||
match self.get_internal_output_stream_mut(fd)? { |
|||
InternalOutputStream::Host(ref mut h) => Ok(h.as_mut()), |
|||
_ => Err(TableError::WrongType), |
|||
} |
|||
} |
|||
fn get_output_stream_mut(&mut self, fd: u32) -> Result<&mut Box<dyn OutputStream>, TableError> { |
|||
self.get_mut::<Box<dyn OutputStream>>(fd) |
|||
fn delete_output_stream(&mut self, fd: u32) -> Result<Box<dyn HostOutputStream>, TableError> { |
|||
let occ = self.entry(fd)?; |
|||
match occ.get().downcast_ref::<InternalOutputStream>() { |
|||
Some(InternalOutputStream::Host(_)) => { |
|||
let (_, any) = occ.remove_entry(); |
|||
match *any.downcast().expect("downcast checked above") { |
|||
InternalOutputStream::Host(h) => Ok(h), |
|||
_ => unreachable!("variant checked above"), |
|||
} |
|||
} |
|||
_ => Err(TableError::WrongType), |
|||
} |
|||
} |
|||
} |
|||
|
|||
#[cfg(test)] |
|||
mod test { |
|||
use super::*; |
|||
use crate::preview2::pipe::{ReadPipe, WritePipe}; |
|||
|
|||
#[test] |
|||
fn input_stream_in_table() { |
|||
let empty_pipe = ReadPipe::new(std::io::empty()); |
|||
struct DummyInputStream; |
|||
#[async_trait::async_trait] |
|||
impl HostInputStream for DummyInputStream { |
|||
fn read(&mut self, _size: usize) -> Result<(Bytes, StreamState), Error> { |
|||
unimplemented!(); |
|||
} |
|||
async fn ready(&mut self) -> Result<(), Error> { |
|||
unimplemented!(); |
|||
} |
|||
} |
|||
|
|||
let dummy = DummyInputStream; |
|||
let mut table = Table::new(); |
|||
let ix = table.push_input_stream(Box::new(empty_pipe)).unwrap(); |
|||
let _ = table.get_input_stream(ix).unwrap(); |
|||
// Put it into the table:
|
|||
let ix = table.push_input_stream(Box::new(dummy)).unwrap(); |
|||
// Get a mut ref to it:
|
|||
let _ = table.get_input_stream_mut(ix).unwrap(); |
|||
// Fails at wrong type:
|
|||
assert!(matches!( |
|||
table.get_output_stream_mut(ix), |
|||
Err(TableError::WrongType) |
|||
)); |
|||
// Delete it:
|
|||
let _ = table.delete_input_stream(ix).unwrap(); |
|||
// Now absent from table:
|
|||
assert!(matches!( |
|||
table.get_input_stream_mut(ix), |
|||
Err(TableError::NotPresent) |
|||
)); |
|||
} |
|||
|
|||
#[test] |
|||
fn output_stream_in_table() { |
|||
let dev_null = WritePipe::new(std::io::sink()); |
|||
struct DummyOutputStream; |
|||
#[async_trait::async_trait] |
|||
impl HostOutputStream for DummyOutputStream { |
|||
fn write(&mut self, _: Bytes) -> Result<(usize, StreamState), Error> { |
|||
unimplemented!(); |
|||
} |
|||
async fn ready(&mut self) -> Result<(), Error> { |
|||
unimplemented!(); |
|||
} |
|||
} |
|||
|
|||
let dummy = DummyOutputStream; |
|||
let mut table = Table::new(); |
|||
let ix = table.push_output_stream(Box::new(dev_null)).unwrap(); |
|||
let _ = table.get_output_stream(ix).unwrap(); |
|||
// Put it in the table:
|
|||
let ix = table.push_output_stream(Box::new(dummy)).unwrap(); |
|||
// Get a mut ref to it:
|
|||
let _ = table.get_output_stream_mut(ix).unwrap(); |
|||
// Fails at wrong type:
|
|||
assert!(matches!( |
|||
table.get_input_stream_mut(ix), |
|||
Err(TableError::WrongType) |
|||
)); |
|||
// Delete it:
|
|||
let _ = table.delete_output_stream(ix).unwrap(); |
|||
// Now absent:
|
|||
assert!(matches!( |
|||
table.get_output_stream_mut(ix), |
|||
Err(TableError::NotPresent) |
|||
)); |
|||
} |
|||
} |
|||
|
@ -1,43 +0,0 @@ |
|||
use crate::preview2::WasiView; |
|||
|
|||
wasmtime::component::bindgen!({ |
|||
world: "wasi:preview/command", |
|||
tracing: true, |
|||
async: true, |
|||
trappable_error_type: { |
|||
"filesystem"::"error-code": Error, |
|||
"streams"::"stream-error": Error, |
|||
}, |
|||
with: { |
|||
"wasi:filesystem/filesystem": crate::preview2::wasi::filesystem::filesystem, |
|||
"wasi:clocks/monotonic_clock": crate::preview2::wasi::clocks::monotonic_clock, |
|||
"wasi:poll/poll": crate::preview2::wasi::poll::poll, |
|||
"wasi:io/streams": crate::preview2::wasi::io::streams, |
|||
"wasi:clocks/timezone": crate::preview2::wasi::clocks::timezone, |
|||
"wasi:clocks/wall_clock": crate::preview2::wasi::clocks::wall_clock, |
|||
"wasi:random/random": crate::preview2::wasi::random::random, |
|||
"wasi:cli_base/environment": crate::preview2::wasi::cli_base::environment, |
|||
"wasi:cli_base/exit": crate::preview2::wasi::cli_base::exit, |
|||
"wasi:cli_base/preopens": crate::preview2::wasi::cli_base::preopens, |
|||
"wasi:cli_base/stdin": crate::preview2::wasi::cli_base::stdin, |
|||
"wasi:cli_base/stdout": crate::preview2::wasi::cli_base::stdout, |
|||
"wasi:cli_base/stderr": crate::preview2::wasi::cli_base::stderr, |
|||
}, |
|||
}); |
|||
|
|||
pub fn add_to_linker<T: WasiView>(l: &mut wasmtime::component::Linker<T>) -> anyhow::Result<()> { |
|||
crate::preview2::wasi::clocks::wall_clock::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::clocks::monotonic_clock::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::clocks::timezone::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::filesystem::filesystem::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::poll::poll::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::io::streams::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::random::random::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::cli_base::exit::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::cli_base::environment::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::cli_base::preopens::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::cli_base::stdin::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::cli_base::stdout::add_to_linker(l, |t| t)?; |
|||
crate::preview2::wasi::cli_base::stderr::add_to_linker(l, |t| t)?; |
|||
Ok(()) |
|||
} |
@ -1,30 +0,0 @@ |
|||
pub mod command; |
|||
|
|||
wasmtime::component::bindgen!({ |
|||
path: "wit", |
|||
interfaces: " |
|||
import wasi:clocks/wall-clock |
|||
import wasi:clocks/monotonic-clock |
|||
import wasi:clocks/timezone |
|||
import wasi:filesystem/filesystem |
|||
import wasi:random/random |
|||
import wasi:random/insecure |
|||
import wasi:random/insecure-seed |
|||
import wasi:poll/poll |
|||
import wasi:io/streams |
|||
import wasi:cli-base/environment |
|||
import wasi:cli-base/preopens |
|||
import wasi:cli-base/exit |
|||
import wasi:cli-base/stdin |
|||
import wasi:cli-base/stdout |
|||
import wasi:cli-base/stderr |
|||
", |
|||
tracing: true, |
|||
async: true, |
|||
trappable_error_type: { |
|||
"filesystem"::"error-code": Error, |
|||
"streams"::"stream-error": Error, |
|||
} |
|||
}); |
|||
|
|||
pub use wasi::*; |
Loading…
Reference in new issue