Skip to content

Commit

Permalink
Revert "update: better alert impl. (#111)" (#113)
Browse files Browse the repository at this point in the history
This reverts commit 21ec2cb.
  • Loading branch information
heemankv authored Sep 5, 2024
1 parent e052eac commit bc9ba16
Show file tree
Hide file tree
Showing 36 changed files with 614 additions and 766 deletions.
1 change: 1 addition & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:45

##### SNS #####
ALERTS="sns"
AWS_SNS_REGION="us-east-1"
AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn"
AWS_SNS_ARN_NAME="madara-orchestrator-arn"

Expand Down
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- AWS config built from TestConfigBuilder.
- Better TestConfigBuilder, with sync config clients.
- Drilled Config, removing dirty global reads.
- refactor AWS config usage and clean .env files
- GitHub's coverage CI yml file for localstack and db testing.
- Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder.
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ url = { version = "2.5.0", features = ["serde"] }
uuid = { version = "1.7.0", features = ["v4", "serde"] }
httpmock = { version = "0.7.0", features = ["remote"] }
num-bigint = { version = "0.4.4" }
arc-swap = { version = "1.7.1" }
num-traits = "0.2"
lazy_static = "1.4.0"
stark_evm_adapter = "0.1.1"
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ path = "src/main.rs"

[dependencies]
alloy = { workspace = true }
arc-swap = { workspace = true }
assert_matches = "1.5.0"
async-std = "1.12.0"
async-trait = { workspace = true }
Expand Down
8 changes: 5 additions & 3 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::alerts::Alerts;
use async_trait::async_trait;
use aws_config::SdkConfig;
use aws_sdk_sns::config::Region;
use aws_sdk_sns::Client;
use utils::env_utils::get_env_var_or_panic;

Expand All @@ -10,8 +10,10 @@ pub struct AWSSNS {

impl AWSSNS {
/// To create a new SNS client
pub async fn new(config: &SdkConfig) -> Self {
AWSSNS { client: Client::new(config) }
pub async fn new() -> Self {
let sns_region = get_env_var_or_panic("AWS_SNS_REGION");
let config = aws_config::from_env().region(Region::new(sns_region)).load().await;
AWSSNS { client: Client::new(&config) }
}
}

Expand Down
63 changes: 46 additions & 17 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
use std::sync::Arc;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::config::AWSS3Config;
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::{DataStorage, DataStorageConfig};
use arc_swap::{ArcSwap, Guard};
use aws_config::SdkConfig;
use dotenvy::dotenv;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Url};

use da_client_interface::{DaClient, DaConfig};
use dotenvy::dotenv;
use ethereum_da_client::config::EthereumDaConfig;
use ethereum_settlement_client::EthereumSettlementClient;
use prover_client_interface::ProverClient;
use settlement_client_interface::SettlementClient;
use sharp_service::SharpProverService;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Url};
use starknet_settlement_client::StarknetSettlementClient;
use tokio::sync::OnceCell;
use utils::env_utils::get_env_var_or_panic;
use utils::settings::default::DefaultSettingsProvider;
use utils::settings::SettingsProvider;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::data_storage::aws_s3::config::AWSS3Config;
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::{DataStorage, DataStorageConfig};
use crate::database::mongodb::config::MongoDbConfig;
use crate::database::mongodb::MongoDb;
use crate::database::{Database, DatabaseConfig};
Expand Down Expand Up @@ -49,7 +50,7 @@ pub struct Config {
}

/// Initializes the app config
pub async fn init_config() -> Arc<Config> {
pub async fn init_config() -> Config {
dotenv().ok();

// init starknet client
Expand All @@ -67,7 +68,7 @@ pub async fn init_config() -> Arc<Config> {
// TODO: we use omniqueue for now which doesn't support loading AWS config
// from `SdkConfig`. We can later move to using `aws_sdk_sqs`. This would require
// us stop using the generic omniqueue abstractions for message ack/nack
let queue = build_queue_client();
let queue = build_queue_client(&aws_config);

let da_client = build_da_client().await;

Expand All @@ -76,9 +77,10 @@ pub async fn init_config() -> Arc<Config> {
let prover_client = build_prover_service(&settings_provider);

let storage_client = build_storage_client(&aws_config).await;
let alerts_client = build_alert_client(&aws_config).await;

Arc::new(Config::new(
let alerts_client = build_alert_client().await;

Config::new(
Arc::new(provider),
da_client,
prover_client,
Expand All @@ -87,7 +89,7 @@ pub async fn init_config() -> Arc<Config> {
queue,
storage_client,
alerts_client,
))
)
}

impl Config {
Expand Down Expand Up @@ -147,6 +149,33 @@ impl Config {
}
}

/// The app config. It can be accessed from anywhere inside the service.
/// It's initialized only once.
/// We are using `ArcSwap` as it allow us to replace the new `Config` with
/// a new one which is required when running test cases. This approach was
/// inspired from here - https://github.com/matklad/once_cell/issues/127
pub static CONFIG: OnceCell<ArcSwap<Config>> = OnceCell::const_new();

/// Returns the app config. Initializes if not already done.
pub async fn config() -> Guard<Arc<Config>> {
let cfg = CONFIG.get_or_init(|| async { ArcSwap::from_pointee(init_config().await) }).await;
cfg.load()
}

/// OnceCell only allows us to initialize the config once and that's how it should be on production.
/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
/// stored config inside `ArcSwap` with the new configuration and pool settings.
#[cfg(test)]
pub async fn config_force_init(config: Config) {
match CONFIG.get() {
Some(arc) => arc.store(Arc::new(config)),
None => {
CONFIG.get_or_init(|| async { ArcSwap::from_pointee(config) }).await;
}
}
}

/// Builds the DA client based on the environment variable DA_LAYER
pub async fn build_da_client() -> Box<dyn DaClient + Send + Sync> {
match get_env_var_or_panic("DA_LAYER").as_str() {
Expand Down Expand Up @@ -184,13 +213,13 @@ pub async fn build_storage_client(aws_config: &SdkConfig) -> Box<dyn DataStorage
}
}

pub async fn build_alert_client(aws_config: &SdkConfig) -> Box<dyn Alerts + Send + Sync> {
pub async fn build_alert_client() -> Box<dyn Alerts + Send + Sync> {
match get_env_var_or_panic("ALERTS").as_str() {
"sns" => Box::new(AWSSNS::new(aws_config).await),
"sns" => Box::new(AWSSNS::new().await),
_ => panic!("Unsupported Alert Client"),
}
}
pub fn build_queue_client() -> Box<dyn QueueProvider + Send + Sync> {
pub fn build_queue_client(_aws_config: &SdkConfig) -> Box<dyn QueueProvider + Send + Sync> {
match get_env_var_or_panic("QUEUE_PROVIDER").as_str() {
"sqs" => Box::new(SqsQueue {}),
_ => panic!("Unsupported Queue Client"),
Expand Down
46 changes: 22 additions & 24 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::ops::{Add, Mul, Rem};
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use color_eyre::eyre::WrapErr;
Expand All @@ -14,11 +13,10 @@ use thiserror::Error;
use tracing::log;
use uuid::Uuid;

use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;

use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;

lazy_static! {
/// EIP-4844 BLS12-381 modulus.
Expand Down Expand Up @@ -61,7 +59,7 @@ pub struct DaJob;
impl Job for DaJob {
async fn create_job(
&self,
_config: Arc<Config>,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem, JobError> {
Expand All @@ -76,7 +74,7 @@ impl Job for DaJob {
})
}

async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError> {
let block_no = job
.internal_id
.parse::<u64>()
Expand All @@ -97,7 +95,7 @@ impl Job for DaJob {
MaybePendingStateUpdate::Update(state_update) => state_update,
};
// constructing the data from the rpc
let blob_data = state_update_to_blob_data(block_no, state_update, config.clone())
let blob_data = state_update_to_blob_data(block_no, state_update, config)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
// transforming the data so that we can apply FFT on this.
Expand Down Expand Up @@ -137,7 +135,7 @@ impl Job for DaJob {
Ok(external_id)
}

async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
Ok(config
.da_client()
.verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?)
Expand Down Expand Up @@ -232,7 +230,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>
pub async fn state_update_to_blob_data(
block_no: u64,
state_update: StateUpdate,
config: Arc<Config>,
config: &Config,
) -> color_eyre::Result<Vec<FieldElement>> {
let state_diff = state_update.state_diff;
let mut blob_data: Vec<FieldElement> = vec![
Expand Down Expand Up @@ -310,7 +308,7 @@ pub async fn state_update_to_blob_data(
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
let data_blob_big_uint = convert_to_biguint(blob_data.clone());
Expand Down Expand Up @@ -372,12 +370,18 @@ fn da_word(class_flag: bool, nonce_change: Option<FieldElement>, num_changes: u6
#[cfg(test)]

pub mod test {
use crate::jobs::da_job::da_word;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::sync::Arc;

use crate::config::config;
use crate::data_storage::MockDataStorage;
use crate::tests::config::TestConfigBuilder;
use ::serde::{Deserialize, Serialize};
use color_eyre::Result;
use da_client_interface::MockDaClient;
use httpmock::prelude::*;
use majin_blob_core::blob;
use majin_blob_types::serde;
Expand All @@ -389,11 +393,6 @@ pub mod test {
use starknet_core::types::{FieldElement, StateUpdate};
use url::Url;

use da_client_interface::MockDaClient;

use crate::data_storage::MockDataStorage;
use crate::jobs::da_job::da_word;

/// Tests `da_word` function with various inputs for class flag, new nonce, and number of changes.
/// Verifies that `da_word` produces the correct FieldElement based on the provided parameters.
/// Uses test cases with different combinations of inputs and expected output strings.
Expand Down Expand Up @@ -445,10 +444,7 @@ pub mod test {
#[case] file_path: &str,
#[case] nonce_file_path: &str,
) {
use crate::{
jobs::da_job::{convert_to_biguint, state_update_to_blob_data},
tests::config::TestConfigBuilder,
};
use crate::jobs::da_job::{convert_to_biguint, state_update_to_blob_data};

let server = MockServer::start();
let mut da_client = MockDaClient::new();
Expand All @@ -466,17 +462,19 @@ pub mod test {
));

// mock block number (madara) : 5
let services = TestConfigBuilder::new()
.configure_starknet_client(provider.into())
.configure_da_client(da_client.into())
.configure_storage_client(storage_client.into())
TestConfigBuilder::new()
.mock_starknet_client(Arc::new(provider))
.mock_da_client(Box::new(da_client))
.mock_storage_client(Box::new(storage_client))
.build()
.await;

let config = config().await;

get_nonce_attached(&server, nonce_file_path);

let state_update = read_state_update_from_file(state_update_file_path).expect("issue while reading");
let blob_data = state_update_to_blob_data(block_no, state_update, services.config)
let blob_data = state_update_to_blob_data(block_no, state_update, &config)
.await
.expect("issue while converting state update to blob data");

Expand Down
Loading

0 comments on commit bc9ba16

Please sign in to comment.