From 7a4a4aa4df3893860d607e28ebd2ba69346ec4f9 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Wed, 24 Jul 2024 15:03:41 +0530 Subject: [PATCH] update: added termination queue --- CHANGELOG.md | 1 + crates/orchestrator/src/jobs/mod.rs | 12 ++++++++++++ crates/orchestrator/src/jobs/types.rs | 18 +++++++++++++++++- crates/orchestrator/src/queue/job_queue.rs | 12 +++++++++++- 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 40fd1c8d..0f49cb29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - Function to calculate the kzg proof of x_0. - Tests for updating the state. - Function to update the state and publish blob on ethereum in state update job. +- Implement DL queue for handling failed jobs. ## Changed diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index b501cade..5acad76f 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -173,6 +173,18 @@ pub async fn verify_job(id: Uuid) -> Result<()> { Ok(()) } +/// Terminates the job and updates the status of the job in the DB. +pub async fn terminate_job(id: Uuid) -> Result<()> { + let config = config().await; + let job = get_job(id).await?; + let mut metadata = job.metadata.clone(); + metadata.insert("last_job_status".to_string(), job.status.to_string()); + config.database().update_metadata(&job, metadata).await?; + config.database().update_job_status(&job, JobStatus::Failed).await?; + + Ok(()) +} + fn get_job_handler(job_type: &JobType) -> Box { match job_type { JobType::DataSubmission => Box::new(da_job::DaJob), diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index b8e492b4..a52d8f8b 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, fmt}; use color_eyre::eyre::eyre; use color_eyre::Result; @@ -99,6 +99,22 @@ pub enum JobStatus { VerificationTimeout, /// The job failed processing VerificationFailed(String), + /// The job failed completing + Failed, +} + +impl fmt::Display for JobStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + JobStatus::Created => write!(f, "Created"), + JobStatus::LockedForProcessing => write!(f, "Locked for Processing"), + JobStatus::PendingVerification => write!(f, "Pending Verification"), + JobStatus::Completed => write!(f, "Completed"), + JobStatus::VerificationTimeout => write!(f, "Verification Timeout"), + JobStatus::VerificationFailed(reason) => write!(f, "Verification Failed: {}", reason), + JobStatus::Failed => write!(f, "Failed"), + } + } } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 9432276f..2ef47dfb 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -10,10 +10,11 @@ use tracing::log; use uuid::Uuid; use crate::config::config; -use crate::jobs::{process_job, verify_job}; +use crate::jobs::{process_job, terminate_job, verify_job}; const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; +const JOB_TERMINATION_QUEUE: &str = "madara_orchestrator_job_termination_queue"; #[derive(Debug, Serialize, Deserialize)] pub struct JobQueueMessage { @@ -88,6 +89,15 @@ pub async fn init_consumers() -> Result<()> { sleep(Duration::from_secs(1)).await; } }); + tokio::spawn(async move { + loop { + match consume_job_from_queue(JOB_TERMINATION_QUEUE.to_string(), terminate_job).await { + Ok(_) => {} + Err(e) => log::error!("Failed to consume from queue {:?}. Error: {:?}", JOB_TERMINATION_QUEUE, e), + } + sleep(Duration::from_secs(1)).await; + } + }); Ok(()) }