diff --git a/ecml_tools/create/functions/actions/grib.py b/ecml_tools/create/functions/actions/grib.py index cd077b5..a011041 100644 --- a/ecml_tools/create/functions/actions/grib.py +++ b/ecml_tools/create/functions/actions/grib.py @@ -12,17 +12,35 @@ from climetlab.utils.patterns import Pattern +def check(ds, paths, **kwargs): + count = 1 + for k, v in kwargs.items(): + if isinstance(v, (tuple, list)): + count *= len(v) + + if len(ds) != count: + raise ValueError( + f"Expected {count} fields, got {len(ds)} (kwargs={kwargs}, paths={paths})" + ) + + def execute(context, dates, path, *args, **kwargs): paths = Pattern(path, ignore_missing_keys=True).substitute( *args, date=dates, **kwargs ) + for name in ("grid", "area", "rotation", "frame", "resol", "bitmap"): + if name in kwargs: + raise ValueError(f"MARS interpolation parameter '{name}' not supported") + ds = load_source("empty") + dates = [d.isoformat() for d in dates] for path in paths: - print("PATH", path) + context.trace("📁", "PATH", path) s = load_source("file", path) - s = s.sel(valid_datetime=[d.isoformat() for d in dates], **kwargs) - + s = s.sel(valid_datetime=dates, **kwargs) ds = ds + s + + check(ds, paths, valid_datetime=dates, **kwargs) return ds diff --git a/ecml_tools/create/functions/actions/netcdf.py b/ecml_tools/create/functions/actions/netcdf.py index 84c0676..a433bc4 100644 --- a/ecml_tools/create/functions/actions/netcdf.py +++ b/ecml_tools/create/functions/actions/netcdf.py @@ -7,6 +7,44 @@ # nor does it submit to any jurisdiction. # -from .opendap import execute as opendap_execute +from climetlab import load_source +from climetlab.utils.patterns import Pattern -execute = opendap_execute # netcdf is an alias for opendap + +def check(what, ds, paths, **kwargs): + count = 1 + for k, v in kwargs.items(): + if isinstance(v, (tuple, list)): + count *= len(v) + + if len(ds) != count: + raise ValueError( + f"Expected {count} fields, got {len(ds)} (kwargs={kwargs}, {what}s={paths})" + ) + + +def load_netcdfs(emoji, what, context, dates, path, *args, **kwargs): + paths = Pattern(path, ignore_missing_keys=True).substitute( + *args, date=dates, **kwargs + ) + + ds = load_source("empty") + levels = kwargs.get("level", kwargs.get("levelist")) + + for path in paths: + context.trace(emoji, what.upper(), path) + s = load_source("opendap", path) + s = s.sel( + valid_datetime=[d.isoformat() for d in dates], + param=kwargs["param"], + step=kwargs.get("step", 0), + ) + if levels: + s = s.sel(levelist=levels) + ds = ds + s + check(what, ds, paths, valid_datetime=dates, **kwargs) + return ds + + +def execute(context, dates, path, *args, **kwargs): + return load_netcdfs("📁", "path", context, dates, path, *args, **kwargs) diff --git a/ecml_tools/create/functions/actions/opendap.py b/ecml_tools/create/functions/actions/opendap.py index e79d245..ffbfc3e 100644 --- a/ecml_tools/create/functions/actions/opendap.py +++ b/ecml_tools/create/functions/actions/opendap.py @@ -7,28 +7,8 @@ # nor does it submit to any jurisdiction. # +from .netcdf import load_netcdfs -from climetlab import load_source -from climetlab.utils.patterns import Pattern - -def execute(context, dates, url_pattern, *args, **kwargs): - urls = Pattern(url_pattern, ignore_missing_keys=True).substitute( - *args, date=dates, **kwargs - ) - - ds = load_source("empty") - levels = kwargs.get("level", kwargs.get("levelist")) - - for url in urls: - context.trace("🌐", url) - s = load_source("opendap", url) - s = s.sel( - valid_datetime=[d.isoformat() for d in dates], - param=kwargs["param"], - step=kwargs.get("step", 0), - ) - if levels: - s = s.sel(levelist=levels) - ds = ds + s - return ds +def execute(context, dates, url, *args, **kwargs): + return load_netcdfs("🌐", "url", context, dates, url, *args, **kwargs) diff --git a/ecml_tools/create/input.py b/ecml_tools/create/input.py index 9af1b9c..2d5b063 100644 --- a/ecml_tools/create/input.py +++ b/ecml_tools/create/input.py @@ -9,7 +9,6 @@ import datetime import importlib import logging -import os import textwrap import time from collections import defaultdict @@ -26,22 +25,19 @@ LOG = logging.getLogger(__name__) -def find_function_path(name, kind): - name = name.replace("-", "_") - here = os.path.dirname(__file__) - return os.path.join(here, "functions", kind, f"{name}.py") - - def import_function(name, kind): - path = find_function_path(name, kind) - spec = importlib.util.spec_from_file_location(name, path) - module = spec.loader.load_module() - return module.execute + return importlib.import_module( + f"..functions.{kind}.{name}", + package=__name__, + ).execute def is_function(name, kind): - path = find_function_path(name, kind) - return os.path.exists(path) + try: + import_function(name, kind) + return True + except ImportError: + return False def assert_is_fieldset(obj): @@ -230,7 +226,7 @@ def coords(self): class Action: - def __init__(self, context, path, /, *args, **kwargs): + def __init__(self, context, action_path, /, *args, **kwargs): if "args" in kwargs and "kwargs" in kwargs: """We have: args = [] @@ -246,7 +242,7 @@ def __init__(self, context, path, /, *args, **kwargs): self.context = context self.kwargs = kwargs self.args = args - self.path = path + self.action_path = action_path @classmethod def _short_str(cls, x): @@ -287,7 +283,7 @@ def check_references(method): @wraps(method) def wrapper(self, *args, **kwargs): result = method(self, *args, **kwargs) - self.context.notify_result(self.path, result) + self.context.notify_result(self.action_path, result) return result return wrapper @@ -296,8 +292,8 @@ def wrapper(self, *args, **kwargs): TRACE_INDENT = 0 -def step(path): - return f"[{'.'.join(path)}]" +def step(action_path): + return f"[{'.'.join(action_path)}]" def trace(emoji, *args): @@ -311,7 +307,7 @@ def wrapper(self, *args, **kwargs): trace( "🌍", "=>", - step(self.path), + step(self.action_path), self._trace_datasource(*args, **kwargs), ) TRACE_INDENT += 1 @@ -320,7 +316,7 @@ def wrapper(self, *args, **kwargs): trace( "🍎", "<=", - step(self.path), + step(self.action_path), textwrap.shorten(repr(result), 256), ) return result @@ -335,7 +331,7 @@ def wrapper(self, *args, **kwargs): trace( "👓", "=>", - ".".join(self.path), + ".".join(self.action_path), self._trace_select(*args, **kwargs), ) TRACE_INDENT += 1 @@ -344,7 +340,7 @@ def wrapper(self, *args, **kwargs): trace( "🍍", "<=", - ".".join(self.path), + ".".join(self.action_path), textwrap.shorten(repr(result), 256), ) return result @@ -355,17 +351,17 @@ def wrapper(self, *args, **kwargs): class Result(HasCoordsMixin): empty = False - def __init__(self, context, path, dates): + def __init__(self, context, action_path, dates): assert isinstance(context, Context), type(context) - assert path is None or isinstance(path, list), path + assert action_path is None or isinstance(action_path, list), action_path self.context = context self._coords = Coords(self) self._dates = dates - self.path = path or [] - if path is not None: - context.register_reference(path, self) + self.action_path = action_path or [] + if action_path is not None: + context.register_reference(action_path, self) @property @trace_datasource @@ -441,8 +437,8 @@ def variables(self): class FunctionResult(Result): - def __init__(self, context, path, dates, action): - super().__init__(context, path, dates) + def __init__(self, context, action_path, dates, action): + super().__init__(context, action_path, dates) assert isinstance(action, Action), type(action) self.action = action @@ -476,8 +472,8 @@ def function(self): class JoinResult(Result): - def __init__(self, context, path, dates, results, **kwargs): - super().__init__(context, path, dates) + def __init__(self, context, action_path, dates, results, **kwargs): + super().__init__(context, action_path, dates) self.results = [r for r in results if not r.empty] @cached_property @@ -496,13 +492,13 @@ def __repr__(self): class FunctionAction(Action): - def __init__(self, context, path, _name, **kwargs): - super().__init__(context, path, **kwargs) + def __init__(self, context, action_path, _name, **kwargs): + super().__init__(context, action_path, **kwargs) self.name = _name @trace_select def select(self, dates): - return FunctionResult(self.context, self.path, dates, action=self) + return FunctionResult(self.context, self.action_path, dates, action=self) @property def function(self): @@ -522,7 +518,7 @@ def _trace_select(self, dates): class ConcatResult(Result): - def __init__(self, context, path, results): + def __init__(self, context, action_path, results): super().__init__(context, dates=None) self.results = [r for r in results if not r.empty] @@ -573,10 +569,11 @@ def __repr__(self): class ActionWithList(Action): result_class = None - def __init__(self, context, path, *configs): - super().__init__(context, path, *configs) + def __init__(self, context, action_path, *configs): + super().__init__(context, action_path, *configs) self.actions = [ - action_factory(c, context, path + [str(i)]) for i, c in enumerate(configs) + action_factory(c, context, action_path + [str(i)]) + for i, c in enumerate(configs) ] def __repr__(self): @@ -585,13 +582,13 @@ def __repr__(self): class PipeAction(Action): - def __init__(self, context, path, *configs): - super().__init__(context, path, *configs) + def __init__(self, context, action_path, *configs): + super().__init__(context, action_path, *configs) assert len(configs) > 1, configs - current = action_factory(configs[0], context, path + ["0"]) + current = action_factory(configs[0], context, action_path + ["0"]) for i, c in enumerate(configs[1:]): current = step_factory( - c, context, path + [str(i + 1)], previous_step=current + c, context, action_path + [str(i + 1)], previous_step=current ) self.last_step = current @@ -604,8 +601,8 @@ def __repr__(self): class StepResult(Result): - def __init__(self, context, path, dates, action, upstream_result): - super().__init__(context, path, dates) + def __init__(self, context, action_path, dates, action, upstream_result): + super().__init__(context, action_path, dates) assert isinstance(upstream_result, Result), type(upstream_result) self.upstream_result = upstream_result self.action = action @@ -621,15 +618,15 @@ def datasource(self): class StepAction(Action): result_class = None - def __init__(self, context, path, previous_step, *args, **kwargs): - super().__init__(context, path, *args, **kwargs) + def __init__(self, context, action_path, previous_step, *args, **kwargs): + super().__init__(context, action_path, *args, **kwargs) self.previous_step = previous_step @trace_select def select(self, dates): return self.result_class( self.context, - self.path, + self.action_path, dates, self, self.previous_step.select(dates), @@ -676,8 +673,8 @@ class FilterStepAction(StepAction): class FunctionStepAction(StepAction): - def __init__(self, context, path, previous_step, *args, **kwargs): - super().__init__(context, path, previous_step, *args, **kwargs) + def __init__(self, context, action_path, previous_step, *args, **kwargs): + super().__init__(context, action_path, previous_step, *args, **kwargs) self.name = args[0] self.function = import_function(self.name, "steps") @@ -694,12 +691,15 @@ class JoinAction(ActionWithList): @trace_select def select(self, dates): return JoinResult( - self.context, self.path, dates, [a.select(dates) for a in self.actions] + self.context, + self.action_path, + dates, + [a.select(dates) for a in self.actions], ) class DateAction(Action): - def __init__(self, context, path, **kwargs): + def __init__(self, context, action_path, **kwargs): super().__init__(context, **kwargs) datesconfig = {} @@ -738,7 +738,7 @@ def merge_dicts(a, b): return deepcopy(b) -def action_factory(config, context, path): +def action_factory(config, context, action_path): assert isinstance(context, Context), (type, context) if not isinstance(config, dict): raise ValueError(f"Invalid input config {config}") @@ -786,10 +786,10 @@ def action_factory(config, context, path): cls = FunctionAction args = [key] + args - return cls(context, path + [key], *args, **kwargs) + return cls(context, action_path + [key], *args, **kwargs) -def step_factory(config, context, path, previous_step): +def step_factory(config, context, action_path, previous_step): assert isinstance(context, Context), (type, context) if not isinstance(config, dict): raise ValueError(f"Invalid input config {config}") @@ -816,7 +816,7 @@ def step_factory(config, context, path, previous_step): cls = FunctionStepAction args = [key] + args - return cls(context, path, previous_step, *args, **kwargs) + return cls(context, action_path, previous_step, *args, **kwargs) class FunctionContext: @@ -837,65 +837,65 @@ def __init__(self, /, order_by, flatten_grid, remapping): self.used_references = set() self.results = {} - def register_reference(self, path, obj): - assert isinstance(path, (list, tuple)), path - path = tuple(path) - trace("📚", step(path), "register", type(obj)) - if path in self.references: - raise ValueError(f"Duplicate reference {path}") - self.references[path] = obj - - def find_reference(self, path): - assert isinstance(path, (list, tuple)), path - path = tuple(path) - if path in self.references: - return self.references[path] - # It can happend that the required path is not yet registered, + def register_reference(self, action_path, obj): + assert isinstance(action_path, (list, tuple)), action_path + action_path = tuple(action_path) + trace("📚", step(action_path), "register", type(obj)) + if action_path in self.references: + raise ValueError(f"Duplicate reference {action_path}") + self.references[action_path] = obj + + def find_reference(self, action_path): + assert isinstance(action_path, (list, tuple)), action_path + action_path = tuple(action_path) + if action_path in self.references: + return self.references[action_path] + # It can happend that the required action_path is not yet registered, # even if it is defined in the config. # Handling this case implies implementing a lazy inheritance resolution # and would complexify the code. This is not implemented. - raise ValueError(f"Cannot find reference {path}") + raise ValueError(f"Cannot find reference {action_path}") - def will_need_reference(self, path): - assert isinstance(path, (list, tuple)), path - path = tuple(path) - self.used_references.add(path) + def will_need_reference(self, action_path): + assert isinstance(action_path, (list, tuple)), action_path + action_path = tuple(action_path) + self.used_references.add(action_path) - def notify_result(self, path, result): - trace("🎯", step(path), "notify result", result) - assert isinstance(path, (list, tuple)), path - path = tuple(path) - if path in self.used_references: - if path in self.results: - raise ValueError(f"Duplicate result {path}") - self.results[path] = result + def notify_result(self, action_path, result): + trace("🎯", step(action_path), "notify result", result) + assert isinstance(action_path, (list, tuple)), action_path + action_path = tuple(action_path) + if action_path in self.used_references: + if action_path in self.results: + raise ValueError(f"Duplicate result {action_path}") + self.results[action_path] = result - def get_result(self, path): - assert isinstance(path, (list, tuple)), path - path = tuple(path) - if path in self.results: - return self.results[path] - raise ValueError(f"Cannot find result {path}") + def get_result(self, action_path): + assert isinstance(action_path, (list, tuple)), action_path + action_path = tuple(action_path) + if action_path in self.results: + return self.results[action_path] + raise ValueError(f"Cannot find result {action_path}") class InputBuilder: def __init__(self, config, **kwargs): self.kwargs = kwargs self.config = config - self.path = ["input"] + self.action_path = ["input"] @trace_select def select(self, dates): """This changes the context.""" dates = build_groups(dates) context = Context(**self.kwargs) - action = action_factory(self.config, context, self.path) + action = action_factory(self.config, context, self.action_path) return action.select(dates) def __repr__(self): context = Context(**self.kwargs) - a = action_factory(self.config, context, self.path) + a = action_factory(self.config, context, self.action_path) return repr(a) def _trace_select(self, dates):