diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index 387457d5..cc29f1e3 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -43,6 +43,10 @@ pub trait Database: Send + Sync { job_status: JobStatus, internal_id: String, ) -> Result>; + + // TODO: can be extendible to support multiple status. + async fn get_jobs_by_status(&self, status : JobStatus ) -> Result>; + } pub trait DatabaseConfig { diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 5be0cf54..77a2b4db 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -290,4 +290,30 @@ impl Database for MongoDb { Ok(results) } + + async fn get_jobs_by_status(&self, job_status : JobStatus) -> Result> { + + let filter = doc! { + "job_status": bson::to_bson(&job_status)? + }; + + let mut jobs = self + .get_job_collection() + .find(filter, None) + .await + .expect("Failed to fetch jobs by given job type and status"); + + let mut results = Vec::new(); + + while let Some(result) = jobs.next().await { + match result { + Ok(job_item) => { + results.push(job_item); + } + Err(e) => return Err(e.into()), + } + } + + Ok(results) + } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index b501cade..e8dcab4c 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -80,7 +80,7 @@ pub async fn process_job(id: Uuid) -> Result<()> { match job.status { // we only want to process jobs that are in the created or verification failed state. // verification failed state means that the previous processing failed and we want to retry - JobStatus::Created | JobStatus::VerificationFailed(_) => { + JobStatus::Created | JobStatus::VerificationFailed => { log::info!("Processing job with id {:?}", id); } _ => { @@ -134,8 +134,9 @@ pub async fn verify_job(id: Uuid) -> Result<()> { JobVerificationStatus::Verified => { config.database().update_job_status(&job, JobStatus::Completed).await?; } - JobVerificationStatus::Rejected(e) => { - config.database().update_job_status(&job, JobStatus::VerificationFailed(e)).await?; + JobVerificationStatus::Rejected(_) => { + // TODO: change '_' to 'e' and add error 'e' to metadata of job status. + config.database().update_job_status(&job, JobStatus::VerificationFailed).await?; // retry job processing if we haven't exceeded the max limit let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index b8e492b4..fce76280 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -98,7 +98,7 @@ pub enum JobStatus { /// The job was processed but the was unable to be verified under the given time VerificationTimeout, /// The job failed processing - VerificationFailed(String), + VerificationFailed, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] diff --git a/crates/orchestrator/src/workers/data_submission.rs b/crates/orchestrator/src/workers/data_submission.rs index e1622c7b..2f118a2c 100644 --- a/crates/orchestrator/src/workers/data_submission.rs +++ b/crates/orchestrator/src/workers/data_submission.rs @@ -15,6 +15,12 @@ impl Worker for DataSubmissionWorker { // 2. Fetch the latest DA job creation. // 3. Create jobs from after the lastest DA job already created till latest completed proving job. async fn run_worker(&self) -> Result<(), Box> { + + // Return without doing anything if the worker is not enabled. + if !self.is_worker_enabled().await? { + return Ok(()); + } + let config = config().await; // provides latest completed proof creation job id @@ -38,13 +44,6 @@ impl Worker for DataSubmissionWorker { let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?; let latest_proven_id: u64 = latest_proven_job_id.parse()?; - let block_diff = latest_proven_id - latest_data_submission_id; - - // if all blocks are processed - if block_diff == 0 { - return Ok(()); - } - // creating data submission jobs for latest blocks without pre-running data submission jobs jobs don't yet exist. for x in latest_data_submission_id + 1..latest_proven_id + 1 { create_job(JobType::DataSubmission, x.to_string(), HashMap::new()).await?; diff --git a/crates/orchestrator/src/workers/mod.rs b/crates/orchestrator/src/workers/mod.rs index 93014cdc..1c063c37 100644 --- a/crates/orchestrator/src/workers/mod.rs +++ b/crates/orchestrator/src/workers/mod.rs @@ -1,5 +1,5 @@ use std::error::Error; - +use crate::{config::config, jobs::types::JobStatus}; use async_trait::async_trait; pub mod data_submission; @@ -11,4 +11,27 @@ pub mod update_state; #[async_trait] pub trait Worker: Send + Sync { async fn run_worker(&self) -> Result<(), Box>; + + // TODO: Assumption : False Negative + // we are assuming that the worker will spawn only 1 job for a block and no two jobs will ever exist + // for a single block, the code might fail to work as expected if this happens. + + // Checks if any of the jobs have failed + // Haults any new job creation till all the count of failed jobs is not Zero. + async fn is_worker_enabled(&self) -> Result> { + let config = config().await; + + let failed_da_jobs = config + .database() + .get_jobs_by_status( + JobStatus::VerificationFailed, + ) + .await?; + + if failed_da_jobs.len() > 0 { + return Ok(false); + } + + Ok(true) + } }