Skip to content

Commit

Permalink
Feat : alerts module (#95)
Browse files Browse the repository at this point in the history
* feat : alerts module init

* feat : added alerts in jobs/mod.rs

* feat : updated messages

* feat : updated tests with alerts module mock

* changelog : update

* feat : added additional alerts

* feat : added test for alerts and added logs at queue level

* refactor : removed redundant alerts

* feat : updated workflow

* feat : resolved comments

* feat : added all alerts for queues

* feat : updated code

* remove queue url const

* feat : tests and lint fixes

---------

Co-authored-by: apoorvsadana <95699312+apoorvsadana@users.noreply.github.com>
  • Loading branch information
ocdbytes and apoorvsadana authored Aug 30, 2024
1 parent 77bf9c6 commit f84ba41
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 24 deletions.
3 changes: 1 addition & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ MONGODB_CONNECTION_STRING=
PROVER_SERVICE=
SHARP_CUSTOMER_ID=
SHARP_URL=
# [IMP!!!] These are test certificates (they don't work)
SHARP_USER_CRT=
SHARP_USER_KEY=
SHARP_SERVER_CRT=
Expand All @@ -48,4 +47,4 @@ MADARA_RPC_URL=
MEMORY_PAGES_CONTRACT_ADDRESS=
PRIVATE_KEY=
ETHEREUM_PRIVATE_KEY=
STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS=
STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS=
23 changes: 19 additions & 4 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ AWS_ACCESS_KEY_ID="AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY="AWS_SECRET_ACCESS_KEY"
AWS_REGION="us-east-1"
AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566"
AWS_DEFAULT_REGION="localhost"

##### STORAGE #####

Expand All @@ -20,8 +21,14 @@ AWS_S3_BUCKET_NAME="madara-orchestrator-test-bucket"
QUEUE_PROVIDER="sqs"
SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue"
SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue"
SQS_JOB_HANDLE_FAILURE_QUEUE_URL=
SQS_WORKER_TRIGGER_QUEUE_URL=
SQS_JOB_HANDLE_FAILURE_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_handle_failure_queue"
SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_worker_trigger_queue"

##### SNS #####
ALERTS="sns"
AWS_SNS_REGION="us-east-1"
AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn"
AWS_SNS_ARN_NAME="madara-orchestrator-arn"

##### DATABASE #####

Expand All @@ -46,7 +53,15 @@ SETTLEMENT_LAYER="ethereum"
SETTLEMENT_RPC_URL="http://localhost:3001"
MADARA_RPC_URL="http://localhost:3000"
L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4"
MEMORY_PAGES_CONTRACT_ADDRESS="0x000000000000000000000000000000000001dead"
MEMORY_PAGES_CONTRACT_ADDRESS="0x47312450B3Ac8b5b8e247a6bB6d523e7605bDb60"
PRIVATE_KEY="0xdead"
ETHEREUM_PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0x000000000000000000000000000000000002dead"
STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4"
DEFAULT_L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4"
TEST_DUMMY_CONTRACT_ADDRESS="0xE5b6F5e695BA6E4aeD92B68c4CC8Df1160D69A81"
STARKNET_OPERATOR_ADDRESS="0x2C169DFe5fBbA12957Bdd0Ba47d9CEDbFE260CA7"
ETHEREUM_BLAST_RPC_URL="https://eth-mainnet.public.blastapi.io"

##### E2E test vars #####

L2_BLOCK_NUMBER_FOR_TEST=671070
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
localstack:
image: localstack/localstack
env:
SERVICES: s3, sqs
SERVICES: s3, sqs, sns
DEFAULT_REGION: us-east-1
AWS_ACCESS_KEY_ID: "AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY: "AWS_SECRET_ACCESS_KEY"
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- alerts module.
- Tests for Settlement client.
- Worker queues to listen for trigger events.
- Tests for prover client.
Expand Down
41 changes: 33 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async-std = "1.12.0"
async-trait = { workspace = true }
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] }
aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
aws-sdk-sqs = "1.36.0"
axum = { workspace = true, features = ["macros"] }
axum-macros = { workspace = true }
Expand Down
27 changes: 27 additions & 0 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::alerts::Alerts;
use async_trait::async_trait;
use aws_sdk_sns::config::Region;
use aws_sdk_sns::Client;
use utils::env_utils::get_env_var_or_panic;

pub struct AWSSNS {
client: Client,
}

impl AWSSNS {
/// To create a new SNS client
pub async fn new() -> Self {
let sns_region = get_env_var_or_panic("AWS_SNS_REGION");
let config = aws_config::from_env().region(Region::new(sns_region)).load().await;
AWSSNS { client: Client::new(&config) }
}
}

#[async_trait]
impl Alerts for AWSSNS {
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()> {
let topic_arn = get_env_var_or_panic("AWS_SNS_ARN");
self.client.publish().topic_arn(topic_arn).message(message_body).send().await?;
Ok(())
}
}
11 changes: 11 additions & 0 deletions crates/orchestrator/src/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use async_trait::async_trait;
use mockall::automock;

pub mod aws_sns;

#[automock]
#[async_trait]
pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
}
32 changes: 30 additions & 2 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::config::AWSS3Config;
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::{DataStorage, DataStorageConfig};
Expand Down Expand Up @@ -43,6 +45,8 @@ pub struct Config {
queue: Box<dyn QueueProvider>,
/// Storage client
storage: Box<dyn DataStorage>,
/// Alerts client
alerts: Box<dyn Alerts>,
}

/// Initializes the app config
Expand Down Expand Up @@ -74,11 +78,23 @@ pub async fn init_config() -> Config {

let storage_client = build_storage_client(&aws_config).await;

Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client)
let alerts_client = build_alert_client().await;

Config::new(
Arc::new(provider),
da_client,
prover_client,
settlement_client,
database,
queue,
storage_client,
alerts_client,
)
}

impl Config {
/// Create a new config
#[allow(clippy::too_many_arguments)]
pub fn new(
starknet_client: Arc<JsonRpcClient<HttpTransport>>,
da_client: Box<dyn DaClient>,
Expand All @@ -87,8 +103,9 @@ impl Config {
database: Box<dyn Database>,
queue: Box<dyn QueueProvider>,
storage: Box<dyn DataStorage>,
alerts: Box<dyn Alerts>,
) -> Self {
Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage }
Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage, alerts }
}

/// Returns the starknet client
Expand Down Expand Up @@ -125,6 +142,11 @@ impl Config {
pub fn storage(&self) -> &dyn DataStorage {
self.storage.as_ref()
}

/// Returns the alerts client
pub fn alerts(&self) -> &dyn Alerts {
self.alerts.as_ref()
}
}

/// The app config. It can be accessed from anywhere inside the service.
Expand Down Expand Up @@ -191,6 +213,12 @@ pub async fn build_storage_client(aws_config: &SdkConfig) -> Box<dyn DataStorage
}
}

pub async fn build_alert_client() -> Box<dyn Alerts + Send + Sync> {
match get_env_var_or_panic("ALERTS").as_str() {
"sns" => Box::new(AWSSNS::new().await),
_ => panic!("Unsupported Alert Client"),
}
}
pub fn build_queue_client(_aws_config: &SdkConfig) -> Box<dyn QueueProvider + Send + Sync> {
match get_env_var_or_panic("QUEUE_PROVIDER").as_str() {
"sqs" => Box::new(SqsQueue {}),
Expand Down
7 changes: 3 additions & 4 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ pub async fn create_job(
}

let job_handler = factory::get_job_handler(&job_type).await;
let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?;
let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?;

config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;

add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
Expand Down Expand Up @@ -173,6 +174,7 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> {

let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = job_handler.process_job(config.as_ref(), &mut job).await?;

let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

// Fetching the job again because update status above will update the job version
Expand Down Expand Up @@ -239,16 +241,13 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> {
);
add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
return Ok(());
} else {
// TODO: send alert
}
}
JobVerificationStatus::Pending => {
log::info!("Inclusion is still pending for job {}. Pushing back to queue.", job.id);
let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)
.map_err(|e| JobError::Other(OtherError(e)))?;
if verify_attempts >= job_handler.max_verification_attempts() {
// TODO: send alert
log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id);
config
.database()
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// Contains the trait implementations for alerts
pub mod alerts;
/// Config of the service. Contains configurations for DB, Queues and other services.
pub mod config;
pub mod constants;
Expand Down
14 changes: 14 additions & 0 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ where
}
Err(e) => {
log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);
config()
.await
.alerts()
.send_alert_message(e.to_string())
.await
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;

match message.nack().await {
Ok(_) => Err(ConsumptionError::FailedToHandleJob {
job_id: job_message.id,
Expand Down Expand Up @@ -201,6 +208,13 @@ where
}
Err(e) => {
log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e);
config()
.await
.alerts()
.send_alert_message(e.to_string())
.await
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;

message.nack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?;
Err(ConsumptionError::FailedToSpawnWorker {
worker_trigger_type: job_message.worker,
Expand Down
Loading

0 comments on commit f84ba41

Please sign in to comment.