Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use rove for QC in the ingestor #42

Open
wants to merge 18 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ members = [
"api",
"ingestion",
"integration_tests",
"rove_connector",
"rove_connector", "util",
]
resolver = "2"

Expand All @@ -27,8 +27,10 @@ quick-xml = { version = "0.35.0", features = [ "serialize", "overlapped-lists" ]
rand = "0.8.5"
rand_distr = "0.4.3"
regex = "1.11.1"
rove = { git = "https://github.com/metno/rove.git" }
rove = { git = "https://github.com/metno/rove.git", branch = "lard_fixes" }
# rove = { git = "https://github.com/metno/rove.git" }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be removed before merging

serde = { version = "1.0.217", features = ["derive"] }
thiserror = "1.0.69"
tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] }
tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] }
toml = "0.8.19"
15 changes: 15 additions & 0 deletions db/flags.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
CREATE SCHEMA IF NOT EXISTS flags;

-- TODO: should this also have a column for qc_time or some such?
CREATE TABLE IF NOT EXISTS flags.confident_provenance (
timeseries INT4 NOT NULL,
obstime TIMESTAMPTZ NOT NULL,
pipeline TEXT NOT NULL,
-- TODO: should this be an enum?
flag INT4 NOT NULL,
-- TODO: better name? since this might be applied to flags that aren't fail but also aren't pass?
fail_condition TEXT NULL,
CONSTRAINT unique_confident_providence_timeseries_obstime_pipeline UNIQUE (timeseries, obstime, pipeline),
CONSTRAINT fk_confident_providence_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries
) PARTITION BY RANGE (obstime);
CREATE INDEX IF NOT EXISTS confident_provenance_timestamp_index ON flags.confident_provenance (obstime);
CREATE INDEX IF NOT EXISTS confident_provenance_timeseries_index ON flags.confident_provenance USING HASH (timeseries);

CREATE TABLE IF NOT EXISTS flags.kvdata (
timeseries INT4 REFERENCES public.timeseries,
obstime TIMESTAMPTZ NOT NULL,
Expand Down
116 changes: 77 additions & 39 deletions db/partitions_generated.sql
Original file line number Diff line number Diff line change
@@ -1,77 +1,115 @@
-- Generated by simple script for testing
CREATE TABLE IF NOT EXISTS data_y1850_to_y1950 PARTITION OF public.data
-- Generated by util/src/bin/generate_partition_queries.rs
CREATE TABLE IF NOT EXISTS public.data_y1850_to_y1950 PARTITION OF public.data
FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y1950_to_y2000 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y1950_to_y2000 PARTITION OF public.data
FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2000_to_y2010 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2000_to_y2010 PARTITION OF public.data
FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2010_to_y2015 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2010_to_y2015 PARTITION OF public.data
FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2015_to_y2016 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2015_to_y2016 PARTITION OF public.data
FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2016_to_y2017 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2016_to_y2017 PARTITION OF public.data
FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2017_to_y2018 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2017_to_y2018 PARTITION OF public.data
FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2018_to_y2019 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2018_to_y2019 PARTITION OF public.data
FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2019_to_y2020 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2019_to_y2020 PARTITION OF public.data
FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2020_to_y2021 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2020_to_y2021 PARTITION OF public.data
FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2021_to_y2022 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2021_to_y2022 PARTITION OF public.data
FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2022_to_y2023 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2022_to_y2023 PARTITION OF public.data
FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2023_to_y2024 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2023_to_y2024 PARTITION OF public.data
FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2024_to_y2025 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2024_to_y2025 PARTITION OF public.data
FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2025_to_y2026 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2025_to_y2026 PARTITION OF public.data
FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2026_to_y2027 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2026_to_y2027 PARTITION OF public.data
FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2027_to_y2028 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2027_to_y2028 PARTITION OF public.data
FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2028_to_y2029 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2028_to_y2029 PARTITION OF public.data
FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS data_y2029_to_y2030 PARTITION OF public.data
CREATE TABLE IF NOT EXISTS public.data_y2029_to_y2030 PARTITION OF public.data
FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y1850_to_y1950 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y1850_to_y1950 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y1950_to_y2000 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y1950_to_y2000 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2000_to_y2010 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2000_to_y2010 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2010_to_y2015 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2010_to_y2015 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2015_to_y2016 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2015_to_y2016 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2016_to_y2017 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2016_to_y2017 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2017_to_y2018 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2017_to_y2018 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2018_to_y2019 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2018_to_y2019 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2019_to_y2020 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2019_to_y2020 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2020_to_y2021 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2020_to_y2021 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2021_to_y2022 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2021_to_y2022 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2022_to_y2023 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2022_to_y2023 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2023_to_y2024 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2023_to_y2024 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2024_to_y2025 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2024_to_y2025 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2025_to_y2026 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2025_to_y2026 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2026_to_y2027 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2026_to_y2027 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2027_to_y2028 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2027_to_y2028 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2028_to_y2029 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2028_to_y2029 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS nonscalar_data_y2029_to_y2030 PARTITION OF public.nonscalar_data
CREATE TABLE IF NOT EXISTS public.nonscalar_data_y2029_to_y2030 PARTITION OF public.nonscalar_data
FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y1850_to_y1950 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('1850-01-01 00:00:00+00') TO ('1950-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y1950_to_y2000 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('1950-01-01 00:00:00+00') TO ('2000-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2000_to_y2010 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2000-01-01 00:00:00+00') TO ('2010-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2010_to_y2015 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2010-01-01 00:00:00+00') TO ('2015-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2015_to_y2016 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2015-01-01 00:00:00+00') TO ('2016-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2016_to_y2017 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2016-01-01 00:00:00+00') TO ('2017-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2017_to_y2018 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2017-01-01 00:00:00+00') TO ('2018-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2018_to_y2019 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2018-01-01 00:00:00+00') TO ('2019-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2019_to_y2020 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2019-01-01 00:00:00+00') TO ('2020-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2020_to_y2021 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2020-01-01 00:00:00+00') TO ('2021-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2021_to_y2022 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2021-01-01 00:00:00+00') TO ('2022-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2022_to_y2023 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2022-01-01 00:00:00+00') TO ('2023-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2023_to_y2024 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2023-01-01 00:00:00+00') TO ('2024-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2024_to_y2025 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2024-01-01 00:00:00+00') TO ('2025-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2025_to_y2026 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2025-01-01 00:00:00+00') TO ('2026-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2026_to_y2027 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2026-01-01 00:00:00+00') TO ('2027-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2027_to_y2028 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2027-01-01 00:00:00+00') TO ('2028-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2028_to_y2029 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2028-01-01 00:00:00+00') TO ('2029-01-01 00:00:00+00');
CREATE TABLE IF NOT EXISTS flags.confident_provenance_y2029_to_y2030 PARTITION OF flags.confident_provenance
FOR VALUES FROM ('2029-01-01 00:00:00+00') TO ('2030-01-01 00:00:00+00');
4 changes: 4 additions & 0 deletions db/public.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ CREATE TABLE IF NOT EXISTS public.data (
timeseries INT4 NOT NULL,
obstime TIMESTAMPTZ NOT NULL,
obsvalue REAL,
-- TODO: should qc_usable be NOT NULL? and default to true?
-- It would make greatly reduce the update load when QCing old data
qc_usable BOOLEAN,
CONSTRAINT unique_data_timeseries_obstime UNIQUE (timeseries, obstime),
CONSTRAINT fk_data_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries
) PARTITION BY RANGE (obstime);
Expand All @@ -43,6 +46,7 @@ CREATE TABLE IF NOT EXISTS public.nonscalar_data (
timeseries INT4 NOT NULL,
obstime TIMESTAMPTZ NOT NULL,
obsvalue TEXT,
qc_usable BOOLEAN,
CONSTRAINT unique_nonscalar_data_timeseries_obstime UNIQUE (timeseries, obstime),
CONSTRAINT fk_nonscalar_data_timeseries FOREIGN KEY (timeseries) REFERENCES public.timeseries
) PARTITION BY RANGE (obstime);
Expand Down
4 changes: 4 additions & 0 deletions ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ bb8.workspace = true
bb8-postgres.workspace = true
bytes.workspace = true
chrono.workspace = true
chronoutil.workspace = true
csv.workspace = true
futures.workspace = true
kafka.workspace = true
quick-xml.workspace = true
regex.workspace = true
rove.workspace = true
rove_connector = { path = "../rove_connector" }
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
toml.workspace = true
Loading
Loading