chore!: remove stac record creation #20

Merged (2 commits) on Dec 13, 2024
10 changes: 1 addition & 9 deletions hazard_workflow.cwl
@@ -46,9 +46,6 @@ $graph:
store:
type: string
default: "./indicator"
inventory_format:
type: string
default: "osc"
write_xarray_compatible_zarr:
type: boolean
default: false
@@ -80,7 +77,6 @@ $graph:
window_years: window_years
indicator: indicator
store: store
inventory_format: inventory_format
write_xarray_compatible_zarr: write_xarray_compatible_zarr
dask_cluster_kwargs: dask_cluster_kwargs
out:
@@ -92,7 +88,7 @@ $graph:

hints:
DockerRequirement:
dockerPull: public.ecr.aws/c9k5s3u3/os-hazard-indicator:ed6f5d1
dockerPull: public.ecr.aws/c9k5s3u3/os-hazard-indicator:14edea7

requirements:
ResourceRequirement:
@@ -135,8 +131,6 @@ $graph:
type: string
store:
type: string
inventory_format:
type: string
write_xarray_compatible_zarr:
type: boolean
dask_cluster_kwargs:
@@ -172,8 +166,6 @@ $graph:
valueFrom: $(inputs.central_year_historical)
- prefix: --window_years
valueFrom: $(inputs.window_years)
- prefix: --inventory_format
valueFrom: $(inputs.inventory_format)
- prefix: --write_xarray_compatible_zarr
valueFrom: $(inputs.write_xarray_compatible_zarr)
- prefix: --dask_cluster_kwargs
1 change: 0 additions & 1 deletion hazard_workflow_input_example.yml
@@ -6,7 +6,6 @@ store: "./indicator" # default value of type 'string'.
source_dataset_kwargs: "{}" # default value of type 'string'.
source_dataset: a_string # type 'string'
scenario_list: a_string # type 'string'
inventory_format: "osc" # default value of type 'string'.
indicator: "days_tas_above_indicator" # default value of type 'string'.
gcm_list: a_string # type 'string'
dask_cluster_kwargs: "{'n_workers': 1, 'threads_per_worker': 1}" # default value of type 'string'.
6 changes: 0 additions & 6 deletions src/hazard/cli.py
@@ -18,10 +18,8 @@ def days_tas_above_indicator(
bucket: Optional[str] = None,
prefix: Optional[str] = None,
store: Optional[str] = None,
inventory_format: Optional[str] = "osc",
write_xarray_compatible_zarr: Optional[bool] = False,
dask_cluster_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
):
hazard_services.days_tas_above_indicator(
source_dataset,
@@ -36,7 +34,6 @@ def days_tas_above_indicator(
prefix,
store,
write_xarray_compatible_zarr,
inventory_format,
dask_cluster_kwargs,
)

@@ -54,9 +51,7 @@ def degree_days_indicator(
prefix: Optional[str] = None,
store: Optional[str] = None,
write_xarray_compatible_zarr: Optional[bool] = False,
inventory_format: Optional[str] = "osc",
dask_cluster_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
):
hazard_services.degree_days_indicator(
source_dataset,
@@ -71,7 +66,6 @@ def degree_days_indicator(
prefix,
store,
write_xarray_compatible_zarr,
inventory_format,
dask_cluster_kwargs,
)

79 changes: 1 addition & 78 deletions src/hazard/docs_store.py
@@ -1,6 +1,6 @@
import json
import os
from typing import Any, Dict, Iterable, List, Optional, cast
from typing import Dict, Iterable, List, Optional

import s3fs # type: ignore
from fsspec import AbstractFileSystem # type: ignore
@@ -92,80 +92,12 @@ def write_new_empty_inventory(self):
with self._fs.open(path, "w") as f:
f.write(json_str)

def write_inventory_stac(self, resources: Iterable[HazardResource]):
"""Write a hazard models inventory as STAC."""

if self._fs == s3fs.S3FileSystem:
path_root = self._root
else:
path_root = "."

items = HazardResources(resources=list(resources)).to_stac_items(
path_root=path_root, items_as_dicts=True
)
items_cast: List[Dict[str, Any]] = cast(List[Dict[str, Any]], items)
for it in items_cast:
with self._fs.open(self._full_path_stac_item(id=it["id"]), "w") as f:
f.write(json.dumps(it, indent=4))
catalog_path = self._full_path_stac_catalog()
catalog = self.stac_catalog(items=items_cast)
with self._fs.open(catalog_path, "w") as f:
json_str = json.dumps(catalog, indent=4)
f.write(json_str)
collection_path = self._full_path_stac_collection()
collection = self.stac_collection(items=items_cast)
with self._fs.open(collection_path, "w") as f:
json_str = json.dumps(collection, indent=4)
f.write(json_str)

def stac_catalog(self, items: List[Dict[str, Any]]) -> Dict[str, Any]:
links = [
{"rel": "self", "href": "./catalog.json"},
{"rel": "root", "href": "./catalog.json"},
{"rel": "child", "href": "./collection.json"},
]
links.extend([{"rel": "item", "href": f"./{x['id']}.json"} for x in items])
return {
"stac_version": "1.0.0",
"id": "osc-hazard-indicators-catalog",
"type": "Catalog",
"description": "OS-C hazard indicators catalog",
"links": links,
}

def stac_collection(self, items: List[Dict[str, Any]]) -> Dict[str, Any]:
links = [
{"rel": "self", "type": "application/json", "href": "./collection.json"},
{"rel": "root", "type": "application/json", "href": "./catalog.json"},
]
links.extend([{"rel": "item", "href": f"./{x['id']}.json"} for x in items])
return {
"stac_version": "1.0.0",
"type": "Collection",
"stac_extensions": [],
"id": "osc-hazard-indicators-collection",
"title": "OS-C hazard indicators collection",
"description": "OS-C hazard indicators collection",
"license": "CC-BY-4.0",
"extent": {
"spatial": {"bbox": [[-180, -90, 180, 90]]},
"temporal": {
"interval": [["1950-01-01T00:00:00Z", "2100-12-31T23:59:59Z"]]
},
},
"providers": [
{"name": "UKRI", "roles": ["producer"], "url": "https://www.ukri.org/"}
],
"links": links,
}

def update_inventory(
self, resources: Iterable[HazardResource], remove_existing: bool = False
):
"""Add the hazard models provided to the inventory. If a model with the same key
(hazard type and id) exists, replace."""

# if format == stac, we do a round trip, stac -> osc -> stac.
path = self._full_path_inventory()
combined = (
{} if remove_existing else dict((i.key(), i) for i in self.read_inventory())
@@ -194,12 +126,3 @@ def _full_path_doc(self, path: str):

def _full_path_inventory(self):
return str(os.path.join(self._root, "inventory.json"))

def _full_path_stac_item(self, id: str):
return str(os.path.join(self._root, f"{id}.json"))

def _full_path_stac_catalog(self):
return str(os.path.join(self._root, "catalog.json"))

def _full_path_stac_collection(self):
return str(os.path.join(self._root, "collection.json"))
132 changes: 1 addition & 131 deletions src/hazard/inventory.py
@@ -1,11 +1,6 @@
import datetime
import itertools
import json
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple

import pystac
import shapely
from pydantic import BaseModel, Field


@@ -125,135 +120,10 @@ def key(self):
selects based on its own logic (e.g. selects a particular General Circulation Model)."""
return self.path

def to_stac_items(
self, path_root: str, items_as_dicts: bool = False
) -> List[Union[pystac.Item, Dict]]:
"""
converts a hazard resource to a list of STAC items. One unique set of parameter values and scenarios results
in a single STAC item in the list.
"""
keys, values = zip(*self.params.items())
params_permutations = list(itertools.product(*values))
params_permutations_dicts = [dict(zip(keys, p)) for p in params_permutations]

scenarios_permutations = []
for s in self.scenarios:
for y in s.years:
scenarios_permutations.append({"scenario": s.id, "year": y})

permutations = [
dict(**param, **scenario)
for param, scenario in itertools.product(
params_permutations_dicts, scenarios_permutations
)
]

items = []

for p in permutations:
items.append(
self.to_stac_item(
path_root=path_root,
combined_parameters=p,
item_as_dict=items_as_dicts,
)
)

return items

def to_stac_item(
self,
path_root: str,
combined_parameters: Dict[Any, str],
item_as_dict: bool = False,
) -> Union[pystac.Item, Dict]:
"""
converts a hazard resource along with combined parameters (params and scenarios) to a single STAC item.
"""
data_asset_path = self.path.format(**combined_parameters)
item_id = data_asset_path.replace("/", "_")
osc_properties = self.model_dump()
osc_properties = {
f"osc-hazard:{k}": osc_properties[k] for k in osc_properties.keys()
}

asset = pystac.Asset(
href=f"{path_root}/{data_asset_path}",
title="zarr directory",
description="directory containing indicators data as zarr arrays",
media_type=pystac.MediaType.ZARR,
roles=["data"],
)

link = pystac.Link(
rel="collection", media_type="application/json", target="./collection.json"
)

coordinates = deepcopy(self.map.bounds) if self.map else None

# GeoJSON requires the coordinates of a LinearRing to have the same beginning and end
if coordinates:
coordinates.append(coordinates[0])

bbox = self.map.bbox if self.map else None
stac_item = pystac.Item(
id=item_id,
geometry={"type": "Polygon", "coordinates": [coordinates]},
bbox=bbox,
datetime=None,
start_datetime=datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc),
end_datetime=datetime.datetime(2100, 1, 1, tzinfo=datetime.timezone.utc),
properties=osc_properties,
collection="osc-hazard-indicators",
assets={"data": asset},
)

stac_item.add_link(link)

stac_item.validate()

is_valid_response = shapely.is_valid_reason(
shapely.from_geojson(json.dumps(stac_item.to_dict()))
)
if is_valid_response != "Valid Geometry":
raise Exception(f"STAC Item is not valid: {is_valid_response}")

templated_out_item = self._expand_template_values_for_stac_record(
combined_parameters, stac_item
)

if item_as_dict:
return templated_out_item.to_dict()
else:
return templated_out_item

def _expand_template_values_for_stac_record(
self, combined_parameters: Dict[Any, str], stac_item: pystac.Item
):
item_string = json.dumps(stac_item.to_dict())
for k, v in combined_parameters.items():
item_string = item_string.replace(f"{{{k}}}", str(v))
templated_out_item = stac_item.from_dict(json.loads(item_string))
templated_out_item.validate()
return templated_out_item


class HazardResources(BaseModel):
resources: List[HazardResource]

def to_stac_items(
self, path_root: str, items_as_dicts: bool = False
) -> List[Union[pystac.Item, Dict]]:
"""
converts hazard resources to a list of STAC items.
"""
stac_items_lists = [
resource.to_stac_items(path_root=path_root, items_as_dicts=items_as_dicts)
for resource in self.resources
]
stac_items_flat = list(itertools.chain(*stac_items_lists))
return stac_items_flat


def expand(item: str, key: str, param: str):
return item and item.replace("{" + key + "}", param)
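The deleted `to_stac_items` built one STAC item per combination of parameter values and (scenario, year) pair. A standalone sketch of just that permutation step, with illustrative parameter values and the pystac item construction omitted:

```python
import itertools

# Permutation logic from the removed to_stac_items: one would-be STAC item per
# combination of parameter values and (scenario, year) pair. Values are illustrative.
params = {"gcm": ["ACCESS-CM2", "MIROC6"], "threshold": ["25", "30"]}
scenarios = [{"id": "historical", "years": [2005]}, {"id": "ssp585", "years": [2030, 2050]}]

keys, values = zip(*params.items())
params_permutations_dicts = [dict(zip(keys, p)) for p in itertools.product(*values)]

scenarios_permutations = [
    {"scenario": s["id"], "year": y} for s in scenarios for y in s["years"]
]

permutations = [
    dict(**param, **scenario)
    for param, scenario in itertools.product(params_permutations_dicts, scenarios_permutations)
]

print(len(permutations))  # 2 GCMs x 2 thresholds x 3 scenario-years = 12 items
```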
19 changes: 2 additions & 17 deletions src/hazard/services.py
@@ -5,7 +5,6 @@
from fsspec.implementations.local import LocalFileSystem

from hazard.docs_store import DocStore # type: ignore # noqa: E402
from hazard.indicator_model import IndicatorModel
from hazard.models.days_tas_above import DaysTasAboveIndicator # noqa: E402
from hazard.models.degree_days import DegreeDays # noqa: E402
from hazard.sources import SourceDataset, get_source_dataset_instance
@@ -30,7 +29,6 @@ def days_tas_above_indicator(
prefix: Optional[str] = None,
store: Optional[str] = None,
write_xarray_compatible_zarr: Optional[bool] = False,
inventory_format: Optional[str] = "osc",
dask_cluster_kwargs: Optional[Dict[str, Any]] = None,
):
"""
@@ -60,7 +58,7 @@ def days_tas_above_indicator(
source_dataset=source_dataset,
)

_write_inventory_files(docs_store, inventory_format, model)
docs_store.update_inventory(model.inventory())

model.run_all(source, target, client=client)

@@ -78,7 +76,6 @@ def degree_days_indicator(
prefix: Optional[str] = None,
store: Optional[str] = None,
write_xarray_compatible_zarr: Optional[bool] = False,
inventory_format: Optional[str] = "osc",
dask_cluster_kwargs: Optional[Dict[str, Any]] = None,
):
"""
@@ -108,23 +105,11 @@ def degree_days_indicator(
source_dataset=source_dataset,
)

_write_inventory_files(docs_store, inventory_format, model)
docs_store.update_inventory(model.inventory())

model.run_all(source, target, client=client)


def _write_inventory_files(
docs_store: DocStore, inventory_format: Optional[str], model: IndicatorModel
):
if inventory_format == "stac":
docs_store.write_inventory_stac(model.inventory())
elif inventory_format == "osc":
docs_store.update_inventory(model.inventory())
elif inventory_format == "all":
docs_store.write_inventory_stac(model.inventory())
docs_store.update_inventory(model.inventory())


def setup(
bucket: Optional[str] = None,
prefix: Optional[str] = None,
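For reference, the removed `_write_inventory_files` shown above was a thin format dispatcher; after this change both services call `docs_store.update_inventory(model.inventory())` unconditionally. A self-contained sketch of the old dispatch, with stub writers standing in for the `DocStore` methods:

```python
from typing import Iterable, Optional

# Stub writers standing in for DocStore.write_inventory_stac / DocStore.update_inventory.
def write_inventory_stac(resources: Iterable[str]) -> None:
    print("write catalog.json, collection.json and per-item STAC JSON")

def update_inventory(resources: Iterable[str]) -> None:
    print("update inventory.json")

# The removed dispatch: "stac", "osc" or "all" selected which inventory files were written.
def write_inventory_files(inventory_format: Optional[str], resources: Iterable[str]) -> None:
    if inventory_format == "stac":
        write_inventory_stac(resources)
    elif inventory_format == "osc":
        update_inventory(resources)
    elif inventory_format == "all":
        write_inventory_stac(resources)
        update_inventory(resources)

write_inventory_files("osc", ["days_tas_above"])  # after this PR, only this branch's behaviour remains
```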