Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Create Rust iml-state-machine
Browse files Browse the repository at this point in the history
- Use `warp-drive` `Cache` as a realtime singleton to get the current
system state. This acts much the same way as the `job_scheduler`
`ObjectCache` does but instead gets realtime updates from the db instead
of needing to be notified of changes by other processes.
- Use petgraph to build a graph consisting of `State` nodes and `Edge`
edges. `Edge` is an enum that is either a `Transition` or a
`Dependency`. Add some methods via an Extenstion trait to find
transitions / shortest transition paths.
- Create a `Job` trait that can either be invoked directly via a
`Command`, or indirectly via a `Transition`.
- Create a `Steps` struct that holds a list of free fns (much like
action plugins). These steps are run serially within a job.
- Refactor service address bindings to not coopt the nginx proxy host.
- Add an input type for `RecordId`s
- Add a http_client to the graphql context
- Add graphql query and mutation for statemachine

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund committed Oct 29, 2020
1 parent 7cdb969 commit 7689314
Show file tree
Hide file tree
Showing 32 changed files with 1,458 additions and 62 deletions.
23 changes: 21 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ members = [
'iml-services/iml-snapshot',
'iml-services/iml-stats',
'iml-sfa',
'iml-state-machine',
'iml-system-test-utils',
'iml-systemd',
'iml-task-runner',
Expand Down
11 changes: 10 additions & 1 deletion chroma-manager.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,16 @@ server {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass {{WARP_DRIVE_PROXY_PASS}};
proxy_pass {{WARP_DRIVE_PROXY_PASS}}/messaging;
}

location /state_machine {
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Server $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass {{WARP_DRIVE_PROXY_PASS}}/state_machine;
}

location /mailbox {
Expand Down
3 changes: 2 additions & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ services:
- "manager-config:/var/lib/chroma"
- "report:/var/spool/iml/report"
environment:
- PROXY_HOST=iml-api
- PROXY_HOST=nginx
- SERVICE_HOST=iml-api
- RUST_LOG=info,sqlx::query=warn
- BRANDING
- USE_STRATAGEM
Expand Down
21 changes: 18 additions & 3 deletions iml-api/src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

mod state_machine;
mod stratagem;
mod task;

Expand All @@ -15,6 +16,7 @@ use futures::{future::join_all, TryFutureExt, TryStreamExt};
use iml_postgres::{
active_mgs_host_fqdn, fqdn_by_host_id, sqlx, sqlx::postgres::types::PgInterval, PgPool,
};
use iml_manager_client::Client;
use iml_rabbit::{ImlRabbitError, Pool};
use iml_wire_types::{
db::LogMessageRecord,
Expand Down Expand Up @@ -139,6 +141,9 @@ impl QueryRoot {
fn task(&self) -> task::TaskQuery {
task::TaskQuery
}
fn state_machine(&self) -> state_machine::StateMachineQuery {
state_machine::StateMachineQuery
}
#[graphql(arguments(
limit(description = "optional paging limit, defaults to all rows"),
offset(description = "Offset into items, defaults to 0"),
Expand Down Expand Up @@ -602,6 +607,9 @@ impl MutationRoot {
fn task(&self) -> task::TaskMutation {
task::TaskMutation
}
fn state_machine(&self) -> state_machine::StateMachineMutation {
state_machine::StateMachineMutation
}
#[graphql(arguments(
fsname(description = "Filesystem to snapshot"),
name(description = "Name of the snapshot"),
Expand Down Expand Up @@ -852,8 +860,14 @@ impl MutationRoot {
.map(|x| x.id);

if let Some(id) = maybe_id {
configure_snapshot_timer(id, fsname, interval.0, use_barrier.unwrap_or_default())
.await?;
configure_snapshot_timer(
context.http_client.clone(),
id,
fsname,
interval.0,
use_barrier.unwrap_or_default(),
)
.await?;
}

Ok(true)
Expand All @@ -866,7 +880,7 @@ impl MutationRoot {
.execute(&context.pg_pool)
.await?;

remove_snapshot_timer(id).await?;
remove_snapshot_timer(context.http_client.clone(), id).await?;

Ok(true)
}
Expand Down Expand Up @@ -963,6 +977,7 @@ pub(crate) type Schema = RootNode<'static, QueryRoot, MutationRoot, EmptySubscri
pub(crate) struct Context {
pub(crate) pg_pool: PgPool,
pub(crate) rabbit_pool: Pool,
pub(crate) http_client: Client,
}

impl juniper::Context for Context {}
Expand Down
150 changes: 150 additions & 0 deletions iml-api/src/graphql/state_machine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright (c) 2020 DDN. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use crate::graphql::Context;
use iml_manager_client::{post, Client, ImlManagerClientError};
use iml_manager_env::get_proxy_url;
use iml_postgres::sqlx;
use iml_wire_types::{
snapshot::{Destroy, Mount, Unmount},
state_machine::{Command, Job, Transition},
warp_drive::{GraphqlRecordId, RecordId},
};

pub(crate) struct StateMachineMutation;

#[juniper::graphql_object(Context = Context)]
impl StateMachineMutation {
/// Run a state_machine `Transition` for a given record
async fn run_transition(
context: &Context,
record_id: GraphqlRecordId,
transition: Transition,
) -> juniper::FieldResult<bool> {
let record_id = RecordId::from(record_id);

let xs = get_transition_path(context.http_client.clone(), record_id, transition).await?;

let mut jobs = vec![];

for x in xs {
match (record_id, x) {
(RecordId::Snapshot(x), Transition::MountSnapshot) => {
let x = sqlx::query!(
"SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1",
x
)
.fetch_one(&context.pg_pool)
.await?;

jobs.push(Job::MountSnapshotJob(Mount {
fsname: x.filesystem_name,
name: x.snapshot_name,
}));
}
(RecordId::Snapshot(x), Transition::UnmountSnapshot) => {
let x = sqlx::query!(
"SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1",
x
)
.fetch_one(&context.pg_pool)
.await?;

jobs.push(Job::UnmountSnapshotJob(Unmount {
fsname: x.filesystem_name,
name: x.snapshot_name,
}))
}
(RecordId::Snapshot(x), Transition::RemoveSnapshot) => {
let x = sqlx::query!(
"SELECT filesystem_name, snapshot_name FROM snapshot WHERE id = $1",
x
)
.fetch_one(&context.pg_pool)
.await?;

jobs.push(Job::RemoveSnapshotJob(Destroy {
fsname: x.filesystem_name,
name: x.snapshot_name,
force: true,
}))
}
_ => {}
}
}

let cmd = Command {
message: "Running Transition".to_string(),
jobs,
};

let mut url = get_proxy_url();

url.set_path("state_machine/run_command/");

post(context.http_client.clone(), url.as_str(), cmd)
.await?
.error_for_status()?
.json()
.await?;

Ok(true)
}
}

pub(crate) struct StateMachineQuery;

#[juniper::graphql_object(Context = Context)]
impl StateMachineQuery {
/// Given a record, figure out the possible transitions available for it
async fn get_transitions(
context: &Context,
record_id: GraphqlRecordId,
) -> juniper::FieldResult<Vec<Transition>> {
let mut url = get_proxy_url();

url.set_path("state_machine/get_transitions/");

let xs = post(
context.http_client.clone(),
url.as_str(),
RecordId::from(record_id),
)
.await?
.error_for_status()?
.json()
.await?;

Ok(xs)
}
/// Given a record and transition, figure out the shortest possible path for that
/// Record to reach that transition.
async fn get_transition_path(
context: &Context,
record_id: GraphqlRecordId,
transition: Transition,
) -> juniper::FieldResult<Vec<Transition>> {
let xs = get_transition_path(context.http_client.clone(), record_id, transition).await?;

Ok(xs)
}
}

async fn get_transition_path(
client: Client,
record_id: impl Into<RecordId>,
transition: Transition,
) -> Result<Vec<Transition>, ImlManagerClientError> {
let mut url = get_proxy_url();

url.set_path("state_machine/get_transition_path/");

let xs = post(client, url.as_str(), (record_id.into(), transition))
.await?
.error_for_status()?
.json()
.await?;

Ok(xs)
}
6 changes: 5 additions & 1 deletion iml-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod error;
mod graphql;
mod timer;

use iml_manager_client::get_client;
use iml_manager_env::get_pool_limit;
use iml_postgres::get_db_pool;
use iml_rabbit::{self, create_connection_filter};
Expand All @@ -22,7 +23,7 @@ const DEFAULT_POOL_LIMIT: u32 = 5;
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();

let addr = iml_manager_env::get_iml_api_addr();
let addr = iml_manager_env::get_iml_api_bind_addr();

let conf = Conf {
allow_anonymous_read: iml_manager_env::get_allow_anonymous_read(),
Expand All @@ -42,6 +43,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let pg_pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?;

let http_client = get_client()?;

let schema = Arc::new(graphql::Schema::new(
graphql::QueryRoot,
graphql::MutationRoot,
Expand All @@ -52,6 +55,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ctx = Arc::new(graphql::Context {
pg_pool,
rabbit_pool,
http_client,
});
let ctx_filter = warp::any().map(move || Arc::clone(&ctx));

Expand Down
9 changes: 3 additions & 6 deletions iml-api/src/timer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::ImlApiError;
use iml_manager_client::{delete, get_client, put};
use iml_manager_client::{delete, put, Client};
use iml_manager_env::{get_timer_addr, running_in_docker};
use std::time::Duration;

Expand All @@ -12,6 +12,7 @@ pub struct TimerConfig {
}

pub async fn configure_snapshot_timer(
client: Client,
config_id: i32,
fsname: String,
interval: Duration,
Expand Down Expand Up @@ -73,8 +74,6 @@ ExecStart={}
service_config,
};

let client = get_client()?;

let url = format!("http://{}/configure/", get_timer_addr());
tracing::debug!(
"Sending snapshot interval config to timer service: {:?} {:?}",
Expand All @@ -86,9 +85,7 @@ ExecStart={}
Ok(())
}

pub async fn remove_snapshot_timer(config_id: i32) -> Result<(), ImlApiError> {
let client = get_client()?;

pub async fn remove_snapshot_timer(client: Client, config_id: i32) -> Result<(), ImlApiError> {
delete(
client,
format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,16 @@ server {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass http://127.0.0.1:8890;
proxy_pass http://127.0.0.1:8890/messaging;
}
location /state_machine {
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Server $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_pass http://127.0.0.1:8890/state_machine;
}
location /mailbox {
Expand Down
Loading

0 comments on commit 7689314

Please sign in to comment.