diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f6f89bf..1eec7e27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- database timestamps - alerts module. - Tests for Settlement client. - Worker queues to listen for trigger events. diff --git a/Cargo.lock b/Cargo.lock index bce30baf..ff73bb06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2525,6 +2525,7 @@ dependencies = [ "ahash 0.8.11", "base64 0.13.1", "bitvec", + "chrono", "hex", "indexmap 2.2.6", "js-sys", @@ -3236,9 +3237,11 @@ checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits 0.2.19", "serde", - "windows-targets 0.52.6", + "wasm-bindgen", + "windows-targets 0.52.5", ] [[package]] @@ -6374,6 +6377,7 @@ dependencies = [ "bytes", "c-kzg", "cairo-vm 1.0.0-rc3", + "chrono", "color-eyre", "da-client-interface", "dotenvy", diff --git a/Cargo.toml b/Cargo.toml index 2664d313..b092c7ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,10 +29,11 @@ axum = { version = "0.7.4" } axum-macros = "0.4.1" bincode = "1.3.3" color-eyre = "0.6.2" +chrono = "0.4.0" c-kzg = "1.0.0" dotenvy = "0.15.7" futures = "0.3.30" -mongodb = { version = "2.8.1" } +mongodb = { version = "2.8.1", features = ["bson-chrono-0_4"] } omniqueue = { version = "0.2.0" } reqwest = { version = "0.12.7", features = [ "rustls-tls", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 1ded351a..14e979d6 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -27,6 +27,7 @@ bincode = { workspace = true } bytes = "1.6.0" c-kzg = { workspace = true } cairo-vm = { workspace = true } +chrono = { workspace = true } color-eyre = { workspace = true } da-client-interface = { workspace = true } dotenvy = { workspace = true } diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 4d53f243..8e4102eb 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -3,6 +3,7 @@ use futures::TryStreamExt; use std::collections::HashMap; use async_trait::async_trait; +use chrono::{SubsecRound, Utc}; use color_eyre::eyre::eyre; use color_eyre::Result; use mongodb::bson::{Bson, Document}; @@ -64,17 +65,22 @@ impl MongoDb { if result.modified_count == 0 { return Err(eyre!("Failed to update job. Job version is likely outdated")); } - self.update_job_version(current_job).await?; + self.post_job_update(current_job).await?; Ok(()) } + // TODO : remove this function + // Do this process in single db transaction. /// To update the document version - async fn update_job_version(&self, current_job: &JobItem) -> Result<()> { + async fn post_job_update(&self, current_job: &JobItem) -> Result<()> { let filter = doc! { "id": current_job.id, }; let combined_update = doc! { - "$inc": { "version": 1 } + "$inc": { "version": 1 }, + "$set" : { + "updated_at": Utc::now().round_subsecs(0) + } }; let options = UpdateOptions::builder().upsert(false).build(); let result = self.get_job_collection().update_one(filter, combined_update, options).await?; diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 8148b113..e23af285 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -3,6 +3,7 @@ use std::ops::{Add, Mul, Rem}; use std::str::FromStr; use async_trait::async_trait; +use chrono::{SubsecRound, Utc}; use color_eyre::eyre::WrapErr; use lazy_static::lazy_static; use num_bigint::{BigUint, ToBigUint}; @@ -71,6 +72,8 @@ impl Job for DaJob { external_id: String::new().into(), metadata, version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }) } diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index ab6cf1c1..3c81b2c9 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use async_trait::async_trait; use cairo_vm::vm::runners::cairo_pie::CairoPie; +use chrono::{SubsecRound, Utc}; use color_eyre::eyre::WrapErr; use prover_client_interface::{Task, TaskStatus}; use thiserror::Error; @@ -46,6 +47,8 @@ impl Job for ProvingJob { external_id: String::new().into(), metadata, version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }) } diff --git a/crates/orchestrator/src/jobs/register_proof_job/mod.rs b/crates/orchestrator/src/jobs/register_proof_job/mod.rs index e9a076dc..f7d958ad 100644 --- a/crates/orchestrator/src/jobs/register_proof_job/mod.rs +++ b/crates/orchestrator/src/jobs/register_proof_job/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use chrono::{SubsecRound, Utc}; use color_eyre::Result; use uuid::Uuid; @@ -30,6 +31,8 @@ impl Job for RegisterProofJob { // this will allow state update jobs to be created for each block metadata, version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }) } diff --git a/crates/orchestrator/src/jobs/snos_job/mod.rs b/crates/orchestrator/src/jobs/snos_job/mod.rs index 8c532bb2..02898767 100644 --- a/crates/orchestrator/src/jobs/snos_job/mod.rs +++ b/crates/orchestrator/src/jobs/snos_job/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use chrono::{SubsecRound, Utc}; use color_eyre::Result; use uuid::Uuid; @@ -28,6 +29,8 @@ impl Job for SnosJob { external_id: String::new().into(), metadata, version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }) } diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 48e4aeb9..6a8800fb 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use ::utils::collections::{has_dup, is_sorted}; use async_trait::async_trait; use cairo_vm::Felt252; +use chrono::{SubsecRound, Utc}; use color_eyre::eyre::eyre; use snos::io::output::StarknetOsOutput; use thiserror::Error; @@ -86,6 +87,8 @@ impl Job for StateUpdateJob { // we don't do one job per state update as that makes nonce management complicated metadata, version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }) } diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index bb2130b5..1e1a4e34 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -4,8 +4,9 @@ use color_eyre::eyre::eyre; use color_eyre::Result; use da_client_interface::DaVerificationStatus; // TODO: job types shouldn't depend on mongodb +use chrono::{DateTime, Utc}; #[cfg(feature = "with_mongodb")] -use mongodb::bson::serde_helpers::uuid_1_as_binary; +use mongodb::bson::serde_helpers::{chrono_datetime_as_bson_datetime, uuid_1_as_binary}; use serde::{Deserialize, Serialize}; use settlement_client_interface::SettlementVerificationStatus; use uuid::Uuid; @@ -133,6 +134,12 @@ pub struct JobItem { pub metadata: HashMap, /// helps to keep track of the version of the item for optimistic locking pub version: i32, + /// timestamp when the job was created + #[cfg_attr(feature = "with_mongodb", serde(with = "chrono_datetime_as_bson_datetime"))] + pub created_at: DateTime, + /// timestamp when the job was last updated + #[cfg_attr(feature = "with_mongodb", serde(with = "chrono_datetime_as_bson_datetime"))] + pub updated_at: DateTime, } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 83cf7e68..8e36bf22 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -6,6 +6,7 @@ use ::uuid::Uuid; use aws_config::Region; use aws_sdk_sns::error::SdkError; use aws_sdk_sns::operation::create_topic::CreateTopicError; +use chrono::{SubsecRound, Utc}; use mongodb::Client; use rstest::*; use serde::Deserialize; @@ -32,6 +33,8 @@ pub fn default_job_item() -> JobItem { external_id: ExternalId::String("0".to_string().into_boxed_str()), metadata: HashMap::new(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), } } diff --git a/crates/orchestrator/src/tests/database/mod.rs b/crates/orchestrator/src/tests/database/mod.rs index 58866582..29c3a9e7 100644 --- a/crates/orchestrator/src/tests/database/mod.rs +++ b/crates/orchestrator/src/tests/database/mod.rs @@ -2,6 +2,7 @@ use crate::config::{config, Config}; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; use crate::tests::config::TestConfigBuilder; use arc_swap::Guard; +use chrono::{SubsecRound, Utc}; use rstest::*; use std::sync::Arc; use uuid::Uuid; @@ -224,5 +225,7 @@ pub fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64 external_id: ExternalId::Number(0), metadata: Default::default(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), } } diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index 1390e46e..d98fe6de 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -6,6 +6,7 @@ use crate::tests::common::drop_database; use crate::tests::config::TestConfigBuilder; use crate::{config::config, jobs::Job}; use assert_matches::assert_matches; +use chrono::{SubsecRound, Utc}; use color_eyre::eyre::eyre; use da_client_interface::MockDaClient; use mockall::predicate::always; @@ -68,6 +69,8 @@ async fn test_da_job_process_job_failure_on_small_blob_size( external_id: ExternalId::String(internal_id.to_string().into_boxed_str()), metadata: HashMap::default(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }, ) .await; @@ -126,6 +129,8 @@ async fn test_da_job_process_job_failure_on_pending_block() { external_id: ExternalId::String("1".to_string().into_boxed_str()), metadata: HashMap::default(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }, ) .await; @@ -202,6 +207,8 @@ async fn test_da_job_process_job_success( external_id: ExternalId::String(internal_id.to_string().into_boxed_str()), metadata: HashMap::default(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }, ) .await; diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index d1748feb..f39851d7 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -17,6 +17,7 @@ pub mod proving_job; pub mod state_update_job; use assert_matches::assert_matches; +use chrono::{SubsecRound, Utc}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -505,6 +506,8 @@ fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, i external_id: ExternalId::Number(0), metadata: hashmap, version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), } } diff --git a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs index d35b2670..25204d04 100644 --- a/crates/orchestrator/src/tests/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/proving_job/mod.rs @@ -1,4 +1,5 @@ use bytes::Bytes; +use chrono::{SubsecRound, Utc}; use std::collections::HashMap; use std::fs::File; use std::io::Read; @@ -92,6 +93,8 @@ async fn test_process_job() { external_id: String::new().into(), metadata: HashMap::new(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0) } ) .await diff --git a/crates/orchestrator/src/tests/workers/utils/mod.rs b/crates/orchestrator/src/tests/workers/utils/mod.rs index 9ae812f4..7b19e9ae 100644 --- a/crates/orchestrator/src/tests/workers/utils/mod.rs +++ b/crates/orchestrator/src/tests/workers/utils/mod.rs @@ -1,6 +1,7 @@ use crate::database::MockDatabase; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; use crate::jobs::MockJob; +use chrono::{SubsecRound, Utc}; use mockall::predicate::eq; use std::collections::HashMap; use uuid::Uuid; @@ -14,6 +15,8 @@ pub fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem { external_id: ExternalId::Number(0), metadata: HashMap::new(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), } } @@ -46,6 +49,8 @@ pub fn get_job_by_mock_id_vector( external_id: ExternalId::Number(0), metadata: HashMap::new(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }) } @@ -63,6 +68,8 @@ pub fn db_checks_proving_worker(id: i32, db: &mut MockDatabase, mock_job: &mut M external_id: ExternalId::Number(0), metadata: HashMap::new(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), } } diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index 6f71f1ac..3247cec6 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -63,6 +63,7 @@ impl UpdateStateWorker { mod test_update_state_worker_utils { use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; use crate::workers::update_state::UpdateStateWorker; + use chrono::{SubsecRound, Utc}; use rstest::rstest; use uuid::Uuid; @@ -78,6 +79,8 @@ mod test_update_state_worker_utils { external_id: ExternalId::Number(0), metadata: Default::default(), version: 0, + created_at: Utc::now().round_subsecs(0), + updated_at: Utc::now().round_subsecs(0), }); }