Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat : alerts module #95

Merged
merged 21 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 42 additions & 27 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,50 +1,65 @@
HOST=
PORT=
DATABASE_URL=
MADARA_RPC_URL=
DA_LAYER=
SETTLEMENT_LAYER=

# Ethereum
ETHEREUM_PRIVATE_KEY=
ETHEREUM_RPC_URL=
MEMORY_PAGES_CONTRACT_ADDRESS=
STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS=

##### ORCHESTRATOR #####

# Starknet
STARKNET_PUBLIC_KEY=
STARNET_PRIVATE_KEY=
STARKNET_RPC_URL=
STARKNET_CAIRO_CORE_CONTRACT_ADDRESS=
HOST=
PORT=

# MongoDB connection string
MONGODB_CONNECTION_STRING=
##### AWS CONFIG #####

# AWS
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AWS_REGION=
AWS_ENDPOINT_URL=

##### STORAGE #####

# SQS
DATA_STORAGE=
AWS_S3_BUCKET_NAME=

##### QUEUE #####

QUEUE_PROVIDER=
SQS_JOB_PROCESSING_QUEUE_URL=
SQS_JOB_VERIFICATION_QUEUE_URL=
SQS_JOB_HANDLE_FAILURE_QUEUE_URL=
SQS_WORKER_TRIGGER_QUEUE_URL=

# S3
AWS_S3_BUCKET_NAME=
AWS_S3_BUCKET_REGION=
##### DATABASE #####

DATABASE=
MONGODB_CONNECTION_STRING=

# Ethereum Settlement
DEFAULT_SETTLEMENT_CLIENT_RPC=
DEFAULT_L1_CORE_CONTRACT_ADDRESS=
##### PROVER #####

# Sharp Services
PROVER_SERVICE=
SHARP_CUSTOMER_ID=
SHARP_URL=
# [IMP!!!] These are test certificates (they don't work)
SHARP_USER_CRT=
SHARP_USER_KEY=
SHARP_SERVER_CRT=
SHARP_PROOF_LAYOUT=

##### ON CHAIN CONFIG #####

DA_LAYER=
SETTLEMENT_LAYER=
ETHEREUM_RPC_URL=
MADARA_RPC_URL=
MEMORY_PAGES_CONTRACT_ADDRESS=
PRIVATE_KEY=
ETHEREUM_PRIVATE_KEY=
STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS=

##### ALERTS #####

AWS_SNS_REGION=
AWS_SNS_ARN=
AWS_SNS_ARN_NAME=

##### SETTLEMENT CLIENT #####

DEFAULT_SETTLEMENT_CLIENT_RPC=
DEFAULT_L1_CORE_CONTRACT_ADDRESS=
4 changes: 4 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:
SQS_JOB_HANDLE_FAILURE_QUEUE_URL=
SQS_WORKER_TRIGGER_QUEUE_URL=
AWS_DEFAULT_REGION="localhost"
AWS_SNS_REGION="us-east-1"
AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn"
AWS_SNS_ARN_NAME="madara-orchestrator-arn"

##### On chain config #####

Expand All @@ -27,6 +30,7 @@ DA_LAYER="ethereum"
PROVER_SERVICE="sharp"
SETTLEMENT_LAYER="ethereum"
DATA_STORAGE="s3"
ALERTS="sns"
MONGODB_CONNECTION_STRING="mongodb://localhost:27017"
DEFAULT_SETTLEMENT_CLIENT_RPC="http://localhost:3000"

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
localstack:
image: localstack/localstack
env:
SERVICES: s3, sqs
SERVICES: s3, sqs, sns
DEFAULT_REGION: us-east-1
AWS_ACCESS_KEY_ID: "AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY: "AWS_SECRET_ACCESS_KEY"
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- alerts module.
- Tests for Settlement client.
- Worker queues to listen for trigger events.
- Tests for prover client.
Expand Down
41 changes: 33 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async-std = "1.12.0"
async-trait = { workspace = true }
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] }
aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
aws-sdk-sqs = "1.36.0"
axum = { workspace = true, features = ["macros"] }
axum-macros = { workspace = true }
Expand Down
27 changes: 27 additions & 0 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::alerts::Alerts;
use async_trait::async_trait;
use aws_sdk_sns::config::Region;
use aws_sdk_sns::Client;
use utils::env_utils::get_env_var_or_panic;

pub struct AWSSNS {
client: Client,
}

impl AWSSNS {
/// To create a new SNS client
pub async fn new() -> Self {
let sns_region = get_env_var_or_panic("AWS_SNS_REGION");
let config = aws_config::from_env().region(Region::new(sns_region)).load().await;
AWSSNS { client: Client::new(&config) }
Comment on lines +15 to +16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was comparing this to the S3 config and I noticed a few things

  1. We can use the same config built here in S3 as well (we will need to change Client::from_conf to Client::new inside s3)
  2. We should probably move AWS config creation into config.rs and pass it to s3 and sns
  3. s3 new doesn't need to async (and even SNS once can be made non sync after that)
  4. S3 config only needs to have bucket name after this
  5. SNS should also have a config that has topic arn which then gets loaded into the SNS struct (similar to S3)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can make a new issue for this idts this is related to this PR and will cause unnecessary confusion amongst the reviewers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved in #98

}
}

#[async_trait]
impl Alerts for AWSSNS {
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()> {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
let topic_arn = get_env_var_or_panic("AWS_SNS_ARN");
self.client.publish().topic_arn(topic_arn).message(message_body).send().await?;
Ok(())
}
}
11 changes: 11 additions & 0 deletions crates/orchestrator/src/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use async_trait::async_trait;
use mockall::automock;

pub mod aws_sns;

#[automock]
#[async_trait]
pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
}
33 changes: 31 additions & 2 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::config::{AWSS3Config, AWSS3ConfigType};
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::{DataStorage, DataStorageConfig};
Expand Down Expand Up @@ -42,6 +44,8 @@ pub struct Config {
queue: Box<dyn QueueProvider>,
/// Storage client
storage: Box<dyn DataStorage>,
/// Alerts client
alerts: Box<dyn Alerts>,
}

/// Initializes the app config
Expand All @@ -67,11 +71,23 @@ pub async fn init_config() -> Config {

let storage_client = build_storage_client().await;

Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client)
let alerts_client = build_alert_client().await;

Config::new(
Arc::new(provider),
da_client,
prover_client,
settlement_client,
database,
queue,
storage_client,
alerts_client,
)
}

impl Config {
/// Create a new config
#[allow(clippy::too_many_arguments)]
pub fn new(
starknet_client: Arc<JsonRpcClient<HttpTransport>>,
da_client: Box<dyn DaClient>,
Expand All @@ -80,8 +96,9 @@ impl Config {
database: Box<dyn Database>,
queue: Box<dyn QueueProvider>,
storage: Box<dyn DataStorage>,
alerts: Box<dyn Alerts>,
) -> Self {
Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage }
Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage, alerts }
}

/// Returns the starknet client
Expand Down Expand Up @@ -118,6 +135,11 @@ impl Config {
pub fn storage(&self) -> &dyn DataStorage {
self.storage.as_ref()
}

/// Returns the alerts client
pub fn alerts(&self) -> &dyn Alerts {
self.alerts.as_ref()
}
}

/// The app config. It can be accessed from anywhere inside the service.
Expand Down Expand Up @@ -183,3 +205,10 @@ pub async fn build_storage_client() -> Box<dyn DataStorage + Send + Sync> {
_ => panic!("Unsupported Storage Client"),
}
}

pub async fn build_alert_client() -> Box<dyn Alerts + Send + Sync> {
match get_env_var_or_panic("ALERTS").as_str() {
"sns" => Box::new(AWSSNS::new().await),
_ => panic!("Unsupported Alert Client"),
}
}
7 changes: 3 additions & 4 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ pub async fn create_job(
}

let job_handler = factory::get_job_handler(&job_type).await;
let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?;
let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?;

config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;

add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
Expand Down Expand Up @@ -173,6 +174,7 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> {

let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = job_handler.process_job(config.as_ref(), &mut job).await?;

let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

// Fetching the job again because update status above will update the job version
Expand Down Expand Up @@ -239,16 +241,13 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> {
);
add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?;
return Ok(());
} else {
// TODO: send alert
}
}
JobVerificationStatus::Pending => {
log::info!("Inclusion is still pending for job {}. Pushing back to queue.", job.id);
let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)
.map_err(|e| JobError::Other(OtherError(e)))?;
if verify_attempts >= job_handler.max_verification_attempts() {
// TODO: send alert
log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id);
config
.database()
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// Contains the trait implementations for alerts
pub mod alerts;
/// Config of the service. Contains configurations for DB, Queues and other services.
pub mod config;
pub mod constants;
Expand Down
Loading
Loading