Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expect removed from various places #197

Merged
merged 5 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- refactor: expect removed and added error wraps
- refactor: Readme and .env.example
- refactor: http_mock version updated
- refactor: prover-services renamed to prover-clients
Expand Down
34 changes: 29 additions & 5 deletions crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{BucketLocationConstraint, CreateBucketConfiguration};
use aws_sdk_s3::Client;
use bytes::Bytes;
use color_eyre::eyre::Context;
use color_eyre::Result;

use crate::data_storage::DataStorage;
Expand Down Expand Up @@ -50,8 +51,20 @@ impl AWSS3 {
impl DataStorage for AWSS3 {
/// Function to get the data from S3 bucket by Key.
async fn get_data(&self, key: &str) -> Result<Bytes> {
let response = self.client.get_object().bucket(&self.bucket).key(key).send().await?;
let data_stream = response.body.collect().await.expect("Failed to convert body into AggregatedBytes.");
let response = self
.client
.get_object()
.bucket(&self.bucket)
.key(key)
.send()
.await
.context(format!("Failed to get object from bucket: {}, key: {}", self.bucket, key))?;

let data_stream = response.body.collect().await.context(format!(
"Failed to collect body into AggregatedBytes for bucket: {}, key: {}",
self.bucket, key
))?;

tracing::debug!("DataStorage: Collected response body into data stream from {}, key={}", self.bucket, key);
let data_bytes = data_stream.into_bytes();
tracing::debug!(
Expand All @@ -74,7 +87,8 @@ impl DataStorage for AWSS3 {
.body(ByteStream::from(data))
.content_type("application/json")
.send()
.await?;
.await
.context(format!("Failed to put object in bucket: {}, key: {}", self.bucket, key))?;

tracing::debug!(
log_type = "DataStorage",
Expand All @@ -88,7 +102,12 @@ impl DataStorage for AWSS3 {

async fn create_bucket(&self, bucket_name: &str) -> Result<()> {
if self.bucket_location_constraint.as_str() == "us-east-1" {
self.client.create_bucket().bucket(bucket_name).send().await?;
self.client
.create_bucket()
.bucket(bucket_name)
.send()
.await
.context(format!("Failed to create bucket: {} in us-east-1", bucket_name))?;
heemankv marked this conversation as resolved.
Show resolved Hide resolved
return Ok(());
}

Expand All @@ -101,7 +120,12 @@ impl DataStorage for AWSS3 {
.bucket(bucket_name)
.set_create_bucket_configuration(bucket_configuration)
.send()
.await?;
.await
.context(format!(
"Failed to create bucket: {} in region: {}",
bucket_name,
self.bucket_location_constraint.as_str()
))?;

Ok(())
}
Expand Down
46 changes: 25 additions & 21 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ impl Job for DaJob {
let blob_data_biguint = convert_to_biguint(blob_data.clone());
tracing::trace!(job_id = ?job.id, "Converted blob data to BigUint");

let transformed_data = fft_transformation(blob_data_biguint);
let transformed_data =
fft_transformation(blob_data_biguint).wrap_err("Failed to apply FFT transformation").map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to apply FFT transformation");
JobError::Other(OtherError(e))
})?;
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

Expand Down Expand Up @@ -204,17 +208,18 @@ impl Job for DaJob {
}

#[tracing::instrument(skip(elements))]
pub fn fft_transformation(elements: Vec<BigUint>) -> Vec<BigUint> {
pub fn fft_transformation(elements: Vec<BigUint>) -> Result<Vec<BigUint>, JobError> {
let xs: Vec<BigUint> = (0..*BLOB_LEN)
.map(|i| {
let bin = format!("{:012b}", i);
let bin_rev = bin.chars().rev().collect::<String>();
GENERATOR.modpow(
&BigUint::from_str_radix(&bin_rev, 2).expect("Not able to convert the parameters into exponent."),
&BLS_MODULUS,
)
let exponent = BigUint::from_str_radix(&bin_rev, 2)
.wrap_err("Failed to convert binary string to exponent")
.map_err(|e| JobError::Other(OtherError(e)))?;
Ok(GENERATOR.modpow(&exponent, &BLS_MODULUS))
})
.collect();
.collect::<Result<Vec<BigUint>, JobError>>()?;

let n = elements.len();
let mut transform: Vec<BigUint> = vec![BigUint::zero(); n];

Expand All @@ -223,7 +228,7 @@ pub fn fft_transformation(elements: Vec<BigUint>) -> Vec<BigUint> {
transform[i] = (transform[i].clone().mul(&xs[i]).add(&elements[j])).rem(&*BLS_MODULUS);
}
}
transform
Ok(transform)
}

pub fn convert_to_biguint(elements: Vec<Felt>) -> Vec<BigUint> {
Expand Down Expand Up @@ -310,7 +315,7 @@ pub async fn state_update_to_blob_data(

nonce = Some(get_current_nonce_result);
}
let da_word = da_word(class_flag.is_some(), nonce, storage_entries.len() as u64);
let da_word = da_word(class_flag.is_some(), nonce, storage_entries.len() as u64)?;
blob_data.push(address);
blob_data.push(da_word);

Expand Down Expand Up @@ -355,7 +360,7 @@ async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc
/// DA word encoding:
/// |---padding---|---class flag---|---new nonce---|---num changes---|
/// 127 bits 1 bit 64 bits 64 bits
fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Felt {
fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Result<Felt, JobError> {
// padding of 127 bits
let mut binary_string = "0".repeat(127);

Expand All @@ -367,13 +372,8 @@ fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Fe
}

// checking for nonce here
if let Some(_new_nonce) = nonce_change {
let bytes: [u8; 32] = nonce_change
.expect(
"Not able to convert the nonce_change var into [u8; 32] type. Possible Error : Improper parameter \
length.",
)
.to_bytes_be();
if let Some(new_nonce) = nonce_change {
let bytes: [u8; 32] = new_nonce.to_bytes_be();
let biguint = BigUint::from_bytes_be(&bytes);
let binary_string_local = format!("{:b}", biguint);
let padded_binary_string = format!("{:0>64}", binary_string_local);
Expand All @@ -387,12 +387,16 @@ fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Fe
let padded_binary_string = format!("{:0>64}", binary_representation);
binary_string += &padded_binary_string;

let biguint = BigUint::from_str_radix(binary_string.as_str(), 2).expect("Invalid binary string");
let biguint = BigUint::from_str_radix(binary_string.as_str(), 2)
.wrap_err("Failed to convert binary string to BigUint")
.map_err(|e| JobError::Other(OtherError(e)))?;

// Now convert the BigUint to a decimal string
let decimal_string = biguint.to_str_radix(10);

Felt::from_dec_str(&decimal_string).expect("issue while converting to fieldElement")
Felt::from_dec_str(&decimal_string)
.wrap_err("Failed to convert decimal string to FieldElement")
.map_err(|e| JobError::Other(OtherError(e)))
}

fn refactor_state_update(state_update: &mut StateDiff) {
Expand Down Expand Up @@ -453,7 +457,7 @@ pub mod test {
#[case] expected: String,
) {
let new_nonce = if new_nonce > 0 { Some(Felt::from(new_nonce)) } else { None };
let da_word = da_word(class_flag, new_nonce, num_changes);
let da_word = da_word(class_flag, new_nonce, num_changes).expect("Failed to create DA word");
let expected = Felt::from_dec_str(expected.as_str()).unwrap();
assert_eq!(da_word, expected);
}
Expand Down Expand Up @@ -562,7 +566,7 @@ pub mod test {
// converting the data to its original format
let ifft_blob_data = blob::recover(original_blob_data.clone());
// applying the fft function again on the original format
let fft_blob_data = fft_transformation(ifft_blob_data);
let fft_blob_data = fft_transformation(ifft_blob_data).expect("FFT transformation failed during test");

// ideally the data after fft transformation and the data before ifft should be same.
assert_eq!(fft_blob_data, original_blob_data);
Expand Down
31 changes: 20 additions & 11 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl Job for StateUpdateJob {
for block_no in block_numbers.iter() {
tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block");

let snos = self.fetch_snos_for_block(*block_no, config.clone()).await;
let snos = self.fetch_snos_for_block(*block_no, config.clone()).await?;
let txn_hash = self
.update_state_for_block(config.clone(), *block_no, snos, nonce)
.await
Expand Down Expand Up @@ -320,7 +320,7 @@ impl StateUpdateJob {
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await;
let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await?;

// TODO :
// Fetching nonce before the transaction is run
Expand All @@ -336,21 +336,30 @@ impl StateUpdateJob {
}

/// Retrieves the SNOS output for the corresponding block.
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> StarknetOsOutput {
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> {
let storage_client = config.storage();
let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME;
let snos_output_bytes = storage_client.get_data(&key).await.expect("Unable to fetch snos output for block");
serde_json::from_slice(snos_output_bytes.iter().as_slice())
.expect("Unable to convert the data into snos output")

let snos_output_bytes = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;

serde_json::from_slice(snos_output_bytes.iter().as_slice()).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output for block {}: {}", block_no, e)))
})
}

async fn fetch_program_output_for_block(&self, block_number: u64, config: Arc<Config>) -> Vec<[u8; 32]> {
async fn fetch_program_output_for_block(
&self,
block_number: u64,
config: Arc<Config>,
) -> Result<Vec<[u8; 32]>, JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
let program_output = storage_client.get_data(&key).await.expect("Unable to fetch snos output for block");
let decode_data: Vec<[u8; 32]> =
bincode::deserialize(&program_output).expect("Unable to decode the fetched data from storage provider.");
decode_data

let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;

bincode::deserialize(&program_output).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize program output for block {}: {}", block_number, e)))
})
}

/// Insert the tx hashes into the the metadata for the attempt number - will be used later by
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/state_update_job/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn bytes_to_vec_u8(bytes: &[u8]) -> color_eyre::Result<Vec<[u8; 32]>> {
let trimmed = line.trim();
assert!(!trimmed.is_empty());

let result = U256::from_str(trimmed).expect("Unable to convert line");
let result = U256::from_str(trimmed)?;
let res_vec = result.to_be_bytes_vec();
let hex = to_padded_hex(res_vec.as_slice());
let vec_hex = hex_string_to_u8_vec(&hex)
Expand Down
14 changes: 11 additions & 3 deletions crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,14 @@ pub struct WorkerTriggerMessage {
pub worker: WorkerTriggerType,
}

#[derive(Error, Debug)]
pub enum WorkerTriggerTypeError {
#[error("Unknown WorkerTriggerType: {0}")]
UnknownType(String),
}

impl FromStr for WorkerTriggerType {
type Err = String;
type Err = WorkerTriggerTypeError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
Expand All @@ -68,7 +74,7 @@ impl FromStr for WorkerTriggerType {
"ProofRegistration" => Ok(WorkerTriggerType::ProofRegistration),
"DataSubmission" => Ok(WorkerTriggerType::DataSubmission),
"UpdateState" => Ok(WorkerTriggerType::UpdateState),
_ => Err(format!("Unknown WorkerTriggerType: {}", s)),
_ => Err(WorkerTriggerTypeError::UnknownType(s.to_string())),
}
}
}
Expand Down Expand Up @@ -231,7 +237,9 @@ fn parse_worker_message(message: &Delivery) -> Result<Option<WorkerTriggerMessag
.borrow_payload()
.ok_or_else(|| ConsumptionError::Other(OtherError::from("Empty payload".to_string())))?;
let message_string = String::from_utf8_lossy(payload).to_string().trim_matches('\"').to_string();
let trigger_type = WorkerTriggerType::from_str(message_string.as_str()).expect("trigger type unwrapping failed");
let trigger_type = WorkerTriggerType::from_str(message_string.as_str())
.wrap_err("Failed to parse worker trigger type from message")
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
heemankv marked this conversation as resolved.
Show resolved Hide resolved
Ok(Some(WorkerTriggerMessage { worker: trigger_type }))
}

Expand Down
12 changes: 8 additions & 4 deletions crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use color_eyre::eyre::WrapErr;
use opentelemetry::KeyValue;
use starknet::providers::Provider;

Expand Down Expand Up @@ -35,10 +36,13 @@ impl Worker for SnosWorker {

let latest_job_in_db = config.database().get_latest_job_by_type(JobType::SnosRun).await?;

let latest_job_id = match latest_job_in_db {
Some(job) => job.internal_id.parse::<u64>().expect("Failed to parse job internal ID to u64"),
None => "0".to_string().parse::<u64>().expect("Failed to parse '0' to u64"),
};
let latest_job_id = latest_job_in_db
.map(|job| {
job.internal_id
.parse::<u64>()
.wrap_err_with(|| format!("Failed to parse job internal ID: {}", job.internal_id))
})
.unwrap_or(Ok(0))?;

// To be used when testing in specific block range
let block_start = if let Some(min_block_to_process) = config.service_config().min_block_to_process {
Expand Down
24 changes: 10 additions & 14 deletions crates/prover-clients/atlantic-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl AtlanticClient {
let prover_type = atlantic_params.atlantic_prover_type.clone();

let client = HttpClient::builder(url.as_str())
.expect("Failed to create HTTP client builder")
.default_form_data("mockFactHash", &mock_fact_hash)
.default_form_data("proverType", &prover_type)
.build()
Expand All @@ -56,7 +57,7 @@ impl AtlanticClient {
let proving_layer: Box<dyn ProvingLayer> = match atlantic_params.atlantic_settlement_layer.as_str() {
"ethereum" => Box::new(EthereumLayer),
"starknet" => Box::new(StarknetLayer),
_ => panic!("proving layer not correct"),
_ => panic!("Invalid settlement layer: {}", atlantic_params.atlantic_settlement_layer),
};

Self { client, proving_layer }
Expand All @@ -66,7 +67,7 @@ impl AtlanticClient {
&self,
pie_file: &Path,
proof_layout: LayoutName,
atlantic_api_key: String,
atlantic_api_key: impl AsRef<str>,
) -> Result<AtlanticAddJobResponse, AtlanticError> {
let proof_layout = match proof_layout {
LayoutName::dynamic => "dynamic",
Expand All @@ -76,22 +77,17 @@ impl AtlanticClient {
let response = self
.proving_layer
.customize_request(
self.client
.request()
.method(Method::POST)
.query_param("apiKey", &atlantic_api_key)
.form_file("pieFile", pie_file, "pie.zip")
.form_text("layout", proof_layout),
self.client.request().method(Method::POST).query_param("apiKey", atlantic_api_key.as_ref()),
)
.form_file("pieFile", pie_file, "pie.zip")?
.form_text("layout", proof_layout)
.send()
.await
.map_err(AtlanticError::AddJobFailure)
.expect("Failed to add job");
.map_err(AtlanticError::AddJobFailure)?;
heemankv marked this conversation as resolved.
Show resolved Hide resolved

if response.status().is_success() {
response.json().await.map_err(AtlanticError::AddJobFailure)
} else {
Err(AtlanticError::SharpService(response.status()))
match response.status().is_success() {
true => response.json().await.map_err(AtlanticError::AddJobFailure),
false => Err(AtlanticError::SharpService(response.status())),
}
}

Expand Down
Loading
Loading