Skip to content

Commit

Permalink
update: added termination queue
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Jul 24, 2024
1 parent 6803b91 commit 7a4a4aa
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Function to calculate the kzg proof of x_0.
- Tests for updating the state.
- Function to update the state and publish blob on ethereum in state update job.
- Implement DL queue for handling failed jobs.

## Changed

Expand Down
12 changes: 12 additions & 0 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
Ok(())
}

/// Terminates the job and updates the status of the job in the DB.
pub async fn terminate_job(id: Uuid) -> Result<()> {
let config = config().await;
let job = get_job(id).await?;
let mut metadata = job.metadata.clone();
metadata.insert("last_job_status".to_string(), job.status.to_string());
config.database().update_metadata(&job, metadata).await?;
config.database().update_job_status(&job, JobStatus::Failed).await?;

Ok(())
}

fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
match job_type {
JobType::DataSubmission => Box::new(da_job::DaJob),
Expand Down
18 changes: 17 additions & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use color_eyre::eyre::eyre;
use color_eyre::Result;
Expand Down Expand Up @@ -99,6 +99,22 @@ pub enum JobStatus {
VerificationTimeout,
/// The job failed processing
VerificationFailed(String),
/// The job failed completing
Failed,
}

impl fmt::Display for JobStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
JobStatus::Created => write!(f, "Created"),
JobStatus::LockedForProcessing => write!(f, "Locked for Processing"),
JobStatus::PendingVerification => write!(f, "Pending Verification"),
JobStatus::Completed => write!(f, "Completed"),
JobStatus::VerificationTimeout => write!(f, "Verification Timeout"),
JobStatus::VerificationFailed(reason) => write!(f, "Verification Failed: {}", reason),
JobStatus::Failed => write!(f, "Failed"),
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand Down
12 changes: 11 additions & 1 deletion crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use tracing::log;
use uuid::Uuid;

use crate::config::config;
use crate::jobs::{process_job, verify_job};
use crate::jobs::{process_job, terminate_job, verify_job};

const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue";
const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue";
const JOB_TERMINATION_QUEUE: &str = "madara_orchestrator_job_termination_queue";

#[derive(Debug, Serialize, Deserialize)]
pub struct JobQueueMessage {
Expand Down Expand Up @@ -88,6 +89,15 @@ pub async fn init_consumers() -> Result<()> {
sleep(Duration::from_secs(1)).await;
}
});
tokio::spawn(async move {
loop {
match consume_job_from_queue(JOB_TERMINATION_QUEUE.to_string(), terminate_job).await {
Ok(_) => {}
Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_TERMINATION_QUEUE, e),
}
sleep(Duration::from_secs(1)).await;
}
});
Ok(())
}

Expand Down

0 comments on commit 7a4a4aa

Please sign in to comment.