Browse Source

Remove `Table*Ext` traits for preview2 (#7118)

* wasi: Add typed helpers to `Table`

This will help cut down on the `Table*Ext` traits while retaining type
safety.

* Remove `TableNetworkExt` trait

* Remove `TableTcpSocketExt` trait

* Remove the `TableReaddirExt` trait

* Remove `TableFsExt` trait

This involed a fair bit of refactoring within the preview2-to-preview1
adapter to handle the new ownership of resources, but nothing major.

* Remove `TableStreamExt` trait

Additionally simplify some stream methods while I'm here.

* Remove `TablePollableExt` trait

* Fix tests

* Fix some more tests

* Use typed accessors for terminal-{input,output}

* Remove dead code in `Table`
pull/7126/head
Alex Crichton 1 year ago
committed by GitHub
parent
commit
270e92225d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      crates/test-programs/tests/reactor.rs
  2. 14
      crates/wasi-http/src/types_impl.rs
  3. 110
      crates/wasi/src/preview2/filesystem.rs
  4. 9
      crates/wasi/src/preview2/host/clocks.rs
  5. 385
      crates/wasi/src/preview2/host/filesystem.rs
  6. 8
      crates/wasi/src/preview2/host/instance_network.rs
  7. 177
      crates/wasi/src/preview2/host/io.rs
  8. 3
      crates/wasi/src/preview2/host/network.rs
  9. 159
      crates/wasi/src/preview2/host/tcp.rs
  10. 7
      crates/wasi/src/preview2/host/tcp_create_socket.rs
  11. 19
      crates/wasi/src/preview2/mod.rs
  12. 29
      crates/wasi/src/preview2/network.rs
  13. 74
      crates/wasi/src/preview2/poll.rs
  14. 280
      crates/wasi/src/preview2/preview1.rs
  15. 49
      crates/wasi/src/preview2/stdio.rs
  16. 202
      crates/wasi/src/preview2/stream.rs
  17. 83
      crates/wasi/src/preview2/table.rs
  18. 69
      crates/wasi/src/preview2/tcp.rs

6
crates/test-programs/tests/reactor.rs

@ -102,10 +102,8 @@ async fn reactor_tests() -> Result<()> {
// Note, this works because of the add_to_linker invocations using the
// `host` crate for `streams`, not because of `with` in the bindgen macro.
let writepipe = preview2::pipe::MemoryOutputPipe::new(4096);
let table_ix = preview2::TableStreamExt::push_output_stream(
store.data_mut().table_mut(),
Box::new(writepipe.clone()),
)?;
let stream: preview2::OutputStream = Box::new(writepipe.clone());
let table_ix = store.data_mut().table_mut().push_resource(stream)?;
let r = reactor.call_write_strings_to(&mut store, table_ix).await?;
assert_eq!(r, Ok(()));

14
crates/wasi-http/src/types_impl.rs

@ -17,9 +17,8 @@ use anyhow::Context;
use std::any::Any;
use wasmtime::component::Resource;
use wasmtime_wasi::preview2::{
bindings::io::poll::Pollable,
bindings::io::streams::{InputStream, OutputStream},
HostPollable, PollableFuture, TablePollableExt, TableStreamExt,
Pollable, PollableFuture,
};
impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
@ -360,9 +359,10 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
Box::pin(elem.downcast_mut::<HostFutureTrailers>().unwrap().ready())
}
// FIXME: this should use `push_child_resource`
let id = self
.table()
.push_host_pollable(HostPollable::TableEntry { index, make_future })?;
.push_resource(Pollable::TableEntry { index, make_future })?;
Ok(id)
}
@ -489,7 +489,8 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
)
}
let pollable = self.table().push_host_pollable(HostPollable::TableEntry {
// FIXME: this should use `push_child_resource`
let pollable = self.table().push_resource(Pollable::TableEntry {
index: id,
make_future,
})?;
@ -504,7 +505,7 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
let body = self.table().get_outgoing_body(id)?;
if let Some(stream) = body.body_output_stream.take() {
let dummy = Resource::<u32>::new_own(id);
let id = self.table().push_output_stream_child(stream, dummy)?;
let id = self.table().push_child_resource(stream, &dummy)?;
Ok(Ok(id))
} else {
Ok(Err(()))
@ -558,7 +559,8 @@ impl<T: WasiHttpView> crate::bindings::http::types::HostIncomingBody for T {
let body = self.table().get_incoming_body(&id)?;
if let Some(stream) = body.stream.take() {
let stream = self.table().push_input_stream_child(Box::new(stream), id)?;
let stream = InputStream::Host(Box::new(stream));
let stream = self.table().push_child_resource(stream, &id)?;
return Ok(Ok(stream));
}

110
crates/wasi/src/preview2/filesystem.rs

@ -1,13 +1,46 @@
use crate::preview2::bindings::filesystem::types::Descriptor;
use crate::preview2::bindings::filesystem::types;
use crate::preview2::{
AbortOnDropJoinHandle, HostOutputStream, OutputStreamError, StreamRuntimeError, StreamState,
Table, TableError,
};
use anyhow::anyhow;
use bytes::{Bytes, BytesMut};
use futures::future::{maybe_done, MaybeDone};
use std::sync::Arc;
use wasmtime::component::Resource;
pub enum Descriptor {
File(File),
Dir(Dir),
}
impl Descriptor {
pub fn file(&self) -> Result<&File, types::ErrorCode> {
match self {
Descriptor::File(f) => Ok(f),
Descriptor::Dir(_) => Err(types::ErrorCode::BadDescriptor),
}
}
pub fn dir(&self) -> Result<&Dir, types::ErrorCode> {
match self {
Descriptor::Dir(d) => Ok(d),
Descriptor::File(_) => Err(types::ErrorCode::NotDirectory),
}
}
pub fn is_file(&self) -> bool {
match self {
Descriptor::File(_) => true,
Descriptor::Dir(_) => false,
}
}
pub fn is_dir(&self) -> bool {
match self {
Descriptor::File(_) => false,
Descriptor::Dir(_) => true,
}
}
}
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@ -17,7 +50,7 @@ bitflags::bitflags! {
}
}
pub(crate) struct File {
pub struct File {
/// Wrapped in an Arc because the same underlying file is used for
/// implementing the stream types. Also needed for [`spawn_blocking`].
///
@ -45,45 +78,6 @@ impl File {
tokio::task::spawn_blocking(move || body(&f)).await.unwrap()
}
}
pub(crate) trait TableFsExt {
fn push_file(&mut self, file: File) -> Result<Resource<Descriptor>, TableError>;
fn delete_file(&mut self, fd: Resource<Descriptor>) -> Result<File, TableError>;
fn is_file(&self, fd: &Resource<Descriptor>) -> bool;
fn get_file(&self, fd: &Resource<Descriptor>) -> Result<&File, TableError>;
fn push_dir(&mut self, dir: Dir) -> Result<Resource<Descriptor>, TableError>;
fn delete_dir(&mut self, fd: Resource<Descriptor>) -> Result<Dir, TableError>;
fn is_dir(&self, fd: &Resource<Descriptor>) -> bool;
fn get_dir(&self, fd: &Resource<Descriptor>) -> Result<&Dir, TableError>;
}
impl TableFsExt for Table {
fn push_file(&mut self, file: File) -> Result<Resource<Descriptor>, TableError> {
Ok(Resource::new_own(self.push(Box::new(file))?))
}
fn delete_file(&mut self, fd: Resource<Descriptor>) -> Result<File, TableError> {
self.delete(fd.rep())
}
fn is_file(&self, fd: &Resource<Descriptor>) -> bool {
self.is::<File>(fd.rep())
}
fn get_file(&self, fd: &Resource<Descriptor>) -> Result<&File, TableError> {
self.get(fd.rep())
}
fn push_dir(&mut self, dir: Dir) -> Result<Resource<Descriptor>, TableError> {
Ok(Resource::new_own(self.push(Box::new(dir))?))
}
fn delete_dir(&mut self, fd: Resource<Descriptor>) -> Result<Dir, TableError> {
self.delete(fd.rep())
}
fn is_dir(&self, fd: &Resource<Descriptor>) -> bool {
self.is::<Dir>(fd.rep())
}
fn get_dir(&self, fd: &Resource<Descriptor>) -> Result<&Dir, TableError> {
self.get(fd.rep())
}
}
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@ -94,7 +88,7 @@ bitflags::bitflags! {
}
#[derive(Clone)]
pub(crate) struct Dir {
pub struct Dir {
pub dir: Arc<cap_std::fs::Dir>,
pub perms: DirPerms,
pub file_perms: FilePerms,
@ -121,7 +115,7 @@ impl Dir {
}
}
pub(crate) struct FileInputStream {
pub struct FileInputStream {
file: Arc<cap_std::fs::File>,
position: u64,
}
@ -278,3 +272,29 @@ impl HostOutputStream for FileOutputStream {
}
}
}
pub struct ReaddirIterator(
std::sync::Mutex<
Box<dyn Iterator<Item = Result<types::DirectoryEntry, types::Error>> + Send + 'static>,
>,
);
impl ReaddirIterator {
pub(crate) fn new(
i: impl Iterator<Item = Result<types::DirectoryEntry, types::Error>> + Send + 'static,
) -> Self {
ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
}
pub(crate) fn next(&self) -> Result<Option<types::DirectoryEntry>, types::Error> {
self.0.lock().unwrap().next().transpose()
}
}
impl IntoIterator for ReaddirIterator {
type Item = Result<types::DirectoryEntry, types::Error>;
type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_inner().unwrap()
}
}

9
crates/wasi/src/preview2/host/clocks.rs

@ -4,9 +4,8 @@ use crate::preview2::bindings::{
clocks::monotonic_clock::{self, Instant},
clocks::timezone::{self, TimezoneDisplay},
clocks::wall_clock::{self, Datetime},
io::poll::Pollable,
};
use crate::preview2::{HostPollable, TablePollableExt, WasiView};
use crate::preview2::{Pollable, WasiView};
use cap_std::time::SystemTime;
use wasmtime::component::Resource;
@ -60,9 +59,7 @@ impl<T: WasiView> monotonic_clock::Host for T {
// Deadline is in the past, so pollable is always ready:
Ok(self
.table_mut()
.push_host_pollable(HostPollable::Closure(Box::new(|| {
Box::pin(async { Ok(()) })
})))?)
.push_resource(Pollable::Closure(Box::new(|| Box::pin(async { Ok(()) }))))?)
} else {
let duration = if absolute {
Duration::from_nanos(when - clock_now)
@ -79,7 +76,7 @@ impl<T: WasiView> monotonic_clock::Host for T {
);
Ok(self
.table_mut()
.push_host_pollable(HostPollable::Closure(Box::new(move || {
.push_resource(Pollable::Closure(Box::new(move || {
Box::pin(async move {
tracing::trace!(
"mkf: deadline = {:?}, now = {:?}",

385
crates/wasi/src/preview2/host/filesystem.rs

@ -1,10 +1,9 @@
use crate::preview2::bindings::clocks::wall_clock;
use crate::preview2::bindings::filesystem::types::{
DirectoryEntryStream, HostDescriptor, HostDirectoryEntryStream,
};
use crate::preview2::bindings::filesystem::types::{HostDescriptor, HostDirectoryEntryStream};
use crate::preview2::bindings::filesystem::{preopens, types};
use crate::preview2::bindings::io::streams;
use crate::preview2::filesystem::{Dir, File, TableFsExt};
use crate::preview2::bindings::io::streams::{InputStream, OutputStream};
use crate::preview2::filesystem::{Descriptor, Dir, File, ReaddirIterator};
use crate::preview2::filesystem::{FileInputStream, FileOutputStream};
use crate::preview2::{DirPerms, FilePerms, Table, TableError, WasiView};
use anyhow::Context;
use wasmtime::component::Resource;
@ -27,7 +26,7 @@ impl<T: WasiView> preopens::Host for T {
for (dir, name) in self.ctx().preopens.clone() {
let fd = self
.table_mut()
.push_dir(dir)
.push_resource(Descriptor::Dir(dir))
.with_context(|| format!("failed to push preopen {name}"))?;
results.push((fd, name));
}
@ -59,7 +58,7 @@ impl<T: WasiView> HostDescriptor for T {
Advice::NoReuse => A::NoReuse,
};
let f = self.table().get_file(&fd)?;
let f = self.table().get_resource(&fd)?.file()?;
f.spawn_blocking(move |f| f.advise(offset, len, advice))
.await?;
Ok(())
@ -67,28 +66,28 @@ impl<T: WasiView> HostDescriptor for T {
async fn sync_data(&mut self, fd: Resource<types::Descriptor>) -> Result<(), types::Error> {
let table = self.table();
if table.is_file(&fd) {
let f = table.get_file(&fd)?;
match f.spawn_blocking(|f| f.sync_data()).await {
Ok(()) => Ok(()),
// On windows, `sync_data` uses `FileFlushBuffers` which fails with
// `ERROR_ACCESS_DENIED` if the file is not upen for writing. Ignore
// this error, for POSIX compatibility.
#[cfg(windows)]
Err(e)
if e.raw_os_error()
== Some(windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED as _) =>
{
Ok(())
match table.get_resource(&fd)? {
Descriptor::File(f) => {
match f.spawn_blocking(|f| f.sync_data()).await {
Ok(()) => Ok(()),
// On windows, `sync_data` uses `FileFlushBuffers` which fails with
// `ERROR_ACCESS_DENIED` if the file is not upen for writing. Ignore
// this error, for POSIX compatibility.
#[cfg(windows)]
Err(e)
if e.raw_os_error()
== Some(windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED as _) =>
{
Ok(())
}
Err(e) => Err(e.into()),
}
Err(e) => Err(e.into()),
}
} else if table.is_dir(&fd) {
let d = table.get_dir(&fd)?;
d.spawn_blocking(|d| Ok(d.open(std::path::Component::CurDir)?.sync_data()?))
.await
} else {
Err(ErrorCode::BadDescriptor.into())
Descriptor::Dir(d) => {
d.spawn_blocking(|d| Ok(d.open(std::path::Component::CurDir)?.sync_data()?))
.await
}
}
}
@ -114,30 +113,29 @@ impl<T: WasiView> HostDescriptor for T {
}
let table = self.table();
if table.is_file(&fd) {
let f = table.get_file(&fd)?;
let flags = f.spawn_blocking(|f| f.get_fd_flags()).await?;
let mut flags = get_from_fdflags(flags);
if f.perms.contains(FilePerms::READ) {
flags |= DescriptorFlags::READ;
}
if f.perms.contains(FilePerms::WRITE) {
flags |= DescriptorFlags::WRITE;
}
Ok(flags)
} else if table.is_dir(&fd) {
let d = table.get_dir(&fd)?;
let flags = d.spawn_blocking(|d| d.get_fd_flags()).await?;
let mut flags = get_from_fdflags(flags);
if d.perms.contains(DirPerms::READ) {
flags |= DescriptorFlags::READ;
match table.get_resource(&fd)? {
Descriptor::File(f) => {
let flags = f.spawn_blocking(|f| f.get_fd_flags()).await?;
let mut flags = get_from_fdflags(flags);
if f.perms.contains(FilePerms::READ) {
flags |= DescriptorFlags::READ;
}
if f.perms.contains(FilePerms::WRITE) {
flags |= DescriptorFlags::WRITE;
}
Ok(flags)
}
if d.perms.contains(DirPerms::MUTATE) {
flags |= DescriptorFlags::MUTATE_DIRECTORY;
Descriptor::Dir(d) => {
let flags = d.spawn_blocking(|d| d.get_fd_flags()).await?;
let mut flags = get_from_fdflags(flags);
if d.perms.contains(DirPerms::READ) {
flags |= DescriptorFlags::READ;
}
if d.perms.contains(DirPerms::MUTATE) {
flags |= DescriptorFlags::MUTATE_DIRECTORY;
}
Ok(flags)
}
Ok(flags)
} else {
Err(ErrorCode::BadDescriptor.into())
}
}
@ -147,14 +145,12 @@ impl<T: WasiView> HostDescriptor for T {
) -> Result<types::DescriptorType, types::Error> {
let table = self.table();
if table.is_file(&fd) {
let f = table.get_file(&fd)?;
let meta = f.spawn_blocking(|f| f.metadata()).await?;
Ok(descriptortype_from(meta.file_type()))
} else if table.is_dir(&fd) {
Ok(types::DescriptorType::Directory)
} else {
Err(ErrorCode::BadDescriptor.into())
match table.get_resource(&fd)? {
Descriptor::File(f) => {
let meta = f.spawn_blocking(|f| f.metadata()).await?;
Ok(descriptortype_from(meta.file_type()))
}
Descriptor::Dir(_) => Ok(types::DescriptorType::Directory),
}
}
@ -163,7 +159,7 @@ impl<T: WasiView> HostDescriptor for T {
fd: Resource<types::Descriptor>,
size: types::Filesize,
) -> Result<(), types::Error> {
let f = self.table().get_file(&fd)?;
let f = self.table().get_resource(&fd)?.file()?;
if !f.perms.contains(FilePerms::WRITE) {
Err(ErrorCode::NotPermitted)?;
}
@ -180,26 +176,25 @@ impl<T: WasiView> HostDescriptor for T {
use fs_set_times::SetTimes;
let table = self.table();
if table.is_file(&fd) {
let f = table.get_file(&fd)?;
if !f.perms.contains(FilePerms::WRITE) {
return Err(ErrorCode::NotPermitted.into());
match table.get_resource(&fd)? {
Descriptor::File(f) => {
if !f.perms.contains(FilePerms::WRITE) {
return Err(ErrorCode::NotPermitted.into());
}
let atim = systemtimespec_from(atim)?;
let mtim = systemtimespec_from(mtim)?;
f.spawn_blocking(|f| f.set_times(atim, mtim)).await?;
Ok(())
}
let atim = systemtimespec_from(atim)?;
let mtim = systemtimespec_from(mtim)?;
f.spawn_blocking(|f| f.set_times(atim, mtim)).await?;
Ok(())
} else if table.is_dir(&fd) {
let d = table.get_dir(&fd)?;
if !d.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
Descriptor::Dir(d) => {
if !d.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
let atim = systemtimespec_from(atim)?;
let mtim = systemtimespec_from(mtim)?;
d.spawn_blocking(|d| d.set_times(atim, mtim)).await?;
Ok(())
}
let atim = systemtimespec_from(atim)?;
let mtim = systemtimespec_from(mtim)?;
d.spawn_blocking(|d| d.set_times(atim, mtim)).await?;
Ok(())
} else {
Err(ErrorCode::BadDescriptor.into())
}
}
@ -214,7 +209,7 @@ impl<T: WasiView> HostDescriptor for T {
let table = self.table();
let f = table.get_file(&fd)?;
let f = table.get_resource(&fd)?.file()?;
if !f.perms.contains(FilePerms::READ) {
return Err(ErrorCode::NotPermitted.into());
}
@ -251,7 +246,7 @@ impl<T: WasiView> HostDescriptor for T {
use system_interface::fs::FileIoExt;
let table = self.table();
let f = table.get_file(&fd)?;
let f = table.get_resource(&fd)?.file()?;
if !f.perms.contains(FilePerms::WRITE) {
return Err(ErrorCode::NotPermitted.into());
}
@ -268,7 +263,7 @@ impl<T: WasiView> HostDescriptor for T {
fd: Resource<types::Descriptor>,
) -> Result<Resource<types::DirectoryEntryStream>, types::Error> {
let table = self.table_mut();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::READ) {
return Err(ErrorCode::NotPermitted.into());
}
@ -325,33 +320,33 @@ impl<T: WasiView> HostDescriptor for T {
Err(ReaddirError::Io(e)) => Err(types::Error::from(e)),
Err(ReaddirError::IllegalSequence) => Err(ErrorCode::IllegalByteSequence.into()),
});
Ok(table.push_readdir(ReaddirIterator::new(entries))?)
Ok(table.push_resource(ReaddirIterator::new(entries))?)
}
async fn sync(&mut self, fd: Resource<types::Descriptor>) -> Result<(), types::Error> {
let table = self.table();
if table.is_file(&fd) {
let f = table.get_file(&fd)?;
match f.spawn_blocking(|f| f.sync_all()).await {
Ok(()) => Ok(()),
// On windows, `sync_data` uses `FileFlushBuffers` which fails with
// `ERROR_ACCESS_DENIED` if the file is not upen for writing. Ignore
// this error, for POSIX compatibility.
#[cfg(windows)]
Err(e)
if e.raw_os_error()
== Some(windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED as _) =>
{
Ok(())
match table.get_resource(&fd)? {
Descriptor::File(f) => {
match f.spawn_blocking(|f| f.sync_all()).await {
Ok(()) => Ok(()),
// On windows, `sync_data` uses `FileFlushBuffers` which fails with
// `ERROR_ACCESS_DENIED` if the file is not upen for writing. Ignore
// this error, for POSIX compatibility.
#[cfg(windows)]
Err(e)
if e.raw_os_error()
== Some(windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED as _) =>
{
Ok(())
}
Err(e) => Err(e.into()),
}
Err(e) => Err(e.into()),
}
} else if table.is_dir(&fd) {
let d = table.get_dir(&fd)?;
d.spawn_blocking(|d| Ok(d.open(std::path::Component::CurDir)?.sync_all()?))
.await
} else {
Err(ErrorCode::BadDescriptor.into())
Descriptor::Dir(d) => {
d.spawn_blocking(|d| Ok(d.open(std::path::Component::CurDir)?.sync_all()?))
.await
}
}
}
@ -361,7 +356,7 @@ impl<T: WasiView> HostDescriptor for T {
path: String,
) -> Result<(), types::Error> {
let table = self.table();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
@ -374,18 +369,17 @@ impl<T: WasiView> HostDescriptor for T {
fd: Resource<types::Descriptor>,
) -> Result<types::DescriptorStat, types::Error> {
let table = self.table();
if table.is_file(&fd) {
let f = table.get_file(&fd)?;
// No permissions check on stat: if opened, allowed to stat it
let meta = f.spawn_blocking(|f| f.metadata()).await?;
Ok(descriptorstat_from(meta))
} else if table.is_dir(&fd) {
let d = table.get_dir(&fd)?;
// No permissions check on stat: if opened, allowed to stat it
let meta = d.spawn_blocking(|d| d.dir_metadata()).await?;
Ok(descriptorstat_from(meta))
} else {
Err(ErrorCode::BadDescriptor.into())
match table.get_resource(&fd)? {
Descriptor::File(f) => {
// No permissions check on stat: if opened, allowed to stat it
let meta = f.spawn_blocking(|f| f.metadata()).await?;
Ok(descriptorstat_from(meta))
}
Descriptor::Dir(d) => {
// No permissions check on stat: if opened, allowed to stat it
let meta = d.spawn_blocking(|d| d.dir_metadata()).await?;
Ok(descriptorstat_from(meta))
}
}
}
@ -396,7 +390,7 @@ impl<T: WasiView> HostDescriptor for T {
path: String,
) -> Result<types::DescriptorStat, types::Error> {
let table = self.table();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::READ) {
return Err(ErrorCode::NotPermitted.into());
}
@ -420,7 +414,7 @@ impl<T: WasiView> HostDescriptor for T {
use cap_fs_ext::DirExt;
let table = self.table();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
@ -458,11 +452,11 @@ impl<T: WasiView> HostDescriptor for T {
new_path: String,
) -> Result<(), types::Error> {
let table = self.table();
let old_dir = table.get_dir(&fd)?;
let old_dir = table.get_resource(&fd)?.dir()?;
if !old_dir.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
let new_dir = table.get_dir(&new_descriptor)?;
let new_dir = table.get_resource(&new_descriptor)?.dir()?;
if !new_dir.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
@ -492,10 +486,7 @@ impl<T: WasiView> HostDescriptor for T {
use types::{DescriptorFlags, OpenFlags};
let table = self.table_mut();
if table.is_file(&fd) {
Err(ErrorCode::NotDirectory)?;
}
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::READ) {
Err(ErrorCode::NotPermitted)?;
}
@ -584,12 +575,15 @@ impl<T: WasiView> HostDescriptor for T {
.await?;
match opened {
OpenResult::Dir(dir) => Ok(table.push_dir(Dir::new(dir, d.perms, d.file_perms))?),
OpenResult::File(file) => {
Ok(table.push_file(File::new(file, mask_file_perms(d.file_perms, flags)))?)
OpenResult::Dir(dir) => {
Ok(table.push_resource(Descriptor::Dir(Dir::new(dir, d.perms, d.file_perms)))?)
}
OpenResult::File(file) => Ok(table.push_resource(Descriptor::File(File::new(
file,
mask_file_perms(d.file_perms, flags),
)))?),
OpenResult::NotDir => Err(ErrorCode::NotDirectory.into()),
}
}
@ -602,9 +596,7 @@ impl<T: WasiView> HostDescriptor for T {
// tokio::fs::File just uses std::fs::File's Drop impl to close, so
// it doesn't appear anyone else has found this to be a problem.
// (Not that they could solve it without async drop...)
if table.delete_file(Resource::new_own(fd.rep())).is_err() {
table.delete_dir(fd)?;
}
table.delete_resource(fd)?;
Ok(())
}
@ -615,7 +607,7 @@ impl<T: WasiView> HostDescriptor for T {
path: String,
) -> Result<String, types::Error> {
let table = self.table();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::READ) {
return Err(ErrorCode::NotPermitted.into());
}
@ -632,7 +624,7 @@ impl<T: WasiView> HostDescriptor for T {
path: String,
) -> Result<(), types::Error> {
let table = self.table();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
@ -647,11 +639,11 @@ impl<T: WasiView> HostDescriptor for T {
new_path: String,
) -> Result<(), types::Error> {
let table = self.table();
let old_dir = table.get_dir(&fd)?;
let old_dir = table.get_resource(&fd)?.dir()?;
if !old_dir.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
let new_dir = table.get_dir(&new_fd)?;
let new_dir = table.get_resource(&new_fd)?.dir()?;
if !new_dir.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
@ -672,7 +664,7 @@ impl<T: WasiView> HostDescriptor for T {
use cap_fs_ext::DirExt;
let table = self.table();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
@ -688,7 +680,7 @@ impl<T: WasiView> HostDescriptor for T {
use cap_fs_ext::DirExt;
let table = self.table();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
if !d.perms.contains(DirPerms::MUTATE) {
return Err(ErrorCode::NotPermitted.into());
}
@ -759,14 +751,9 @@ impl<T: WasiView> HostDescriptor for T {
&mut self,
fd: Resource<types::Descriptor>,
offset: types::Filesize,
) -> Result<Resource<streams::InputStream>, types::Error> {
use crate::preview2::{
filesystem::FileInputStream,
stream::{InternalInputStream, InternalTableStreamExt},
};
) -> Result<Resource<InputStream>, types::Error> {
// Trap if fd lookup fails:
let f = self.table().get_file(&fd)?;
let f = self.table().get_resource(&fd)?.file()?;
if !f.perms.contains(FilePerms::READ) {
Err(types::ErrorCode::BadDescriptor)?;
@ -778,9 +765,7 @@ impl<T: WasiView> HostDescriptor for T {
let reader = FileInputStream::new(clone, offset);
// Insert the stream view into the table. Trap if the table is full.
let index = self
.table_mut()
.push_internal_input_stream(InternalInputStream::File(reader))?;
let index = self.table_mut().push_resource(InputStream::File(reader))?;
Ok(index)
}
@ -789,11 +774,9 @@ impl<T: WasiView> HostDescriptor for T {
&mut self,
fd: Resource<types::Descriptor>,
offset: types::Filesize,
) -> Result<Resource<streams::OutputStream>, types::Error> {
use crate::preview2::{filesystem::FileOutputStream, TableStreamExt};
) -> Result<Resource<OutputStream>, types::Error> {
// Trap if fd lookup fails:
let f = self.table().get_file(&fd)?;
let f = self.table().get_resource(&fd)?.file()?;
if !f.perms.contains(FilePerms::WRITE) {
Err(types::ErrorCode::BadDescriptor)?;
@ -804,9 +787,10 @@ impl<T: WasiView> HostDescriptor for T {
// Create a stream view for it.
let writer = FileOutputStream::write_at(clone, offset);
let writer: OutputStream = Box::new(writer);
// Insert the stream view into the table. Trap if the table is full.
let index = self.table_mut().push_output_stream(Box::new(writer))?;
let index = self.table_mut().push_resource(writer)?;
Ok(index)
}
@ -814,11 +798,9 @@ impl<T: WasiView> HostDescriptor for T {
fn append_via_stream(
&mut self,
fd: Resource<types::Descriptor>,
) -> Result<Resource<streams::OutputStream>, types::Error> {
use crate::preview2::{filesystem::FileOutputStream, TableStreamExt};
) -> Result<Resource<OutputStream>, types::Error> {
// Trap if fd lookup fails:
let f = self.table().get_file(&fd)?;
let f = self.table().get_resource(&fd)?.file()?;
if !f.perms.contains(FilePerms::WRITE) {
Err(types::ErrorCode::BadDescriptor)?;
@ -828,9 +810,10 @@ impl<T: WasiView> HostDescriptor for T {
// Create a stream view for it.
let appender = FileOutputStream::append(clone);
let appender: OutputStream = Box::new(appender);
// Insert the stream view into the table. Trap if the table is full.
let index = self.table_mut().push_output_stream(Box::new(appender))?;
let index = self.table_mut().push_resource(appender)?;
Ok(index)
}
@ -876,7 +859,7 @@ impl<T: WasiView> HostDescriptor for T {
path: String,
) -> Result<types::MetadataHashValue, types::Error> {
let table = self.table();
let d = table.get_dir(&fd)?;
let d = table.get_resource(&fd)?.dir()?;
// No permissions check on metadata: if dir opened, allowed to stat it
let meta = d
.spawn_blocking(move |d| {
@ -898,12 +881,12 @@ impl<T: WasiView> HostDirectoryEntryStream for T {
stream: Resource<types::DirectoryEntryStream>,
) -> Result<Option<types::DirectoryEntry>, types::Error> {
let table = self.table();
let readdir = table.get_readdir(&stream)?;
let readdir = table.get_resource(&stream)?;
readdir.next()
}
fn drop(&mut self, stream: Resource<types::DirectoryEntryStream>) -> anyhow::Result<()> {
self.table_mut().delete_readdir(stream)?;
self.table_mut().delete_resource(stream)?;
Ok(())
}
}
@ -912,16 +895,15 @@ async fn get_descriptor_metadata(
table: &Table,
fd: Resource<types::Descriptor>,
) -> Result<cap_std::fs::Metadata, types::Error> {
if table.is_file(&fd) {
let f = table.get_file(&fd)?;
// No permissions check on metadata: if opened, allowed to stat it
Ok(f.spawn_blocking(|f| f.metadata()).await?)
} else if table.is_dir(&fd) {
let d = table.get_dir(&fd)?;
// No permissions check on metadata: if opened, allowed to stat it
Ok(d.spawn_blocking(|d| d.dir_metadata()).await?)
} else {
Err(ErrorCode::BadDescriptor.into())
match table.get_resource(&fd)? {
Descriptor::File(f) => {
// No permissions check on metadata: if opened, allowed to stat it
Ok(f.spawn_blocking(|f| f.metadata()).await?)
}
Descriptor::Dir(d) => {
// No permissions check on metadata: if opened, allowed to stat it
Ok(d.spawn_blocking(|d| d.dir_metadata()).await?)
}
}
}
@ -1105,69 +1087,6 @@ fn symlink_follow(path_flags: types::PathFlags) -> bool {
path_flags.contains(types::PathFlags::SYMLINK_FOLLOW)
}
pub(crate) struct ReaddirIterator(
std::sync::Mutex<
Box<dyn Iterator<Item = Result<types::DirectoryEntry, types::Error>> + Send + 'static>,
>,
);
impl ReaddirIterator {
fn new(
i: impl Iterator<Item = Result<types::DirectoryEntry, types::Error>> + Send + 'static,
) -> Self {
ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
}
fn next(&self) -> Result<Option<types::DirectoryEntry>, types::Error> {
self.0.lock().unwrap().next().transpose()
}
}
impl IntoIterator for ReaddirIterator {
type Item = Result<types::DirectoryEntry, types::Error>;
type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_inner().unwrap()
}
}
pub(crate) trait TableReaddirExt {
fn push_readdir(
&mut self,
readdir: ReaddirIterator,
) -> Result<Resource<DirectoryEntryStream>, TableError>;
fn delete_readdir(
&mut self,
fd: Resource<DirectoryEntryStream>,
) -> Result<ReaddirIterator, TableError>;
fn get_readdir(
&self,
fd: &Resource<DirectoryEntryStream>,
) -> Result<&ReaddirIterator, TableError>;
}
impl TableReaddirExt for Table {
fn push_readdir(
&mut self,
readdir: ReaddirIterator,
) -> Result<Resource<DirectoryEntryStream>, TableError> {
Ok(Resource::new_own(self.push(Box::new(readdir))?))
}
fn delete_readdir(
&mut self,
fd: Resource<DirectoryEntryStream>,
) -> Result<ReaddirIterator, TableError> {
self.delete(fd.rep())
}
fn get_readdir(
&self,
fd: &Resource<DirectoryEntryStream>,
) -> Result<&ReaddirIterator, TableError> {
self.get(fd.rep())
}
}
fn mask_file_perms(p: FilePerms, flags: types::DescriptorFlags) -> FilePerms {
use types::DescriptorFlags;
let mut out = FilePerms::empty();
@ -1187,9 +1106,9 @@ mod test {
fn table_readdir_works() {
let mut table = Table::new();
let ix = table
.push_readdir(ReaddirIterator::new(std::iter::empty()))
.push_resource(ReaddirIterator::new(std::iter::empty()))
.unwrap();
let _ = table.get_readdir(&ix).unwrap();
table.delete_readdir(ix).unwrap();
let _ = table.get_resource(&ix).unwrap();
table.delete_resource(ix).unwrap();
}
}

8
crates/wasi/src/preview2/host/instance_network.rs

@ -1,12 +1,12 @@
use crate::preview2::bindings::sockets::instance_network::{self, Network};
use crate::preview2::network::{HostNetworkState, TableNetworkExt};
use crate::preview2::bindings::sockets::instance_network;
use crate::preview2::network::Network;
use crate::preview2::WasiView;
use wasmtime::component::Resource;
impl<T: WasiView> instance_network::Host for T {
fn instance_network(&mut self) -> Result<Resource<Network>, anyhow::Error> {
let network = HostNetworkState::new(self.ctx().pool.clone());
let network = self.table_mut().push_network(network)?;
let network = Network::new(self.ctx().pool.clone());
let network = self.table_mut().push_resource(network)?;
Ok(network)
}
}

177
crates/wasi/src/preview2/host/io.rs

@ -1,13 +1,8 @@
use crate::preview2::{
bindings::io::poll::Pollable,
bindings::io::streams::{self, InputStream, OutputStream},
filesystem::FileInputStream,
poll::PollableFuture,
stream::{
HostInputStream, HostOutputStream, InternalInputStream, InternalTableStreamExt,
OutputStreamError, StreamRuntimeError, StreamState, TableStreamExt,
},
HostPollable, TableError, TablePollableExt, WasiView,
stream::{OutputStreamError, StreamRuntimeError, StreamState},
Pollable, TableError, WasiView,
};
use std::any::Any;
use std::future::Future;
@ -48,12 +43,12 @@ impl<T: WasiView> streams::Host for T {}
#[async_trait::async_trait]
impl<T: WasiView> streams::HostOutputStream for T {
fn drop(&mut self, stream: Resource<OutputStream>) -> anyhow::Result<()> {
self.table_mut().delete_output_stream(stream)?;
self.table_mut().delete_resource(stream)?;
Ok(())
}
fn check_write(&mut self, stream: Resource<OutputStream>) -> Result<u64, streams::Error> {
let s = self.table_mut().get_output_stream_mut(&stream)?;
let s = self.table_mut().get_resource_mut(&stream)?;
let mut ready = s.write_ready();
let mut task = Context::from_waker(futures::task::noop_waker_ref());
match Pin::new(&mut ready).poll(&mut task) {
@ -68,31 +63,30 @@ impl<T: WasiView> streams::HostOutputStream for T {
stream: Resource<OutputStream>,
bytes: Vec<u8>,
) -> Result<(), streams::Error> {
let s = self.table_mut().get_output_stream_mut(&stream)?;
HostOutputStream::write(s, bytes.into())?;
self.table_mut()
.get_resource_mut(&stream)?
.write(bytes.into())?;
Ok(())
}
fn subscribe(&mut self, stream: Resource<OutputStream>) -> anyhow::Result<Resource<Pollable>> {
// Ensure that table element is an output-stream:
let _ = self.table_mut().get_output_stream_mut(&stream)?;
fn output_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> {
let stream = stream
.downcast_mut::<Box<dyn HostOutputStream>>()
.expect("downcast to HostOutputStream failed");
.downcast_mut::<OutputStream>()
.expect("downcast to OutputStream failed");
Box::pin(async move {
let _ = stream.write_ready().await?;
Ok(())
})
}
Ok(self
.table_mut()
.push_host_pollable(HostPollable::TableEntry {
Ok(self.table_mut().push_child_resource(
Pollable::TableEntry {
index: stream.rep(),
make_future: output_stream_ready,
})?)
},
&stream,
)?)
}
async fn blocking_write_and_flush(
@ -100,7 +94,7 @@ impl<T: WasiView> streams::HostOutputStream for T {
stream: Resource<OutputStream>,
bytes: Vec<u8>,
) -> Result<(), streams::Error> {
let s = self.table_mut().get_output_stream_mut(&stream)?;
let s = self.table_mut().get_resource_mut(&stream)?;
if bytes.len() > 4096 {
return Err(streams::Error::trap(anyhow::anyhow!(
@ -113,11 +107,11 @@ impl<T: WasiView> streams::HostOutputStream for T {
let permit = s.write_ready().await?;
let len = bytes.len().min(permit);
let chunk = bytes.split_to(len);
HostOutputStream::write(s, chunk)?;
s.write(chunk)?;
}
HostOutputStream::flush(s)?;
let _ = s.write_ready().await?;
s.flush()?;
s.write_ready().await?;
Ok(())
}
@ -127,7 +121,7 @@ impl<T: WasiView> streams::HostOutputStream for T {
stream: Resource<OutputStream>,
len: u64,
) -> Result<(), streams::Error> {
let s = self.table_mut().get_output_stream_mut(&stream)?;
let s = self.table_mut().get_resource_mut(&stream)?;
if len > 4096 {
return Err(streams::Error::trap(anyhow::anyhow!(
@ -139,12 +133,12 @@ impl<T: WasiView> streams::HostOutputStream for T {
while len > 0 {
let permit = s.write_ready().await?;
let this_len = len.min(permit as u64);
HostOutputStream::write_zeroes(s, this_len as usize)?;
s.write_zeroes(this_len as usize)?;
len -= this_len;
}
HostOutputStream::flush(s)?;
let _ = s.write_ready().await?;
s.flush()?;
s.write_ready().await?;
Ok(())
}
@ -154,14 +148,14 @@ impl<T: WasiView> streams::HostOutputStream for T {
stream: Resource<OutputStream>,
len: u64,
) -> Result<(), streams::Error> {
let s = self.table_mut().get_output_stream_mut(&stream)?;
HostOutputStream::write_zeroes(s, len as usize)?;
self.table_mut()
.get_resource_mut(&stream)?
.write_zeroes(len as usize)?;
Ok(())
}
fn flush(&mut self, stream: Resource<OutputStream>) -> Result<(), streams::Error> {
let s = self.table_mut().get_output_stream_mut(&stream)?;
HostOutputStream::flush(s)?;
self.table_mut().get_resource_mut(&stream)?.flush()?;
Ok(())
}
@ -169,9 +163,9 @@ impl<T: WasiView> streams::HostOutputStream for T {
&mut self,
stream: Resource<OutputStream>,
) -> Result<(), streams::Error> {
let s = self.table_mut().get_output_stream_mut(&stream)?;
HostOutputStream::flush(s)?;
let _ = s.write_ready().await?;
let s = self.table_mut().get_resource_mut(&stream)?;
s.flush()?;
s.write_ready().await?;
Ok(())
}
@ -194,7 +188,7 @@ impl<T: WasiView> streams::HostOutputStream for T {
?;
let d: &mut Box<dyn crate::OutputStream> = ctx
.table_mut()
.get_output_stream_mut(dst)
.get_resource_mut(dst)
?;
let bytes_spliced: u64 = s.splice(&mut **d, len).await?;
@ -233,7 +227,7 @@ impl<T: WasiView> streams::HostOutputStream for T {
?;
let d: &mut Box<dyn crate::OutputStream> = ctx
.table_mut()
.get_output_stream_mut(dst)
.get_resource_mut(dst)
?;
let bytes_spliced: u64 = s.splice(&mut **d, len).await?;
@ -248,7 +242,7 @@ impl<T: WasiView> streams::HostOutputStream for T {
#[async_trait::async_trait]
impl<T: WasiView> streams::HostInputStream for T {
fn drop(&mut self, stream: Resource<InputStream>) -> anyhow::Result<()> {
self.table_mut().delete_internal_input_stream(stream)?;
self.table_mut().delete_resource(stream)?;
Ok(())
}
@ -257,9 +251,9 @@ impl<T: WasiView> streams::HostInputStream for T {
stream: Resource<InputStream>,
len: u64,
) -> anyhow::Result<Result<(Vec<u8>, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_input_stream_mut(&stream)? {
InternalInputStream::Host(s) => {
let (bytes, state) = match HostInputStream::read(s.as_mut(), len as usize) {
match self.table_mut().get_resource_mut(&stream)? {
InputStream::Host(s) => {
let (bytes, state) = match s.read(len as usize) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
@ -274,8 +268,8 @@ impl<T: WasiView> streams::HostInputStream for T {
Ok(Ok((bytes.into(), state.into())))
}
InternalInputStream::File(s) => {
let (bytes, state) = match FileInputStream::read(s, len as usize).await {
InputStream::File(s) => {
let (bytes, state) = match s.read(len as usize).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
@ -296,38 +290,10 @@ impl<T: WasiView> streams::HostInputStream for T {
stream: Resource<InputStream>,
len: u64,
) -> anyhow::Result<Result<(Vec<u8>, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_input_stream_mut(&stream)? {
InternalInputStream::Host(s) => {
s.ready().await?;
let (bytes, state) = match HostInputStream::read(s.as_mut(), len as usize) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
debug_assert!(bytes.len() <= len as usize);
Ok(Ok((bytes.into(), state.into())))
}
InternalInputStream::File(s) => {
let (bytes, state) = match FileInputStream::read(s, len as usize).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok(Ok((bytes.into(), state.into())))
}
if let InputStream::Host(s) = self.table_mut().get_resource_mut(&stream)? {
s.ready().await?;
}
self.read(stream, len).await
}
async fn skip(
@ -335,10 +301,10 @@ impl<T: WasiView> streams::HostInputStream for T {
stream: Resource<InputStream>,
len: u64,
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_input_stream_mut(&stream)? {
InternalInputStream::Host(s) => {
match self.table_mut().get_resource_mut(&stream)? {
InputStream::Host(s) => {
// TODO: the cast to usize should be fallible, use `.try_into()?`
let (bytes_skipped, state) = match HostInputStream::skip(s.as_mut(), len as usize) {
let (bytes_skipped, state) = match s.skip(len as usize) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
@ -352,8 +318,8 @@ impl<T: WasiView> streams::HostInputStream for T {
Ok(Ok((bytes_skipped as u64, state.into())))
}
InternalInputStream::File(s) => {
let (bytes_skipped, state) = match FileInputStream::skip(s, len as usize).await {
InputStream::File(s) => {
let (bytes_skipped, state) = match s.skip(len as usize).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
@ -374,67 +340,38 @@ impl<T: WasiView> streams::HostInputStream for T {
stream: Resource<InputStream>,
len: u64,
) -> anyhow::Result<Result<(u64, streams::StreamStatus), ()>> {
match self.table_mut().get_internal_input_stream_mut(&stream)? {
InternalInputStream::Host(s) => {
s.ready().await?;
// TODO: the cast to usize should be fallible, use `.try_into()?`
let (bytes_skipped, state) = match HostInputStream::skip(s.as_mut(), len as usize) {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok(Ok((bytes_skipped as u64, state.into())))
}
InternalInputStream::File(s) => {
let (bytes_skipped, state) = match FileInputStream::skip(s, len as usize).await {
Ok(a) => a,
Err(e) => {
if let Some(e) = e.downcast_ref::<StreamRuntimeError>() {
tracing::debug!("stream runtime error: {e:?}");
return Ok(Err(()));
} else {
return Err(e);
}
}
};
Ok(Ok((bytes_skipped as u64, state.into())))
}
if let InputStream::Host(s) = self.table_mut().get_resource_mut(&stream)? {
s.ready().await?;
}
self.skip(stream, len).await
}
fn subscribe(&mut self, stream: Resource<InputStream>) -> anyhow::Result<Resource<Pollable>> {
// Ensure that table element is an input-stream:
let pollable = match self.table_mut().get_internal_input_stream_mut(&stream)? {
InternalInputStream::Host(_) => {
let pollable = match self.table_mut().get_resource(&stream)? {
InputStream::Host(_) => {
fn input_stream_ready<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> {
let stream = stream
.downcast_mut::<InternalInputStream>()
.expect("downcast to InternalInputStream failed");
.downcast_mut::<InputStream>()
.expect("downcast to InputStream failed");
match *stream {
InternalInputStream::Host(ref mut hs) => hs.ready(),
InputStream::Host(ref mut hs) => hs.ready(),
_ => unreachable!(),
}
}
HostPollable::TableEntry {
Pollable::TableEntry {
index: stream.rep(),
make_future: input_stream_ready,
}
}
// Files are always "ready" immediately (because we have no way to actually wait on
// readiness in epoll)
InternalInputStream::File(_) => {
HostPollable::Closure(Box::new(|| Box::pin(futures::future::ready(Ok(())))))
InputStream::File(_) => {
Pollable::Closure(Box::new(|| Box::pin(futures::future::ready(Ok(())))))
}
};
Ok(self.table_mut().push_host_pollable(pollable)?)
Ok(self.table_mut().push_child_resource(pollable, &stream)?)
}
}

3
crates/wasi/src/preview2/host/network.rs

@ -2,7 +2,6 @@ use crate::preview2::bindings::sockets::network::{
self, ErrorCode, IpAddressFamily, IpSocketAddress, Ipv4Address, Ipv4SocketAddress, Ipv6Address,
Ipv6SocketAddress,
};
use crate::preview2::network::TableNetworkExt;
use crate::preview2::{TableError, WasiView};
use std::io;
use wasmtime::component::Resource;
@ -13,7 +12,7 @@ impl<T: WasiView> crate::preview2::bindings::sockets::network::HostNetwork for T
fn drop(&mut self, this: Resource<network::Network>) -> Result<(), anyhow::Error> {
let table = self.table_mut();
table.delete_network(this)?;
table.delete_resource(this)?;
Ok(())
}

159
crates/wasi/src/preview2/host/tcp.rs

@ -1,14 +1,10 @@
use crate::preview2::bindings::{
io::poll::Pollable,
io::streams::{InputStream, OutputStream},
sockets::network::{self, ErrorCode, IpAddressFamily, IpSocketAddress, Network},
sockets::tcp::{self, ShutdownType},
};
use crate::preview2::network::TableNetworkExt;
use crate::preview2::poll::TablePollableExt;
use crate::preview2::stream::TableStreamExt;
use crate::preview2::tcp::{HostTcpSocketState, HostTcpState, TableTcpSocketExt};
use crate::preview2::{HostPollable, PollableFuture, WasiView};
use crate::preview2::tcp::{TcpSocket, TcpState};
use crate::preview2::{Pollable, PollableFuture, WasiView};
use cap_net_ext::{Blocking, PoolExt, TcpListenerExt};
use cap_std::net::TcpListener;
use io_lifetimes::AsSocketlike;
@ -28,14 +24,14 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
local_address: IpSocketAddress,
) -> Result<(), network::Error> {
let table = self.table_mut();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
match socket.tcp_state {
HostTcpState::Default => {}
TcpState::Default => {}
_ => return Err(ErrorCode::NotInProgress.into()),
}
let network = table.get_network(&network)?;
let network = table.get_resource(&network)?;
let binder = network.0.tcp_binder(local_address)?;
// Perform the OS bind call.
@ -43,22 +39,22 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
&*socket.tcp_socket().as_socketlike_view::<TcpListener>(),
)?;
let socket = table.get_tcp_socket_mut(&this)?;
socket.tcp_state = HostTcpState::BindStarted;
let socket = table.get_resource_mut(&this)?;
socket.tcp_state = TcpState::BindStarted;
Ok(())
}
fn finish_bind(&mut self, this: Resource<tcp::TcpSocket>) -> Result<(), network::Error> {
let table = self.table_mut();
let socket = table.get_tcp_socket_mut(&this)?;
let socket = table.get_resource_mut(&this)?;
match socket.tcp_state {
HostTcpState::BindStarted => {}
TcpState::BindStarted => {}
_ => return Err(ErrorCode::NotInProgress.into()),
}
socket.tcp_state = HostTcpState::Bound;
socket.tcp_state = TcpState::Bound;
Ok(())
}
@ -71,15 +67,15 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
) -> Result<(), network::Error> {
let table = self.table_mut();
let r = {
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
match socket.tcp_state {
HostTcpState::Default => {}
HostTcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()),
TcpState::Default => {}
TcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()),
_ => return Err(ErrorCode::NotInProgress.into()),
}
let network = table.get_network(&network)?;
let network = table.get_resource(&network)?;
let connecter = network.0.tcp_connecter(remote_address)?;
// Do an OS `connect`. Our socket is non-blocking, so it'll either...
@ -93,8 +89,8 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
match r {
// succeed immediately,
Ok(()) => {
let socket = table.get_tcp_socket_mut(&this)?;
socket.tcp_state = HostTcpState::ConnectReady;
let socket = table.get_resource_mut(&this)?;
socket.tcp_state = TcpState::ConnectReady;
return Ok(());
}
// continue in progress,
@ -103,8 +99,8 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
Err(err) => return Err(err.into()),
}
let socket = table.get_tcp_socket_mut(&this)?;
socket.tcp_state = HostTcpState::Connecting;
let socket = table.get_resource_mut(&this)?;
socket.tcp_state = TcpState::Connecting;
Ok(())
}
@ -114,11 +110,11 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
this: Resource<tcp::TcpSocket>,
) -> Result<(Resource<InputStream>, Resource<OutputStream>), network::Error> {
let table = self.table_mut();
let socket = table.get_tcp_socket_mut(&this)?;
let socket = table.get_resource_mut(&this)?;
match socket.tcp_state {
HostTcpState::ConnectReady => {}
HostTcpState::Connecting => {
TcpState::ConnectReady => {}
TcpState::Connecting => {
// Do a `poll` to test for completion, using a timeout of zero
// to avoid blocking.
match rustix::event::poll(
@ -142,26 +138,22 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
_ => return Err(ErrorCode::NotInProgress.into()),
};
socket.tcp_state = HostTcpState::Connected;
socket.tcp_state = TcpState::Connected;
let (input, output) = socket.as_split();
let input_stream = self
.table_mut()
.push_input_stream_child(input, Resource::<tcp::TcpSocket>::new_borrow(this.rep()))?;
let output_stream = self
.table_mut()
.push_output_stream_child(output, Resource::<tcp::TcpSocket>::new_borrow(this.rep()))?;
let input_stream = self.table_mut().push_child_resource(input, &this)?;
let output_stream = self.table_mut().push_child_resource(output, &this)?;
Ok((input_stream, output_stream))
}
fn start_listen(&mut self, this: Resource<tcp::TcpSocket>) -> Result<(), network::Error> {
let table = self.table_mut();
let socket = table.get_tcp_socket_mut(&this)?;
let socket = table.get_resource_mut(&this)?;
match socket.tcp_state {
HostTcpState::Bound => {}
HostTcpState::ListenStarted => return Err(ErrorCode::AlreadyListening.into()),
HostTcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()),
TcpState::Bound => {}
TcpState::ListenStarted => return Err(ErrorCode::AlreadyListening.into()),
TcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()),
_ => return Err(ErrorCode::NotInProgress.into()),
}
@ -170,21 +162,21 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
.as_socketlike_view::<TcpListener>()
.listen(None)?;
socket.tcp_state = HostTcpState::ListenStarted;
socket.tcp_state = TcpState::ListenStarted;
Ok(())
}
fn finish_listen(&mut self, this: Resource<tcp::TcpSocket>) -> Result<(), network::Error> {
let table = self.table_mut();
let socket = table.get_tcp_socket_mut(&this)?;
let socket = table.get_resource_mut(&this)?;
match socket.tcp_state {
HostTcpState::ListenStarted => {}
TcpState::ListenStarted => {}
_ => return Err(ErrorCode::NotInProgress.into()),
}
socket.tcp_state = HostTcpState::Listening;
socket.tcp_state = TcpState::Listening;
Ok(())
}
@ -201,11 +193,11 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
network::Error,
> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
match socket.tcp_state {
HostTcpState::Listening => {}
HostTcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()),
TcpState::Listening => {}
TcpState::Connected => return Err(ErrorCode::AlreadyConnected.into()),
_ => return Err(ErrorCode::NotInProgress.into()),
}
@ -216,22 +208,17 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
.as_socketlike_view::<TcpListener>()
.accept_with(Blocking::No)
})?;
let mut tcp_socket = HostTcpSocketState::from_tcp_stream(connection)?;
let mut tcp_socket = TcpSocket::from_tcp_stream(connection)?;
// Mark the socket as connected so that we can exit early from methods like `start-bind`.
tcp_socket.tcp_state = HostTcpState::Connected;
tcp_socket.tcp_state = TcpState::Connected;
let (input, output) = tcp_socket.as_split();
let output: OutputStream = output;
let tcp_socket = self.table_mut().push_tcp_socket(tcp_socket)?;
let input_stream = self.table_mut().push_input_stream_child(
input,
Resource::<tcp::TcpSocket>::new_borrow(tcp_socket.rep()),
)?;
let output_stream = self.table_mut().push_output_stream_child(
output,
Resource::<tcp::TcpSocket>::new_borrow(tcp_socket.rep()),
)?;
let tcp_socket = self.table_mut().push_resource(tcp_socket)?;
let input_stream = self.table_mut().push_child_resource(input, &tcp_socket)?;
let output_stream = self.table_mut().push_child_resource(output, &tcp_socket)?;
Ok((tcp_socket, input_stream, output_stream))
}
@ -241,7 +228,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
this: Resource<tcp::TcpSocket>,
) -> Result<IpSocketAddress, network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
let addr = socket
.tcp_socket()
.as_socketlike_view::<std::net::TcpStream>()
@ -254,7 +241,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
this: Resource<tcp::TcpSocket>,
) -> Result<IpSocketAddress, network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
let addr = socket
.tcp_socket()
.as_socketlike_view::<std::net::TcpStream>()
@ -267,7 +254,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
this: Resource<tcp::TcpSocket>,
) -> Result<IpAddressFamily, anyhow::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
// If `SO_DOMAIN` is available, use it.
//
@ -312,7 +299,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
fn ipv6_only(&mut self, this: Resource<tcp::TcpSocket>) -> Result<bool, network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
Ok(sockopt::get_ipv6_v6only(socket.tcp_socket())?)
}
@ -322,7 +309,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
value: bool,
) -> Result<(), network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
Ok(sockopt::set_ipv6_v6only(socket.tcp_socket(), value)?)
}
@ -332,10 +319,10 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
value: u64,
) -> Result<(), network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
match socket.tcp_state {
HostTcpState::Listening => {}
TcpState::Listening => {}
_ => return Err(ErrorCode::NotInProgress.into()),
}
@ -345,7 +332,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
fn keep_alive(&mut self, this: Resource<tcp::TcpSocket>) -> Result<bool, network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
Ok(sockopt::get_socket_keepalive(socket.tcp_socket())?)
}
@ -355,13 +342,13 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
value: bool,
) -> Result<(), network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
Ok(sockopt::set_socket_keepalive(socket.tcp_socket(), value)?)
}
fn no_delay(&mut self, this: Resource<tcp::TcpSocket>) -> Result<bool, network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
Ok(sockopt::get_tcp_nodelay(socket.tcp_socket())?)
}
@ -371,13 +358,13 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
value: bool,
) -> Result<(), network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
Ok(sockopt::set_tcp_nodelay(socket.tcp_socket(), value)?)
}
fn unicast_hop_limit(&mut self, this: Resource<tcp::TcpSocket>) -> Result<u8, network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
// We don't track whether the socket is IPv4 or IPv6 so try one and
// fall back to the other.
@ -398,7 +385,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
value: u8,
) -> Result<(), network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
// We don't track whether the socket is IPv4 or IPv6 so try one and
// fall back to the other.
@ -414,7 +401,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
this: Resource<tcp::TcpSocket>,
) -> Result<u64, network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
Ok(sockopt::get_socket_recv_buffer_size(socket.tcp_socket())? as u64)
}
@ -424,7 +411,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
value: u64,
) -> Result<(), network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?;
Ok(sockopt::set_socket_recv_buffer_size(
socket.tcp_socket(),
@ -434,7 +421,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
fn send_buffer_size(&mut self, this: Resource<tcp::TcpSocket>) -> Result<u64, network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
Ok(sockopt::get_socket_send_buffer_size(socket.tcp_socket())? as u64)
}
@ -444,7 +431,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
value: u64,
) -> Result<(), network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?;
Ok(sockopt::set_socket_send_buffer_size(
socket.tcp_socket(),
@ -455,14 +442,14 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
fn subscribe(&mut self, this: Resource<tcp::TcpSocket>) -> anyhow::Result<Resource<Pollable>> {
fn make_tcp_socket_future<'a>(stream: &'a mut dyn Any) -> PollableFuture<'a> {
let socket = stream
.downcast_mut::<HostTcpSocketState>()
.expect("downcast to HostTcpSocketState failed");
.downcast_mut::<TcpSocket>()
.expect("downcast to TcpSocket failed");
// Some states are ready immediately.
match socket.tcp_state {
HostTcpState::BindStarted
| HostTcpState::ListenStarted
| HostTcpState::ConnectReady => return Box::pin(async { Ok(()) }),
TcpState::BindStarted | TcpState::ListenStarted | TcpState::ConnectReady => {
return Box::pin(async { Ok(()) })
}
_ => {}
}
@ -479,12 +466,12 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
join
}
let pollable = HostPollable::TableEntry {
let pollable = Pollable::TableEntry {
index: this.rep(),
make_future: make_tcp_socket_future,
};
Ok(self.table_mut().push_host_pollable(pollable)?)
Ok(self.table_mut().push_child_resource(pollable, &this)?)
}
fn shutdown(
@ -493,7 +480,7 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
shutdown_type: ShutdownType,
) -> Result<(), network::Error> {
let table = self.table();
let socket = table.get_tcp_socket(&this)?;
let socket = table.get_resource(&this)?;
let how = match shutdown_type {
ShutdownType::Receive => std::net::Shutdown::Read,
@ -513,19 +500,19 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
// As in the filesystem implementation, we assume closing a socket
// doesn't block.
let dropped = table.delete_tcp_socket(this)?;
let dropped = table.delete_resource(this)?;
// If we might have an `event::poll` waiting on the socket, wake it up.
#[cfg(not(unix))]
{
match dropped.tcp_state {
HostTcpState::Default
| HostTcpState::BindStarted
| HostTcpState::Bound
| HostTcpState::ListenStarted
| HostTcpState::ConnectReady => {}
TcpState::Default
| TcpState::BindStarted
| TcpState::Bound
| TcpState::ListenStarted
| TcpState::ConnectReady => {}
HostTcpState::Listening | HostTcpState::Connecting | HostTcpState::Connected => {
TcpState::Listening | TcpState::Connecting | TcpState::Connected => {
match rustix::net::shutdown(&dropped.inner, rustix::net::Shutdown::ReadWrite) {
Ok(()) | Err(Errno::NOTCONN) => {}
Err(err) => Err(err).unwrap(),

7
crates/wasi/src/preview2/host/tcp_create_socket.rs

@ -1,9 +1,8 @@
use crate::preview2::bindings::{
sockets::network::{self, IpAddressFamily},
sockets::tcp::TcpSocket,
sockets::tcp_create_socket,
};
use crate::preview2::tcp::{HostTcpSocketState, TableTcpSocketExt};
use crate::preview2::tcp::TcpSocket;
use crate::preview2::WasiView;
use wasmtime::component::Resource;
@ -12,8 +11,8 @@ impl<T: WasiView> tcp_create_socket::Host for T {
&mut self,
address_family: IpAddressFamily,
) -> Result<Resource<TcpSocket>, network::Error> {
let socket = HostTcpSocketState::new(address_family.into())?;
let socket = self.table_mut().push_tcp_socket(socket)?;
let socket = TcpSocket::new(address_family.into())?;
let socket = self.table_mut().push_resource(socket)?;
Ok(socket)
}
}

19
crates/wasi/src/preview2/mod.rs

@ -37,14 +37,14 @@ pub use self::clocks::{HostMonotonicClock, HostWallClock};
pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiView};
pub use self::error::I32Exit;
pub use self::filesystem::{DirPerms, FilePerms};
pub use self::poll::{ClosureFuture, HostPollable, MakeFuture, PollableFuture, TablePollableExt};
pub use self::poll::{ClosureFuture, MakeFuture, Pollable, PollableFuture};
pub use self::random::{thread_rng, Deterministic};
pub use self::stdio::{stderr, stdin, stdout, IsATTY, Stderr, Stdin, Stdout};
pub use self::stream::{
HostInputStream, HostOutputStream, OutputStreamError, StreamRuntimeError, StreamState,
TableStreamExt,
HostInputStream, HostOutputStream, InputStream, OutputStream, OutputStreamError,
StreamRuntimeError, StreamState,
};
pub use self::table::{OccupiedEntry, Table, TableError};
pub use self::table::{Table, TableError};
pub use cap_fs_ext::SystemTimeSpec;
pub use cap_rand::RngCore;
@ -146,6 +146,17 @@ pub mod bindings {
"wasi:filesystem/types"::"error-code": Error,
"wasi:sockets/network"::"error-code": Error,
},
with: {
"wasi:sockets/network/network": super::network::Network,
"wasi:sockets/tcp/tcp-socket": super::tcp::TcpSocket,
"wasi:filesystem/types/directory-entry-stream": super::filesystem::ReaddirIterator,
"wasi:filesystem/types/descriptor": super::filesystem::Descriptor,
"wasi:io/streams/input-stream": super::stream::InputStream,
"wasi:io/streams/output-stream": super::stream::OutputStream,
"wasi:io/poll/pollable": super::poll::Pollable,
"wasi:cli/terminal-input/terminal-input": super::stdio::TerminalInput,
"wasi:cli/terminal-output/terminal-output": super::stdio::TerminalOutput,
},
});
pub use wasi::*;

29
crates/wasi/src/preview2/network.rs

@ -1,34 +1,9 @@
use crate::preview2::bindings::sockets::network::Network;
use crate::preview2::{Table, TableError};
use cap_std::net::Pool;
use wasmtime::component::Resource;
pub(crate) struct HostNetworkState(pub(crate) Pool);
pub struct Network(pub(crate) Pool);
impl HostNetworkState {
impl Network {
pub fn new(pool: Pool) -> Self {
Self(pool)
}
}
pub(crate) trait TableNetworkExt {
fn push_network(&mut self, network: HostNetworkState) -> Result<Resource<Network>, TableError>;
fn delete_network(&mut self, fd: Resource<Network>) -> Result<HostNetworkState, TableError>;
fn is_network(&self, fd: &Resource<Network>) -> bool;
fn get_network(&self, fd: &Resource<Network>) -> Result<&HostNetworkState, TableError>;
}
impl TableNetworkExt for Table {
fn push_network(&mut self, network: HostNetworkState) -> Result<Resource<Network>, TableError> {
Ok(Resource::new_own(self.push(Box::new(network))?))
}
fn delete_network(&mut self, fd: Resource<Network>) -> Result<HostNetworkState, TableError> {
self.delete(fd.rep())
}
fn is_network(&self, fd: &Resource<Network>) -> bool {
self.is::<HostNetworkState>(fd.rep())
}
fn get_network(&self, fd: &Resource<Network>) -> Result<&HostNetworkState, TableError> {
self.get(fd.rep())
}
}

74
crates/wasi/src/preview2/poll.rs

@ -1,7 +1,4 @@
use crate::preview2::{
bindings::io::poll::{self, Pollable},
Table, TableError, WasiView,
};
use crate::preview2::{bindings::io::poll, WasiView};
use anyhow::Result;
use std::any::Any;
use std::collections::{hash_map::Entry, HashMap};
@ -19,8 +16,8 @@ pub type ClosureFuture = Box<dyn Fn() -> PollableFuture<'static> + Send + Sync +
/// A pollable is not the same thing as a Rust Future: the same pollable may be used to
/// repeatedly check for readiness of a given condition, e.g. if a stream is readable
/// or writable. So, rather than containing a Future, which can only become Ready once, a
/// HostPollable contains a way to create a Future in each call to `poll_list`.
pub enum HostPollable {
/// Pollable contains a way to create a Future in each call to `poll_list`.
pub enum Pollable {
/// Create a Future by calling a fn on another resource in the table. This
/// indirection means the created Future can use a mut borrow of another
/// resource in the Table (e.g. a stream)
@ -31,33 +28,6 @@ pub enum HostPollable {
Closure(ClosureFuture),
}
pub trait TablePollableExt {
fn push_host_pollable(&mut self, p: HostPollable) -> Result<Resource<Pollable>, TableError>;
fn get_host_pollable_mut(
&mut self,
fd: &Resource<Pollable>,
) -> Result<&mut HostPollable, TableError>;
fn delete_host_pollable(&mut self, fd: Resource<Pollable>) -> Result<HostPollable, TableError>;
}
impl TablePollableExt for Table {
fn push_host_pollable(&mut self, p: HostPollable) -> Result<Resource<Pollable>, TableError> {
Ok(Resource::new_own(match p {
HostPollable::TableEntry { index, .. } => self.push_child(Box::new(p), index)?,
HostPollable::Closure { .. } => self.push(Box::new(p))?,
}))
}
fn get_host_pollable_mut(
&mut self,
fd: &Resource<Pollable>,
) -> Result<&mut HostPollable, TableError> {
self.get_mut::<HostPollable>(fd.rep())
}
fn delete_host_pollable(&mut self, fd: Resource<Pollable>) -> Result<HostPollable, TableError> {
self.delete::<HostPollable>(fd.rep())
}
}
#[async_trait::async_trait]
impl<T: WasiView> poll::Host for T {
async fn poll_list(&mut self, pollables: Vec<Resource<Pollable>>) -> Result<Vec<u32>> {
@ -70,19 +40,17 @@ impl<T: WasiView> poll::Host for T {
for (ix, p) in pollables.iter().enumerate() {
let ix: u32 = ix.try_into()?;
match table.get_host_pollable_mut(&p)? {
HostPollable::Closure(f) => closure_futures.push((f(), vec![ix])),
HostPollable::TableEntry { index, make_future } => {
match table_futures.entry(*index) {
Entry::Vacant(v) => {
v.insert((*make_future, vec![ix]));
}
Entry::Occupied(mut o) => {
let (_, v) = o.get_mut();
v.push(ix);
}
match table.get_resource_mut(&p)? {
Pollable::Closure(f) => closure_futures.push((f(), vec![ix])),
Pollable::TableEntry { index, make_future } => match table_futures.entry(*index) {
Entry::Vacant(v) => {
v.insert((*make_future, vec![ix]));
}
}
Entry::Occupied(mut o) => {
let (_, v) = o.get_mut();
v.push(ix);
}
},
}
}
@ -133,9 +101,9 @@ impl<T: WasiView> poll::Host for T {
let table = self.table_mut();
let closure_future = match table.get_host_pollable_mut(&pollable)? {
HostPollable::Closure(f) => f(),
HostPollable::TableEntry { index, make_future } => {
let closure_future = match table.get_resource_mut(&pollable)? {
Pollable::Closure(f) => f(),
Pollable::TableEntry { index, make_future } => {
let index = *index;
let make_future = *make_future;
make_future(table.get_as_any_mut(index)?)
@ -149,14 +117,14 @@ impl<T: WasiView> poll::Host for T {
#[async_trait::async_trait]
impl<T: WasiView> crate::preview2::bindings::io::poll::HostPollable for T {
fn drop(&mut self, pollable: Resource<Pollable>) -> Result<()> {
self.table_mut().delete_host_pollable(pollable)?;
self.table_mut().delete_resource(pollable)?;
Ok(())
}
}
pub mod sync {
use crate::preview2::{
bindings::io::poll::{Host as AsyncHost, HostPollable as AsyncHostPollable},
bindings::io::poll as async_poll,
bindings::sync_io::io::poll::{self, Pollable},
in_tokio, WasiView,
};
@ -165,17 +133,17 @@ pub mod sync {
impl<T: WasiView> poll::Host for T {
fn poll_list(&mut self, pollables: Vec<Resource<Pollable>>) -> Result<Vec<u32>> {
in_tokio(async { AsyncHost::poll_list(self, pollables).await })
in_tokio(async { async_poll::Host::poll_list(self, pollables).await })
}
fn poll_one(&mut self, pollable: Resource<Pollable>) -> Result<()> {
in_tokio(async { AsyncHost::poll_one(self, pollable).await })
in_tokio(async { async_poll::Host::poll_one(self, pollable).await })
}
}
impl<T: WasiView> crate::preview2::bindings::sync_io::io::poll::HostPollable for T {
fn drop(&mut self, pollable: Resource<Pollable>) -> Result<()> {
AsyncHostPollable::drop(self, pollable)
async_poll::HostPollable::drop(self, pollable)
}
}
}

280
crates/wasi/src/preview2/preview1.rs

@ -6,14 +6,11 @@ use crate::preview2::bindings::clocks::{monotonic_clock, wall_clock};
use crate::preview2::bindings::filesystem::{preopens, types as filesystem};
use crate::preview2::bindings::io::poll;
use crate::preview2::bindings::io::streams;
use crate::preview2::filesystem::TableFsExt;
use crate::preview2::host::filesystem::TableReaddirExt;
use crate::preview2::{bindings, IsATTY, TableError, WasiView};
use anyhow::{anyhow, bail, Context};
use std::borrow::Borrow;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::mem::{size_of, size_of_val};
use std::mem::{self, size_of, size_of_val};
use std::ops::{Deref, DerefMut};
use std::slice;
use std::sync::atomic::{AtomicU64, Ordering};
@ -22,10 +19,10 @@ use wasmtime::component::Resource;
use wiggle::tracing::instrument;
use wiggle::{GuestError, GuestPtr, GuestSlice, GuestSliceMut, GuestStrCow, GuestType};
#[derive(Clone, Debug)]
#[derive(Debug)]
struct File {
/// The handle to the preview2 descriptor that this file is referencing.
fd: u32,
fd: Resource<filesystem::Descriptor>,
/// The current-position pointer.
position: Arc<AtomicU64>,
@ -128,12 +125,12 @@ impl BlockingMode {
}
}
#[derive(Clone, Debug)]
#[derive(Debug)]
enum Descriptor {
Stdin { input_stream: u32, isatty: IsATTY },
Stdout { output_stream: u32, isatty: IsATTY },
Stderr { output_stream: u32, isatty: IsATTY },
PreopenDirectory((u32, String)),
PreopenDirectory((Resource<filesystem::Descriptor>, String)),
File(File),
}
@ -240,7 +237,7 @@ impl Descriptors {
.context("failed to call `get-directories`")
.map_err(types::Error::trap)?
{
descriptors.push(Descriptor::PreopenDirectory((dir.0.rep(), dir.1)))?;
descriptors.push(Descriptor::PreopenDirectory((dir.0, dir.1)))?;
}
Ok(descriptors)
}
@ -316,13 +313,13 @@ pub trait WasiPreview1View: WasiView {
// call methods like [`TableFsExt::is_file`] and hiding complexity from preview1 method implementations.
struct Transaction<'a, T: WasiPreview1View + ?Sized> {
view: &'a mut T,
descriptors: Cell<Descriptors>,
descriptors: Descriptors,
}
impl<T: WasiPreview1View + ?Sized> Drop for Transaction<'_, T> {
/// Record changes in the [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`]
fn drop(&mut self) {
let descriptors = self.descriptors.take();
let descriptors = mem::take(&mut self.descriptors);
self.view.adapter_mut().descriptors = Some(descriptors);
}
}
@ -333,24 +330,19 @@ impl<T: WasiPreview1View + ?Sized> Transaction<'_, T> {
/// # Errors
///
/// Returns [`types::Errno::Badf`] if no [`Descriptor`] is found
fn get_descriptor(&mut self, fd: types::Fd) -> Result<&Descriptor> {
fn get_descriptor(&self, fd: types::Fd) -> Result<&Descriptor> {
let fd = fd.into();
let desc = self
.descriptors
.get_mut()
.get(&fd)
.ok_or(types::Errno::Badf)?;
let desc = self.descriptors.get(&fd).ok_or(types::Errno::Badf)?;
Ok(desc)
}
/// Borrows [`File`] corresponding to `fd`
/// if it describes a [`Descriptor::File`] of [`crate::preview2::filesystem::File`] type
fn get_file(&mut self, fd: types::Fd) -> Result<&File> {
fn get_file(&self, fd: types::Fd) -> Result<&File> {
let fd = fd.into();
match self.descriptors.get_mut().get(&fd) {
Some(Descriptor::File(file @ File { fd, .. }))
if self.view.table().is_file(&Resource::new_borrow(*fd)) =>
{
match self.descriptors.get(&fd) {
Some(Descriptor::File(file @ File { fd, .. })) => {
self.view.table().get_resource(fd)?.file()?;
Ok(file)
}
_ => Err(types::Errno::Badf.into()),
@ -361,10 +353,9 @@ impl<T: WasiPreview1View + ?Sized> Transaction<'_, T> {
/// if it describes a [`Descriptor::File`] of [`crate::preview2::filesystem::File`] type
fn get_file_mut(&mut self, fd: types::Fd) -> Result<&mut File> {
let fd = fd.into();
match self.descriptors.get_mut().get_mut(&fd) {
Some(Descriptor::File(file))
if self.view.table().is_file(&Resource::new_borrow(file.fd)) =>
{
match self.descriptors.get_mut(&fd) {
Some(Descriptor::File(file)) => {
self.view.table().get_resource(&file.fd)?.file()?;
Ok(file)
}
_ => Err(types::Errno::Badf.into()),
@ -377,11 +368,11 @@ impl<T: WasiPreview1View + ?Sized> Transaction<'_, T> {
/// # Errors
///
/// Returns [`types::Errno::Spipe`] if the descriptor corresponds to stdio
fn get_seekable(&mut self, fd: types::Fd) -> Result<&File> {
fn get_seekable(&self, fd: types::Fd) -> Result<&File> {
let fd = fd.into();
match self.descriptors.get_mut().get(&fd) {
match self.descriptors.get(&fd) {
Some(Descriptor::File(file @ File { fd, .. }))
if self.view.table().is_file(&Resource::new_borrow(*fd)) =>
if self.view.table().get_resource(fd)?.is_file() =>
{
Ok(file)
}
@ -396,10 +387,10 @@ impl<T: WasiPreview1View + ?Sized> Transaction<'_, T> {
}
/// Returns [`filesystem::Descriptor`] corresponding to `fd`
fn get_fd(&mut self, fd: types::Fd) -> Result<Resource<filesystem::Descriptor>> {
fn get_fd(&self, fd: types::Fd) -> Result<Resource<filesystem::Descriptor>> {
match self.get_descriptor(fd)? {
Descriptor::File(File { fd, .. }) => Ok(Resource::new_borrow(*fd)),
Descriptor::PreopenDirectory((fd, _)) => Ok(Resource::new_borrow(*fd)),
Descriptor::File(File { fd, .. }) => Ok(fd.borrowed()),
Descriptor::PreopenDirectory((fd, _)) => Ok(fd.borrowed()),
Descriptor::Stdin { .. } | Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => {
Err(types::Errno::Badf.into())
}
@ -408,23 +399,22 @@ impl<T: WasiPreview1View + ?Sized> Transaction<'_, T> {
/// Returns [`filesystem::Descriptor`] corresponding to `fd`
/// if it describes a [`Descriptor::File`] of [`crate::preview2::filesystem::File`] type
fn get_file_fd(&mut self, fd: types::Fd) -> Result<Resource<filesystem::Descriptor>> {
self.get_file(fd)
.map(|File { fd, .. }| Resource::new_borrow(*fd))
fn get_file_fd(&self, fd: types::Fd) -> Result<Resource<filesystem::Descriptor>> {
self.get_file(fd).map(|File { fd, .. }| fd.borrowed())
}
/// Returns [`filesystem::Descriptor`] corresponding to `fd`
/// if it describes a [`Descriptor::File`] or [`Descriptor::PreopenDirectory`]
/// of [`crate::preview2::filesystem::Dir`] type
fn get_dir_fd(&mut self, fd: types::Fd) -> Result<Resource<filesystem::Descriptor>> {
fn get_dir_fd(&self, fd: types::Fd) -> Result<Resource<filesystem::Descriptor>> {
let fd = fd.into();
match self.descriptors.get_mut().get(&fd) {
match self.descriptors.get(&fd) {
Some(Descriptor::File(File { fd, .. }))
if self.view.table().is_dir(&Resource::new_borrow(*fd)) =>
if self.view.table().get_resource(fd)?.is_dir() =>
{
Ok(Resource::new_borrow(*fd))
Ok(fd.borrowed())
}
Some(Descriptor::PreopenDirectory((fd, _))) => Ok(Resource::new_borrow(*fd)),
Some(Descriptor::PreopenDirectory((fd, _))) => Ok(fd.borrowed()),
_ => Err(types::Errno::Badf.into()),
}
}
@ -460,7 +450,7 @@ trait WasiPreview1ViewExt:
/// Lazily initializes [`WasiPreview1Adapter`] returned by [`WasiPreview1View::adapter_mut`]
/// and returns [`filesystem::Descriptor`] corresponding to `fd`
fn get_fd(&mut self, fd: types::Fd) -> Result<Resource<filesystem::Descriptor>, types::Error> {
let mut st = self.transact()?;
let st = self.transact()?;
let fd = st.get_fd(fd)?;
Ok(fd)
}
@ -472,7 +462,7 @@ trait WasiPreview1ViewExt:
&mut self,
fd: types::Fd,
) -> Result<Resource<filesystem::Descriptor>, types::Error> {
let mut st = self.transact()?;
let st = self.transact()?;
let fd = st.get_file_fd(fd)?;
Ok(fd)
}
@ -485,7 +475,7 @@ trait WasiPreview1ViewExt:
&mut self,
fd: types::Fd,
) -> Result<Resource<filesystem::Descriptor>, types::Error> {
let mut st = self.transact()?;
let st = self.transact()?;
let fd = st.get_dir_fd(fd)?;
Ok(fd)
}
@ -1029,10 +1019,8 @@ impl<
let desc = self
.transact()?
.descriptors
.get_mut()
.remove(fd)
.ok_or(types::Errno::Badf)?
.clone();
.ok_or(types::Errno::Badf)?;
match desc {
Descriptor::Stdin { input_stream, .. } => {
streams::HostInputStream::drop(self, Resource::new_own(input_stream))
@ -1043,7 +1031,7 @@ impl<
.context("failed to call `drop-output-stream`")
}
Descriptor::File(File { fd, .. }) | Descriptor::PreopenDirectory((fd, _)) => {
filesystem::HostDescriptor::drop(self, Resource::new_own(fd))
filesystem::HostDescriptor::drop(self, fd)
.context("failed to call `drop-descriptor`")
}
}
@ -1131,18 +1119,15 @@ impl<
blocking_mode,
append,
..
}) => (*fd, *blocking_mode, *append),
}) => (fd.borrowed(), *blocking_mode, *append),
};
let flags = self
.get_flags(Resource::new_borrow(fd))
.await
.map_err(|e| {
e.try_into()
.context("failed to call `get-flags`")
.unwrap_or_else(types::Error::trap)
})?;
let flags = self.get_flags(fd.borrowed()).await.map_err(|e| {
e.try_into()
.context("failed to call `get-flags`")
.unwrap_or_else(types::Error::trap)
})?;
let fs_filetype = self
.get_type(Resource::new_borrow(fd))
.get_type(fd.borrowed())
.await
.map_err(|e| {
e.try_into()
@ -1224,14 +1209,15 @@ impl<
/// Return the attributes of an open file.
#[instrument(skip(self))]
async fn fd_filestat_get(&mut self, fd: types::Fd) -> Result<types::Filestat, types::Error> {
let desc = self.transact()?.get_descriptor(fd)?.clone();
let t = self.transact()?;
let desc = t.get_descriptor(fd)?;
match desc {
Descriptor::Stdin { isatty, .. }
| Descriptor::Stdout { isatty, .. }
| Descriptor::Stderr { isatty, .. } => Ok(types::Filestat {
dev: 0,
ino: 0,
filetype: isatty.into(),
filetype: (*isatty).into(),
nlink: 0,
size: 0,
atim: 0,
@ -1239,6 +1225,8 @@ impl<
ctim: 0,
}),
Descriptor::PreopenDirectory((fd, _)) | Descriptor::File(File { fd, .. }) => {
let fd = fd.borrowed();
drop(t);
let filesystem::DescriptorStat {
type_,
link_count: nlink,
@ -1246,19 +1234,16 @@ impl<
data_access_timestamp,
data_modification_timestamp,
status_change_timestamp,
} = self.stat(Resource::new_borrow(fd)).await.map_err(|e| {
} = self.stat(fd.borrowed()).await.map_err(|e| {
e.try_into()
.context("failed to call `stat`")
.unwrap_or_else(types::Error::trap)
})?;
let metadata_hash =
self.metadata_hash(Resource::new_borrow(fd))
.await
.map_err(|e| {
e.try_into()
.context("failed to call `metadata_hash`")
.unwrap_or_else(types::Error::trap)
})?;
let metadata_hash = self.metadata_hash(fd).await.map_err(|e| {
e.try_into()
.context("failed to call `metadata_hash`")
.unwrap_or_else(types::Error::trap)
})?;
let filetype = type_.try_into().map_err(types::Error::trap)?;
let zero = wall_clock::Datetime {
seconds: 0,
@ -1334,26 +1319,29 @@ impl<
fd: types::Fd,
iovs: &types::IovecArray<'a>,
) -> Result<types::Size, types::Error> {
let desc = self.transact()?.get_descriptor(fd)?.clone();
let t = self.transact()?;
let desc = t.get_descriptor(fd)?;
let (mut buf, read, state) = match desc {
Descriptor::File(File {
fd,
blocking_mode,
position,
..
}) if self.table().is_file(&Resource::new_borrow(fd)) => {
}) if t.view.table().get_resource(fd)?.is_file() => {
let fd = fd.borrowed();
let blocking_mode = *blocking_mode;
let position = position.clone();
drop(t);
let Some(buf) = first_non_empty_iovec(iovs)? else {
return Ok(0);
};
let pos = position.load(Ordering::Relaxed);
let stream = self
.read_via_stream(Resource::new_borrow(fd), pos)
.map_err(|e| {
e.try_into()
.context("failed to call `read-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
let stream = self.read_via_stream(fd.borrowed(), pos).map_err(|e| {
e.try_into()
.context("failed to call `read-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
let (read, state) = blocking_mode.read(self, stream, buf.len()).await?;
let n = read.len().try_into()?;
let pos = pos.checked_add(n).ok_or(types::Errno::Overflow)?;
@ -1362,13 +1350,15 @@ impl<
(buf, read, state)
}
Descriptor::Stdin { input_stream, .. } => {
let input = Resource::new_borrow(*input_stream);
drop(t);
let Some(buf) = first_non_empty_iovec(iovs)? else {
return Ok(0);
};
let (read, state) = stream_res(
streams::HostInputStream::blocking_read(
self,
Resource::new_borrow(input_stream),
input,
buf.len().try_into().unwrap_or(u64::MAX),
)
.await,
@ -1398,22 +1388,24 @@ impl<
iovs: &types::IovecArray<'a>,
offset: types::Filesize,
) -> Result<types::Size, types::Error> {
let desc = self.transact()?.get_descriptor(fd)?.clone();
let t = self.transact()?;
let desc = t.get_descriptor(fd)?;
let (mut buf, read, state) = match desc {
Descriptor::File(File {
fd, blocking_mode, ..
}) if self.table().is_file(&Resource::new_borrow(fd)) => {
}) if t.view.table().get_resource(fd)?.is_file() => {
let fd = fd.borrowed();
let blocking_mode = *blocking_mode;
drop(t);
let Some(buf) = first_non_empty_iovec(iovs)? else {
return Ok(0);
};
let stream = self
.read_via_stream(Resource::new_borrow(fd), offset)
.map_err(|e| {
e.try_into()
.context("failed to call `read-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
let stream = self.read_via_stream(fd, offset).map_err(|e| {
e.try_into()
.context("failed to call `read-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
let (read, state) = blocking_mode.read(self, stream, buf.len()).await?;
(buf, read, state)
}
@ -1443,30 +1435,32 @@ impl<
fd: types::Fd,
ciovs: &types::CiovecArray<'a>,
) -> Result<types::Size, types::Error> {
let desc = self.transact()?.get_descriptor(fd)?.clone();
match desc {
let t = self.transact()?;
let desc = t.get_descriptor(fd)?;
match *desc {
Descriptor::File(File {
fd,
ref fd,
blocking_mode,
append,
position,
}) if self.table().is_file(&Resource::new_borrow(fd)) => {
ref position,
}) if t.view.table().get_resource(fd)?.is_file() => {
let fd = fd.borrowed();
let position = position.clone();
drop(t);
let Some(buf) = first_non_empty_ciovec(ciovs)? else {
return Ok(0);
};
let (stream, pos) = if append {
let stream = self
.append_via_stream(Resource::new_borrow(fd))
.map_err(|e| {
e.try_into()
.context("failed to call `append-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
let stream = self.append_via_stream(fd.borrowed()).map_err(|e| {
e.try_into()
.context("failed to call `append-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
(stream, 0)
} else {
let position = position.load(Ordering::Relaxed);
let stream = self
.write_via_stream(Resource::new_borrow(fd), position)
.write_via_stream(fd.borrowed(), position)
.map_err(|e| {
e.try_into()
.context("failed to call `write-via-stream`")
@ -1482,6 +1476,7 @@ impl<
Ok(n.try_into()?)
}
Descriptor::Stdout { output_stream, .. } | Descriptor::Stderr { output_stream, .. } => {
drop(t);
let Some(buf) = first_non_empty_ciovec(ciovs)? else {
return Ok(0);
};
@ -1503,21 +1498,24 @@ impl<
ciovs: &types::CiovecArray<'a>,
offset: types::Filesize,
) -> Result<types::Size, types::Error> {
let desc = self.transact()?.get_descriptor(fd)?.clone();
let n = match desc {
let t = self.transact()?;
let desc = t.get_descriptor(fd)?;
let n = match *desc {
Descriptor::File(File {
fd, blocking_mode, ..
}) if self.table().is_file(&Resource::new_borrow(fd)) => {
ref fd,
blocking_mode,
..
}) if t.view.table().get_resource(fd)?.is_file() => {
let fd = fd.borrowed();
drop(t);
let Some(buf) = first_non_empty_ciovec(ciovs)? else {
return Ok(0);
};
let stream = self
.write_via_stream(Resource::new_borrow(fd), offset)
.map_err(|e| {
e.try_into()
.context("failed to call `write-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
let stream = self.write_via_stream(fd, offset).map_err(|e| {
e.try_into()
.context("failed to call `write-via-stream`")
.unwrap_or_else(types::Error::trap)
})?;
blocking_mode.write(self, stream, &buf).await?
}
Descriptor::Stdout { .. } | Descriptor::Stderr { .. } => {
@ -1562,9 +1560,8 @@ impl<
#[instrument(skip(self))]
fn fd_renumber(&mut self, from: types::Fd, to: types::Fd) -> Result<(), types::Error> {
let mut st = self.transact()?;
let descriptors = st.descriptors.get_mut();
let desc = descriptors.remove(from).ok_or(types::Errno::Badf)?;
descriptors.insert(to.into(), desc);
let desc = st.descriptors.remove(from).ok_or(types::Errno::Badf)?;
st.descriptors.insert(to.into(), desc);
Ok(())
}
@ -1577,11 +1574,11 @@ impl<
offset: types::Filedelta,
whence: types::Whence,
) -> Result<types::Filesize, types::Error> {
let (fd, position) = {
let mut st = self.transact()?;
let File { fd, position, .. } = st.get_seekable(fd)?;
(*fd, Arc::clone(&position))
};
let t = self.transact()?;
let File { fd, position, .. } = t.get_seekable(fd)?;
let fd = fd.borrowed();
let position = position.clone();
drop(t);
let pos = match whence {
types::Whence::Set if offset >= 0 => offset as _,
types::Whence::Cur => position
@ -1589,12 +1586,11 @@ impl<
.checked_add_signed(offset)
.ok_or(types::Errno::Inval)?,
types::Whence::End => {
let filesystem::DescriptorStat { size, .. } =
self.stat(Resource::new_borrow(fd)).await.map_err(|e| {
e.try_into()
.context("failed to call `stat`")
.unwrap_or_else(types::Error::trap)
})?;
let filesystem::DescriptorStat { size, .. } = self.stat(fd).await.map_err(|e| {
e.try_into()
.context("failed to call `stat`")
.unwrap_or_else(types::Error::trap)
})?;
size.checked_add_signed(offset).ok_or(types::Errno::Inval)?
}
_ => return Err(types::Errno::Inval.into()),
@ -1678,7 +1674,7 @@ impl<
for (entry, d_next) in self
.table_mut()
// remove iterator from table and use it directly:
.delete_readdir(stream)?
.delete_resource(stream)?
.into_iter()
.zip(3u64..)
{
@ -1914,21 +1910,19 @@ impl<
flags |= filesystem::DescriptorFlags::REQUESTED_WRITE_SYNC;
}
let desc = self.transact()?.get_descriptor(dirfd)?.clone();
let dirfd = match desc {
Descriptor::PreopenDirectory((fd, _)) => fd,
Descriptor::File(File { fd, .. }) if self.table().is_dir(&Resource::new_borrow(fd)) => {
fd
}
Descriptor::File(File { fd: _, .. }) => {
// NOTE: Unlike most other methods, legacy implementation returns `NOTDIR` here
return Err(types::Errno::Notdir.into());
let t = self.transact()?;
let dirfd = match t.get_descriptor(dirfd)? {
Descriptor::PreopenDirectory((fd, _)) => fd.borrowed(),
Descriptor::File(File { fd, .. }) => {
t.view.table().get_resource(fd)?.dir()?;
fd.borrowed()
}
_ => return Err(types::Errno::Badf.into()),
};
drop(t);
let fd = self
.open_at(
Resource::new_borrow(dirfd),
dirfd,
dirflags.into(),
path,
oflags.into(),
@ -1941,8 +1935,8 @@ impl<
.context("failed to call `open-at`")
.unwrap_or_else(types::Error::trap)
})?;
let fd = self.transact()?.descriptors.get_mut().push_file(File {
fd: fd.rep(),
let fd = self.transact()?.descriptors.push_file(File {
fd,
position: Default::default(),
append: fdflags.contains(types::Fdflags::APPEND),
blocking_mode: BlockingMode::from_fdflags(&fdflags),
@ -2140,3 +2134,13 @@ impl<
todo!("preview1 sock_shutdown is not implemented")
}
}
trait ResourceExt<T> {
fn borrowed(&self) -> Resource<T>;
}
impl<T: 'static> ResourceExt<T> for Resource<T> {
fn borrowed(&self) -> Resource<T> {
Resource::new_borrow(self.rep())
}
}

49
crates/wasi/src/preview2/stdio.rs

@ -4,7 +4,6 @@ use crate::preview2::bindings::cli::{
};
use crate::preview2::bindings::io::streams;
use crate::preview2::pipe::{self, AsyncWriteStream};
use crate::preview2::stream::TableStreamExt;
use crate::preview2::{HostInputStream, HostOutputStream, WasiView};
use std::io::IsTerminal;
use wasmtime::component::Resource;
@ -160,72 +159,68 @@ pub enum IsATTY {
impl<T: WasiView> stdin::Host for T {
fn get_stdin(&mut self) -> Result<Resource<streams::InputStream>, anyhow::Error> {
let stream = self.ctx_mut().stdin.stream();
Ok(self.table_mut().push_input_stream(stream)?)
Ok(self
.table_mut()
.push_resource(streams::InputStream::Host(stream))?)
}
}
impl<T: WasiView> stdout::Host for T {
fn get_stdout(&mut self) -> Result<Resource<streams::OutputStream>, anyhow::Error> {
let stream = self.ctx_mut().stdout.stream();
Ok(self.table_mut().push_output_stream(stream)?)
Ok(self.table_mut().push_resource(stream)?)
}
}
impl<T: WasiView> stderr::Host for T {
fn get_stderr(&mut self) -> Result<Resource<streams::OutputStream>, anyhow::Error> {
let stream = self.ctx_mut().stderr.stream();
Ok(self.table_mut().push_output_stream(stream)?)
Ok(self.table_mut().push_resource(stream)?)
}
}
pub struct HostTerminalInput;
pub struct HostTerminalOutput;
pub struct TerminalInput;
pub struct TerminalOutput;
impl<T: WasiView> terminal_input::Host for T {}
impl<T: WasiView> crate::preview2::bindings::cli::terminal_input::HostTerminalInput for T {
fn drop(&mut self, r: Resource<terminal_input::TerminalInput>) -> anyhow::Result<()> {
self.table_mut().delete::<HostTerminalInput>(r.rep())?;
impl<T: WasiView> terminal_input::HostTerminalInput for T {
fn drop(&mut self, r: Resource<TerminalInput>) -> anyhow::Result<()> {
self.table_mut().delete_resource(r)?;
Ok(())
}
}
impl<T: WasiView> terminal_output::Host for T {}
impl<T: WasiView> crate::preview2::bindings::cli::terminal_output::HostTerminalOutput for T {
fn drop(&mut self, r: Resource<terminal_output::TerminalOutput>) -> anyhow::Result<()> {
self.table_mut().delete::<HostTerminalOutput>(r.rep())?;
impl<T: WasiView> terminal_output::HostTerminalOutput for T {
fn drop(&mut self, r: Resource<TerminalOutput>) -> anyhow::Result<()> {
self.table_mut().delete_resource(r)?;
Ok(())
}
}
impl<T: WasiView> terminal_stdin::Host for T {
fn get_terminal_stdin(
&mut self,
) -> anyhow::Result<Option<Resource<terminal_input::TerminalInput>>> {
fn get_terminal_stdin(&mut self) -> anyhow::Result<Option<Resource<TerminalInput>>> {
if self.ctx().stdin.isatty() {
let fd = self.table_mut().push(Box::new(HostTerminalInput))?;
Ok(Some(Resource::new_own(fd)))
let fd = self.table_mut().push_resource(TerminalInput)?;
Ok(Some(fd))
} else {
Ok(None)
}
}
}
impl<T: WasiView> terminal_stdout::Host for T {
fn get_terminal_stdout(
&mut self,
) -> anyhow::Result<Option<Resource<terminal_output::TerminalOutput>>> {
fn get_terminal_stdout(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
if self.ctx().stdout.isatty() {
let fd = self.table_mut().push(Box::new(HostTerminalOutput))?;
Ok(Some(Resource::new_own(fd)))
let fd = self.table_mut().push_resource(TerminalOutput)?;
Ok(Some(fd))
} else {
Ok(None)
}
}
}
impl<T: WasiView> terminal_stderr::Host for T {
fn get_terminal_stderr(
&mut self,
) -> anyhow::Result<Option<Resource<terminal_output::TerminalOutput>>> {
fn get_terminal_stderr(&mut self) -> anyhow::Result<Option<Resource<TerminalOutput>>> {
if self.ctx().stderr.isatty() {
let fd = self.table_mut().push(Box::new(HostTerminalOutput))?;
Ok(Some(Resource::new_own(fd)))
let fd = self.table_mut().push_resource(TerminalOutput)?;
Ok(Some(fd))
} else {
Ok(None)
}

202
crates/wasi/src/preview2/stream.rs

@ -1,10 +1,7 @@
use crate::preview2::bindings::io::streams::{InputStream, OutputStream};
use crate::preview2::filesystem::FileInputStream;
use crate::preview2::{Table, TableError};
use anyhow::Error;
use bytes::Bytes;
use std::fmt;
use wasmtime::component::Resource;
/// An error which should be reported to Wasm as a runtime error, rather than
/// an error which should trap Wasm execution. The definition for runtime
@ -168,204 +165,9 @@ pub trait HostOutputStream: Send + Sync {
}
}
pub(crate) enum InternalInputStream {
pub enum InputStream {
Host(Box<dyn HostInputStream>),
File(FileInputStream),
}
pub(crate) trait InternalTableStreamExt {
fn push_internal_input_stream(
&mut self,
istream: InternalInputStream,
) -> Result<Resource<InputStream>, TableError>;
fn push_internal_input_stream_child<T: 'static>(
&mut self,
istream: InternalInputStream,
parent: Resource<T>,
) -> Result<Resource<InputStream>, TableError>;
fn get_internal_input_stream_mut(
&mut self,
fd: &Resource<InputStream>,
) -> Result<&mut InternalInputStream, TableError>;
fn delete_internal_input_stream(
&mut self,
fd: Resource<InputStream>,
) -> Result<InternalInputStream, TableError>;
}
impl InternalTableStreamExt for Table {
fn push_internal_input_stream(
&mut self,
istream: InternalInputStream,
) -> Result<Resource<InputStream>, TableError> {
Ok(Resource::new_own(self.push(Box::new(istream))?))
}
fn push_internal_input_stream_child<T: 'static>(
&mut self,
istream: InternalInputStream,
parent: Resource<T>,
) -> Result<Resource<InputStream>, TableError> {
Ok(Resource::new_own(
self.push_child(Box::new(istream), parent.rep())?,
))
}
fn get_internal_input_stream_mut(
&mut self,
fd: &Resource<InputStream>,
) -> Result<&mut InternalInputStream, TableError> {
self.get_mut(fd.rep())
}
fn delete_internal_input_stream(
&mut self,
fd: Resource<InputStream>,
) -> Result<InternalInputStream, TableError> {
self.delete(fd.rep())
}
}
/// Extension trait for managing [`HostInputStream`]s and [`HostOutputStream`]s in the [`Table`].
pub trait TableStreamExt {
/// Push a [`HostInputStream`] into a [`Table`], returning the table index.
fn push_input_stream(
&mut self,
istream: Box<dyn HostInputStream>,
) -> Result<Resource<InputStream>, TableError>;
/// Same as [`push_input_stream`](Self::push_output_stream) except assigns a parent resource to
/// the input-stream created.
fn push_input_stream_child<T: 'static>(
&mut self,
istream: Box<dyn HostInputStream>,
parent: Resource<T>,
) -> Result<Resource<InputStream>, TableError>;
/// Get a mutable reference to a [`HostInputStream`] in a [`Table`].
fn get_input_stream_mut(
&mut self,
fd: &Resource<InputStream>,
) -> Result<&mut dyn HostInputStream, TableError>;
/// Remove [`HostInputStream`] from table:
fn delete_input_stream(
&mut self,
fd: Resource<InputStream>,
) -> Result<Box<dyn HostInputStream>, TableError>;
/// Push a [`HostOutputStream`] into a [`Table`], returning the table index.
fn push_output_stream(
&mut self,
ostream: Box<dyn HostOutputStream>,
) -> Result<Resource<OutputStream>, TableError>;
/// Same as [`push_output_stream`](Self::push_output_stream) except assigns a parent resource
/// to the output-stream created.
fn push_output_stream_child<T: 'static>(
&mut self,
ostream: Box<dyn HostOutputStream>,
parent: Resource<T>,
) -> Result<Resource<OutputStream>, TableError>;
/// Get a mutable reference to a [`HostOutputStream`] in a [`Table`].
fn get_output_stream_mut(
&mut self,
fd: &Resource<OutputStream>,
) -> Result<&mut dyn HostOutputStream, TableError>;
/// Remove [`HostOutputStream`] from table:
fn delete_output_stream(
&mut self,
fd: Resource<OutputStream>,
) -> Result<Box<dyn HostOutputStream>, TableError>;
}
impl TableStreamExt for Table {
fn push_input_stream(
&mut self,
istream: Box<dyn HostInputStream>,
) -> Result<Resource<InputStream>, TableError> {
self.push_internal_input_stream(InternalInputStream::Host(istream))
}
fn push_input_stream_child<T: 'static>(
&mut self,
istream: Box<dyn HostInputStream>,
parent: Resource<T>,
) -> Result<Resource<InputStream>, TableError> {
self.push_internal_input_stream_child(InternalInputStream::Host(istream), parent)
}
fn get_input_stream_mut(
&mut self,
fd: &Resource<InputStream>,
) -> Result<&mut dyn HostInputStream, TableError> {
match self.get_internal_input_stream_mut(fd)? {
InternalInputStream::Host(ref mut h) => Ok(h.as_mut()),
_ => Err(TableError::WrongType),
}
}
fn delete_input_stream(
&mut self,
fd: Resource<InputStream>,
) -> Result<Box<dyn HostInputStream>, TableError> {
let occ = self.entry(fd.rep())?;
match occ.get().downcast_ref::<InternalInputStream>() {
Some(InternalInputStream::Host(_)) => {
let any = occ.remove_entry()?;
match *any.downcast().expect("downcast checked above") {
InternalInputStream::Host(h) => Ok(h),
_ => unreachable!("variant checked above"),
}
}
_ => Err(TableError::WrongType),
}
}
fn push_output_stream(
&mut self,
ostream: Box<dyn HostOutputStream>,
) -> Result<Resource<OutputStream>, TableError> {
Ok(Resource::new_own(self.push(Box::new(ostream))?))
}
fn push_output_stream_child<T: 'static>(
&mut self,
ostream: Box<dyn HostOutputStream>,
parent: Resource<T>,
) -> Result<Resource<OutputStream>, TableError> {
Ok(Resource::new_own(
self.push_child(Box::new(ostream), parent.rep())?,
))
}
fn get_output_stream_mut(
&mut self,
fd: &Resource<OutputStream>,
) -> Result<&mut dyn HostOutputStream, TableError> {
let boxed: &mut Box<dyn HostOutputStream> = self.get_mut(fd.rep())?;
Ok(boxed.as_mut())
}
fn delete_output_stream(
&mut self,
fd: Resource<OutputStream>,
) -> Result<Box<dyn HostOutputStream>, TableError> {
self.delete(fd.rep())
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn input_stream_in_table() {
let dummy = crate::preview2::pipe::ClosedInputStream;
let mut table = Table::new();
// Put it into the table:
let ix = table.push_input_stream(Box::new(dummy)).unwrap();
// Get a mut ref to it:
let _ = table.get_input_stream_mut(&ix).unwrap();
// Delete it:
let _ = table.delete_input_stream(ix).unwrap();
}
#[test]
fn output_stream_in_table() {
let dummy = crate::preview2::pipe::SinkOutputStream;
let mut table = Table::new();
// Put it in the table:
let ix = table.push_output_stream(Box::new(dummy)).unwrap();
// Get a mut ref to it:
let _ = table.get_output_stream_mut(&ix).unwrap();
// Delete it:
let _ = table.delete_output_stream(ix).unwrap();
}
}
pub type OutputStream = Box<dyn HostOutputStream>;

83
crates/wasi/src/preview2/table.rs

@ -1,5 +1,6 @@
use std::any::Any;
use std::collections::{BTreeSet, HashMap};
use wasmtime::component::Resource;
#[derive(thiserror::Error, Debug)]
pub enum TableError {
@ -63,32 +64,6 @@ impl TableEntry {
}
}
/// Like [`std::collections::hash_map::OccupiedEntry`], with a subset of
/// methods available in order to uphold [`Table`] invariants.
pub struct OccupiedEntry<'a> {
table: &'a mut Table,
index: u32,
}
impl<'a> OccupiedEntry<'a> {
/// Get the dynamically-typed reference to the resource.
pub fn get(&self) -> &(dyn Any + Send + Sync + 'static) {
self.table.map.get(&self.index).unwrap().entry.as_ref()
}
/// Get the dynamically-typed mutable reference to the resource.
pub fn get_mut(&mut self) -> &mut (dyn Any + Send + Sync + 'static) {
self.table.map.get_mut(&self.index).unwrap().entry.as_mut()
}
/// Remove the resource from the table, returning the contents of the
/// resource.
/// May fail with [`TableError::HasChildren`] if the entry has any
/// children, see [`Table::push_child`].
/// If this method fails, the [`OccupiedEntry`] is consumed, but the
/// resource remains in the table.
pub fn remove_entry(self) -> Result<Box<dyn Any + Send + Sync>, TableError> {
self.table.delete_entry(self.index).map(|e| e.entry)
}
}
impl Table {
/// Create an empty table
pub fn new() -> Self {
@ -107,13 +82,21 @@ impl Table {
self.push_(TableEntry::new(entry, None))
}
/// Same as `push`, but typed.
pub fn push_resource<T>(&mut self, entry: T) -> Result<Resource<T>, TableError>
where
T: Send + Sync + 'static,
{
let idx = self.push(Box::new(entry))?;
Ok(Resource::new_own(idx))
}
/// Insert a resource at the next available index, and track that it has a
/// parent resource.
///
/// The parent must exist to create a child. All children resources must
/// be destroyed before a parent can be destroyed - otherwise [`Table::delete`]
/// or [`OccupiedEntry::remove_entry`] will fail with
/// [`TableError::HasChildren`].
/// will fail with [`TableError::HasChildren`].
///
/// Parent-child relationships are tracked inside the table to ensure that
/// a parent resource is not deleted while it has live children. This
@ -142,6 +125,20 @@ impl Table {
Ok(child)
}
/// Same as `push_child`, but typed.
pub fn push_child_resource<T, U>(
&mut self,
entry: T,
parent: &Resource<U>,
) -> Result<Resource<T>, TableError>
where
T: Send + Sync + 'static,
U: 'static,
{
let idx = self.push_child(Box::new(entry), parent.rep())?;
Ok(Resource::new_own(idx))
}
fn push_(&mut self, e: TableEntry) -> Result<u32, TableError> {
// NOTE: The performance of this new key calculation could be very bad once keys wrap
// around.
@ -216,16 +213,17 @@ impl Table {
}
}
/// Get an [`OccupiedEntry`] corresponding to a table entry, if it exists. This allows you to
/// remove or replace the entry based on its contents. The methods available are a subset of
/// [`std::collections::hash_map::OccupiedEntry`] - it does not give access to the key, it
/// restricts replacing the entry to items of the same type, and it does not allow for deletion.
pub fn entry<'a>(&'a mut self, index: u32) -> Result<OccupiedEntry<'a>, TableError> {
if self.map.contains_key(&index) {
Ok(OccupiedEntry { table: self, index })
} else {
Err(TableError::NotPresent)
}
/// Same as `get`, but typed
pub fn get_resource<T: Any + Sized>(&self, key: &Resource<T>) -> Result<&T, TableError> {
self.get(key.rep())
}
/// Same as `get_mut`, but typed
pub fn get_resource_mut<T: Any + Sized>(
&mut self,
key: &Resource<T>,
) -> Result<&mut T, TableError> {
self.get_mut(key.rep())
}
fn delete_entry(&mut self, key: u32) -> Result<TableEntry, TableError> {
@ -283,6 +281,15 @@ impl Table {
}
}
/// Same as `delete`, but typed
pub fn delete_resource<T>(&mut self, resource: Resource<T>) -> Result<T, TableError>
where
T: Any,
{
debug_assert!(resource.owned());
self.delete(resource.rep())
}
/// Zip the values of the map with mutable references to table entries corresponding to each
/// key. As the keys in the [HashMap] are unique, this iterator can give mutable references
/// with the same lifetime as the mutable reference to the [Table].

69
crates/wasi/src/preview2/tcp.rs

@ -1,20 +1,17 @@
use super::{HostInputStream, HostOutputStream, OutputStreamError};
use crate::preview2::bindings::sockets::tcp::TcpSocket;
use crate::preview2::{
with_ambient_tokio_runtime, AbortOnDropJoinHandle, StreamState, Table, TableError,
};
use crate::preview2::stream::{InputStream, OutputStream};
use crate::preview2::{with_ambient_tokio_runtime, AbortOnDropJoinHandle, StreamState};
use cap_net_ext::{AddressFamily, Blocking, TcpListenerExt};
use cap_std::net::TcpListener;
use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike};
use std::io;
use std::sync::Arc;
use wasmtime::component::Resource;
/// The state of a TCP socket.
///
/// This represents the various states a socket can be in during the
/// activities of binding, listening, accepting, and connecting.
pub(crate) enum HostTcpState {
pub(crate) enum TcpState {
/// The initial state for a newly-created socket.
Default,
@ -45,13 +42,13 @@ pub(crate) enum HostTcpState {
///
/// The inner state is wrapped in an Arc because the same underlying socket is
/// used for implementing the stream types.
pub(crate) struct HostTcpSocketState {
/// The part of a `HostTcpSocketState` which is reference-counted so that we
pub struct TcpSocket {
/// The part of a `TcpSocket` which is reference-counted so that we
/// can pass it to async tasks.
pub(crate) inner: Arc<tokio::net::TcpStream>,
/// The current state in the bind/listen/accept/connect progression.
pub(crate) tcp_state: HostTcpState,
pub(crate) tcp_state: TcpState,
}
pub(crate) struct TcpReadStream {
@ -215,7 +212,7 @@ impl HostOutputStream for TcpWriteStream {
}
}
impl HostTcpSocketState {
impl TcpSocket {
/// Create a new socket in the given family.
pub fn new(family: AddressFamily) -> io::Result<Self> {
// Create a new host socket and set it to non-blocking, which is needed
@ -224,7 +221,7 @@ impl HostTcpSocketState {
Self::from_tcp_listener(tcp_listener)
}
/// Create a `HostTcpSocketState` from an existing socket.
/// Create a `TcpSocket` from an existing socket.
///
/// The socket must be in non-blocking mode.
pub fn from_tcp_stream(tcp_socket: cap_std::net::TcpStream) -> io::Result<Self> {
@ -239,7 +236,7 @@ impl HostTcpSocketState {
Ok(Self {
inner: Arc::new(stream),
tcp_state: HostTcpState::Default,
tcp_state: TcpState::Default,
})
}
@ -248,53 +245,9 @@ impl HostTcpSocketState {
}
/// Create the input/output stream pair for a tcp socket.
pub fn as_split(&self) -> (Box<impl HostInputStream>, Box<impl HostOutputStream>) {
pub fn as_split(&self) -> (InputStream, OutputStream) {
let input = Box::new(TcpReadStream::new(self.inner.clone()));
let output = Box::new(TcpWriteStream::new(self.inner.clone()));
(input, output)
}
}
pub(crate) trait TableTcpSocketExt {
fn push_tcp_socket(
&mut self,
tcp_socket: HostTcpSocketState,
) -> Result<Resource<TcpSocket>, TableError>;
fn delete_tcp_socket(
&mut self,
fd: Resource<TcpSocket>,
) -> Result<HostTcpSocketState, TableError>;
fn is_tcp_socket(&self, fd: &Resource<TcpSocket>) -> bool;
fn get_tcp_socket(&self, fd: &Resource<TcpSocket>) -> Result<&HostTcpSocketState, TableError>;
fn get_tcp_socket_mut(
&mut self,
fd: &Resource<TcpSocket>,
) -> Result<&mut HostTcpSocketState, TableError>;
}
impl TableTcpSocketExt for Table {
fn push_tcp_socket(
&mut self,
tcp_socket: HostTcpSocketState,
) -> Result<Resource<TcpSocket>, TableError> {
Ok(Resource::new_own(self.push(Box::new(tcp_socket))?))
}
fn delete_tcp_socket(
&mut self,
fd: Resource<TcpSocket>,
) -> Result<HostTcpSocketState, TableError> {
self.delete(fd.rep())
}
fn is_tcp_socket(&self, fd: &Resource<TcpSocket>) -> bool {
self.is::<HostTcpSocketState>(fd.rep())
}
fn get_tcp_socket(&self, fd: &Resource<TcpSocket>) -> Result<&HostTcpSocketState, TableError> {
self.get(fd.rep())
}
fn get_tcp_socket_mut(
&mut self,
fd: &Resource<TcpSocket>,
) -> Result<&mut HostTcpSocketState, TableError> {
self.get_mut(fd.rep())
(InputStream::Host(input), output)
}
}

Loading…
Cancel
Save