Skip to content

Commit

Permalink
update: is_worker_enabled impl & usage in da_submission, removal of S…
Browse files Browse the repository at this point in the history
…tring from VerificationFailed
  • Loading branch information
heemankv committed Jul 23, 2024
1 parent df4fa86 commit d21abff
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 12 deletions.
4 changes: 4 additions & 0 deletions crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub trait Database: Send + Sync {
job_status: JobStatus,
internal_id: String,
) -> Result<Vec<JobItem>>;

// TODO: can be extendible to support multiple status.
async fn get_jobs_by_status(&self, status : JobStatus ) -> Result<Vec<JobItem>>;

}

pub trait DatabaseConfig {
Expand Down
26 changes: 26 additions & 0 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,30 @@ impl Database for MongoDb {

Ok(results)
}

async fn get_jobs_by_status(&self, job_status : JobStatus) -> Result<Vec<JobItem>> {

let filter = doc! {
"job_status": bson::to_bson(&job_status)?
};

let mut jobs = self
.get_job_collection()
.find(filter, None)
.await
.expect("Failed to fetch jobs by given job type and status");

let mut results = Vec::new();

while let Some(result) = jobs.next().await {
match result {
Ok(job_item) => {
results.push(job_item);
}
Err(e) => return Err(e.into()),
}
}

Ok(results)
}
}
7 changes: 4 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub async fn process_job(id: Uuid) -> Result<()> {
match job.status {
// we only want to process jobs that are in the created or verification failed state.
// verification failed state means that the previous processing failed and we want to retry
JobStatus::Created | JobStatus::VerificationFailed(_) => {
JobStatus::Created | JobStatus::VerificationFailed => {
log::info!("Processing job with id {:?}", id);
}
_ => {
Expand Down Expand Up @@ -134,8 +134,9 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
JobVerificationStatus::Verified => {
config.database().update_job_status(&job, JobStatus::Completed).await?;
}
JobVerificationStatus::Rejected(e) => {
config.database().update_job_status(&job, JobStatus::VerificationFailed(e)).await?;
JobVerificationStatus::Rejected(_) => {
// TODO: change '_' to 'e' and add error 'e' to metadata of job status.
config.database().update_job_status(&job, JobStatus::VerificationFailed).await?;

// retry job processing if we haven't exceeded the max limit
let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub enum JobStatus {
/// The job was processed but the was unable to be verified under the given time
VerificationTimeout,
/// The job failed processing
VerificationFailed(String),
VerificationFailed,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
13 changes: 6 additions & 7 deletions crates/orchestrator/src/workers/data_submission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ impl Worker for DataSubmissionWorker {
// 2. Fetch the latest DA job creation.
// 3. Create jobs from after the lastest DA job already created till latest completed proving job.
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {

// Return without doing anything if the worker is not enabled.
if !self.is_worker_enabled().await? {
return Ok(());
}

let config = config().await;

// provides latest completed proof creation job id
Expand All @@ -38,13 +44,6 @@ impl Worker for DataSubmissionWorker {
let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?;
let latest_proven_id: u64 = latest_proven_job_id.parse()?;

let block_diff = latest_proven_id - latest_data_submission_id;

// if all blocks are processed
if block_diff == 0 {
return Ok(());
}

// creating data submission jobs for latest blocks without pre-running data submission jobs jobs don't yet exist.
for x in latest_data_submission_id + 1..latest_proven_id + 1 {
create_job(JobType::DataSubmission, x.to_string(), HashMap::new()).await?;
Expand Down
25 changes: 24 additions & 1 deletion crates/orchestrator/src/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::error::Error;

use crate::{config::config, jobs::types::JobStatus};
use async_trait::async_trait;

pub mod data_submission;
Expand All @@ -11,4 +11,27 @@ pub mod update_state;
#[async_trait]
pub trait Worker: Send + Sync {
async fn run_worker(&self) -> Result<(), Box<dyn Error>>;

// TODO: Assumption : False Negative
// we are assuming that the worker will spawn only 1 job for a block and no two jobs will ever exist
// for a single block, the code might fail to work as expected if this happens.

// Checks if any of the jobs have failed
// Haults any new job creation till all the count of failed jobs is not Zero.
async fn is_worker_enabled(&self) -> Result<bool, Box<dyn Error>> {
let config = config().await;

let failed_da_jobs = config
.database()
.get_jobs_by_status(
JobStatus::VerificationFailed,
)
.await?;

if failed_da_jobs.len() > 0 {
return Ok(false);
}

Ok(true)
}
}

0 comments on commit d21abff

Please sign in to comment.