Skip to content

Commit

Permalink
Feat : Database/timestamp (#109)
Browse files Browse the repository at this point in the history
* feat : added database timestamp

* changelog

* chore : lint fixes

* refactor
  • Loading branch information
ocdbytes authored Sep 9, 2024
1 parent 2bd2ddc commit 30116b2
Show file tree
Hide file tree
Showing 18 changed files with 70 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- database timestamps
- alerts module.
- Tests for Settlement client.
- Worker queues to listen for trigger events.
Expand Down
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ axum = { version = "0.7.4" }
axum-macros = "0.4.1"
bincode = "1.3.3"
color-eyre = "0.6.2"
chrono = "0.4.0"
c-kzg = "1.0.0"
dotenvy = "0.15.7"
futures = "0.3.30"
mongodb = { version = "2.8.1" }
mongodb = { version = "2.8.1", features = ["bson-chrono-0_4"] }
omniqueue = { version = "0.2.0" }
reqwest = { version = "0.12.7", features = [
"rustls-tls",
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ bincode = { workspace = true }
bytes = "1.6.0"
c-kzg = { workspace = true }
cairo-vm = { workspace = true }
chrono = { workspace = true }
color-eyre = { workspace = true }
da-client-interface = { workspace = true }
dotenvy = { workspace = true }
Expand Down
12 changes: 9 additions & 3 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use futures::TryStreamExt;
use std::collections::HashMap;

use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::{Bson, Document};
Expand Down Expand Up @@ -64,17 +65,22 @@ impl MongoDb {
if result.modified_count == 0 {
return Err(eyre!("Failed to update job. Job version is likely outdated"));
}
self.update_job_version(current_job).await?;
self.post_job_update(current_job).await?;
Ok(())
}

// TODO : remove this function
// Do this process in single db transaction.
/// To update the document version
async fn update_job_version(&self, current_job: &JobItem) -> Result<()> {
async fn post_job_update(&self, current_job: &JobItem) -> Result<()> {
let filter = doc! {
"id": current_job.id,
};
let combined_update = doc! {
"$inc": { "version": 1 }
"$inc": { "version": 1 },
"$set" : {
"updated_at": Utc::now().round_subsecs(0)
}
};
let options = UpdateOptions::builder().upsert(false).build();
let result = self.get_job_collection().update_one(filter, combined_update, options).await?;
Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::ops::{Add, Mul, Rem};
use std::str::FromStr;

use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
use color_eyre::eyre::WrapErr;
use lazy_static::lazy_static;
use num_bigint::{BigUint, ToBigUint};
Expand Down Expand Up @@ -71,6 +72,8 @@ impl Job for DaJob {
external_id: String::new().into(),
metadata,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/jobs/proving_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use async_trait::async_trait;
use cairo_vm::vm::runners::cairo_pie::CairoPie;
use chrono::{SubsecRound, Utc};
use color_eyre::eyre::WrapErr;
use prover_client_interface::{Task, TaskStatus};
use thiserror::Error;
Expand Down Expand Up @@ -46,6 +47,8 @@ impl Job for ProvingJob {
external_id: String::new().into(),
metadata,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/jobs/register_proof_job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
use color_eyre::Result;
use uuid::Uuid;

Expand Down Expand Up @@ -30,6 +31,8 @@ impl Job for RegisterProofJob {
// this will allow state update jobs to be created for each block
metadata,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/jobs/snos_job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
use color_eyre::Result;
use uuid::Uuid;

Expand Down Expand Up @@ -28,6 +29,8 @@ impl Job for SnosJob {
external_id: String::new().into(),
metadata,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashMap;
use ::utils::collections::{has_dup, is_sorted};
use async_trait::async_trait;
use cairo_vm::Felt252;
use chrono::{SubsecRound, Utc};
use color_eyre::eyre::eyre;
use snos::io::output::StarknetOsOutput;
use thiserror::Error;
Expand Down Expand Up @@ -86,6 +87,8 @@ impl Job for StateUpdateJob {
// we don't do one job per state update as that makes nonce management complicated
metadata,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
}

Expand Down
9 changes: 8 additions & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use color_eyre::eyre::eyre;
use color_eyre::Result;
use da_client_interface::DaVerificationStatus;
// TODO: job types shouldn't depend on mongodb
use chrono::{DateTime, Utc};
#[cfg(feature = "with_mongodb")]
use mongodb::bson::serde_helpers::uuid_1_as_binary;
use mongodb::bson::serde_helpers::{chrono_datetime_as_bson_datetime, uuid_1_as_binary};
use serde::{Deserialize, Serialize};
use settlement_client_interface::SettlementVerificationStatus;
use uuid::Uuid;
Expand Down Expand Up @@ -133,6 +134,12 @@ pub struct JobItem {
pub metadata: HashMap<String, String>,
/// helps to keep track of the version of the item for optimistic locking
pub version: i32,
/// timestamp when the job was created
#[cfg_attr(feature = "with_mongodb", serde(with = "chrono_datetime_as_bson_datetime"))]
pub created_at: DateTime<Utc>,
/// timestamp when the job was last updated
#[cfg_attr(feature = "with_mongodb", serde(with = "chrono_datetime_as_bson_datetime"))]
pub updated_at: DateTime<Utc>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ::uuid::Uuid;
use aws_config::Region;
use aws_sdk_sns::error::SdkError;
use aws_sdk_sns::operation::create_topic::CreateTopicError;
use chrono::{SubsecRound, Utc};
use mongodb::Client;
use rstest::*;
use serde::Deserialize;
Expand All @@ -32,6 +33,8 @@ pub fn default_job_item() -> JobItem {
external_id: ExternalId::String("0".to_string().into_boxed_str()),
metadata: HashMap::new(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/tests/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::config::{config, Config};
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::tests::config::TestConfigBuilder;
use arc_swap::Guard;
use chrono::{SubsecRound, Utc};
use rstest::*;
use std::sync::Arc;
use uuid::Uuid;
Expand Down Expand Up @@ -224,5 +225,7 @@ pub fn build_job_item(job_type: JobType, job_status: JobStatus, internal_id: u64
external_id: ExternalId::Number(0),
metadata: Default::default(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
}
}
7 changes: 7 additions & 0 deletions crates/orchestrator/src/tests/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::tests::common::drop_database;
use crate::tests::config::TestConfigBuilder;
use crate::{config::config, jobs::Job};
use assert_matches::assert_matches;
use chrono::{SubsecRound, Utc};
use color_eyre::eyre::eyre;
use da_client_interface::MockDaClient;
use mockall::predicate::always;
Expand Down Expand Up @@ -68,6 +69,8 @@ async fn test_da_job_process_job_failure_on_small_blob_size(
external_id: ExternalId::String(internal_id.to_string().into_boxed_str()),
metadata: HashMap::default(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
},
)
.await;
Expand Down Expand Up @@ -126,6 +129,8 @@ async fn test_da_job_process_job_failure_on_pending_block() {
external_id: ExternalId::String("1".to_string().into_boxed_str()),
metadata: HashMap::default(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
},
)
.await;
Expand Down Expand Up @@ -202,6 +207,8 @@ async fn test_da_job_process_job_success(
external_id: ExternalId::String(internal_id.to_string().into_boxed_str()),
metadata: HashMap::default(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
},
)
.await;
Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod proving_job;
pub mod state_update_job;

use assert_matches::assert_matches;
use chrono::{SubsecRound, Utc};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -505,6 +506,8 @@ fn build_job_item_by_type_and_status(job_type: JobType, job_status: JobStatus, i
external_id: ExternalId::Number(0),
metadata: hashmap,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/tests/jobs/proving_job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::Bytes;
use chrono::{SubsecRound, Utc};
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
Expand Down Expand Up @@ -92,6 +93,8 @@ async fn test_process_job() {
external_id: String::new().into(),
metadata: HashMap::new(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0)
}
)
.await
Expand Down
7 changes: 7 additions & 0 deletions crates/orchestrator/src/tests/workers/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::database::MockDatabase;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::jobs::MockJob;
use chrono::{SubsecRound, Utc};
use mockall::predicate::eq;
use std::collections::HashMap;
use uuid::Uuid;
Expand All @@ -14,6 +15,8 @@ pub fn get_job_item_mock_by_id(id: String, uuid: Uuid) -> JobItem {
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
}
}

Expand Down Expand Up @@ -46,6 +49,8 @@ pub fn get_job_by_mock_id_vector(
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
})
}

Expand All @@ -63,6 +68,8 @@ pub fn db_checks_proving_worker(id: i32, db: &mut MockDatabase, mock_job: &mut M
external_id: ExternalId::Number(0),
metadata: HashMap::new(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/workers/update_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl UpdateStateWorker {
mod test_update_state_worker_utils {
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType};
use crate::workers::update_state::UpdateStateWorker;
use chrono::{SubsecRound, Utc};
use rstest::rstest;
use uuid::Uuid;

Expand All @@ -78,6 +79,8 @@ mod test_update_state_worker_utils {
external_id: ExternalId::Number(0),
metadata: Default::default(),
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
});
}

Expand Down

0 comments on commit 30116b2

Please sign in to comment.