Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Create Rust iml-state-machine
Browse files Browse the repository at this point in the history
WIP

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund committed Oct 16, 2020
1 parent 066aea3 commit 12870dd
Show file tree
Hide file tree
Showing 17 changed files with 690 additions and 168 deletions.
320 changes: 189 additions & 131 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ members = [
'iml-services/iml-snapshot',
'iml-services/iml-stats',
'iml-sfa',
'iml-state-machine',
'iml-system-test-utils',
'iml-systemd',
'iml-task-runner',
Expand Down
2 changes: 1 addition & 1 deletion chroma-manager.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ server {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass {{WARP_DRIVE_PROXY_PASS}};
proxy_pass {{WARP_DRIVE_PROXY_PASS}}/messaging;
}

location /mailbox {
Expand Down
13 changes: 13 additions & 0 deletions iml-state-machine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
authors = ["IML Team <iml@whamcloud.com>"]
edition = "2018"
name = "iml-state-machine"
version = "0.1.0"

[dependencies]
futures = "0.3"
iml-wire-types = {path = "../iml-wire-types", version = "0.4"}
petgraph = "0.5"
serde = {version = "1", features = ["derive"]}
serde_json = "1"
thiserror = "1.0"
27 changes: 27 additions & 0 deletions iml-state-machine/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# IML State Machine Model

```
┌────────────────────────────────────────────┐
│ │
│ Client mount │
│ │
│ │
└────────────────────────────────────────────┘
┌───────Depends on─────┴────────Depends On─────────┐
│ │
│ │
│ │
│ │
▼ ▼
┌────────────────────────────────────────────┐ ┌────────────────────────────────────────────┐
│ │ │ │
│ LNet │ │ Filesystem │
│ │ │ │
│ │ │ │
└────────────────────────────────────────────┘ └────────────────────────────────────────────┘
```
160 changes: 160 additions & 0 deletions iml-state-machine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright (c) 2020 DDN. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

mod snapshot;

use futures::{stream, StreamExt};
use iml_wire_types::{
state_machine::Transition,
warp_drive::{Cache, RecordId},
};
use petgraph::{
algo::astar,
graph::{DiGraph, NodeIndex},
prelude::*,
Direction,
};
use std::collections::HashSet;

trait GraphExt<N: Eq + PartialEq, E: Eq + PartialEq> {
fn find_node_idx(&self, x: &N) -> Option<NodeIndex>;
}

impl<N: Eq + PartialEq, E: Eq + PartialEq> GraphExt<N, E> for Graph<N, E> {
fn find_node_idx(&self, x: &N) -> Option<NodeIndex> {
self.node_indices().find(|i| &self[*i] == x)
}
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("State Not Found")]
NotFound,
}

pub fn get_available_transitions(id: RecordId, cache: &Cache) -> Option<HashSet<Transition>> {
match id {
RecordId::Snapshot(id) => {
let snap = cache.snapshot.get(&id)?;

let (g, node_idx) = snapshot::get_graph_state(&snap)?;

let g2 = get_directed_subgraph(&g, node_idx, Direction::Outgoing);

let xs: HashSet<_> = g2
.raw_edges()
.iter()
.map(|x| Transition::Snapshot(x.weight))
.collect();

Some(xs)
}
_ => None,
}
}

pub fn get_transition_path(id: RecordId, cache: &Cache, x: Transition) -> Option<Vec<Transition>> {
match (id, x) {
(RecordId::Snapshot(id), Transition::Snapshot(x)) => {
let snap = cache.snapshot.get(&id)?;

let (g, node_idx) = snapshot::get_graph_state(&snap)?;

let xs = astar(
&g,
node_idx,
|finish| {
g.edges_directed(finish, Direction::Incoming)
.any(|edge| edge.weight() == &x)
},
|_| 1,
|_| 0,
)?
.1;

let xs = xs.iter().zip(xs.iter().skip(1)).collect::<Vec<_>>();

let mut out = vec![];

for (a, b) in xs {
let e = g.find_edge(*a, *b)?;

let edge = *&g[e];

out.push(Transition::Snapshot(edge));
}

Some(out)
}
_ => None,
}
}

async fn run_transition(
id: RecordId,
cache: &Cache,
x: Transition,
) -> Option<Result<Vec<Transition>, Error>> {
match (id, x) {
(RecordId::Snapshot(id), Transition::Snapshot(t)) => {
let xs = get_transition_path(RecordId::Snapshot(id), cache, x)?;

let snap = cache.snapshot.get(&id)?;

let xs = stream::iter(xs)
.fold(vec![], |mut acc, x| async move {
let r = snapshot::update(snap, t).await;

acc.push(r);

acc
})
.await;
}
_ => {}
}

None
}

fn get_directed_subgraph<N: Copy, E: Copy>(
graph: &DiGraph<N, E>,
root: NodeIndex,
dir: petgraph::Direction,
) -> DiGraph<N, E> {
let mut subgraph = DiGraph::new();
let mut dfs = Dfs::new(&graph, root);
let mut nodes = std::collections::HashMap::new();

while let Some(node) = dfs.next(&graph) {
let mut neighbors = graph.neighbors_directed(node, dir).detach();

while let Some(edge) = neighbors.next_edge(&graph) {
let (g_from, g_to) = graph.edge_endpoints(edge).unwrap();

if g_to == root {
continue;
}

if !nodes.contains_key(&g_from.index()) {
let n = subgraph.add_node(*&graph[g_from]);
nodes.insert(g_from.index(), n);
}

if !nodes.contains_key(&g_to.index()) {
let n = subgraph.add_node(*&graph[g_to]);
nodes.insert(g_to.index(), n);
}

let s_from = nodes.get(&g_from.index()).unwrap();
let s_to = nodes.get(&g_to.index()).unwrap();

let edge = *&graph[edge];

subgraph.add_edge(*s_from, *s_to, edge);
}
}

subgraph
}
88 changes: 88 additions & 0 deletions iml-state-machine/src/lnet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020 DDN. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use futures::{Future, FutureExt};
use petgraph::graph::DiGraph;
use std::{ io, pin::Pin};

pub enum LnetStates {
Unconfigured,
Unloaded,
Down,
Up,
}

impl Default for LnetStates {
fn default() -> Self {
Self::Unconfigured
}
}

impl LnetStates {
fn step(self, next: &Self) {
match (self, next) {
(Self::Unconfigured, Self::Unloaded) => {}
(Self::Unloaded, Self::Down) => {}
(Self::Down, Self::Up) => {}
(Self::Up, Self::Down) => {}
(Self::Down, Self::Unloaded) => {}
(Self::Unloaded, Self::Unconfigured) => {}
_ => {}
};
}
}

async fn configure() -> Result<(), io::Error> {
Ok(())
}

async fn load() -> Result<(), io::Error> {
Ok(())
}

async fn start() -> Result<(), io::Error> {
Ok(())
}

async fn stop() -> Result<(), io::Error> {
Ok(())
}

async fn unload() -> Result<(), io::Error> {
Ok(())
}

async fn unconfigure() -> Result<(), io::Error> {
Ok(())
}

type BoxedFuture = Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>>;

type Transition = Box<dyn Fn() -> BoxedFuture + Send + Sync>;

fn mk_transition<Fut>(f: fn() -> Fut) -> Transition
where
Fut: Future<Output = Result<(), io::Error>> + Send + 'static,
{
Box::new(move || f().boxed())
}

fn build_graph() -> DiGraph::<LnetStates, Transition> {
let mut deps = DiGraph::<LnetStates, Transition>::new();

let unconfigured = deps.add_node(LnetStates::Unconfigured);
let unloaded = deps.add_node(LnetStates::Unloaded);
let down = deps.add_node(LnetStates::Down);
let up = deps.add_node(LnetStates::Up);

deps.add_edge(unconfigured, unloaded, mk_transition(configure));
deps.add_edge(unloaded, down, mk_transition(load));
deps.add_edge(down, up, mk_transition(start));
deps.add_edge(up, down, mk_transition(stop));
deps.add_edge(down, unloaded, mk_transition(unload));
deps.add_edge(unloaded, unconfigured, mk_transition(unconfigure));

deps

}
53 changes: 53 additions & 0 deletions iml-state-machine/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2020 DDN. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use crate::{Error, GraphExt};
use futures::{Future, FutureExt};
use iml_wire_types::{
snapshot::SnapshotRecord,
state_machine::snapshot::{State, Transition},
warp_drive::Cache,
};
use petgraph::{graph::DiGraph, graph::NodeIndex, graphmap::DiGraphMap, Graph};

pub async fn update(x: &SnapshotRecord, t: Transition) {
match t {
Transition::Mount => {}
Transition::Unmount => {}
Transition::Remove => {}
}
}

pub fn get_state(x: &SnapshotRecord) -> State {
if x.mounted == Some(true) {
State::Mounted
} else if x.mounted == Some(false) {
State::Unmounted
} else {
State::Unmounted
}
}

pub fn get_graph_state(x: &SnapshotRecord) -> Option<(DiGraph<State, Transition>, NodeIndex)> {
let state = get_state(x);

let g = build_graph();

g.find_node_idx(&state).map(move |x| (g, x))
}

pub fn build_graph() -> DiGraph<State, Transition> {
let mut deps = DiGraph::<State, Transition>::new();

let unmounted = deps.add_node(State::Unmounted);
let mounted = deps.add_node(State::Mounted);
let removed = deps.add_node(State::Removed);

deps.add_edge(unmounted, mounted, Transition::Mount);
deps.add_edge(mounted, unmounted, Transition::Unmount);
deps.add_edge(unmounted, removed, Transition::Remove);
deps.add_edge(mounted, removed, Transition::Remove);

deps
}
1 change: 1 addition & 0 deletions iml-warp-drive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ iml-manager-client = {path = "../iml-manager-client", version = "0.4"}
iml-manager-env = {path = "../iml-manager-env", version = "0.4"}
iml-postgres = {path = "../iml-postgres", version = "0.4"}
iml-rabbit = {path = "../iml-rabbit", version = "0.4"}
iml-state-machine = {path = "../iml-state-machine", version = "0.1"}
iml-tracing = {version = "0.3", path = "../iml-tracing"}
iml-wire-types = {path = "../iml-wire-types", version = "0.4", features = ["postgres-interop"]}
serde = {version = "1", features = ["derive"]}
Expand Down
2 changes: 2 additions & 0 deletions iml-warp-drive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub mod db_record;
pub mod error;
pub mod listen;
pub mod locks;
pub mod messaging;
pub mod request;
pub mod state_machine;
pub mod users;

pub use db_record::*;
5 changes: 4 additions & 1 deletion iml-warp-drive/src/locks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
// license that can be found in the LICENSE file.

use crate::request::Request;
use futures::{Stream, TryStreamExt};
use futures::{lock::Mutex, Stream, TryStreamExt};
use im::{HashMap, HashSet};
use iml_rabbit::{
basic_consume, basic_publish, bind_queue, declare_transient_exchange, declare_transient_queue,
message::Delivery, purge_queue, BasicConsumeOptions, Channel, ExchangeKind, ImlRabbitError,
Queue,
};
use iml_wire_types::{LockAction, LockChange, ToCompositeId};
use std::sync::Arc;

pub type SharedLocks = Arc<Mutex<Locks>>;

/// Declares the exchange for rpc comms
async fn declare_rpc_exchange(c: &Channel) -> Result<(), ImlRabbitError> {
Expand Down
Loading

0 comments on commit 12870dd

Please sign in to comment.