From f019528e62cbf79f95c48f92b879952fbe3141f5 Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Sun, 18 Feb 2024 10:25:26 +0000 Subject: [PATCH] trace --- .../create/functions/actions/constants.py | 2 +- .../create/functions/actions/opendap.py | 2 +- .../create/functions/steps/rotate_winds.py | 2 +- ecml_tools/create/input.py | 194 ++++++++++++------ 4 files changed, 138 insertions(+), 62 deletions(-) diff --git a/ecml_tools/create/functions/actions/constants.py b/ecml_tools/create/functions/actions/constants.py index 207d5fa..585290d 100644 --- a/ecml_tools/create/functions/actions/constants.py +++ b/ecml_tools/create/functions/actions/constants.py @@ -56,7 +56,7 @@ def normalise_time_to_hours(r): def constants(context, dates, template, param): - print(f"✅ load_source(constants, {template}, {param}") + context.trace("✅", f"load_source(constants, {template}, {param}") return load_source("constants", source_or_dataset=template, date=dates, param=param) diff --git a/ecml_tools/create/functions/actions/opendap.py b/ecml_tools/create/functions/actions/opendap.py index d0561f6..e79d245 100644 --- a/ecml_tools/create/functions/actions/opendap.py +++ b/ecml_tools/create/functions/actions/opendap.py @@ -21,7 +21,7 @@ def execute(context, dates, url_pattern, *args, **kwargs): levels = kwargs.get("level", kwargs.get("levelist")) for url in urls: - # print("URL", url) + context.trace("🌐", url) s = load_source("opendap", url) s = s.sel( valid_datetime=[d.isoformat() for d in dates], diff --git a/ecml_tools/create/functions/steps/rotate_winds.py b/ecml_tools/create/functions/steps/rotate_winds.py index a0474ac..50e27ca 100644 --- a/ecml_tools/create/functions/steps/rotate_winds.py +++ b/ecml_tools/create/functions/steps/rotate_winds.py @@ -40,7 +40,7 @@ def rotate_winds(lats, lons, x_wind, y_wind, source_projection, target_projectio orig_speed = np.sqrt(x_wind**2 + y_wind**2) x0, y0 = source_projection(lons, lats) - print(x0, y0) + if source_projection.name != "longlat": x1 = x0 + x_wind y1 = y0 + y_wind diff --git a/ecml_tools/create/input.py b/ecml_tools/create/input.py index d9c2a5c..16bc5e5 100644 --- a/ecml_tools/create/input.py +++ b/ecml_tools/create/input.py @@ -10,10 +10,11 @@ import importlib import logging import os +import textwrap import time from collections import defaultdict from copy import deepcopy -from functools import cached_property +from functools import cached_property, wraps import numpy as np from climetlab.core.order import build_remapping @@ -96,10 +97,6 @@ def sort(old_dic): ) -class Cache: - pass - - class Coords: def __init__(self, owner): self.owner = owner @@ -274,8 +271,20 @@ def select(self, dates, **kwargs): def _raise_not_implemented(self): raise NotImplementedError(f"Not implemented in {self.__class__.__name__}") + def _trace_select(self, dates): + return f"{self.__class__.__name__}({shorten(dates)})" + + +def shorten(dates): + if isinstance(dates, (list, tuple)): + dates = [d.isoformat() for d in dates] + if len(dates) > 5: + return f"{dates[0]}...{dates[-1]}" + return dates + def check_references(method): + @wraps(method) def wrapper(self, *args, **kwargs): result = method(self, *args, **kwargs) self.context.notify_result(self.path, result) @@ -284,6 +293,65 @@ def wrapper(self, *args, **kwargs): return wrapper +TRACE_INDENT = 0 + + +def step(path): + return f"[{'.'.join(path)}]" + + +def trace(emoji, *args): + print(emoji, " " * TRACE_INDENT, *args) + + +def trace_datasource(method): + @wraps(method) + def wrapper(self, *args, **kwargs): + global TRACE_INDENT + trace( + "🌍", + "=>", + step(self.path), + self._trace_datasource(*args, **kwargs), + ) + TRACE_INDENT += 1 + result = method(self, *args, **kwargs) + TRACE_INDENT -= 1 + trace( + "🍎", + "<=", + step(self.path), + textwrap.shorten(repr(result), 256), + ) + return result + + return wrapper + + +def trace_select(method): + @wraps(method) + def wrapper(self, *args, **kwargs): + global TRACE_INDENT + trace( + "👓", + "=>", + ".".join(self.path), + self._trace_select(*args, **kwargs), + ) + TRACE_INDENT += 1 + result = method(self, *args, **kwargs) + TRACE_INDENT -= 1 + trace( + "🍍", + "<=", + ".".join(self.path), + textwrap.shorten(repr(result), 256), + ) + return result + + return wrapper + + class Result(HasCoordsMixin): empty = False @@ -295,11 +363,12 @@ def __init__(self, context, path, dates): self.context = context self._coords = Coords(self) self._dates = dates - self.path = path + self.path = path or [] if path is not None: context.register_reference(path, self) @property + @trace_datasource def datasource(self): self._raise_not_implemented() @@ -309,7 +378,7 @@ def data_request(self): return _datasource_request(self.datasource) def get_cube(self): - print(f"getting cube from {self.__class__.__name__}") + trace("🧊", f"getting cube from {self.__class__.__name__}") ds = self.datasource assert_is_fieldset(ds) @@ -352,11 +421,15 @@ def __repr__(self, *args, _indent_="\n", **kwargs): def _raise_not_implemented(self): raise NotImplementedError(f"Not implemented in {self.__class__.__name__}") + def _trace_datasource(self, *args, **kwargs): + return f"{self.__class__.__name__}({shorten(self.dates)})" + class EmptyResult(Result): empty = True @cached_property + @trace_datasource def datasource(self): from climetlab import load_source @@ -377,36 +450,25 @@ def __init__(self, context, path, dates, action): context, (self.action.args, self.action.kwargs) ) - self._result = None + def _trace_datasource(self, *args, **kwargs): + return f"{self.action.name}({shorten(self.dates)})" - @property + @cached_property @check_references + @trace_datasource def datasource(self): - print( - "🌎", - self.path, - f"{self.action.function.__name__}.datasource({self.dates}, {self.args} {self.kwargs})", - ) - - # We don't use the cached_property here because if hides - # errors in the function. - - if self._result is not None: - return self._result - args, kwargs = resolve(self.context, (self.args, self.kwargs)) - self._result = self.action.function( - FunctionContext(self), self.dates, *args, **kwargs - ) - - return self._result + try: + return self.action.function( + FunctionContext(self), self.dates, *args, **kwargs + ) + except Exception: + LOG.error(f"Error in {self.action.function.__name__}", exc_info=True) + raise def __repr__(self): - content = " ".join([f"{v}" for v in self.args]) - content += " ".join([f"{k}={v}" for k, v in self.kwargs.items()]) - - return super().__repr__(content) + return f"{self.action.name}({shorten(self.dates)})" @property def function(self): @@ -420,6 +482,7 @@ def __init__(self, context, path, dates, results, **kwargs): @cached_property @check_references + @trace_datasource def datasource(self): ds = EmptyResult(self.context, None, self._dates).datasource for i in self.results: @@ -437,8 +500,8 @@ def __init__(self, context, path, _name, **kwargs): super().__init__(context, path, **kwargs) self.name = _name + @trace_select def select(self, dates): - print("🚀", self.path, f"{self.name}.select({dates})") return FunctionResult(self.context, self.path, dates, action=self) @property @@ -454,6 +517,9 @@ def __repr__(self): content = self._short_str(content) return super().__repr__(_inline_=content, _indent_=" ") + def _trace_select(self, dates): + return f"{self.name}({shorten(dates)})" + class ConcatResult(Result): def __init__(self, context, path, results): @@ -462,6 +528,7 @@ def __init__(self, context, path, results): @cached_property @check_references + @trace_datasource def datasource(self): ds = EmptyResult(self.context, self.dates).datasource for i in self.results: @@ -526,21 +593,18 @@ def __init__(self, context, path, *configs): current = step_factory( c, context, path + [str(i + 1)], previous_step=current ) - self.content = current + self.last_step = current + @trace_select def select(self, dates): - print("🚀", self.path, f"PipeAction.select({dates}, {self.content})") - result = self.content.select(dates) - print("🍎", self.path, "PipeAction.result", result) - return result + return self.last_step.select(dates) def __repr__(self): - return super().__repr__(self.content) + return super().__repr__(self.last_step) class StepResult(Result): def __init__(self, context, path, dates, action, upstream_result): - print("🐫", "step result", path, upstream_result, type(upstream_result)) super().__init__(context, path, dates) assert isinstance(upstream_result, Result), type(upstream_result) self.upstream_result = upstream_result @@ -548,6 +612,7 @@ def __init__(self, context, path, dates, action, upstream_result): @property @check_references + @trace_datasource def datasource(self): raise NotImplementedError(f"Not implemented in {self.__class__.__name__}") # return self.upstream_result.datasource @@ -560,6 +625,7 @@ def __init__(self, context, path, previous_step, *args, **kwargs): super().__init__(context, path, *args, **kwargs) self.previous_step = previous_step + @trace_select def select(self, dates): return self.result_class( self.context, @@ -574,31 +640,29 @@ def __repr__(self): class StepFunctionResult(StepResult): - _result = None - - @property + @cached_property @check_references + @trace_datasource def datasource(self): - # We don't use the cached_property here because if hides - # errors in the function. - - print("🥧", "StepFunctionResult.datasource", self.action.function) - - if self._result is not None: - return self._result + try: + return self.action.function( + FunctionContext(self), + self.upstream_result.datasource, + **self.action.kwargs, + ) - self._result = self.action.function( - FunctionContext(self), - self.upstream_result.datasource, - **self.action.kwargs, - ) + except Exception: + LOG.error(f"Error in {self.action.name}", exc_info=True) + raise - return self._result + def _trace_datasource(self, *args, **kwargs): + return f"{self.action.name}({shorten(self.dates)})" class FilterStepResult(StepResult): @property @check_references + @trace_datasource def datasource(self): ds = self.content.datasource assert_is_fieldset(ds) @@ -614,17 +678,20 @@ class FilterStepAction(StepAction): class FunctionStepAction(StepAction): def __init__(self, context, path, previous_step, *args, **kwargs): super().__init__(context, path, previous_step, *args, **kwargs) - self.function = import_function(args[0], "steps") + self.name = args[0] + self.function = import_function(self.name, "steps") result_class = StepFunctionResult class ConcatAction(ActionWithList): + @trace_select def select(self, dates): return ConcatResult(self.context, [a.select(dates) for a in self.actions]) class JoinAction(ActionWithList): + @trace_select def select(self, dates): return JoinResult( self.context, self.path, dates, [a.select(dates) for a in self.actions] @@ -646,6 +713,7 @@ def __init__(self, context, path, **kwargs): self._dates = build_groups(datesconfig) self.content = action_factory(subconfig, context) + @trace_select def select(self, dates): newdates = self._dates.intersect(dates) if newdates.empty(): @@ -755,6 +823,9 @@ class FunctionContext: def __init__(self, owner): self.owner = owner + def trace(self, emoji, *args): + trace(emoji, *args) + class Context: def __init__(self, /, order_by, flatten_grid, remapping): @@ -769,7 +840,7 @@ def __init__(self, /, order_by, flatten_grid, remapping): def register_reference(self, path, obj): assert isinstance(path, (list, tuple)), path path = tuple(path) - print("=======> register", path, type(obj)) + trace("📚", step(path), "register", type(obj)) if path in self.references: raise ValueError(f"Duplicate reference {path}") self.references[path] = obj @@ -792,7 +863,7 @@ def will_need_reference(self, path): self.used_references.add(path) def notify_result(self, path, result): - print("notify_result", path, result) + print("🎯", " " * TRACE_INDENT, step(path), "notify result", result) assert isinstance(path, (list, tuple)), path path = tuple(path) if path in self.used_references: @@ -812,18 +883,23 @@ class InputBuilder: def __init__(self, config, **kwargs): self.kwargs = kwargs self.config = config + self.path = ["input"] + @trace_select def select(self, dates): """This changes the context.""" dates = build_groups(dates) context = Context(**self.kwargs) - action = action_factory(self.config, context, ["input"]) + action = action_factory(self.config, context, self.path) return action.select(dates) def __repr__(self): context = Context(**self.kwargs) - a = action_factory(self.config, context, ["input"]) + a = action_factory(self.config, context, self.path) return repr(a) + def _trace_select(self, dates): + return f"InputBuilder({shorten(dates)})" + build_input = InputBuilder