Commit

updated messages for clarity
RSO9192 committed Jan 9, 2025
1 parent d9a273b commit 9d34a2b
Showing 1 changed file with 43 additions and 20 deletions.
63 changes: 43 additions & 20 deletions cavapy.py
@@ -85,6 +85,7 @@ def get_climate_data(
xlim: tuple[float, float] | None = None,
ylim: tuple[float, float] | None = None,
remote: bool = True,
variables: list[str] | None = None,
num_processes: int = len(VALID_VARIABLES),
max_threads_per_process: int = 8,
) -> dict[str, xr.DataArray]:
@@ -111,6 +112,7 @@ def get_climate_data(
xlim (tuple or None): Longitudinal bounds of the region of interest. Use only when country is None (default: None).
ylim (tuple or None): Latitudinal bounds of the region of interest. Use only when country is None (default: None).
remote (bool): Flag to work with remote data or not (default: True).
variables (list[str] or None): List of variables to process. Must be a subset of {VALID_VARIABLES}. If None, all variables are processed (default: None).
num_processes (int): Number of processes to use, one per variable.
By default, equal to the number of all possible variables (default: {len(VALID_VARIABLES)}).
max_threads_per_process (int): Max number of threads within each process. (default: 8).
@@ -149,13 +151,23 @@ def get_climate_data(
# Make sure years_obs is set to default when obs=False
years_obs = DEFAULT_YEARS_OBS

# Validate variables if provided
if variables is not None:
invalid_vars = [var for var in variables if var not in VALID_VARIABLES]
if invalid_vars:
raise ValueError(
f"Invalid variables: {invalid_vars}. Must be a subset of {VALID_VARIABLES}"
)
else:
variables = VALID_VARIABLES

_validate_urls(gcm, rcm, rcp, remote, cordex_domain, obs)

bbox = _geo_localize(country, xlim, ylim, buffer, cordex_domain)

with mp.Pool(processes=num_processes) as pool:
with mp.Pool(processes=min(num_processes, len(variables))) as pool:
futures = []
for variable in VALID_VARIABLES:
for variable in variables:
futures.append(
pool.apply_async(
process_worker,
@@ -178,7 +190,7 @@
)

results = {
variable: futures[i].get() for i, variable in enumerate(VALID_VARIABLES)
variable: futures[i].get() for i, variable in enumerate(variables)
}

pool.close() # Prevent any more tasks from being submitted to the pool
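
The new `variables` argument lets callers fetch only a subset of the supported climate variables: invalid names raise a `ValueError`, `None` keeps the previous behaviour of processing everything, and the process pool is now capped at `min(num_processes, len(variables))` so no idle workers are spawned. A minimal usage sketch under those assumptions follows; the variable names and the `country` value are illustrative only, since `VALID_VARIABLES` and the full signature of `get_climate_data` are not shown in this diff.

# Illustrative sketch only -- "pr" and "tas" are assumed members of VALID_VARIABLES,
# and "Kenya" is an assumed value for the country parameter.
from cavapy import get_climate_data

data = get_climate_data(
    country="Kenya",
    variables=["pr", "tas"],   # process just these two; omit (None) to get all variables
)
precip = data["pr"]            # results are keyed by the requested variable names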
@@ -196,7 +208,7 @@ def _validate_urls(
obs: bool = False,
):
# Load the data
log = logger.getChild("URLs validation")
log = logger.getChild("URL-validation")

if obs is False:
inventory_csv_url = (
@@ -225,15 +237,19 @@
if num_rows == 1:
# Log the output for one row
row1 = column_values.iloc[0]
log.info(f"Projections: {row1}")
log_proj = logger.getChild("URL-validation-projections")
log_proj.info(f"{row1}")
else:
# Log the output for two rows
row1 = column_values.iloc[0]
row2 = column_values.iloc[1]
log.info(f"Historical simulation: {row1}")
log.info(f"Projections: {row2}")
log_hist = logger.getChild("URL-validation-historical")
log_proj = logger.getChild("URL-validation-projections")
log_hist.info(f"{row1}")
log_proj.info(f"{row2}")
else: # when obs is True
log.info(f"Observations: {ERA5_DATA_REMOTE_URL}")
log_obs = logger.getChild("URL-validation-observations")
log_obs.info(f"{ERA5_DATA_REMOTE_URL}")
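
Moving the category ("projections", "historical", "observations") out of the message text and into `logger.getChild` means the category now travels in the logger name itself, so it appears wherever the logging format includes `%(name)s` and can be filtered per child logger. A standalone sketch of that naming scheme (the root logger name "cavapy" and the URL are assumptions, not taken from this diff):

# Standalone sketch, not cavapy code: how getChild composes hierarchical names.
import logging

logging.basicConfig(format="%(name)s: %(message)s", level=logging.INFO)
logger = logging.getLogger("cavapy")                      # assumed root logger name

log_proj = logger.getChild("URL-validation-projections")
log_proj.info("https://example.org/projections.ncml")     # placeholder URL
# prints: cavapy.URL-validation-projections: https://example.org/projections.ncml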


def _geo_localize(
@@ -495,11 +511,12 @@ def _climate_data_for_variable(

def _thread_download_data(url: str | None, **kwargs):
variable = kwargs["variable"]
log = logger.getChild(variable)
temporal = "observations" if kwargs["obs"] else ("historical" if "historical" in str(url) else "projections")
log = logger.getChild(f"{variable}-{temporal}")
try:
return _download_data(url=url, **kwargs)
except Exception as e:
log.exception(f"Failed to download data from {url}: {e}")
log.exception(f"Failed to process data from {url}: {e}")
raise


@@ -512,16 +529,17 @@ def _download_data(
years_up_to: int,
remote: bool,
) -> xr.DataArray:
log = logger.getChild(variable)
tempo = "projection" if "rcp" in url else "historical"
temporal = "observations" if obs else ("historical" if url and "historical" in url else "projections")
log = logger.getChild(f"{variable}-{temporal}")

if obs:
var = VARIABLES_MAP[variable]
log.info(f"Opening ERA5 dataset for {variable}({var})")
log.info(f"Establishing connection to ERA5 data for {variable}({var})")
if remote:
ds_var = xr.open_dataset(ERA5_DATA_REMOTE_URL)[var]
else:
ds_var = xr.open_dataset(ERA5_DATA_LOCAL_PATH)[var]
log.info(f"ERA5 dataset for {variable}({var}) has been opened")
log.info(f"Connection to ERA5 data for {variable}({var}) has been established")

# Coordinate normalization and renaming for 'hurs'
if var == "hurs":
@@ -561,9 +579,9 @@
)

else:
log.info(f"Opening CORDEX dataset for {variable}-{tempo}")
log.info(f"Establishing connection to CORDEX data for {variable}")
ds_var = xr.open_dataset(url)[variable]
log.info(f"CORDEX dataset for {variable}-{tempo} has been opened")
log.info(f"Connection to CORDEX data for {variable} has been established")
ds_cropped = ds_var.sel(
longitude=slice(bbox["xlim"][0], bbox["xlim"][1]),
latitude=slice(bbox["ylim"][1], bbox["ylim"][0]),
@@ -594,7 +612,7 @@ def _download_data(
ds_cropped = ds_cropped.convert_calendar(
calendar="gregorian", missing=np.nan, align_on="date"
)

time_mask = (ds_cropped["time"].dt.year >= years[0]) & (
ds_cropped["time"].dt.year <= years[-1]
)
@@

assert isinstance(ds_cropped, xr.DataArray)

log.info(
f"{'Observational' if obs else f'CORDEX-{tempo}'} data for {variable} has been processed"
)
if obs:
log.info(
f"ERA5 data for {variable} has been processed: unit conversion ({ds_cropped.attrs.get('units', 'unknown units')}), time selection ({years[0]}-{years[-1]})"
)
else:
log.info(
f"CORDEX data for {variable} has been processed: unit conversion ({ds_cropped.attrs.get('units', 'unknown units')}), calendar transformation (360-day to Gregorian), time selection ({years[0]}-{years[-1]})"
)

return ds_cropped
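
The reworded final log message spells out what "processed" means for CORDEX data, including the 360-day-to-Gregorian calendar conversion performed by `convert_calendar` a few lines above. A self-contained sketch of that conversion on synthetic data (dates and values are made up for illustration):

# Synthetic example of the calendar conversion used in _download_data.
import numpy as np
import xarray as xr

time = xr.cftime_range("2000-01-01", periods=360, freq="D", calendar="360_day")
da = xr.DataArray(np.arange(360.0), coords={"time": time}, dims="time")

# Dates with no Gregorian counterpart (e.g. Feb 30) are dropped; Gregorian dates
# absent from the 360-day series are inserted and filled with NaN.
da_greg = da.convert_calendar(calendar="gregorian", missing=np.nan, align_on="date")
print(da_greg.sizes["time"])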
