Skip to content

Commit

Permalink
Begin implementation of WASM - host API
Browse files Browse the repository at this point in the history
  • Loading branch information
drogus committed Mar 8, 2024
1 parent a64701e commit ce3238e
Show file tree
Hide file tree
Showing 18 changed files with 1,467 additions and 440 deletions.
877 changes: 560 additions & 317 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@ members = [
"service",
"worker",
"wasm",
"bindings",
]

[workspace.dependencies]
tokio = { version = "1.20", features = ["full"] }
anyhow = { version = "1", features = ["backtrace"] }
thiserror = "1"
tokio-serde = { version = "0.8.0", features = ["json"] }
tokio-serde = { version = "0.9.0", features = ["json"] }
tokio-util = { version = "0.7.3", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.108"
futures = "0.3.21"
uuid = { version = "1.4.1", features = ["serde", "v4"] }
wasmtime = "15"
wasmtime-wasi = { version = "15", features = ["tokio"] }
wasi-common = "15"
wiggle = "15"


wasmtime = { git = "https://github.com/bytecodealliance/wasmtime.git" }
wasmtime-wasi = { git = "https://github.com/bytecodealliance/wasmtime.git" }
wasi-common = { git = "https://github.com/bytecodealliance/wasmtime.git" }
wiggle = { git = "https://github.com/bytecodealliance/wasmtime.git" }
num-rational = { version = "0.4", features = ["serde"]}
borsh = { version = "1.3", features = ["unstable__schema", "derive"] }
9 changes: 9 additions & 0 deletions bindings/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "crows-bindings"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
borsh.workspace = true
116 changes: 116 additions & 0 deletions bindings/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use borsh::{from_slice, to_vec, BorshDeserialize, BorshSchema, BorshSerialize};
use std::{cell::RefCell, collections::HashMap, mem::MaybeUninit};

#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
pub enum HTTPMethod {
HEAD,
GET,
POST,
PUT,
DELETE,
OPTIONS,
}

#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
pub struct HTTPRequest {
url: String,
method: HTTPMethod,
headers: HashMap<String, String>,
body: Option<String>,
}

#[derive(Debug, BorshDeserialize, BorshSerialize)]
pub struct HTTPError {}

#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
pub struct HTTPResponse {
// TODO: these should not be public I think, I'd prefer to do a public interface for them
pub headers: HashMap<String, String>,
pub body: String,
pub status: u16,
}

fn extract_from_return_value(value: u64) -> (u8, u32, u32) {
let status = ((value >> 56) & 0xFF) as u8;
let length = ((value >> 32) & 0x00FFFFFF) as u32;
let ptr = (value & 0xFFFFFFFF) as u32;
(status, length, ptr)
}

mod bindings {
#[link(wasm_import_module = "crows")]
extern "C" {
pub fn log(content: *mut u8, content_len: usize);
pub fn http(content: *mut u8, content_len: usize) -> u64;
pub fn consume_buffer(index: u32, content: *mut u8, content_len: usize);

}
}

fn with_buffer<R>(f: impl FnOnce(&mut Vec<u8>) -> R) -> R {
thread_local! {
static BUFFER: RefCell<Vec<u8>> = RefCell::new(Vec::new());
}

BUFFER.with(|r| {
let mut buf = r.borrow_mut();
buf.clear();
f(&mut buf)
})
}

pub fn http_request(
url: String,
method: HTTPMethod,
headers: HashMap<String, String>,
body: String,
) -> Result<HTTPResponse, HTTPError> {
let body = Some(body);
let request = HTTPRequest {
method,
url,
headers,
body,
};

call_host_function(&request, |buf| unsafe {
bindings::http(buf.as_mut_ptr(), buf.len())
})
}

fn call_host_function<T, R, E>(arguments: &T, f: impl FnOnce(&mut Vec<u8>) -> u64) -> Result<R, E>
where
T: BorshSerialize,
R: BorshDeserialize,
E: BorshDeserialize,
{
let mut encoded = to_vec(arguments).unwrap();

println!("encoded length: {}", encoded.len());
let (status, length, index) = with_buffer(|mut buf| {
buf.append(&mut encoded);
let response = f(&mut buf);

extract_from_return_value(response)
});

println!("response: {status}, {length}, {index}");
with_buffer(|buf| {
let capacity = buf.capacity();
if capacity < length as usize {
let additional = length as usize - buf.capacity();
buf.reserve_exact(additional);
}

unsafe {
bindings::consume_buffer(index, buf.as_mut_ptr(), length as usize);
buf.set_len(length as usize);
}

if status == 0 {
Ok(from_slice(buf).expect("Couldn't decode message from the host"))
} else {
Err(from_slice(buf).expect("Couldn't decode message from the host"))
}
})
}
3 changes: 3 additions & 0 deletions bindings/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}
21 changes: 14 additions & 7 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ use crows_utils::services::{
WorkerStatus,
};
use crows_utils::services::{Coordinator, WorkerToCoordinator};
use crows_utils::ModuleId;
use uuid::Uuid;

// TODO: I don't like the fact that we have to wrap the client in Mutex and option. It should
// be easier to match the client object with the request to the service. I should probably
// add a context object at some point.
// TODO: Client should probably be thread safe for easier handling
#[derive(Default)]
struct WorkerToCoordinatorService {
scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>>,
scenarios: Arc<Mutex<HashMap<ModuleId, Vec<u8>>>>,
workers: Arc<Mutex<HashMap<Uuid, WorkerEntry>>>,
client: Arc<Mutex<Option<WorkerClient>>>,
}
Expand All @@ -39,7 +41,7 @@ impl WorkerToCoordinator for WorkerToCoordinatorService {

#[derive(Clone, Default)]
struct CoordinatorService {
scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>>,
scenarios: Arc<Mutex<HashMap<ModuleId, Vec<u8>>>>,
workers: Arc<Mutex<HashMap<Uuid, WorkerEntry>>>,
}

Expand All @@ -56,19 +58,24 @@ impl Coordinator for CoordinatorService {
name: String,
content: Vec<u8>,
) -> Result<(), CoordinatorError> {
let id = ModuleId::new(name.clone(), &content);

// TODO: to send bandwidth maybe it will be worth it to gzip the data? we would be gzipping
// once and sending to N clients
// send each uploaded scenario to all of the workers
for (_, worker_entry) in self.workers.lock().await.iter() {
let locked = worker_entry.client.lock();
let mut futures = Vec::new();
futures.push(async {
if let Some(client) = locked.await.as_mut() {
client.upload_scenario(name.clone(), content.clone()).await;
// TODO: handle Result
client.upload_scenario(id.clone(), content.clone()).await;
}
});

join_all(futures).await;
}
self.scenarios.lock().await.insert(name, content);
self.scenarios.lock().await.insert(id, content);

Ok(())
}
Expand Down Expand Up @@ -103,7 +110,7 @@ pub async fn main() {
.parse()
.unwrap();

let original_scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>> = Default::default();
let original_scenarios: Arc<Mutex<HashMap<ModuleId, Vec<u8>>>> = Default::default();
let original_workers: Arc<Mutex<HashMap<Uuid, WorkerEntry>>> = Default::default();

let scenarios = original_scenarios.clone();
Expand All @@ -130,8 +137,8 @@ pub async fn main() {

// sent all the current scenarios to a new worker node
let locked = scenarios.lock().await;
for (name, content) in locked.iter() {
let _ = client.upload_scenario(name.clone(), content.clone()).await;
for (id, content) in locked.iter() {
let _ = client.upload_scenario(id.clone(), content.clone()).await;
}
drop(locked);

Expand Down
Loading

0 comments on commit ce3238e

Please sign in to comment.