From 4876fa90d5da4f36b13b605d21e7240b9926dd69 Mon Sep 17 00:00:00 2001
From: Sergey Serebryakov
Date: Tue, 24 Oct 2023 13:18:41 -0700
Subject: [PATCH] Automated artifact logging with CMF (#35)

## Introduction

This PR introduces an opinionated implementation of automated machine learning (ML) artifact logging, and an example four-stage ML pipeline that uses this feature to automatically log the input and output artifacts of each stage. The implementation demonstrates one possible approach to automated logging with CMF. Automated logging means that, in most cases, developers do not need to operate directly with the CMF class instance. Instead, the framework automatically logs execution parameters as well as input and output artifacts. Currently, the implementation is in the `cmflib.contrib.auto_logging_v01` module.

## Implementation details

Users organize their ML pipelines as graphs where each node is a single stage that performs a particular data transformation or trains a model. Each stage is a Python function with a specific interface. Users then wrap their stage functions with a Python decorator that enables automated logging. This is an example stage:

```python
import typing as t
from cmflib.contrib.auto_logging_v01 import Context, Parameters, Dataset, MLModel, ExecutionMetrics, step


@step()
def train(ctx: Context, params: Parameters, train_dataset: Dataset) -> t.Dict[str, t.Union[MLModel, ExecutionMetrics]]:
    ...
```

All function parameters are optional.

- The `Context` argument can be used to provide additional information such as the artifact directory. These are runtime parameters that do not affect how output artifacts are produced from input artifacts.
- The `Parameters` argument contains the stage parametrization (hyper-parameters). These parameters are logged with CMF automatically.
- All other parameters must be of types derived from the `Artifact` class. They are considered input artifacts and are automatically logged with CMF.
- The function returns a data structure containing instances of the `Artifact` class. They are considered output artifacts and are automatically logged as well.

In this auto-logging framework, CMF is configured hierarchically. The hierarchy is as follows (fields that are not specified, e.g., that are null, are considered unset, and the default values defined by CMF are used):

- Default configuration: `filename="mlmd", graph=False`.
- Configuration from the environment. The following environment variables can be used: `CMF_URI` (string), `CMF_PIPELINE` (string), `CMF_STAGE` (string), `CMF_GRAPH` (boolean, true when the value is "1", "on" or "true") and `CMF_IS_SERVER` (boolean).
- Configuration from the user (the `cmf_config` variable defined in this module can be used to configure CMF).
- Configuration provided via the `step` decorator:

```python
import typing as t

def step(pipeline_name: t.Optional[str] = None, pipeline_stage: t.Optional[str] = None) -> t.Callable:
    ...
```

In addition, the auto-logging module provides the `cli_run` function, which exposes a CLI interface to stage functions (including support for passing the context, parameters and input artifacts).

## Example ML pipeline

This example implements a four-stage ML pipeline that trains and tests a decision tree classifier from the scikit-learn library on the tiny IRIS dataset. The steps to reproduce this example are largely the same as those for another CMF example ([Getting Started](https://hewlettpackard.github.io/cmf/examples/getting_started/)).
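As an illustration of the `cli_run` syntax (the values below are placeholders), a stage such as the example `preprocess` step can be invoked with explicit execution parameters and a context entry:

```
python pipeline/preprocess.py --ctx workspace=workspace --params train_size=0.8,shuffle=true dataset=workspace/iris.pkl
```

Here `--ctx` and `--params` are parsed into `Context` and `Parameters` dictionaries, while the remaining `key=value` arguments are treated as input artifacts and matched to function parameters by name.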
Quick summary: - Clone the project and copy content of this directory to some other directory outside the Cmf root directory. - Initialize python environment, install CMF (`pip install -e .`) in editable (development mode). - Install this example dependencies (the only dependency is `scikit-learn`). - Initialize the CMF for this example. One quick approach would be to just run `python -m cmflib.contrib.init` (works for quick demo examples). - Run stages on this pipeline: ``` python pipeline/fetch.py python pipeline/preprocess.py dataset=workspace/iris.pkl python pipeline/train.py dataset=workspace/train.pkl python pipeline/test.py test_dataset=workspace/test.pkl model=workspace/model.pkl ``` --- cmflib/contrib/auto_logging_v01.py | 657 ++++++++++++++++++ cmflib/contrib/init.py | 74 ++ examples/auto_logging_v01/README.md | 28 + examples/auto_logging_v01/pipeline/fetch.py | 67 ++ .../auto_logging_v01/pipeline/preprocess.py | 82 +++ examples/auto_logging_v01/pipeline/test.py | 78 +++ examples/auto_logging_v01/pipeline/train.py | 93 +++ examples/auto_logging_v01/pyproject.toml | 5 + 8 files changed, 1084 insertions(+) create mode 100644 cmflib/contrib/auto_logging_v01.py create mode 100644 cmflib/contrib/init.py create mode 100644 examples/auto_logging_v01/README.md create mode 100644 examples/auto_logging_v01/pipeline/fetch.py create mode 100644 examples/auto_logging_v01/pipeline/preprocess.py create mode 100644 examples/auto_logging_v01/pipeline/test.py create mode 100644 examples/auto_logging_v01/pipeline/train.py create mode 100644 examples/auto_logging_v01/pyproject.toml diff --git a/cmflib/contrib/auto_logging_v01.py b/cmflib/contrib/auto_logging_v01.py new file mode 100644 index 00000000..dee1a03e --- /dev/null +++ b/cmflib/contrib/auto_logging_v01.py @@ -0,0 +1,657 @@ +### +# Copyright (2022) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +import argparse +import functools +import inspect +import json +import logging +import os +import sys +import time +import typing as t +from copy import deepcopy +from dataclasses import dataclass +from pathlib import Path + +import yaml + +from cmflib.cmf import Cmf + +__all__ = [ + "CMFError", + "Artifact", + "Dataset", + "MLModel", + "MLModelCard", + "ExecutionMetrics", + "Context", + "Parameters", + "cmf_config", + "step", + "prepare_workspace", + "cli_run", +] + +logger = logging.getLogger(__name__) + + +class LogMessage: + """Helper class to do structured logging. + + Args: + type_: Message type (aka event type). + kwargs: Additional keyword arguments to be associated with the event being logged. + """ + + def __init__(self, type_: str, **kwargs) -> None: + self.type = type_ + self.kwargs = kwargs # maybe copy? + + def __str__(self) -> str: + return json.dumps({"type": self.type, **self.kwargs}, cls=JSONEncoder) + + +M = LogMessage + + +class CMFError(Exception): + """Base exception for CMF.""" + + ... + + +class Artifact: + """Base class for all artifacts. 
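+    Concrete artifact types defined in this module include `Dataset`, `MLModel`, `MLModelCard` and `ExecutionMetrics`.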
+ Args: + uri: Uniform resource identifier (URI) of the given artifact. Does not have to be a file path. Rather, + it's something that can be used to retrieve an artifact from one of backends. + params: Dictionary of parameters associated with this artifact. Should be serializable with ML metadata. + """ + + def __init__(self, uri: t.Union[str, Path], params: t.Optional[t.Dict] = None) -> None: + self.uri = uri if isinstance(uri, str) else uri.as_posix() + self.params = deepcopy(params or {}) + + def __str__(self) -> str: + return f"{self.__class__.__name__}(uri={self.uri}, params={self.params})" + + +class Dataset(Artifact): + """Artifact to represent datasets. + + Note: + Various dataset splits, when represented as different files or directories, should have different artifacts. + """ + + ... + + +class MLModel(Artifact): + """Artifact to represent machine learning models.""" + + ... + + +class MLModelCard(Artifact): + """Artifact to represent machine learning model cards containing evaluation reports""" + + ... + + +class ExecutionMetrics(Artifact): + """Artifact to represent execution metrics. + + Args: + uri: Uniform resource identifier (URI) of the given artifact. Does not have to be a file path. Rather, + it's something that can be used to retrieve an artifact from one of backends. + name: Name of a metric group, e.g., `train_metrics` or `test_metrics`. + params: Dictionary of metrics. Keys do not have to specify the dataset split these metrics were computed with + (it's specified with the `name` parameter). + """ + + def __init__(self, uri: t.Union[str, Path], name: str, params: t.Optional[t.Dict] = None) -> None: + super().__init__(uri, params) + self.name = name + + def __str__(self) -> str: + return f"{self.__class__.__name__}(uri={self.uri}, params={self.params}, name={self.name})" + + +class JSONEncoder(json.JSONEncoder): + """JSON encoder that can handle instances of the `Artifact` class.""" + + def default(self, o: t.Any) -> t.Any: + if isinstance(o, Artifact): + return str(o) + return super().default(o) + + +class Context(dict): + """Step context. + + This is not the context as it is defined in MLMD. Rather, it contains runtime parameters for steps, such as, for + instance, workspace directory or instance of the initialized Cmf. + + If a step function defines this parameter, then it will have the following fields: + - cmf: Instance of the `cmflib.Cmf`. + """ + + ... + + +class Parameters(dict): + """Step parameters. + + These parameters include, for instance, hyperparameters for training ML models (learning rate, batch size etc.) + """ + + @classmethod + def from_file(cls, stage_name: str, file_name: t.Union[str, Path] = "pipeline.yaml") -> "Parameters": + """Helper method to load parameters from a `yaml` file. + + Args: + stage_name: Name of a stage to load parameters for. It is assumed the yaml file is a dictionary where + top-level keys correspond to stage names. Each top-level key must point to a dictionary containing + parameters for the given stage. + file_name: Path to a `yaml` file containing pipeline parameters. + Returns: + Instance of this class containing parameters for the requested stage. + """ + with open(file_name, "rt") as f: + pipeline_stages_params = yaml.load(f, Loader=yaml.SafeLoader) + return Parameters(**pipeline_stages_params[stage_name]) + + +def _str_to_bool(val: t.Optional[str]) -> t.Optional[bool]: + """Convert string value into boolean. + + Args: + val: String representation of the boolean value. + + Returns: + True or False. 
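+    Example (illustrative): `_str_to_bool("on")` returns True, `_str_to_bool("no")` returns False and `_str_to_bool(None)` returns None.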
+ """ + if val is None: + return None + val = val.lower() + return val in ("1", "on", "true") + + +@dataclass +class CmfConfig: + """CMF configuration. + + CMF for autologging is configured in a hierarchical manner. The following is the respective hierarchy (if certain + fields are not specified (e.g., they are Nones), they are considered to be not set, and default values defined by + CMF are used): + - Default configuration: filename="mlmd", graph=False. + - Configuration from environment. + - Configuration from user (see `cmf_config` variable below). + - Configuration provided via `step` decorator. + + The CMf autologging framework uses the same values for stage and execution names (`pipeline_stage`). + """ + + filename: t.Optional[str] = None + pipeline_name: t.Optional[str] = None + pipeline_stage: t.Optional[str] = None + graph: t.Optional[bool] = None + is_server: t.Optional[bool] = None + + def update(self, other: "CmfConfig") -> "CmfConfig": + """Override this configuration with other configuration.""" + if other.filename is not None: + self.filename = other.filename + if other.pipeline_name is not None: + self.pipeline_name = other.pipeline_name + if other.pipeline_stage is not None: + self.pipeline_stage = other.pipeline_stage + if other.graph is not None: + self.graph = other.graph + return self + + @classmethod + def from_env(cls) -> "CmfConfig": + """Retrieve CMF configuration from environment.""" + return cls( + filename=os.environ.get("CMF_URI", None), + pipeline_name=os.environ.get("CMF_PIPELINE", None), + pipeline_stage=os.environ.get("CMF_STAGE", None), + graph=_str_to_bool(os.environ.get("CMF_GRAPH", None)), + is_server=_str_to_bool(os.environ.get("CMF_IS_SERVER", None)), + ) + + @classmethod + def from_params(cls, **kwargs) -> "CmfConfig": + """Build CMF configuration using provided parameters.""" + return cls(**kwargs) + + +cmf_config = CmfConfig() +"""Users can use this object to configure CMF programmatically""" + + +def step(pipeline_name: t.Optional[str] = None, pipeline_stage: t.Optional[str] = None) -> t.Callable: + """Function decorator that automatically logs input and output artifacts for Cmf steps. + + This function decorator adds automated Cmf-based logging of input and output artifacts to user functions. Users must + define their functions using specific rules. The most general API is the following: + + ```python + import typing as t + from cmflib.contrib.auto_logging_v01 import (step, Context, MLModel, Dataset, cli_run) + + @step() + def test(ctx: Context, params: Parameters, model: MLModel, test_dataset: Dataset) -> t.Optional[t.Dict]: + ... + + if __name__ == '__main__': + cli_run(test) + ``` + + The following rules must be followed: + - All function input parameters must be annotated. + - A function can accept one or zero parameters of type `Context`, one or zero parameters of type `Parameters`, + zero or more parameters of type `Artifact` or its derived types, e.g., Dataset, MLModel etc. + - A function can optionally return a dictionary-like object with string keys and values of type `Artifact` or + its derived types (Dataset, MLModel) etc. + - No other parameter types are allowed. + + Functions that satisfy the above rules can be annotated with the `step` decorator that adds automated logging of + input and output artifacts. + + Note: + Best practices (mandatory requirement?) require that all input artifacts must already be present in the metadata + store. The current implementation does not enforce this for now. 
It means that in order to use some raw dataset + on a pipeline, a special `ingest` node should be used that, for instance, take a dataset name as a parameter + and outputs an artifact. This output dataset artifact will be added to metadata store, and is thus becomes + eligible to be used by other pipeline steps as input artifact. + + This function performs the following steps: + - It creates an instance of Cmf. + - It creates a context for this step and then execution. If parameters are present, these parameters will be + associated with this execution. + - All input artifacts are logged (input artifacts are all input parameters of type `Artifact`) with CMF as input + artifacts. No parameters are associated with input artifacts. + - A step function is called. + - The return object is observed. If it's a dictionary containing values of type `Artifact`, these values are + logged as output artifacts. + + If function accepts a parameter of type `Context`, the decorator will add a Cmf instance under the `cmf` key. + + Args: + pipeline_name: Name of a pipeline. This name override name provided with environment variable `CMF_PIPELINE`. + pipeline_stage: Name of a pipeline stage (==context name in MLMD terms). If not specified, a function name is + used. + + TODO: (sergey) rename to stage (or just do `stage = step`)? + """ + + def step_decorator(func: t.Callable) -> t.Callable: + nonlocal pipeline_name, pipeline_stage + + @functools.wraps(func) + def _wrapper(*args, **kwargs) -> t.Any: + from cmflib.cmf import Cmf + + config = CmfConfig.from_params(filename="mlmd", graph=False) + config = ( + config.update(CmfConfig.from_env()) + .update(cmf_config) + .update(CmfConfig.from_params(pipeline_name=pipeline_name, pipeline_stage=pipeline_stage)) + ) + + if config.pipeline_name is None: + _module = getattr(func, "__module__", None) + if _module is not None and "__file__" in _module: + config.pipeline_name = Path(_module["__file__"]).stem + if config.pipeline_stage is None: + config.pipeline_stage = func.__name__ + if not config.pipeline_name: + raise CMFError( + "The pipeline name is not specified. You have two options to specify the name. Option 1: export " + "environmental variable `export CMF_PIPELINE=iris` in linux or `set CMF_PIPELINE=iris` in Windows. 
" + "Option 2: use step annotator's `pipeline_name` parameter, e.g., `@step(pipeline_name=iris)`" + ) + + # Get context, parameters and input artifacts + ctx, params, inputs = _validate_task_arguments(args, kwargs) + + # Create a pipeline, create a context and an execution + cmf = Cmf( + filename=config.filename, + pipeline_name=config.pipeline_name, + graph=config.graph, + ) + _ = cmf.create_context(pipeline_stage=config.pipeline_stage) + _ = cmf.create_execution(execution_type=config.pipeline_stage, custom_properties=params) + _log_artifacts(cmf, "input", inputs) + + # Run the step + if ctx is not None: + ctx["cmf"] = cmf + + logger.debug( + M( + "execution", + pipeline=config.pipeline_name, + stage=config.pipeline_stage, + execution_id=cmf.execution.id, + ) + ) + logger.debug(M("execution.impl", execution_id=cmf.execution.id, impl=func.__name__)) + logger.debug( + M( + "execution.inputs", + execution_id=cmf.execution.id, + ctx_keys=list((ctx or {}).keys()), + params=(params or {}), + inputs=inputs, + ) + ) + start_time = time.time() + outputs: t.Optional[t.Dict[str, Artifact]] = func(*args, **kwargs) + end_time = time.time() + + logger.debug( + M( + "execution.runtime", + execution_id=cmf.execution.id, + time_seconds=end_time - start_time, + ) + ) + logger.debug( + M( + "execution.outputs", + execution_id=cmf.execution.id, + outputs=outputs, + metrics=list(cmf.metrics.keys()), + ) + ) + + if ctx is not None: + del ctx["cmf"] + + # Log the output artifacts + if outputs is not None: + _log_artifacts(cmf, "output", outputs) + + # Commit all metrics + for metrics_name in cmf.metrics.keys(): + cmf.commit_metrics(metrics_name) + + # All done + cmf.finalize() + + return outputs + + return _wrapper + + return step_decorator + + +def cli_run(step_fn: t.Callable) -> None: + """Parse command line and run the CMF stage. + + The following syntax for command line arguments is supported. + - Context parameters: `--ctx name1=value1,name2=value2`. + - Execution parameters: `--params lr=0.07,batch_size=128`. + - Other key-value parameters are input artifacts (do not use `--`): `dataset=workspace/iris.pkl`. Names must + match function parameters. + + Environment variables: + Users can specify the following environment variables + - CMF_PIPELINE: Name of a pipeline. + - CMF_URI: MLMD file name, default is `mlmd` + - CMG_GRAPH: If set, Neo4J will be used, default is not to use. To enable, set it to `true`. + + Args: + step_fn: Python function implementing a pipeline stage. + + """ + # Parse command line arguments + import argparse + + parser = argparse.ArgumentParser(description="CMF step arguments") + parser.add_argument( + "--params", + required=False, + default="", + help="Execution parameters for a pipeline step, e.g., --params=train_size=0.7,add_noise=true. Users must define" + "their functions accepting exactly one positional or keyed argument of type `Parameters` if they want to " + "be able to accept execution parameters.", + ) + parser.add_argument( + "--ctx", + required=False, + default="", + help="Runtime parameters for the step. These parameters define execution environment, e.g., workspace location." + "Also, users can access the instance of `Cmf` via this context under the `cmf` key. 
Users must define" + "their functions accepting exactly one positional or keyed argument of type `Context` if they want to " + "be able to accept these runtime parameters.", + ) + parsed, artifacts = parser.parse_known_args(sys.argv[1:]) + + # Convert command line arguments into dictionaries + def _parse_key_value_list(_kv_list: t.Union[str, t.List[str]], _dict_cls) -> t.Union[t.Dict, Context, Parameters]: + """Convert a string like 'a=3,b=5' into a dictionary.""" + _dict = _dict_cls() + if not _kv_list: + return _dict + if isinstance(_kv_list, str): + _kv_list = _kv_list.split(",") + for _item in _kv_list: + _k, _v = _item.split("=") + _dict[_k.strip()] = _v.strip() + return _dict + + params = _parse_key_value_list(parsed.params, Parameters) + ctx = _parse_key_value_list(parsed.ctx, Context) + inputs = _parse_key_value_list(artifacts, dict) + + # Call the step function. + _call_step_with_parameter_check(step_fn, ctx, params, inputs) + + +def prepare_workspace(ctx: Context, namespace: t.Optional[str] = None) -> Path: + """Ensure the workspace directory exists. + + Workspace is a place where we store various files. + + Args: + ctx: Context for this step. If it does not contain `workspace` parameter, current working directory is used. + namespace: Relative path within workspace (relative directory path) that a step is requesting to create. + + Returns: + Path to the workspace directory. + """ + workspace = Path(ctx.get("workspace", Path.cwd() / "workspace")) + if namespace: + workspace = workspace / namespace + workspace.mkdir(parents=True, exist_ok=True) + return workspace + + +def _call_step_with_parameter_check(fn: t.Callable, ctx: Context, params: Parameters, inputs: t.Dict) -> None: + """The goal is to make sure the fn's API accept provided `context`, `params` and `inputs`. + + Args: + fn: User function implementing the pipeline step. + ctx: Context to be passed to this function. + params: Parameters to be passed to this function. + inputs: Input artifacts to be passed to this function. + """ + # We will be modifying inputs, so need to make a copy. + _unrecognized_artifacts = set(inputs.keys()) + + # Get function parameters (named and their annotations) + signature: inspect.Signature = inspect.signature(fn) + fn_specs = argparse.Namespace(kwargs={}, needs_ctx=False, needs_params=False) + for param in signature.parameters.values(): + if issubclass(param.annotation, Context): + fn_specs.kwargs[param.name] = ctx or Context() + fn_specs.needs_ctx = True + elif issubclass(param.annotation, Parameters): + fn_specs.kwargs[param.name] = params or Parameters() + fn_specs.needs_params = True + elif issubclass(param.annotation, Artifact): + if param.name not in inputs: + raise CMFError( + f"Missing input artifact (name={param.name}) for function {fn}. You have provided the following " + f"inputs: {inputs.keys()}." + ) + if isinstance(inputs[param.name], Artifact): + fn_specs.kwargs[param.name] = inputs[param.name] + elif isinstance(inputs[param.name], str): + fn_specs.kwargs[param.name] = _uri_to_artifact(inputs[param.name], param.annotation) + else: + raise CMFError( + f"Unrecognized artifact value: name={param.name}, value={inputs[param.name]}. Supported values are " + "instances of `Artifact` class or strings. In the latter case, string values are assumed to " + "be URIs." + ) + + _unrecognized_artifacts.remove(param.name) + else: + raise CMFError( + f"Unrecognized function parameter: fn={fn}, param={param.name}, annotation={param.annotation}. 
A valid " + "CMF step function must annotate all its parameters, and only `Context`, `Parameter` and parents of " + "`Artifact` are allowed." + ) + + # Check we have used all provided inputs + if _unrecognized_artifacts: + raise CMFError( + f"The following input artifacts have not been accepted by the step " + f"function ({fn}): ({_unrecognized_artifacts})." + ) + # Check that this function does accept context and parameters if they are present + if ctx and not fn_specs.needs_ctx: + raise CMFError(f"Context is provided (keys={ctx.keys()}) but function ({fn}) does not accept context.") + if params and not fn_specs.needs_params: + raise CMFError(f"Params are provided (keys={params.keys()}) but function ({fn}) does not accept params.") + + # All done - call the function + fn(**fn_specs.kwargs) + + +def _uri_to_artifact(uri: str, annotation: t.Any) -> Artifact: + """Naive implementation to return instance of an artifact based upon function's parameter annotation. + + Args: + uri: Artifact URI, e.g., file path. + annotation: Function annotation for this artifact. All valid Cmf steps within this automated logging framework + must have their parameters annotated. + + Returns: + Artifact instance associated with this parameter. + """ + if issubclass(annotation, Dataset): + return Dataset(uri) + elif issubclass(annotation, MLModel): + return MLModel(uri) + raise CMFError(f"Cannot convert URI to an Artifact instance: uri={uri}, annotation={annotation}.") + + +def _validate_task_arguments( + args: t.Tuple, kwargs: t.Dict +) -> t.Tuple[t.Optional[Context], t.Optional[Parameters], t.Dict]: + """Check parameters to be passed to a Cmf's step function. + Args: + args: Positional parameters to be passed. Can only be of the following two types: `Context`, `Parameter`. + kwargs: Keyed parameters to be passed. Can be of the following type: `Context`, `Parameter` and `Artifact`. + + Returns: + A tuple with three elements containing `Context` parameter, `Parameters` parameter and dictionary with + artifacts. Some values can be None if not present in args and kwargs. + """ + context: t.Optional[Context] = None # Task execution context. + params: t.Optional[Parameters] = None # Task parameters. + inputs: t.Dict = {} # Task input artifacts. + + def _set_exec_context(_value: t.Any) -> None: + nonlocal context + if context is not None: + raise CMFError("Multiple execution contexts are not allowed.") + context = _value + + def _set_parameters(_value: t.Any) -> None: + nonlocal params + if params is not None: + raise CMFError("Multiple parameter dictionaries are not allowed.") + params = _value + + # Parse task positional parameters. + for value in args: + if isinstance(value, Context): + _set_exec_context(value) + elif isinstance(value, Parameters): + _set_parameters(value) + else: + raise CMFError( + f"Invalid positional parameter (value={value}, type={type(value)}). Only Context and " + "Parameters could be positional parameters. All other parameters must be keyword parameters." + ) + + # Parse task keyword parameters. + for key, value in kwargs.items(): + if isinstance(value, Context): + _set_exec_context(value) + elif isinstance(value, Parameters): + _set_parameters(value) + elif isinstance(value, Artifact): + inputs[key] = value + else: + raise CMFError( + f"Invalid keyword parameter type (name={key}, value={value}, type={type(value)}). " + "Expecting one of: `Context`, `Parameters` or `Artifact`." 
+ ) + + return context, params, inputs + + +def _log_artifacts( + cmf: Cmf, + event: str, + artifacts: t.Union[ + Artifact, + t.List[Artifact], + t.Tuple[Artifact], + t.Set[Artifact], + t.Dict[str, t.Union[Artifact, t.List[Artifact]]], + ], +) -> None: + """Log artifacts with Cmf. + Args: + cmf: Instance of initialized Cmf. + event: One of `input` or `output` (whether these artifacts input or output artifacts). + artifacts: Dictionary that maps artifacts names to artifacts. Names are not used, only artifacts. + """ + if isinstance(artifacts, dict): + for artifact in artifacts.values(): + _log_artifacts(cmf, event, artifact) + elif isinstance(artifacts, (list, tuple, set)): + for artifact in artifacts: + _log_artifacts(cmf, event, artifact) + elif isinstance(artifacts, Dataset): + cmf.log_dataset(url=artifacts.uri, event=event, custom_properties=artifacts.params) + elif isinstance(artifacts, MLModel): + cmf.log_model(path=artifacts.uri, event=event, **artifacts.params) + elif isinstance(artifacts, ExecutionMetrics): + cmf.log_execution_metrics(artifacts.name, artifacts.params) + else: + raise CMFError(f"Can't log unrecognized artifact: type={type(artifacts)}, artifacts={str(artifacts)}") diff --git a/cmflib/contrib/init.py b/cmflib/contrib/init.py new file mode 100644 index 00000000..e679d821 --- /dev/null +++ b/cmflib/contrib/init.py @@ -0,0 +1,74 @@ +### +# Copyright (2022) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +import os +import uuid + +import click + + +@click.command() +@click.argument("project_path", required=False, default=os.getcwd(), type=str) +@click.option("--user_name", required=False, envvar="GIT_USER_NAME", default="First Second", type=str) +@click.option("--user_email", required=False, envvar="GIT_USER_EMAIL", default="first.second@corp.org", type=str) +@click.option( + "--git_remote", + required=False, + envvar="GIT_REMOTE_URL", + type=str, + default="git@github.com:first-second/experiment-repo.git", +) +@click.option( + "--dvc_remote", + required=False, + envvar="DVC_REMOTE_URL", + type=str, + default=f'/tmp/cmf/dvc_remotes/{str(uuid.uuid4()).replace("-", "")}', +) +def init_cmf_project(project_path: str, user_name: str, user_email: str, git_remote: str, dvc_remote: str): + """Helper python script to init a new CMF project. + + Pre-requisites: `git` and `dvc` must be installed in a system. + + Args: + project_path: Path to the new project. It must exist and probably must be empty. + user_name: Username to init git repository. + user_email: User email to init git repository. + git_remote: Git remote to set on a new git repository. + dvc_remote: DVC remote to set on a new git repository (dvc will be initialized too). 
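+    Example (illustrative invocation): `python -m cmflib.contrib.init /tmp/my_cmf_project --user_name "First Second" --user_email first.second@corp.org`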
+ """ + os.chdir(project_path) + + print("[1/4] [GIT/DVC INIT ] executing git init and dvc init.") + os.system("git init -q") + os.system("dvc init -q") + os.system(f'git config --global user.name "{user_name}"') + os.system(f'git config --global user.email "{user_email}"') + + print("[2/4] [INITIAL COMMIT] performing initial blank commit into master.") + os.system("git checkout -b master") + os.system('git commit --allow-empty -n -m "Initial code commit"') + + print(f"[3/4] [GIT REMOTE ] setting git remote to {git_remote}") + os.system(f'git remote add origin "{git_remote}"') + + print(f"[4/4] [DVC REMOTE ] setting dvc remote to {dvc_remote}") + os.system(f'dvc remote add myremote -f "{dvc_remote}"') + os.system("dvc remote default myremote") + + +if __name__ == "__main__": + init_cmf_project() diff --git a/examples/auto_logging_v01/README.md b/examples/auto_logging_v01/README.md new file mode 100644 index 00000000..d4394fc2 --- /dev/null +++ b/examples/auto_logging_v01/README.md @@ -0,0 +1,28 @@ +# Automated artifact logging with CMF + +This example implements a four-stage ML pipeline that trains and tests a decision tree classifier from the scikit-learn +library on the tiny IRIS dataset. + +This example demonstrates one possible approach to automated logging with Cmf. Automated logging means that developers +do not need (in most cases) to operate directly with the Cmf class instance. Instead, the framework automatically logs +execution parameters and input and output artifacts. + +Currently, the implementation is in the `cmflib.contrib.auto_logging_v01` module. + +The steps to reproduce this example are largely the same as those for another CMF example +([Getting Started](https://hewlettpackard.github.io/cmf/examples/getting_started/)). Quick summary: + +- Clone the project and copy the content of this directory to some other directory outside the Cmf root directory. +- Initialize a Python environment and install Cmf (`pip install -e .`) in editable (development) mode. +- Install this example's dependencies (the only dependency is `scikit-learn`). +- Initialize the Cmf for this example. One quick approach would be to just run `python -m cmflib.contrib.init` (works + for quick demo examples). +- Run stages on this pipeline: + ``` + python pipeline/fetch.py + python pipeline/preprocess.py dataset=workspace/iris.pkl + python pipeline/train.py dataset=workspace/train.pkl + python pipeline/test.py test_dataset=workspace/test.pkl model=workspace/model.pkl + ``` + +> The code of this example is documented. The documentation of the implementation is in progress. diff --git a/examples/auto_logging_v01/pipeline/fetch.py b/examples/auto_logging_v01/pipeline/fetch.py new file mode 100644 index 00000000..4addf4d5 --- /dev/null +++ b/examples/auto_logging_v01/pipeline/fetch.py @@ -0,0 +1,67 @@ +### +# Copyright (2022) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+### + +import pickle +import typing as t + +from sklearn.datasets import load_iris +from sklearn.utils import Bunch + +from cmflib.contrib.auto_logging_v01 import Context, Dataset, cli_run, prepare_workspace, step + + +@step() +def fetch(ctx: Context) -> t.Dict[str, Dataset]: + """Fetch IRIS dataset and serialize its data and target arrays in a pickle file. + + This example demonstrates automated logging of output datasets. In your python code: + ```python + from cmflib.contrib.auto_logging_v01 import cli_run + + if __name__ == '__main__': + # python pipeline/fetch.py + cli_run(fetch) + ``` + + Args: + ctx: Context for this step. + May contain: + - `workspace`: directory path to store artifacts. + Will contain: + - `cmf`: an initialized instance of the Cmf class (no need to call create_context and create_stage). + No need to log output artifacts as well - just return them. + + Returns: + Dictionary that maps an artifact name to artifact description. Names are not used to automatically log artifacts + to Cmf, only artifacts themselves. + """ + workspace = prepare_workspace(ctx) + dataset_uri = workspace / "iris.pkl" + with open(dataset_uri, "wb") as stream: + iris: Bunch = load_iris() + pickle.dump({"data": iris["data"], "target": iris["target"]}, stream) + + return {"dataset": Dataset(dataset_uri)} + + +if __name__ == "__main__": + """ + A file will be saved to workspace/iris.pkl + ```shell + python pipeline/fetch.py + ``` + """ + cli_run(fetch) diff --git a/examples/auto_logging_v01/pipeline/preprocess.py b/examples/auto_logging_v01/pipeline/preprocess.py new file mode 100644 index 00000000..bbb85646 --- /dev/null +++ b/examples/auto_logging_v01/pipeline/preprocess.py @@ -0,0 +1,82 @@ +### +# Copyright (2022) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +import pickle +import typing as t +from pathlib import Path + +from sklearn.model_selection import train_test_split + +from cmflib.contrib.auto_logging_v01 import Context, Dataset, Parameters, cli_run, prepare_workspace, step + + +@step() +def preprocess(ctx: Context, params: Parameters, dataset: Dataset) -> t.Dict[str, Dataset]: + """Preprocess the IRIS dataset by splitting it into train and test datasets. + + This example demonstrates automated logging of input and output datasets. In your python code: + ```python + from cmflib.contrib.auto_logging_v01 import cli_run + + if __name__ == '__main__': + # python pipeline/preprocess.py dataset=workspace/iris.pkl + cli_run(preprocess) + ``` + + Args: + ctx: Context for this step. + May contain: + - `workspace`: directory path to store artifacts. + Will contain: + - `cmf`: an initialized instance of the Cmf class (no need to call create_context and create_stage). + No need to log output artifacts as well - just return them. + params: Execution parameters. Can contain: `train_size` (train size split ratio, default is 0.7) and `shuffle` + (whether raw data should be shuffled before splitting, default is true). 
+ dataset: Input dataset (e.g., produced by the `fetch` step). + + Returns: + Dictionary that maps an artifact name to artifact description. Names are not used to automatically log artifacts + to Cmf, only artifacts themselves. The return dictionary will contain two items - `train_dataset` and + `test_dataset`. + """ + with open(dataset.uri, "rb") as stream: + dataset: t.Dict = pickle.load(stream) + + x_train, x_test, y_train, y_test = train_test_split( + dataset["data"], + dataset["target"], + train_size=float(params.get("train_size", 0.7)), + shuffle=params.get("shuffle", "true").lower() == "true", + ) + + workspace: Path = prepare_workspace(ctx) + + with open(workspace / "train.pkl", "wb") as stream: + pickle.dump({"x": x_train, "y": y_train}, stream) + with open(workspace / "test.pkl", "wb") as stream: + pickle.dump({"x": x_test, "y": y_test}, stream) + + return {"train_dataset": Dataset(workspace / "train.pkl"), "test_dataset": Dataset(workspace / "test.pkl")} + + +if __name__ == "__main__": + """ + Files will be saved to workspace/train_dataset.pkl and workspace/test_dataset.pkl + ```shell + python pipeline/preprocess.py dataset=workspace/iris.pkl + ``` + """ + cli_run(preprocess) diff --git a/examples/auto_logging_v01/pipeline/test.py b/examples/auto_logging_v01/pipeline/test.py new file mode 100644 index 00000000..bd462a5e --- /dev/null +++ b/examples/auto_logging_v01/pipeline/test.py @@ -0,0 +1,78 @@ +### +# Copyright (2022) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +import pickle +import typing as t + +from sklearn.metrics import accuracy_score +from sklearn.tree import DecisionTreeClassifier + +from cmflib.cmf import Cmf +from cmflib.contrib.auto_logging_v01 import Context, Dataset, ExecutionMetrics, MLModel, cli_run, step + + +@step() +def test(ctx: Context, test_dataset: Dataset, model: MLModel) -> t.Dict[str, ExecutionMetrics]: + """Test a decision tree classifier on a test dataset. + + This example demonstrates the automated logging of output execution metrics. In your python code: + ```python + from cmflib.contrib.auto_logging_v01 import cli_run + + if __name__ == '__main__': + # python pipeline/test.py test_dataset=workspace/test.pkl model=workspace/model.pkl + cli_run(test) + ``` + + Args: + ctx: Context for this step. + May contain: + - `workspace`: directory path to store artifacts. + Will contain: + - `cmf`: an initialized instance of the Cmf class (no need to call create_context and create_stage). + No need to log output artifacts as well - just return them. + test_dataset: Input test dataset (e.g., produced by the `preprocess` step). + model: Input ML model (e.g., produced by the `train` step). + + Returns: + Dictionary that maps an artifact name to artifact description. Names are not used to automatically log artifacts + to Cmf, only artifacts themselves. The return dictionary will contain one item - `exec_metrics`. 
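+        For illustration, the returned value has roughly the shape `{"exec_metrics": ExecutionMetrics(uri="<execution_id>/metrics/test", name="test", params={"accuracy": <test accuracy>})}`.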
+ """ + + with open(test_dataset.uri, "rb") as stream: + dataset: t.Dict = pickle.load(stream) + + with open(model.uri, "rb") as stream: + clf: DecisionTreeClassifier = pickle.load(stream) + + test_accuracy = accuracy_score(y_true=dataset["y"], y_pred=clf.predict(dataset["x"])) + + # TODO: Fix URI for execution metrics. What should it be? + cmf: Cmf = ctx["cmf"] + return { + "exec_metrics": ExecutionMetrics( + uri=str(cmf.execution.id) + "/metrics/test", name="test", params={"accuracy": test_accuracy} + ) + } + + +if __name__ == "__main__": + """ + ```shell + python pipeline/test.py test_dataset=workspace/test.pkl model=workspace/model.pkl + ``` + """ + cli_run(test) diff --git a/examples/auto_logging_v01/pipeline/train.py b/examples/auto_logging_v01/pipeline/train.py new file mode 100644 index 00000000..5dbb5a17 --- /dev/null +++ b/examples/auto_logging_v01/pipeline/train.py @@ -0,0 +1,93 @@ +### +# Copyright (2022) Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +import pickle +import typing as t +from pathlib import Path + +from sklearn.metrics import accuracy_score +from sklearn.tree import DecisionTreeClassifier + +from cmflib.cmf import Cmf +from cmflib.contrib.auto_logging_v01 import ( + Context, + Dataset, + ExecutionMetrics, + MLModel, + Parameters, + cli_run, + prepare_workspace, + step, +) + + +@step() +def train(ctx: Context, params: Parameters, train_dataset: Dataset) -> t.Dict[str, t.Union[MLModel, ExecutionMetrics]]: + """Train a decision tree classifier. + + This example demonstrates automated logging of output models and execution metrics. In your python code: + ```python + from cmflib.contrib.auto_logging_v01 import cli_run + + if __name__ == '__main__': + # python pipeline/train.py dataset=workspace/train.pkl + cli_run(train) + ``` + + Args: + ctx: Context for this step. + May contain: + - `workspace`: directory path to store artifacts. + Will contain: + - `cmf`: an initialized instance of the Cmf class (no need to call create_context and create_stage). + No need to log output artifacts as well - just return them. + params: Execution parameters. Not used now in this implementation. + train_dataset: Input train dataset (e.g., produced by the `preprocess` step). + + Returns: + Dictionary that maps an artifact name to artifact description. Names are not used to automatically log artifacts + to Cmf, only artifacts themselves. The return dictionary will contain two items - `model` and + `exec_metrics`. + """ + with open(train_dataset.uri, "rb") as stream: + dataset: t.Dict = pickle.load(stream) + + clf = DecisionTreeClassifier() + clf = clf.fit(dataset["x"], dataset["y"]) + train_accuracy = accuracy_score(y_true=dataset["y"], y_pred=clf.predict(dataset["x"])) + + workspace: Path = prepare_workspace(ctx) + with open(workspace / "model.pkl", "wb") as stream: + pickle.dump(clf, stream) + + # TODO: Fix URI for execution metrics. What should it be? 
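+    # Note (illustrative convention, not a CMF requirement): the URI below combines the execution id with a
+    # metric-group suffix, e.g. "12/metrics/train"; any stable, unique string would work as the metrics URI.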
+ cmf: Cmf = ctx["cmf"] + return { + "model": MLModel(workspace / "model.pkl"), + "exec_metrics": ExecutionMetrics( + uri=str(cmf.execution.id) + "/metrics/train", name="train", params={"accuracy": train_accuracy} + ), + } + + +if __name__ == "__main__": + """ + A model will be saved to workspace/model.pkl + ```shell + python pipeline/train.py dataset=workspace/train.pkl + ``` + """ + cli_run(train) diff --git a/examples/auto_logging_v01/pyproject.toml b/examples/auto_logging_v01/pyproject.toml new file mode 100644 index 00000000..8a3e94ff --- /dev/null +++ b/examples/auto_logging_v01/pyproject.toml @@ -0,0 +1,5 @@ +[project] +dependencies = [ + 'cmflib', + 'scikit-learn' +]
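For completeness, here is a rough sketch (not part of this patch) of driving the example stages in-process instead of via the per-stage CLI. It assumes the script is run from the example's `pipeline/` directory so the stage modules are importable, that `CMF_PIPELINE` is exported (e.g., set to `iris`), and that the file names match the example's defaults:

```python
# Illustrative in-process driver for the example pipeline (assumptions: run from the
# example's pipeline/ directory, CMF_PIPELINE exported, default workspace file names).
from cmflib.contrib.auto_logging_v01 import Context, Dataset, MLModel, Parameters

from fetch import fetch
from preprocess import preprocess
from train import train
from test import test

# A shared runtime context; the `step` wrapper injects the Cmf instance under the "cmf" key.
ctx = Context(workspace="workspace")

fetch(ctx)
preprocess(ctx, Parameters(train_size="0.8", shuffle="true"), dataset=Dataset("workspace/iris.pkl"))
train(ctx, Parameters(), train_dataset=Dataset("workspace/train.pkl"))
test(ctx, test_dataset=Dataset("workspace/test.pkl"), model=MLModel("workspace/model.pkl"))
```

Since every stage is just a decorated Python function, this driver exercises the same logging path as the CLI invocations shown in the README.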