Automated artifact logging with CMF (#35)
## Introduction
This PR introduces an opinionated implementation of automated machine learning (ML) artifact logging, and an example 4-stage ML pipeline that uses this feature to automatically log input and output artifacts of each stage.

The implementation demonstrates one possible approach to automated logging with CMF. Automated logging means that developers do not (in most cases) need to operate directly on the CMF class instance. Instead, the framework automatically logs execution parameters as well as input and output artifacts. Currently, the implementation lives in the `cmflib.contrib.auto_logging_v01` module.


## Implementation details
Users organize their ML pipelines as graphs where each node is a single stage performing a particular data transformation or model training step. Each stage is a Python function with a specific interface. Users then wrap their stages with a Python decorator that enables automated logging.

This is an example stage:
```python
import typing as t
from cmflib.contrib.auto_logging_v01 import Context, Parameters, Dataset, MLModel, ExecutionMetrics, step

@step()
def train(ctx: Context, params: Parameters, train_dataset: Dataset) -> t.Dict[str, t.Union[MLModel, ExecutionMetrics]]:
    ...
```
All function parameters are optional.
- The `Context` argument can be used to provide additional runtime information such as the artifact directory. These are runtime parameters that do not affect how output artifacts are produced from input artifacts.
- The `Parameters` argument contains the stage parametrization (hyper-parameters). These parameters are logged with CMF automatically.
- All other parameters must have types derived from the `Artifact` class. They are treated as input artifacts and are automatically logged with CMF.
- The function returns a data structure containing instances of `Artifact` subclasses. These are treated as output artifacts and are automatically logged as well.
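
To make the classification rule above concrete, here is a toy, self-contained sketch of how such a decorator could sort keyword arguments into parameters and input artifacts. All classes and the `step` decorator below are simplified stand-ins, not the actual `cmflib.contrib.auto_logging_v01` implementation:

```python
import functools
import typing as t


# Toy stand-ins for the cmflib.contrib.auto_logging_v01 types (illustration only).
class Artifact:
    def __init__(self, uri: str) -> None:
        self.uri = uri


class Dataset(Artifact):
    pass


class MLModel(Artifact):
    pass


class Context(dict):
    pass


class Parameters(dict):
    pass


def step() -> t.Callable:
    """Toy decorator showing how a stage's arguments could be classified for logging."""

    def decorator(fn: t.Callable) -> t.Callable:
        @functools.wraps(fn)
        def wrapper(**kwargs) -> t.Dict[str, Artifact]:
            # Parameters would be logged as hyper-parameters, Artifact subclasses
            # as input artifacts; Context carries runtime-only settings.
            wrapper.logged = {
                "params": [k for k, v in kwargs.items() if isinstance(v, Parameters)],
                "inputs": [k for k, v in kwargs.items() if isinstance(v, Artifact)],
            }
            # Returned Artifact instances would be logged as output artifacts.
            return fn(**kwargs)

        return wrapper

    return decorator


@step()
def train(ctx: Context, params: Parameters, train_dataset: Dataset) -> t.Dict[str, MLModel]:
    # A real stage would train and serialize a model here.
    return {"model": MLModel("workspace/model.pkl")}
```

Calling `train(ctx=Context(), params=Parameters(lr=0.1), train_dataset=Dataset("workspace/train.pkl"))` classifies `train_dataset` as the only input artifact, since `Parameters` and `Context` do not derive from `Artifact`.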

In this auto-logging framework, CMF is configured hierarchically. The hierarchy is as follows (fields that are not specified, e.g., set to null, are considered unset, and the default values defined by CMF are used):
- Default configuration: `filename="mlmd", graph=False`.
- Configuration from environment. The following environment variables can be used: `CMF_URI` (string), `CMF_PIPELINE` (string), `CMF_STAGE` (string), `CMF_GRAPH` (boolean, true when value is "1", "on" or "true") and `CMF_IS_SERVER` (boolean).
- Configuration from user (the `cmf_config` variable defined in this module can be used to configure the CMF).
- Configuration provided via `step` decorator.
   ```python
   import typing as t

   def step(pipeline_name: t.Optional[str] = None, pipeline_stage: t.Optional[str] = None) -> t.Callable:
       ...
   ```
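
For illustration, reading the environment level of this hierarchy could look like the sketch below. The helper names (`_env_bool`, `config_from_env`) and the returned field names are assumptions for this sketch, not the actual cmflib API; only the variable names and the boolean semantics ("1", "on", "true") come from the description above:

```python
import os
import typing as t


def _env_bool(name: str, default: t.Optional[bool] = None) -> t.Optional[bool]:
    """Parse a CMF boolean environment variable ("1", "on" or "true" mean true)."""
    value = os.environ.get(name)
    if value is None:
        return default
    return value.lower() in ("1", "on", "true")


def config_from_env() -> t.Dict[str, t.Any]:
    """Collect the CMF settings that can come from the environment.

    Unset variables stay None, i.e., "not set" at this level of the hierarchy,
    so a lower-priority level (or the CMF defaults) takes effect instead.
    """
    return {
        "filename": os.environ.get("CMF_URI"),
        "pipeline_name": os.environ.get("CMF_PIPELINE"),
        "pipeline_stage": os.environ.get("CMF_STAGE"),
        "graph": _env_bool("CMF_GRAPH"),
        "is_server": _env_bool("CMF_IS_SERVER"),
    }
```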

In addition, the auto-logging module provides the `cli_run` function, which exposes a command-line interface for stage functions (including support for providing the context, parameters and input artifacts).
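
To illustrate the `key=value` convention used by the example commands in this PR (e.g., `dataset=workspace/iris.pkl`), a minimal sketch of such argument parsing might look as follows. The `parse_key_value_args` function is hypothetical, not the actual `cli_run` implementation:

```python
import typing as t


def parse_key_value_args(argv: t.List[str]) -> t.Dict[str, str]:
    """Split "key=value" command-line arguments into a dictionary.

    A stage runner could then match keys against the stage function's signature
    to build the Context, Parameters and Artifact inputs.
    """
    args: t.Dict[str, str] = {}
    for item in argv:
        key, sep, value = item.partition("=")
        if not sep:
            raise ValueError(f"Expected key=value argument, got: {item!r}")
        args[key] = value
    return args
```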

## Example ML pipeline
This example implements a four-stage ML pipeline that trains and tests a decision tree classifier from the scikit-learn library on the tiny IRIS dataset. The steps to reproduce this example are pretty much the same as those for another CMF example ([Getting Started](https://hewlettpackard.github.io/cmf/examples/getting_started/)). Quick summary:

- Clone the project and copy the content of this directory to some directory outside the CMF root directory.
- Initialize a Python environment and install CMF in editable (development) mode (`pip install -e .`).
- Install this example's dependencies (the only dependency is `scikit-learn`).
- Initialize CMF for this example. One quick approach is to just run `python -m cmflib.contrib.init` (works for quick demo examples).
- Run the stages of this pipeline:
   ``` 
   python pipeline/fetch.py 
   python pipeline/preprocess.py dataset=workspace/iris.pkl 
   python pipeline/train.py dataset=workspace/train.pkl 
   python pipeline/test.py test_dataset=workspace/test.pkl model=workspace/model.pkl 
   ```
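
For illustration, the artifact flow implied by the four commands above can be summarized and sanity-checked with a small sketch. The `pipeline` table and `check_order` helper are illustrative, not part of cmflib (the `train` stage's `workspace/model.pkl` output is inferred from the `test` stage's `model=` argument):

```python
import typing as t

# Each entry: (stage name, input artifact URIs, output artifact URIs),
# mirroring the four commands above.
pipeline = [
    ("fetch", [], ["workspace/iris.pkl"]),
    ("preprocess", ["workspace/iris.pkl"], ["workspace/train.pkl", "workspace/test.pkl"]),
    ("train", ["workspace/train.pkl"], ["workspace/model.pkl"]),
    ("test", ["workspace/test.pkl", "workspace/model.pkl"], []),
]


def check_order(stages: t.List[t.Tuple[str, t.List[str], t.List[str]]]) -> bool:
    """Verify that every stage's inputs were produced by an earlier stage."""
    produced: t.Set[str] = set()
    for name, inputs, outputs in stages:
        if any(uri not in produced for uri in inputs):
            return False
        produced.update(outputs)
    return True
```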
sergey-serebryakov authored Oct 24, 2023
1 parent 246140f commit 4876fa9
Showing 8 changed files with 1,084 additions and 0 deletions.
657 changes: 657 additions & 0 deletions cmflib/contrib/auto_logging_v01.py

Large diffs are not rendered by default.

74 changes: 74 additions & 0 deletions cmflib/contrib/init.py
@@ -0,0 +1,74 @@
###
# Copyright (2022) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###

import os
import uuid

import click


@click.command()
@click.argument("project_path", required=False, default=os.getcwd(), type=str)
@click.option("--user_name", required=False, envvar="GIT_USER_NAME", default="First Second", type=str)
@click.option("--user_email", required=False, envvar="GIT_USER_EMAIL", default="first.second@corp.org", type=str)
@click.option(
"--git_remote",
required=False,
envvar="GIT_REMOTE_URL",
type=str,
default="git@github.com:first-second/experiment-repo.git",
)
@click.option(
"--dvc_remote",
required=False,
envvar="DVC_REMOTE_URL",
type=str,
default=f'/tmp/cmf/dvc_remotes/{str(uuid.uuid4()).replace("-", "")}',
)
def init_cmf_project(project_path: str, user_name: str, user_email: str, git_remote: str, dvc_remote: str):
    """Helper Python script to initialize a new CMF project.

    Prerequisites: `git` and `dvc` must be installed on the system.

    Args:
        project_path: Path to the new project. It must exist and should preferably be empty.
        user_name: User name for initializing the git repository.
        user_email: User email for initializing the git repository.
        git_remote: Git remote to set on the new git repository.
        dvc_remote: DVC remote to set on the new repository (DVC will be initialized too).
    """
    os.chdir(project_path)

    print("[1/4] [GIT/DVC INIT ] executing git init and dvc init.")
    os.system("git init -q")
    os.system("dvc init -q")
    os.system(f'git config --global user.name "{user_name}"')
    os.system(f'git config --global user.email "{user_email}"')

    print("[2/4] [INITIAL COMMIT] performing initial blank commit into master.")
    os.system("git checkout -b master")
    os.system('git commit --allow-empty -n -m "Initial code commit"')

    print(f"[3/4] [GIT REMOTE   ] setting git remote to {git_remote}")
    os.system(f'git remote add origin "{git_remote}"')

    print(f"[4/4] [DVC REMOTE   ] setting dvc remote to {dvc_remote}")
    os.system(f'dvc remote add myremote -f "{dvc_remote}"')
    os.system("dvc remote default myremote")


if __name__ == "__main__":
    init_cmf_project()
28 changes: 28 additions & 0 deletions examples/auto_logging_v01/README.md
@@ -0,0 +1,28 @@
# Automated artifact logging with CMF

This example implements a four-stage ML pipeline that trains and tests a decision tree classifier from the scikit-learn
library on the tiny IRIS dataset.

This example demonstrates one possible approach to automated logging with Cmf. Automated logging means that developers
do not (in most cases) need to operate directly on the Cmf class instance. Instead, the framework automatically logs
execution parameters and input and output artifacts.

Currently, the implementation is in the `cmflib.contrib.auto_logging_v01` module.

The steps to reproduce this example are pretty much the same as those for another CMF example
([Getting Started](https://hewlettpackard.github.io/cmf/examples/getting_started/)). Quick summary:

- Clone the project and copy the content of this directory to some directory outside the Cmf root directory.
- Initialize a Python environment and install Cmf in editable (development) mode (`pip install -e .`).
- Install this example's dependencies (the only dependency is `scikit-learn`).
- Initialize Cmf for this example. One quick approach is to just run `python -m cmflib.contrib.init` (works
  for quick demo examples).
- Run the stages of this pipeline:
```
python pipeline/fetch.py
python pipeline/preprocess.py dataset=workspace/iris.pkl
python pipeline/train.py dataset=workspace/train.pkl
python pipeline/test.py test_dataset=workspace/test.pkl model=workspace/model.pkl
```
> The code of this example is documented. The documentation of the implementation is in progress.
67 changes: 67 additions & 0 deletions examples/auto_logging_v01/pipeline/fetch.py
@@ -0,0 +1,67 @@
###
# Copyright (2022) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###

import pickle
import typing as t

from sklearn.datasets import load_iris
from sklearn.utils import Bunch

from cmflib.contrib.auto_logging_v01 import Context, Dataset, cli_run, prepare_workspace, step


@step()
def fetch(ctx: Context) -> t.Dict[str, Dataset]:
    """Fetch the IRIS dataset and serialize its data and target arrays in a pickle file.

    This example demonstrates automated logging of output datasets. In your Python code:
    ```python
    from cmflib.contrib.auto_logging_v01 import cli_run

    if __name__ == '__main__':
        # python pipeline/fetch.py
        cli_run(fetch)
    ```

    Args:
        ctx: Context for this step. May contain:
            - `workspace`: directory path to store artifacts.
            Will contain:
            - `cmf`: an initialized instance of the Cmf class (no need to call create_context and create_stage).
            There is no need to log output artifacts either - just return them.

    Returns:
        Dictionary that maps an artifact name to an artifact description. Names are not used to automatically log
        artifacts to Cmf, only the artifacts themselves.
    """
    workspace = prepare_workspace(ctx)
    dataset_uri = workspace / "iris.pkl"
    with open(dataset_uri, "wb") as stream:
        iris: Bunch = load_iris()
        pickle.dump({"data": iris["data"], "target": iris["target"]}, stream)

    return {"dataset": Dataset(dataset_uri)}


if __name__ == "__main__":
    """
    A file will be saved to workspace/iris.pkl:
    ```shell
    python pipeline/fetch.py
    ```
    """
    cli_run(fetch)
82 changes: 82 additions & 0 deletions examples/auto_logging_v01/pipeline/preprocess.py
@@ -0,0 +1,82 @@
###
# Copyright (2022) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###

import pickle
import typing as t
from pathlib import Path

from sklearn.model_selection import train_test_split

from cmflib.contrib.auto_logging_v01 import Context, Dataset, Parameters, cli_run, prepare_workspace, step


@step()
def preprocess(ctx: Context, params: Parameters, dataset: Dataset) -> t.Dict[str, Dataset]:
    """Preprocess the IRIS dataset by splitting it into train and test datasets.

    This example demonstrates automated logging of input and output datasets. In your Python code:
    ```python
    from cmflib.contrib.auto_logging_v01 import cli_run

    if __name__ == '__main__':
        # python pipeline/preprocess.py dataset=workspace/iris.pkl
        cli_run(preprocess)
    ```

    Args:
        ctx: Context for this step. May contain:
            - `workspace`: directory path to store artifacts.
            Will contain:
            - `cmf`: an initialized instance of the Cmf class (no need to call create_context and create_stage).
            There is no need to log output artifacts either - just return them.
        params: Execution parameters. Can contain: `train_size` (train split ratio, default is 0.7) and `shuffle`
            (whether raw data should be shuffled before splitting, default is true).
        dataset: Input dataset (e.g., produced by the `fetch` step).

    Returns:
        Dictionary that maps an artifact name to an artifact description. Names are not used to automatically log
        artifacts to Cmf, only the artifacts themselves. The returned dictionary contains two items - `train_dataset`
        and `test_dataset`.
    """
    with open(dataset.uri, "rb") as stream:
        dataset: t.Dict = pickle.load(stream)

    x_train, x_test, y_train, y_test = train_test_split(
        dataset["data"],
        dataset["target"],
        train_size=float(params.get("train_size", 0.7)),
        shuffle=params.get("shuffle", "true").lower() == "true",
    )

    workspace: Path = prepare_workspace(ctx)

    with open(workspace / "train.pkl", "wb") as stream:
        pickle.dump({"x": x_train, "y": y_train}, stream)
    with open(workspace / "test.pkl", "wb") as stream:
        pickle.dump({"x": x_test, "y": y_test}, stream)

    return {"train_dataset": Dataset(workspace / "train.pkl"), "test_dataset": Dataset(workspace / "test.pkl")}


if __name__ == "__main__":
    """
    Files will be saved to workspace/train.pkl and workspace/test.pkl:
    ```shell
    python pipeline/preprocess.py dataset=workspace/iris.pkl
    ```
    """
    cli_run(preprocess)
78 changes: 78 additions & 0 deletions examples/auto_logging_v01/pipeline/test.py
@@ -0,0 +1,78 @@
###
# Copyright (2022) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###

import pickle
import typing as t

from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier

from cmflib.cmf import Cmf
from cmflib.contrib.auto_logging_v01 import Context, Dataset, ExecutionMetrics, MLModel, cli_run, step


@step()
def test(ctx: Context, test_dataset: Dataset, model: MLModel) -> t.Dict[str, ExecutionMetrics]:
    """Test a decision tree classifier on a test dataset.

    This example demonstrates automated logging of output execution metrics. In your Python code:
    ```python
    from cmflib.contrib.auto_logging_v01 import cli_run

    if __name__ == '__main__':
        # python pipeline/test.py test_dataset=workspace/test.pkl model=workspace/model.pkl
        cli_run(test)
    ```

    Args:
        ctx: Context for this step. May contain:
            - `workspace`: directory path to store artifacts.
            Will contain:
            - `cmf`: an initialized instance of the Cmf class (no need to call create_context and create_stage).
            There is no need to log output artifacts either - just return them.
        test_dataset: Input test dataset (e.g., produced by the `preprocess` step).
        model: Input ML model (e.g., produced by the `train` step).

    Returns:
        Dictionary that maps an artifact name to an artifact description. Names are not used to automatically log
        artifacts to Cmf, only the artifacts themselves. The returned dictionary contains one item - `exec_metrics`.
    """
    with open(test_dataset.uri, "rb") as stream:
        dataset: t.Dict = pickle.load(stream)

    with open(model.uri, "rb") as stream:
        clf: DecisionTreeClassifier = pickle.load(stream)

    test_accuracy = accuracy_score(y_true=dataset["y"], y_pred=clf.predict(dataset["x"]))

    # TODO: Fix URI for execution metrics. What should it be?
    cmf: Cmf = ctx["cmf"]
    return {
        "exec_metrics": ExecutionMetrics(
            uri=str(cmf.execution.id) + "/metrics/test", name="test", params={"accuracy": test_accuracy}
        )
    }


if __name__ == "__main__":
    """
    ```shell
    python pipeline/test.py test_dataset=workspace/test.pkl model=workspace/model.pkl
    ```
    """
    cli_run(test)