Skip to content

Commit

Permalink
Merge branch '38-update-road-weather-forecasts-output' into 'main'
Browse files Browse the repository at this point in the history
Resolve "Update road weather forecasts output"

Closes noi-techpark#38

See merge request u-hopper/projects/industrial/open-data-hub-bz/bdp-elaborations!27
  • Loading branch information
Marco Angheben committed Nov 11, 2024
2 parents 13a77be + 58e746e commit 66fe9b4
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pollution_v2/src/common/connector/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def get_measures(self, from_date: datetime, to_date: datetime, station: Optional
query_params["where"] = f'and({",".join(where_conds)})'

logger.debug(f"Retrieving measures on [{type(self).__name__}] from date [{iso_from_date}] "
f"to date [{iso_to_date}] with where [{query_params['where']}]")
f"to date [{iso_to_date}] with where [{query_params['where']}]")

raw_measures = self._get_result_list(
path=f"/v2/flat,node/{self._station_type}/{','.join(self._measure_types)}/{iso_from_date}/{iso_to_date}",
Expand Down
4 changes: 3 additions & 1 deletion pollution_v2/src/common/data_model/station.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def id_corsia(self) -> int:

@classmethod
def from_json(cls, dict_data) -> TrafficSensorStation:
wrf_code = dict_data.get("wrf_code") if dict_data.get("wrf_code") is not None else None
return TrafficSensorStation(
code=dict_data["code"],
active=dict_data["active"],
Expand All @@ -135,5 +136,6 @@ def from_json(cls, dict_data) -> TrafficSensorStation:
metadata=dict_data["metadata"],
name=dict_data["name"],
station_type=dict_data["station_type"],
origin=dict_data["origin"]
origin=dict_data["origin"],
wrf_code=wrf_code
)
36 changes: 27 additions & 9 deletions pollution_v2/src/road_weather/manager/road_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
from common.data_model import Station, RoadWeatherObservationMeasureCollection, Provenance, DataType, MeasureCollection
from common.data_model.entry import GenericEntry
from common.data_model.roadcast import RoadCastMeasure, RoadCastEntry, RoadCastMeasureCollection, RoadCastClass, \
RoadCastTypeClass
RoadCastTypeClass, ExtendedRoadCastEntry
from common.manager.station import StationManager
from common.settings import TMP_DIR
from common.settings import TMP_DIR, DEFAULT_TIMEZONE
from road_weather.manager._forecast import Forecast
from road_weather.model.road_weather_model import RoadWeatherModel

Expand Down Expand Up @@ -68,7 +68,7 @@ def _download_observation_data(self,
measures=connector.get_measures(from_date=from_date, to_date=to_date, station=traffic_station)
)

def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, str]: # TODO: change with RoadWeatherForecastMeasureCollection
def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, datetime]: # TODO: change with RoadWeatherForecastMeasureCollection
"""
Download forecast data measures in the given interval.
Expand All @@ -91,9 +91,14 @@ def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, str]:
forecast_filename = f"{TMP_DIR}/forecast_{traffic_station.wrf_code}_{roadcast_start}.xml"
forecast.to_xml(forecast_filename)
logger.info(f'forecast - XML saved in {forecast_filename} ')

roadcast_start = datetime.strptime(roadcast_start, '%Y-%m-%dT%H:%M')
if roadcast_start.tzinfo is None:
roadcast_start = DEFAULT_TIMEZONE.localize(roadcast_start)

return forecast_filename, roadcast_start

def _compute_observation_start_end_dates(self, forecast_start: str) -> Tuple[datetime, datetime]:
def _compute_observation_start_end_dates(self, forecast_start: datetime) -> Tuple[datetime, datetime]:
"""
Compute the start and end dates for the observation computation.
Expand All @@ -102,10 +107,15 @@ def _compute_observation_start_end_dates(self, forecast_start: str) -> Tuple[dat
"""

# start_obs = forecast.start - 16 h
# end_obs = forecast.start + 8 h
forecast_start = datetime.strptime(forecast_start, '%Y-%m-%dT%H:%M')
start_date = forecast_start - timedelta(hours=16)
if start_date.tzinfo is None:
start_date = DEFAULT_TIMEZONE.localize(start_date)

# end_obs = forecast.start + 8 h
end_date = forecast_start + timedelta(hours=8)
if end_date.tzinfo is None:
end_date = DEFAULT_TIMEZONE.localize(end_date)

return start_date, end_date

def _download_data_and_compute(self, station: Station) -> List[GenericEntry]:
Expand All @@ -115,23 +125,31 @@ def _download_data_and_compute(self, station: Station) -> List[GenericEntry]:
return []

observation_data = []
to_date = None
forecast_data_xml_path = ""
forecast_start = ""
forecast_start = None
max_observation_data = None
try:
# TODO: change with actual implementation from ODH when available
forecast_data_xml_path, forecast_start = self._download_forecast_data(station)

start_date, to_date = self._compute_observation_start_end_dates(forecast_start)

observation_data = self._download_observation_data(start_date, to_date, station)
max_observation_data = max([item.valid_time for item in observation_data.get_entries()])
except Exception as e:
logger.exception(
f"Unable to download observation and forecast data for station [{station.code}]",
exc_info=e)

if observation_data and forecast_data_xml_path:
if max_observation_data and observation_data and forecast_data_xml_path:
model = RoadWeatherModel()
return model.compute_data(observation_data, forecast_data_xml_path, forecast_start, station)
res: list[ExtendedRoadCastEntry] = model.compute_data(observation_data, forecast_data_xml_path,
forecast_start, station)
logger.info(f"Received {len(res)} records from road weahter WS")
res = [item for item in res if item.valid_time > max_observation_data]
logger.info(f"Remaining with {len(res)} records from road weahter WS once filter on date {max_observation_data} is applied")
return res

return []

Expand Down
2 changes: 1 addition & 1 deletion pollution_v2/src/road_weather/model/road_weather_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def create_multipart_formdata(cls, files):

def compute_data(self, observation: RoadWeatherObservationMeasureCollection,
forecast_filename: str, # TODO: change with RoadWeatherForecastMeasureCollection
forecast_start: str, # TODO: check if needed
forecast_start: datetime, # TODO: check if needed
station: TrafficSensorStation) -> List[ExtendedRoadCastEntry]:
"""
Compute the road condition for the given station.
Expand Down
3 changes: 2 additions & 1 deletion pollution_v2/src/tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def setUp(self):
"metadata": {"a22_metadata": "{\"metro\":\"123\"}", "sensor_type": "induction_loop"},
"name": "station",
"station_type": "type",
"origin": "origin"
"origin": "origin",
"wrf_code": None
}

self.station2_dict = self.station_dict.copy()
Expand Down

0 comments on commit 66fe9b4

Please sign in to comment.