Feat : alerts module #95

Merged
merged 21 commits into from
Aug 30, 2024
7 changes: 6 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -34,9 +34,14 @@ SQS_JOB_VERIFICATION_QUEUE_URL=
AWS_S3_BUCKET_NAME=
AWS_S3_BUCKET_REGION=

# SNS
AWS_SNS_REGION=
AWS_SNS_ARN=
AWS_SNS_ARN_NAME=

# Sharp Services
SHARP_CUSTOMER_ID=
SHARP_USER_CRT=
SHARP_USER_KEY=
SHARP_SERVER_CRT=
SHARP_PROOF_LAYOUT=
SHARP_PROOF_LAYOUT=
4 changes: 4 additions & 0 deletions .env.test
@@ -8,6 +8,9 @@ AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566"
SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue"
SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue"
AWS_DEFAULT_REGION="localhost"
AWS_SNS_REGION="us-east-1"
AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn"
AWS_SNS_ARN_NAME="madara-orchestrator-arn"

##### On chain config #####

@@ -24,6 +27,7 @@ DA_LAYER="ethereum"
PROVER_SERVICE="sharp"
SETTLEMENT_LAYER="ethereum"
DATA_STORAGE="s3"
ALERTS="sns"
MONGODB_CONNECTION_STRING="mongodb://localhost:27017"

# Sharp Services
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- alerts module.
- Tests for prover client.
- Added Rust Cache for Coverage Test CI.
- support for fetching PIE file from storage client in proving job.
Expand Down
41 changes: 33 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
@@ -19,6 +19,7 @@ async-std = "1.12.0"
async-trait = { workspace = true }
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] }
aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
aws-sdk-sqs = "1.36.0"
axum = { workspace = true, features = ["macros"] }
axum-macros = { workspace = true }
Expand Down
27 changes: 27 additions & 0 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
@@ -0,0 +1,27 @@
use crate::alerts::Alerts;
use async_trait::async_trait;
use aws_sdk_sns::config::Region;
use aws_sdk_sns::Client;
use utils::env_utils::get_env_var_or_panic;

pub struct AWSSNS {
client: Client,
}

impl AWSSNS {
/// To create a new SNS client
pub async fn new() -> Self {
let sns_region = get_env_var_or_panic("AWS_SNS_REGION");
let config = aws_config::from_env().region(Region::new(sns_region)).load().await;
AWSSNS { client: Client::new(&config) }
Comment on lines +15 to +16
Contributor

I was comparing this to the S3 config and noticed a few things:

  1. We can reuse the config built here for S3 as well (we will need to change Client::from_conf to Client::new inside S3).
  2. We should probably move AWS config creation into config.rs and pass it to S3 and SNS.
  3. S3's new doesn't need to be async (and the SNS one can be made non-async after that).
  4. The S3 config only needs to hold the bucket name after this.
  5. SNS should also have a config that holds the topic ARN, which then gets loaded into the SNS struct (similar to S3).

Member Author

We can open a new issue for this. I don't think it is related to this PR, and it would cause unnecessary confusion among the reviewers.

Contributor

resolved in #98
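
Point 5 of the thread above (an SNS config that carries the topic ARN, loaded once instead of on every publish) could look roughly like the sketch below. It uses only the standard library so it can be run in isolation. `SnsConfig`, `MissingVar` and `from_lookup` are hypothetical names for illustration and are not taken from this PR or from #98.

```rust
use std::fmt;

/// Hypothetical config holding the SNS topic ARN, resolved once at start-up.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnsConfig {
    pub topic_arn: String,
}

/// Error returned when a required variable is absent.
#[derive(Debug, PartialEq, Eq)]
pub struct MissingVar(pub &'static str);

impl fmt::Display for MissingVar {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "missing environment variable {}", self.0)
    }
}

impl std::error::Error for MissingVar {}

impl SnsConfig {
    /// Builds the config from any key lookup, for example
    /// `|k| std::env::var(k).ok()` in production or a closure in tests.
    pub fn from_lookup<F>(lookup: F) -> Result<Self, MissingVar>
    where
        F: Fn(&str) -> Option<String>,
    {
        let topic_arn = lookup("AWS_SNS_ARN").ok_or(MissingVar("AWS_SNS_ARN"))?;
        Ok(Self { topic_arn })
    }
}
```

With a config like this, the SNS struct would store `topic_arn` next to its client, and `send_alert_message` would no longer need to read the environment on each call.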

}
}

#[async_trait]
impl Alerts for AWSSNS {
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()> {
let topic_arn = get_env_var_or_panic("AWS_SNS_ARN");
self.client.publish().topic_arn(topic_arn).message(message_body).send().await?;
Ok(())
}
}
11 changes: 11 additions & 0 deletions crates/orchestrator/src/alerts/mod.rs
@@ -0,0 +1,11 @@
use async_trait::async_trait;
use mockall::automock;

pub mod aws_sns;

#[automock]
#[async_trait]
pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
}
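
Because callers only see the trait, a second backend or a hand-written test double can be dropped in without touching them. The sketch below illustrates that idea with a synchronous stand-in trait, so that it compiles with the standard library alone; the real `Alerts` trait is async and returns `color_eyre::Result<()>`. `AlertSink` and `RecordingAlerts` are hypothetical names.

```rust
use std::sync::Mutex;

/// Simplified, synchronous stand-in for the `Alerts` trait (assumption:
/// the real trait is async and uses `color_eyre::Result`).
pub trait AlertSink: Send + Sync {
    fn send_alert_message(&self, message_body: String) -> Result<(), String>;
}

/// Hypothetical in-memory sink that records every alert it receives.
#[derive(Default)]
pub struct RecordingAlerts {
    sent: Mutex<Vec<String>>,
}

impl RecordingAlerts {
    /// Returns a copy of all messages recorded so far.
    pub fn sent(&self) -> Vec<String> {
        self.sent.lock().unwrap().clone()
    }
}

impl AlertSink for RecordingAlerts {
    fn send_alert_message(&self, message_body: String) -> Result<(), String> {
        // A poisoned lock is reported as an error instead of panicking.
        self.sent.lock().map_err(|e| e.to_string())?.push(message_body);
        Ok(())
    }
}
```

The `#[automock]` attribute on the trait already generates a mock for unit tests; a recording sink like this is only an alternative when the exact message bodies need to be inspected afterwards.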
33 changes: 31 additions & 2 deletions crates/orchestrator/src/config.rs
@@ -1,5 +1,7 @@
use std::sync::Arc;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::config::{AWSS3Config, AWSS3ConfigType};
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::{DataStorage, DataStorageConfig};
@@ -42,6 +44,8 @@ pub struct Config {
queue: Box<dyn QueueProvider>,
/// Storage client
storage: Box<dyn DataStorage>,
/// Alerts client
alerts: Box<dyn Alerts>,
}

/// Initializes the app config
@@ -67,11 +71,23 @@ pub async fn init_config() -> Config {

let storage_client = build_storage_client().await;

Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client)
let alerts_client = build_alert_client().await;

Config::new(
Arc::new(provider),
da_client,
prover_client,
settlement_client,
database,
queue,
storage_client,
alerts_client,
)
}

impl Config {
/// Create a new config
#[allow(clippy::too_many_arguments)]
pub fn new(
starknet_client: Arc<JsonRpcClient<HttpTransport>>,
da_client: Box<dyn DaClient>,
@@ -80,8 +96,9 @@ impl Config {
database: Box<dyn Database>,
queue: Box<dyn QueueProvider>,
storage: Box<dyn DataStorage>,
alerts: Box<dyn Alerts>,
) -> Self {
Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage }
Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage, alerts }
}

/// Returns the starknet client
@@ -118,6 +135,11 @@ impl Config {
pub fn storage(&self) -> &dyn DataStorage {
self.storage.as_ref()
}

/// Returns the alerts client
pub fn alerts(&self) -> &dyn Alerts {
self.alerts.as_ref()
}
}

/// The app config. It can be accessed from anywhere inside the service.
@@ -183,3 +205,10 @@ pub async fn build_storage_client() -> Box<dyn DataStorage + Send + Sync> {
_ => panic!("Unsupported Storage Client"),
}
}

pub async fn build_alert_client() -> Box<dyn Alerts + Send + Sync> {
match get_env_var_or_panic("ALERTS").as_str() {
"sns" => Box::new(AWSSNS::new().await),
_ => panic!("Unsupported Alert Client"),
}
}
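
`build_alert_client` selects the backend from the `ALERTS` variable and panics on anything other than `"sns"`. One possible alternative, shown here only as a sketch, is to parse the value into an enum first so that an unsupported name surfaces as an error the caller can report. `AlertBackend` and `UnsupportedAlertBackend` are hypothetical names, not part of this PR.

```rust
use std::str::FromStr;

/// Hypothetical enum of the alert backends the service knows about.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlertBackend {
    Sns,
}

/// Error carrying the unrecognised value of the `ALERTS` variable.
#[derive(Debug, PartialEq, Eq)]
pub struct UnsupportedAlertBackend(pub String);

impl FromStr for AlertBackend {
    type Err = UnsupportedAlertBackend;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            // Same string the PR matches on in `build_alert_client`.
            "sns" => Ok(AlertBackend::Sns),
            other => Err(UnsupportedAlertBackend(other.to_string())),
        }
    }
}
```

The existing `match` could then switch on `AlertBackend` instead of a raw string, and adding a backend later becomes a compile-checked change.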
7 changes: 3 additions & 4 deletions crates/orchestrator/src/jobs/mod.rs
@@ -139,7 +139,8 @@ pub async fn create_job(
}

let job_handler = factory::get_job_handler(&job_type).await;
let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?;
let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?;

config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;

add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
@@ -173,6 +174,7 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> {

let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = job_handler.process_job(config.as_ref(), &mut job).await?;

let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

// Fetching the job again because update status above will update the job version
@@ -239,16 +241,13 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> {
);
add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
return Ok(());
} else {
// TODO: send alert
}
}
JobVerificationStatus::Pending => {
log::info!("Inclusion is still pending for job {}. Pushing back to queue.", job.id);
let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)
.map_err(|e| JobError::Other(OtherError(e)))?;
if verify_attempts >= job_handler.max_verification_attempts() {
// TODO: send alert
log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id);
config
.database()
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/lib.rs
@@ -1,3 +1,5 @@
/// Contains the trait implementations for alerts
pub mod alerts;
/// Config of the service. Contains configurations for DB, Queues and other services.
pub mod config;
pub mod constants;
Expand Down
7 changes: 7 additions & 0 deletions crates/orchestrator/src/queue/job_queue.rs
@@ -80,6 +80,13 @@ where
Err(e) => {
log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);

// Sending alert in case of job error
config
.alerts()
.send_alert_message(e.to_string())
.await
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;

// if the queue has retry logic at the source, it will be attempted
// after the nack
match delivery.nack().await {
Expand Down
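
In the hunk above the alert call ends in `?`, so if publishing the alert fails, the function returns before `delivery.nack()` is reached. A best-effort variant that records the alert failure and still nacks might look like the following standard-library sketch. The closures stand in for the real async calls, and `FailureHandling` and `handle_job_failure` are hypothetical names.

```rust
/// Outcome of handling a failed job in this sketch.
#[derive(Debug, PartialEq, Eq)]
pub struct FailureHandling {
    /// Set when the alert itself could not be sent.
    pub alert_error: Option<String>,
    /// True once the message has been nacked.
    pub nacked: bool,
}

/// Sends an alert for `job_error` on a best-effort basis, then nacks.
/// `send_alert` and `nack` are stand-ins for the real async calls.
pub fn handle_job_failure<A, N>(
    job_error: &str,
    send_alert: A,
    nack: N,
) -> Result<FailureHandling, String>
where
    A: FnOnce(String) -> Result<(), String>,
    N: FnOnce() -> Result<(), String>,
{
    // Keep the alert failure for logging instead of returning early.
    let alert_error = send_alert(job_error.to_string()).err();
    // The nack always runs, so the queue's own retry logic still applies.
    nack()?;
    Ok(FailureHandling { alert_error, nacked: true })
}
```

Whether a failed alert should abort the handler or only be logged is a design decision for the maintainers; the sketch shows just one of the two options.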
61 changes: 61 additions & 0 deletions crates/orchestrator/src/tests/alerts/mod.rs
@@ -0,0 +1,61 @@
use crate::config::config;
use crate::tests::common::{get_sns_client, get_sqs_client, SNS_ALERT_TEST_QUEUE_URL};
use crate::tests::config::TestConfigBuilder;
use aws_sdk_sqs::types::QueueAttributeName::QueueArn;
use rstest::rstest;
use std::time::Duration;
use tokio::time::sleep;
use utils::env_utils::get_env_var_or_panic;

#[rstest]
#[tokio::test]
async fn sns_alert_subscribe_to_topic_receive_alert_works() {
TestConfigBuilder::new().build().await;

let sqs_client = get_sqs_client().await;
let sns_client = get_sns_client().await;
let config = config().await;

let queue_attributes = sqs_client
.get_queue_attributes()
.queue_url(SNS_ALERT_TEST_QUEUE_URL)
.attribute_names(QueueArn)
.send()
.await
.unwrap();

let queue_arn = queue_attributes.attributes().unwrap().get(&QueueArn).unwrap();

// subscribing the queue with the alerts
sns_client
.subscribe()
.topic_arn(get_env_var_or_panic("AWS_SNS_ARN").as_str())
.protocol("sqs")
.endpoint(queue_arn)
.send()
.await
.unwrap();

let message_to_send = "Hello World :)";

// Getting sns client from the module
let alerts_client = config.alerts();
// Sending the alert message
alerts_client.send_alert_message(message_to_send.to_string()).await.unwrap();

sleep(Duration::from_secs(5)).await;

// Checking the queue for message
let receive_message_result = sqs_client
.receive_message()
.queue_url(SNS_ALERT_TEST_QUEUE_URL)
.max_number_of_messages(1)
.send()
.await
.unwrap()
.messages
.unwrap();

assert_eq!(receive_message_result.len(), 1, "Alert message length assertion failed");
assert!(receive_message_result[0].body.clone().unwrap().contains(message_to_send));
}
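
The test waits a fixed five seconds before reading the queue. A common alternative is to poll until the message shows up or a deadline passes, which tends to be faster when delivery is quick and more tolerant when it is slow. The helper below is a standard-library sketch of that idea; `poll_until` is a hypothetical name, and inside the async test the blocking `std::thread::sleep` would need to be swapped for `tokio::time::sleep`.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Calls `check` until it returns `Some`, or gives up once `timeout` has
/// elapsed. `check` is always called at least once.
pub fn poll_until<T, F>(timeout: Duration, interval: Duration, mut check: F) -> Option<T>
where
    F: FnMut() -> Option<T>,
{
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(value) = check() {
            return Some(value);
        }
        if Instant::now() >= deadline {
            return None;
        }
        // Blocking sleep: fine for a sketch, not for an async runtime.
        sleep(interval);
    }
}
```

In the test, `check` would wrap the `receive_message` call and return `Some(messages)` only when the list is non-empty.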