Improve copy and delete #52

Merged · 10 commits · Dec 31, 2024
13 changes: 7 additions & 6 deletions cdsobs/api.py
@@ -8,8 +8,8 @@
from sqlalchemy.orm import Session

from cdsobs.cdm.api import (
apply_unit_changes,
check_cdm_compliance,
define_units,
)
from cdsobs.config import CDSObsConfig, DatasetConfig
from cdsobs.ingestion.api import (
@@ -294,11 +294,12 @@ def _read_homogenise_and_partition(
# Check CDM compliance
check_cdm_compliance(homogenised_data, dataset_metadata.cdm_tables)
# Apply unit changes
homogenised_data = apply_unit_changes(
homogenised_data,
service_definition.sources[source],
dataset_metadata.cdm_code_tables["observed_variable"],
)
if "units" not in homogenised_data.columns:
homogenised_data = define_units(
homogenised_data,
service_definition.sources[source],
dataset_metadata.cdm_code_tables["observed_variable"],
)
year = time_space_batch.time_batch.year
lon_tile_size = dataset_config.get_tile_size("lon", source, year)
lat_tile_size = dataset_config.get_tile_size("lat", source, year)
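The hunk above renames `apply_unit_changes` to `define_units` and guards the call so units are only derived when the homogenised data does not already carry a `units` column. A minimal sketch of that guard, with a hypothetical DataFrame and a stand-in for `define_units`:

```python
import pandas as pd


def define_units(df, source_definition, variable_code_table):
    # Stand-in for cdsobs.cdm.api.define_units: in the real function the units
    # come from the service definition and the CDM observed_variable code table.
    df = df.copy()
    df["units"] = "K"  # placeholder value for this sketch
    return df


homogenised_data = pd.DataFrame(
    {"observed_variable": ["air_temperature"], "observation_value": [280.15]}
)

# Units are only derived when the ingested data does not already provide them.
if "units" not in homogenised_data.columns:
    homogenised_data = define_units(homogenised_data, None, None)
```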
2 changes: 1 addition & 1 deletion cdsobs/cdm/api.py
@@ -328,7 +328,7 @@ def check_cdm_compliance(
return table_field_mappings


def apply_unit_changes(
def define_units(
homogenised_data: pandas.DataFrame,
source_definition: SourceDefinition,
cdm_variable_table: CDMCodeTable,
4 changes: 3 additions & 1 deletion cdsobs/cli/_copy_dataset.py
@@ -50,6 +50,8 @@ def copy_dataset(


def _copy_dataset_impl(cdsobs_config_yml, dataset, dest_config_yml, dest_dataset):
if dest_dataset is None:
dest_dataset = dataset
check_params(dest_config_yml, dataset, dest_dataset)

try:
@@ -191,7 +193,7 @@ def catalogue_copy(
for entry in entries:
# This is needed to load the constraints as it is a deferred attribute.
# However if we load them the other attributes will dissappear from __dict__
# There is no way apparently if doing this better in sqlalchemy
# There is no way apparently of doing this better in sqlalchemy
entry_dict = {
col.name: getattr(entry, col.name) for col in entry.__table__.columns
}
42 changes: 30 additions & 12 deletions cdsobs/cli/_delete_dataset.py
@@ -1,17 +1,21 @@
from pathlib import Path

import dask
import sqlalchemy.orm
import typer
from click import prompt
from fastapi.encoders import jsonable_encoder
from rich.console import Console
from sqlalchemy import delete, func, select

from cdsobs.cli._utils import (
config_yml_typer,
list_parser,
)
from cdsobs.config import CDSObsConfig
from cdsobs.observation_catalogue.database import get_session
from cdsobs.observation_catalogue.models import Catalogue
from cdsobs.observation_catalogue.repositories.cads_dataset import CadsDatasetRepository
from cdsobs.observation_catalogue.repositories.catalogue import CatalogueRepository
from cdsobs.observation_catalogue.schemas.catalogue import (
CatalogueSchema,
@@ -26,7 +30,9 @@
def delete_dataset(
cdsobs_config_yml: Path = config_yml_typer,
dataset: str = typer.Option(..., help="dataset to delete", prompt=True),
dataset_source: str = typer.Option("", help="dataset source to delete"),
dataset_source: str = typer.Option(
None, help="dataset source to delete. By default it will delete all."
),
time: str = typer.Option(
"",
help="Filter by an exact date or by an interval of two dates. For example: "
@@ -35,7 +41,7 @@ def delete_dataset(
):
"""Permanently delete the given dataset from the catalogue and the storage."""
confirm = prompt(
"This will delete the dataset permanently."
"This will delete the data permanently."
" This action cannot be undone. "
"Please type again the name of the dataset to confirm"
)
@@ -55,8 +61,15 @@
except (Exception, KeyboardInterrupt):
catalogue_rollback(catalogue_session, deleted_entries)
raise
if len(deleted_entries):
console.print(f"[bold green] Dataset {dataset} deleted. [/bold green]")
nd = len(deleted_entries)
console.print(f"[bold green] {nd} entries deleted from {dataset}. [/bold green]")
nremaining = catalogue_session.scalar(select(func.count()).select_from(Catalogue))
if nremaining == 0:
CadsDatasetRepository(catalogue_session).delete_dataset(dataset)
console.print(
f"[bold green] Deleted {dataset} from datasets table as it was left empty. "
f"[/bold green]"
)


def delete_from_catalogue(
@@ -78,22 +91,27 @@ def delete_from_catalogue(
entries = catalogue_repo.get_by_filters(filters)
if not len(entries):
console.print(f"[red] No entries for dataset {dataset} found")
deleted_entries = []
try:
for entry in entries:
catalogue_repo.remove(record_id=entry.id)
deleted_entries.append(entry)
catalogue_session.execute(delete(Catalogue).where(*filters))
catalogue_session.commit()
except (Exception, KeyboardInterrupt):
catalogue_rollback(catalogue_session, deleted_entries)
return deleted_entries
catalogue_rollback(catalogue_session, entries)
return entries


def delete_from_s3(deleted_entries, s3_client):
assets = [e.asset for e in deleted_entries]
for asset in assets:
bucket, name = asset.split("/")

def delete_asset(asset_to_delete):
bucket, name = asset_to_delete.split("/")
s3_client.delete_file(bucket, name)

delayed_deletes = []
for asset in assets:
delayed_deletes.append(dask.delayed(delete_asset)(asset))

dask.compute(*delayed_deletes)


def catalogue_rollback(catalogue_session, deleted_entries):
schemas = [CatalogueSchema(**jsonable_encoder(e)) for e in deleted_entries]
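Two behaviour changes in this file stand out: catalogue entries are now removed with a single bulk `delete(Catalogue).where(*filters)` statement instead of row by row, and the matching S3 assets are deleted concurrently with `dask.delayed`. A self-contained sketch of the parallel-delete pattern, with a hypothetical stand-in for the project's S3 client:

```python
import dask


class FakeS3Client:
    # Hypothetical stand-in for the real S3 client; only delete_file is sketched.
    def delete_file(self, bucket: str, name: str) -> None:
        print(f"deleting {bucket}/{name}")


def delete_from_s3(assets, s3_client):
    # Each asset is "bucket/object_name"; build one lazy task per asset and
    # let dask execute them concurrently.
    def delete_asset(asset_to_delete):
        bucket, name = asset_to_delete.split("/")
        s3_client.delete_file(bucket, name)

    delayed_deletes = [dask.delayed(delete_asset)(asset) for asset in assets]
    dask.compute(*delayed_deletes)


delete_from_s3(["my-bucket/file1.nc", "my-bucket/file2.nc"], FakeS3Client())
```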
2 changes: 1 addition & 1 deletion cdsobs/constants.py
@@ -17,7 +17,7 @@

DATE_FORMAT = "%Y-%m-%d, %H:%M:%S"

CONFIG_YML = Path(cdsobs_path, "data/cdsobs_config_template.yml")
CONFIG_YML = Path(Path(cdsobs_path).parent, "tests", "data", "cdsobs_config_test.yml")

DS_TEST_NAME = "insitu-observations-woudc-ozone-total-column-and-profiles"

28 changes: 14 additions & 14 deletions cdsobs/data/cdsobs_config_template.yml
@@ -1,23 +1,23 @@
ingestion_databases:
main:
db_user: user
pwd: testpass
host: localhost
port: 25432
db_name: baron
db_user: someuser
pwd: somepassword
host: somehost
port: 5431
db_name: ingestion
catalogue_db:
db_user: docker
db_user: someuser
pwd: docker
host: localhost
port: 5433
db_name: cataloguedbtest
port: 5432
db_name: catalogue-dev
s3config:
access_key: minioadmin
secret_key: minioadmin
host: 127.0.0.1
port: 9000
secure: false
namespace: cds2-obs-dev
access_key: somekey
secret_key: some_secret_key
host: object-store.os-api.cci2.ecmwf.int
port: 443
secure: true
namespace: somenamespace
datasets:
- name: insitu-observations-woudc-ozone-total-column-and-profiles
lon_tile_size: 180
2 changes: 1 addition & 1 deletion cdsobs/ingestion/api.py
@@ -95,7 +95,7 @@ def validate_and_homogenise(
data_renamed = data
# Add z coordinate if needed
if (
"z_coordinate" not in data_renamed
"z_coordinate" not in data_renamed.columns
and source_definition.space_columns is not None
and source_definition.space_columns.z is not None
):
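The `validate_and_homogenise` fix makes the membership test explicit against the column index rather than the DataFrame itself. A minimal sketch of the guarded z-coordinate addition, using hypothetical column names:

```python
import pandas as pd

# Hypothetical renamed data; the column names are illustrative only.
data_renamed = pd.DataFrame(
    {"latitude": [10.0], "longitude": [20.0], "altitude": [100.0]}
)

# Explicit membership test against the columns, as in the fixed condition.
if "z_coordinate" not in data_renamed.columns:
    data_renamed["z_coordinate"] = data_renamed["altitude"]
```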
10 changes: 6 additions & 4 deletions cdsobs/ingestion/readers/netcdf.py
@@ -1,12 +1,12 @@
from pathlib import Path
from typing import Tuple

import pandas
import xarray

from cdsobs.config import CDSObsConfig
from cdsobs.ingestion.api import EmptyBatchException
from cdsobs.ingestion.core import TimeSpaceBatch
from cdsobs.retrieve.filter_datasets import get_var_code_dict
from cdsobs.service_definition.service_definition_models import ServiceDefinition
from cdsobs.utils.logutils import get_logger

@@ -20,7 +20,7 @@ def read_flat_netcdfs(
source: str,
time_space_batch: TimeSpaceBatch,
input_dir: str,
) -> Tuple[pandas.DataFrame, pandas.Series]:
) -> pandas.DataFrame:
if time_space_batch.space_batch != "global":
logger.warning("This reader does not support subsetting in space.")
time_batch = time_space_batch.time_batch
@@ -30,7 +30,9 @@
)
if netcdf_path.exists():
data = xarray.open_dataset(netcdf_path).to_pandas()
data_types = data.dtypes
else:
raise EmptyBatchException
return data, data_types # type: ignore
# Decode variable names
code_dict = get_var_code_dict(config.cdm_tables_location)
data["observed_variable"] = data["observed_variable"].map(code_dict)
return data
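The reader now returns a single DataFrame instead of a `(DataFrame, Series)` tuple and decodes the integer `observed_variable` codes back to names using the CDM code table. A sketch of that decoding step, with a hypothetical code dictionary in place of `get_var_code_dict`:

```python
import pandas as pd

# Hypothetical subset of the CDM observed_variable code table.
code_dict = {85: "air_temperature", 107: "wind_speed"}

data = pd.DataFrame(
    {"observed_variable": [85, 107, 85], "observation_value": [280.1, 3.4, 281.0]}
)

# Map the integer variable codes back to readable names, as the reader now
# does after loading the flat netCDF into a DataFrame.
data["observed_variable"] = data["observed_variable"].map(code_dict)
```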
51 changes: 33 additions & 18 deletions cdsobs/ingestion/serialize.py
@@ -7,7 +7,7 @@
import pandas

from cdsobs import constants
from cdsobs.cdm.api import CdmDataset, to_cdm_dataset
from cdsobs.cdm.api import CdmDataset, define_units, to_cdm_dataset
from cdsobs.config import CDSObsConfig
from cdsobs.ingestion.api import read_batch_data
from cdsobs.ingestion.core import (
@@ -131,24 +131,10 @@ def to_netcdf(
# Encode variable names as integer
if encode_variables:
logger.info("Encoding observed variables using the CDM variable codes.")
code_table = cdm_dataset.dataset_params.cdm_code_tables[
"observed_variable"
].table
# strip to remove extra spaces
var2code = get_var2code(code_table)
encoded_data = (
cdm_dataset.dataset["observed_variable"]
.str.encode("UTF-8")
.map(var2code)
.astype("uint8")
)
cdm_code_tables = cdm_dataset.dataset_params.cdm_code_tables
data = cdm_dataset.dataset
encoded_data, var2code_subset = encode_observed_variables(cdm_code_tables, data)
cdm_dataset.dataset["observed_variable"] = encoded_data
codes_in_data = encoded_data.unique()
var2code_subset = {
var.decode("ascii"): code
for var, code in var2code.items()
if code in codes_in_data
}
encoding["observed_variable"]["dtype"] = encoded_data.dtype
attrs["observed_variable"] = dict(
labels=list(var2code_subset), codes=list(var2code_subset.values())
@@ -166,6 +152,22 @@
return output_path


def encode_observed_variables(cdm_code_tables, data):
code_table = cdm_code_tables["observed_variable"].table
# strip to remove extra spaces
var2code = get_var2code(code_table)
encoded_data = (
data["observed_variable"].str.encode("UTF-8").map(var2code).astype("uint8")
)
codes_in_data = encoded_data.unique()
var2code_subset = {
var.decode("ascii"): code
for var, code in var2code.items()
if code in codes_in_data
}
return encoded_data, var2code_subset


def get_var2code(code_table):
code_dict = pandas.Series(
index=code_table["name"].str.strip().str.replace(" ", "_").str.encode("ascii"),
@@ -280,15 +282,28 @@ def batch_to_netcdf(
for field in homogenised_data:
if homogenised_data[field].dtype == "string":
homogenised_data[field] = homogenised_data[field].str.encode("UTF-8")
homogenised_data = define_units(
homogenised_data,
service_definition.sources[source],
dataset_params.cdm_code_tables["observed_variable"],
)
encoded_data, var2code_subset = encode_observed_variables(
dataset_params.cdm_code_tables, homogenised_data
)
homogenised_data["observed_variable"] = encoded_data
homogenised_data_xr = homogenised_data.to_xarray()
if service_definition.global_attributes is not None:
homogenised_data.attrs = {
**homogenised_data.attrs,
**service_definition.global_attributes,
}
homogenised_data_xr["observed_variable"].attrs = dict(
labels=list(var2code_subset), codes=list(var2code_subset.values())
)
encoding = get_encoding_with_compression_xarray(
homogenised_data_xr, string_transform="str_to_char"
)
encoding["observed_variable"]["dtype"] = str(encoded_data.dtype)
logger.info(f"Writing de-normalized and CDM mapped data to {output_path}")
homogenised_data_xr.to_netcdf(
output_path, encoding=encoding, engine="netcdf4", format="NETCDF4"
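The encoding logic is factored out into `encode_observed_variables`, so both `to_netcdf` and `batch_to_netcdf` can turn variable names into compact integer codes and keep only the label/code pairs that actually occur as attributes. A self-contained sketch of that idea, with a simplified mapping rather than the real CDM table:

```python
import pandas as pd

# Simplified stand-in for the CDM observed_variable code table.
var2code = {b"air_temperature": 85, b"wind_speed": 107}

data = pd.DataFrame(
    {"observed_variable": ["air_temperature", "wind_speed", "air_temperature"]}
)

# Encode the names as bytes, map them to their codes and store them compactly.
encoded = data["observed_variable"].str.encode("UTF-8").map(var2code).astype("uint8")

# Keep only the labels present in the data so they can be written as attributes.
codes_in_data = set(encoded.unique())
var2code_subset = {
    var.decode("ascii"): code for var, code in var2code.items() if code in codes_in_data
}
print(var2code_subset)  # {'air_temperature': 85, 'wind_speed': 107}
```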
4 changes: 4 additions & 0 deletions cdsobs/observation_catalogue/repositories/cads_dataset.py
@@ -42,3 +42,7 @@ def get_dataset(self, dataset_name: str) -> CadsDataset | None:
return self.session.scalar(
sa.select(CadsDataset).filter(CadsDataset.name == dataset_name)
)

def delete_dataset(self, dataset_name: str):
dataset = self.get_dataset(dataset_name)
self.session.delete(dataset)
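With the new `delete_dataset` method, the delete CLI can drop the dataset row from the datasets table once its last catalogue entries are gone. A hedged sketch of how the repository might be driven; the commit is an assumption, as the real CLI owns the transaction:

```python
from cdsobs.observation_catalogue.repositories.cads_dataset import CadsDatasetRepository


def drop_dataset_if_empty(catalogue_session, dataset_name: str, remaining_entries: int) -> None:
    # Only remove the dataset record when no catalogue entries reference it.
    if remaining_entries == 0:
        CadsDatasetRepository(catalogue_session).delete_dataset(dataset_name)
        catalogue_session.commit()  # assumed here; the caller owns the transaction
```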