diff --git a/Cargo.lock b/Cargo.lock index dcf5422b..bd229183 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1486,15 +1486,19 @@ dependencies = [ "bb8-postgres", "bytes", "chrono", + "chronoutil", "csv", "futures", "kafka", "quick-xml", "regex", + "rove", + "rove_connector", "serde", "thiserror", "tokio", "tokio-postgres", + "toml", ] [[package]] @@ -2344,7 +2348,7 @@ dependencies = [ [[package]] name = "rove" version = "0.1.1" -source = "git+https://github.com/metno/rove.git#95a7770f473a30ea6646a0b9254e8f974b673055" +source = "git+https://github.com/metno/rove.git?branch=lard_fixes#5f70d99d59b0d9d1c8fc301893684c919cab286f" dependencies = [ "async-trait", "chrono", @@ -3198,6 +3202,13 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "util" +version = "0.1.0" +dependencies = [ + "chrono", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 4acf2bbc..bd4254ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "api", "ingestion", "integration_tests", - "rove_connector", + "rove_connector", "util", ] resolver = "2" @@ -27,8 +27,10 @@ quick-xml = { version = "0.35.0", features = [ "serialize", "overlapped-lists" ] rand = "0.8.5" rand_distr = "0.4.3" regex = "1.11.1" -rove = { git = "https://github.com/metno/rove.git" } +rove = { git = "https://github.com/metno/rove.git", branch = "lard_fixes" } +# rove = { git = "https://github.com/metno/rove.git" } serde = { version = "1.0.217", features = ["derive"] } thiserror = "1.0.69" tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] } tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] } +toml = "0.8.19" diff --git a/db/flags.sql b/db/flags.sql index 4f2ef406..76905e37 100644 --- a/db/flags.sql +++ b/db/flags.sql @@ -1,5 +1,20 @@ CREATE SCHEMA IF NOT EXISTS flags; +-- TODO: should this also have a column for qc_time or some such? +CREATE TABLE IF NOT EXISTS flags.confident_provenance ( + timeseries INT4 NOT NULL, + obstime TIMESTAMPTZ NOT NULL, + pipeline TEXT NOT NULL, + -- TODO: should this be an enum? + flag INT4 NOT NULL, + -- TODO: better name? since this might be applied to flags that aren't fail but also aren't pass? 
+ fail_condition TEXT NULL, + CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline UNIQUE (timeseries, obstime, pipeline), + CONSTRAINT fk_confident_providence_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries +) PARTITION BY RANGE (obstime); +CREATE INDEX IF NOT EXISTS confident_provenance_timestamp_index ON flags.confident_provenance (obstime); +CREATE INDEX IF NOT EXISTS confident_provenance_timeseries_index ON flags.confident_provenance USING HASH (timeseries); + CREATE TABLE IF NOT EXISTS flags.kvdata ( timeseries INT4 REFERENCES public.timeseries, obstime TIMESTAMPTZ NOT NULL, diff --git a/db/partitions_generated.sql b/db/partitions_generated.sql index 73aebbb4..e6a8ca05 100644 --- a/db/partitions_generated.sql +++ b/db/partitions_generated.sql @@ -1,77 +1,115 @@ --- Generated by simple script for testing -CREATE TABLE IF NOT EXISTS data_y1850_to_y1950 PARTITION OF public.data +-- Generated by util/src/bin/generate_partition_queries.rs +CREATE TABLE IF NOT EXISTS public.data_y1850_to_y1950 PARTITION OF public.data FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y1950_to_y2000 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y1950_to_y2000 PARTITION OF public.data FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2000_to_y2010 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2000_to_y2010 PARTITION OF public.data FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2010_to_y2015 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2010_to_y2015 PARTITION OF public.data FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2015_to_y2016 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2015_to_y2016 PARTITION OF public.data FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2016_to_y2017 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2016_to_y2017 PARTITION OF public.data FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2017_to_y2018 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2017_to_y2018 PARTITION OF public.data FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2018_to_y2019 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2018_to_y2019 PARTITION OF public.data FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2019_to_y2020 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2019_to_y2020 PARTITION OF public.data FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2020_to_y2021 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2020_to_y2021 PARTITION OF public.data FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2021_to_y2022 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2021_to_y2022 PARTITION OF public.data FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2022_to_y2023 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2022_to_y2023 PARTITION OF 
public.data FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2023_to_y2024 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2023_to_y2024 PARTITION OF public.data FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2024_to_y2025 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2024_to_y2025 PARTITION OF public.data FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2025_to_y2026 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2025_to_y2026 PARTITION OF public.data FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2026_to_y2027 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2026_to_y2027 PARTITION OF public.data FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2027_to_y2028 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2027_to_y2028 PARTITION OF public.data FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2028_to_y2029 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2028_to_y2029 PARTITION OF public.data FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS data_y2029_to_y2030 PARTITION OF public.data +CREATE TABLE IF NOT EXISTS public.data_y2029_to_y2030 PARTITION OF public.data FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y1850_to_y1950 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y1850_to_y1950 PARTITION OF public.nonscalar_data FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y1950_to_y2000 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y1950_to_y2000 PARTITION OF public.nonscalar_data FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2000_to_y2010 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2000_to_y2010 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2010_to_y2015 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2010_to_y2015 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2015_to_y2016 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2015_to_y2016 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2016_to_y2017 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2016_to_y2017 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2017_to_y2018 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2017_to_y2018 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00'); -CREATE 
TABLE IF NOT EXISTS nonscalar_data_y2018_to_y2019 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2018_to_y2019 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2019_to_y2020 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2019_to_y2020 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2020_to_y2021 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2020_to_y2021 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2021_to_y2022 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2021_to_y2022 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2022_to_y2023 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2022_to_y2023 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2023_to_y2024 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2023_to_y2024 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2024_to_y2025 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2024_to_y2025 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2025_to_y2026 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2025_to_y2026 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2026_to_y2027 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2026_to_y2027 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2027_to_y2028 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2027_to_y2028 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2028_to_y2029 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2028_to_y2029 PARTITION OF public.nonscalar_data FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); -CREATE TABLE IF NOT EXISTS nonscalar_data_y2029_to_y2030 PARTITION OF public.nonscalar_data +CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2029_to_y2030 PARTITION OF public.nonscalar_data +FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y1850_to_y1950 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y1950_to_y2000 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 
00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2000_to_y2010 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2010_to_y2015 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2015_to_y2016 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2016_to_y2017 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2017_to_y2018 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2018_to_y2019 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2019_to_y2020 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2020_to_y2021 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2021_to_y2022 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2022_to_y2023 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2023_to_y2024 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2024_to_y2025 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2025_to_y2026 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2026_to_y2027 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2027_to_y2028 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2028_to_y2029 PARTITION OF flags.confident_provenance +FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00'); +CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2029_to_y2030 PARTITION OF flags.confident_provenance FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00'); diff --git a/db/public.sql b/db/public.sql index 90db0864..566b6857 100644 --- a/db/public.sql +++ b/db/public.sql @@ -32,6 +32,9 @@ CREATE TABLE IF NOT EXISTS public.data ( timeseries INT4 NOT NULL, obstime TIMESTAMPTZ NOT NULL, obsvalue REAL, + -- TODO: should qc_usable be NOT NULL? and default to true? 
+ -- It would make greatly reduce the update load when QCing old data + qc_usable BOOLEAN, CONSTRAINT unique_data_timeseries_obstime UNIQUE (timeseries, obstime), CONSTRAINT fk_data_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries ) PARTITION BY RANGE (obstime); @@ -43,6 +46,7 @@ CREATE TABLE IF NOT EXISTS public.nonscalar_data ( timeseries INT4 NOT NULL, obstime TIMESTAMPTZ NOT NULL, obsvalue TEXT, + qc_usable BOOLEAN, CONSTRAINT unique_nonscalar_data_timeseries_obstime UNIQUE (timeseries, obstime), CONSTRAINT fk_nonscalar_data_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries ) PARTITION BY RANGE (obstime); diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index 238bd47b..1450c9c5 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -15,12 +15,16 @@ bb8.workspace = true bb8-postgres.workspace = true bytes.workspace = true chrono.workspace = true +chronoutil.workspace = true csv.workspace = true futures.workspace = true kafka.workspace = true quick-xml.workspace = true regex.workspace = true +rove.workspace = true +rove_connector = { path = "../rove_connector" } serde.workspace = true thiserror.workspace = true tokio.workspace = true tokio-postgres.workspace = true +toml.workspace = true diff --git a/ingestion/src/kldata.rs b/ingestion/src/kldata.rs index 19480f86..d7e5378d 100644 --- a/ingestion/src/kldata.rs +++ b/ingestion/src/kldata.rs @@ -1,8 +1,9 @@ use crate::{ permissions::{timeseries_is_open, ParamPermitTable, StationPermitTable}, - Datum, Error, ObsType, PooledPgConn, ReferenceParam, + DataChunk, Datum, Error, ObsType, PooledPgConn, ReferenceParam, }; use chrono::{DateTime, NaiveDateTime, Utc}; +use chronoutil::RelativeDuration; use regex::Regex; use std::{ collections::HashMap, @@ -18,17 +19,12 @@ pub struct ObsinnChunk<'a> { observations: Vec>, station_id: i32, // TODO: change name here to nationalnummer? type_id: i32, + timestamp: DateTime, } /// Represents a single observation from an obsinn message #[derive(Debug, PartialEq)] pub struct ObsinnObs<'a> { - // TODO: this timestamp is shared by all obs in the row, maybe we should have - // a Vec here and remove the Vec from ObsinnChunk - // NOTE: this struct maps to Datum, the type used during insertion that should take into - // account all possible sources. While the proposed TODO would perfectly fit Obsinn, we would - // also need to change Datum accordingly, which might not work well for other sources. - timestamp: DateTime, id: ObsinnId, value: ObsType<'a>, } @@ -153,11 +149,13 @@ fn parse_obs<'a>( csv_body: Lines<'a>, columns: &[ObsinnId], reference_params: Arc>, -) -> Result>, Error> { - let mut obs = Vec::new(); + header: ObsinnHeader<'a>, +) -> Result>, Error> { + let mut chunks = Vec::new(); let row_is_empty = || Error::Parse("empty row in kldata csv".to_string()); for row in csv_body { + let mut obs = Vec::new(); let (timestamp, vals) = { let mut vals = row.split(','); @@ -201,25 +199,29 @@ fn parse_obs<'a>( } }; - obs.push(ObsinnObs { - timestamp, - id: col, - value, - }) + obs.push(ObsinnObs { id: col, value }) } - } - if obs.is_empty() { - return Err(row_is_empty()); + // TODO: should this be more resiliant? 
+ if obs.is_empty() { + return Err(row_is_empty()); + } + + chunks.push(ObsinnChunk { + observations: obs, + station_id: header.station_id, + type_id: header.type_id, + timestamp, + }) } - Ok(obs) + Ok(chunks) } pub fn parse_kldata( msg: &str, reference_params: Arc>, -) -> Result<(usize, ObsinnChunk), Error> { +) -> Result<(usize, Vec), Error> { let (header, columns, csv_body) = { let mut csv_body = msg.lines(); let lines_err = || Error::Parse("kldata message contained too few lines".to_string()); @@ -234,22 +236,34 @@ pub fn parse_kldata( Ok(( header.message_id, - ObsinnChunk { - observations: parse_obs(csv_body, &columns, reference_params)?, - station_id: header.station_id, - type_id: header.type_id, - }, + parse_obs(csv_body, &columns, reference_params, header)?, )) } +// TODO: this is a messy hack, but it's the only way people at met currently have to determine +// time_resolution. Ultimately we intend to store time_resolution info in the database under +// public.timeseries or labels.met. This will be populated by a combination of a script that looks +// at a timeseries's history, and manual editing by content managers. +pub fn type_id_to_time_resolution(type_id: i32) -> Option { + // Source for these matches: PDF presented by PiM + match type_id { + 514 => Some(RelativeDuration::minutes(1)), + 506 | 509 | 510 => Some(RelativeDuration::minutes(10)), + 7 | 311 | 330 | 342 | 501 | 502 | 503 | 505 | 507 | 511 => Some(RelativeDuration::hours(1)), + 522 => Some(RelativeDuration::days(1)), + 399 => Some(RelativeDuration::years(1)), + _ => None, + } +} + // TODO: rewrite such that queries can be pipelined? // not pipelining here hurts latency, but shouldn't matter for throughput pub async fn filter_and_label_kldata<'a>( - chunk: ObsinnChunk<'a>, + chunks: Vec>, conn: &mut PooledPgConn<'_>, param_conversions: Arc>, permit_table: Arc>, -) -> Result>, Error> { +) -> Result>, Error> { let query_get_obsinn = conn .prepare( "SELECT timeseries \ @@ -262,116 +276,126 @@ pub async fn filter_and_label_kldata<'a>( ) .await?; - let mut data = Vec::with_capacity(chunk.observations.len()); - - for in_datum in chunk.observations { - // get the conversion first, so we avoid wasting a tsid if it doesn't exist - let param = param_conversions - .get(&in_datum.id.param_code) - .ok_or_else(|| { - Error::Parse(format!( - "unrecognised param_code '{}'", - in_datum.id.param_code - )) - })?; - - // TODO: we only need to check inside this loop if station_id is in the - // param_permit_table - if !timeseries_is_open( - permit_table.clone(), - chunk.station_id, - chunk.type_id, - param.id, - )? { - // TODO: log that the timeseries is closed? 
Mostly useful for tests - #[cfg(feature = "integration_tests")] - eprintln!("station {}: timeseries is closed", chunk.station_id); - continue; - } - - let transaction = conn.transaction().await?; - - let (sensor, lvl) = in_datum - .id - .sensor_and_level - .map(|both| (Some(both.0), Some(both.1))) - .unwrap_or((None, None)); - - let obsinn_label_result = transaction - .query_opt( - &query_get_obsinn, - &[ - &chunk.station_id, - &chunk.type_id, - &in_datum.id.param_code, - &lvl, - &sensor, - ], - ) - .await?; - - let timeseries_id: i32 = match obsinn_label_result { - Some(row) => row.get(0), - None => { - // create new timeseries - // TODO: currently we create a timeseries with null location - // In the future the location column should be moved to the timeseries metadata table - let timeseries_id = transaction - .query_one( - "INSERT INTO public.timeseries (fromtime) VALUES ($1) RETURNING id", - &[&in_datum.timestamp], - ) - .await? - .get(0); + let mut out_chunks = Vec::with_capacity(chunks.len()); + for chunk in chunks { + let mut data = Vec::with_capacity(chunk.observations.len()); + + for in_datum in chunk.observations { + // get the conversion first, so we avoid wasting a tsid if it doesn't exist + let param = param_conversions + .get(&in_datum.id.param_code) + .ok_or_else(|| { + Error::Parse(format!( + "unrecognised param_code '{}'", + in_datum.id.param_code + )) + })?; + + // TODO: we only need to check inside this loop if station_id is in the + // param_permit_table + if !timeseries_is_open( + permit_table.clone(), + chunk.station_id, + chunk.type_id, + param.id, + )? { + // TODO: log that the timeseries is closed? Mostly useful for tests + #[cfg(feature = "integration_tests")] + eprintln!("station {}: timeseries is closed", chunk.station_id); + continue; + } - // create obsinn label - transaction - .execute( - "INSERT INTO labels.obsinn \ + let transaction = conn.transaction().await?; + + let (sensor, lvl) = in_datum + .id + .sensor_and_level + .map(|both| (Some(both.0), Some(both.1))) + .unwrap_or((None, None)); + + let obsinn_label_result = transaction + .query_opt( + &query_get_obsinn, + &[ + &chunk.station_id, + &chunk.type_id, + &in_datum.id.param_code, + &lvl, + &sensor, + ], + ) + .await?; + + let timeseries_id: i32 = match obsinn_label_result { + Some(row) => row.get(0), + None => { + // create new timeseries + // TODO: currently we create a timeseries with null location + // In the future the location column should be moved to the timeseries metadata table + let timeseries_id = transaction + .query_one( + "INSERT INTO public.timeseries (fromtime) VALUES ($1) RETURNING id", + &[&chunk.timestamp], + ) + .await? 
+ .get(0); + + // create obsinn label + transaction + .execute( + "INSERT INTO labels.obsinn \ (timeseries, nationalnummer, type_id, param_code, lvl, sensor) \ VALUES ($1, $2, $3, $4, $5, $6)", - &[ - ×eries_id, - &chunk.station_id, - &chunk.type_id, - &in_datum.id.param_code, - &lvl, - &sensor, - ], - ) - .await?; - - // create met label - transaction - .execute( - "INSERT INTO labels.met \ + &[ + ×eries_id, + &chunk.station_id, + &chunk.type_id, + &in_datum.id.param_code, + &lvl, + &sensor, + ], + ) + .await?; + + // create met label + transaction + .execute( + "INSERT INTO labels.met \ (timeseries, station_id, param_id, type_id, lvl, sensor) \ VALUES ($1, $2, $3, $4, $5, $6)", - &[ - ×eries_id, - &chunk.station_id, - ¶m.id, - &chunk.type_id, - &lvl, - &sensor, - ], - ) - .await?; - - timeseries_id - } - }; + &[ + ×eries_id, + &chunk.station_id, + ¶m.id, + &chunk.type_id, + &lvl, + &sensor, + ], + ) + .await?; + + timeseries_id + } + }; - transaction.commit().await?; + transaction.commit().await?; - data.push(Datum { - timeseries_id, - timestamp: in_datum.timestamp, - value: in_datum.value, + data.push(Datum { + timeseries_id, + param_id: param.id, + value: in_datum.value, + qc_usable: None, + }); + } + out_chunks.push(DataChunk { + timestamp: chunk.timestamp, + // TODO: real time_resolution (derive from type_id for now) + time_resolution: type_id_to_time_resolution(chunk.type_id), + data, }); } - Ok(data) + Ok(out_chunks) } #[cfg(test)] @@ -526,32 +550,40 @@ mod tests { sensor_and_level: None, }, ], - Ok(vec![ - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, + ObsinnHeader { + station_id: 18700, + type_id: 511, + message_id: 1, + _received_time: None, + }, + Ok(vec![ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.1), }, - value: Scalar(-1.1), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "CI".to_string(), - sensor_and_level: None, + ObsinnObs { + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + value: Scalar(0.0), }, - value: Scalar(0.0), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "IR".to_string(), - sensor_and_level: None, + ObsinnObs { + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + value: Scalar(2.8), }, - value: Scalar(2.8), - }, - ]), + ], + timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), + station_id: 18700, + type_id: 511, + }]), "single line", ), ( @@ -570,54 +602,68 @@ mod tests { sensor_and_level: None, }, ], + ObsinnHeader { + station_id: 18700, + type_id: 511, + message_id: 1, + _received_time: None, + }, Ok(vec![ - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, - }, - value: Scalar(-1.1), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "CI".to_string(), - sensor_and_level: None, - }, - value: Scalar(0.0), - }, - ObsinnObs { + ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.1), + }, + ObsinnObs { + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + 
value: Scalar(0.0), + }, + ObsinnObs { + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + value: Scalar(2.8), + }, + ], timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 41, 0).unwrap(), - id: ObsinnId { - param_code: "IR".to_string(), - sensor_and_level: None, - }, - value: Scalar(2.8), + station_id: 18700, + type_id: 511, }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, - }, - value: Scalar(-1.5), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), - id: ObsinnId { - param_code: "CI".to_string(), - sensor_and_level: None, - }, - value: Scalar(1.0), - }, - ObsinnObs { + ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(-1.5), + }, + ObsinnObs { + id: ObsinnId { + param_code: "CI".to_string(), + sensor_and_level: None, + }, + value: Scalar(1.0), + }, + ObsinnObs { + id: ObsinnId { + param_code: "IR".to_string(), + sensor_and_level: None, + }, + value: Scalar(2.9), + }, + ], timestamp: Utc.with_ymd_and_hms(2016, 2, 1, 5, 51, 0).unwrap(), - id: ObsinnId { - param_code: "IR".to_string(), - sensor_and_level: None, - }, - value: Scalar(2.9), + station_id: 18700, + type_id: 511, }, ]), "multiple lines", @@ -634,24 +680,33 @@ mod tests { sensor_and_level: None, }, ], - Ok(vec![ - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId { - param_code: "KLOBS".to_string(), - sensor_and_level: None, + ObsinnHeader { + station_id: 18700, + type_id: 511, + message_id: 1, + _received_time: None, + }, + Ok(vec![ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "KLOBS".to_string(), + sensor_and_level: None, + }, + value: NonScalar("20240910000000"), }, - value: NonScalar("20240910000000"), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(10.1), }, - value: Scalar(10.1), - }, - ]), + ], + timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), + station_id: 18700, + type_id: 511, + }]), "non scalar parameter", ), ( @@ -666,31 +721,40 @@ mod tests { sensor_and_level: None, }, ], - Ok(vec![ - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId { - param_code: "unknown".to_string(), - sensor_and_level: None, + ObsinnHeader { + station_id: 18700, + type_id: 511, + message_id: 1, + _received_time: None, + }, + Ok(vec![ObsinnChunk { + observations: vec![ + ObsinnObs { + id: ObsinnId { + param_code: "unknown".to_string(), + sensor_and_level: None, + }, + value: NonScalar("20240910000000"), }, - value: NonScalar("20240910000000"), - }, - ObsinnObs { - timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), - id: ObsinnId { - param_code: "TA".to_string(), - sensor_and_level: None, + ObsinnObs { + id: ObsinnId { + param_code: "TA".to_string(), + sensor_and_level: None, + }, + value: Scalar(10.1), }, - value: Scalar(10.1), - }, - ]), + ], + timestamp: Utc.with_ymd_and_hms(2024, 9, 10, 0, 0, 0).unwrap(), + station_id: 18700, + type_id: 511, + }]), "unrecognised param code", ), ]; let param_conversions = get_conversions("resources/paramconversions.csv").unwrap(); - for (data, cols, expected, case_description) in cases 
{ - let output = parse_obs(data.lines(), &cols, param_conversions.clone()); + for (data, cols, header, expected, case_description) in cases { + let output = parse_obs(data.lines(), &cols, param_conversions.clone(), header); assert_eq!(output, expected, "{}", case_description); } } @@ -713,12 +777,6 @@ mod tests { )), "header only", ), - ( - "kldata/nationalnr=93140/type=501/messageid=23 - DD(0,0),FF(0,0),DG_1(0,0),FG_1(0,0),KLFG_1(0,0),FX_1(0,0)", - Err(Error::Parse("empty row in kldata csv".to_string())), - "missing data", - ), ]; let param_conversions = get_conversions("resources/paramconversions.csv").unwrap(); diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index 78622cc6..fc923e21 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -7,8 +7,10 @@ use axum::{ use bb8::PooledConnection; use bb8_postgres::PostgresConnectionManager; use chrono::{DateTime, Utc}; +use chronoutil::RelativeDuration; use futures::stream::FuturesUnordered; use futures::StreamExt; +use rove::data_switch::{TimeSpec, Timestamp}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -20,6 +22,7 @@ use tokio_postgres::NoTls; #[cfg(feature = "kafka")] pub mod kvkafka; pub mod permissions; +pub mod qc_pipelines; use permissions::{ParamPermitTable, StationPermitTable}; #[derive(Error, Debug)] @@ -30,6 +33,10 @@ pub enum Error { Pool(#[from] bb8::RunError), #[error("parse error: {0}")] Parse(String), + #[error("qc system returned an error: {0}")] + Qc(#[from] rove::scheduler::Error), + #[error("rove connector returned an error: {0}")] + Connector(#[from] rove::data_switch::Error), #[error("RwLock was poisoned: {0}")] Lock(String), #[error("Could not read environment variable: {0}")] @@ -73,6 +80,8 @@ struct IngestorState { db_pool: PgConnectionPool, param_conversions: ParamConversions, // converts param codes to element ids permit_tables: Arc>, + rove_connector: Arc, + qc_pipelines: Arc>, } impl FromRef for PgConnectionPool { @@ -93,6 +102,18 @@ impl FromRef for Arc for Arc { + fn from_ref(state: &IngestorState) -> Arc { + state.rove_connector.clone() + } +} + +impl FromRef for Arc> { + fn from_ref(state: &IngestorState) -> Arc> { + state.qc_pipelines.clone() + } +} + /// Represents the different Data types observation can have #[derive(Debug, PartialEq)] pub enum ObsType<'a> { @@ -100,19 +121,52 @@ pub enum ObsType<'a> { NonScalar(&'a str), } -/// Generic container for a piece of data ready to be inserted into the DB pub struct Datum<'a> { timeseries_id: i32, - timestamp: DateTime, + // needed for QC + param_id: i32, value: ObsType<'a>, + qc_usable: Option, } -pub type Data<'a> = Vec>; +/// Generic container for a piece of data ready to be inserted into the DB +pub struct DataChunk<'a> { + timestamp: DateTime, + time_resolution: Option, + data: Vec>, +} + +pub struct QcProvenance { + timeseries_id: i32, + timestamp: DateTime, + // TODO: possible to avoid heap-allocating this? + pipeline: String, + // TODO: correct type? + flag: i32, + fail_condition: Option, +} // TODO: benchmark insertion of scalar and non-scalar together vs separately? 
-pub async fn insert_data(data: Data<'_>, conn: &mut PooledPgConn<'_>) -> Result<(), Error> { +pub async fn insert_data( + chunks: &Vec>, + provenance: &[QcProvenance], + conn: &mut PooledPgConn<'_>, +) -> Result<(), Error> { // TODO: the conflict resolution on this query is an imperfect solution, and needs improvement // + // --- + // + // On periodic or consistency QC pipelines, we should be checking the provenance table to + // decide how to update usable on a conflict, but here it should be fine not to since this is + // fresh data. + // The `AND` in the `DO UPDATE SET` subexpression better handles the case of resent data where + // periodic checks might already have been run by defaulting to false. If the existing data was + // only fresh checked, and the replacement is different, this could result in a false positive. + // I think this is OK though since it should be a rare occurence and will be quickly cleared up + // by a periodic run regardless. + // + // --- + // // I learned from Søren that obsinn and kvalobs organise updates and deletions by sending new // messages that overwrite previous messages. The catch is that the new message does not need // to contain all the params of the old message (or indeed any of them), and any that are left @@ -125,41 +179,86 @@ pub async fn insert_data(data: Data<'_>, conn: &mut PooledPgConn<'_>) -> Result< // implement it here. let query_scalar = conn .prepare( - "INSERT INTO public.data (timeseries, obstime, obsvalue) \ - VALUES ($1, $2, $3) \ + "INSERT INTO public.data (timeseries, obstime, obsvalue, qc_usable) \ + VALUES ($1, $2, $3, $4) \ ON CONFLICT ON CONSTRAINT unique_data_timeseries_obstime \ - DO UPDATE SET obsvalue = EXCLUDED.obsvalue", + DO UPDATE SET obsvalue = EXCLUDED.obsvalue, \ + qc_usable = public.data.qc_usable AND EXCLUDED.qc_usable", ) .await?; let query_nonscalar = conn .prepare( - "INSERT INTO public.nonscalar_data (timeseries, obstime, obsvalue) \ - VALUES ($1, $2, $3) \ + "INSERT INTO public.nonscalar_data (timeseries, obstime, obsvalue, qc_usable) \ + VALUES ($1, $2, $3, $4) \ ON CONFLICT ON CONSTRAINT unique_nonscalar_data_timeseries_obstime \ - DO UPDATE SET obsvalue = EXCLUDED.obsvalue", + DO UPDATE SET obsvalue = EXCLUDED.obsvalue, \ + qc_usable = public.nonscalar_data.qc_usable AND EXCLUDED.qc_usable", + ) + .await?; + let query_provenance = conn + .prepare( + "INSERT INTO flags.confident_provenance (timeseries, obstime, pipeline, flag, fail_condition) \ + VALUES ($1, $2, $3, $4, $5) \ + ON CONFLICT ON CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline \ + DO UPDATE SET flag = EXCLUDED.flag, fail_condition = EXCLUDED.fail_condition", ) .await?; - let mut futures = data - .iter() - .map(|datum| async { - match &datum.value { - ObsType::Scalar(val) => { - conn.execute( - &query_scalar, - &[&datum.timeseries_id, &datum.timestamp, &val], - ) - .await + // TODO: should we flat map into one FuturesUnordered instead of for looping? 
+ for chunk in chunks { + let mut futures = chunk + .data + .iter() + .map(|datum| async { + match &datum.value { + ObsType::Scalar(val) => { + conn.execute( + &query_scalar, + &[ + &datum.timeseries_id, + &chunk.timestamp, + &val, + &datum.qc_usable, + ], + ) + .await + } + ObsType::NonScalar(val) => { + conn.execute( + &query_nonscalar, + &[ + &datum.timeseries_id, + &chunk.timestamp, + &val, + &datum.qc_usable, + ], + ) + .await + } } - ObsType::NonScalar(val) => { - conn.execute( - &query_nonscalar, - &[&datum.timeseries_id, &datum.timestamp, &val], - ) - .await - } - } + }) + .collect::>(); + + while let Some(res) = futures.next().await { + res?; + } + } + + let mut futures = provenance + .iter() + .map(|qc_result| async { + conn.execute( + &query_provenance, + &[ + &qc_result.timeseries_id, + &qc_result.timestamp, + &qc_result.pipeline, + &qc_result.flag, + &qc_result.fail_condition, + ], + ) + .await }) .collect::>(); @@ -170,6 +269,67 @@ pub async fn insert_data(data: Data<'_>, conn: &mut PooledPgConn<'_>) -> Result< Ok(()) } +pub async fn qc_data( + chunks: &mut Vec>, + rove_connector: &rove_connector::Connector, + pipelines: &HashMap<(i32, RelativeDuration), rove::Pipeline>, +) -> Result, Error> { + let mut qc_results: Vec = Vec::new(); + for chunk in chunks { + let time_resolution = match chunk.time_resolution { + Some(time_resolution) => time_resolution, + // if there's no time_resolution, we can't QC + None => continue, + }; + let timestamp = chunk.timestamp.timestamp(); + + for datum in chunk.data.iter_mut() { + let time_spec = + TimeSpec::new(Timestamp(timestamp), Timestamp(timestamp), time_resolution); + let pipeline = match pipelines.get(&(datum.param_id, time_resolution)) { + Some(pipeline) => pipeline, + None => continue, + }; + let data = rove_connector + .fetch_one( + datum.timeseries_id, + &time_spec, + pipeline.num_leading_required, + pipeline.num_trailing_required, + ) + .await?; + let rove_output = rove::Scheduler::schedule_tests(pipeline, data)?; + + let first_fail = rove_output.iter().find(|check_result| { + if let Some(result) = check_result.results.first() { + if let Some(flag) = result.values.first() { + return *flag == rove::Flag::Fail; + } + } + false + }); + + let (flag, fail_condition) = match first_fail { + Some(check_result) => (1, Some(check_result.check.clone())), + None => (0, None), + }; + + datum.qc_usable = Some(flag == 0); + + qc_results.push(QcProvenance { + timeseries_id: datum.timeseries_id, + timestamp: chunk.timestamp, + // TODO: should this encode more info? In theory the param/type can be deduced from the DB anyway + pipeline: "fresh".to_string(), + flag, + fail_condition, + }); + } + } + + Ok(qc_results) +} + pub mod kldata; use kldata::{filter_and_label_kldata, parse_kldata}; @@ -193,6 +353,8 @@ async fn handle_kldata( State(pool): State, State(param_conversions): State, State(permit_table): State>>, + State(rove_connector): State>, + State(qc_pipelines): State>>, body: String, ) -> Json { let result: Result = async { @@ -200,11 +362,14 @@ async fn handle_kldata( let (message_id, obsinn_chunk) = parse_kldata(&body, param_conversions.clone())?; - let data = + let mut data = filter_and_label_kldata(obsinn_chunk, &mut conn, param_conversions, permit_table) .await?; - insert_data(data, &mut conn).await?; + // TODO: should we tolerate failure here? Perhaps there should be metric for this? 
+ let provenance = qc_data(&mut data, &rove_connector, &qc_pipelines).await?; + + insert_data(&data, &provenance, &mut conn).await?; Ok(message_id) } @@ -255,10 +420,17 @@ pub async fn run( db_pool: PgConnectionPool, param_conversion_path: &str, permit_tables: Arc>, + rove_connector: rove_connector::Connector, + qc_pipelines: HashMap<(i32, RelativeDuration), rove::Pipeline>, ) -> Result<(), Box> { // set up param conversion map let param_conversions = get_conversions(param_conversion_path)?; + // TODO: This should be fine without Arc, we can just clone it as the internal db_pool is + // already reference counted + let rove_connector = Arc::new(rove_connector); + let qc_pipelines = Arc::new(qc_pipelines); + // build our application with a single route let app = Router::new() .route("/kldata", post(handle_kldata)) @@ -266,6 +438,8 @@ pub async fn run( db_pool, param_conversions, permit_tables, + rove_connector, + qc_pipelines, }); // run our app with hyper, listening globally on port 3001 diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs index a1c1cfeb..6efebf08 100644 --- a/ingestion/src/main.rs +++ b/ingestion/src/main.rs @@ -1,4 +1,6 @@ use bb8_postgres::PostgresConnectionManager; +use lard_ingestion::qc_pipelines::load_pipelines; +use rove_connector::Connector; use std::sync::{Arc, RwLock}; use tokio_postgres::NoTls; @@ -23,6 +25,28 @@ async fn main() -> Result<(), Box> { let permit_tables = Arc::new(RwLock::new(permissions::fetch_permits().await?)); let background_permit_tables = permit_tables.clone(); + // Set up postgres connection pool + let manager = + PostgresConnectionManager::new_from_stringlike(std::env::var("LARD_CONN_STRING")?, NoTls)?; + let db_pool = bb8::Pool::builder().build(manager).await?; + + // QC system + // NOTE: Keeping this vesion around in case we want it for the periodic checks + // let scheduler = rove::Scheduler::new( + // load_pipelines("").unwrap(), + // DataSwitch::new(HashMap::from([( + // String::from("lard"), + // Box::new(Connector { + // pool: db_pool.clone(), + // }) as Box, + // )])), + // ); + let rove_connector = Connector { + pool: db_pool.clone(), + }; + + let qc_pipelines = load_pipelines("qc_pipelines/fresh")?; + println!("Spawing task to fetch permissions from StInfoSys..."); // background task to refresh permit tables every 30 mins tokio::task::spawn(async move { @@ -42,11 +66,6 @@ async fn main() -> Result<(), Box> { } }); - // Set up postgres connection pool - let manager = - PostgresConnectionManager::new_from_stringlike(std::env::var("LARD_CONN_STRING")?, NoTls)?; - let db_pool = bb8::Pool::builder().build(manager).await?; - // Spawn kvkafka reader #[cfg(feature = "kafka_prod")] { @@ -60,5 +79,12 @@ async fn main() -> Result<(), Box> { // Set up and run our server + database println!("Ingestion server started!"); - lard_ingestion::run(db_pool, PARAMCONV, permit_tables).await + lard_ingestion::run( + db_pool, + PARAMCONV, + permit_tables, + rove_connector, + qc_pipelines, + ) + .await } diff --git a/ingestion/src/qc_pipelines.rs b/ingestion/src/qc_pipelines.rs new file mode 100644 index 00000000..feafa29b --- /dev/null +++ b/ingestion/src/qc_pipelines.rs @@ -0,0 +1,48 @@ +use chronoutil::RelativeDuration; +use rove::pipeline::{self, derive_num_leading_trailing, Pipeline}; +use serde::Deserialize; +use std::{collections::HashMap, path::Path}; + +#[derive(Deserialize)] +struct Header { + param_id: i32, + time_resolution: String, + #[allow(dead_code)] + sensor: Vec, +} + +#[derive(Deserialize)] +struct PipelineDef { + header: Header, 
+ pipeline: Pipeline, +} + +pub fn load_pipelines( + path: impl AsRef, +) -> Result, pipeline::Error> { + std::fs::read_dir(path)? + .map(|entry| { + let entry = entry?; + + if !entry.file_type()?.is_file() { + return Err(pipeline::Error::DirectoryStructure); + } + + let mut pipeline_def: PipelineDef = + toml::from_str(&std::fs::read_to_string(entry.path())?)?; + ( + pipeline_def.pipeline.num_leading_required, + pipeline_def.pipeline.num_trailing_required, + ) = derive_num_leading_trailing(&pipeline_def.pipeline); + + let key = ( + pipeline_def.header.param_id, + // TODO: remove unwrap + RelativeDuration::parse_from_iso8601(&pipeline_def.header.time_resolution).unwrap(), + ); + + Ok(Some((key, pipeline_def.pipeline))) + }) + .filter_map(Result::transpose) + .collect() +} diff --git a/integration_tests/mock_qc_pipelines/fresh/TA_PT1H.toml b/integration_tests/mock_qc_pipelines/fresh/TA_PT1H.toml new file mode 100644 index 00000000..f8cd065f --- /dev/null +++ b/integration_tests/mock_qc_pipelines/fresh/TA_PT1H.toml @@ -0,0 +1,47 @@ +[header] +# TODO: reconsider format of station/level/sensor filtering. Is this supposed to be a regex? +# Would we be better served with a blocklist/allowlist? +# stations = "*" +param_id = 211 +time_resolution = "PT1H" +# level = "*" +sensor = [0] + +[pipeline] +[[pipeline.step]] +name = "special_value_check" +[pipeline.step.special_value_check] +special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999] + +[[pipeline.step]] +name = "range_check" +[pipeline.step.range_check] +min = -55 +max = 50 + +# [[pipeline.step]] +# name = "climate_range_check" +# [pipeline.step.range_check_dynamic] +# source = "netcdf" # TODO: define a neat spec for this? + +[[pipeline.step]] +name = "step_check" +[pipeline.step.step_check] +max = 18.6 + +[[pipeline.step]] +name = "flatline_check" +[pipeline.step.flatline_check] +max = 10 + +[[pipeline.step]] +name = "spike_check" +[pipeline.step.spike_check] +max = 18.6 + +# [[pipeline.step]] +# name = "model_consistency_check" +# [pipeline.step.model_consistency_check] +# model_source = "lustre" +# model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model +# threshold = 3.0 # FIXME: made up value by Ingrid diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs index 36eece8e..5b1e7c7e 100644 --- a/integration_tests/src/main.rs +++ b/integration_tests/src/main.rs @@ -24,9 +24,9 @@ async fn main() { // NOTE: order matters let schemas = [ "db/public.sql", - "db/partitions_generated.sql", "db/labels.sql", "db/flags.sql", + "db/partitions_generated.sql", ]; for schema in schemas { insert_schema(&client, schema).await.unwrap(); diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 7abc92e4..b83732ae 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -13,13 +13,14 @@ use rove::data_switch::{DataConnector, SpaceSpec, TimeSpec, Timestamp}; use tokio::sync::mpsc; use tokio_postgres::NoTls; -use lard_api::timeseries::Timeseries; -use lard_api::{LatestResp, TimeseriesResp, TimesliceResp}; -use lard_ingestion::kvkafka; -use lard_ingestion::permissions::{ - timeseries_is_open, ParamPermit, ParamPermitTable, StationPermitTable, +use lard_api::{timeseries::Timeseries, LatestResp, TimeseriesResp, TimesliceResp}; +use lard_ingestion::{ + kvkafka, + permissions::{timeseries_is_open, ParamPermit, ParamPermitTable, StationPermitTable}, + qc_pipelines::load_pipelines, + KldataResp, }; -use 
lard_ingestion::KldataResp; +use rove_connector::Connector; const CONNECT_STRING: &str = "host=localhost user=postgres dbname=postgres password=postgres"; const PARAMCONV_CSV: &str = "../ingestion/resources/paramconversions.csv"; @@ -245,6 +246,11 @@ async fn e2e_test_wrapper>(test: T) { let api_pool = db_pool.clone(); let ingestion_pool = db_pool.clone(); + let rove_connector = Connector { + pool: db_pool.clone(), + }; + let qc_pipelines = load_pipelines("mock_qc_pipelines/fresh").expect("failed to load pipelines"); + let api_server = tokio::spawn(async move { tokio::select! { output = lard_api::run(api_pool) => output, @@ -256,9 +262,12 @@ async fn e2e_test_wrapper>(test: T) { }); let ingestor = tokio::spawn(async move { tokio::select! { - output = lard_ingestion::run(ingestion_pool, - PARAMCONV_CSV, - mock_permit_tables(), + output = lard_ingestion::run( + ingestion_pool, + PARAMCONV_CSV, + mock_permit_tables(), + rove_connector, + qc_pipelines, ) => output, _ = init_shutdown_rx2.recv() => { ingestor_shutdown_tx.send(()).unwrap(); @@ -514,7 +523,11 @@ async fn test_timeslice_endpoint() { let client = reqwest::Client::new(); for ts in &test_data { let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; - assert_eq!(ingestor_resp.res, 0); + assert_eq!( + ingestor_resp.res, 0, + "ingestor_resp.message: {}", + ingestor_resp.message + ); } for param in ¶ms { diff --git a/qc_pipelines/fresh/TA_PT1H.toml b/qc_pipelines/fresh/TA_PT1H.toml new file mode 100644 index 00000000..f8cd065f --- /dev/null +++ b/qc_pipelines/fresh/TA_PT1H.toml @@ -0,0 +1,47 @@ +[header] +# TODO: reconsider format of station/level/sensor filtering. Is this supposed to be a regex? +# Would we be better served with a blocklist/allowlist? +# stations = "*" +param_id = 211 +time_resolution = "PT1H" +# level = "*" +sensor = [0] + +[pipeline] +[[pipeline.step]] +name = "special_value_check" +[pipeline.step.special_value_check] +special_values = [-999999, -6999, -99.9, -99.8, 999, 6999, 9999] + +[[pipeline.step]] +name = "range_check" +[pipeline.step.range_check] +min = -55 +max = 50 + +# [[pipeline.step]] +# name = "climate_range_check" +# [pipeline.step.range_check_dynamic] +# source = "netcdf" # TODO: define a neat spec for this? 
+ +[[pipeline.step]] +name = "step_check" +[pipeline.step.step_check] +max = 18.6 + +[[pipeline.step]] +name = "flatline_check" +[pipeline.step.flatline_check] +max = 10 + +[[pipeline.step]] +name = "spike_check" +[pipeline.step.spike_check] +max = 18.6 + +# [[pipeline.step]] +# name = "model_consistency_check" +# [pipeline.step.model_consistency_check] +# model_source = "lustre" +# model_args = "arome/air_temperature" # TODO: verify if we need more args than this for the model +# threshold = 3.0 # FIXME: made up value by Ingrid diff --git a/rove_connector/src/lib.rs b/rove_connector/src/lib.rs index fe3b6f9a..2456a7ec 100644 --- a/rove_connector/src/lib.rs +++ b/rove_connector/src/lib.rs @@ -91,7 +91,7 @@ fn regularize( } impl Connector { - async fn fetch_one( + pub async fn fetch_one( &self, ts_id: i32, time_spec: &TimeSpec, diff --git a/util/Cargo.toml b/util/Cargo.toml new file mode 100644 index 00000000..0d62e251 --- /dev/null +++ b/util/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "util" +version = "0.1.0" +edition.workspace = true + +[[bin]] +name = "generate_partition_queries" + +[dependencies] +chrono.workspace = true diff --git a/util/src/bin/generate_partition_queries.rs b/util/src/bin/generate_partition_queries.rs new file mode 100644 index 00000000..4fe2008a --- /dev/null +++ b/util/src/bin/generate_partition_queries.rs @@ -0,0 +1,59 @@ +use chrono::{DateTime, TimeZone, Utc}; +use std::{ + fs::File, + io::{BufWriter, Write}, +}; + +fn create_table_partitions( + table: &str, + boundaries: &[DateTime], + writer: &mut BufWriter, +) -> Result<(), std::io::Error> { + // .windows(2) gives a 2-wide sliding view of the vector, so we can see + // both bounds relevant to a partition + for window in boundaries.windows(2) { + let start_time = window[0]; + let end_time = window[1]; + + let line = format!( + "CREATE TABLE IF NOT EXISTS {}_y{}_to_y{} PARTITION OF {}\nFOR VALUES FROM ('{}') TO ('{}');\n", + table, + start_time.format("%Y"), + end_time.format("%Y"), + table, + start_time.format("%Y-%m-%d %H:%M:%S+00"), + end_time.format("%Y-%m-%d %H:%M:%S+00") + ); + writer.write_all(line.as_bytes())?; + } + + Ok(()) +} + +fn main() -> Result<(), std::io::Error> { + let outfile = File::create("../db/partitions_generated.sql")?; + let mut writer = BufWriter::new(outfile); + + writer.write_all("-- Generated by util/src/bin/generate_partition_queries.rs\n".as_bytes())?; + + // create a vector of the boundaries between partitions + let paritition_boundary_years: Vec> = [1850, 1950, 2000, 2010] + .into_iter() + .chain(2015..=2030) + .map(|y| Utc.with_ymd_and_hms(y, 1, 1, 0, 0, 0).unwrap()) + .collect(); + + create_table_partitions("public.data", &paritition_boundary_years, &mut writer)?; + create_table_partitions( + "public.nonscalar_data", + &paritition_boundary_years, + &mut writer, + )?; + create_table_partitions( + "flags.confident_provenance", + &paritition_boundary_years, + &mut writer, + )?; + + Ok(()) +} diff --git a/util/src/lib.rs b/util/src/lib.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/util/src/lib.rs @@ -0,0 +1 @@ +
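Note (outside the patch): a minimal sketch of how the provenance data introduced above can be inspected, assuming only the schema added in db/flags.sql and db/public.sql. The table aliases and the 'fresh' pipeline literal (the only pipeline label written by qc_data in this change) are illustrative assumptions, not part of the diff.

-- List observations that failed the fresh-data QC pipeline, together with the
-- first failing check recorded by qc_data (flag = 1, fail_condition set).
SELECT d.timeseries, d.obstime, d.obsvalue, p.fail_condition
FROM public.data d
JOIN flags.confident_provenance p
  ON p.timeseries = d.timeseries
 AND p.obstime = d.obstime
WHERE p.pipeline = 'fresh'
  AND p.flag = 1;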
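Note (outside the patch): the long comment above insert_data explains that on conflict the stored and incoming qc_usable flags are ANDed, so a resent value can keep or lower usability but never raise it until a periodic re-check runs. A standalone sketch of that upsert, mirroring the query_scalar statement prepared in ingestion/src/lib.rs; the literal values (timeseries 42, the obstime and obsvalue) are made up for illustration.

-- Hypothetical values for illustration only; the statement itself follows the
-- prepared query_scalar in insert_data.
INSERT INTO public.data (timeseries, obstime, obsvalue, qc_usable)
VALUES (42, '2025-01-01 06:00:00+00', 3.7, true)
ON CONFLICT ON CONSTRAINT unique_data_timeseries_obstime
DO UPDATE SET obsvalue = EXCLUDED.obsvalue,
    qc_usable = public.data.qc_usable AND EXCLUDED.qc_usable;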