From 72aeda7d2be14d31a04a5d2d8e089558ae20955f Mon Sep 17 00:00:00 2001 From: Marco Angheben Date: Mon, 11 Nov 2024 07:36:33 +0100 Subject: [PATCH 1/2] filter on forecast later in time with respect to last observation - issue #38 --- pollution_v2/src/common/connector/common.py | 2 +- pollution_v2/src/common/data_model/station.py | 4 ++- .../src/road_weather/manager/road_weather.py | 36 ++++++++++++++----- .../road_weather/model/road_weather_model.py | 2 +- 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/pollution_v2/src/common/connector/common.py b/pollution_v2/src/common/connector/common.py index b92ada6..a78b124 100644 --- a/pollution_v2/src/common/connector/common.py +++ b/pollution_v2/src/common/connector/common.py @@ -351,7 +351,7 @@ def get_measures(self, from_date: datetime, to_date: datetime, station: Optional query_params["where"] = f'and({",".join(where_conds)})' logger.debug(f"Retrieving measures on [{type(self).__name__}] from date [{iso_from_date}] " - f"to date [{iso_to_date}] with where [{query_params['where']}]") + f"to date [{iso_to_date}] with where [{query_params['where']}]") raw_measures = self._get_result_list( path=f"/v2/flat,node/{self._station_type}/{','.join(self._measure_types)}/{iso_from_date}/{iso_to_date}", diff --git a/pollution_v2/src/common/data_model/station.py b/pollution_v2/src/common/data_model/station.py index 4a40ca2..ba8957d 100644 --- a/pollution_v2/src/common/data_model/station.py +++ b/pollution_v2/src/common/data_model/station.py @@ -127,6 +127,7 @@ def id_corsia(self) -> int: @classmethod def from_json(cls, dict_data) -> TrafficSensorStation: + wrf_code = dict_data.get("wrf_code") if dict_data.get("wrf_code") is not None else None return TrafficSensorStation( code=dict_data["code"], active=dict_data["active"], @@ -135,5 +136,6 @@ def from_json(cls, dict_data) -> TrafficSensorStation: metadata=dict_data["metadata"], name=dict_data["name"], station_type=dict_data["station_type"], - origin=dict_data["origin"] + origin=dict_data["origin"], + wrf_code=wrf_code ) diff --git a/pollution_v2/src/road_weather/manager/road_weather.py b/pollution_v2/src/road_weather/manager/road_weather.py index 668654d..86322f9 100644 --- a/pollution_v2/src/road_weather/manager/road_weather.py +++ b/pollution_v2/src/road_weather/manager/road_weather.py @@ -14,9 +14,9 @@ from common.data_model import Station, RoadWeatherObservationMeasureCollection, Provenance, DataType, MeasureCollection from common.data_model.entry import GenericEntry from common.data_model.roadcast import RoadCastMeasure, RoadCastEntry, RoadCastMeasureCollection, RoadCastClass, \ - RoadCastTypeClass + RoadCastTypeClass, ExtendedRoadCastEntry from common.manager.station import StationManager -from common.settings import TMP_DIR +from common.settings import TMP_DIR, DEFAULT_TIMEZONE from road_weather.manager._forecast import Forecast from road_weather.model.road_weather_model import RoadWeatherModel @@ -68,7 +68,7 @@ def _download_observation_data(self, measures=connector.get_measures(from_date=from_date, to_date=to_date, station=traffic_station) ) - def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, str]: # TODO: change with RoadWeatherForecastMeasureCollection + def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, datetime]: # TODO: change with RoadWeatherForecastMeasureCollection """ Download forecast data measures in the given interval. @@ -91,9 +91,14 @@ def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, str]: forecast_filename = f"{TMP_DIR}/forecast_{traffic_station.wrf_code}_{roadcast_start}.xml" forecast.to_xml(forecast_filename) logger.info(f'forecast - XML saved in {forecast_filename} ') + + roadcast_start = datetime.strptime(roadcast_start, '%Y-%m-%dT%H:%M') + if roadcast_start.tzinfo is None: + roadcast_start = DEFAULT_TIMEZONE.localize(roadcast_start) + return forecast_filename, roadcast_start - def _compute_observation_start_end_dates(self, forecast_start: str) -> Tuple[datetime, datetime]: + def _compute_observation_start_end_dates(self, forecast_start: datetime) -> Tuple[datetime, datetime]: """ Compute the start and end dates for the observation computation. @@ -102,10 +107,15 @@ def _compute_observation_start_end_dates(self, forecast_start: str) -> Tuple[dat """ # start_obs = forecast.start - 16 h - # end_obs = forecast.start + 8 h - forecast_start = datetime.strptime(forecast_start, '%Y-%m-%dT%H:%M') start_date = forecast_start - timedelta(hours=16) + if start_date.tzinfo is None: + start_date = DEFAULT_TIMEZONE.localize(start_date) + + # end_obs = forecast.start + 8 h end_date = forecast_start + timedelta(hours=8) + if end_date.tzinfo is None: + end_date = DEFAULT_TIMEZONE.localize(end_date) + return start_date, end_date def _download_data_and_compute(self, station: Station) -> List[GenericEntry]: @@ -115,8 +125,10 @@ def _download_data_and_compute(self, station: Station) -> List[GenericEntry]: return [] observation_data = [] + to_date = None forecast_data_xml_path = "" - forecast_start = "" + forecast_start = None + max_observation_data = None try: # TODO: change with actual implementation from ODH when available forecast_data_xml_path, forecast_start = self._download_forecast_data(station) @@ -124,14 +136,20 @@ def _download_data_and_compute(self, station: Station) -> List[GenericEntry]: start_date, to_date = self._compute_observation_start_end_dates(forecast_start) observation_data = self._download_observation_data(start_date, to_date, station) + max_observation_data = max([item.valid_time for item in observation_data.get_entries()]) except Exception as e: logger.exception( f"Unable to download observation and forecast data for station [{station.code}]", exc_info=e) - if observation_data and forecast_data_xml_path: + if max_observation_data and observation_data and forecast_data_xml_path: model = RoadWeatherModel() - return model.compute_data(observation_data, forecast_data_xml_path, forecast_start, station) + res: list[ExtendedRoadCastEntry] = model.compute_data(observation_data, forecast_data_xml_path, + forecast_start, station) + logger.info(f"Received {len(res)} records from road weahter WS") + res = [item for item in res if item.valid_time > max_observation_data] + logger.info(f"Remaining with {len(res)} records from road weahter WS once filter on date {max_observation_data} is applied") + return res return [] diff --git a/pollution_v2/src/road_weather/model/road_weather_model.py b/pollution_v2/src/road_weather/model/road_weather_model.py index f70dc11..3fd3ae8 100644 --- a/pollution_v2/src/road_weather/model/road_weather_model.py +++ b/pollution_v2/src/road_weather/model/road_weather_model.py @@ -68,7 +68,7 @@ def create_multipart_formdata(cls, files): def compute_data(self, observation: RoadWeatherObservationMeasureCollection, forecast_filename: str, # TODO: change with RoadWeatherForecastMeasureCollection - forecast_start: str, # TODO: check if needed + forecast_start: datetime, # TODO: check if needed station: TrafficSensorStation) -> List[ExtendedRoadCastEntry]: """ Compute the road condition for the given station. From 58e746e9a8b32cfbbd53a8bdccd4afa07a5906ab Mon Sep 17 00:00:00 2001 From: Marco Angheben Date: Mon, 11 Nov 2024 07:44:49 +0100 Subject: [PATCH 2/2] fixed tests - issue #38 --- pollution_v2/src/tests/test_common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pollution_v2/src/tests/test_common.py b/pollution_v2/src/tests/test_common.py index f9739d7..ef8b6cd 100644 --- a/pollution_v2/src/tests/test_common.py +++ b/pollution_v2/src/tests/test_common.py @@ -49,7 +49,8 @@ def setUp(self): "metadata": {"a22_metadata": "{\"metro\":\"123\"}", "sensor_type": "induction_loop"}, "name": "station", "station_type": "type", - "origin": "origin" + "origin": "origin", + "wrf_code": None } self.station2_dict = self.station_dict.copy()