Skip to content

Commit

Permalink
A bit of cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
drogus committed Mar 19, 2024
1 parent 1b7cf10 commit 9203bc5
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 77 deletions.
2 changes: 0 additions & 2 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub struct ConstantArrivalRateConfig {
pub rate: usize,
pub time_unit: Duration,
pub allocated_vus: usize,
pub maximum_vus: Option<usize>,
}

impl ExecutorConfig for ConstantArrivalRateConfig {
Expand All @@ -45,7 +44,6 @@ impl Default for ConstantArrivalRateConfig {
rate: Default::default(),
time_unit: Duration::from_secs(1),
allocated_vus: Default::default(),
maximum_vus: Default::default(),
}
}
}
42 changes: 14 additions & 28 deletions utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,24 @@ use std::pin::Pin;

use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::ops::Deref;

use futures::prelude::*;
use futures::TryStreamExt;
use std::future::Future;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
time::sleep,
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use tokio_serde::formats::SymmetricalJson;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use uuid::Uuid;
use futures::TryStreamExt;

pub use serde;
pub use tokio;
pub mod services;
pub use crows_service;

// ModuleId will be used to distinguish between different versions of modules
// TODO: I don't have time to check the properties of the default Hash implementation
// it should be checked at some point. It's probably good enough, but I would like to confirm
#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq)]
pub struct ModuleId {
name: String,
version: String, // version will be a sha256 hash of the module contents
}

impl ModuleId {
pub fn new(name: String, content: &[u8]) -> Self {
let version = sha256::digest(content);

Self {
name,
version,
}
}
}

pub struct Server {
listener: TcpListener,
}
Expand Down Expand Up @@ -246,6 +222,16 @@ impl Client {
break;
}
} else {
// TODO: at the moment we block a service while executing a
// single request. this is because we allow to use &mut self
// in services and thus with the current implementation it
// would be hard to share the service object. It would be better to
// change it to always be &self and control the interior
// mutability with a lock. we could still allow for &mut
// methods, but only &mut methods would block. Another option
// would be to always use &self and thus require the
// implementation to deal with locking for each attribute that
// needs it
let deserialized = serde_json::from_str::<<T as Service<DummyType>>::Request>(&message.message).unwrap();
let response = service.handle_request(deserialized).await;

Expand Down
12 changes: 6 additions & 6 deletions utils/src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{self as utils, ModuleId};
use serde::{Deserialize, Serialize};
use crate::{self as utils};
use crows_service::service;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
use num_rational::Rational64;

#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum CoordinatorError {
Expand All @@ -16,7 +15,7 @@ pub enum CoordinatorError {
#[error("Failed to compile module")]
FailedToCompileModule,
#[error("Couldn't fetch config: {0}")]
CouldNotFetchConfig(String)
CouldNotFetchConfig(String),
}

#[derive(Error, Debug, Serialize, Deserialize, Clone)]
Expand All @@ -27,6 +26,8 @@ pub enum WorkerError {
ScenarioNotFound,
#[error("could not create a module from binary")]
CouldNotCreateModule,
#[error("could not create runtime: {0}")]
CouldNotCreateRuntime(String),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -55,7 +56,7 @@ pub trait WorkerToCoordinator {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum WorkerStatus {
Available,
Busy
Busy,
}

#[service(variant = "server", other_side = Client)]
Expand All @@ -76,7 +77,6 @@ pub struct WorkerData {
pub trait Worker {
async fn upload_scenario(&mut self, name: String, content: Vec<u8>);
async fn ping(&self) -> String;
// async fn prepare(&mut self, id: ModuleId, concurrency: usize, rate: Rational64) -> Result<RunId, WorkerError>;
async fn start(&self, name: String, config: crows_shared::Config) -> Result<(), WorkerError>;
async fn get_data(&self) -> WorkerData;
}
Expand Down
19 changes: 3 additions & 16 deletions wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use reqwest::{Body, Request, Url};
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::{from_slice, to_vec};
use tokio::time::Instant;
use std::collections::VecDeque;
use std::pin::Pin;
use std::str::FromStr;
Expand Down Expand Up @@ -144,22 +145,6 @@ impl Runtime {
}
}
}

// TODO: it looks like the id/module pair should be in a separate data type, might
// be worth to extract it in the future
// pub fn create_instances(
// &mut self,
// id: RunId,
// count: usize,
// instance: &'a Instance<'_>,
// ) -> Result<(), Error> {
// let instances = self.instances.get_mut(&id).ok_or(Error::NoSuchRun(id))?;
// for _ in (0..count).into_iter() {
// instances.push(instance);
// }
//
// Ok(())
// }
}

pub struct WasiHostCtx {
Expand Down Expand Up @@ -268,9 +253,11 @@ impl WasiHostCtx {

*reqw_req.body_mut() = request.body.map(|b| Body::from(b));

let instant = Instant::now();
let response = client.execute(reqw_req).await.map_err(|err| HTTPError {
message: format!("Error when sending a request: {err:?}"),
})?;
let _duration = instant.elapsed();

let mut headers = HashMap::new();
for (name, value) in response.headers().iter() {
Expand Down
40 changes: 15 additions & 25 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,17 @@ impl Worker for WorkerService {
todo!()
}

// async fn prepare(
// &mut self,
// id: ModuleId,
// concurrency: usize,
// rate: Rational64,
// ) -> Result<RunId, WorkerError> {
// let run_id: RunId = RunId::new();
//
// // TODO: we should check if we have a given module available and if not ask coordinator
// // to send it. For now let's assume we have the module id
// let info = RunInfo::new(run_id.clone(), concurrency, rate, id);
// self.runs.insert(run_id.clone(), info);
//
// Ok(run_id)
// }

async fn start(&self, name: String, config: crows_shared::Config) -> Result<(), WorkerError> {
// PLAN
// either pass as an argument or fetch Executor::Config?
let locked = self.scenarios.read().await;
let scenario = locked
.get(&name)
.ok_or(WorkerError::ScenarioNotFound)?
.clone();
drop(locked);

// TODO: remove unwrap
let runtime = Runtime::new(&scenario).unwrap();
let mut executor = Executors::get_executor(config, runtime).await;
let runtime = Runtime::new(&scenario)
.map_err(|err| WorkerError::CouldNotCreateRuntime(err.to_string()))?;
let mut executor = Executors::create_executor(config, runtime).await;
// TODO: prepare should be an entirely separate step and coordinator should wait for
// prepare from all of the workers
executor.prepare().await;
Expand All @@ -102,9 +84,6 @@ impl Worker for WorkerService {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: allow to set the number of CPUs
let cpus = num_cpus::get();

let coordinator_address: String =
std::env::var("COORDINATOR_ADDRESS").unwrap_or("127.0.0.1:8181".into());
let hostname: String = std::env::var("WORKER_NAME").unwrap();
Expand Down Expand Up @@ -142,7 +121,7 @@ enum Executors {
}

impl Executors {
pub async fn get_executor(config: Config, runtime: Runtime) -> Self {
pub async fn create_executor(config: Config, runtime: Runtime) -> Self {
match config {
Config::ConstantArrivalRate(config) => {
Executors::ConstantArrivalRateExecutor(ConstantArrivalRateExecutor {
Expand Down Expand Up @@ -175,6 +154,9 @@ struct ConstantArrivalRateExecutor {
runtime: Runtime,
}

// TODO: k6 supports an option to set maximum number of VUs. For now
// I haven't bothered to implement any limits, but it might be useful for bigger
// tests maybe?
impl Executor for ConstantArrivalRateExecutor {
async fn run(&mut self) -> anyhow::Result<()> {
let rate_per_second = self.config.rate as f64 / self.config.time_unit.as_secs_f64();
Expand All @@ -188,8 +170,16 @@ impl Executor for ConstantArrivalRateExecutor {
eprintln!("An error occurred while running a scenario: {err:?}");
}
});
// TODO: at the moment we always sleep for a calculated amount of time
// This may be wrong, especially when duration is very low, because
// with a very high request rate the time needed to spawn a task may
// be substantial enough to delay execution. So technically we should
// calculate how much time passed since sending the previous request and
// only sleep for the remaining duration
tokio::time::sleep(sleep_duration).await;

// TODO: wait for all of the allocated instances finish, ie. implement
// "graceful stop"
if instant.elapsed() > self.config.duration {
return Ok(());
}
Expand Down

0 comments on commit 9203bc5

Please sign in to comment.