Skip to content

Commit

Permalink
Refactor WASM package a bit
Browse files Browse the repository at this point in the history
Instance now has a unique store, but it can share engine and linker. On
top of that we also return a stdout receiver when creating an instance
  • Loading branch information
drogus committed Mar 11, 2024
1 parent 47f64e1 commit 8848721
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 127 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ wasi-common = { git = "https://github.com/bytecodealliance/wasmtime.git" }
wiggle = { git = "https://github.com/bytecodealliance/wasmtime.git" }
num-rational = { version = "0.4", features = ["serde"]}
borsh = { version = "1.3", features = ["unstable__schema", "derive"] }
bytes = "1.5"
2 changes: 1 addition & 1 deletion rust-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ use crows_bindings::{http_request, HTTPMethod::*};
#[export_name="test"]
pub fn test() {
let response = http_request("https://example.com".into(), GET, HashMap::new(), "".into());
println!("response: {:?}", response.unwrap());
println!("response: {:?}", response.unwrap().status);
}
2 changes: 2 additions & 0 deletions wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ wasmtime-wasi.workspace = true
wasi-common.workspace = true
wiggle.workspace = true
borsh.workspace = true
futures.workspace = true
bytes.workspace = true
251 changes: 133 additions & 118 deletions wasm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use anyhow::anyhow;
use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize};
use crows_bindings::{HTTPError, HTTPMethod, HTTPRequest, HTTPResponse};
use crows_utils::{services::RunId};
use crows_utils::services::RunId;
use reqwest::header::{HeaderName, HeaderValue};
use reqwest::{Body, Request, Url};
use std::str::FromStr;
use std::{any::Any, collections::HashMap, io::IoSlice};
use tokio::sync::mpsc::UnboundedReceiver;
use wasi_common::file::{FdFlags, FileType};
use wasi_common::WasiFile;
use wasi_common::{
file::{FdFlags, FileType},
};
use wasmtime::{
Caller, Config, Engine, Linker, Memory, MemoryType, Module, Store
};
use wasmtime_wasi::{StdoutStream};
use borsh::{BorshSerialize, BorshDeserialize, from_slice, to_vec};
use wasmtime::{Caller, Config, Engine, Linker, Memory, MemoryType, Module, Store};
use wasmtime_wasi::{StdoutStream, StreamResult};

#[macro_export]
macro_rules! ok_or_return {
Expand All @@ -39,37 +36,39 @@ pub enum Error {

// A runtime should be run in a single async runtime. Ideally also in a single
// thread as we want a share-nothing architecture for performance and simplicity
#[derive(Default)]
pub struct Runtime {
pub struct Runtime<'a> {
// we index instances with the run id, cause technically we can run
// scenarios from multiple modules on a single runtime
// TODO: might be simpler to assume only one module? I'm not sure yet if it's worth
// it. If the overhead is too big I'd probably refactor it to allow only one module
// at any point in time. I would like to start with multiple modules, though, to first
// see if it's actually problematic, maybe it's not and it seems to give more flexibility
instances: HashMap<RunId, Vec<Instance>>,
pub instances: HashMap<RunId, Vec<Instance<'a>>>,
pub environment: Environment,
}

impl Runtime {
pub fn new() -> Self {
Self::default()
impl<'a> Runtime<'a> {
pub fn new() -> anyhow::Result<Self> {
Ok(Self {
instances: HashMap::new(),
environment: Environment::new()?,
})
}

// TODO: it looks like the id/module pair should be in a separate data type, might
// be worth to extract it in the future
pub fn create_instances(
&mut self,
id: RunId,
count: usize,
instance: &Instance,
) -> Result<(), Error> {
let mut instances = self.instances.get_mut(&id).ok_or(Error::NoSuchRun(id))?;
for _ in (0..count).into_iter() {
instances.push(instance.clone());
}

Ok(())
}
// pub fn create_instances(
// &mut self,
// id: RunId,
// count: usize,
// instance: &'a Instance<'_>,
// ) -> Result<(), Error> {
// let instances = self.instances.get_mut(&id).ok_or(Error::NoSuchRun(id))?;
// for _ in (0..count).into_iter() {
// instances.push(instance);
// }
//
// Ok(())
// }
}

struct WasiHostCtx {
Expand Down Expand Up @@ -162,7 +161,7 @@ impl WasiHostCtx {
len: u32,
) -> anyhow::Result<()> {
let memory = get_memory(&mut caller)?;
let (mut slice, store) = memory.data_and_store_mut(&mut caller);
let (slice, store) = memory.data_and_store_mut(&mut caller);

let buffer = store.buffers.try_remove(index as usize).unwrap();
anyhow::ensure!(
Expand Down Expand Up @@ -229,130 +228,125 @@ impl wasmtime_wasi::preview1::WasiPreview1View for WasiHostCtx {
}
}

#[derive(Clone)]
pub struct Instance {
pub struct Environment {
engine: Engine,
module: Module,
linker: Linker<WasiHostCtx>,
}

pub fn get_memory<T>(caller: &mut Caller<'_, T>) -> anyhow::Result<Memory> {
Ok(caller.get_export("memory").unwrap().into_memory().unwrap())
pub struct Instance<'a> {
engine: &'a Engine,
module: Module,
linker: &'a Linker<WasiHostCtx>,
store: wasmtime::Store<WasiHostCtx>,
instance: wasmtime::Instance,
}

impl Instance {
pub fn new(raw_module: Vec<u8>) -> Result<Self, anyhow::Error> {
impl Environment {
pub fn new() -> anyhow::Result<Self> {
let mut config = Config::new();
config.async_support(true);
config.consume_fuel(true);

let engine = Engine::new(&config)?;

let module = Module::from_binary(&engine, &raw_module)?;

let mut linker = Linker::new(&engine);

// linker.func_wrap("crows", "log", WasiHostCtx::log).unwrap();
linker
.func_wrap("crows", "consume_buffer", WasiHostCtx::consume_buffer)
.unwrap();
linker
.func_wrap2_async("crows", "http", |caller, ptr, len| {
Box::new(async move {
WasiHostCtx::http(caller, ptr, len).await
})
Box::new(async move { WasiHostCtx::http(caller, ptr, len).await })
})
.unwrap();
// let _ = linker.func_new_async(
// "crows",
// "http_request",
// http_request_type,
// |_, _params, results| {
// Box::new(async {
// println!("Waiting 5s in rust async code");
// tokio::time::sleep(Duration::from_secs(5)).await;
// println!("Finished waiting");
// results[0] = Val::I32(1111 as i32);
// Ok(())
// })
// },
// )?;
// linker.func_wrap("crows", "print", |param: i32| {
// println!("Got value: {param}")
// })?;

wasmtime_wasi::preview1::add_to_linker_async(&mut linker)?;

Ok(Self {
engine,
module,
linker: linker,
})
Ok(Self { engine, linker })
}
}

pub async fn run_wasm(instance: &Instance) -> anyhow::Result<()> {
let (sender, receiver) = std::sync::mpsc::channel();
tokio::spawn(async move {
tokio::task::spawn_blocking(move || {
while let Ok(message) = receiver.recv() {
println!("stdout: {}", String::from_utf8(message).unwrap());
}
})
.await;
});

let stdout = RemoteStdout { sender };
// let wasi = WasiCtxBuilder::new()
// .
// .stdout(stdout.clone())
// Set an environment variable so the wasm knows its name.
// .env("NAME", &inputs.name)?
// .build();
let wasi_ctx = wasmtime_wasi::WasiCtxBuilder::new()
.stdout(stdout.clone())
.inherit_stdio()
.build();

let host_ctx = WasiHostCtx {
preview2_ctx: wasi_ctx,
preview2_table: wasmtime::component::ResourceTable::new(),
preview1_adapter: wasmtime_wasi::preview1::WasiPreview1Adapter::new(),
buffers: slab::Slab::default(),
memory: None,
client: reqwest::Client::new(),
};
let mut store: Store<WasiHostCtx> = Store::new(&instance.engine, host_ctx);
pub fn get_memory<T>(caller: &mut Caller<'_, T>) -> anyhow::Result<Memory> {
Ok(caller.get_export("memory").unwrap().into_memory().unwrap())
}

let memory = Memory::new(&mut store, MemoryType::new(1, None)).unwrap();
store.data_mut().memory = Some(memory);
impl<'a> Instance<'a> {
pub fn new_store(
engine: &Engine,
) -> anyhow::Result<(wasmtime::Store<WasiHostCtx>, UnboundedReceiver<Vec<u8>>)> {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

let stdout = RemoteStdout { sender };

let wasi_ctx = wasmtime_wasi::WasiCtxBuilder::new()
.stdout(stdout)
// .inherit_stdio()
.build();

let host_ctx = WasiHostCtx {
preview2_ctx: wasi_ctx,
preview2_table: wasmtime::component::ResourceTable::new(),
preview1_adapter: wasmtime_wasi::preview1::WasiPreview1Adapter::new(),
buffers: slab::Slab::default(),
memory: None,
client: reqwest::Client::new(),
};
let mut store: Store<WasiHostCtx> = Store::new(engine, host_ctx);

// WebAssembly execution will be paused for an async yield every time it
// consumes 10000 fuel. Fuel will be refilled u64::MAX times.
store.fuel_async_yield_interval(Some(10000))?;
store.set_fuel(u64::MAX).unwrap();
let memory = Memory::new(&mut store, MemoryType::new(1, None)).unwrap();
store.data_mut().memory = Some(memory);

// Instantiate into our own unique store using the shared linker, afterwards
// acquiring the `_start` function for the module and executing it.
let instance = instance
.linker
.instantiate_async(&mut store, &instance.module)
.await?;
// WebAssembly execution will be paused for an async yield every time it
// consumes 10000 fuel. Fuel will be refilled u64::MAX times.
store.fuel_async_yield_interval(Some(10000))?;
store.set_fuel(u64::MAX).unwrap();

let func = instance
.clone()
.get_typed_func::<(), ()>(&mut store, "test")?;
Ok((store, receiver))
}

pub async fn new(
raw_module: &Vec<u8>,
env: &'a Environment,
) -> anyhow::Result<(Self, UnboundedReceiver<Vec<u8>>)> {
let module = Module::from_binary(&env.engine, raw_module)?;

let (mut store, stdout_receiver) = Instance::new_store(&env.engine)?;
let instance = env.linker.instantiate_async(&mut store, &module).await?;

// let func = instance
// .clone()
// .get_typed_func::<(), ()>(&mut store, "test")?;

// func.call_async(&mut store, ()).await?;

// drop(store);

func.call_async(&mut store, ()).await?;
Ok((
Self {
engine: &env.engine,
module,
linker: &env.linker,
store,
instance,
},
stdout_receiver,
))
}
}

drop(store);
pub async fn run_wasm(instance: &mut Instance<'_>) -> anyhow::Result<()> {
let func = instance
.instance
.get_typed_func::<(), ()>(&mut instance.store, "test")?;

func.call_async(&mut instance.store, ()).await?;

Ok(())
}

#[derive(Clone)]
struct RemoteStdout {
sender: std::sync::mpsc::Sender<Vec<u8>>,
sender: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
}

#[wiggle::async_trait]
Expand All @@ -371,18 +365,39 @@ impl WasiFile for RemoteStdout {
for slice in bufs {
let slice = slice.to_vec();
size += slice.len() as u64;
self.sender.send(slice);
self.sender.send(slice).unwrap();
}
Ok(size)
}
}

impl wasmtime_wasi::HostOutputStream for RemoteStdout {
fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
self.sender.send(bytes.to_vec()).unwrap();

Ok(())
}

fn flush(&mut self) -> StreamResult<()> {
Ok(())
}

fn check_write(&mut self) -> StreamResult<usize> {
Ok(1024 * 1024)
}
}

impl StdoutStream for RemoteStdout {
fn stream(&self) -> Box<dyn wasmtime_wasi::HostOutputStream> {
todo!()
Box::new(self.clone())
}

fn isatty(&self) -> bool {
todo!()
false
}
}

#[async_trait::async_trait]
impl wasmtime_wasi::Subscribe for RemoteStdout {
async fn ready(&mut self) {}
}
Loading

0 comments on commit 8848721

Please sign in to comment.