From f3aa8ad6082e565d4c0da870c076a11fc45b2942 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Wed, 11 Dec 2024 16:31:51 +0100 Subject: [PATCH 01/18] ingestion: plumbing for QC and stub qc_data implementation --- Cargo.lock | 2 ++ ingestion/Cargo.toml | 2 ++ ingestion/src/kldata.rs | 1 + ingestion/src/lib.rs | 24 ++++++++++++++++++++++-- ingestion/src/main.rs | 33 ++++++++++++++++++++++++++------- 5 files changed, 53 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcf5422b..0066fe4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1491,6 +1491,8 @@ dependencies = [ "kafka", "quick-xml", "regex", + "rove", + "rove_connector", "serde", "thiserror", "tokio", diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index 238bd47b..278c0130 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -20,6 +20,8 @@ futures.workspace = true kafka.workspace = true quick-xml.workspace = true regex.workspace = true +rove.workspace = true +rove_connector = { path = "../rove_connector" } serde.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/ingestion/src/kldata.rs b/ingestion/src/kldata.rs index 19480f86..a4338019 100644 --- a/ingestion/src/kldata.rs +++ b/ingestion/src/kldata.rs @@ -366,6 +366,7 @@ pub async fn filter_and_label_kldata<'a>( data.push(Datum { timeseries_id, + param_id: param.id, timestamp: in_datum.timestamp, value: in_datum.value, }); diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index 78622cc6..a969e7ab 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -73,6 +73,7 @@ struct IngestorState { db_pool: PgConnectionPool, param_conversions: ParamConversions, // converts param codes to element ids permit_tables: Arc>, + qc_scheduler: Arc>, } impl FromRef for PgConnectionPool { @@ -93,6 +94,12 @@ impl FromRef for Arc for Arc> { + fn from_ref(state: &IngestorState) -> Arc> { + state.qc_scheduler.clone() + } +} + /// Represents the different Data types observation can have #[derive(Debug, PartialEq)] pub enum ObsType<'a> { @@ -103,6 +110,8 @@ pub enum ObsType<'a> { /// Generic container for a piece of data ready to be inserted into the DB pub struct Datum<'a> { timeseries_id: i32, + // needed for QC + param_id: i32, timestamp: DateTime, value: ObsType<'a>, } @@ -110,7 +119,7 @@ pub struct Datum<'a> { pub type Data<'a> = Vec>; // TODO: benchmark insertion of scalar and non-scalar together vs separately? 
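(Editor's note on the state plumbing added earlier in this hunk, not part of the patch: the hand-written FromRef<IngestorState> impls are what let individual axum handlers extract just the piece of shared state they need. A minimal sketch of the pattern, with a hypothetical handler name, assuming axum's substate extraction via FromRef:)

    // Because Arc<rove::Scheduler<'static>> implements FromRef<IngestorState>, a handler
    // registered on a Router whose state is IngestorState can ask for only the scheduler:
    async fn qc_status(
        State(scheduler): State<Arc<rove::Scheduler<'static>>>,
    ) -> &'static str {
        let _ = scheduler; // run or inspect QC here, as handle_kldata does below
        "ok"
    }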
-pub async fn insert_data(data: Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> { +pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> { // TODO: the conflict resolution on this query is an imperfect solution, and needs improvement // // I learned from Søren that obsinn and kvalobs organise updates and deletions by sending new @@ -170,6 +179,10 @@ pub async fn insert_data(data: Data<'_>, conn: &mut PooledPgConn<'_>) -> Result< Ok(()) } +pub async fn qc_data(data: &Data<'_>, scheduler: &rove::Scheduler<'static>) -> Result<(), Error> { + todo!() +} + pub mod kldata; use kldata::{filter_and_label_kldata, parse_kldata}; @@ -193,6 +206,7 @@ async fn handle_kldata( State(pool): State, State(param_conversions): State, State(permit_table): State>>, + State(qc_scheduler): State>>, body: String, ) -> Json { let result: Result = async { @@ -204,7 +218,9 @@ async fn handle_kldata( filter_and_label_kldata(obsinn_chunk, &mut conn, param_conversions, permit_table) .await?; - insert_data(data, &mut conn).await?; + insert_data(&data, &mut conn).await?; + + qc_data(&data, &qc_scheduler).await?; Ok(message_id) } @@ -255,10 +271,13 @@ pub async fn run( db_pool: PgConnectionPool, param_conversion_path: &str, permit_tables: Arc>, + qc_scheduler: rove::Scheduler<'static>, ) -> Result<(), Box> { // set up param conversion map let param_conversions = get_conversions(param_conversion_path)?; + let qc_scheduler = Arc::new(qc_scheduler); + // build our application with a single route let app = Router::new() .route("/kldata", post(handle_kldata)) @@ -266,6 +285,7 @@ pub async fn run( db_pool, param_conversions, permit_tables, + qc_scheduler, }); // run our app with hyper, listening globally on port 3001 diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs index a1c1cfeb..3ad678bc 100644 --- a/ingestion/src/main.rs +++ b/ingestion/src/main.rs @@ -1,5 +1,13 @@ use bb8_postgres::PostgresConnectionManager; -use std::sync::{Arc, RwLock}; +use rove::{ + data_switch::{DataConnector, DataSwitch}, + load_pipelines, +}; +use rove_connector::Connector; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; use tokio_postgres::NoTls; use lard_ingestion::permissions; @@ -23,6 +31,22 @@ async fn main() -> Result<(), Box> { let permit_tables = Arc::new(RwLock::new(permissions::fetch_permits().await?)); let background_permit_tables = permit_tables.clone(); + // Set up postgres connection pool + let manager = + PostgresConnectionManager::new_from_stringlike(std::env::var("LARD_CONN_STRING")?, NoTls)?; + let db_pool = bb8::Pool::builder().build(manager).await?; + + // QC system + let scheduler = rove::Scheduler::new( + load_pipelines("").unwrap(), + DataSwitch::new(HashMap::from([( + "lard", + Box::new(Connector { + pool: db_pool.clone(), + }) as Box, + )])), + ); + println!("Spawing task to fetch permissions from StInfoSys..."); // background task to refresh permit tables every 30 mins tokio::task::spawn(async move { @@ -42,11 +66,6 @@ async fn main() -> Result<(), Box> { } }); - // Set up postgres connection pool - let manager = - PostgresConnectionManager::new_from_stringlike(std::env::var("LARD_CONN_STRING")?, NoTls)?; - let db_pool = bb8::Pool::builder().build(manager).await?; - // Spawn kvkafka reader #[cfg(feature = "kafka_prod")] { @@ -60,5 +79,5 @@ async fn main() -> Result<(), Box> { // Set up and run our server + database println!("Ingestion server started!"); - lard_ingestion::run(db_pool, PARAMCONV, permit_tables).await + lard_ingestion::run(db_pool, 
PARAMCONV, permit_tables, scheduler).await } From bb391308cecb4fc654fad6f49e83189a7be0424b Mon Sep 17 00:00:00 2001 From: Ingrid Date: Wed, 11 Dec 2024 17:24:21 +0100 Subject: [PATCH 02/18] db: flag tables for confident flags --- db/flags.sql | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/db/flags.sql b/db/flags.sql index 4f2ef406..f6e90d0c 100644 --- a/db/flags.sql +++ b/db/flags.sql @@ -1,5 +1,29 @@ CREATE SCHEMA IF NOT EXISTS flags; +CREATE TABLE IF NOT EXISTS flags.confident ( + timeseries INT4 NOT NULL, + obstime TIMESTAMPTZ NOT NULL, + usable BOOLEAN NOT NULL, + CONSTRAINT unique_confident_timeseries_obstime UNIQUE (timeseries, obstime), + -- TODO: should this and confident_providence fk into public.data? + CONSTRAINT fk_confident_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries +) PARTITION BY RANGE (obstime); +CREATE INDEX IF NOT EXISTS confident_timestamp_index ON flags.confident (obstime); +CREATE INDEX IF NOT EXISTS confident_timeseries_index ON flags.confident USING HASH (timeseries); + +CREATE TABLE IF NOT EXISTS flags.confident_provenance ( + timeseries INT4 NOT NULL, + obstime TIMESTAMPTZ NOT NULL, + pipeline TEXT NOT NULL, + -- TODO: should this be an enum? + flag INT4 NOT NULL, + fail_condition TEXT NULL, + CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline UNIQUE (timeseries, obstime, pipeline), + CONSTRAINT fk_confident_providence_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries +) PARTITION BY RANGE (obstime); +CREATE INDEX IF NOT EXISTS confident_provenance_timestamp_index ON flags.confident_providence (obstime); +CREATE INDEX IF NOT EXISTS confident_provenance_timeseries_index ON flags.confident_providence USING HASH (timeseries); + CREATE TABLE IF NOT EXISTS flags.kvdata ( timeseries INT4 REFERENCES public.timeseries, obstime TIMESTAMPTZ NOT NULL, From 8684b5b0b5859447e084538728abe63ccf5be41c Mon Sep 17 00:00:00 2001 From: Ingrid Date: Thu, 12 Dec 2024 17:08:02 +0100 Subject: [PATCH 03/18] ingestion: implement qc_data --- Cargo.lock | 3 +- Cargo.toml | 3 +- db/flags.sql | 2 + ingestion/Cargo.toml | 1 + ingestion/src/lib.rs | 135 +++++++++++++++++++++++++++++++++++++++--- ingestion/src/main.rs | 2 +- 6 files changed, 135 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0066fe4a..c1364f52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1486,6 +1486,7 @@ dependencies = [ "bb8-postgres", "bytes", "chrono", + "chronoutil", "csv", "futures", "kafka", @@ -2346,7 +2347,7 @@ dependencies = [ [[package]] name = "rove" version = "0.1.1" -source = "git+https://github.com/metno/rove.git#95a7770f473a30ea6646a0b9254e8f974b673055" +source = "git+https://github.com/metno/rove.git?branch=lard_fixes#1d79a555847f9223c9f6ee459e7a480ff18f41bd" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 4acf2bbc..3671ffda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,8 @@ quick-xml = { version = "0.35.0", features = [ "serialize", "overlapped-lists" ] rand = "0.8.5" rand_distr = "0.4.3" regex = "1.11.1" -rove = { git = "https://github.com/metno/rove.git" } +rove = { git = "https://github.com/metno/rove.git", branch = "lard_fixes" } +# rove = { git = "https://github.com/metno/rove.git" } serde = { version = "1.0.217", features = ["derive"] } thiserror = "1.0.69" tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] } diff --git a/db/flags.sql b/db/flags.sql index f6e90d0c..bd974e1d 100644 --- a/db/flags.sql +++ b/db/flags.sql @@ -11,12 +11,14 
@@ CREATE TABLE IF NOT EXISTS flags.confident ( CREATE INDEX IF NOT EXISTS confident_timestamp_index ON flags.confident (obstime); CREATE INDEX IF NOT EXISTS confident_timeseries_index ON flags.confident USING HASH (timeseries); +-- TODO: should this also have a column for qc_time or some such? CREATE TABLE IF NOT EXISTS flags.confident_provenance ( timeseries INT4 NOT NULL, obstime TIMESTAMPTZ NOT NULL, pipeline TEXT NOT NULL, -- TODO: should this be an enum? flag INT4 NOT NULL, + -- TODO: better name? since this might be applied to flags that aren't fail but also aren't pass? fail_condition TEXT NULL, CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline UNIQUE (timeseries, obstime, pipeline), CONSTRAINT fk_confident_providence_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index 278c0130..8b9e5fe8 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -15,6 +15,7 @@ bb8.workspace = true bb8-postgres.workspace = true bytes.workspace = true chrono.workspace = true +chronoutil.workspace = true csv.workspace = true futures.workspace = true kafka.workspace = true diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index a969e7ab..3db4d4f8 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -7,8 +7,10 @@ use axum::{ use bb8::PooledConnection; use bb8_postgres::PostgresConnectionManager; use chrono::{DateTime, Utc}; +use chronoutil::RelativeDuration; use futures::stream::FuturesUnordered; use futures::StreamExt; +use rove::data_switch::{SpaceSpec, TimeSpec, Timestamp}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -30,6 +32,8 @@ pub enum Error { Pool(#[from] bb8::RunError), #[error("parse error: {0}")] Parse(String), + #[error("qc system returned an error: {0}")] + Qc(#[from] rove::scheduler::Error), #[error("RwLock was poisoned: {0}")] Lock(String), #[error("Could not read environment variable: {0}")] @@ -73,7 +77,7 @@ struct IngestorState { db_pool: PgConnectionPool, param_conversions: ParamConversions, // converts param codes to element ids permit_tables: Arc>, - qc_scheduler: Arc>, + qc_scheduler: Arc, } impl FromRef for PgConnectionPool { @@ -94,8 +98,8 @@ impl FromRef for Arc for Arc> { - fn from_ref(state: &IngestorState) -> Arc> { +impl FromRef for Arc { + fn from_ref(state: &IngestorState) -> Arc { state.qc_scheduler.clone() } } @@ -118,6 +122,16 @@ pub struct Datum<'a> { pub type Data<'a> = Vec>; +pub struct QcResult { + timeseries_id: i32, + timestamp: DateTime, + // TODO: possible to avoid heap-allocating this? + pipeline: String, + // TODO: correct type? + flag: i32, + fail_condition: Option, +} + // TODO: benchmark insertion of scalar and non-scalar together vs separately? 
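(Editor's orientation note, not part of the patch: qc_data below writes one row per datum into the two flag tables created in db/flags.sql above — flags.confident holds the boolean verdict, flags.confident_provenance the per-pipeline flag and fail_condition. A sketch of reading a verdict back out, with a hypothetical helper name, in the same style the crate uses for its other queries:)

    async fn why_unusable(
        conn: &mut PooledPgConn<'_>,
        timeseries_id: i32,
        obstime: DateTime<Utc>,
    ) -> Result<Vec<tokio_postgres::Row>, Error> {
        // join the verdict table with its provenance to see which pipeline failed and why
        let rows = conn
            .query(
                "SELECT p.pipeline, p.flag, p.fail_condition \
                    FROM flags.confident c \
                    JOIN flags.confident_provenance p \
                    ON p.timeseries = c.timeseries AND p.obstime = c.obstime \
                    WHERE c.timeseries = $1 AND c.obstime = $2 AND NOT c.usable",
                &[&timeseries_id, &obstime],
            )
            .await?;
        Ok(rows)
    }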
pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> { // TODO: the conflict resolution on this query is an imperfect solution, and needs improvement @@ -179,8 +193,113 @@ pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result Ok(()) } -pub async fn qc_data(data: &Data<'_>, scheduler: &rove::Scheduler<'static>) -> Result<(), Error> { - todo!() +pub async fn qc_data( + data: &Data<'_>, + scheduler: &rove::Scheduler, + conn: &mut PooledPgConn<'_>, +) -> Result<(), Error> { + // TODO: see conflict resolution issues on queries in `insert_data` + // On periodic or consistency QC pipelines, we should be checking the provenance table to + // decide how to update usable on a conflict, but here it should be fine not to since this is + // fresh data. + // The `AND` in the `DO UPDATE SET` subexpression better handles the case of resent data where + // periodic checks might already have been run by defaulting to false. If the existing data was + // only fresh checked, and the replacement is different, this could result in a false positive. + // I think this is OK though since it should be a rare occurence and will be quickly cleared up + // by a periodic run regardless. + let query = conn + .prepare( + "INSERT INTO flags.confident (timeseries, obstime, usable) \ + VALUES ($1, $2, $3) \ + ON CONFLICT ON CONSTRAINT unique_confident_timeseries_obstime \ + DO UPDATE SET usable = usable AND EXCLUDED.usable", + ) + .await?; + let query_provenance = conn + .prepare( + "INSERT INTO flags.confident_provenance (timeseries, obstime, pipeline, flag, fail_condition) \ + VALUES ($1, $2, $3, $4, $5) \ + ON CONFLICT ON CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline \ + DO UPDATE SET flag = EXCLUDED.flag, fail_condition = EXCLUDED.fail_condition", + ) + .await?; + + let mut qc_results: Vec = Vec::with_capacity(data.len()); + for datum in data { + let time_spec = TimeSpec::new( + Timestamp(datum.timestamp.timestamp()), + Timestamp(datum.timestamp.timestamp()), + // TODO: real time resolution here. For now derive from type_id? 
+ RelativeDuration::hours(1), + ); + let space_spec = SpaceSpec::One(datum.timeseries_id.to_string()); + // TODO: load and fetch real pipeline + let pipeline = "sample_pipeline"; + let rove_output = scheduler + .validate_direct( + "lard", + &[] as &[&str], + &time_spec, + &space_spec, + pipeline, + None, + ) + .await?; + + let first_fail = rove_output.iter().find(|check_result| { + if let Some(result) = check_result.results.first() { + if let Some(flag) = result.values.first() { + return *flag == rove::Flag::Fail; + } + } + false + }); + + let (flag, fail_condition) = match first_fail { + Some(check_result) => (1, Some(check_result.check.clone())), + None => (0, None), + }; + + qc_results.push(QcResult { + timeseries_id: datum.timeseries_id, + timestamp: datum.timestamp, + pipeline: pipeline.to_string(), + flag, + fail_condition, + }); + } + + let mut futures = qc_results + .iter() + .map(|qc_result| async { + conn.execute( + &query, + &[ + &qc_result.timeseries_id, + &qc_result.timestamp, + &(qc_result.flag == 0), + ], + ) + .await?; + conn.execute( + &query_provenance, + &[ + &qc_result.timeseries_id, + &qc_result.timestamp, + &qc_result.pipeline, + &qc_result.flag, + &qc_result.fail_condition, + ], + ) + .await + }) + .collect::>(); + + while let Some(res) = futures.next().await { + res?; + } + + Ok(()) } pub mod kldata; @@ -206,7 +325,7 @@ async fn handle_kldata( State(pool): State, State(param_conversions): State, State(permit_table): State>>, - State(qc_scheduler): State>>, + State(qc_scheduler): State>, body: String, ) -> Json { let result: Result = async { @@ -220,7 +339,7 @@ async fn handle_kldata( insert_data(&data, &mut conn).await?; - qc_data(&data, &qc_scheduler).await?; + qc_data(&data, &qc_scheduler, &mut conn).await?; Ok(message_id) } @@ -271,7 +390,7 @@ pub async fn run( db_pool: PgConnectionPool, param_conversion_path: &str, permit_tables: Arc>, - qc_scheduler: rove::Scheduler<'static>, + qc_scheduler: rove::Scheduler, ) -> Result<(), Box> { // set up param conversion map let param_conversions = get_conversions(param_conversion_path)?; diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs index 3ad678bc..2a9e6b62 100644 --- a/ingestion/src/main.rs +++ b/ingestion/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> Result<(), Box> { let scheduler = rove::Scheduler::new( load_pipelines("").unwrap(), DataSwitch::new(HashMap::from([( - "lard", + String::from("lard"), Box::new(Connector { pool: db_pool.clone(), }) as Box, From 66ea3d7654b00aad5268307db07c1da88f6440fc Mon Sep 17 00:00:00 2001 From: Ingrid Date: Fri, 13 Dec 2024 17:38:25 +0100 Subject: [PATCH 04/18] ingestion: refactor Data to DataChunk, pulling up shared fields in Datum Upon further consideration, I think matching obsinn's format like this makes sense. 
Since we are ingesting live data from weather stations, it's reasonable to expect a station will send observations fromall its params at once with the same timestamp, so we should optimise for this use case --- ingestion/src/kldata.rs | 507 ++++++++++++++++++++++------------------ ingestion/src/lib.rs | 149 ++++++------ 2 files changed, 359 insertions(+), 297 deletions(-) diff --git a/ingestion/src/kldata.rs b/ingestion/src/kldata.rs index a4338019..509466dc 100644 --- a/ingestion/src/kldata.rs +++ b/ingestion/src/kldata.rs @@ -1,8 +1,9 @@ use crate::{ permissions::{timeseries_is_open, ParamPermitTable, StationPermitTable}, - Datum, Error, ObsType, PooledPgConn, ReferenceParam, + DataChunk, Datum, Error, ObsType, PooledPgConn, ReferenceParam, }; use chrono::{DateTime, NaiveDateTime, Utc}; +use chronoutil::RelativeDuration; use regex::Regex; use std::{ collections::HashMap, @@ -18,17 +19,12 @@ pub struct ObsinnChunk<'a> { observations: Vec>, station_id: i32, // TODO: change name here to nationalnummer? type_id: i32, + timestamp: DateTime, } /// Represents a single observation from an obsinn message #[derive(Debug, PartialEq)] pub struct ObsinnObs<'a> { - // TODO: this timestamp is shared by all obs in the row, maybe we should have - // a Vec here and remove the Vec from ObsinnChunk - // NOTE: this struct maps to Datum, the type used during insertion that should take into - // account all possible sources. While the proposed TODO would perfectly fit Obsinn, we would - // also need to change Datum accordingly, which might not work well for other sources. - timestamp: DateTime, id: ObsinnId, value: ObsType<'a>, } @@ -153,11 +149,13 @@ fn parse_obs<'a>( csv_body: Lines<'a>, columns: &[ObsinnId], reference_params: Arc>, -) -> Result>, Error> { - let mut obs = Vec::new(); + header: ObsinnHeader<'a>, +) -> Result>, Error> { + let mut chunks = Vec::new(); let row_is_empty = || Error::Parse("empty row in kldata csv".to_string()); for row in csv_body { + let mut obs = Vec::new(); let (timestamp, vals) = { let mut vals = row.split(','); @@ -201,25 +199,29 @@ fn parse_obs<'a>( } }; - obs.push(ObsinnObs { - timestamp, - id: col, - value, - }) + obs.push(ObsinnObs { id: col, value }) } - } - if obs.is_empty() { - return Err(row_is_empty()); + // TODO: should this be more resiliant? + if obs.is_empty() { + return Err(row_is_empty()); + } + + chunks.push(ObsinnChunk { + observations: obs, + station_id: header.station_id, + type_id: header.type_id, + timestamp, + }) } - Ok(obs) + Ok(chunks) } pub fn parse_kldata( msg: &str, reference_params: Arc>, -) -> Result<(usize, ObsinnChunk), Error> { +) -> Result<(usize, Vec), Error> { let (header, columns, csv_body) = { let mut csv_body = msg.lines(); let lines_err = || Error::Parse("kldata message contained too few lines".to_string()); @@ -234,22 +236,23 @@ pub fn parse_kldata( Ok(( header.message_id, - ObsinnChunk { - observations: parse_obs(csv_body, &columns, reference_params)?, - station_id: header.station_id, - type_id: header.type_id, - }, + parse_obs(csv_body, &columns, reference_params, header)?, // ObsinnChunk { + // observations: parse_obs(csv_body, &columns, reference_params)?, + // station_id: header.station_id, + // type_id: header.type_id, + // timestamp:, + // }, )) } // TODO: rewrite such that queries can be pipelined? 
// not pipelining here hurts latency, but shouldn't matter for throughput pub async fn filter_and_label_kldata<'a>( - chunk: ObsinnChunk<'a>, + chunks: Vec>, conn: &mut PooledPgConn<'_>, param_conversions: Arc>, permit_table: Arc>, -) -> Result>, Error> { +) -> Result>, Error> { let query_get_obsinn = conn .prepare( "SELECT timeseries \ @@ -262,117 +265,125 @@ pub async fn filter_and_label_kldata<'a>( ) .await?; - let mut data = Vec::with_capacity(chunk.observations.len()); - - for in_datum in chunk.observations { - // get the conversion first, so we avoid wasting a tsid if it doesn't exist - let param = param_conversions - .get(&in_datum.id.param_code) - .ok_or_else(|| { - Error::Parse(format!( - "unrecognised param_code '{}'", - in_datum.id.param_code - )) - })?; - - // TODO: we only need to check inside this loop if station_id is in the - // param_permit_table - if !timeseries_is_open( - permit_table.clone(), - chunk.station_id, - chunk.type_id, - param.id, - )? { - // TODO: log that the timeseries is closed? Mostly useful for tests - #[cfg(feature = "integration_tests")] - eprintln!("station {}: timeseries is closed", chunk.station_id); - continue; - } - - let transaction = conn.transaction().await?; - - let (sensor, lvl) = in_datum - .id - .sensor_and_level - .map(|both| (Some(both.0), Some(both.1))) - .unwrap_or((None, None)); - - let obsinn_label_result = transaction - .query_opt( - &query_get_obsinn, - &[ - &chunk.station_id, - &chunk.type_id, - &in_datum.id.param_code, - &lvl, - &sensor, - ], - ) - .await?; - - let timeseries_id: i32 = match obsinn_label_result { - Some(row) => row.get(0), - None => { - // create new timeseries - // TODO: currently we create a timeseries with null location - // In the future the location column should be moved to the timeseries metadata table - let timeseries_id = transaction - .query_one( - "INSERT INTO public.timeseries (fromtime) VALUES ($1) RETURNING id", - &[&in_datum.timestamp], - ) - .await? - .get(0); + let mut out_chunks = Vec::with_capacity(chunks.len()); + for chunk in chunks { + let mut data = Vec::with_capacity(chunk.observations.len()); + + for in_datum in chunk.observations { + // get the conversion first, so we avoid wasting a tsid if it doesn't exist + let param = param_conversions + .get(&in_datum.id.param_code) + .ok_or_else(|| { + Error::Parse(format!( + "unrecognised param_code '{}'", + in_datum.id.param_code + )) + })?; + + // TODO: we only need to check inside this loop if station_id is in the + // param_permit_table + if !timeseries_is_open( + permit_table.clone(), + chunk.station_id, + chunk.type_id, + param.id, + )? { + // TODO: log that the timeseries is closed? 
Mostly useful for tests + #[cfg(feature = "integration_tests")] + eprintln!("station {}: timeseries is closed", chunk.station_id); + continue; + } - // create obsinn label - transaction - .execute( - "INSERT INTO labels.obsinn \ + let transaction = conn.transaction().await?; + + let (sensor, lvl) = in_datum + .id + .sensor_and_level + .map(|both| (Some(both.0), Some(both.1))) + .unwrap_or((None, None)); + + let obsinn_label_result = transaction + .query_opt( + &query_get_obsinn, + &[ + &chunk.station_id, + &chunk.type_id, + &in_datum.id.param_code, + &lvl, + &sensor, + ], + ) + .await?; + + let timeseries_id: i32 = match obsinn_label_result { + Some(row) => row.get(0), + None => { + // create new timeseries + // TODO: currently we create a timeseries with null location + // In the future the location column should be moved to the timeseries metadata table + let timeseries_id = transaction + .query_one( + "INSERT INTO public.timeseries (fromtime) VALUES ($1) RETURNING id", + &[&chunk.timestamp], + ) + .await? + .get(0); + + // create obsinn label + transaction + .execute( + "INSERT INTO labels.obsinn \ (timeseries, nationalnummer, type_id, param_code, lvl, sensor) \ VALUES ($1, $2, $3, $4, $5, $6)", - &[ - ×eries_id, - &chunk.station_id, - &chunk.type_id, - &in_datum.id.param_code, - &lvl, - &sensor, - ], - ) - .await?; - - // create met label - transaction - .execute( - "INSERT INTO labels.met \ + &[ + ×eries_id, + &chunk.station_id, + &chunk.type_id, + &in_datum.id.param_code, + &lvl, + &sensor, + ], + ) + .await?; + + // create met label + transaction + .execute( + "INSERT INTO labels.met \ (timeseries, station_id, param_id, type_id, lvl, sensor) \ VALUES ($1, $2, $3, $4, $5, $6)", - &[ - ×eries_id, - &chunk.station_id, - ¶m.id, - &chunk.type_id, - &lvl, - &sensor, - ], - ) - .await?; - - timeseries_id - } - }; + &[ + ×eries_id, + &chunk.station_id, + ¶m.id, + &chunk.type_id, + &lvl, + &sensor, + ], + ) + .await?; + + timeseries_id + } + }; - transaction.commit().await?; + transaction.commit().await?; - data.push(Datum { - timeseries_id, - param_id: param.id, - timestamp: in_datum.timestamp, - value: in_datum.value, + data.push(Datum { + timeseries_id, + param_id: param.id, + value: in_datum.value, + }); + } + out_chunks.push(DataChunk { + timestamp: chunk.timestamp, + // TODO: real time_resolution (derive from type_id for now) + time_resolution: RelativeDuration::hours(1), + data, }); } - Ok(data) + Ok(out_chunks) } #[cfg(test)] @@ -527,32 +538,40 @@ mod tests { sensor_and_level: None, }, ], - Ok(vec![ - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, + ObsinnHeader { + station_id: 18700, + type_id: 511, + message_id: 1, + _received_time: None, + }, + Ok(vec![ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.1), }, - value: Scalar(-1.1), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "CI".to_string(), - sensor_and_level: None, + ObsinnObs { + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + value: Scalar(0.0), }, - value: Scalar(0.0), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "IR".to_string(), - sensor_and_level: None, + ObsinnObs { + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + 
value: Scalar(2.8), }, - value: Scalar(2.8), - }, - ]), + ], + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), + station_id: 18700, + type_id: 511, + }]), "single line", ), ( @@ -571,54 +590,68 @@ mod tests { sensor_and_level: None, }, ], + ObsinnHeader { + station_id: 18700, + type_id: 511, + message_id: 1, + _received_time: None, + }, Ok(vec![ - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, - }, - value: Scalar(-1.1), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "CI".to_string(), - sensor_and_level: None, - }, - value: Scalar(0.0), - }, - ObsinnObs { + ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.1), + }, + ObsinnObs { + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + value: Scalar(0.0), + }, + ObsinnObs { + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + value: Scalar(2.8), + }, + ], timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "IR".to_string(), - sensor_and_level: None, - }, - value: Scalar(2.8), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, - }, - value: Scalar(-1.5), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), - id: ObsinnId { - param_code: "CI".to_string(), - sensor_and_level: None, - }, - value: Scalar(1.0), + station_id: 18700, + type_id: 511, }, - ObsinnObs { + ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.5), + }, + ObsinnObs { + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + value: Scalar(1.0), + }, + ObsinnObs { + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + value: Scalar(2.9), + }, + ], timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), - id: ObsinnId { - param_code: "IR".to_string(), - sensor_and_level: None, - }, - value: Scalar(2.9), + station_id: 18700, + type_id: 511, }, ]), "multiple lines", @@ -635,24 +668,33 @@ mod tests { sensor_and_level: None, }, ], - Ok(vec![ - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId { - param_code: "KLOBS".to_string(), - sensor_and_level: None, + ObsinnHeader { + station_id: 18700, + type_id: 511, + message_id: 1, + _received_time: None, + }, + Ok(vec![ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "KLOBS".to_string(), + sensor_and_level: None, + }, + value: NonScalar("20240910000000"), }, - value: NonScalar("20240910000000"), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(10.1), }, - value: Scalar(10.1), - }, - ]), + ], + timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), + station_id: 18700, + type_id: 511, + }]), "non scalar parameter", ), ( @@ -667,31 +709,40 @@ mod tests { sensor_and_level: None, }, ], - Ok(vec![ - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 
0).unwrap(), - id: ObsinnId { - param_code: "unknown".to_string(), - sensor_and_level: None, + ObsinnHeader { + station_id: 18700, + type_id: 511, + message_id: 1, + _received_time: None, + }, + Ok(vec![ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "unknown".to_string(), + sensor_and_level: None, + }, + value: NonScalar("20240910000000"), }, - value: NonScalar("20240910000000"), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(10.1), }, - value: Scalar(10.1), - }, - ]), + ], + timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), + station_id: 18700, + type_id: 511, + }]), "unrecognised param code", ), ]; let param_conversions = get_conversions("resources/paramconversions.csv").unwrap(); - for (data, cols, expected, case_description) in cases { - let output = parse_obs(data.lines(), &cols, param_conversions.clone()); + for (data, cols, header, expected, case_description) in cases { + let output = parse_obs(data.lines(), &cols, param_conversions.clone(), header); assert_eq!(output, expected, "{}", case_description); } } diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index 3db4d4f8..122f644c 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -7,7 +7,6 @@ use axum::{ use bb8::PooledConnection; use bb8_postgres::PostgresConnectionManager; use chrono::{DateTime, Utc}; -use chronoutil::RelativeDuration; use futures::stream::FuturesUnordered; use futures::StreamExt; use rove::data_switch::{SpaceSpec, TimeSpec, Timestamp}; @@ -111,16 +110,19 @@ pub enum ObsType<'a> { NonScalar(&'a str), } -/// Generic container for a piece of data ready to be inserted into the DB pub struct Datum<'a> { timeseries_id: i32, // needed for QC param_id: i32, - timestamp: DateTime, value: ObsType<'a>, } -pub type Data<'a> = Vec>; +/// Generic container for a piece of data ready to be inserted into the DB +pub struct DataChunk<'a> { + timestamp: DateTime, + time_resolution: chronoutil::RelativeDuration, + data: Vec>, +} pub struct QcResult { timeseries_id: i32, @@ -133,7 +135,10 @@ pub struct QcResult { } // TODO: benchmark insertion of scalar and non-scalar together vs separately? -pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> { +pub async fn insert_data( + chunks: &Vec>, + conn: &mut PooledPgConn<'_>, +) -> Result<(), Error> { // TODO: the conflict resolution on this query is an imperfect solution, and needs improvement // // I learned from Søren that obsinn and kvalobs organise updates and deletions by sending new @@ -164,37 +169,41 @@ pub async fn insert_data(data: &Data<'_>, conn: &mut PooledPgConn<'_>) -> Result ) .await?; - let mut futures = data - .iter() - .map(|datum| async { - match &datum.value { - ObsType::Scalar(val) => { - conn.execute( - &query_scalar, - &[&datum.timeseries_id, &datum.timestamp, &val], - ) - .await + // TODO: should we flat map into one FuturesUnordered instead of for looping? 
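// (Editor's note answering the TODO above — an untested sketch of the flat-map
//  variant. The shared reborrows are needed so the `move` closures can capture
//  the connection and prepared statements by copy:)
//
//     let conn = &*conn;
//     let query_scalar = &query_scalar;
//     let query_nonscalar = &query_nonscalar;
//     let mut futures = chunks
//         .iter()
//         .flat_map(|chunk| {
//             chunk.data.iter().map(move |datum| async move {
//                 match &datum.value {
//                     ObsType::Scalar(val) => {
//                         conn.execute(query_scalar, &[&datum.timeseries_id, &chunk.timestamp, &val])
//                             .await
//                     }
//                     ObsType::NonScalar(val) => {
//                         conn.execute(query_nonscalar, &[&datum.timeseries_id, &chunk.timestamp, &val])
//                             .await
//                     }
//                 }
//             })
//         })
//         .collect::<FuturesUnordered<_>>();
//
//     while let Some(res) = futures.next().await {
//         res?;
//     }
//
// This submits every statement in a single batch instead of one batch per chunk,
// at the cost of holding all the futures in memory at once.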
+ for chunk in chunks { + let mut futures = chunk + .data + .iter() + .map(|datum| async { + match &datum.value { + ObsType::Scalar(val) => { + conn.execute( + &query_scalar, + &[&datum.timeseries_id, &chunk.timestamp, &val], + ) + .await + } + ObsType::NonScalar(val) => { + conn.execute( + &query_nonscalar, + &[&datum.timeseries_id, &chunk.timestamp, &val], + ) + .await + } } - ObsType::NonScalar(val) => { - conn.execute( - &query_nonscalar, - &[&datum.timeseries_id, &datum.timestamp, &val], - ) - .await - } - } - }) - .collect::>(); + }) + .collect::>(); - while let Some(res) = futures.next().await { - res?; + while let Some(res) = futures.next().await { + res?; + } } Ok(()) } pub async fn qc_data( - data: &Data<'_>, + chunks: &Vec>, scheduler: &rove::Scheduler, conn: &mut PooledPgConn<'_>, ) -> Result<(), Error> { @@ -224,49 +233,51 @@ pub async fn qc_data( ) .await?; - let mut qc_results: Vec = Vec::with_capacity(data.len()); - for datum in data { - let time_spec = TimeSpec::new( - Timestamp(datum.timestamp.timestamp()), - Timestamp(datum.timestamp.timestamp()), - // TODO: real time resolution here. For now derive from type_id? - RelativeDuration::hours(1), - ); - let space_spec = SpaceSpec::One(datum.timeseries_id.to_string()); - // TODO: load and fetch real pipeline - let pipeline = "sample_pipeline"; - let rove_output = scheduler - .validate_direct( - "lard", - &[] as &[&str], - &time_spec, - &space_spec, - pipeline, - None, - ) - .await?; + let mut qc_results: Vec = Vec::new(); + for chunk in chunks { + let timestamp = chunk.timestamp.timestamp(); + for datum in chunk.data.iter() { + let time_spec = TimeSpec::new( + Timestamp(timestamp), + Timestamp(timestamp), + chunk.time_resolution, + ); + let space_spec = SpaceSpec::One(datum.timeseries_id.to_string()); + // TODO: load and fetch real pipeline + let pipeline = "sample_pipeline"; + let rove_output = scheduler + .validate_direct( + "lard", + &[] as &[&str], + &time_spec, + &space_spec, + pipeline, + None, + ) + .await?; - let first_fail = rove_output.iter().find(|check_result| { - if let Some(result) = check_result.results.first() { - if let Some(flag) = result.values.first() { - return *flag == rove::Flag::Fail; + let first_fail = rove_output.iter().find(|check_result| { + if let Some(result) = check_result.results.first() { + if let Some(flag) = result.values.first() { + return *flag == rove::Flag::Fail; + } } - } - false - }); - - let (flag, fail_condition) = match first_fail { - Some(check_result) => (1, Some(check_result.check.clone())), - None => (0, None), - }; - - qc_results.push(QcResult { - timeseries_id: datum.timeseries_id, - timestamp: datum.timestamp, - pipeline: pipeline.to_string(), - flag, - fail_condition, - }); + false + }); + + let (flag, fail_condition) = match first_fail { + Some(check_result) => (1, Some(check_result.check.clone())), + None => (0, None), + }; + + qc_results.push(QcResult { + timeseries_id: datum.timeseries_id, + timestamp: chunk.timestamp, + pipeline: pipeline.to_string(), + flag, + fail_condition, + }); + } } let mut futures = qc_results From 6556b356ea37ab24822a23c7b092843d943835a8 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Fri, 13 Dec 2024 18:03:33 +0100 Subject: [PATCH 05/18] ingestion: get time_resolution for qc from type_id --- ingestion/src/kldata.rs | 25 ++++++++++++++++++------- ingestion/src/lib.rs | 15 +++++++++------ 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/ingestion/src/kldata.rs b/ingestion/src/kldata.rs index 509466dc..3094195c 100644 --- 
a/ingestion/src/kldata.rs +++ b/ingestion/src/kldata.rs @@ -236,15 +236,26 @@ pub fn parse_kldata( Ok(( header.message_id, - parse_obs(csv_body, &columns, reference_params, header)?, // ObsinnChunk { - // observations: parse_obs(csv_body, &columns, reference_params)?, - // station_id: header.station_id, - // type_id: header.type_id, - // timestamp:, - // }, + parse_obs(csv_body, &columns, reference_params, header)?, )) } +// TODO: this is a messy hack, but it's the only way people at met currently have to determine +// time_resolution. Ultimately we intend to store time_resolution info in the database under +// public.timeseries or labels.met. This will be populated by a combination of a script that looks +// at a timeseries's history, and manual editing by content managers. +pub fn type_id_to_time_resolution(type_id: i32) -> Option { + // Source for these matches: PDF presented by PiM + match type_id { + 514 => Some(RelativeDuration::minutes(1)), + 506 | 509 | 510 => Some(RelativeDuration::minutes(10)), + 7 | 311 | 330 | 342 | 501 | 502 | 503 | 505 | 507 | 511 => Some(RelativeDuration::hours(1)), + 522 => Some(RelativeDuration::days(1)), + 399 => Some(RelativeDuration::years(1)), + _ => None, + } +} + // TODO: rewrite such that queries can be pipelined? // not pipelining here hurts latency, but shouldn't matter for throughput pub async fn filter_and_label_kldata<'a>( @@ -378,7 +389,7 @@ pub async fn filter_and_label_kldata<'a>( out_chunks.push(DataChunk { timestamp: chunk.timestamp, // TODO: real time_resolution (derive from type_id for now) - time_resolution: RelativeDuration::hours(1), + time_resolution: type_id_to_time_resolution(chunk.type_id), data, }); } diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index 122f644c..dcec644e 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -120,7 +120,7 @@ pub struct Datum<'a> { /// Generic container for a piece of data ready to be inserted into the DB pub struct DataChunk<'a> { timestamp: DateTime, - time_resolution: chronoutil::RelativeDuration, + time_resolution: Option, data: Vec>, } @@ -235,13 +235,16 @@ pub async fn qc_data( let mut qc_results: Vec = Vec::new(); for chunk in chunks { + let time_resolution = match chunk.time_resolution { + Some(time_resolution) => time_resolution, + // if there's no time_resolution, we can't QC + None => continue, + }; let timestamp = chunk.timestamp.timestamp(); + for datum in chunk.data.iter() { - let time_spec = TimeSpec::new( - Timestamp(timestamp), - Timestamp(timestamp), - chunk.time_resolution, - ); + let time_spec = + TimeSpec::new(Timestamp(timestamp), Timestamp(timestamp), time_resolution); let space_spec = SpaceSpec::One(datum.timeseries_id.to_string()); // TODO: load and fetch real pipeline let pipeline = "sample_pipeline"; From bd9be7e6352f2d0031edcdb265892838405bed82 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Thu, 9 Jan 2025 10:51:25 +0100 Subject: [PATCH 06/18] use real QC pipelines --- Cargo.lock | 3 +- Cargo.toml | 1 + ingestion/Cargo.toml | 1 + ingestion/src/lib.rs | 61 +++++++++++++++++++++------------ ingestion/src/main.rs | 43 +++++++++++++---------- ingestion/src/qc_pipelines.rs | 48 ++++++++++++++++++++++++++ qc_pipelines/fresh/TA_PT1H.toml | 46 +++++++++++++++++++++++++ rove_connector/src/lib.rs | 2 +- 8 files changed, 163 insertions(+), 42 deletions(-) create mode 100644 ingestion/src/qc_pipelines.rs create mode 100644 qc_pipelines/fresh/TA_PT1H.toml diff --git a/Cargo.lock b/Cargo.lock index c1364f52..48a5558f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ 
-1498,6 +1498,7 @@ dependencies = [ "thiserror", "tokio", "tokio-postgres", + "toml", ] [[package]] @@ -2347,7 +2348,7 @@ dependencies = [ [[package]] name = "rove" version = "0.1.1" -source = "git+https://github.com/metno/rove.git?branch=lard_fixes#1d79a555847f9223c9f6ee459e7a480ff18f41bd" +source = "git+https://github.com/metno/rove.git?branch=lard_fixes#5f70d99d59b0d9d1c8fc301893684c919cab286f" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 3671ffda..f073cf2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,4 @@ serde = { version = "1.0.217", features = ["derive"] } thiserror = "1.0.69" tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] } tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] } +toml = "0.8.19" diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index 8b9e5fe8..1450c9c5 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -27,3 +27,4 @@ serde.workspace = true thiserror.workspace = true tokio.workspace = true tokio-postgres.workspace = true +toml.workspace = true diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index dcec644e..ecf75574 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -7,9 +7,10 @@ use axum::{ use bb8::PooledConnection; use bb8_postgres::PostgresConnectionManager; use chrono::{DateTime, Utc}; +use chronoutil::RelativeDuration; use futures::stream::FuturesUnordered; use futures::StreamExt; -use rove::data_switch::{SpaceSpec, TimeSpec, Timestamp}; +use rove::data_switch::{TimeSpec, Timestamp}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -21,6 +22,7 @@ use tokio_postgres::NoTls; #[cfg(feature = "kafka")] pub mod kvkafka; pub mod permissions; +pub mod qc_pipelines; use permissions::{ParamPermitTable, StationPermitTable}; #[derive(Error, Debug)] @@ -33,6 +35,8 @@ pub enum Error { Parse(String), #[error("qc system returned an error: {0}")] Qc(#[from] rove::scheduler::Error), + #[error("rove connector returned an error: {0}")] + Connector(#[from] rove::data_switch::Error), #[error("RwLock was poisoned: {0}")] Lock(String), #[error("Could not read environment variable: {0}")] @@ -76,7 +80,8 @@ struct IngestorState { db_pool: PgConnectionPool, param_conversions: ParamConversions, // converts param codes to element ids permit_tables: Arc>, - qc_scheduler: Arc, + rove_connector: Arc, + qc_pipelines: Arc>, } impl FromRef for PgConnectionPool { @@ -97,9 +102,15 @@ impl FromRef for Arc for Arc { - fn from_ref(state: &IngestorState) -> Arc { - state.qc_scheduler.clone() +impl FromRef for Arc { + fn from_ref(state: &IngestorState) -> Arc { + state.rove_connector.clone() + } +} + +impl FromRef for Arc> { + fn from_ref(state: &IngestorState) -> Arc> { + state.qc_pipelines.clone() } } @@ -204,8 +215,9 @@ pub async fn insert_data( pub async fn qc_data( chunks: &Vec>, - scheduler: &rove::Scheduler, conn: &mut PooledPgConn<'_>, + rove_connector: &rove_connector::Connector, + pipelines: &HashMap<(i32, RelativeDuration), rove::Pipeline>, ) -> Result<(), Error> { // TODO: see conflict resolution issues on queries in `insert_data` // On periodic or consistency QC pipelines, we should be checking the provenance table to @@ -245,19 +257,19 @@ pub async fn qc_data( for datum in chunk.data.iter() { let time_spec = TimeSpec::new(Timestamp(timestamp), Timestamp(timestamp), time_resolution); - let space_spec = SpaceSpec::One(datum.timeseries_id.to_string()); - // TODO: load and fetch real pipeline - let pipeline = "sample_pipeline"; - let 
rove_output = scheduler - .validate_direct( - "lard", - &[] as &[&str], + let pipeline = match pipelines.get(&(datum.param_id, time_resolution)) { + Some(pipeline) => pipeline, + None => continue, + }; + let data = rove_connector + .fetch_one( + datum.timeseries_id, &time_spec, - &space_spec, - pipeline, - None, + pipeline.num_leading_required, + pipeline.num_trailing_required, ) .await?; + let rove_output = rove::Scheduler::schedule_tests(pipeline, data)?; let first_fail = rove_output.iter().find(|check_result| { if let Some(result) = check_result.results.first() { @@ -276,7 +288,8 @@ pub async fn qc_data( qc_results.push(QcResult { timeseries_id: datum.timeseries_id, timestamp: chunk.timestamp, - pipeline: pipeline.to_string(), + // TODO: should this encode more info? In theory the param/type can be deduced from the DB anyway + pipeline: "fresh".to_string(), flag, fail_condition, }); @@ -339,7 +352,8 @@ async fn handle_kldata( State(pool): State, State(param_conversions): State, State(permit_table): State>>, - State(qc_scheduler): State>, + State(rove_connector): State>, + State(qc_pipelines): State>>, body: String, ) -> Json { let result: Result = async { @@ -353,7 +367,7 @@ async fn handle_kldata( insert_data(&data, &mut conn).await?; - qc_data(&data, &qc_scheduler, &mut conn).await?; + qc_data(&data, &mut conn, &rove_connector, &qc_pipelines).await?; Ok(message_id) } @@ -404,12 +418,14 @@ pub async fn run( db_pool: PgConnectionPool, param_conversion_path: &str, permit_tables: Arc>, - qc_scheduler: rove::Scheduler, + rove_connector: rove_connector::Connector, + qc_pipelines: HashMap<(i32, RelativeDuration), rove::Pipeline>, ) -> Result<(), Box> { // set up param conversion map let param_conversions = get_conversions(param_conversion_path)?; - let qc_scheduler = Arc::new(qc_scheduler); + let rove_connector = Arc::new(rove_connector); + let qc_pipelines = Arc::new(qc_pipelines); // build our application with a single route let app = Router::new() @@ -418,7 +434,8 @@ pub async fn run( db_pool, param_conversions, permit_tables, - qc_scheduler, + rove_connector, + qc_pipelines, }); // run our app with hyper, listening globally on port 3001 diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs index 2a9e6b62..6bee0586 100644 --- a/ingestion/src/main.rs +++ b/ingestion/src/main.rs @@ -1,13 +1,7 @@ use bb8_postgres::PostgresConnectionManager; -use rove::{ - data_switch::{DataConnector, DataSwitch}, - load_pipelines, -}; +use lard_ingestion::qc_pipelines::load_pipelines; use rove_connector::Connector; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::sync::{Arc, RwLock}; use tokio_postgres::NoTls; use lard_ingestion::permissions; @@ -37,15 +31,21 @@ async fn main() -> Result<(), Box> { let db_pool = bb8::Pool::builder().build(manager).await?; // QC system - let scheduler = rove::Scheduler::new( - load_pipelines("").unwrap(), - DataSwitch::new(HashMap::from([( - String::from("lard"), - Box::new(Connector { - pool: db_pool.clone(), - }) as Box, - )])), - ); + // NOTE: Keeping this vesion around in case we want it for the periodic checks + // let scheduler = rove::Scheduler::new( + // load_pipelines("").unwrap(), + // DataSwitch::new(HashMap::from([( + // String::from("lard"), + // Box::new(Connector { + // pool: db_pool.clone(), + // }) as Box, + // )])), + // ); + let rove_connector = Connector { + pool: db_pool.clone(), + }; + + let qc_pipelines = load_pipelines("qc_pipelines")?; println!("Spawing task to fetch permissions from StInfoSys..."); // background task to 
refresh permit tables every 30 mins @@ -79,5 +79,12 @@ async fn main() -> Result<(), Box> { // Set up and run our server + database println!("Ingestion server started!"); - lard_ingestion::run(db_pool, PARAMCONV, permit_tables, scheduler).await + lard_ingestion::run( + db_pool, + PARAMCONV, + permit_tables, + rove_connector, + qc_pipelines, + ) + .await } diff --git a/ingestion/src/qc_pipelines.rs b/ingestion/src/qc_pipelines.rs new file mode 100644 index 00000000..feafa29b --- /dev/null +++ b/ingestion/src/qc_pipelines.rs @@ -0,0 +1,48 @@ +use chronoutil::RelativeDuration; +use rove::pipeline::{self, derive_num_leading_trailing, Pipeline}; +use serde::Deserialize; +use std::{collections::HashMap, path::Path}; + +#[derive(Deserialize)] +struct Header { + param_id: i32, + time_resolution: String, + #[allow(dead_code)] + sensor: Vec, +} + +#[derive(Deserialize)] +struct PipelineDef { + header: Header, + pipeline: Pipeline, +} + +pub fn load_pipelines( + path: impl AsRef, +) -> Result, pipeline::Error> { + std::fs::read_dir(path)? + .map(|entry| { + let entry = entry?; + + if !entry.file_type()?.is_file() { + return Err(pipeline::Error::DirectoryStructure); + } + + let mut pipeline_def: PipelineDef = + toml::from_str(&std::fs::read_to_string(entry.path())?)?; + ( + pipeline_def.pipeline.num_leading_required, + pipeline_def.pipeline.num_trailing_required, + ) = derive_num_leading_trailing(&pipeline_def.pipeline); + + let key = ( + pipeline_def.header.param_id, + // TODO: remove unwrap + RelativeDuration::parse_from_iso8601(&pipeline_def.header.time_resolution).unwrap(), + ); + + Ok(Some((key, pipeline_def.pipeline))) + }) + .filter_map(Result::transpose) + .collect() +} diff --git a/qc_pipelines/fresh/TA_PT1H.toml b/qc_pipelines/fresh/TA_PT1H.toml new file mode 100644 index 00000000..e16c67e9 --- /dev/null +++ b/qc_pipelines/fresh/TA_PT1H.toml @@ -0,0 +1,46 @@ +[header] +# TODO: reconsider format of station/level/sensor filtering. Is this supposed to be a regex? +# Would we be better served with a blocklist/allowlist? +# stations = "*" +param_id = 211 +time_resolution = "PT1H" +# level = "*" +sensor = [0] + +[[step]] +name = "special_value_check" +[step.special_value_check] +special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999] + +[[step]] +name = "range_check" +[step.range_check] +min = -55 +max = 50 + +[[step]] +name = "climate_range_check" +[step.range_check_dynamic] +source = "netcdf" # TODO: define a neat spec for this? 
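# (Editor's note, not part of the pipeline definition: the [header] above is what
# ingestion/src/qc_pipelines.rs keys the pipeline map on. For this file,
# load_pipelines produces the entry keyed by
#   (param_id = 211, RelativeDuration::parse_from_iso8601("PT1H")),
# and qc_data looks it up per datum via pipelines.get(&(datum.param_id, time_resolution)),
# with time_resolution derived from the type_id for now.)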
+ +[[step]] +name = "step_check" +[step.step_check] +max = 18.6 + +[[step]] +name = "flatline_check" +[step.flatline_check] +max = 10 + +[[step]] +name = "spike_check" +[step.spike_check] +max = 18.6 + +[[step]] +name = "model_consistency_check" +[step.model_consistency_check] +model_source = "lustre" +model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model +threshold = 3.0 # FIXME: made up value by Ingrid diff --git a/rove_connector/src/lib.rs b/rove_connector/src/lib.rs index fe3b6f9a..2456a7ec 100644 --- a/rove_connector/src/lib.rs +++ b/rove_connector/src/lib.rs @@ -91,7 +91,7 @@ fn regularize( } impl Connector { - async fn fetch_one( + pub async fn fetch_one( &self, ts_id: i32, time_spec: &TimeSpec, From 72b0b97ca87ebbaa36d32c5b5b38293558677aa5 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Thu, 9 Jan 2025 13:14:46 +0100 Subject: [PATCH 07/18] remove obsolete test case --- ingestion/src/kldata.rs | 6 ------ ingestion/src/lib.rs | 3 +++ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/ingestion/src/kldata.rs b/ingestion/src/kldata.rs index 3094195c..3158ca69 100644 --- a/ingestion/src/kldata.rs +++ b/ingestion/src/kldata.rs @@ -776,12 +776,6 @@ mod tests { )), "header only", ), - ( - "kldata/nationalnr=93140/type=501/messageid=23 - DD(0,0),FF(0,0),DG_1(0,0),FG_1(0,0),KLFG_1(0,0),FX_1(0,0)", - Err(Error::Parse("empty row in kldata csv".to_string())), - "missing data", - ), ]; let param_conversions = get_conversions("resources/paramconversions.csv").unwrap(); diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index ecf75574..8326241a 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -367,6 +367,7 @@ async fn handle_kldata( insert_data(&data, &mut conn).await?; + // TODO: should we tolerate failure here? Perhaps there should be metric for this? 
qc_data(&data, &mut conn, &rove_connector, &qc_pipelines).await?; Ok(message_id) @@ -424,6 +425,8 @@ pub async fn run( // set up param conversion map let param_conversions = get_conversions(param_conversion_path)?; + // TODO: This should be fine without Arc, we can just clone it as the internal db_pool is + // already reference counted let rove_connector = Arc::new(rove_connector); let qc_pipelines = Arc::new(qc_pipelines); From 280309ec83606dacb05f9eef2c72e298ae8b64cc Mon Sep 17 00:00:00 2001 From: Ingrid Date: Mon, 13 Jan 2025 15:37:26 +0100 Subject: [PATCH 08/18] fix typo in index on flags.confident_provenance table --- db/flags.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/flags.sql b/db/flags.sql index bd974e1d..29a36ff7 100644 --- a/db/flags.sql +++ b/db/flags.sql @@ -23,8 +23,8 @@ CREATE TABLE IF NOT EXISTS flags.confident_provenance ( CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline UNIQUE (timeseries, obstime, pipeline), CONSTRAINT fk_confident_providence_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries ) PARTITION BY RANGE (obstime); -CREATE INDEX IF NOT EXISTS confident_provenance_timestamp_index ON flags.confident_providence (obstime); -CREATE INDEX IF NOT EXISTS confident_provenance_timeseries_index ON flags.confident_providence USING HASH (timeseries); +CREATE INDEX IF NOT EXISTS confident_provenance_timestamp_index ON flags.confident_provenance (obstime); +CREATE INDEX IF NOT EXISTS confident_provenance_timeseries_index ON flags.confident_provenance USING HASH (timeseries); CREATE TABLE IF NOT EXISTS flags.kvdata ( timeseries INT4 REFERENCES public.timeseries, From 9b133bbc366600fe883827e895f6c26fb8508529 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Mon, 13 Jan 2025 15:38:59 +0100 Subject: [PATCH 09/18] fix ambiguous column identifier in qc_data query --- ingestion/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index 8326241a..bec98958 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -233,7 +233,7 @@ pub async fn qc_data( "INSERT INTO flags.confident (timeseries, obstime, usable) \ VALUES ($1, $2, $3) \ ON CONFLICT ON CONSTRAINT unique_confident_timeseries_obstime \ - DO UPDATE SET usable = usable AND EXCLUDED.usable", + DO UPDATE SET usable = flags.confident.usable AND EXCLUDED.usable", ) .await?; let query_provenance = conn From d72e95ba56bdceae20362091eb72af87cadbe086 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Mon, 13 Jan 2025 15:40:07 +0100 Subject: [PATCH 10/18] show the message from ingestor when the success assertion fails --- integration_tests/tests/end_to_end.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 7abc92e4..786bda12 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -514,7 +514,11 @@ async fn test_timeslice_endpoint() { let client = reqwest::Client::new(); for ts in &test_data { let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; - assert_eq!(ingestor_resp.res, 0); + assert_eq!( + ingestor_resp.res, 0, + "ingestor_resp.message: {}", + ingestor_resp.message + ); } for param in ¶ms { From 776408a3b9b9746e6fba8a46a341d8d3e753ef50 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Tue, 14 Jan 2025 21:00:52 +0100 Subject: [PATCH 11/18] add partitions for flag tables --- Cargo.lock | 7 + Cargo.toml | 2 +- db/partitions_generated.sql | 154 
+++++++++++++++------ util/Cargo.toml | 10 ++ util/src/bin/generate_partition_queries.rs | 60 ++++++++ util/src/lib.rs | 0 6 files changed, 193 insertions(+), 40 deletions(-) create mode 100644 util/Cargo.toml create mode 100644 util/src/bin/generate_partition_queries.rs create mode 100644 util/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 48a5558f..bd229183 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3202,6 +3202,13 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "util" +version = "0.1.0" +dependencies = [ + "chrono", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index f073cf2d..bd4254ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "api", "ingestion", "integration_tests", - "rove_connector", + "rove_connector", "util", ] resolver = "2" diff --git a/db/partitions_generated.sql b/db/partitions_generated.sql index 73aebbb4..81c1023f 100644 --- a/db/partitions_generated.sql +++ b/db/partitions_generated.sql @@ -1,77 +1,153 @@ --- Generated by simple script for testing -CREATE TABLE IF NOT EXISTS data_y1850_to_y1950 PARTITION OF public.data +-- Generated by util/src/bin/generate_partition_queries.rs +CREATE TABLE IF NOT EXISTS public.data_y1850_to_y1950 PARTITION OF public.data FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y1950_to_y2000 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y1950_to_y2000 PARTITION OF public.data FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2000_to_y2010 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2000_to_y2010 PARTITION OF public.data FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2010_to_y2015 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2010_to_y2015 PARTITION OF public.data FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2015_to_y2016 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2015_to_y2016 PARTITION OF public.data FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2016_to_y2017 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2016_to_y2017 PARTITION OF public.data FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2017_to_y2018 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2017_to_y2018 PARTITION OF public.data FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2018_to_y2019 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2018_to_y2019 PARTITION OF public.data FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2019_to_y2020 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2019_to_y2020 PARTITION OF public.data FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2020_to_y2021 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2020_to_y2021 PARTITION OF public.data FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS 
data_y2021_to_y2022 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2021_to_y2022 PARTITION OF public.data FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2022_to_y2023 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2022_to_y2023 PARTITION OF public.data FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2023_to_y2024 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2023_to_y2024 PARTITION OF public.data FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2024_to_y2025 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2024_to_y2025 PARTITION OF public.data FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2025_to_y2026 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2025_to_y2026 PARTITION OF public.data FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2026_to_y2027 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2026_to_y2027 PARTITION OF public.data FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2027_to_y2028 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2027_to_y2028 PARTITION OF public.data FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2028_to_y2029 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2028_to_y2029 PARTITION OF public.data FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2029_to_y2030 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2029_to_y2030 PARTITION OF public.data FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y1850_to_y1950 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y1850_to_y1950 PARTITION OF public.nonscalar_data FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y1950_to_y2000 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y1950_to_y2000 PARTITION OF public.nonscalar_data FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2000_to_y2010 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2000_to_y2010 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2010_to_y2015 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2010_to_y2015 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2015_to_y2016 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2015_to_y2016 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2016_to_y2017 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2016_to_y2017 PARTITION OF public.nonscalar_data FOR VALUES 
FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2017_to_y2018 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2017_to_y2018 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2018_to_y2019 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2018_to_y2019 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2019_to_y2020 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2019_to_y2020 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2020_to_y2021 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2020_to_y2021 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2021_to_y2022 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2021_to_y2022 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2022_to_y2023 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2022_to_y2023 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2023_to_y2024 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2023_to_y2024 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2024_to_y2025 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2024_to_y2025 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2025_to_y2026 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2025_to_y2026 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2026_to_y2027 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2026_to_y2027 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2027_to_y2028 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2027_to_y2028 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2028_to_y2029 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2028_to_y2029 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2029_to_y2030 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2029_to_y2030 PARTITION OF public.nonscalar_data +FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); +CREATE TABLE IF 
NOT EXISTS flags.confident_y1850_to_y1950 PARTITION OF flags.confident +FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y1950_to_y2000 PARTITION OF flags.confident +FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2000_to_y2010 PARTITION OF flags.confident +FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2010_to_y2015 PARTITION OF flags.confident +FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2015_to_y2016 PARTITION OF flags.confident +FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2016_to_y2017 PARTITION OF flags.confident +FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2017_to_y2018 PARTITION OF flags.confident +FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2018_to_y2019 PARTITION OF flags.confident +FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2019_to_y2020 PARTITION OF flags.confident +FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2020_to_y2021 PARTITION OF flags.confident +FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2021_to_y2022 PARTITION OF flags.confident +FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2022_to_y2023 PARTITION OF flags.confident +FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2023_to_y2024 PARTITION OF flags.confident +FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2024_to_y2025 PARTITION OF flags.confident +FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2025_to_y2026 PARTITION OF flags.confident +FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2026_to_y2027 PARTITION OF flags.confident +FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2027_to_y2028 PARTITION OF flags.confident +FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2028_to_y2029 PARTITION OF flags.confident +FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_y2029_to_y2030 PARTITION OF flags.confident +FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y1850_to_y1950 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y1950_to_y2000 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2000_to_y2010 PARTITION OF 
flags.confident_provenance +FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2010_to_y2015 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2015_to_y2016 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2016_to_y2017 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2017_to_y2018 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2018_to_y2019 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2019_to_y2020 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2020_to_y2021 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2021_to_y2022 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2022_to_y2023 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2023_to_y2024 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2024_to_y2025 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2025_to_y2026 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2026_to_y2027 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2027_to_y2028 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2028_to_y2029 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2029_to_y2030 PARTITION OF flags.confident_provenance FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); diff --git a/util/Cargo.toml b/util/Cargo.toml new file mode 100644 index 00000000..0d62e251 --- /dev/null +++ b/util/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "util" +version = "0.1.0" +edition.workspace = true + +[[bin]] +name = "generate_partition_queries" + +[dependencies] +chrono.workspace = true diff --git a/util/src/bin/generate_partition_queries.rs b/util/src/bin/generate_partition_queries.rs new file mode 100644 index 00000000..4779147a --- /dev/null +++ 
b/util/src/bin/generate_partition_queries.rs @@ -0,0 +1,60 @@ +use chrono::{DateTime, TimeZone, Utc}; +use std::{ + fs::File, + io::{BufWriter, Write}, +}; + +fn create_table_partitions( + table: &str, + boundaries: &[DateTime], + writer: &mut BufWriter, +) -> Result<(), std::io::Error> { + // .windows(2) gives a 2-wide sliding view of the vector, so we can see + // both bounds relevant to a partition + for window in boundaries.windows(2) { + let start_time = window[0]; + let end_time = window[1]; + + let line = format!( + "CREATE TABLE IF NOT EXISTS {}_y{}_to_y{} PARTITION OF {}\nFOR VALUES FROM ('{}') TO ('{}');\n", + table, + start_time.format("%Y"), + end_time.format("%Y"), + table, + start_time.format("%Y-%m-%d %H:%M:%S+00"), + end_time.format("%Y-%m-%d %H:%M:%S+00") + ); + writer.write_all(line.as_bytes())?; + } + + Ok(()) +} + +fn main() -> Result<(), std::io::Error> { + let outfile = File::create("../db/partitions_generated.sql")?; + let mut writer = BufWriter::new(outfile); + + writer.write("-- Generated by util/src/bin/generate_partition_queries.rs\n".as_bytes())?; + + // create a vector of the boundaries between partitions + let paritition_boundary_years: Vec> = [1850, 1950, 2000, 2010] + .into_iter() + .chain(2015..=2030) + .map(|y| Utc.with_ymd_and_hms(y, 1, 1, 0, 0, 0).unwrap()) + .collect(); + + create_table_partitions("public.data", &paritition_boundary_years, &mut writer)?; + create_table_partitions( + "public.nonscalar_data", + &paritition_boundary_years, + &mut writer, + )?; + create_table_partitions("flags.confident", &paritition_boundary_years, &mut writer)?; + create_table_partitions( + "flags.confident_provenance", + &paritition_boundary_years, + &mut writer, + )?; + + Ok(()) +} diff --git a/util/src/lib.rs b/util/src/lib.rs new file mode 100644 index 00000000..e69de29b From 5ef23c065def808423cf1be35bb2d229d0bd2303 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Tue, 14 Jan 2025 21:03:32 +0100 Subject: [PATCH 12/18] fix integration tests --- .../mock_qc_pipelines/fresh/TA_PT1H.toml | 47 +++++++++++++++++++ integration_tests/tests/end_to_end.rs | 27 +++++++---- 2 files changed, 65 insertions(+), 9 deletions(-) create mode 100644 integration_tests/mock_qc_pipelines/fresh/TA_PT1H.toml diff --git a/integration_tests/mock_qc_pipelines/fresh/TA_PT1H.toml b/integration_tests/mock_qc_pipelines/fresh/TA_PT1H.toml new file mode 100644 index 00000000..f8cd065f --- /dev/null +++ b/integration_tests/mock_qc_pipelines/fresh/TA_PT1H.toml @@ -0,0 +1,47 @@ +[header] +# TODO: reconsider format of station/level/sensor filtering. Is this supposed to be a regex? +# Would we be better served with a blocklist/allowlist? +# stations = "*" +param_id = 211 +time_resolution = "PT1H" +# level = "*" +sensor = [0] + +[pipeline] +[[pipeline.step]] +name = "special_value_check" +[pipeline.step.special_value_check] +special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999] + +[[pipeline.step]] +name = "range_check" +[pipeline.step.range_check] +min = -55 +max = 50 + +# [[pipeline.step]] +# name = "climate_range_check" +# [pipeline.step.range_check_dynamic] +# source = "netcdf" # TODO: define a neat spec for this? 
+ +[[pipeline.step]] +name = "step_check" +[pipeline.step.step_check] +max = 18.6 + +[[pipeline.step]] +name = "flatline_check" +[pipeline.step.flatline_check] +max = 10 + +[[pipeline.step]] +name = "spike_check" +[pipeline.step.spike_check] +max = 18.6 + +# [[pipeline.step]] +# name = "model_consistency_check" +# [pipeline.step.model_consistency_check] +# model_source = "lustre" +# model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model +# threshold = 3.0 # FIXME: made up value by Ingrid diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 786bda12..b83732ae 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -13,13 +13,14 @@ use rove::data_switch::{DataConnector, SpaceSpec, TimeSpec, Timestamp}; use tokio::sync::mpsc; use tokio_postgres::NoTls; -use lard_api::timeseries::Timeseries; -use lard_api::{LatestResp, TimeseriesResp, TimesliceResp}; -use lard_ingestion::kvkafka; -use lard_ingestion::permissions::{ - timeseries_is_open, ParamPermit, ParamPermitTable, StationPermitTable, +use lard_api::{timeseries::Timeseries, LatestResp, TimeseriesResp, TimesliceResp}; +use lard_ingestion::{ + kvkafka, + permissions::{timeseries_is_open, ParamPermit, ParamPermitTable, StationPermitTable}, + qc_pipelines::load_pipelines, + KldataResp, }; -use lard_ingestion::KldataResp; +use rove_connector::Connector; const CONNECT_STRING: &str = "host=localhost user=postgres dbname=postgres password=postgres"; const PARAMCONV_CSV: &str = "../ingestion/resources/paramconversions.csv"; @@ -245,6 +246,11 @@ async fn e2e_test_wrapper>(test: T) { let api_pool = db_pool.clone(); let ingestion_pool = db_pool.clone(); + let rove_connector = Connector { + pool: db_pool.clone(), + }; + let qc_pipelines = load_pipelines("mock_qc_pipelines/fresh").expect("failed to load pipelines"); + let api_server = tokio::spawn(async move { tokio::select! { output = lard_api::run(api_pool) => output, @@ -256,9 +262,12 @@ async fn e2e_test_wrapper>(test: T) { }); let ingestor = tokio::spawn(async move { tokio::select! 
{ - output = lard_ingestion::run(ingestion_pool, - PARAMCONV_CSV, - mock_permit_tables(), + output = lard_ingestion::run( + ingestion_pool, + PARAMCONV_CSV, + mock_permit_tables(), + rove_connector, + qc_pipelines, ) => output, _ = init_shutdown_rx2.recv() => { ingestor_shutdown_tx.send(()).unwrap(); From 299fc7e7db5f38d753a8bf79a60991112edc72b1 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Tue, 14 Jan 2025 21:09:12 +0100 Subject: [PATCH 13/18] fix rustfmt complaint about empty file --- util/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/util/src/lib.rs b/util/src/lib.rs index e69de29b..8b137891 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -0,0 +1 @@ + From 1fe477390d979b3ecfd0dd2ef91f4ec2fb9695ac Mon Sep 17 00:00:00 2001 From: Ingrid Date: Tue, 14 Jan 2025 21:35:38 +0100 Subject: [PATCH 14/18] fix clippy lint --- util/src/bin/generate_partition_queries.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/src/bin/generate_partition_queries.rs b/util/src/bin/generate_partition_queries.rs index 4779147a..fa7970b7 100644 --- a/util/src/bin/generate_partition_queries.rs +++ b/util/src/bin/generate_partition_queries.rs @@ -34,7 +34,7 @@ fn main() -> Result<(), std::io::Error> { let outfile = File::create("../db/partitions_generated.sql")?; let mut writer = BufWriter::new(outfile); - writer.write("-- Generated by util/src/bin/generate_partition_queries.rs\n".as_bytes())?; + writer.write_all("-- Generated by util/src/bin/generate_partition_queries.rs\n".as_bytes())?; // create a vector of the boundaries between partitions let paritition_boundary_years: Vec> = [1850, 1950, 2000, 2010] From 8caae2b36e4036b45c0b31eafa03a911eed7f18a Mon Sep 17 00:00:00 2001 From: Ingrid Date: Wed, 15 Jan 2025 11:05:16 +0100 Subject: [PATCH 15/18] fix schema ordering in e2e test setup --- integration_tests/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs index 36eece8e..5b1e7c7e 100644 --- a/integration_tests/src/main.rs +++ b/integration_tests/src/main.rs @@ -24,9 +24,9 @@ async fn main() { // NOTE: order matters let schemas = [ "db/public.sql", - "db/partitions_generated.sql", "db/labels.sql", "db/flags.sql", + "db/partitions_generated.sql", ]; for schema in schemas { insert_schema(&client, schema).await.unwrap(); From 1762bdf2498b77b1c6d122761afcc6e10df3561f Mon Sep 17 00:00:00 2001 From: Ingrid Date: Thu, 16 Jan 2025 16:09:52 +0100 Subject: [PATCH 16/18] merge confident.flags into public.data --- db/flags.sql | 11 -- db/partitions_generated.sql | 38 ----- db/public.sql | 4 + ingestion/src/kldata.rs | 1 + ingestion/src/lib.rs | 153 +++++++++++---------- util/src/bin/generate_partition_queries.rs | 1 - 6 files changed, 82 insertions(+), 126 deletions(-) diff --git a/db/flags.sql b/db/flags.sql index 29a36ff7..76905e37 100644 --- a/db/flags.sql +++ b/db/flags.sql @@ -1,16 +1,5 @@ CREATE SCHEMA IF NOT EXISTS flags; -CREATE TABLE IF NOT EXISTS flags.confident ( - timeseries INT4 NOT NULL, - obstime TIMESTAMPTZ NOT NULL, - usable BOOLEAN NOT NULL, - CONSTRAINT unique_confident_timeseries_obstime UNIQUE (timeseries, obstime), - -- TODO: should this and confident_providence fk into public.data? 
- CONSTRAINT fk_confident_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries -) PARTITION BY RANGE (obstime); -CREATE INDEX IF NOT EXISTS confident_timestamp_index ON flags.confident (obstime); -CREATE INDEX IF NOT EXISTS confident_timeseries_index ON flags.confident USING HASH (timeseries); - -- TODO: should this also have a column for qc_time or some such? CREATE TABLE IF NOT EXISTS flags.confident_provenance ( timeseries INT4 NOT NULL, diff --git a/db/partitions_generated.sql b/db/partitions_generated.sql index 81c1023f..e6a8ca05 100644 --- a/db/partitions_generated.sql +++ b/db/partitions_generated.sql @@ -75,44 +75,6 @@ CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2028_to_y2029 PARTITION OF pub FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2029_to_y2030 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y1850_to_y1950 PARTITION OF flags.confident -FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y1950_to_y2000 PARTITION OF flags.confident -FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2000_to_y2010 PARTITION OF flags.confident -FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2010_to_y2015 PARTITION OF flags.confident -FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2015_to_y2016 PARTITION OF flags.confident -FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2016_to_y2017 PARTITION OF flags.confident -FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2017_to_y2018 PARTITION OF flags.confident -FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2018_to_y2019 PARTITION OF flags.confident -FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2019_to_y2020 PARTITION OF flags.confident -FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2020_to_y2021 PARTITION OF flags.confident -FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2021_to_y2022 PARTITION OF flags.confident -FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2022_to_y2023 PARTITION OF flags.confident -FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2023_to_y2024 PARTITION OF flags.confident -FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2024_to_y2025 PARTITION OF flags.confident -FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2025_to_y2026 PARTITION OF flags.confident -FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2026_to_y2027 PARTITION OF flags.confident -FOR VALUES FROM ('2026-01-01 00:00:00+00') TO 
('2027-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2027_to_y2028 PARTITION OF flags.confident -FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2028_to_y2029 PARTITION OF flags.confident -FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS flags.confident_y2029_to_y2030 PARTITION OF flags.confident -FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); CREATE TABLE IF NOT EXISTS flags.confident_provenance_y1850_to_y1950 PARTITION OF flags.confident_provenance FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); CREATE TABLE IF NOT EXISTS flags.confident_provenance_y1950_to_y2000 PARTITION OF flags.confident_provenance diff --git a/db/public.sql b/db/public.sql index 90db0864..566b6857 100644 --- a/db/public.sql +++ b/db/public.sql @@ -32,6 +32,9 @@ CREATE TABLE IF NOT EXISTS public.data ( timeseries INT4 NOT NULL, obstime TIMESTAMPTZ NOT NULL, obsvalue REAL, + -- TODO: should qc_usable be NOT NULL? and default to true? + -- It would make greatly reduce the update load when QCing old data + qc_usable BOOLEAN, CONSTRAINT unique_data_timeseries_obstime UNIQUE (timeseries, obstime), CONSTRAINT fk_data_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries ) PARTITION BY RANGE (obstime); @@ -43,6 +46,7 @@ CREATE TABLE IF NOT EXISTS public.nonscalar_data ( timeseries INT4 NOT NULL, obstime TIMESTAMPTZ NOT NULL, obsvalue TEXT, + qc_usable BOOLEAN, CONSTRAINT unique_nonscalar_data_timeseries_obstime UNIQUE (timeseries, obstime), CONSTRAINT fk_nonscalar_data_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries ) PARTITION BY RANGE (obstime); diff --git a/ingestion/src/kldata.rs b/ingestion/src/kldata.rs index 3158ca69..d7e5378d 100644 --- a/ingestion/src/kldata.rs +++ b/ingestion/src/kldata.rs @@ -384,6 +384,7 @@ pub async fn filter_and_label_kldata<'a>( timeseries_id, param_id: param.id, value: in_datum.value, + qc_usable: None, }); } out_chunks.push(DataChunk { diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index bec98958..fc923e21 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -126,6 +126,7 @@ pub struct Datum<'a> { // needed for QC param_id: i32, value: ObsType<'a>, + qc_usable: Option, } /// Generic container for a piece of data ready to be inserted into the DB @@ -135,7 +136,7 @@ pub struct DataChunk<'a> { data: Vec>, } -pub struct QcResult { +pub struct QcProvenance { timeseries_id: i32, timestamp: DateTime, // TODO: possible to avoid heap-allocating this? @@ -148,10 +149,24 @@ pub struct QcResult { // TODO: benchmark insertion of scalar and non-scalar together vs separately? pub async fn insert_data( chunks: &Vec>, + provenance: &[QcProvenance], conn: &mut PooledPgConn<'_>, ) -> Result<(), Error> { // TODO: the conflict resolution on this query is an imperfect solution, and needs improvement // + // --- + // + // On periodic or consistency QC pipelines, we should be checking the provenance table to + // decide how to update usable on a conflict, but here it should be fine not to since this is + // fresh data. + // The `AND` in the `DO UPDATE SET` subexpression better handles the case of resent data where + // periodic checks might already have been run by defaulting to false. If the existing data was + // only fresh checked, and the replacement is different, this could result in a false positive. 
+ // I think this is OK though since it should be a rare occurence and will be quickly cleared up + // by a periodic run regardless. + // + // --- + // // I learned from Søren that obsinn and kvalobs organise updates and deletions by sending new // messages that overwrite previous messages. The catch is that the new message does not need // to contain all the params of the old message (or indeed any of them), and any that are left @@ -164,19 +179,29 @@ pub async fn insert_data( // implement it here. let query_scalar = conn .prepare( - "INSERT INTO public.data (timeseries, obstime, obsvalue) \ - VALUES ($1, $2, $3) \ + "INSERT INTO public.data (timeseries, obstime, obsvalue, qc_usable) \ + VALUES ($1, $2, $3, $4) \ ON CONFLICT ON CONSTRAINT unique_data_timeseries_obstime \ - DO UPDATE SET obsvalue = EXCLUDED.obsvalue", + DO UPDATE SET obsvalue = EXCLUDED.obsvalue, \ + qc_usable = public.data.qc_usable AND EXCLUDED.qc_usable", ) .await?; let query_nonscalar = conn .prepare( - "INSERT INTO public.nonscalar_data (timeseries, obstime, obsvalue) \ - VALUES ($1, $2, $3) \ + "INSERT INTO public.nonscalar_data (timeseries, obstime, obsvalue, qc_usable) \ + VALUES ($1, $2, $3, $4) \ ON CONFLICT ON CONSTRAINT unique_nonscalar_data_timeseries_obstime \ - DO UPDATE SET obsvalue = EXCLUDED.obsvalue", + DO UPDATE SET obsvalue = EXCLUDED.obsvalue, \ + qc_usable = public.nonscalar_data.qc_usable AND EXCLUDED.qc_usable", + ) + .await?; + let query_provenance = conn + .prepare( + "INSERT INTO flags.confident_provenance (timeseries, obstime, pipeline, flag, fail_condition) \ + VALUES ($1, $2, $3, $4, $5) \ + ON CONFLICT ON CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline \ + DO UPDATE SET flag = EXCLUDED.flag, fail_condition = EXCLUDED.fail_condition", ) .await?; @@ -190,14 +215,24 @@ pub async fn insert_data( ObsType::Scalar(val) => { conn.execute( &query_scalar, - &[&datum.timeseries_id, &chunk.timestamp, &val], + &[ + &datum.timeseries_id, + &chunk.timestamp, + &val, + &datum.qc_usable, + ], ) .await } ObsType::NonScalar(val) => { conn.execute( &query_nonscalar, - &[&datum.timeseries_id, &chunk.timestamp, &val], + &[ + &datum.timeseries_id, + &chunk.timestamp, + &val, + &datum.qc_usable, + ], ) .await } @@ -210,42 +245,36 @@ pub async fn insert_data( } } + let mut futures = provenance + .iter() + .map(|qc_result| async { + conn.execute( + &query_provenance, + &[ + &qc_result.timeseries_id, + &qc_result.timestamp, + &qc_result.pipeline, + &qc_result.flag, + &qc_result.fail_condition, + ], + ) + .await + }) + .collect::>(); + + while let Some(res) = futures.next().await { + res?; + } + Ok(()) } pub async fn qc_data( - chunks: &Vec>, - conn: &mut PooledPgConn<'_>, + chunks: &mut Vec>, rove_connector: &rove_connector::Connector, pipelines: &HashMap<(i32, RelativeDuration), rove::Pipeline>, -) -> Result<(), Error> { - // TODO: see conflict resolution issues on queries in `insert_data` - // On periodic or consistency QC pipelines, we should be checking the provenance table to - // decide how to update usable on a conflict, but here it should be fine not to since this is - // fresh data. - // The `AND` in the `DO UPDATE SET` subexpression better handles the case of resent data where - // periodic checks might already have been run by defaulting to false. If the existing data was - // only fresh checked, and the replacement is different, this could result in a false positive. 
- // I think this is OK though since it should be a rare occurence and will be quickly cleared up - // by a periodic run regardless. - let query = conn - .prepare( - "INSERT INTO flags.confident (timeseries, obstime, usable) \ - VALUES ($1, $2, $3) \ - ON CONFLICT ON CONSTRAINT unique_confident_timeseries_obstime \ - DO UPDATE SET usable = flags.confident.usable AND EXCLUDED.usable", - ) - .await?; - let query_provenance = conn - .prepare( - "INSERT INTO flags.confident_provenance (timeseries, obstime, pipeline, flag, fail_condition) \ - VALUES ($1, $2, $3, $4, $5) \ - ON CONFLICT ON CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline \ - DO UPDATE SET flag = EXCLUDED.flag, fail_condition = EXCLUDED.fail_condition", - ) - .await?; - - let mut qc_results: Vec = Vec::new(); +) -> Result, Error> { + let mut qc_results: Vec = Vec::new(); for chunk in chunks { let time_resolution = match chunk.time_resolution { Some(time_resolution) => time_resolution, @@ -254,7 +283,7 @@ pub async fn qc_data( }; let timestamp = chunk.timestamp.timestamp(); - for datum in chunk.data.iter() { + for datum in chunk.data.iter_mut() { let time_spec = TimeSpec::new(Timestamp(timestamp), Timestamp(timestamp), time_resolution); let pipeline = match pipelines.get(&(datum.param_id, time_resolution)) { @@ -285,7 +314,9 @@ pub async fn qc_data( None => (0, None), }; - qc_results.push(QcResult { + datum.qc_usable = Some(flag == 0); + + qc_results.push(QcProvenance { timeseries_id: datum.timeseries_id, timestamp: chunk.timestamp, // TODO: should this encode more info? In theory the param/type can be deduced from the DB anyway @@ -296,37 +327,7 @@ pub async fn qc_data( } } - let mut futures = qc_results - .iter() - .map(|qc_result| async { - conn.execute( - &query, - &[ - &qc_result.timeseries_id, - &qc_result.timestamp, - &(qc_result.flag == 0), - ], - ) - .await?; - conn.execute( - &query_provenance, - &[ - &qc_result.timeseries_id, - &qc_result.timestamp, - &qc_result.pipeline, - &qc_result.flag, - &qc_result.fail_condition, - ], - ) - .await - }) - .collect::>(); - - while let Some(res) = futures.next().await { - res?; - } - - Ok(()) + Ok(qc_results) } pub mod kldata; @@ -361,14 +362,14 @@ async fn handle_kldata( let (message_id, obsinn_chunk) = parse_kldata(&body, param_conversions.clone())?; - let data = + let mut data = filter_and_label_kldata(obsinn_chunk, &mut conn, param_conversions, permit_table) .await?; - insert_data(&data, &mut conn).await?; - // TODO: should we tolerate failure here? Perhaps there should be metric for this? 
- qc_data(&data, &mut conn, &rove_connector, &qc_pipelines).await?; + let provenance = qc_data(&mut data, &rove_connector, &qc_pipelines).await?; + + insert_data(&data, &provenance, &mut conn).await?; Ok(message_id) } diff --git a/util/src/bin/generate_partition_queries.rs b/util/src/bin/generate_partition_queries.rs index fa7970b7..4fe2008a 100644 --- a/util/src/bin/generate_partition_queries.rs +++ b/util/src/bin/generate_partition_queries.rs @@ -49,7 +49,6 @@ fn main() -> Result<(), std::io::Error> { &paritition_boundary_years, &mut writer, )?; - create_table_partitions("flags.confident", &paritition_boundary_years, &mut writer)?; create_table_partitions( "flags.confident_provenance", &paritition_boundary_years, From 12ce22c680661b3243e147645d31563cb62f3116 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Fri, 17 Jan 2025 15:32:42 +0100 Subject: [PATCH 17/18] modify the one current qc pipeline to match the new format --- qc_pipelines/fresh/TA_PT1H.toml | 41 +++++++++++++++++---------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/qc_pipelines/fresh/TA_PT1H.toml b/qc_pipelines/fresh/TA_PT1H.toml index e16c67e9..f8cd065f 100644 --- a/qc_pipelines/fresh/TA_PT1H.toml +++ b/qc_pipelines/fresh/TA_PT1H.toml @@ -7,40 +7,41 @@ time_resolution = "PT1H" # level = "*" sensor = [0] -[[step]] +[pipeline] +[[pipeline.step]] name = "special_value_check" -[step.special_value_check] +[pipeline.step.special_value_check] special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999] -[[step]] +[[pipeline.step]] name = "range_check" -[step.range_check] +[pipeline.step.range_check] min = -55 max = 50 -[[step]] -name = "climate_range_check" -[step.range_check_dynamic] -source = "netcdf" # TODO: define a neat spec for this? +# [[pipeline.step]] +# name = "climate_range_check" +# [pipeline.step.range_check_dynamic] +# source = "netcdf" # TODO: define a neat spec for this? 
-[[step]] +[[pipeline.step]] name = "step_check" -[step.step_check] +[pipeline.step.step_check] max = 18.6 -[[step]] +[[pipeline.step]] name = "flatline_check" -[step.flatline_check] +[pipeline.step.flatline_check] max = 10 -[[step]] +[[pipeline.step]] name = "spike_check" -[step.spike_check] +[pipeline.step.spike_check] max = 18.6 -[[step]] -name = "model_consistency_check" -[step.model_consistency_check] -model_source = "lustre" -model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model -threshold = 3.0 # FIXME: made up value by Ingrid +# [[pipeline.step]] +# name = "model_consistency_check" +# [pipeline.step.model_consistency_check] +# model_source = "lustre" +# model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model +# threshold = 3.0 # FIXME: made up value by Ingrid From 150c6531871b327b455dc69b0b38368355385631 Mon Sep 17 00:00:00 2001 From: Ingrid Date: Fri, 17 Jan 2025 16:00:49 +0100 Subject: [PATCH 18/18] fix path to load qc pipelines from in the future we probably want to have directories like qc_pipelines/periodic and qc_pipelines/consistency, but for now we don't handle that so we just load qc_pipelines/fresh directly --- ingestion/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs index 6bee0586..6efebf08 100644 --- a/ingestion/src/main.rs +++ b/ingestion/src/main.rs @@ -45,7 +45,7 @@ async fn main() -> Result<(), Box> { pool: db_pool.clone(), }; - let qc_pipelines = load_pipelines("qc_pipelines")?; + let qc_pipelines = load_pipelines("qc_pipelines/fresh")?; println!("Spawing task to fetch permissions from StInfoSys..."); // background task to refresh permit tables every 30 mins
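
As a rough illustration of the directory layout the final commit message anticipates, a future main.rs might load each QC stage from its own subdirectory. This is only a sketch under that assumption: the "periodic" and "consistency" directories and the load_all_stages helper do not exist anywhere in these patches; only qc_pipelines/fresh is loaded today, and load_pipelines is used exactly as the patches use it.

use lard_ingestion::qc_pipelines::load_pipelines;

// Sketch only: the "periodic" and "consistency" directories are assumptions
// drawn from the commit message; only qc_pipelines/fresh exists in this series.
fn load_all_stages() -> Result<(), Box<dyn std::error::Error>> {
    // Fresh-data checks, as wired up in main.rs by the last patch.
    let _fresh = load_pipelines("qc_pipelines/fresh")?;

    // Possible future stages, once per-stage directories are added:
    // let _periodic = load_pipelines("qc_pipelines/periodic")?;
    // let _consistency = load_pipelines("qc_pipelines/consistency")?;

    Ok(())
}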