Browse Source
* Implement wasi-keyvalue * Allow preset data for In-Memory provider, rename allow_hosts to allow_redis_hosts * Add vets --------- Co-authored-by: Alex Crichton <alex@alexcrichton.com>pull/9030/head
Xinzhao Xu
3 months ago
committed by
GitHub
24 changed files with 1231 additions and 1 deletions
@ -0,0 +1,51 @@ |
|||
use test_programs::keyvalue::wasi::keyvalue::{atomics, batch, store}; |
|||
|
|||
fn main() { |
|||
let identifier = std::env::var_os("IDENTIFIER") |
|||
.unwrap() |
|||
.into_string() |
|||
.unwrap(); |
|||
let bucket = store::open(&identifier).unwrap(); |
|||
|
|||
if identifier != "" { |
|||
// for In-Memory provider, we have preset this data
|
|||
assert_eq!(atomics::increment(&bucket, "atomics_key", 5).unwrap(), 5); |
|||
} |
|||
assert_eq!(atomics::increment(&bucket, "atomics_key", 1).unwrap(), 6); |
|||
|
|||
let resp = bucket.list_keys(None).unwrap(); |
|||
assert_eq!(resp.keys, vec!["atomics_key".to_string()]); |
|||
|
|||
bucket.set("hello", "world".as_bytes()).unwrap(); |
|||
|
|||
let v = bucket.get("hello").unwrap(); |
|||
assert_eq!(String::from_utf8(v.unwrap()).unwrap(), "world"); |
|||
|
|||
assert_eq!(bucket.exists("hello").unwrap(), true); |
|||
bucket.delete("hello").unwrap(); |
|||
assert_eq!(bucket.exists("hello").unwrap(), false); |
|||
|
|||
batch::set_many( |
|||
&bucket, |
|||
&[ |
|||
("a1".to_string(), "v1".as_bytes().to_vec()), |
|||
("b1".to_string(), "v1".as_bytes().to_vec()), |
|||
("c1".to_string(), "v1".as_bytes().to_vec()), |
|||
], |
|||
) |
|||
.unwrap(); |
|||
batch::delete_many(&bucket, &["a1".to_string(), "c1".to_string()]).unwrap(); |
|||
let values = batch::get_many( |
|||
&bucket, |
|||
&["a1".to_string(), "b1".to_string(), "c1".to_string()], |
|||
) |
|||
.unwrap(); |
|||
assert_eq!( |
|||
values, |
|||
vec![ |
|||
None, |
|||
Some(("b1".to_string(), "v1".as_bytes().to_vec())), |
|||
None |
|||
] |
|||
); |
|||
} |
@ -0,0 +1,26 @@ |
|||
[package] |
|||
name = "wasmtime-wasi-keyvalue" |
|||
version.workspace = true |
|||
authors.workspace = true |
|||
edition.workspace = true |
|||
repository = "https://github.com/bytecodealliance/wasmtime" |
|||
license = "Apache-2.0 WITH LLVM-exception" |
|||
description = "Wasmtime implementation of the wasi-keyvalue API" |
|||
|
|||
[lints] |
|||
workspace = true |
|||
|
|||
[dependencies] |
|||
anyhow = { workspace = true } |
|||
wasmtime = { workspace = true, features = ["runtime", "async", "component-model"] } |
|||
async-trait = { workspace = true } |
|||
url = { workspace = true } |
|||
redis = { workspace = true, optional = true, features = ["tokio-comp"] } |
|||
|
|||
[dev-dependencies] |
|||
test-programs-artifacts = { workspace = true } |
|||
wasmtime-wasi = { workspace = true } |
|||
tokio = { workspace = true, features = ["macros"] } |
|||
|
|||
[features] |
|||
redis = ["dep:redis"] |
@ -0,0 +1,442 @@ |
|||
//! # Wasmtime's [wasi-keyvalue] Implementation
|
|||
//!
|
|||
//! This crate provides a Wasmtime host implementation of the [wasi-keyvalue]
|
|||
//! API. With this crate, the runtime can run components that call APIs in
|
|||
//! [wasi-keyvalue] and provide components with access to key-value storages.
|
|||
//!
|
|||
//! Currently supported storage backends:
|
|||
//! * In-Memory (empty identifier)
|
|||
//! * Redis, supported identifier format:
|
|||
//! * `redis://[<username>][:<password>@]<hostname>[:port][/<db>]`
|
|||
//! * `redis+unix:///<path>[?db=<db>[&pass=<password>][&user=<username>]]`
|
|||
//!
|
|||
//! # Examples
|
|||
//!
|
|||
//! The usage of this crate is very similar to other WASI API implementations
|
|||
//! such as [wasi:cli] and [wasi:http].
|
|||
//!
|
|||
//! A common scenario is accessing redis in a [wasi:cli] component.
|
|||
//! A standalone example of doing all this looks like:
|
|||
//!
|
|||
//! ```
|
|||
//! use wasmtime::{
|
|||
//! component::{Linker, ResourceTable},
|
|||
//! Config, Engine, Result, Store,
|
|||
//! };
|
|||
//! use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiView};
|
|||
//! use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
|
|||
//!
|
|||
//! #[tokio::main]
|
|||
//! async fn main() -> Result<()> {
|
|||
//! let mut config = Config::new();
|
|||
//! config.async_support(true);
|
|||
//! let engine = Engine::new(&config)?;
|
|||
//!
|
|||
//! let mut store = Store::new(&engine, Ctx {
|
|||
//! table: ResourceTable::new(),
|
|||
//! wasi_ctx: WasiCtxBuilder::new().build(),
|
|||
//! wasi_keyvalue_ctx: WasiKeyValueCtxBuilder::new().build(),
|
|||
//! });
|
|||
//!
|
|||
//! let mut linker = Linker::<Ctx>::new(&engine);
|
|||
//! wasmtime_wasi::add_to_linker_async(&mut linker)?;
|
|||
//! // add `wasi-runtime-config` world's interfaces to the linker
|
|||
//! wasmtime_wasi_keyvalue::add_to_linker(&mut linker, |h: &mut Ctx| {
|
|||
//! WasiKeyValue::new(&h.wasi_keyvalue_ctx, &mut h.table)
|
|||
//! })?;
|
|||
//!
|
|||
//! // ... use `linker` to instantiate within `store` ...
|
|||
//!
|
|||
//! Ok(())
|
|||
//! }
|
|||
//!
|
|||
//! struct Ctx {
|
|||
//! table: ResourceTable,
|
|||
//! wasi_ctx: WasiCtx,
|
|||
//! wasi_keyvalue_ctx: WasiKeyValueCtx,
|
|||
//! }
|
|||
//!
|
|||
//! impl WasiView for Ctx {
|
|||
//! fn table(&mut self) -> &mut ResourceTable { &mut self.table }
|
|||
//! fn ctx(&mut self) -> &mut WasiCtx { &mut self.wasi_ctx }
|
|||
//! }
|
|||
//! ```
|
|||
//!
|
|||
//! [wasi-keyvalue]: https://github.com/WebAssembly/wasi-keyvalue
|
|||
//! [wasi:cli]: https://docs.rs/wasmtime-wasi/latest
|
|||
//! [wasi:http]: https://docs.rs/wasmtime-wasi-http/latest
|
|||
|
|||
#![deny(missing_docs)] |
|||
|
|||
mod provider; |
|||
mod generated { |
|||
wasmtime::component::bindgen!({ |
|||
path: "wit", |
|||
world: "wasi:keyvalue/imports", |
|||
trappable_imports: true, |
|||
async: true, |
|||
with: { |
|||
"wasi:keyvalue/store/bucket": crate::Bucket, |
|||
}, |
|||
trappable_error_type: { |
|||
"wasi:keyvalue/store/error" => crate::Error, |
|||
}, |
|||
}); |
|||
} |
|||
|
|||
use self::generated::wasi::keyvalue; |
|||
use anyhow::Result; |
|||
use async_trait::async_trait; |
|||
use std::collections::HashMap; |
|||
use std::fmt::Display; |
|||
use url::Url; |
|||
use wasmtime::component::{Resource, ResourceTable, ResourceTableError}; |
|||
|
|||
#[doc(hidden)] |
|||
pub enum Error { |
|||
NoSuchStore, |
|||
AccessDenied, |
|||
Other(String), |
|||
} |
|||
|
|||
impl From<ResourceTableError> for Error { |
|||
fn from(err: ResourceTableError) -> Self { |
|||
Self::Other(err.to_string()) |
|||
} |
|||
} |
|||
|
|||
pub(crate) fn to_other_error(e: impl Display) -> Error { |
|||
Error::Other(e.to_string()) |
|||
} |
|||
|
|||
#[doc(hidden)] |
|||
pub struct Bucket { |
|||
inner: Box<dyn Host + Send>, |
|||
} |
|||
|
|||
#[async_trait] |
|||
trait Host { |
|||
async fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, Error>; |
|||
|
|||
async fn set(&mut self, key: String, value: Vec<u8>) -> Result<(), Error>; |
|||
|
|||
async fn delete(&mut self, key: String) -> Result<(), Error>; |
|||
|
|||
async fn exists(&mut self, key: String) -> Result<bool, Error>; |
|||
|
|||
async fn list_keys( |
|||
&mut self, |
|||
cursor: Option<u64>, |
|||
) -> Result<keyvalue::store::KeyResponse, Error>; |
|||
|
|||
async fn increment(&mut self, key: String, delta: u64) -> Result<u64, Error>; |
|||
|
|||
async fn get_many( |
|||
&mut self, |
|||
keys: Vec<String>, |
|||
) -> Result<Vec<Option<(String, Vec<u8>)>>, Error>; |
|||
|
|||
async fn set_many(&mut self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>; |
|||
|
|||
async fn delete_many(&mut self, keys: Vec<String>) -> Result<(), Error>; |
|||
} |
|||
|
|||
/// Builder-style structure used to create a [`WasiKeyValueCtx`].
|
|||
#[derive(Default)] |
|||
pub struct WasiKeyValueCtxBuilder { |
|||
in_memory_data: HashMap<String, Vec<u8>>, |
|||
#[cfg(feature = "redis")] |
|||
allowed_redis_hosts: Vec<String>, |
|||
#[cfg(feature = "redis")] |
|||
redis_connection_timeout: Option<std::time::Duration>, |
|||
#[cfg(feature = "redis")] |
|||
redis_response_timeout: Option<std::time::Duration>, |
|||
} |
|||
|
|||
impl WasiKeyValueCtxBuilder { |
|||
/// Creates a builder for a new context with default parameters set.
|
|||
pub fn new() -> Self { |
|||
Default::default() |
|||
} |
|||
|
|||
/// Preset data for the In-Memory provider.
|
|||
pub fn in_memory_data<I, K, V>(mut self, data: I) -> Self |
|||
where |
|||
I: IntoIterator<Item = (K, V)>, |
|||
K: Into<String>, |
|||
V: Into<Vec<u8>>, |
|||
{ |
|||
self.in_memory_data = data |
|||
.into_iter() |
|||
.map(|(k, v)| (k.into(), v.into())) |
|||
.collect(); |
|||
self |
|||
} |
|||
|
|||
/// Appends a list of Redis hosts to the allow-listed set each component gets
|
|||
/// access to. It can be in the format `<hostname>[:port]` or a unix domain
|
|||
/// socket path.
|
|||
///
|
|||
/// # Examples
|
|||
///
|
|||
/// ```
|
|||
/// use wasmtime_wasi_keyvalue::WasiKeyValueCtxBuilder;
|
|||
///
|
|||
/// # fn main() {
|
|||
/// let ctx = WasiKeyValueCtxBuilder::new()
|
|||
/// .allow_redis_hosts(&["localhost:1234", "/var/run/redis.sock"])
|
|||
/// .build();
|
|||
/// # }
|
|||
/// ```
|
|||
#[cfg(feature = "redis")] |
|||
pub fn allow_redis_hosts(mut self, hosts: &[impl AsRef<str>]) -> Self { |
|||
self.allowed_redis_hosts |
|||
.extend(hosts.iter().map(|h| h.as_ref().to_owned())); |
|||
self |
|||
} |
|||
|
|||
/// Sets the connection timeout parameter for the Redis provider.
|
|||
#[cfg(feature = "redis")] |
|||
pub fn redis_connection_timeout(mut self, t: std::time::Duration) -> Self { |
|||
self.redis_connection_timeout = Some(t); |
|||
self |
|||
} |
|||
|
|||
/// Sets the response timeout parameter for the Redis provider.
|
|||
#[cfg(feature = "redis")] |
|||
pub fn redis_response_timeout(mut self, t: std::time::Duration) -> Self { |
|||
self.redis_response_timeout = Some(t); |
|||
self |
|||
} |
|||
|
|||
/// Uses the configured context so far to construct the final [`WasiKeyValueCtx`].
|
|||
pub fn build(self) -> WasiKeyValueCtx { |
|||
WasiKeyValueCtx { |
|||
in_memory_data: self.in_memory_data, |
|||
#[cfg(feature = "redis")] |
|||
allowed_redis_hosts: self.allowed_redis_hosts, |
|||
#[cfg(feature = "redis")] |
|||
redis_connection_timeout: self.redis_connection_timeout, |
|||
#[cfg(feature = "redis")] |
|||
redis_response_timeout: self.redis_response_timeout, |
|||
} |
|||
} |
|||
} |
|||
|
|||
/// Capture the state necessary for use in the `wasi-keyvalue` API implementation.
|
|||
pub struct WasiKeyValueCtx { |
|||
in_memory_data: HashMap<String, Vec<u8>>, |
|||
#[cfg(feature = "redis")] |
|||
allowed_redis_hosts: Vec<String>, |
|||
#[cfg(feature = "redis")] |
|||
redis_connection_timeout: Option<std::time::Duration>, |
|||
#[cfg(feature = "redis")] |
|||
redis_response_timeout: Option<std::time::Duration>, |
|||
} |
|||
|
|||
impl WasiKeyValueCtx { |
|||
/// Convenience function for calling [`WasiKeyValueCtxBuilder::new`].
|
|||
pub fn builder() -> WasiKeyValueCtxBuilder { |
|||
WasiKeyValueCtxBuilder::new() |
|||
} |
|||
|
|||
#[cfg(feature = "redis")] |
|||
fn allow_redis_host(&self, u: &Url) -> bool { |
|||
let host = match u.host() { |
|||
Some(h) => match u.port() { |
|||
Some(port) => format!("{}:{}", h, port), |
|||
None => h.to_string(), |
|||
}, |
|||
// unix domain socket path
|
|||
None => u.path().to_string(), |
|||
}; |
|||
self.allowed_redis_hosts.contains(&host) |
|||
} |
|||
} |
|||
|
|||
/// A wrapper capturing the needed internal `wasi-keyvalue` state.
|
|||
pub struct WasiKeyValue<'a> { |
|||
ctx: &'a WasiKeyValueCtx, |
|||
table: &'a mut ResourceTable, |
|||
} |
|||
|
|||
impl<'a> WasiKeyValue<'a> { |
|||
/// Create a new view into the `wasi-keyvalue` state.
|
|||
pub fn new(ctx: &'a WasiKeyValueCtx, table: &'a mut ResourceTable) -> Self { |
|||
Self { ctx, table } |
|||
} |
|||
} |
|||
|
|||
#[async_trait] |
|||
impl keyvalue::store::Host for WasiKeyValue<'_> { |
|||
async fn open(&mut self, identifier: String) -> Result<Resource<Bucket>, Error> { |
|||
if identifier == "" { |
|||
return Ok(self.table.push(Bucket { |
|||
inner: Box::new(provider::inmemory::InMemory::new( |
|||
self.ctx.in_memory_data.clone(), |
|||
)), |
|||
})?); |
|||
} |
|||
|
|||
let u = Url::parse(&identifier).map_err(to_other_error)?; |
|||
match u.scheme() { |
|||
"redis" | "redis+unix" => { |
|||
#[cfg(not(feature = "redis"))] |
|||
{ |
|||
return Err(Error::Other( |
|||
"Cannot enable Redis support when the crate is not compiled with this feature." |
|||
.to_string(), |
|||
)); |
|||
} |
|||
#[cfg(feature = "redis")] |
|||
{ |
|||
if !self.ctx.allow_redis_host(&u) { |
|||
return Err(Error::Other(format!( |
|||
"the identifier {} is not in the allowed list", |
|||
identifier |
|||
))); |
|||
} |
|||
|
|||
let host = provider::redis::open( |
|||
identifier, |
|||
self.ctx.redis_response_timeout, |
|||
self.ctx.redis_connection_timeout, |
|||
) |
|||
.await?; |
|||
Ok(self.table.push(Bucket { |
|||
inner: Box::new(host), |
|||
})?) |
|||
} |
|||
} |
|||
_ => Err(Error::NoSuchStore), |
|||
} |
|||
} |
|||
|
|||
fn convert_error(&mut self, err: Error) -> Result<keyvalue::store::Error> { |
|||
match err { |
|||
Error::NoSuchStore => Ok(keyvalue::store::Error::NoSuchStore), |
|||
Error::AccessDenied => Ok(keyvalue::store::Error::AccessDenied), |
|||
Error::Other(e) => Ok(keyvalue::store::Error::Other(e)), |
|||
} |
|||
} |
|||
} |
|||
|
|||
#[async_trait] |
|||
impl keyvalue::store::HostBucket for WasiKeyValue<'_> { |
|||
async fn get( |
|||
&mut self, |
|||
bucket: Resource<Bucket>, |
|||
key: String, |
|||
) -> Result<Option<Vec<u8>>, Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.get(key).await |
|||
} |
|||
|
|||
async fn set( |
|||
&mut self, |
|||
bucket: Resource<Bucket>, |
|||
key: String, |
|||
value: Vec<u8>, |
|||
) -> Result<(), Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.set(key, value).await |
|||
} |
|||
|
|||
async fn delete(&mut self, bucket: Resource<Bucket>, key: String) -> Result<(), Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.delete(key).await |
|||
} |
|||
|
|||
async fn exists(&mut self, bucket: Resource<Bucket>, key: String) -> Result<bool, Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.exists(key).await |
|||
} |
|||
|
|||
async fn list_keys( |
|||
&mut self, |
|||
bucket: Resource<Bucket>, |
|||
cursor: Option<u64>, |
|||
) -> Result<keyvalue::store::KeyResponse, Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.list_keys(cursor).await |
|||
} |
|||
|
|||
fn drop(&mut self, bucket: Resource<Bucket>) -> Result<()> { |
|||
self.table.delete(bucket)?; |
|||
Ok(()) |
|||
} |
|||
} |
|||
|
|||
#[async_trait] |
|||
impl keyvalue::atomics::Host for WasiKeyValue<'_> { |
|||
async fn increment( |
|||
&mut self, |
|||
bucket: Resource<Bucket>, |
|||
key: String, |
|||
delta: u64, |
|||
) -> Result<u64, Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.increment(key, delta).await |
|||
} |
|||
} |
|||
|
|||
#[async_trait] |
|||
impl keyvalue::batch::Host for WasiKeyValue<'_> { |
|||
async fn get_many( |
|||
&mut self, |
|||
bucket: Resource<Bucket>, |
|||
keys: Vec<String>, |
|||
) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.get_many(keys).await |
|||
} |
|||
|
|||
async fn set_many( |
|||
&mut self, |
|||
bucket: Resource<Bucket>, |
|||
key_values: Vec<(String, Vec<u8>)>, |
|||
) -> Result<(), Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.set_many(key_values).await |
|||
} |
|||
|
|||
async fn delete_many( |
|||
&mut self, |
|||
bucket: Resource<Bucket>, |
|||
keys: Vec<String>, |
|||
) -> Result<(), Error> { |
|||
let bucket = self.table.get_mut(&bucket)?; |
|||
bucket.inner.delete_many(keys).await |
|||
} |
|||
} |
|||
|
|||
/// Add all the `wasi-keyvalue` world's interfaces to a [`wasmtime::component::Linker`].
|
|||
pub fn add_to_linker<T: Send>( |
|||
l: &mut wasmtime::component::Linker<T>, |
|||
f: impl Fn(&mut T) -> WasiKeyValue<'_> + Send + Sync + Copy + 'static, |
|||
) -> Result<()> { |
|||
keyvalue::store::add_to_linker_get_host(l, f)?; |
|||
keyvalue::atomics::add_to_linker_get_host(l, f)?; |
|||
keyvalue::batch::add_to_linker_get_host(l, f)?; |
|||
Ok(()) |
|||
} |
|||
|
|||
#[cfg(test)] |
|||
mod tests { |
|||
#[test] |
|||
#[cfg(feature = "redis")] |
|||
fn test_allow_redis_host() { |
|||
let ctx = super::WasiKeyValueCtx::builder() |
|||
.allow_redis_hosts(&["127.0.0.1:1234", "localhost", "/var/run/redis.sock"]) |
|||
.build(); |
|||
assert!(ctx.allow_redis_host(&super::Url::parse("redis://127.0.0.1:1234/db").unwrap())); |
|||
assert!(ctx.allow_redis_host(&super::Url::parse("redis://localhost").unwrap())); |
|||
assert!(!ctx.allow_redis_host(&super::Url::parse("redis://192.168.0.1").unwrap())); |
|||
assert!(ctx.allow_redis_host( |
|||
&super::Url::parse("redis+unix:///var/run/redis.sock?db=db").unwrap() |
|||
)); |
|||
assert!(!ctx.allow_redis_host( |
|||
&super::Url::parse("redis+unix:///var/local/redis.sock?db=db").unwrap() |
|||
)); |
|||
} |
|||
} |
@ -0,0 +1,94 @@ |
|||
use crate::{generated::wasi::keyvalue::store::KeyResponse, to_other_error, Error, Host}; |
|||
use async_trait::async_trait; |
|||
use std::collections::HashMap; |
|||
use std::sync::{Arc, Mutex}; |
|||
|
|||
#[derive(Default)] |
|||
pub(crate) struct InMemory { |
|||
store: Arc<Mutex<HashMap<String, Vec<u8>>>>, |
|||
} |
|||
|
|||
impl InMemory { |
|||
pub(crate) fn new(data: HashMap<String, Vec<u8>>) -> Self { |
|||
Self { |
|||
store: Arc::new(Mutex::new(data)), |
|||
} |
|||
} |
|||
} |
|||
|
|||
#[async_trait] |
|||
impl Host for InMemory { |
|||
async fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, Error> { |
|||
let store = self.store.lock().unwrap(); |
|||
Ok(store.get(&key).cloned()) |
|||
} |
|||
|
|||
async fn set(&mut self, key: String, value: Vec<u8>) -> Result<(), Error> { |
|||
let mut store = self.store.lock().unwrap(); |
|||
store.insert(key, value); |
|||
Ok(()) |
|||
} |
|||
|
|||
async fn delete(&mut self, key: String) -> Result<(), Error> { |
|||
let mut store = self.store.lock().unwrap(); |
|||
store.remove(&key); |
|||
Ok(()) |
|||
} |
|||
|
|||
async fn exists(&mut self, key: String) -> Result<bool, Error> { |
|||
let store = self.store.lock().unwrap(); |
|||
Ok(store.contains_key(&key)) |
|||
} |
|||
|
|||
async fn list_keys(&mut self, cursor: Option<u64>) -> Result<KeyResponse, Error> { |
|||
let store = self.store.lock().unwrap(); |
|||
let keys: Vec<String> = store.keys().cloned().collect(); |
|||
let cursor = cursor.unwrap_or(0) as usize; |
|||
let keys_slice = &keys[cursor..]; |
|||
Ok(KeyResponse { |
|||
keys: keys_slice.to_vec(), |
|||
cursor: None, |
|||
}) |
|||
} |
|||
|
|||
async fn increment(&mut self, key: String, delta: u64) -> Result<u64, Error> { |
|||
let mut store = self.store.lock().unwrap(); |
|||
let value = store |
|||
.entry(key.clone()) |
|||
.or_insert("0".to_string().into_bytes()); |
|||
let current_value = String::from_utf8(value.clone()) |
|||
.map_err(to_other_error)? |
|||
.parse::<u64>() |
|||
.map_err(to_other_error)?; |
|||
let new_value = current_value + delta; |
|||
*value = new_value.to_string().into_bytes(); |
|||
Ok(new_value) |
|||
} |
|||
|
|||
async fn get_many( |
|||
&mut self, |
|||
keys: Vec<String>, |
|||
) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> { |
|||
let store = self.store.lock().unwrap(); |
|||
Ok(keys |
|||
.into_iter() |
|||
.map(|key| store.get(&key).map(|value| (key.clone(), value.clone()))) |
|||
.collect()) |
|||
} |
|||
|
|||
async fn set_many(&mut self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> { |
|||
let mut store = self.store.lock().unwrap(); |
|||
for (key, value) in key_values { |
|||
store.insert(key, value); |
|||
} |
|||
Ok(()) |
|||
} |
|||
|
|||
async fn delete_many(&mut self, keys: Vec<String>) -> Result<(), Error> { |
|||
let mut store = self.store.lock().unwrap(); |
|||
for key in keys { |
|||
store.remove(&key); |
|||
} |
|||
Ok(()) |
|||
} |
|||
} |
@ -0,0 +1,3 @@ |
|||
pub(crate) mod inmemory; |
|||
#[cfg(feature = "redis")] |
|||
pub(crate) mod redis; |
@ -0,0 +1,106 @@ |
|||
use crate::{generated::wasi::keyvalue::store::KeyResponse, Error, Host}; |
|||
use anyhow::Result; |
|||
use async_trait::async_trait; |
|||
use redis::{aio::MultiplexedConnection, AsyncCommands, RedisError}; |
|||
use std::time::Duration; |
|||
|
|||
struct Redis { |
|||
conn: MultiplexedConnection, |
|||
} |
|||
|
|||
impl From<RedisError> for Error { |
|||
fn from(err: RedisError) -> Self { |
|||
Self::Other(err.to_string()) |
|||
} |
|||
} |
|||
|
|||
pub(crate) async fn open( |
|||
identifier: String, |
|||
response_timeout: Option<Duration>, |
|||
connection_timeout: Option<Duration>, |
|||
) -> Result<impl Host, RedisError> { |
|||
let client = redis::Client::open(identifier)?; |
|||
let conn = client |
|||
.get_multiplexed_async_connection_with_timeouts( |
|||
response_timeout.unwrap_or(Duration::MAX), |
|||
connection_timeout.unwrap_or(Duration::MAX), |
|||
) |
|||
.await?; |
|||
Ok(Redis { conn }) |
|||
} |
|||
|
|||
#[async_trait] |
|||
impl Host for Redis { |
|||
async fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, Error> { |
|||
let v: Option<Vec<u8>> = self.conn.get(key).await?; |
|||
Ok(v) |
|||
} |
|||
|
|||
async fn set(&mut self, key: String, value: Vec<u8>) -> Result<(), Error> { |
|||
let _: () = self.conn.set(key, value).await?; |
|||
Ok(()) |
|||
} |
|||
|
|||
async fn delete(&mut self, key: String) -> Result<(), Error> { |
|||
let _: () = self.conn.del(key).await?; |
|||
Ok(()) |
|||
} |
|||
|
|||
async fn exists(&mut self, key: String) -> Result<bool, Error> { |
|||
let exists: bool = self.conn.exists(key).await?; |
|||
Ok(exists) |
|||
} |
|||
|
|||
async fn list_keys(&mut self, cursor: Option<u64>) -> Result<KeyResponse, Error> { |
|||
let cursor = cursor.unwrap_or(0); |
|||
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN") |
|||
.arg(cursor) |
|||
.query_async(&mut self.conn) |
|||
.await?; |
|||
|
|||
Ok(KeyResponse { |
|||
keys, |
|||
cursor: if new_cursor == 0 { |
|||
None |
|||
} else { |
|||
Some(new_cursor) |
|||
}, |
|||
}) |
|||
} |
|||
|
|||
async fn increment(&mut self, key: String, delta: u64) -> Result<u64, Error> { |
|||
let v: u64 = self.conn.incr(key, delta).await?; |
|||
Ok(v) |
|||
} |
|||
|
|||
async fn get_many( |
|||
&mut self, |
|||
keys: Vec<String>, |
|||
) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> { |
|||
let values: Vec<Option<Vec<u8>>> = self.conn.get(keys.clone()).await?; |
|||
|
|||
Ok(keys |
|||
.into_iter() |
|||
.zip(values.into_iter()) |
|||
.map(|(key, value)| value.map(|v| (key, v))) |
|||
.collect()) |
|||
} |
|||
|
|||
async fn set_many(&mut self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> { |
|||
let mut pipe = redis::pipe(); |
|||
for (key, value) in key_values { |
|||
pipe.set(key, value).ignore(); |
|||
} |
|||
pipe.query_async(&mut self.conn).await?; |
|||
Ok(()) |
|||
} |
|||
|
|||
async fn delete_many(&mut self, keys: Vec<String>) -> Result<(), Error> { |
|||
let mut pipe = redis::pipe(); |
|||
for key in keys { |
|||
pipe.del(key).ignore(); |
|||
} |
|||
pipe.query_async(&mut self.conn).await?; |
|||
Ok(()) |
|||
} |
|||
} |
@ -0,0 +1,93 @@ |
|||
use anyhow::{anyhow, Result}; |
|||
use test_programs_artifacts::{foreach_keyvalue, KEYVALUE_MAIN_COMPONENT}; |
|||
use wasmtime::{ |
|||
component::{Component, Linker, ResourceTable}, |
|||
Store, |
|||
}; |
|||
use wasmtime_wasi::{bindings::Command, WasiCtx, WasiCtxBuilder, WasiView}; |
|||
use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder}; |
|||
|
|||
struct Ctx { |
|||
table: ResourceTable, |
|||
wasi_ctx: WasiCtx, |
|||
wasi_keyvalue_ctx: WasiKeyValueCtx, |
|||
} |
|||
|
|||
impl WasiView for Ctx { |
|||
fn table(&mut self) -> &mut ResourceTable { |
|||
&mut self.table |
|||
} |
|||
|
|||
fn ctx(&mut self) -> &mut WasiCtx { |
|||
&mut self.wasi_ctx |
|||
} |
|||
} |
|||
|
|||
async fn run_wasi(path: &str, ctx: Ctx) -> Result<()> { |
|||
let engine = test_programs_artifacts::engine(|config| { |
|||
config.async_support(true); |
|||
}); |
|||
let mut store = Store::new(&engine, ctx); |
|||
let component = Component::from_file(&engine, path)?; |
|||
|
|||
let mut linker = Linker::new(&engine); |
|||
wasmtime_wasi::add_to_linker_async(&mut linker)?; |
|||
wasmtime_wasi_keyvalue::add_to_linker(&mut linker, |h: &mut Ctx| { |
|||
WasiKeyValue::new(&h.wasi_keyvalue_ctx, &mut h.table) |
|||
})?; |
|||
|
|||
let command = Command::instantiate_async(&mut store, &component, &linker).await?; |
|||
command |
|||
.wasi_cli_run() |
|||
.call_run(&mut store) |
|||
.await? |
|||
.map_err(|()| anyhow!("command returned with failing exit status")) |
|||
} |
|||
|
|||
macro_rules! assert_test_exists { |
|||
($name:ident) => { |
|||
#[allow(unused_imports)] |
|||
use self::$name as _; |
|||
}; |
|||
} |
|||
|
|||
foreach_keyvalue!(assert_test_exists); |
|||
|
|||
#[tokio::test(flavor = "multi_thread")] |
|||
async fn keyvalue_main() -> Result<()> { |
|||
run_wasi( |
|||
KEYVALUE_MAIN_COMPONENT, |
|||
Ctx { |
|||
table: ResourceTable::new(), |
|||
wasi_ctx: WasiCtxBuilder::new() |
|||
.inherit_stderr() |
|||
.env("IDENTIFIER", "") |
|||
.build(), |
|||
wasi_keyvalue_ctx: WasiKeyValueCtxBuilder::new() |
|||
.in_memory_data([("atomics_key", "5")]) |
|||
.build(), |
|||
}, |
|||
) |
|||
.await |
|||
} |
|||
|
|||
#[cfg(feature = "redis")] |
|||
#[tokio::test(flavor = "multi_thread")] |
|||
async fn keyvalue_redis() -> Result<()> { |
|||
run_wasi( |
|||
KEYVALUE_MAIN_COMPONENT, |
|||
Ctx { |
|||
table: ResourceTable::new(), |
|||
wasi_ctx: WasiCtxBuilder::new() |
|||
.inherit_stderr() |
|||
.env("IDENTIFIER", "redis://127.0.0.1/") |
|||
.build(), |
|||
wasi_keyvalue_ctx: WasiKeyValueCtxBuilder::new() |
|||
.allow_redis_hosts(&["127.0.0.1"]) |
|||
.redis_connection_timeout(std::time::Duration::from_secs(5)) |
|||
.redis_response_timeout(std::time::Duration::from_secs(5)) |
|||
.build(), |
|||
}, |
|||
) |
|||
.await |
|||
} |
@ -0,0 +1,22 @@ |
|||
/// A keyvalue interface that provides atomic operations. |
|||
/// |
|||
/// Atomic operations are single, indivisible operations. When a fault causes an atomic operation to |
|||
/// fail, it will appear to the invoker of the atomic operation that the action either completed |
|||
/// successfully or did nothing at all. |
|||
/// |
|||
/// Please note that this interface is bare functions that take a reference to a bucket. This is to |
|||
/// get around the current lack of a way to "extend" a resource with additional methods inside of |
|||
/// wit. Future version of the interface will instead extend these methods on the base `bucket` |
|||
/// resource. |
|||
interface atomics { |
|||
use store.{bucket, error}; |
|||
|
|||
/// Atomically increment the value associated with the key in the store by the given delta. It |
|||
/// returns the new value. |
|||
/// |
|||
/// If the key does not exist in the store, it creates a new key-value pair with the value set |
|||
/// to the given delta. |
|||
/// |
|||
/// If any other error occurs, it returns an `Err(error)`. |
|||
increment: func(bucket: borrow<bucket>, key: string, delta: u64) -> result<u64, error>; |
|||
} |
@ -0,0 +1,63 @@ |
|||
/// A keyvalue interface that provides batch operations. |
|||
/// |
|||
/// A batch operation is an operation that operates on multiple keys at once. |
|||
/// |
|||
/// Batch operations are useful for reducing network round-trip time. For example, if you want to |
|||
/// get the values associated with 100 keys, you can either do 100 get operations or you can do 1 |
|||
/// batch get operation. The batch operation is faster because it only needs to make 1 network call |
|||
/// instead of 100. |
|||
/// |
|||
/// A batch operation does not guarantee atomicity, meaning that if the batch operation fails, some |
|||
/// of the keys may have been modified and some may not. |
|||
/// |
|||
/// This interface does has the same consistency guarantees as the `store` interface, meaning that |
|||
/// you should be able to "read your writes." |
|||
/// |
|||
/// Please note that this interface is bare functions that take a reference to a bucket. This is to |
|||
/// get around the current lack of a way to "extend" a resource with additional methods inside of |
|||
/// wit. Future version of the interface will instead extend these methods on the base `bucket` |
|||
/// resource. |
|||
interface batch { |
|||
use store.{bucket, error}; |
|||
|
|||
/// Get the key-value pairs associated with the keys in the store. It returns a list of |
|||
/// key-value pairs. |
|||
/// |
|||
/// If any of the keys do not exist in the store, it returns a `none` value for that pair in the |
|||
/// list. |
|||
/// |
|||
/// MAY show an out-of-date value if there are concurrent writes to the store. |
|||
/// |
|||
/// If any other error occurs, it returns an `Err(error)`. |
|||
get-many: func(bucket: borrow<bucket>, keys: list<string>) -> result<list<option<tuple<string, list<u8>>>>, error>; |
|||
|
|||
/// Set the values associated with the keys in the store. If the key already exists in the |
|||
/// store, it overwrites the value. |
|||
/// |
|||
/// Note that the key-value pairs are not guaranteed to be set in the order they are provided. |
|||
/// |
|||
/// If any of the keys do not exist in the store, it creates a new key-value pair. |
|||
/// |
|||
/// If any other error occurs, it returns an `Err(error)`. When an error occurs, it does not |
|||
/// rollback the key-value pairs that were already set. Thus, this batch operation does not |
|||
/// guarantee atomicity, implying that some key-value pairs could be set while others might |
|||
/// fail. |
|||
/// |
|||
/// Other concurrent operations may also be able to see the partial results. |
|||
set-many: func(bucket: borrow<bucket>, key-values: list<tuple<string, list<u8>>>) -> result<_, error>; |
|||
|
|||
/// Delete the key-value pairs associated with the keys in the store. |
|||
/// |
|||
/// Note that the key-value pairs are not guaranteed to be deleted in the order they are |
|||
/// provided. |
|||
/// |
|||
/// If any of the keys do not exist in the store, it skips the key. |
|||
/// |
|||
/// If any other error occurs, it returns an `Err(error)`. When an error occurs, it does not |
|||
/// rollback the key-value pairs that were already deleted. Thus, this batch operation does not |
|||
/// guarantee atomicity, implying that some key-value pairs could be deleted while others might |
|||
/// fail. |
|||
/// |
|||
/// Other concurrent operations may also be able to see the partial results. |
|||
delete-many: func(bucket: borrow<bucket>, keys: list<string>) -> result<_, error>; |
|||
} |
@ -0,0 +1,122 @@ |
|||
/// A keyvalue interface that provides eventually consistent key-value operations. |
|||
/// |
|||
/// Each of these operations acts on a single key-value pair. |
|||
/// |
|||
/// The value in the key-value pair is defined as a `u8` byte array and the intention is that it is |
|||
/// the common denominator for all data types defined by different key-value stores to handle data, |
|||
/// ensuring compatibility between different key-value stores. Note: the clients will be expecting |
|||
/// serialization/deserialization overhead to be handled by the key-value store. The value could be |
|||
/// a serialized object from JSON, HTML or vendor-specific data types like AWS S3 objects. |
|||
/// |
|||
/// Data consistency in a key value store refers to the guarantee that once a write operation |
|||
/// completes, all subsequent read operations will return the value that was written. |
|||
/// |
|||
/// Any implementation of this interface must have enough consistency to guarantee "reading your |
|||
/// writes." In particular, this means that the client should never get a value that is older than |
|||
/// the one it wrote, but it MAY get a newer value if one was written around the same time. These |
|||
/// guarantees only apply to the same client (which will likely be provided by the host or an |
|||
/// external capability of some kind). In this context a "client" is referring to the caller or |
|||
/// guest that is consuming this interface. Once a write request is committed by a specific client, |
|||
/// all subsequent read requests by the same client will reflect that write or any subsequent |
|||
/// writes. Another client running in a different context may or may not immediately see the result |
|||
/// due to the replication lag. As an example of all of this, if a value at a given key is A, and |
|||
/// the client writes B, then immediately reads, it should get B. If something else writes C in |
|||
/// quick succession, then the client may get C. However, a client running in a separate context may |
|||
/// still see A or B |
|||
interface store { |
|||
/// The set of errors which may be raised by functions in this package |
|||
variant error { |
|||
/// The host does not recognize the store identifier requested. |
|||
no-such-store, |
|||
|
|||
/// The requesting component does not have access to the specified store |
|||
/// (which may or may not exist). |
|||
access-denied, |
|||
|
|||
/// Some implementation-specific error has occurred (e.g. I/O) |
|||
other(string) |
|||
} |
|||
|
|||
/// A response to a `list-keys` operation. |
|||
record key-response { |
|||
/// The list of keys returned by the query. |
|||
keys: list<string>, |
|||
/// The continuation token to use to fetch the next page of keys. If this is `null`, then |
|||
/// there are no more keys to fetch. |
|||
cursor: option<u64> |
|||
} |
|||
|
|||
/// Get the bucket with the specified identifier. |
|||
/// |
|||
/// `identifier` must refer to a bucket provided by the host. |
|||
/// |
|||
/// `error::no-such-store` will be raised if the `identifier` is not recognized. |
|||
open: func(identifier: string) -> result<bucket, error>; |
|||
|
|||
/// A bucket is a collection of key-value pairs. Each key-value pair is stored as a entry in the |
|||
/// bucket, and the bucket itself acts as a collection of all these entries. |
|||
/// |
|||
/// It is worth noting that the exact terminology for bucket in key-value stores can very |
|||
/// depending on the specific implementation. For example: |
|||
/// |
|||
/// 1. Amazon DynamoDB calls a collection of key-value pairs a table |
|||
/// 2. Redis has hashes, sets, and sorted sets as different types of collections |
|||
/// 3. Cassandra calls a collection of key-value pairs a column family |
|||
/// 4. MongoDB calls a collection of key-value pairs a collection |
|||
/// 5. Riak calls a collection of key-value pairs a bucket |
|||
/// 6. Memcached calls a collection of key-value pairs a slab |
|||
/// 7. Azure Cosmos DB calls a collection of key-value pairs a container |
|||
/// |
|||
/// In this interface, we use the term `bucket` to refer to a collection of key-value pairs |
|||
resource bucket { |
|||
/// Get the value associated with the specified `key` |
|||
/// |
|||
/// The value is returned as an option. If the key-value pair exists in the |
|||
/// store, it returns `Ok(value)`. If the key does not exist in the |
|||
/// store, it returns `Ok(none)`. |
|||
/// |
|||
/// If any other error occurs, it returns an `Err(error)`. |
|||
get: func(key: string) -> result<option<list<u8>>, error>; |
|||
|
|||
/// Set the value associated with the key in the store. If the key already |
|||
/// exists in the store, it overwrites the value. |
|||
/// |
|||
/// If the key does not exist in the store, it creates a new key-value pair. |
|||
/// |
|||
/// If any other error occurs, it returns an `Err(error)`. |
|||
set: func(key: string, value: list<u8>) -> result<_, error>; |
|||
|
|||
/// Delete the key-value pair associated with the key in the store. |
|||
/// |
|||
/// If the key does not exist in the store, it does nothing. |
|||
/// |
|||
/// If any other error occurs, it returns an `Err(error)`. |
|||
delete: func(key: string) -> result<_, error>; |
|||
|
|||
/// Check if the key exists in the store. |
|||
/// |
|||
/// If the key exists in the store, it returns `Ok(true)`. If the key does |
|||
/// not exist in the store, it returns `Ok(false)`. |
|||
/// |
|||
/// If any other error occurs, it returns an `Err(error)`. |
|||
exists: func(key: string) -> result<bool, error>; |
|||
|
|||
/// Get all the keys in the store with an optional cursor (for use in pagination). It |
|||
/// returns a list of keys. Please note that for most KeyValue implementations, this is a |
|||
/// can be a very expensive operation and so it should be used judiciously. Implementations |
|||
/// can return any number of keys in a single response, but they should never attempt to |
|||
/// send more data than is reasonable (i.e. on a small edge device, this may only be a few |
|||
/// KB, while on a large machine this could be several MB). Any response should also return |
|||
/// a cursor that can be used to fetch the next page of keys. See the `key-response` record |
|||
/// for more information. |
|||
/// |
|||
/// Note that the keys are not guaranteed to be returned in any particular order. |
|||
/// |
|||
/// If the store is empty, it returns an empty list. |
|||
/// |
|||
/// MAY show an out-of-date list of keys if there are concurrent writes to the store. |
|||
/// |
|||
/// If any error occurs, it returns an `Err(error)`. |
|||
list-keys: func(cursor: option<u64>) -> result<key-response, error>; |
|||
} |
|||
} |
@ -0,0 +1,16 @@ |
|||
/// A keyvalue interface that provides watch operations. |
|||
/// |
|||
/// This interface is used to provide event-driven mechanisms to handle |
|||
/// keyvalue changes. |
|||
interface watcher { |
|||
/// A keyvalue interface that provides handle-watch operations. |
|||
use store.{bucket}; |
|||
|
|||
/// Handle the `set` event for the given bucket and key. It includes a reference to the `bucket` |
|||
/// that can be used to interact with the store. |
|||
on-set: func(bucket: bucket, key: string, value: list<u8>); |
|||
|
|||
/// Handle the `delete` event for the given bucket and key. It includes a reference to the |
|||
/// `bucket` that can be used to interact with the store. |
|||
on-delete: func(bucket: bucket, key: string); |
|||
} |
@ -0,0 +1,26 @@ |
|||
package wasi:keyvalue@0.2.0-draft; |
|||
|
|||
/// The `wasi:keyvalue/imports` world provides common APIs for interacting with key-value stores. |
|||
/// Components targeting this world will be able to do: |
|||
/// |
|||
/// 1. CRUD (create, read, update, delete) operations on key-value stores. |
|||
/// 2. Atomic `increment` and CAS (compare-and-swap) operations. |
|||
/// 3. Batch operations that can reduce the number of round trips to the network. |
|||
world imports { |
|||
/// The `store` capability allows the component to perform eventually consistent operations on |
|||
/// the key-value store. |
|||
import store; |
|||
|
|||
/// The `atomic` capability allows the component to perform atomic / `increment` and CAS |
|||
/// (compare-and-swap) operations. |
|||
import atomics; |
|||
|
|||
/// The `batch` capability allows the component to perform eventually consistent batch |
|||
/// operations that can reduce the number of round trips to the network. |
|||
import batch; |
|||
} |
|||
|
|||
world watch-service { |
|||
include imports; |
|||
export watcher; |
|||
} |
@ -0,0 +1,6 @@ |
|||
// We actually don't use this; it's just to let bindgen! find the corresponding world in wit/deps. |
|||
package wasmtime:wasi; |
|||
|
|||
world bindings { |
|||
include wasi:keyvalue/imports@0.2.0-draft; |
|||
} |
Loading…
Reference in new issue