From e3370592c37ed4823ba0bdff1022bfdba2dfe44e Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Tue, 20 Aug 2024 18:12:24 +0530 Subject: [PATCH 01/14] feat : alerts module init --- .env.example | 4 ++ .env.test | 2 + Cargo.lock | 41 +++++++++++++++---- crates/orchestrator/Cargo.toml | 1 + crates/orchestrator/src/alerts/aws_sns/mod.rs | 27 ++++++++++++ crates/orchestrator/src/alerts/mod.rs | 9 ++++ crates/orchestrator/src/config.rs | 21 +++++++++- crates/orchestrator/src/lib.rs | 2 + 8 files changed, 97 insertions(+), 10 deletions(-) create mode 100644 crates/orchestrator/src/alerts/aws_sns/mod.rs create mode 100644 crates/orchestrator/src/alerts/mod.rs diff --git a/.env.example b/.env.example index 259562b5..832658bb 100644 --- a/.env.example +++ b/.env.example @@ -33,3 +33,7 @@ SQS_JOB_VERIFICATION_QUEUE_URL= # S3 AWS_S3_BUCKET_NAME= AWS_S3_BUCKET_REGION= + +# SNS +AWS_SNS_REGION= +AWS_SNS_ARN= \ No newline at end of file diff --git a/.env.test b/.env.test index 586dbcbc..2b47d060 100644 --- a/.env.test +++ b/.env.test @@ -8,6 +8,8 @@ AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566" SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue" SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue" AWS_DEFAULT_REGION="localhost" +AWS_SNS_REGION="us-east-1" +AWS_SNS_ARN= ##### On chain config ##### diff --git a/Cargo.lock b/Cargo.lock index bd538040..02845ac6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1647,9 +1647,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87c5f920ffd1e0526ec9e70e50bf444db50b204395a0fa7016bbf9e31ea1698f" +checksum = "f42c2d4218de4dcd890a109461e2f799a1a2ba3bcd2cde9af88360f5df9266c6" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -1663,6 +1663,7 @@ dependencies = [ "fastrand 2.1.0", "http 0.2.12", "http-body 0.4.6", + "once_cell", "percent-encoding", "pin-project-lite", "tracing", @@ -1704,6 +1705,29 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sns" +version = "1.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dab2b9787b8d9d3094ace9585e785079cfc583199ec620ab067b599e8850c1a6" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sqs" version = "1.36.0" @@ -1907,9 +1931,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.6.2" +version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce87155eba55e11768b8c1afa607f3e864ae82f03caf63258b37455b0ad02537" +checksum = "0abbf454960d0db2ad12684a1640120e7557294b0ff8e2f11236290a1b293225" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1934,9 +1958,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30819352ed0a04ecf6a2f3477e344d2d1ba33d43e0f09ad9047c12e0d923616f" +checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1951,9 +1975,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.0" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" +checksum = "6cee7cadb433c781d3299b916fbf620fea813bf38f49db282fb6858141a05cc8" dependencies = [ "base64-simd", "bytes", @@ -6321,6 +6345,7 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-s3", + "aws-sdk-sns", "aws-sdk-sqs", "axum 0.7.5", "axum-macros", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index e5121529..1d67b237 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -19,6 +19,7 @@ async-std = "1.12.0" async-trait = { workspace = true } aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] } +aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] } aws-sdk-sqs = "1.36.0" axum = { workspace = true, features = ["macros"] } axum-macros = { workspace = true } diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs new file mode 100644 index 00000000..401ba7f2 --- /dev/null +++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs @@ -0,0 +1,27 @@ +use async_trait::async_trait; +use crate::alerts::Alerts; +use aws_sdk_sns::config::Region; +use aws_sdk_sns::Client; +use utils::env_utils::get_env_var_or_panic; + +pub struct AWSSNS { + client: Client, +} + +impl AWSSNS { + /// To create a new SNS client + pub async fn new() -> Self { + let sns_region = get_env_var_or_panic("AWS_SNS_REGION"); + let config = aws_config::from_env().region(Region::new(sns_region)).load().await; + AWSSNS { client: Client::new(&config) } + } +} + +#[async_trait] +impl Alerts for AWSSNS { + async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()> { + let topic_arn = get_env_var_or_panic("AWS_SNS_ARN"); + self.client.publish().topic_arn(topic_arn).message(message_body).send().await?; + Ok(()) + } +} diff --git a/crates/orchestrator/src/alerts/mod.rs b/crates/orchestrator/src/alerts/mod.rs new file mode 100644 index 00000000..9b21e124 --- /dev/null +++ b/crates/orchestrator/src/alerts/mod.rs @@ -0,0 +1,9 @@ +use async_trait::async_trait; + +pub mod aws_sns; + +#[async_trait] +pub trait Alerts: Send + Sync { + /// To send an alert message to our alert service + async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>; +} diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 447b25ec..ba4647c5 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -18,6 +18,8 @@ use tokio::sync::OnceCell; use utils::env_utils::get_env_var_or_panic; use utils::settings::default::DefaultSettingsProvider; use utils::settings::SettingsProvider; +use crate::alerts::Alerts; +use crate::alerts::aws_sns::AWSSNS; use crate::database::mongodb::config::MongoDbConfig; use crate::database::mongodb::MongoDb; @@ -42,6 +44,8 @@ pub struct Config { queue: Box, /// Storage client storage: Box, + /// Alerts client + alerts: Box, } /// Initializes the app config @@ -66,8 +70,10 @@ pub async fn init_config() -> Config { let prover_client = build_prover_service(&settings_provider); let storage_client = build_storage_client().await; + + let alerts_client = build_alert_client().await; - Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client) + Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client, alerts_client) } impl Config { @@ -80,8 +86,9 @@ impl Config { database: Box, queue: Box, storage: Box, + alerts: Box ) -> Self { - Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage } + Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage, alerts } } /// Returns the starknet client @@ -118,6 +125,9 @@ impl Config { pub fn storage(&self) -> &dyn DataStorage { self.storage.as_ref() } + + /// Returns the alerts client + pub fn alerts(&self) -> &dyn Alerts { self.alerts.as_ref() } } /// The app config. It can be accessed from anywhere inside the service. @@ -183,3 +193,10 @@ pub async fn build_storage_client() -> Box { _ => panic!("Unsupported Storage Client"), } } + +pub async fn build_alert_client() -> Box { + match get_env_var_or_panic("ALERTS").as_str() { + "sns" => Box::new(AWSSNS::new().await), + _ => panic!("Unsupported Alert Client") + } +} diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index 3d02378b..12a464de 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -20,3 +20,5 @@ pub mod routes; pub mod tests; /// Contains workers which act like cron jobs pub mod workers; +/// Contains the trait implementations for alerts +pub mod alerts; \ No newline at end of file From b944033a59c93368415580e0e4b48d09c9d69217 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Thu, 22 Aug 2024 00:56:14 +0530 Subject: [PATCH 02/14] feat : added alerts in jobs/mod.rs --- crates/orchestrator/src/alerts/aws_sns/mod.rs | 2 +- crates/orchestrator/src/config.rs | 30 +++++++---- crates/orchestrator/src/jobs/mod.rs | 51 ++++++++++++++++++- crates/orchestrator/src/lib.rs | 4 +- 4 files changed, 73 insertions(+), 14 deletions(-) diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs index 401ba7f2..9aa95f9a 100644 --- a/crates/orchestrator/src/alerts/aws_sns/mod.rs +++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs @@ -1,5 +1,5 @@ -use async_trait::async_trait; use crate::alerts::Alerts; +use async_trait::async_trait; use aws_sdk_sns::config::Region; use aws_sdk_sns::Client; use utils::env_utils::get_env_var_or_panic; diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index ba4647c5..67a9b5a1 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use crate::alerts::aws_sns::AWSSNS; +use crate::alerts::Alerts; use crate::data_storage::aws_s3::config::{AWSS3Config, AWSS3ConfigType}; use crate::data_storage::aws_s3::AWSS3; use crate::data_storage::{DataStorage, DataStorageConfig}; @@ -18,8 +20,6 @@ use tokio::sync::OnceCell; use utils::env_utils::get_env_var_or_panic; use utils::settings::default::DefaultSettingsProvider; use utils::settings::SettingsProvider; -use crate::alerts::Alerts; -use crate::alerts::aws_sns::AWSSNS; use crate::database::mongodb::config::MongoDbConfig; use crate::database::mongodb::MongoDb; @@ -70,14 +70,24 @@ pub async fn init_config() -> Config { let prover_client = build_prover_service(&settings_provider); let storage_client = build_storage_client().await; - + let alerts_client = build_alert_client().await; - Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client, alerts_client) + Config::new( + Arc::new(provider), + da_client, + prover_client, + settlement_client, + database, + queue, + storage_client, + alerts_client, + ) } impl Config { /// Create a new config + #[allow(clippy::too_many_arguments)] pub fn new( starknet_client: Arc>, da_client: Box, @@ -86,7 +96,7 @@ impl Config { database: Box, queue: Box, storage: Box, - alerts: Box + alerts: Box, ) -> Self { Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage, alerts } } @@ -125,9 +135,11 @@ impl Config { pub fn storage(&self) -> &dyn DataStorage { self.storage.as_ref() } - + /// Returns the alerts client - pub fn alerts(&self) -> &dyn Alerts { self.alerts.as_ref() } + pub fn alerts(&self) -> &dyn Alerts { + self.alerts.as_ref() + } } /// The app config. It can be accessed from anywhere inside the service. @@ -195,8 +207,8 @@ pub async fn build_storage_client() -> Box { } pub async fn build_alert_client() -> Box { - match get_env_var_or_panic("ALERTS").as_str() { + match get_env_var_or_panic("ALERTS").as_str() { "sns" => Box::new(AWSSNS::new().await), - _ => panic!("Unsupported Alert Client") + _ => panic!("Unsupported Alert Client"), } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e4ae1ed6..7ecb256d 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -135,6 +135,14 @@ pub async fn create_job( .await .map_err(|e| JobError::Other(OtherError(e)))?; if existing_job.is_some() { + config + .alerts() + .send_alert_message(format!( + "[create_job:src/jobs/mod.rs] Job already exists. Internal ID : {:?}, Metadata : {:?}", + internal_id, metadata + )) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; return Err(JobError::JobAlreadyExists { internal_id, job_type }); } @@ -159,6 +167,14 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { log::info!("Processing job with id {:?}", id); } _ => { + config + .alerts() + .send_alert_message(format!( + "[process_job:src/jobs/mod.rs] Job invalid status. Job ID : {:?}, Job Status : {:?}", + id, job.status + )) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; return Err(JobError::InvalidStatus { id, job_status: job.status }); } } @@ -226,6 +242,14 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { config.database().update_job(&new_job).await.map_err(|e| JobError::Other(OtherError(e)))?; + config + .alerts() + .send_alert_message(format!( + "[verify_job:src/jobs/mod.rs] Verification failed for job. Job ID : {:?}", + id + )) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; log::error!("Verification failed for job with id {:?}. Cannot verify.", id); // retry job processing if we haven't exceeded the max limit @@ -237,10 +261,26 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { job.id, process_attempts + 1 ); + config + .alerts() + .send_alert_message(format!( + "[verify_job:src/jobs/mod.rs] Verification failed for job {}. Retrying processing attempt {}.", + job.id, + process_attempts + 1 + )) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?; return Ok(()); } else { - // TODO: send alert + config + .alerts() + .send_alert_message(format!( + "[verify_job:src/jobs/mod.rs] Verification failed for job {}. Total attempts made for verifying : {}.", + job.id, process_attempts + )) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; } } JobVerificationStatus::Pending => { @@ -248,7 +288,14 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY) .map_err(|e| JobError::Other(OtherError(e)))?; if verify_attempts >= job_handler.max_verification_attempts() { - // TODO: send alert + config + .alerts() + .send_alert_message(format!( + "[verify_job:src/jobs/mod.rs] Verification attempts exceeded for job. Job ID {}", + job.id + )) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id); config .database() diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index 12a464de..4212f381 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -1,3 +1,5 @@ +/// Contains the trait implementations for alerts +pub mod alerts; /// Config of the service. Contains configurations for DB, Queues and other services. pub mod config; pub mod constants; @@ -20,5 +22,3 @@ pub mod routes; pub mod tests; /// Contains workers which act like cron jobs pub mod workers; -/// Contains the trait implementations for alerts -pub mod alerts; \ No newline at end of file From 645a6eacf048859680a833afbc9391b7d50e29fd Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Thu, 22 Aug 2024 14:04:53 +0530 Subject: [PATCH 03/14] feat : updated messages --- crates/orchestrator/src/jobs/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 7ecb256d..b63f5d0b 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -138,7 +138,7 @@ pub async fn create_job( config .alerts() .send_alert_message(format!( - "[create_job:src/jobs/mod.rs] Job already exists. Internal ID : {:?}, Metadata : {:?}", + "Error in creating job. Job already exists. Internal ID : {:?}, Metadata : {:?}", internal_id, metadata )) .await @@ -170,7 +170,7 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { config .alerts() .send_alert_message(format!( - "[process_job:src/jobs/mod.rs] Job invalid status. Job ID : {:?}, Job Status : {:?}", + "Error in processing job. Job invalid status. Job ID : {:?}, Job Status : {:?}", id, job.status )) .await @@ -245,7 +245,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { config .alerts() .send_alert_message(format!( - "[verify_job:src/jobs/mod.rs] Verification failed for job. Job ID : {:?}", + "Verification failed for job. Job ID : {:?}", id )) .await @@ -264,7 +264,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { config .alerts() .send_alert_message(format!( - "[verify_job:src/jobs/mod.rs] Verification failed for job {}. Retrying processing attempt {}.", + "Verification failed for job {}. Retrying processing attempt {}.", job.id, process_attempts + 1 )) @@ -276,7 +276,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { config .alerts() .send_alert_message(format!( - "[verify_job:src/jobs/mod.rs] Verification failed for job {}. Total attempts made for verifying : {}.", + "Verification failed for job {}. Total attempts made for verifying : {}.", job.id, process_attempts )) .await @@ -291,7 +291,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { config .alerts() .send_alert_message(format!( - "[verify_job:src/jobs/mod.rs] Verification attempts exceeded for job. Job ID {}", + "Verification attempts exceeded for job. Job ID {}", job.id )) .await From af9b7b65e0918d920df848473d79970584f9fc6b Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Thu, 22 Aug 2024 16:05:06 +0530 Subject: [PATCH 04/14] feat : updated tests with alerts module mock --- .env.test | 1 + crates/orchestrator/src/alerts/mod.rs | 2 ++ crates/orchestrator/src/jobs/mod.rs | 10 ++---- crates/orchestrator/src/tests/config.rs | 18 +++++++++- crates/orchestrator/src/tests/jobs/mod.rs | 41 +++++++++++++++++++++-- 5 files changed, 60 insertions(+), 12 deletions(-) diff --git a/.env.test b/.env.test index 2b47d060..6d200271 100644 --- a/.env.test +++ b/.env.test @@ -26,4 +26,5 @@ DA_LAYER="ethereum" PROVER_SERVICE="sharp" SETTLEMENT_LAYER="ethereum" DATA_STORAGE="s3" +ALERTS="sns" MONGODB_CONNECTION_STRING="mongodb://localhost:27017" diff --git a/crates/orchestrator/src/alerts/mod.rs b/crates/orchestrator/src/alerts/mod.rs index 9b21e124..1e36129d 100644 --- a/crates/orchestrator/src/alerts/mod.rs +++ b/crates/orchestrator/src/alerts/mod.rs @@ -1,7 +1,9 @@ use async_trait::async_trait; +use mockall::automock; pub mod aws_sns; +#[automock] #[async_trait] pub trait Alerts: Send + Sync { /// To send an alert message to our alert service diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index b63f5d0b..85d0ac2f 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -244,10 +244,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { config .alerts() - .send_alert_message(format!( - "Verification failed for job. Job ID : {:?}", - id - )) + .send_alert_message(format!("Verification failed for job. Job ID : {:?}", id)) .await .map_err(|e| JobError::Other(OtherError(e)))?; log::error!("Verification failed for job with id {:?}. Cannot verify.", id); @@ -290,10 +287,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { if verify_attempts >= job_handler.max_verification_attempts() { config .alerts() - .send_alert_message(format!( - "Verification attempts exceeded for job. Job ID {}", - job.id - )) + .send_alert_message(format!("Verification attempts exceeded for job. Job ID {}", job.id)) .await .map_err(|e| JobError::Other(OtherError(e)))?; log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id); diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 21510375..0e32a48f 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -1,10 +1,13 @@ use std::sync::Arc; -use crate::config::{build_da_client, build_prover_service, build_settlement_client, config_force_init, Config}; +use crate::config::{ + build_alert_client, build_da_client, build_prover_service, build_settlement_client, config_force_init, Config, +}; use crate::data_storage::DataStorage; use da_client_interface::DaClient; use httpmock::MockServer; +use crate::alerts::Alerts; use prover_client_interface::ProverClient; use settlement_client_interface::SettlementClient; use starknet::providers::jsonrpc::HttpTransport; @@ -39,6 +42,8 @@ pub struct TestConfigBuilder { queue: Option>, /// Storage client storage: Option>, + /// Alerts client + alerts: Option>, } impl Default for TestConfigBuilder { @@ -58,6 +63,7 @@ impl TestConfigBuilder { database: None, queue: None, storage: None, + alerts: None, } } @@ -96,6 +102,11 @@ impl TestConfigBuilder { self } + pub fn mock_alerts(mut self, alerts: Box) -> TestConfigBuilder { + self.alerts = Some(alerts); + self + } + pub async fn build(mut self) -> MockServer { dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); @@ -132,6 +143,10 @@ impl TestConfigBuilder { } } + if self.alerts.is_none() { + self.alerts = Some(build_alert_client().await); + } + // Deleting and Creating the queues in sqs. create_sqs_queues().await.expect("Not able to delete and create the queues."); // Deleting the database @@ -150,6 +165,7 @@ impl TestConfigBuilder { self.database.unwrap(), self.queue.unwrap_or_else(|| Box::new(SqsQueue {})), self.storage.unwrap(), + self.alerts.unwrap(), ); drop_database().await.unwrap(); diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index d1748feb..ee806a8f 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use crate::alerts::MockAlerts; use mockall::predicate::eq; use mongodb::bson::doc; use omniqueue::QueueError; @@ -322,8 +323,21 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { let job_item = build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); + let mut alert_mock_client = MockAlerts::new(); + // adding expectations for alert calls + alert_mock_client + .expect_send_alert_message() + .with(eq(format!("Verification failed for job. Job ID : {:?}", job_item.id))) + .times(1) + .return_once(|_| Ok(())); + alert_mock_client + .expect_send_alert_message() + .with(eq(format!("Verification failed for job {}. Retrying processing attempt {}.", job_item.id, 1))) + .times(1) + .return_once(|_| Ok(())); + // building config - TestConfigBuilder::new().build().await; + TestConfigBuilder::new().mock_alerts(Box::new(alert_mock_client)).build().await; let config = config().await; let database_client = config.database(); @@ -363,12 +377,25 @@ async fn verify_job_with_rejected_status_works() { let mut job_item = build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); + let mut alert_mock_client = MockAlerts::new(); + // adding expectations for alert calls + alert_mock_client + .expect_send_alert_message() + .with(eq(format!("Verification failed for job. Job ID : {:?}", job_item.id))) + .times(1) + .return_once(|_| Ok(())); + alert_mock_client + .expect_send_alert_message() + .with(eq(format!("Verification failed for job {}. Total attempts made for verifying : {}.", job_item.id, 1))) + .times(1) + .return_once(|_| Ok(())); + // increasing JOB_VERIFICATION_ATTEMPT_METADATA_KEY to simulate max. attempts reached. let metadata = increment_key_in_metadata(&job_item.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY).unwrap(); job_item.metadata = metadata; // building config - TestConfigBuilder::new().build().await; + TestConfigBuilder::new().mock_alerts(Box::new(alert_mock_client)).build().await; let config = config().await; let database_client = config.database(); @@ -454,12 +481,20 @@ async fn verify_job_with_pending_status_works() { let mut job_item = build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); + let mut alert_mock_client = MockAlerts::new(); + // adding expectations for alert calls + alert_mock_client + .expect_send_alert_message() + .with(eq(format!("Verification attempts exceeded for job. Job ID {}", job_item.id))) + .times(1) + .return_once(|_| Ok(())); + // increasing JOB_VERIFICATION_ATTEMPT_METADATA_KEY to simulate max. attempts reached. let metadata = increment_key_in_metadata(&job_item.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY).unwrap(); job_item.metadata = metadata; // building config - TestConfigBuilder::new().build().await; + TestConfigBuilder::new().mock_alerts(Box::new(alert_mock_client)).build().await; let config = config().await; let database_client = config.database(); From 8fff056ecff89403077b25d8e9ac689f0594cbd5 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Thu, 22 Aug 2024 16:08:09 +0530 Subject: [PATCH 05/14] changelog : update --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa51be02..5031fab9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- alerts module. - support for fetching PIE file from storage client in proving job. - added coveralls support - moved mongodb serde behind feature flag From 01c553062e11caa9e10192cc017ddec7fb87c985 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Thu, 22 Aug 2024 16:22:41 +0530 Subject: [PATCH 06/14] feat : added additional alerts --- crates/orchestrator/src/jobs/mod.rs | 50 +++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 85d0ac2f..cd2f33ed 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -147,7 +147,24 @@ pub async fn create_job( } let job_handler = factory::get_job_handler(&job_type).await; - let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?; + let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await; + + // Error handling for sending alerts + let job_item = match job_item { + Ok(job_item) => job_item, + Err(e) => { + config + .alerts() + .send_alert_message(format!( + "Error in creating job. Job already exists. Internal ID : {:?}, Metadata : {:?}. Error : {:?}", + internal_id, metadata, e + )) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; + return Err(e); + } + }; + config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?; @@ -188,7 +205,21 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { .map_err(|e| JobError::Other(OtherError(e)))?; let job_handler = factory::get_job_handler(&job.job_type).await; - let external_id = job_handler.process_job(config.as_ref(), &mut job).await?; + let external_id = job_handler.process_job(config.as_ref(), &mut job).await; + + // Error handling for sending alerts + let external_id = match external_id { + Ok(external_id) => external_id, + Err(e) => { + config + .alerts() + .send_alert_message(format!("Error in processing job. Error : {:?}", e)) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; + return Err(e); + } + }; + let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; // Fetching the job again because update status above will update the job version @@ -225,7 +256,20 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { } let job_handler = factory::get_job_handler(&job.job_type).await; - let verification_status = job_handler.verify_job(config.as_ref(), &mut job).await?; + let verification_status = job_handler.verify_job(config.as_ref(), &mut job).await; + + // Error handling for sending alerts + let verification_status = match verification_status { + Ok(verification_status) => verification_status, + Err(e) => { + config + .alerts() + .send_alert_message(format!("Error in verifying job. Error : {:?}", e)) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; + return Err(e); + } + }; match verification_status { JobVerificationStatus::Verified => { From f965c098843458b1de6703b62111e54738bfb34d Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 23 Aug 2024 13:52:49 +0530 Subject: [PATCH 07/14] feat : added test for alerts and added logs at queue level --- .env.example | 3 +- .env.test | 3 +- crates/orchestrator/src/jobs/mod.rs | 78 +-------------------- crates/orchestrator/src/queue/job_queue.rs | 7 ++ crates/orchestrator/src/tests/alerts/mod.rs | 61 ++++++++++++++++ crates/orchestrator/src/tests/common/mod.rs | 22 +++++- crates/orchestrator/src/tests/config.rs | 4 +- crates/orchestrator/src/tests/mod.rs | 1 + 8 files changed, 100 insertions(+), 79 deletions(-) create mode 100644 crates/orchestrator/src/tests/alerts/mod.rs diff --git a/.env.example b/.env.example index 832658bb..74d71fa1 100644 --- a/.env.example +++ b/.env.example @@ -36,4 +36,5 @@ AWS_S3_BUCKET_REGION= # SNS AWS_SNS_REGION= -AWS_SNS_ARN= \ No newline at end of file +AWS_SNS_ARN= +AWS_SNS_ARN_NAME= \ No newline at end of file diff --git a/.env.test b/.env.test index 6d200271..761ecc21 100644 --- a/.env.test +++ b/.env.test @@ -9,7 +9,8 @@ SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:45 SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue" AWS_DEFAULT_REGION="localhost" AWS_SNS_REGION="us-east-1" -AWS_SNS_ARN= +AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn" +AWS_SNS_ARN_NAME="madara-orchestrator-arn" ##### On chain config ##### diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index cd2f33ed..cb5832aa 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -135,35 +135,11 @@ pub async fn create_job( .await .map_err(|e| JobError::Other(OtherError(e)))?; if existing_job.is_some() { - config - .alerts() - .send_alert_message(format!( - "Error in creating job. Job already exists. Internal ID : {:?}, Metadata : {:?}", - internal_id, metadata - )) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; return Err(JobError::JobAlreadyExists { internal_id, job_type }); } let job_handler = factory::get_job_handler(&job_type).await; - let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await; - - // Error handling for sending alerts - let job_item = match job_item { - Ok(job_item) => job_item, - Err(e) => { - config - .alerts() - .send_alert_message(format!( - "Error in creating job. Job already exists. Internal ID : {:?}, Metadata : {:?}. Error : {:?}", - internal_id, metadata, e - )) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; - return Err(e); - } - }; + let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?; config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; @@ -184,14 +160,6 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { log::info!("Processing job with id {:?}", id); } _ => { - config - .alerts() - .send_alert_message(format!( - "Error in processing job. Job invalid status. Job ID : {:?}, Job Status : {:?}", - id, job.status - )) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; return Err(JobError::InvalidStatus { id, job_status: job.status }); } } @@ -205,20 +173,7 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { .map_err(|e| JobError::Other(OtherError(e)))?; let job_handler = factory::get_job_handler(&job.job_type).await; - let external_id = job_handler.process_job(config.as_ref(), &mut job).await; - - // Error handling for sending alerts - let external_id = match external_id { - Ok(external_id) => external_id, - Err(e) => { - config - .alerts() - .send_alert_message(format!("Error in processing job. Error : {:?}", e)) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; - return Err(e); - } - }; + let external_id = job_handler.process_job(config.as_ref(), &mut job).await?; let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; @@ -256,20 +211,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { } let job_handler = factory::get_job_handler(&job.job_type).await; - let verification_status = job_handler.verify_job(config.as_ref(), &mut job).await; - - // Error handling for sending alerts - let verification_status = match verification_status { - Ok(verification_status) => verification_status, - Err(e) => { - config - .alerts() - .send_alert_message(format!("Error in verifying job. Error : {:?}", e)) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; - return Err(e); - } - }; + let verification_status = job_handler.verify_job(config.as_ref(), &mut job).await?; match verification_status { JobVerificationStatus::Verified => { @@ -286,11 +228,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { config.database().update_job(&new_job).await.map_err(|e| JobError::Other(OtherError(e)))?; - config - .alerts() - .send_alert_message(format!("Verification failed for job. Job ID : {:?}", id)) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; log::error!("Verification failed for job with id {:?}. Cannot verify.", id); // retry job processing if we haven't exceeded the max limit @@ -302,15 +239,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { job.id, process_attempts + 1 ); - config - .alerts() - .send_alert_message(format!( - "Verification failed for job {}. Retrying processing attempt {}.", - job.id, - process_attempts + 1 - )) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?; return Ok(()); } else { diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index e881d2ef..a13a84ac 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -80,6 +80,13 @@ where Err(e) => { log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e); + // Sending alert in case of job error + config + .alerts() + .send_alert_message(e.to_string()) + .await + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + // if the queue as a retry logic at the source, it will be attempted // after the nack match delivery.nack().await { diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs new file mode 100644 index 00000000..ec68f225 --- /dev/null +++ b/crates/orchestrator/src/tests/alerts/mod.rs @@ -0,0 +1,61 @@ +use crate::config::config; +use crate::tests::common::{get_sns_client, get_sqs_client, SNS_ALERT_TEST_QUEUE_URL}; +use crate::tests::config::TestConfigBuilder; +use aws_sdk_sqs::types::QueueAttributeName::QueueArn; +use rstest::rstest; +use std::time::Duration; +use tokio::time::sleep; +use utils::env_utils::get_env_var_or_panic; + +#[rstest] +#[tokio::test] +async fn sns_alert_subscribe_to_topic_receive_alert_works() { + TestConfigBuilder::new().build().await; + + let sqs_client = get_sqs_client().await; + let sns_client = get_sns_client().await; + let config = config().await; + + let queue_attributes = sqs_client + .get_queue_attributes() + .queue_url(SNS_ALERT_TEST_QUEUE_URL) + .attribute_names(QueueArn) + .send() + .await + .unwrap(); + + let queue_arn = queue_attributes.attributes().unwrap().get(&QueueArn).unwrap(); + + // subscribing the queue with the alerts + sns_client + .subscribe() + .topic_arn(get_env_var_or_panic("AWS_SNS_ARN").as_str()) + .protocol("sqs") + .endpoint(queue_arn) + .send() + .await + .unwrap(); + + let message_to_send = "Hello World :)"; + + // Getting sns client from the module + let alerts_client = config.alerts(); + // Sending the alert message + alerts_client.send_alert_message(message_to_send.to_string()).await.unwrap(); + + sleep(Duration::from_secs(5)).await; + + // Checking the queue for message + let receive_message_result = sqs_client + .receive_message() + .queue_url(SNS_ALERT_TEST_QUEUE_URL) + .max_number_of_messages(1) + .send() + .await + .unwrap() + .messages + .unwrap(); + + assert_eq!(receive_message_result.len(), 1, "Alert message length assertion failed"); + assert!(receive_message_result[0].body.clone().unwrap().contains(message_to_send)); +} diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 9d56b7e3..944cafcf 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -4,9 +4,12 @@ use std::collections::HashMap; use ::uuid::Uuid; use aws_config::Region; +use aws_sdk_sns::error::SdkError; +use aws_sdk_sns::operation::create_topic::CreateTopicError; use mongodb::Client; use rstest::*; use serde::Deserialize; +use utils::env_utils::get_env_var_or_panic; use crate::data_storage::aws_s3::config::{AWSS3ConfigType, S3LocalStackConfig}; use crate::data_storage::aws_s3::AWSS3; @@ -19,6 +22,10 @@ use crate::jobs::types::JobType::DataSubmission; use crate::jobs::types::{ExternalId, JobItem}; use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; +pub const SNS_ALERT_TEST_QUEUE: &str = "orchestrator_sns_alert_testing_queue"; +pub const SNS_ALERT_TEST_QUEUE_URL: &str = + "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/orchestrator_sns_alert_testing_queue"; + #[fixture] pub fn default_job_item() -> JobItem { JobItem { @@ -40,6 +47,18 @@ pub fn custom_job_item(default_job_item: JobItem, #[default(String::from("0"))] job_item } +pub async fn create_sns_arn() -> Result<(), SdkError> { + let sns_client = get_sns_client().await; + sns_client.create_topic().name(get_env_var_or_panic("AWS_SNS_ARN_NAME")).send().await?; + Ok(()) +} + +pub async fn get_sns_client() -> aws_sdk_sns::client::Client { + let sns_region = get_env_var_or_panic("AWS_SNS_REGION"); + let config = aws_config::from_env().region(Region::new(sns_region)).load().await; + aws_sdk_sns::Client::new(&config) +} + pub async fn drop_database() -> color_eyre::Result<()> { let db_client: Client = MongoDb::new(MongoDbConfig::new_from_env()).await.client(); // dropping all the collection. @@ -68,10 +87,11 @@ pub async fn create_sqs_queues() -> color_eyre::Result<()> { // Creating SQS queues sqs_client.create_queue().queue_name(JOB_PROCESSING_QUEUE).send().await?; sqs_client.create_queue().queue_name(JOB_VERIFICATION_QUEUE).send().await?; + sqs_client.create_queue().queue_name("orchestrator_sns_alert_testing_queue").send().await?; Ok(()) } -async fn get_sqs_client() -> aws_sdk_sqs::Client { +pub async fn get_sqs_client() -> aws_sdk_sqs::Client { // This function is for localstack. So we can hardcode the region for this as of now. let region_provider = Region::new("us-east-1"); let config = aws_config::from_env().region(region_provider).load().await; diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 0e32a48f..8424d6d7 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -20,7 +20,7 @@ use crate::database::mongodb::MongoDb; use crate::database::{Database, DatabaseConfig}; use crate::queue::sqs::SqsQueue; use crate::queue::QueueProvider; -use crate::tests::common::{create_sqs_queues, drop_database, get_storage_client}; +use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database, get_storage_client}; // Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html // TestConfigBuilder allows to heavily customise the global configs based on the test's requirement. @@ -151,6 +151,8 @@ impl TestConfigBuilder { create_sqs_queues().await.expect("Not able to delete and create the queues."); // Deleting the database drop_database().await.expect("Unable to drop the database."); + // Creating the SNS ARN + create_sns_arn().await.expect("Unable to create the sns arn"); let config = Config::new( self.starknet_client.unwrap_or_else(|| { diff --git a/crates/orchestrator/src/tests/mod.rs b/crates/orchestrator/src/tests/mod.rs index 1dbc21a2..4f264304 100644 --- a/crates/orchestrator/src/tests/mod.rs +++ b/crates/orchestrator/src/tests/mod.rs @@ -7,6 +7,7 @@ pub mod server; pub mod queue; +pub mod alerts; pub mod common; mod data_storage; pub mod workers; From 6030c3e5e38d529bec12334c2e61eb938a1b3b1b Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 23 Aug 2024 13:55:15 +0530 Subject: [PATCH 08/14] refactor : removed redundant alerts --- crates/orchestrator/src/jobs/mod.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index cb5832aa..11da33d8 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -241,15 +241,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { ); add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?; return Ok(()); - } else { - config - .alerts() - .send_alert_message(format!( - "Verification failed for job {}. Total attempts made for verifying : {}.", - job.id, process_attempts - )) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; } } JobVerificationStatus::Pending => { @@ -257,11 +248,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY) .map_err(|e| JobError::Other(OtherError(e)))?; if verify_attempts >= job_handler.max_verification_attempts() { - config - .alerts() - .send_alert_message(format!("Verification attempts exceeded for job. Job ID {}", job.id)) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id); config .database() From 55ed0f243a81f256d6a461b1f39da6f7c96fa820 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 23 Aug 2024 14:28:58 +0530 Subject: [PATCH 09/14] feat : updated workflow --- .github/workflows/coverage.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 3ef3e7a0..d8d0a4d9 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -12,7 +12,7 @@ jobs: localstack: image: localstack/localstack env: - SERVICES: s3, sqs + SERVICES: s3, sqs, sns DEFAULT_REGION: us-east-1 AWS_ACCESS_KEY_ID: "AWS_ACCESS_KEY_ID" AWS_SECRET_ACCESS_KEY: "AWS_SECRET_ACCESS_KEY" From 9a893ee8044498fc340fcb97438429bea278bbd6 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 23 Aug 2024 16:17:53 +0530 Subject: [PATCH 10/14] feat : resolved comments --- crates/orchestrator/src/tests/alerts/mod.rs | 2 + crates/orchestrator/src/tests/common/mod.rs | 1 - crates/orchestrator/src/tests/jobs/mod.rs | 41 ++------------------- 3 files changed, 5 insertions(+), 39 deletions(-) diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs index ec68f225..f3dceedd 100644 --- a/crates/orchestrator/src/tests/alerts/mod.rs +++ b/crates/orchestrator/src/tests/alerts/mod.rs @@ -13,6 +13,8 @@ async fn sns_alert_subscribe_to_topic_receive_alert_works() { TestConfigBuilder::new().build().await; let sqs_client = get_sqs_client().await; + sqs_client.create_queue().queue_name("orchestrator_sns_alert_testing_queue").send().await.unwrap(); + let sns_client = get_sns_client().await; let config = config().await; diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 944cafcf..66d30560 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -87,7 +87,6 @@ pub async fn create_sqs_queues() -> color_eyre::Result<()> { // Creating SQS queues sqs_client.create_queue().queue_name(JOB_PROCESSING_QUEUE).send().await?; sqs_client.create_queue().queue_name(JOB_VERIFICATION_QUEUE).send().await?; - sqs_client.create_queue().queue_name("orchestrator_sns_alert_testing_queue").send().await?; Ok(()) } diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index ee806a8f..d1748feb 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -21,7 +21,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use crate::alerts::MockAlerts; use mockall::predicate::eq; use mongodb::bson::doc; use omniqueue::QueueError; @@ -323,21 +322,8 @@ async fn verify_job_with_rejected_status_adds_to_queue_works() { let job_item = build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); - let mut alert_mock_client = MockAlerts::new(); - // adding expectations for alert calls - alert_mock_client - .expect_send_alert_message() - .with(eq(format!("Verification failed for job. Job ID : {:?}", job_item.id))) - .times(1) - .return_once(|_| Ok(())); - alert_mock_client - .expect_send_alert_message() - .with(eq(format!("Verification failed for job {}. Retrying processing attempt {}.", job_item.id, 1))) - .times(1) - .return_once(|_| Ok(())); - // building config - TestConfigBuilder::new().mock_alerts(Box::new(alert_mock_client)).build().await; + TestConfigBuilder::new().build().await; let config = config().await; let database_client = config.database(); @@ -377,25 +363,12 @@ async fn verify_job_with_rejected_status_works() { let mut job_item = build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); - let mut alert_mock_client = MockAlerts::new(); - // adding expectations for alert calls - alert_mock_client - .expect_send_alert_message() - .with(eq(format!("Verification failed for job. Job ID : {:?}", job_item.id))) - .times(1) - .return_once(|_| Ok(())); - alert_mock_client - .expect_send_alert_message() - .with(eq(format!("Verification failed for job {}. Total attempts made for verifying : {}.", job_item.id, 1))) - .times(1) - .return_once(|_| Ok(())); - // increasing JOB_VERIFICATION_ATTEMPT_METADATA_KEY to simulate max. attempts reached. let metadata = increment_key_in_metadata(&job_item.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY).unwrap(); job_item.metadata = metadata; // building config - TestConfigBuilder::new().mock_alerts(Box::new(alert_mock_client)).build().await; + TestConfigBuilder::new().build().await; let config = config().await; let database_client = config.database(); @@ -481,20 +454,12 @@ async fn verify_job_with_pending_status_works() { let mut job_item = build_job_item_by_type_and_status(JobType::DataSubmission, JobStatus::PendingVerification, "1".to_string()); - let mut alert_mock_client = MockAlerts::new(); - // adding expectations for alert calls - alert_mock_client - .expect_send_alert_message() - .with(eq(format!("Verification attempts exceeded for job. Job ID {}", job_item.id))) - .times(1) - .return_once(|_| Ok(())); - // increasing JOB_VERIFICATION_ATTEMPT_METADATA_KEY to simulate max. attempts reached. let metadata = increment_key_in_metadata(&job_item.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY).unwrap(); job_item.metadata = metadata; // building config - TestConfigBuilder::new().mock_alerts(Box::new(alert_mock_client)).build().await; + TestConfigBuilder::new().build().await; let config = config().await; let database_client = config.database(); From 948a4232febfe704165a5153c6ff065226b2a741 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 23 Aug 2024 18:55:23 +0530 Subject: [PATCH 11/14] feat : added all alerts for queues --- crates/orchestrator/src/queue/job_queue.rs | 80 ++++++++++++---------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 2ed2ede1..44d492da 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -130,45 +130,46 @@ fn parse_job_message(message: &Delivery) -> Result, Cons message .payload_serde_json() .wrap_err("Payload Serde Error") - .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; - - match job_message { - Some(job_message) => { - log::info!("Handling job with id {:?} for queue {:?}", job_message.id, queue); - match handler(job_message.id).await { - Ok(_) => delivery - .ack() - .await - .map_err(|(e, _)| e) - .wrap_err("Queue Error") - .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?, - Err(e) => { - log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e); - - // Sending alert in case of job error - config - .alerts() - .send_alert_message(e.to_string()) - .await - .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; - - // if the queue as a retry logic at the source, it will be attempted - // after the nack - match delivery.nack().await { - Ok(_) => Err(ConsumptionError::FailedToHandleJob { - job_id: job_message.id, - error_msg: "Job handling failed, message nack-ed".to_string(), - })?, - Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob { - job_id: job_message.id, - error_msg: delivery_nack_error.0.to_string(), - })?, - } - } - }; + .map_err(|e| ConsumptionError::Other(OtherError::from(e))) +} + +fn parse_worker_message(message: &Delivery) -> Result, ConsumptionError> { + message + .payload_serde_json() + .wrap_err("Payload Serde Error") + .map_err(|e| ConsumptionError::Other(OtherError::from(e))) +} + +async fn handle_job_message( + job_message: JobQueueMessage, + message: Delivery, + handler: F, +) -> Result<(), ConsumptionError> +where + F: FnOnce(Uuid) -> Fut, + Fut: Future>, +{ + log::info!("Handling job with id {:?}", job_message.id); + + match handler(job_message.id).await { + Ok(_) => { + message + .ack() + .await + .map_err(|(e, _)| e) + .wrap_err("Queue Error") + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + Ok(()) } Err(e) => { log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e); + config() + .await + .alerts() + .send_alert_message(e.to_string()) + .await + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + match message.nack().await { Ok(_) => Err(ConsumptionError::FailedToHandleJob { job_id: job_message.id, @@ -207,6 +208,13 @@ where } Err(e) => { log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e); + config() + .await + .alerts() + .send_alert_message(e.to_string()) + .await + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + message.nack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?; Err(ConsumptionError::FailedToSpawnWorker { worker_trigger_type: job_message.worker, From feb746bff7fd38eb38a6cbc84958ccc6cf12fada Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Sat, 24 Aug 2024 00:14:43 +0530 Subject: [PATCH 12/14] feat : updated code --- crates/orchestrator/src/tests/alerts/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs index f3dceedd..20e8141b 100644 --- a/crates/orchestrator/src/tests/alerts/mod.rs +++ b/crates/orchestrator/src/tests/alerts/mod.rs @@ -1,5 +1,5 @@ use crate::config::config; -use crate::tests::common::{get_sns_client, get_sqs_client, SNS_ALERT_TEST_QUEUE_URL}; +use crate::tests::common::{get_sns_client, get_sqs_client, SNS_ALERT_TEST_QUEUE, SNS_ALERT_TEST_QUEUE_URL}; use crate::tests::config::TestConfigBuilder; use aws_sdk_sqs::types::QueueAttributeName::QueueArn; use rstest::rstest; @@ -13,7 +13,7 @@ async fn sns_alert_subscribe_to_topic_receive_alert_works() { TestConfigBuilder::new().build().await; let sqs_client = get_sqs_client().await; - sqs_client.create_queue().queue_name("orchestrator_sns_alert_testing_queue").send().await.unwrap(); + sqs_client.create_queue().queue_name(SNS_ALERT_TEST_QUEUE).send().await.unwrap(); let sns_client = get_sns_client().await; let config = config().await; From 3438448b2d259b639c38ad221f43388f8a733e21 Mon Sep 17 00:00:00 2001 From: apoorvsadana <95699312+apoorvsadana@users.noreply.github.com> Date: Sat, 24 Aug 2024 00:20:24 +0530 Subject: [PATCH 13/14] remove queue url const --- crates/orchestrator/src/tests/alerts/mod.rs | 18 ++++++++---------- crates/orchestrator/src/tests/common/mod.rs | 4 ---- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs index 20e8141b..2026061b 100644 --- a/crates/orchestrator/src/tests/alerts/mod.rs +++ b/crates/orchestrator/src/tests/alerts/mod.rs @@ -1,5 +1,5 @@ use crate::config::config; -use crate::tests::common::{get_sns_client, get_sqs_client, SNS_ALERT_TEST_QUEUE, SNS_ALERT_TEST_QUEUE_URL}; +use crate::tests::common::{get_sns_client, get_sqs_client}; use crate::tests::config::TestConfigBuilder; use aws_sdk_sqs::types::QueueAttributeName::QueueArn; use rstest::rstest; @@ -7,24 +7,22 @@ use std::time::Duration; use tokio::time::sleep; use utils::env_utils::get_env_var_or_panic; +pub const SNS_ALERT_TEST_QUEUE: &str = "orchestrator_sns_alert_testing_queue"; + #[rstest] #[tokio::test] async fn sns_alert_subscribe_to_topic_receive_alert_works() { TestConfigBuilder::new().build().await; let sqs_client = get_sqs_client().await; - sqs_client.create_queue().queue_name(SNS_ALERT_TEST_QUEUE).send().await.unwrap(); + let queue = sqs_client.create_queue().queue_name(SNS_ALERT_TEST_QUEUE).send().await.unwrap(); + let queue_url = queue.queue_url().unwrap(); let sns_client = get_sns_client().await; let config = config().await; - let queue_attributes = sqs_client - .get_queue_attributes() - .queue_url(SNS_ALERT_TEST_QUEUE_URL) - .attribute_names(QueueArn) - .send() - .await - .unwrap(); + let queue_attributes = + sqs_client.get_queue_attributes().queue_url(queue_url).attribute_names(QueueArn).send().await.unwrap(); let queue_arn = queue_attributes.attributes().unwrap().get(&QueueArn).unwrap(); @@ -50,7 +48,7 @@ async fn sns_alert_subscribe_to_topic_receive_alert_works() { // Checking the queue for message let receive_message_result = sqs_client .receive_message() - .queue_url(SNS_ALERT_TEST_QUEUE_URL) + .queue_url(queue_url) .max_number_of_messages(1) .send() .await diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 66d30560..77585989 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -22,10 +22,6 @@ use crate::jobs::types::JobType::DataSubmission; use crate::jobs::types::{ExternalId, JobItem}; use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE}; -pub const SNS_ALERT_TEST_QUEUE: &str = "orchestrator_sns_alert_testing_queue"; -pub const SNS_ALERT_TEST_QUEUE_URL: &str = - "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/orchestrator_sns_alert_testing_queue"; - #[fixture] pub fn default_job_item() -> JobItem { JobItem { From 8d41f39289faf768ba2308e0fd68d033581f0924 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 30 Aug 2024 03:48:12 +0530 Subject: [PATCH 14/14] feat : tests and lint fixes --- .env.test | 6 +++++- crates/orchestrator/src/config.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.env.test b/.env.test index 79098ea6..4d09e45b 100644 --- a/.env.test +++ b/.env.test @@ -26,6 +26,9 @@ SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:45 ##### SNS ##### ALERTS="sns" +AWS_SNS_REGION="us-east-1" +AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn" +AWS_SNS_ARN_NAME="madara-orchestrator-arn" ##### DATABASE ##### @@ -46,7 +49,7 @@ SHARP_PROOF_LAYOUT="small" ##### ON CHAIN CONFIG ##### DA_LAYER="ethereum" -SETTLEMENT_LAYER="ethereum_test" +SETTLEMENT_LAYER="ethereum" SETTLEMENT_RPC_URL="http://localhost:3001" MADARA_RPC_URL="http://localhost:3000" L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" @@ -57,6 +60,7 @@ STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C DEFAULT_L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" TEST_DUMMY_CONTRACT_ADDRESS="0xE5b6F5e695BA6E4aeD92B68c4CC8Df1160D69A81" STARKNET_OPERATOR_ADDRESS="0x2C169DFe5fBbA12957Bdd0Ba47d9CEDbFE260CA7" +ETHEREUM_BLAST_RPC_URL="https://eth-mainnet.public.blastapi.io" ##### E2E test vars ##### diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 1058a6f1..4f29cac5 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::alerts::aws_sns::AWSSNS; use crate::alerts::Alerts; -use crate::data_storage::aws_s3::config::{AWSS3Config, AWSS3ConfigType}; +use crate::data_storage::aws_s3::config::AWSS3Config; use crate::data_storage::aws_s3::AWSS3; use crate::data_storage::{DataStorage, DataStorageConfig}; use arc_swap::{ArcSwap, Guard};