From bc71277b74aea2bd5867e10676b5fe0a172ab8b2 Mon Sep 17 00:00:00 2001 From: Tarek Abdunabi Date: Mon, 24 Jan 2022 11:16:10 -0500 Subject: [PATCH] Update to version v1.5.0 --- CHANGELOG.md | 15 +- CONTRIBUTING.md | 24 +- NOTICE.txt | 4 +- README.md | 20 +- deployment/build-s3-dist.sh | 40 +- source/app.py | 37 +- .../pipeline_orchestration/lambda_helpers.py | 311 ++++++++----- .../tests/fixtures/orchestrator_fixtures.py | 131 +++++- .../tests/test_pipeline_orchestration.py | 72 ++- .../create_baseline_job/baselines_helper.py | 243 +++++++++- .../byom/lambdas/create_baseline_job/main.py | 82 +++- .../tests/fixtures/baseline_fixtures.py | 158 ++++++- .../tests/test_create_data_baseline.py | 114 ++++- source/lib/blueprints/byom/model_monitor.py | 422 +++++++++++------- .../pipeline_definitions/deploy_actions.py | 30 +- .../byom/pipeline_definitions/iam_policies.py | 142 ++++-- .../sagemaker_model_monitor_construct.py | 189 +++++++- .../sagemaker_monitor_role.py | 10 +- .../templates_parameters.py | 95 +++- ...s_stack.py => mlops_orchestrator_stack.py} | 0 20 files changed, 1697 insertions(+), 442 deletions(-) rename source/lib/{aws_mlops_stack.py => mlops_orchestrator_stack.py} (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index eddc302..a65074a 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.5.0] - 2022-01-24 + +### Added + +- A new pipeline to deploy [Amazon SageMaker Clarify Model Bias Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-model-monitor-bias-drift.html). The new pipeline monitors predictions for bias on a regular basis, and generates + alerts if bias beyond a certain threshold is detected. +- A new pipeline to deploy [Amazon SageMaker Clarify Explainability (Feature Attribution Drift) Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-model-monitor-feature-attribution-drift.html). The new pipeline helps data scientists and ML engineers + monitor predictions for feature attribution drift on a regular basis. + +### Updated + +- The solution's name was changed from "AWS MLOps Framework" to "MLOps Workload Orchestrator". + ## [1.4.1] - 2021-12-20 ### Added @@ -23,7 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- A new pipeline to deploy [AWS SageMaker Model Quality Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality.html). The new pipeline monitors the performance of a deployed model by comparing the +- A new pipeline to deploy [Amazon SageMaker Model Quality Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality.html). The new pipeline monitors the performance of a deployed model by comparing the predictions that the model makes with the actual ground truth labels that the model attempts to predict. ### Updated diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d966d17..c86e28b 100755 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -6,24 +6,23 @@ documentation, we greatly value feedback and contributions from our community. Please read through this document before submitting any issues or pull requests to ensure we have all the necessary information to effectively respond to your bug report or contribution. 
- ## Reporting Bugs/Feature Requests We welcome you to use the GitHub issue tracker to report bugs or suggest features. -When filing an issue, please check [existing open](https://github.com/awslabs/aws-mlops-solution/issues), or [recently closed](https://github.com/awslabs/aws-mlops-solution/issues?utf8=%E2%9C%93&q=is%3Aissue%20is%3Aclosed%20), issues to make sure somebody else hasn't already +When filing an issue, please check [existing open](https://github.com/aws-solutions/mlops-workload-orchestrator/issues), or [recently closed](https://github.com/aws-solutions/mlops-workload-orchestrator/issues?utf8=%E2%9C%93&q=is%3Aissue%20is%3Aclosed%20), issues to make sure somebody else hasn't already reported the issue. Please try to include as much information as you can. Details like these are incredibly useful: -* A reproducible test case or series of steps -* The version of our code being used -* Any modifications you've made relevant to the bug -* Anything unusual about your environment or deployment - +- A reproducible test case or series of steps +- The version of our code being used +- Any modifications you've made relevant to the bug +- Anything unusual about your environment or deployment ## Contributing via Pull Requests + Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that: -1. You are working against the latest source on the *master* branch. +1. You are working against the latest source on the _main_ branch. 2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already. 3. You open an issue to discuss any significant work - we would hate for your time to be wasted. @@ -40,23 +39,22 @@ To send us a pull request, please: GitHub provides additional document on [forking a repository](https://help.github.com/articles/fork-a-repo/) and [creating a pull request](https://help.github.com/articles/creating-a-pull-request/). - ## Finding contributions to work on -Looking at the existing issues is a great way to find something to contribute on. As our projects, by default, use the default GitHub issue labels ((enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any ['help wanted'](https://github.com/awslabs/aws-mlops-solution/labels/help%20wanted) issues is a great place to start. +Looking at the existing issues is a great way to find something to contribute on. As our projects, by default, use the default GitHub issue labels ((enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any ['help wanted'](https://github.com/aws-solutions/mlops-workload-orchestrator/labels/help%20wanted) issues is a great place to start. ## Code of Conduct + This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact opensource-codeofconduct@amazon.com with any additional questions or comments. - ## Security issue notifications -If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public GitHub issue. +If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). 
Please do **not** create a public GitHub issue. ## Licensing -See the [LICENSE](https://github.com/awslabs/aws-mlops-solution/blob/master/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution. +See the [LICENSE](https://github.com/aws-solutions/mlops-workload-orchestrator/blob/main/LICENSE) file for our project's licensing. We will ask you to confirm the licensing of your contribution. We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikipedia.org/wiki/Contributor_License_Agreement) for larger changes. diff --git a/NOTICE.txt b/NOTICE.txt index dd6ef5e..ef91f44 100755 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1,5 +1,5 @@ -aws-mlops-framework -Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +mlops-workload-orchestrator +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at http://www.apache.org/licenses/ or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, diff --git a/README.md b/README.md index c0b006e..a65bedf 100755 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# aws-mlops-framework +# mlops-workload-orchestrator The machine learning (ML) lifecycle is an iterative and repetitive process that involves changing models over time and learning from new data. As ML applications gain popularity, @@ -16,9 +16,9 @@ pipeline for building and registering Docker images for custom algorithms that c deployment on an [Amazon SageMaker](https://aws.amazon.com/sagemaker/) endpoint. You can use batch and real-time data inferences to configure the pipeline for your business context. -You can also provision multiple data quality and model quality Monitor pipelines to periodically monitor the quality of deployed -Amazon SageMaker ML models. This solution increases your team’s agility and efficiency by allowing them -to repeat successful processes at scale. +You can also provision multiple data quality, model quality, model bias, and model explainability Monitor +pipelines to periodically monitor the quality of deployed Amazon SageMaker ML models. This solution +increases your team’s agility and efficiency by allowing them to repeat successful processes at scale. #### Benefits @@ -122,8 +122,8 @@ chmod +x ./build-s3-dist.sh - Upload the distributable assets to your Amazon S3 bucket in your account. Note: ensure that you own the Amazon S3 bucket before uploading the assets. To upload the assets to the S3 bucket, you can use the AWS Console or the AWS CLI as shown below. ``` -aws s3 cp ./global-s3-assets/ s3://my-bucket-name-/aws-mlops-framework// --recursive --acl bucket-owner-full-control --profile aws-cred-profile-name -aws s3 cp ./regional-s3-assets/ s3://my-bucket-name-/aws-mlops-framework// --recursive --acl bucket-owner-full-control --profile aws-cred-profile-name +aws s3 cp ./global-s3-assets/ s3://my-bucket-name-/mlops-workload-orchestrator// --recursive --acl bucket-owner-full-control --profile aws-cred-profile-name +aws s3 cp ./regional-s3-assets/ s3://my-bucket-name-/mlops-workload-orchestrator// --recursive --acl bucket-owner-full-control --profile aws-cred-profile-name ``` --- @@ -132,17 +132,17 @@ aws s3 cp ./regional-s3-assets/ s3://my-bucket-name-/aws-mlops-frame ``` $DIST_OUTPUT_BUCKET - This is the global name of the distribution. 
For the bucket name, the AWS Region is added to the global name (example: 'my-bucket-name-us-east-1') to create a regional bucket. The lambda artifact should be uploaded to the regional buckets for the CloudFormation template to pick it up for deployment. -$SOLUTION_NAME - The name of This solution (example: aws-mlops-framework) +$SOLUTION_NAME - The name of This solution (example: mlops-workload-orchestrator) $VERSION - The version number of the change ``` ## Uninstall the solution -Please refer to the [Uninstall the solution section](https://docs.aws.amazon.com/solutions/latest/aws-mlops-framework/uninstall-the-solution.html) in the [solution's implementation guide](https://docs.aws.amazon.com/solutions/latest/aws-mlops-framework/welcome.html). +Please refer to the [Uninstall the solution section](https://docs.aws.amazon.com/solutions/latest/mlops-workload-orchestrator/uninstall-the-solution.html) in the [solution's implementation guide](https://docs.aws.amazon.com/solutions/latest/mlops-workload-orchestrator/welcome.html). ## Collection of operational metrics -This solution collects anonymous operational metrics to help AWS improve the quality and features of the solution. For more information, including how to disable this capability, please see the [implementation guide](https://docs.aws.amazon.com/solutions/latest/aws-mlops-framework/operational-metrics.html). +This solution collects anonymous operational metrics to help AWS improve the quality and features of the solution. For more information, including how to disable this capability, please see the [implementation guide](https://docs.aws.amazon.com/solutions/latest/mlops-workload-orchestrator/operational-metrics.html). ## Known Issues @@ -157,7 +157,7 @@ For more information regarding this issue and short-term and long-term fixes, re --- -Copyright 2020-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License Version 2.0 (the "License"). You may not use this file except in compliance with the License. 
A copy of the License is located at diff --git a/deployment/build-s3-dist.sh b/deployment/build-s3-dist.sh index 1396517..64beeb3 100755 --- a/deployment/build-s3-dist.sh +++ b/deployment/build-s3-dist.sh @@ -115,6 +115,10 @@ echo "cdk synth DataQualityModelMonitorStack > lib/blueprints/byom/byom_data_qua cdk synth DataQualityModelMonitorStack > lib/blueprints/byom/byom_data_quality_monitor.yaml --path-metadata false --version-reporting false echo "cdk synth ModelQualityModelMonitorStack > lib/blueprints/byom/byom_model_quality_monitor.yaml --path-metadata false --version-reporting false" cdk synth ModelQualityModelMonitorStack > lib/blueprints/byom/byom_model_quality_monitor.yaml --path-metadata false --version-reporting false +echo "cdk synth ModelBiasModelMonitorStack > lib/blueprints/byom/byom_model_bias_monitor.yaml --path-metadata false --version-reporting false" +cdk synth ModelBiasModelMonitorStack > lib/blueprints/byom/byom_model_bias_monitor.yaml --path-metadata false --version-reporting false +echo "cdk synth ModelExplainabilityModelMonitorStack > lib/blueprints/byom/byom_model_explainability_monitor.yaml --path-metadata false --version-reporting false" +cdk synth ModelExplainabilityModelMonitorStack > lib/blueprints/byom/byom_model_explainability_monitor.yaml --path-metadata false --version-reporting false echo "cdk synth SingleAccountCodePipelineStack > lib/blueprints/byom/single_account_codepipeline.yaml --path-metadata false --version-reporting false" cdk synth SingleAccountCodePipelineStack > lib/blueprints/byom/single_account_codepipeline.yaml --path-metadata false --version-reporting false echo "cdk synth MultiAccountCodePipelineStack > lib/blueprints/byom/multi_account_codepipeline.yaml --path-metadata false --version-reporting false" @@ -132,6 +136,10 @@ echo "sed -i -e $replace lib/blueprints/byom/byom_data_quality_monitor.yaml" sed -i -e $replace lib/blueprints/byom/byom_data_quality_monitor.yaml echo "sed -i -e $replace lib/blueprints/byom/byom_model_quality_monitor.yaml" sed -i -e $replace lib/blueprints/byom/byom_model_quality_monitor.yaml +echo "sed -i -e $replace lib/blueprints/byom/byom_model_bias_monitor.yaml" +sed -i -e $replace lib/blueprints/byom/byom_model_bias_monitor.yaml +echo "sed -i -e $replace lib/blueprints/byom/byom_model_explainability_monitor.yaml" +sed -i -e $replace lib/blueprints/byom/byom_model_explainability_monitor.yaml echo "sed -i -e $replace lib/blueprints/byom/byom_realtime_inference_pipeline.yaml" sed -i -e $replace lib/blueprints/byom/byom_realtime_inference_pipeline.yaml echo "sed -i -e $replace lib/blueprints/byom/single_account_codepipeline.yaml" @@ -144,10 +152,10 @@ echo "sed -i -e $replace lib/blueprints/byom/byom_batch_pipeline.yaml" sed -i -e $replace lib/blueprints/byom/byom_batch_pipeline.yaml # Run 'cdk synth' for main templates to generate raw solution outputs -echo "cdk synth aws-mlops-single-account-framework --path-metadata false --version-reporting false --output=$staging_dist_dir" -cdk synth aws-mlops-single-account-framework --path-metadata false --version-reporting false --output=$staging_dist_dir -echo "cdk synth aws-mlops-multi-account-framework --path-metadata false --version-reporting false --output=$staging_dist_dir" -cdk synth aws-mlops-multi-account-framework --path-metadata false --version-reporting false --output=$staging_dist_dir +echo "cdk synth mlops-workload-orchestrator-single-account --path-metadata false --version-reporting false --output=$staging_dist_dir" +cdk synth 
mlops-workload-orchestrator-single-account --path-metadata false --version-reporting false --output=$staging_dist_dir +echo "cdk synth mlops-workload-orchestrator-multi-account --path-metadata false --version-reporting false --output=$staging_dist_dir" +cdk synth mlops-workload-orchestrator-multi-account --path-metadata false --version-reporting false --output=$staging_dist_dir # Remove unnecessary output files echo "cd $staging_dist_dir" @@ -187,20 +195,20 @@ cd $template_dist_dir echo "Updating code source bucket in template with $1" replace="s/%%BUCKET_NAME%%/$1/g" -echo "sed -i -e $replace $template_dist_dir/aws-mlops-single-account-framework.template" -sed -i -e $replace $template_dist_dir/aws-mlops-single-account-framework.template -echo "sed -i -e $replace $template_dist_dir/aws-mlops-multi-account-framework.template" -sed -i -e $replace $template_dist_dir/aws-mlops-multi-account-framework.template +echo "sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-single-account.template" +sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-single-account.template +echo "sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-multi-account.template" +sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-multi-account.template replace="s/%%SOLUTION_NAME%%/$2/g" -echo "sed -i -e $replace $template_dist_dir/aws-mlops-single-account-framework" -sed -i -e $replace $template_dist_dir/aws-mlops-single-account-framework.template -echo "sed -i -e $replace $template_dist_dir/aws-mlops-multi-account-framework.template" -sed -i -e $replace $template_dist_dir/aws-mlops-multi-account-framework.template +echo "sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-single-account" +sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-single-account.template +echo "sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-multi-account.template" +sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-multi-account.template replace="s/%%VERSION%%/$3/g" -echo "sed -i -e $replace $template_dist_dir/aws-mlops-single-account-framework.template" -sed -i -e $replace $template_dist_dir/aws-mlops-single-account-framework.template -echo "sed -i -e $replace $template_dist_dir/aws-mlops-multi-account-framework.template" -sed -i -e $replace $template_dist_dir/aws-mlops-multi-account-framework.template +echo "sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-single-account.template" +sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-single-account.template +echo "sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-multi-account.template" +sed -i -e $replace $template_dist_dir/mlops-workload-orchestrator-multi-account.template echo "------------------------------------------------------------------------------" diff --git a/source/app.py b/source/app.py index a229fc2..52f8b05 100644 --- a/source/app.py +++ b/source/app.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # ##################################################################################################################### -# Copyright 2020-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. # +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # # # # Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance # # with the License. A copy of the License is located at # @@ -12,7 +12,7 @@ # and limitations under the License. 
# # ##################################################################################################################### from aws_cdk import core -from lib.aws_mlops_stack import MLOpsStack +from lib.mlops_orchestrator_stack import MLOpsStack from lib.blueprints.byom.model_monitor import ModelMonitorStack from lib.blueprints.byom.realtime_inference_pipeline import BYOMRealtimePipelineStack from lib.blueprints.byom.byom_batch_pipeline import BYOMBatchStack @@ -28,8 +28,8 @@ mlops_stack_single = MLOpsStack( app, - "aws-mlops-single-account-framework", - description=f"({solution_id}-sa) - AWS MLOps Framework (Single Account Option). Version {version}", + "mlops-workload-orchestrator-single-account", + description=f"({solution_id}-sa) - MLOps Workload Orchestrator (Single Account Option). Version {version}", ) # add AWS_SDK_USER_AGENT env variable to Lambda functions @@ -37,9 +37,9 @@ mlops_stack_multi = MLOpsStack( app, - "aws-mlops-multi-account-framework", + "mlops-workload-orchestrator-multi-account", multi_account=True, - description=f"({solution_id}-ma) - AWS MLOps Framework (Multi Account Option). Version {version}", + description=f"({solution_id}-ma) - MLOps Workload Orchestrator (Multi Account Option). Version {version}", ) core.Aspects.of(mlops_stack_multi).add(AwsSDKConfigAspect(app, "SDKUserAgentMulti", solution_id, version)) @@ -49,7 +49,7 @@ "BYOMCustomAlgorithmImageBuilderStack", description=( f"({solution_id}byom-caib) - Bring Your Own Model pipeline to build custom algorithm docker images" - f"in AWS MLOps Framework. Version {version}" + f"in MLOps Workload Orchestrator. Version {version}" ), ) @@ -80,7 +80,28 @@ ) core.Aspects.of(model_quality_monitor_stack).add( - AwsSDKConfigAspect(app, "SDKUserAgentModelMonitor", solution_id, version) + AwsSDKConfigAspect(app, "SDKUserAgentModelQuality", solution_id, version) +) + +model_bias_monitor_stack = ModelMonitorStack( + app, + "ModelBiasModelMonitorStack", + monitoring_type="ModelBias", + description=(f"({solution_id}byom-mqmb) - ModelBias Model Monitor pipeline. Version {version}"), +) + +core.Aspects.of(model_bias_monitor_stack).add(AwsSDKConfigAspect(app, "SDKUserAgentModelBias", solution_id, version)) + + +model_explainability_monitor_stack = ModelMonitorStack( + app, + "ModelExplainabilityModelMonitorStack", + monitoring_type="ModelExplainability", + description=(f"({solution_id}byom-mqme) - ModelExplainability Model Monitor pipeline. 
Version {version}"), +) + +core.Aspects.of(model_explainability_monitor_stack).add( + AwsSDKConfigAspect(app, "SDKUserAgentModelExplainability", solution_id, version) ) diff --git a/source/lambdas/pipeline_orchestration/lambda_helpers.py b/source/lambdas/pipeline_orchestration/lambda_helpers.py index b57c516..430e7ed 100644 --- a/source/lambdas/pipeline_orchestration/lambda_helpers.py +++ b/source/lambdas/pipeline_orchestration/lambda_helpers.py @@ -40,6 +40,8 @@ def template_url(pipeline_type: str) -> str: byom_batch_pipeline.yaml byom_data_quality_monitor.yaml byom_model_quality_monitor.yaml + byom_model_bias_monitor.yaml + byom_model_expainability_monitor.yaml byom_custom_algorithm_image_builder.yaml single_account_codepipeline.yaml multi_account_codepipeline.yaml @@ -55,6 +57,8 @@ def template_url(pipeline_type: str) -> str: "byom_batch_custom": batch_inference_template, "byom_data_quality_monitor": "blueprints/byom/byom_data_quality_monitor.yaml", "byom_model_quality_monitor": "blueprints/byom/byom_model_quality_monitor.yaml", + "byom_model_bias_monitor": "blueprints/byom/byom_model_bias_monitor.yaml", + "byom_model_explainability_monitor": "blueprints/byom/byom_model_explainability_monitor.yaml", "byom_image_builder": f"{url}/byom_custom_algorithm_image_builder.yaml", "single_account_codepipeline": f"{url}/single_account_codepipeline.yaml", "multi_account_codepipeline": f"{url}/multi_account_codepipeline.yaml", @@ -79,31 +83,25 @@ def get_stack_name(event: Dict[str, Any]) -> str: pipeline_type = event.get("pipeline_type") pipeline_stack_name = os.environ["PIPELINE_STACK_NAME"] model_name = event.get("model_name", "").lower().strip() - if pipeline_type in [ - "byom_realtime_builtin", - "byom_realtime_custom", - "byom_batch_builtin", - "byom_batch_custom", - ]: - - postfix = { - "byom_realtime_builtin": "BYOMPipelineRealtimeBuiltIn", - "byom_realtime_custom": "BYOMPipelineRealtimeCustom", - "byom_batch_builtin": "BYOMPipelineBatchBuiltIn", - "byom_batch_custom": "BYOMPipelineBatchCustom", - } - # name of stack - provisioned_pipeline_stack_name = f"{pipeline_stack_name}-{model_name}-{postfix[pipeline_type]}" - elif pipeline_type == "byom_data_quality_monitor": - provisioned_pipeline_stack_name = f"{pipeline_stack_name}-{model_name}-BYOMDataQualityMonitor" + # create pipeline_type -> postfix map + postfix = { + "byom_realtime_builtin": "BYOMPipelineRealtimeBuiltIn", + "byom_realtime_custom": "BYOMPipelineRealtimeCustom", + "byom_batch_builtin": "BYOMPipelineBatchBuiltIn", + "byom_batch_custom": "BYOMPipelineBatchCustom", + "byom_data_quality_monitor": "BYOMDataQualityMonitor", + "byom_model_quality_monitor": "BYOMModelQualityMonitor", + "byom_model_bias_monitor": "BYOMModelBiasMonitor", + "byom_model_explainability_monitor": "BYOMModelExplainabilityMonitor", + "byom_image_builder": "BYOMPipelineImageBuilder", + } - elif pipeline_type == "byom_model_quality_monitor": - provisioned_pipeline_stack_name = f"{pipeline_stack_name}-{model_name}-BYOMModelQualityMonitor" + # stack name's infix + infix = event.get("image_tag") if pipeline_type == "byom_image_builder" else model_name - elif pipeline_type == "byom_image_builder": - image_tag = event.get("image_tag") - provisioned_pipeline_stack_name = f"{pipeline_stack_name}-{image_tag}-BYOMPipelineImageBuilder" + # name of stack + provisioned_pipeline_stack_name = f"{pipeline_stack_name}-{infix}-{postfix[pipeline_type]}" return provisioned_pipeline_stack_name.lower() @@ -118,33 +116,64 @@ def get_template_parameters(event: Dict[str, Any], 
is_multi_account: bool, stage ("KmsKeyArn", kms_key_arn), ("BlueprintBucket", os.environ["BLUEPRINT_BUCKET"]), ] - if pipeline_type in [ - "byom_realtime_builtin", - "byom_realtime_custom", - "byom_batch_builtin", - "byom_batch_custom", - ]: - - common_params.extend(get_common_realtime_batch_params(event, region, stage)) - - # add realtime specific parameters - if pipeline_type in ["byom_realtime_builtin", "byom_realtime_custom"]: - common_params.extend(get_realtime_specific_params(event, stage)) - # else add batch params - else: - common_params.extend(get_batch_specific_params(event, stage)) - return common_params - - elif pipeline_type == "byom_data_quality_monitor": - return [*common_params, *get_model_monitor_params(event, region, stage)] + # realtime params + realtime_params = ( + [ + *common_params, + *get_common_realtime_batch_params(event, region, stage), + *get_realtime_specific_params(event, stage), + ] + if pipeline_type in ["byom_realtime_builtin", "byom_realtime_custom"] + else None + ) + # batch params + batch_params = ( + [ + *common_params, + *get_common_realtime_batch_params(event, region, stage), + *get_batch_specific_params(event, stage), + ] + if pipeline_type in ["byom_batch_builtin", "byom_batch_custom"] + else None + ) - elif pipeline_type == "byom_model_quality_monitor": - return [*common_params, *get_model_monitor_params(event, region, stage, monitoring_type="ModelQuality")] + # create pipeline_type -> parameters map + pipelines_params = { + "byom_realtime_builtin": realtime_params, + "byom_realtime_custom": realtime_params, + "byom_batch_builtin": batch_params, + "byom_batch_custom": batch_params, + "byom_data_quality_monitor": [*common_params, *get_model_monitor_params(event, region, stage)] + if pipeline_type == "byom_data_quality_monitor" + else None, + "byom_model_quality_monitor": [ + *common_params, + *get_model_monitor_params(event, region, stage, monitoring_type="ModelQuality"), + ] + if pipeline_type == "byom_model_quality_monitor" + else None, + "byom_model_bias_monitor": [ + *common_params, + *get_model_monitor_params(event, region, stage, monitoring_type="ModelBias"), + ] + if pipeline_type == "byom_model_bias_monitor" + else None, + "byom_model_explainability_monitor": [ + *common_params, + *get_model_monitor_params(event, region, stage, monitoring_type="ModelExplainability"), + ] + if pipeline_type == "byom_model_explainability_monitor" + else None, + "byom_image_builder": [*get_image_builder_params(event)] if pipeline_type == "byom_image_builder" else None, + } - elif pipeline_type == "byom_image_builder": - return get_image_builder_params(event) + # get the pipeline's paramaters + pipeline_params = pipelines_params.get(pipeline_type) + # return the params if not NOne, otherwise throw a BadRequest exception + if pipeline_params: + return pipeline_params else: raise BadRequest("Bad request format. Please provide a supported pipeline") @@ -231,9 +260,9 @@ def get_batch_specific_params(event: Dict[str, Any], stage: str) -> List[Tuple[s ] -def get_built_in_model_monitor_image_uri(region, image_name="model-monitor"): +def get_built_in_model_monitor_image_uri(region, framework): model_monitor_image_uri = sagemaker.image_uris.retrieve( - framework=image_name, + framework=framework, region=region, ) @@ -248,8 +277,8 @@ def get_model_monitor_params( # generate jobs names # make sure baseline_job_name and monitoring_schedule_name are <= 63 characters long, especially # if endpoint_name was dynamically generated by AWS CDK. 
- baseline_job_name = f"{endpoint_name}-{monitoring_type.lower()}-baseline-{str(uuid.uuid4())[:4]}" - monitoring_schedule_name = f"{endpoint_name}-{monitoring_type.lower()}-monitor-{str(uuid.uuid4())[:4]}" + baseline_job_name = f"{endpoint_name}-{monitoring_type.lower()}-{str(uuid.uuid4())[:4]}" + monitoring_schedule_name = f"{endpoint_name}-{monitoring_type.lower()}-{str(uuid.uuid4())[:4]}" baseline_job_output_location = clean_param(get_stage_param(event, "baseline_job_output_location", stage)) data_capture_location = clean_param(get_stage_param(event, "data_capture_location", stage)) @@ -261,6 +290,10 @@ def get_model_monitor_params( schedule_expression = get_stage_param(event, "schedule_expression", stage) monitor_ground_truth_input = get_stage_param(event, "monitor_ground_truth_input", stage) + # set the framework based on the monitoring type + # DataQuality/ModelQuality -> framework="model-monitor" + # ModelBias/ModelExplanability -> framework="clarify" + monitor_framework = "model-monitor" if monitoring_type in ["DataQuality", "ModelQuality"] else "clarify" monitor_params = [ ("BaselineJobName", baseline_job_name), ("BaselineOutputBucket", baseline_job_output_location.split("/")[0]), @@ -268,7 +301,7 @@ def get_model_monitor_params( ("DataCaptureBucket", data_capture_location.split("/")[0]), ("DataCaptureLocation", data_capture_location), ("EndpointName", endpoint_name), - ("ImageUri", get_built_in_model_monitor_image_uri(region)), + ("ImageUri", get_built_in_model_monitor_image_uri(region, framework=monitor_framework)), ("InstanceType", instance_type), ("InstanceVolumeSize", instance_volume_size), ("BaselineMaxRuntimeSeconds", baseline_max_runtime_seconds), @@ -279,18 +312,61 @@ def get_model_monitor_params( ("BaselineData", event.get("baseline_data")), ] - # add ModelQuality parameters + # add ModelQuality specific params if monitoring_type == "ModelQuality": monitor_params.extend( [ ("BaselineInferenceAttribute", event.get("baseline_inference_attribute", "").strip()), ("BaselineProbabilityAttribute", event.get("baseline_probability_attribute", "").strip()), ("BaselineGroundTruthAttribute", event.get("baseline_ground_truth_attribute", "").strip()), + ] + ) + # add ModelQuality parameters, also used by ModelBias/Model + if monitoring_type in ["ModelQuality", "ModelBias", "ModelExplainability"]: + monitor_params.extend( + [ ("ProblemType", event.get("problem_type", "").strip()), ("MonitorInferenceAttribute", event.get("monitor_inference_attribute", "").strip()), ("MonitorProbabilityAttribute", event.get("monitor_probability_attribute", "").strip()), ("ProbabilityThresholdAttribute", event.get("probability_threshold_attribute", "").strip()), - ("MonitorGroundTruthInput", monitor_ground_truth_input), + ] + ) + + # only add MonitorGroundTruthInput if ModelQuality|ModelBias + if monitoring_type in ["ModelQuality", "ModelBias"]: + monitor_params.append(("GroundTruthBucket", monitor_ground_truth_input.split("/")[0])) + monitor_params.append(("MonitorGroundTruthInput", monitor_ground_truth_input)) + + # add ModelBias specific params + if monitoring_type == "ModelBias": + model_predicted_label_config = event.get("model_predicted_label_config") + monitor_params.extend( + [ + ( + "ModelPredictedLabelConfig", + json.dumps(model_predicted_label_config) if model_predicted_label_config else "", + ), + ("BiasConfig", json.dumps(event.get("bias_config"))), + ] + ) + + # add ModelExplainability specific params + if monitoring_type == "ModelExplainability": + shap_config = event.get("shap_config") + 
model_scores = event.get("model_scores") + monitor_params.extend( + [ + ("SHAPConfig", json.dumps(shap_config) if shap_config else ""), + ("ExplainabilityModelScores", json.dumps(model_scores) if model_scores else ""), + ] + ) + + # add common params for ModelBias/ModelExplainability + if monitoring_type in ["ModelBias", "ModelExplainability"]: + + monitor_params.extend( + [ + ("FeaturesAttribute", event.get("features_attribute", "").strip()), ] ) @@ -398,6 +474,27 @@ def get_image_uri(pipeline_type: str, event: Dict[str, Any], region: str) -> str def get_required_keys(pipeline_type: str, use_model_registry: str, problem_type: str = None) -> List[str]: + + common_keys = ["pipeline_type", "model_name", "inference_instance"] + model_location = ["model_artifact_location"] + builtin_model_keys = ["model_framework", "model_framework_version"] + model_location + custom_model_keys = ["custom_image_uri"] + model_location + # if model registry is used + if use_model_registry == "Yes": + builtin_model_keys = custom_model_keys = ["model_package_name"] + + realtime_specific_keys = ["data_capture_location"] + batch_specific_keys = ["batch_inference_data", "batch_job_output_location"] + + # model monitor keys + monitors = ["byom_model_quality_monitor", "byom_model_bias_monitor", "byom_model_explainability_monitor"] + if pipeline_type in monitors and problem_type not in [ + "Regression", + "MulticlassClassification", + "BinaryClassification", + ]: + raise BadRequest("Bad request format. Unsupported problem_type in byom_model_quality_monitor pipeline") + # common required keys between model monitor types common_monitor_keys = [ "pipeline_type", @@ -412,68 +509,66 @@ def get_required_keys(pipeline_type: str, use_model_registry: str, problem_type: "instance_type", "instance_volume_size", ] - # Realtime/batch pipelines - if pipeline_type in [ - "byom_realtime_builtin", - "byom_realtime_custom", - "byom_batch_builtin", - "byom_batch_custom", - ]: - common_keys = ["pipeline_type", "model_name", "inference_instance"] - model_location = ["model_artifact_location"] - builtin_model_keys = ["model_framework", "model_framework_version"] + model_location - custom_model_keys = ["custom_image_uri"] + model_location - # if model registry is used - if use_model_registry == "Yes": - builtin_model_keys = custom_model_keys = ["model_package_name"] - - realtime_specific_keys = ["data_capture_location"] - batch_specific_keys = ["batch_inference_data", "batch_job_output_location"] - - keys_map = { - "byom_realtime_builtin": common_keys + builtin_model_keys + realtime_specific_keys, - "byom_realtime_custom": common_keys + custom_model_keys + realtime_specific_keys, - "byom_batch_builtin": common_keys + builtin_model_keys + batch_specific_keys, - "byom_batch_custom": common_keys + custom_model_keys + batch_specific_keys, - } - - return keys_map[pipeline_type] - - # Data Quality Monitor pipeline - elif pipeline_type == "byom_data_quality_monitor": - return common_monitor_keys - # Model Quality Monitor pipeline - elif pipeline_type == "byom_model_quality_monitor": - common_model_keys = [ - "baseline_inference_attribute", - "baseline_ground_truth_attribute", - "problem_type", - "monitor_ground_truth_input", - ] - if problem_type in ["Regression", "MulticlassClassification"]: - common_model_keys.append("monitor_inference_attribute") - elif problem_type == "BinaryClassification": - common_model_keys.extend( - ["monitor_probability_attribute", "probability_threshold_attribute", "baseline_probability_attribute"] - ) + # ModelQuality 
specific keys + model_quality_keys = ["baseline_inference_attribute", "baseline_ground_truth_attribute"] + # common model related monitors + common_model_keys = ["problem_type"] + # add required keys based on problem type + if problem_type in ["Regression", "MulticlassClassification"]: + common_model_keys.append("monitor_inference_attribute") + # problem_type == "BinaryClassification". Note: depending on the model output, + # monitor_inference_attribute, monitor_probability_attribute, and probability_threshold_attribute + # can be passed all together, or in pairs + elif pipeline_type == "byom_model_quality_monitor": + model_quality_keys.append("baseline_probability_attribute") - else: - raise BadRequest("Bad request format. Unsupported problem_type in byom_model_quality_monitor pipeline") + # shared_model_quality_bias keys + shared_model_quality_bias_keys = ["monitor_ground_truth_input"] - return [ + # add model_predicted_label_config if "byom_model_bias_monitor" and + # the problem is "BinaryClassification" or "MulticlassClassification" + extra_bias_keys = [] + if pipeline_type == "byom_model_bias_monitor" and problem_type in [ + "BinaryClassification", + "MulticlassClassification", + ]: + extra_bias_keys.append("model_predicted_label_config") + + # create pipeline_type -> required_keys map + pipeline_keys_map = { + "byom_realtime_builtin": [*common_keys, *builtin_model_keys, *realtime_specific_keys], + "byom_realtime_custom": [*common_keys, *custom_model_keys, *realtime_specific_keys], + "byom_batch_builtin": [*common_keys, *builtin_model_keys, *batch_specific_keys], + "byom_batch_custom": [*common_keys, *custom_model_keys, *batch_specific_keys], + "byom_data_quality_monitor": common_monitor_keys, + "byom_model_quality_monitor": [ *common_monitor_keys, + *model_quality_keys, *common_model_keys, - ] - # Image Builder pipeline - elif pipeline_type == "byom_image_builder": - return [ - "pipeline_type", - "custom_algorithm_docker", - "ecr_repo_name", - "image_tag", - ] + *shared_model_quality_bias_keys, + ], + "byom_model_bias_monitor": [ + *common_monitor_keys, + *common_model_keys, + *shared_model_quality_bias_keys, + *extra_bias_keys, + "bias_config", + ], + "byom_model_explainability_monitor": [ + *common_monitor_keys, + *common_model_keys, + "shap_config", + ], + "byom_image_builder": ["pipeline_type", "custom_algorithm_docker", "ecr_repo_name", "image_tag"], + } + + # get the required keys based on the pipeline_type + required_keys = pipeline_keys_map.get(pipeline_type) + # return required_keys if not None. Otherwise, raise BadRequest exception + if required_keys: + return required_keys else: raise BadRequest( "Bad request format. Pipeline type not supported. 
Check documentation for API & config formats" diff --git a/source/lambdas/pipeline_orchestration/tests/fixtures/orchestrator_fixtures.py b/source/lambdas/pipeline_orchestration/tests/fixtures/orchestrator_fixtures.py index 87689c0..75bc96d 100644 --- a/source/lambdas/pipeline_orchestration/tests/fixtures/orchestrator_fixtures.py +++ b/source/lambdas/pipeline_orchestration/tests/fixtures/orchestrator_fixtures.py @@ -47,6 +47,15 @@ def mock_env_variables(): os.environ["IS_DELEGATED_ADMIN"] = "No" os.environ["MODEL_PACKAGE_GROUP_NAME"] = "xgboost" os.environ["MODEL_PACKAGE_NAME"] = "arn:aws:sagemaker:*:*:model-package/xgboost/1" + os.environ["MODEL_PREDICTED_LABEL_CONFIG"] = json.dumps(dict(probability=0)) + os.environ["BIAS_CONFIG"] = json.dumps( + dict( + label_values_or_threshold=[1], + facet_name="age", + facet_values_or_threshold=[40], + group_name="personal_status_sex", + ) + ) @pytest.fixture @@ -146,6 +155,33 @@ def api_model_quality_event(api_data_quality_event): return model_quality_event +@pytest.fixture +def api_model_bias_event(api_model_quality_event): + model_bias_event = api_model_quality_event.copy() + model_bias_event.update( + { + "pipeline_type": "byom_model_bias_monitor", + "model_predicted_label_config": os.environ["MODEL_PREDICTED_LABEL_CONFIG"], + "bias_config": os.environ["BIAS_CONFIG"], + } + ) + return model_bias_event + + +@pytest.fixture +def api_model_explainability_event(api_model_bias_event): + model_explainability_event = api_model_bias_event.copy() + model_explainability_event.update( + { + "pipeline_type": "byom_model_explainability_monitor", + } + ) + # remove monitor_ground_truth_input + del model_explainability_event["monitor_ground_truth_input"] + + return model_explainability_event + + @pytest.fixture def api_image_builder_event(): return { @@ -214,6 +250,7 @@ def expected_model_quality_monitor_params(expected_data_quality_monitor_params): ("MonitorInferenceAttribute", "0"), ("MonitorProbabilityAttribute", "0"), ("ProbabilityThresholdAttribute", "0.5"), + ("GroundTruthBucket", "test-bucket"), ("MonitorGroundTruthInput", "s3://test-bucket/groundtruth"), ] ) @@ -221,6 +258,36 @@ def expected_model_quality_monitor_params(expected_data_quality_monitor_params): return expected_model_quality +@pytest.fixture +def expected_model_explainability_monitor_params(expected_model_quality_monitor_params): + expected_model_expainability = expected_model_quality_monitor_params[3:-2].copy() + # add ModelExpainability params + expected_model_expainability.extend( + [ + ("SHAPConfig", json.dumps(os.getenv("SHAP_CONFIG", {}))), + ("ExplainabilityModelScores", ""), + ("FeaturesAttribute", ""), + ] + ) + + return expected_model_expainability + + +@pytest.fixture +def expected_model_bias_monitor_params(expected_model_quality_monitor_params): + expected_model_bias = expected_model_quality_monitor_params[3:].copy() + + expected_model_bias.extend( + [ + ("BiasConfig", os.environ["BIAS_CONFIG"]), + ("ModelPredictedLabelConfig", os.environ["MODEL_PREDICTED_LABEL_CONFIG"]), + ("FeaturesAttribute", ""), + ] + ) + + return expected_model_bias + + @pytest.fixture def expected_common_realtime_batch_params(): return [ @@ -431,28 +498,50 @@ def _required_api_keys_model_monitor(monitoring_type, problem_type=None): "instance_type", "instance_volume_size", ] - if monitoring_type != "ModelQuality": + if monitoring_type == "DataQuality": return common_keys - else: - common_model_keys = [ - "baseline_inference_attribute", - "baseline_ground_truth_attribute", - "problem_type", - 
"monitor_ground_truth_input", - ] - if problem_type in ["Regression", "MulticlassClassification"]: - common_model_keys.append("monitor_inference_attribute") - - # BinaryClassification problem - else: - common_model_keys.extend( - [ - "monitor_probability_attribute", - "probability_threshold_attribute", - "baseline_probability_attribute", - ] - ) - return [*common_keys, *common_model_keys] + + # ModelQuality specific keys + model_quality_keys = ["baseline_inference_attribute", "baseline_ground_truth_attribute"] + # common model related monitors + common_model_keys = ["problem_type"] + # add required keys based on problem type + if problem_type in ["Regression", "MulticlassClassification"]: + common_model_keys.append("monitor_inference_attribute") + elif monitoring_type == "ModelQuality": + model_quality_keys.append("baseline_probability_attribute") + + # shared_model_quality_bias keys + shared_model_quality_bias_keys = ["monitor_ground_truth_input"] + + # add model_predicted_label_config if "byom_model_bias_monitor" and + # the problem is "BinaryClassification" or "MulticlassClassification" + extra_bias_keys = [] + if monitoring_type == "ModelBias" and problem_type in [ + "BinaryClassification", + "MulticlassClassification", + ]: + extra_bias_keys.append("model_predicted_label_config") + + # create MonitoringType -> RequiredParamaters map + type_keys = { + "ModelQuality": [ + *common_keys, + *model_quality_keys, + *common_model_keys, + *shared_model_quality_bias_keys, + ], + "ModelBias": [ + *common_keys, + *common_model_keys, + *shared_model_quality_bias_keys, + *extra_bias_keys, + "bias_config", + ], + "ModelExplainability": [*common_keys, *common_model_keys, "shap_config"], + } + + return type_keys.get(monitoring_type) return _required_api_keys_model_monitor diff --git a/source/lambdas/pipeline_orchestration/tests/test_pipeline_orchestration.py b/source/lambdas/pipeline_orchestration/tests/test_pipeline_orchestration.py index edc074d..2147a99 100644 --- a/source/lambdas/pipeline_orchestration/tests/test_pipeline_orchestration.py +++ b/source/lambdas/pipeline_orchestration/tests/test_pipeline_orchestration.py @@ -77,6 +77,10 @@ required_api_byom_realtime_custom, required_api_byom_batch_custom, api_model_monitor_event, + api_model_bias_event, + api_model_explainability_event, + expected_model_bias_monitor_params, + expected_model_explainability_monitor_params, required_api_keys_model_monitor, template_parameters_common, template_parameters_realtime_builtin, @@ -465,7 +469,14 @@ def test_pipeline_status(): assert response == expected_response_no_cp -def test_get_stack_name(api_byom_event, api_data_quality_event, api_model_quality_event, api_image_builder_event): +def test_get_stack_name( + api_byom_event, + api_data_quality_event, + api_model_quality_event, + api_model_bias_event, + api_model_explainability_event, + api_image_builder_event, +): # realtime builtin pipeline realtime_builtin = api_byom_event("byom_realtime_builtin") assert ( @@ -488,6 +499,18 @@ def test_get_stack_name(api_byom_event, api_data_quality_event, api_model_qualit == f"mlops-pipeline-{api_model_quality_event['model_name']}-byommodelqualitymonitor" ) + # model bias monitor pipeline + assert ( + get_stack_name(api_model_bias_event) + == f"mlops-pipeline-{api_model_bias_event['model_name']}-byommodelbiasmonitor" + ) + + # model explainability monitor pipeline + assert ( + get_stack_name(api_model_explainability_event) + == f"mlops-pipeline-{api_model_explainability_event['model_name']}-byommodelexplainabilitymonitor" + 
) + # image builder pipeline assert ( get_stack_name(api_image_builder_event) @@ -534,6 +557,14 @@ def test_get_required_keys( returned_keys = get_required_keys("byom_model_quality_monitor", "No", "BinaryClassification") expected_keys = required_api_keys_model_monitor("ModelQuality", "BinaryClassification") TestCase().assertCountEqual(expected_keys, returned_keys) + # Required keys in model bias monitor, problem type BinaryClassification + returned_keys = get_required_keys("byom_model_bias_monitor", "No", "BinaryClassification") + expected_keys = required_api_keys_model_monitor("ModelBias", "BinaryClassification") + TestCase().assertCountEqual(expected_keys, returned_keys) + # Required keys in model expainability monitor, problem type Regression + returned_keys = get_required_keys("byom_model_explainability_monitor", "No", "Regression") + expected_keys = required_api_keys_model_monitor("ModelExplainability", "Regression") + TestCase().assertCountEqual(expected_keys, returned_keys) # test exception for unsupported problem type with pytest.raises(BadRequest) as error: get_required_keys("byom_model_quality_monitor", "No", "UnsupportedProblemType") @@ -567,11 +598,15 @@ def test_get_template_parameters( api_image_builder_event, api_data_quality_event, api_model_quality_event, + api_model_bias_event, + api_model_explainability_event, expected_params_realtime_custom, expected_image_builder_params, expected_batch_params, expected_data_quality_monitor_params, expected_model_quality_monitor_params, + expected_model_bias_monitor_params, + expected_model_explainability_monitor_params, ): single_event = api_byom_event("byom_realtime_custom", False) # realtime pipeline @@ -584,11 +619,13 @@ def test_get_template_parameters( expected_batch_params, ) + # additional params used by Model Monitor asserts + common_params = [("AssetsBucket", "testassetsbucket"), ("KmsKeyArn", ""), ("BlueprintBucket", "testbucket")] # data quality pipeline assert len(get_template_parameters(api_data_quality_event, False)) == len( [ *expected_data_quality_monitor_params, - *[("AssetsBucket", "testassetsbucket"), ("KmsKeyArn", ""), ("BlueprintBucket", "testbucket")], + *common_params, ] ) @@ -596,9 +633,26 @@ def test_get_template_parameters( assert len(get_template_parameters(api_model_quality_event, False)) == len( [ *expected_model_quality_monitor_params, - *[("AssetsBucket", "testassetsbucket"), ("KmsKeyArn", ""), ("BlueprintBucket", "testbucket")], + *common_params, ] ) + + # model bias pipeline + assert len(get_template_parameters(api_model_bias_event, False)) == len( + [ + *expected_model_bias_monitor_params, + *common_params, + ] + ) + + # model explainability pipeline + assert len(get_template_parameters(api_model_explainability_event, False)) == len( + [ + *expected_model_explainability_monitor_params, + *common_params, + ] + ) + # test for exception with pytest.raises(BadRequest): get_template_parameters({"pipeline_type": "unsupported"}, False) @@ -633,11 +687,21 @@ def test_get_batch_specific_params(api_byom_event, expected_batch_specific_param def test_get_built_in_model_monitor_container_uri(): + # The 156813124566 is one of the actual account ids for a public Model Monitor Image provided + # by the SageMaker service. The reason is I need to provide a valid image URI because the SDK + # has validation for the inputs # assert the returned value by an actual Model Monitor Image URI for the region. 
assert ( - get_built_in_model_monitor_image_uri("us-east-1") + get_built_in_model_monitor_image_uri("us-east-1", "model-monitor") == "156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer" ) + # The 205585389593 is one of the actual account ids for a public Clarify image provided + # by the SageMaker service. + # assert the returned value by an actual clarify Image URI for the region. + assert ( + get_built_in_model_monitor_image_uri("us-east-1", "clarify") + == "205585389593.dkr.ecr.us-east-1.amazonaws.com/sagemaker-clarify-processing:1.0" + ) @patch("lambda_helpers.sagemaker.image_uris.retrieve") diff --git a/source/lib/blueprints/byom/lambdas/create_baseline_job/baselines_helper.py b/source/lib/blueprints/byom/lambdas/create_baseline_job/baselines_helper.py index 779c1aa..6bedcc2 100644 --- a/source/lib/blueprints/byom/lambdas/create_baseline_job/baselines_helper.py +++ b/source/lib/blueprints/byom/lambdas/create_baseline_job/baselines_helper.py @@ -10,12 +10,23 @@ # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions # # and limitations under the License. # # ##################################################################################################################### -from typing import Callable, Any, Dict, List, Optional +from typing import Callable, Any, Dict, List, Optional, Union import logging +import json import sagemaker +from botocore.client import BaseClient from sagemaker.model_monitor import DefaultModelMonitor from sagemaker.model_monitor import ModelQualityMonitor +from sagemaker.model_monitor import ModelBiasMonitor +from sagemaker.model_monitor import ModelExplainabilityMonitor from sagemaker.model_monitor.dataset_format import DatasetFormat +from sagemaker.clarify import ( + DataConfig, + BiasConfig, + ModelConfig, + ModelPredictedLabelConfig, + SHAPConfig, +) logger = logging.getLogger(__name__) @@ -73,9 +84,24 @@ class SolutionSageMakerBaselines: Used only with 'BinaryClassification' problem if 'inference_attribute' is not provided (default: None). probability_threshold_attribute (float): threshold to convert probabilities to binaries (used with ModelQuality baseline). Used only with 'BinaryClassification' problem if 'inference_attribute' is not provided (default: None). - sagemaker_session: (sagemaker.session.Session): Session object which manages interactions with Amazon SageMaker + sagemaker_session (sagemaker.session.Session): Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, one is created using the default AWS configuration chain (default: None). + data_config (sagemaker.clarify.DataConfig): Config of the input/output data used by ModelBias/ModelExplainability baselines + refer to https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.clarify.DataConfig + bias_config (sagemaker.clarify.BiasConfig): Config of sensitive groups + refer to https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.clarify.BiasConfig + model_config (sagemaker.clarify.ModelConfig): Config of the model and its endpoint to be created. + Used by ModelBias/ModelExplainability baselines. + refer to https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.clarify.ModelConfig + model_predicted_label_config (sagemaker.clarify.ModelPredictedLabelConfig): Config of how to extract the predicted label + from the model output.Used by ModelBias baseline. 
+ refer to https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.clarify.ModelPredictedLabelConfig + explainability_config (sagemaker.clarify.SHAPConfig): Config of the specific explainability method. Currently, only SHAP is supported. + Used by ModelExplainability baseline. + refer to https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.clarify.ExplainabilityConfig + model_scores: (str or int): Index or JSONPath location in the model output for the predicted scores to be explained. + This is not required if the model output is a single score. Used by ModelExplainability baseline. tags (list[dict[str, str]]): resource tags (default: None). """ @@ -98,14 +124,20 @@ def __init__( probability_attribute: Optional[str] = None, probability_threshold_attribute: Optional[float] = None, sagemaker_session: Optional[sagemaker.session.Session] = None, + data_config: Optional[DataConfig] = None, + bias_config: Optional[BiasConfig] = None, + model_config: Optional[ModelConfig] = None, + model_predicted_label_config: Optional[ModelPredictedLabelConfig] = None, + explainability_config: Optional[SHAPConfig] = None, + model_scores: Optional[Union[str, int]] = None, tags: Optional[List[Dict[str, str]]] = None, ) -> None: # validate the provided monitoring_type - if monitoring_type not in ["DataQuality", "ModelQuality"]: + if monitoring_type not in ["DataQuality", "ModelQuality", "ModelBias", "ModelExplainability"]: raise ValueError( ( f"The provided monitoring type: {monitoring_type} is not valid. " - + "It must be 'DataQuality'|'ModelQuality'" + + "It must be 'DataQuality'|'ModelQuality'|'ModelBias'|'ModelExplainability'" ) ) self.monitoring_type = monitoring_type @@ -124,6 +156,12 @@ def __init__( self.probability_attribute = probability_attribute self.probability_threshold_attribute = probability_threshold_attribute self.sagemaker_session = sagemaker_session + self.data_config = data_config + self.bias_config = bias_config + self.model_config = model_config + self.model_predicted_label_config = model_predicted_label_config + self.explainability_config = explainability_config + self.model_scores = model_scores self.tags = tags @exception_handler @@ -136,7 +174,10 @@ def create_baseline_job(self) -> sagemaker.processing.ProcessingJob: """ # create *Baseline Job MonitoringType->function_name map type_function_map = dict( - DataQuality="_create_data_quality_baseline", ModelQuality="_create_model_quality_baseline" + DataQuality="_create_data_quality_baseline", + ModelQuality="_create_model_quality_baseline", + ModelBias="_create_model_bias_baseline", + ModelExplainability="_create_model_explainability_baseline", ) # get the formated baseline job arguments @@ -150,12 +191,12 @@ def create_baseline_job(self) -> sagemaker.processing.ProcessingJob: @exception_handler def _get_baseline_job_args( self, - ) -> Dict[str, Dict[str, str]]: + ) -> Dict[str, Dict[str, Any]]: """ Gets the baseline job arguments to create the *baseline job Returns: - dict[str, dict[str, str]]: the arguments to create the *baseline job + dict[str, dict[str, Any]]: the arguments to create the *baseline job """ # validate baseline_dataset if not self._is_valid_argument_value(self.baseline_dataset): @@ -174,12 +215,19 @@ def _get_baseline_job_args( # args passed to the Monitor class's suggest_baseline function suggest_args=dict( job_name=self.baseline_job_name, - dataset_format=DatasetFormat.csv(header=True), - baseline_dataset=self.baseline_dataset, - output_s3_uri=self.output_s3_uri, ), 
) + # add args valid only for DataQuality or ModelQuality + if self.monitoring_type in ["DataQuality", "ModelQuality"]: + baseline_args["suggest_args"].update( + { + "dataset_format": DatasetFormat.csv(header=True), + "baseline_dataset": self.baseline_dataset, + "output_s3_uri": self.output_s3_uri, + } + ) + # add max_runtime_in_seconds if provided if self.max_runtime_in_seconds: baseline_args["class_args"].update({"max_runtime_in_seconds": self.max_runtime_in_seconds}) @@ -201,21 +249,29 @@ def _get_baseline_job_args( if self.monitoring_type == "ModelQuality": baseline_args = self._add_model_quality_args(baseline_args) + # add ModelBias args + if self.monitoring_type == "ModelBias": + baseline_args = self._add_model_bias_args(baseline_args) + + # add ModelQuality args + if self.monitoring_type == "ModelExplainability": + baseline_args = self._add_model_explainability_args(baseline_args) + return baseline_args @exception_handler def _add_model_quality_args( self, - baseline_args: Dict[str, Dict[str, str]], - ) -> Dict[str, Dict[str, str]]: + baseline_args: Dict[str, Dict[str, Any]], + ) -> Dict[str, Dict[str, Any]]: """ Adds ModelQuality's specific arguments to the passed baseline_args Args: - baseline_args (dict[str, dict[str, str]]): arguments to create the baseline job + baseline_args (dict[str, dict[str, Any]]): arguments to create the baseline job Returns: - dict[str, dict[str, str]]: The combined arguments to create the baseline job + dict[str, dict[str, Any]]: The combined arguments to create the baseline job """ # validate the problem_type if self.problem_type not in ["Regression", "BinaryClassification", "MulticlassClassification"]: @@ -264,9 +320,49 @@ def _add_model_quality_args( return baseline_args + @exception_handler + def _add_model_bias_args( + self, + baseline_args: Dict[str, Dict[str, Any]], + ) -> Dict[str, Dict[str, Any]]: + baseline_args["suggest_args"].update( + { + "data_config": self.data_config, + "bias_config": self.bias_config, + "model_config": self.model_config, + "model_predicted_label_config": self.model_predicted_label_config, + } + ) + + # add kms_key, if provided, to encrypt the user code file + if self.kms_key_arn: + baseline_args["suggest_args"].update({"kms_key": self.kms_key_arn}) + + return baseline_args + + @exception_handler + def _add_model_explainability_args( + self, + baseline_args: Dict[str, Dict[str, Any]], + ) -> Dict[str, Dict[str, Any]]: + baseline_args["suggest_args"].update( + { + "data_config": self.data_config, + "explainability_config": self.explainability_config, + "model_config": self.model_config, + "model_scores": self.model_scores, + } + ) + + # add kms_key, if provided, to encrypt the user code file + if self.kms_key_arn: + baseline_args["suggest_args"].update({"kms_key": self.kms_key_arn}) + + return baseline_args + @exception_handler def _create_data_quality_baseline( - self, data_quality_baseline_job_args: Dict[str, Dict[str, str]] + self, data_quality_baseline_job_args: Dict[str, Dict[str, Any]] ) -> sagemaker.processing.ProcessingJob: """ Creates SageMaker DataQuality baseline job @@ -281,20 +377,20 @@ def _create_data_quality_baseline( f"Creating DataQuality baseline job {data_quality_baseline_job_args['suggest_args']['job_name']} ..." 
) - # create DefaultModelMonitor + # create DefaultModel Monitor data_quality_monitor = DefaultModelMonitor(**data_quality_baseline_job_args["class_args"]) # create the DataQuality baseline job - data_baseline_job = data_quality_monitor.suggest_baseline( + data_quality_baseline_job = data_quality_monitor.suggest_baseline( **data_quality_baseline_job_args["suggest_args"], ) - return data_baseline_job + return data_quality_baseline_job @exception_handler def _create_model_quality_baseline( self, - model_quality_baseline_job_args: Dict[str, Dict[str, str]], + model_quality_baseline_job_args: Dict[str, Dict[str, Any]], ) -> sagemaker.processing.ProcessingJob: """ Creates SageMaker ModelQuality baseline job @@ -309,16 +405,117 @@ def _create_model_quality_baseline( f"Creating ModelQuality baseline job {model_quality_baseline_job_args['suggest_args']['job_name']} ..." ) - # create ModelQualityMonitor + # create ModelQuality Monitor model_quality_monitor = ModelQualityMonitor(**model_quality_baseline_job_args["class_args"]) - # create the DataQuality baseline job - model_baseline_job = model_quality_monitor.suggest_baseline( + # create the ModelQuality baseline job + model_quality_baseline_job = model_quality_monitor.suggest_baseline( **model_quality_baseline_job_args["suggest_args"], ) - return model_baseline_job + return model_quality_baseline_job + + @exception_handler + def _create_model_bias_baseline( + self, + model_bias_baseline_job_args: Dict[str, Dict[str, Any]], + ) -> sagemaker.processing.ProcessingJob: + """ + Creates SageMaker ModelBias baseline job + + Args: + model_bias_baseline_job_config (dict[str, dict[str, Any]]): The ModelBias baseline job arguments + + Returns: + sagemaker.processing.ProcessingJob object + """ + logger.info(f"Creating ModelBias baseline job {model_bias_baseline_job_args['suggest_args']['job_name']} ...") + + # create ModelBias Monitor + model_bias_monitor = ModelBiasMonitor(**model_bias_baseline_job_args["class_args"]) + + # create the ModelBias baseline job + model_bias_baseline_job = model_bias_monitor.suggest_baseline( + **model_bias_baseline_job_args["suggest_args"], + ) + + return model_bias_baseline_job + + @exception_handler + def _create_model_explainability_baseline( + self, + model_explainability_baseline_job_args: Dict[str, Dict[str, Any]], + ) -> sagemaker.processing.ProcessingJob: + """ + Creates SageMaker ModelExplainability baseline job + + Args: + model_explainability_baseline_job_config (dict[str, dict[str, Any]]): The ModelExplainability baseline job arguments + + Returns: + sagemaker.processing.ProcessingJob object + """ + logger.info( + f"Creating ModelExplainability baseline job {model_explainability_baseline_job_args['suggest_args']['job_name']} ..." 
+ ) + + # create ModelExplainability Monitor + model_explainability_monitor = ModelExplainabilityMonitor( + **model_explainability_baseline_job_args["class_args"] + ) + + # create the ModelExplainability baseline job + model_explainability_baseline_job = model_explainability_monitor.suggest_baseline( + **model_explainability_baseline_job_args["suggest_args"], + ) + + return model_explainability_baseline_job def _is_valid_argument_value(self, value: str) -> bool: # validate the argument's value is not None or empty string return True if value else False + + @staticmethod + @exception_handler + def get_model_name(endpoint_name: str, sm_client: BaseClient) -> str: + """ + Gets the name of the SageMaker Model used by the deployed endpoint + + Args: + endpoint_name (str): SageMaker Endpoint name to be monitored + sm_client (Boto3 SageMaker client): Amazon SageMaker boto3 client + + Returns: + str: SageMaker model name + """ + # get the EndpointConfigName using the Endpoint Name + endpoint_config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"] + + # get the ModelName using EndpointConfigName + model_name = sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config_name)["ProductionVariants"][ + 0 + ]["ModelName"] + + return model_name + + @staticmethod + @exception_handler + def get_baseline_dataset_header(bucket_name: str, file_key: str, s3_client: BaseClient) -> List[str]: + """ + Gets the baseline dataset's header (column names) from the baseline dataset csv + file stored in the S3 Assets bucket + + Args: + bucket_name (str): the bucket name where the baseline dataset csv file is stored + file_key (str): baseline dataset csv file S3 key + s3_client (BaseClient): S3 boto3 client + + Returns: + List[str]: the dataset's header (column names) + """ + # read the baseline dataset csv file from S3 + dataset = s3_client.get_object(Bucket=bucket_name, Key=file_key)["Body"].read().decode("utf-8") + # Extract feature names (row 1). Note the label is expected to be the first column + header = dataset.split("\n")[0].split(",") + + return header diff --git a/source/lib/blueprints/byom/lambdas/create_baseline_job/main.py b/source/lib/blueprints/byom/lambdas/create_baseline_job/main.py index 80fb55d..1550111 100644 --- a/source/lib/blueprints/byom/lambdas/create_baseline_job/main.py +++ b/source/lib/blueprints/byom/lambdas/create_baseline_job/main.py @@ -11,12 +11,24 @@ # and limitations under the License.
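# --- Editor's note: illustrative usage sketch, not part of this patch. ---
# The two static helpers above can be exercised directly with plain boto3 clients;
# the endpoint, bucket, and file key names below are placeholders.
import boto3

from baselines_helper import SolutionSageMakerBaselines

s3_client = boto3.client("s3")
sm_client = boto3.client("sagemaker")

# resolve the SageMaker model behind the monitored endpoint
model_name = SolutionSageMakerBaselines.get_model_name(
    endpoint_name="my-monitored-endpoint", sm_client=sm_client
)

# read the baseline dataset's header (first csv row) from the assets bucket
header = SolutionSageMakerBaselines.get_baseline_dataset_header(
    bucket_name="my-assets-bucket", file_key="baseline_data.csv", s3_client=s3_client
)
print(model_name, header)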
# # ##################################################################################################################### import os +import json import sagemaker +from sagemaker.clarify import ( + DataConfig, + BiasConfig, + ModelConfig, + ModelPredictedLabelConfig, + SHAPConfig, +) + from shared.logger import get_logger +from shared.helper import get_client from baselines_helper import SolutionSageMakerBaselines, exception_handler logger = get_logger(__name__) +s3_client = get_client("s3") +sm_client = get_client("sagemaker") sagemaker_session = sagemaker.session.Session() @@ -26,20 +38,59 @@ def handler(event, context): assets_bucket = os.environ["ASSETS_BUCKET"] monitoring_type = os.environ.get("MONITORING_TYPE") baseline_job_name = os.environ["BASELINE_JOB_NAME"] + instance_type = os.environ.get("INSTANCE_TYPE", "ml.m5.large") + instance_count = int(os.environ.get("INSTANCE_COUNT", "1")) max_runtime_seconds = os.environ.get("MAX_RUNTIME_SECONDS") + baseline_dataset_file_key = os.environ["BASELINE_DATA_LOCATION"] + baseline_data_s3_uri = f"s3://{assets_bucket}/{baseline_dataset_file_key}" + baseline_output_s3_uri = f"s3://{os.environ['BASELINE_JOB_OUTPUT_LOCATION']}" + endpoint_name = os.getenv("ENDPOINT_NAME") + # used only for ModelBias/ModelExplainability + # model_predicted_label_config is optional for regression + raw_model_predicted_label_config = os.getenv("MODEL_PREDICTED_LABEL_CONFIG") + model_predicted_label_config = ( + json.loads(raw_model_predicted_label_config) + if raw_model_predicted_label_config + # set default for regression problem + else dict(label=None, probability=None, probability_threshold=None, label_headers=None) + ) + # bias_config required for ModelBias + bias_config = json.loads(os.getenv("BIAS_CONFIG", "{}")) + # shap_config required for ModelExplainability + shap_config = json.loads(os.getenv("SHAP_CONFIG", "{}")) + + # check if the baseline is a string (a file key is provided, not a list of lists used to calculate the baseline) + # the baseline file is expected to be in the Assets Bucket + baseline = shap_config.get("baseline") + # add assets bucket if a file key is provided + if isinstance(baseline, str): + shap_config["baseline"] = f"s3://{assets_bucket}/{baseline}" + + # use model scores if provided + model_scores = json.loads(os.getenv("MODEL_SCORES")) if os.getenv("MODEL_SCORES") else None logger.info(f"Creating {monitoring_type} baseline processing job {baseline_job_name} ...") + # get the baseline dataset's header if the baseline to be created is ModelBias|ModelExplainability + # the baseline dataset csv file should be uploaded to the Solution's Assets S3 bucket, with the header + # in the first row and the label in the first column (see the + # SolutionSageMakerBaselines.get_baseline_dataset_header function's docs) + header = None + if monitoring_type in ["ModelBias", "ModelExplainability"]: + header = SolutionSageMakerBaselines.get_baseline_dataset_header( + bucket_name=assets_bucket, file_key=baseline_dataset_file_key, s3_client=s3_client + ) + # create a SageMakerBaselines instance sagemaker_baseline = SolutionSageMakerBaselines( monitoring_type=os.environ.get("MONITORING_TYPE"), - instance_type=os.environ.get("INSTANCE_TYPE", "ml.m5.large"), - instance_count=int(os.environ.get("INSTANCE_COUNT", "1")), + instance_type=instance_type, + instance_count=instance_count, instance_volume_size=int(os.environ.get("INSTANCE_VOLUME_SIZE", "30")), role_arn=os.environ["ROLE_ARN"], baseline_job_name=os.environ["BASELINE_JOB_NAME"], -
baseline_dataset=f"s3://{assets_bucket}/{os.environ['BASELINE_DATA_LOCATION']}", - output_s3_uri=f"s3://{os.environ['BASELINE_JOB_OUTPUT_LOCATION']}", + baseline_dataset=baseline_data_s3_uri, + output_s3_uri=baseline_output_s3_uri, max_runtime_in_seconds=int(max_runtime_seconds) if max_runtime_seconds else None, kms_key_arn=os.environ.get("KMS_KEY_ARN"), problem_type=os.environ.get("PROBLEM_TYPE"), @@ -48,6 +99,29 @@ def handler(event, context): probability_attribute=os.environ.get("PROBABILITY_ATTRIBUTE"), probability_threshold_attribute=os.environ.get("PROBABILITY_THRESHOLD_ATTRIBUTE"), sagemaker_session=sagemaker_session, + data_config=DataConfig( + s3_data_input_path=baseline_data_s3_uri, + s3_output_path=baseline_output_s3_uri, + label=header[0], # the label is expected to be the first column in the baseline dataset + headers=header, + dataset_type="text/csv", + ) + if monitoring_type in ["ModelBias", "ModelExplainability"] + else None, + bias_config=BiasConfig(**bias_config) if monitoring_type == "ModelBias" else None, + model_config=ModelConfig( + model_name=SolutionSageMakerBaselines.get_model_name(endpoint_name, sm_client), + instance_type=instance_type, + instance_count=instance_count, + accept_type="text/csv", + ) + if monitoring_type in ["ModelBias", "ModelExplainability"] + else None, + model_predicted_label_config=ModelPredictedLabelConfig(**model_predicted_label_config) + if monitoring_type == "ModelBias" + else None, + explainability_config=SHAPConfig(**shap_config) if monitoring_type == "ModelExplainability" else None, + model_scores=model_scores if monitoring_type == "ModelExplainability" else None, tags=[{"Key": "stack_name", "Value": os.environ["STACK_NAME"]}], ) diff --git a/source/lib/blueprints/byom/lambdas/create_baseline_job/tests/fixtures/baseline_fixtures.py b/source/lib/blueprints/byom/lambdas/create_baseline_job/tests/fixtures/baseline_fixtures.py index b035a6f..af029a3 100644 --- a/source/lib/blueprints/byom/lambdas/create_baseline_job/tests/fixtures/baseline_fixtures.py +++ b/source/lib/blueprints/byom/lambdas/create_baseline_job/tests/fixtures/baseline_fixtures.py @@ -11,10 +11,18 @@ # and limitations under the License. 
# # ##################################################################################################################### import os +import json import pytest from baselines_helper import SolutionSageMakerBaselines from sagemaker.model_monitor.dataset_format import DatasetFormat import sagemaker +from sagemaker.clarify import ( + DataConfig, + BiasConfig, + ModelConfig, + ModelPredictedLabelConfig, + SHAPConfig, +) # create sagemaker session sagemaker_session = sagemaker.session.Session() @@ -29,11 +37,21 @@ def mock_basic_data_quality_env(): "SAGEMAKER_ENDPOINT_NAME": "Sagemaker-test-endpoint", "BASELINE_DATA_LOCATION": "baseline_data.csv", "BASELINE_JOB_OUTPUT_LOCATION": "s3://testbucket/baseline_output", - "INSTANCE_TYPE": "ml.m5.4xlarge", + "INSTANCE_TYPE": "ml.m5.large", "INSTANCE_VOLUME_SIZE": "20", "ROLE_ARN": "arn:aws:iam::account:role/myrole", "STACK_NAME": "test_stack", "LOG_LEVEL": "INFO", + "ENDPOINT_NAME": "My-endpoint", + "MODEL_PREDICTED_LABEL_CONFIG": json.dumps(dict(probability=0)), + "BIAS_CONFIG": json.dumps( + dict( + label_values_or_threshold=[1], + facet_name="age", + facet_values_or_threshold=[40], + group_name="personal_status_sex", + ) + ), } return data_quality_env @@ -69,12 +87,93 @@ def mock_model_quality_env_with_optional_vars(mock_data_quality_env_with_optiona return model_quality_env +@pytest.fixture +def mock_model_bias_env_with_optional_vars(mock_data_quality_env_with_optional_vars): + model_bias_env = mock_data_quality_env_with_optional_vars.copy() + model_bias_env.update( + { + "MONITORING_TYPE": "ModelBias", + "BIAS_CONFIG": json.dumps( + {"label_values_or_threshold": [1], "facet_name": "Account Length", "facet_values_or_threshold": [100]} + ), + } + ) + + return model_bias_env + + +@pytest.fixture +def mock_model_explainability_env_with_optional_vars(mock_data_quality_env_with_optional_vars): + model_explainability_env = mock_data_quality_env_with_optional_vars.copy() + model_explainability_env.update( + { + "MONITORING_TYPE": "ModelExplainability", + "SHAP_CONFIG": json.dumps( + {"baseline": "shap-baseline-data.csv", "num_samples": 500, "agg_method": "mean_abs"} + ), + } + ) + + return model_explainability_env + + +@pytest.fixture +def mocked_data_config(): + return DataConfig(s3_data_input_path="s3://test-bucket/data.csv", s3_output_path="s3://test-bucket/baseline_output") + + +@pytest.fixture +def mocked_model_config(monkeypatch, mock_model_quality_env_with_optional_vars): + monkeypatch.setattr(os, "environ", mock_model_quality_env_with_optional_vars) + return ModelConfig(model_name="sagemaker-model-1", instance_count=1, instance_type=os.environ["INSTANCE_TYPE"]) + + +@pytest.fixture +def mocked_bias_config(): + return BiasConfig(label_values_or_threshold=[1], facet_name="age") + + +@pytest.fixture +def mocked_model_label_config(): + return ModelPredictedLabelConfig(probability_threshold=0.8) + + +@pytest.fixture +def mocked_baseline_dataset_header(monkeypatch, mock_model_quality_env_with_optional_vars): + monkeypatch.setattr(os, "environ", mock_model_quality_env_with_optional_vars) + return ["label", "feature_1", "feature_2", "feature_3", "feature_4"] + + +@pytest.fixture +def mocked_shap_config(): + return SHAPConfig( + baseline=[ + [ + 0.26124998927116394, + 0.2824999988079071, + 0.06875000149011612, + 0.38749998807907104, + 20.6512508392334, + ] + ], + num_samples=100, + agg_method="mean_abs", + ) + + @pytest.fixture def mocked_sagemaker_baseline_attributes( monkeypatch, mock_basic_data_quality_env, mock_data_quality_env_with_optional_vars, 
mock_model_quality_env_with_optional_vars, + mock_model_bias_env_with_optional_vars, + mock_model_explainability_env_with_optional_vars, + mocked_data_config, + mocked_bias_config, + mocked_model_config, + mocked_model_label_config, + mocked_shap_config, ): def _mocked_sagemaker_baseline_attributes(monitoring_type, with_optional=False): # set the env variables based on monitoring_type, with_optional @@ -83,8 +182,12 @@ def _mocked_sagemaker_baseline_attributes(monitoring_type, with_optional=False): envs = mock_data_quality_env_with_optional_vars else: envs = mock_basic_data_quality_env - else: + elif monitoring_type == "ModelQuality": envs = mock_model_quality_env_with_optional_vars + elif monitoring_type == "ModelBias": + envs = mock_model_bias_env_with_optional_vars + else: + envs = mock_model_explainability_env_with_optional_vars monkeypatch.setattr(os, "environ", envs) max_runtime_seconds = os.environ.get("MAX_RUNTIME_SECONDS") @@ -106,6 +209,12 @@ def _mocked_sagemaker_baseline_attributes(monitoring_type, with_optional=False): "probability_attribute": os.environ.get("PROBABILITY_ATTRIBUTE"), "probability_threshold_attribute": os.environ.get("PROBABILITY_THRESHOLD_ATTRIBUTE"), "sagemaker_session": sagemaker_session, + "data_config": mocked_data_config if monitoring_type in ["ModelBias", "ModelExplainability"] else None, + "bias_config": mocked_bias_config if monitoring_type == "ModelBias" else None, + "model_config": mocked_model_config if monitoring_type in ["ModelBias", "ModelExplainability"] else None, + "model_predicted_label_config": mocked_model_label_config if monitoring_type == "ModelBias" else None, + "explainability_config": mocked_shap_config if monitoring_type == "ModelExplainability" else None, + "model_scores": None, "tags": [{"Key": "stack_name", "Value": os.environ["STACK_NAME"]}], } @@ -121,7 +230,14 @@ def _mocked_sagemaker_baselines_instance(monitoring_type, with_optional=True): @pytest.fixture -def mocked_expected_baseline_args(mocked_sagemaker_baselines_instance): +def mocked_expected_baseline_args( + mocked_sagemaker_baselines_instance, + mocked_data_config, + mocked_model_config, + mocked_bias_config, + mocked_model_label_config, + mocked_shap_config, +): def _mocked_expected_baseline_args(monitoring_type): sagemaker_baselines_instance = mocked_sagemaker_baselines_instance(monitoring_type) baseline_args = dict( @@ -140,11 +256,17 @@ def _mocked_expected_baseline_args(monitoring_type): # args passed to the Monitor class's suggest_baseline function suggest_args=dict( job_name=sagemaker_baselines_instance.baseline_job_name, - dataset_format=DatasetFormat.csv(header=True), - baseline_dataset=sagemaker_baselines_instance.baseline_dataset, - output_s3_uri=sagemaker_baselines_instance.output_s3_uri, ), ) + # add args valid only for DataQuality or ModelQuality + if monitoring_type in ["DataQuality", "ModelQuality"]: + baseline_args["suggest_args"].update( + { + "dataset_format": DatasetFormat.csv(header=True), + "baseline_dataset": sagemaker_baselines_instance.baseline_dataset, + "output_s3_uri": sagemaker_baselines_instance.output_s3_uri, + } + ) # add ModelQuality if monitoring_type == "ModelQuality": @@ -163,6 +285,30 @@ def _mocked_expected_baseline_args(monitoring_type): baseline_args["suggest_args"].update( {"ground_truth_attribute": sagemaker_baselines_instance.ground_truth_attribute} ) + + # add ModelBias args + if monitoring_type == "ModelBias": + baseline_args["suggest_args"].update( + { + "data_config": mocked_data_config, + "bias_config": mocked_bias_config, 
+ "model_config": mocked_model_config, + "model_predicted_label_config": mocked_model_label_config, + "kms_key": sagemaker_baselines_instance.kms_key_arn, + } + ) + + # add ModelBias args + if monitoring_type == "ModelExplainability": + baseline_args["suggest_args"].update( + { + "data_config": mocked_data_config, + "explainability_config": mocked_shap_config, + "model_config": mocked_model_config, + "model_scores": None, + "kms_key": sagemaker_baselines_instance.kms_key_arn, + } + ) return baseline_args return _mocked_expected_baseline_args diff --git a/source/lib/blueprints/byom/lambdas/create_baseline_job/tests/test_create_data_baseline.py b/source/lib/blueprints/byom/lambdas/create_baseline_job/tests/test_create_data_baseline.py index 764115d..a367a4e 100644 --- a/source/lib/blueprints/byom/lambdas/create_baseline_job/tests/test_create_data_baseline.py +++ b/source/lib/blueprints/byom/lambdas/create_baseline_job/tests/test_create_data_baseline.py @@ -13,6 +13,10 @@ from unittest.mock import patch from unittest import TestCase import pytest +import json +import boto3 +from botocore.response import StreamingBody +from io import BytesIO import os from main import handler from tests.fixtures.baseline_fixtures import ( @@ -22,6 +26,14 @@ mocked_sagemaker_baseline_attributes, mocked_sagemaker_baselines_instance, mocked_expected_baseline_args, + mock_model_bias_env_with_optional_vars, + mock_model_explainability_env_with_optional_vars, + mocked_data_config, + mocked_bias_config, + mocked_model_config, + mocked_model_label_config, + mocked_shap_config, + mocked_baseline_dataset_header, event, ) from baselines_helper import SolutionSageMakerBaselines @@ -59,7 +71,7 @@ def test_init(mocked_sagemaker_baseline_attributes): with pytest.raises(ValueError) as error: SolutionSageMakerBaselines(**mocked_sagemaker_baseline_attributes("NotSupported")) assert str(error.value) == ( - "The provided monitoring type: NotSupported is not valid. It must be 'DataQuality'|'ModelQuality'" + "The provided monitoring type: NotSupported is not valid. 
It must be 'DataQuality'|'ModelQuality'|'ModelBias'|'ModelExplainability'" ) @@ -99,6 +111,16 @@ def test_get_baseline_job_args( assert baseline_args["suggest_args"].get("probability_attribute") is None assert baseline_args["suggest_args"].get("probability_threshold_attribute") is None + # assert the returned baseline args for ModelBias baseline + sagemaker_baselines = mocked_sagemaker_baselines_instance("ModelBias") + TestCase().assertDictEqual(sagemaker_baselines._get_baseline_job_args(), mocked_expected_baseline_args("ModelBias")) + + # assert the returned baseline args for ModelExplainability baseline + sagemaker_baselines = mocked_sagemaker_baselines_instance("ModelExplainability") + TestCase().assertDictEqual( + sagemaker_baselines._get_baseline_job_args(), mocked_expected_baseline_args("ModelExplainability") + ) + def test_get_baseline_job_args_exceptions(mocked_sagemaker_baseline_attributes): # test exception if baseline_dataset is not provided @@ -158,15 +180,37 @@ def test_get_baseline_job_args_exceptions(mocked_sagemaker_baseline_attributes): assert str(error.value) == "GroundTruthAttribute must be provided" +@patch("baselines_helper.SolutionSageMakerBaselines._create_model_explainability_baseline") +@patch("baselines_helper.SolutionSageMakerBaselines._create_model_bias_baseline") @patch("baselines_helper.SolutionSageMakerBaselines._create_model_quality_baseline") @patch("baselines_helper.SolutionSageMakerBaselines._create_data_quality_baseline") def test_create_baseline_job( - mocked_create_data_quality_baseline, mocked_create_model_quality_baseline, mocked_sagemaker_baselines_instance + mocked_create_data_quality_baseline, + mocked_create_model_quality_baseline, + mocked_create_model_bias_baseline, + mocked_create_model_explainability_baseline, + mocked_sagemaker_baselines_instance, ): + # DataQuality baseline sagemaker_baselines = mocked_sagemaker_baselines_instance("DataQuality") sagemaker_baselines.create_baseline_job() baseline_args = sagemaker_baselines._get_baseline_job_args() mocked_create_data_quality_baseline.assert_called_with(baseline_args) + # ModelQuality baseline + sagemaker_baselines = mocked_sagemaker_baselines_instance("ModelQuality") + sagemaker_baselines.create_baseline_job() + baseline_args = sagemaker_baselines._get_baseline_job_args() + mocked_create_model_quality_baseline.assert_called_with(baseline_args) + # ModelBias baseline + sagemaker_baselines = mocked_sagemaker_baselines_instance("ModelBias") + sagemaker_baselines.create_baseline_job() + baseline_args = sagemaker_baselines._get_baseline_job_args() + mocked_create_model_bias_baseline.assert_called_with(baseline_args) + # ModelExplainability baseline + sagemaker_baselines = mocked_sagemaker_baselines_instance("ModelExplainability") + sagemaker_baselines.create_baseline_job() + baseline_args = sagemaker_baselines._get_baseline_job_args() + mocked_create_model_explainability_baseline.assert_called_with(baseline_args) @patch("baselines_helper.DefaultModelMonitor.suggest_baseline") @@ -185,10 +229,72 @@ def test_create_model_quality_baseline(mocked_model_monitor_suggest_baseline, mo mocked_model_monitor_suggest_baseline.assert_called_with(**expected_baseline_args["suggest_args"]) +@patch("baselines_helper.ModelBiasMonitor.suggest_baseline") +def test_create_model_bias_baseline(mocked_model_bias_suggest_baseline, mocked_sagemaker_baselines_instance): + sagemaker_baselines = mocked_sagemaker_baselines_instance("ModelBias") + expected_baseline_args = sagemaker_baselines._get_baseline_job_args() + 
sagemaker_baselines._create_model_bias_baseline(expected_baseline_args) + mocked_model_bias_suggest_baseline.assert_called_with(**expected_baseline_args["suggest_args"]) + + +@patch("baselines_helper.ModelExplainabilityMonitor.suggest_baseline") +def test_create_model_explainability_baseline( + mocked_model_explainability_suggest_baseline, mocked_sagemaker_baselines_instance +): + sagemaker_baselines = mocked_sagemaker_baselines_instance("ModelExplainability") + expected_baseline_args = sagemaker_baselines._get_baseline_job_args() + sagemaker_baselines._create_model_explainability_baseline(expected_baseline_args) + mocked_model_explainability_suggest_baseline.assert_called_with(**expected_baseline_args["suggest_args"]) + + +def test_get_baseline_dataset_header(mocked_baseline_dataset_header): + with patch("boto3.client") as patched_client: + s3_client = boto3.client("s3", region_name="us-east-1") + # create the baseline dataset csv content + sample_content = "label,feature_1,feature_2,feature_3,feature_4\n1,19,3,9,15" + encoded_config_file_body = sample_content.encode("utf-8") + # s3_client's get_object returns a StreamingBody, so create one to be used as the return value + body = StreamingBody(BytesIO(encoded_config_file_body), len(encoded_config_file_body)) + # set the return value for the mocked s3 client get_object + patched_client().get_object.return_value = {"Body": body} + # get the baseline dataset header + returned_config_file = SolutionSageMakerBaselines.get_baseline_dataset_header( + bucket_name="test-bucket", file_key="dataset.csv", s3_client=s3_client + ) + # assert the returned header equals the expected content + TestCase().assertListEqual(returned_config_file, mocked_baseline_dataset_header) + + +def test_get_model_name(): + with patch("boto3.client") as patched_client: + sm_client = boto3.client("sagemaker", region_name="us-east-1") + endpoint_name = "test-endpoint" + model_name = "test-model" + # set the return value for the mocked SageMaker client describe_endpoint + patched_client().describe_endpoint.return_value = {"EndpointConfigName": "endpoint-config-name"} + # set the return value for the mocked SageMaker client describe_endpoint_config + patched_client().describe_endpoint_config.return_value = {"ProductionVariants": [{"ModelName": model_name}]} + # assert the returned model name is as expected + TestCase().assertEqual( + SolutionSageMakerBaselines.get_model_name(endpoint_name=endpoint_name, sm_client=sm_client), model_name + ) + + +@patch("baselines_helper.SolutionSageMakerBaselines.get_model_name") +@patch("baselines_helper.SolutionSageMakerBaselines.get_baseline_dataset_header") @patch("baselines_helper.SolutionSageMakerBaselines.create_baseline_job") -def test_handler(mocked_create_baseline_job, event, mocked_sagemaker_baseline_attributes): +def test_handler( + mocked_create_baseline_job, + mocked_get_baseline_dataset_header, + mocked_get_model_name, + event, + mocked_sagemaker_baseline_attributes, + mocked_baseline_dataset_header, +): + mocked_get_baseline_dataset_header.return_value = mocked_baseline_dataset_header + mocked_get_model_name.return_value = "MyModel" # set the environment variables - mocked_sagemaker_baseline_attributes("ModelQuality") + mocked_sagemaker_baseline_attributes("ModelExplainability") # calling the handler function should create the SolutionSageMakerBaselines object # and call the create_baseline_job function handler(event, {}) diff --git a/source/lib/blueprints/byom/model_monitor.py b/source/lib/blueprints/byom/model_monitor.py index 159c3e2..0497ca6 100644 ---
a/source/lib/blueprints/byom/model_monitor.py +++ b/source/lib/blueprints/byom/model_monitor.py @@ -33,11 +33,11 @@ def __init__(self, scope: core.Construct, id: str, monitoring_type: str, **kwarg super().__init__(scope, id, **kwargs) # validate the provided monitoring_type - if monitoring_type not in ["DataQuality", "ModelQuality"]: + if monitoring_type not in ["DataQuality", "ModelQuality", "ModelBias", "ModelExplainability"]: raise ValueError( ( - f"The {monitoring_type} is not valid. Currently supported Monitoring Types are: " - f"['DataQuality'|'ModelQuality']" + f"The {monitoring_type} is not valid. Supported Monitoring Types are: " + f"'DataQuality'|'ModelQuality'|'ModelBias'|'ModelExplainability'" ) ) @@ -46,66 +46,121 @@ def __init__(self, scope: core.Construct, id: str, monitoring_type: str, **kwarg self.monitor_attributes = dict() # Parameteres # - blueprint_bucket_name = pf.create_blueprint_bucket_name_parameter(self) - assets_bucket_name = pf.create_assets_bucket_name_parameter(self) - endpoint_name = pf.create_endpoint_name_parameter(self) - baseline_job_output_location = pf.create_baseline_job_output_location_parameter(self) - baseline_data = pf.create_baseline_data_parameter(self) - instance_type = pf.create_instance_type_parameter(self) - instance_count = pf.create_instance_count_parameter(self) - instance_volume_size = pf.create_instance_volume_size_parameter(self) - baseline_max_runtime_seconds = pf.create_baseline_max_runtime_seconds_parameter(self) - monitor_max_runtime_seconds = pf.create_monitor_max_runtime_seconds_parameter(self, "ModelQuality") - kms_key_arn = pf.create_kms_key_arn_parameter(self) - baseline_job_name = pf.create_baseline_job_name_parameter(self) - monitoring_schedule_name = pf.create_monitoring_schedule_name_parameter(self) - data_capture_bucket = pf.create_data_capture_bucket_name_parameter(self) - baseline_output_bucket = pf.create_baseline_output_bucket_name_parameter(self) - data_capture_s3_location = pf.create_data_capture_location_parameter(self) - monitoring_output_location = pf.create_monitoring_output_location_parameter(self) - schedule_expression = pf.create_schedule_expression_parameter(self) - image_uri = pf.create_algorithm_image_uri_parameter(self) + self.monitoring_type = monitoring_type + self.blueprint_bucket_name = pf.create_blueprint_bucket_name_parameter(self) + self.assets_bucket_name = pf.create_assets_bucket_name_parameter(self) + self.endpoint_name = pf.create_endpoint_name_parameter(self) + self.baseline_job_output_location = pf.create_baseline_job_output_location_parameter(self) + self.baseline_data = pf.create_baseline_data_parameter(self) + self.instance_type = pf.create_instance_type_parameter(self) + self.instance_count = pf.create_instance_count_parameter(self) + self.instance_volume_size = pf.create_instance_volume_size_parameter(self) + self.baseline_max_runtime_seconds = pf.create_baseline_max_runtime_seconds_parameter(self) + self.monitor_max_runtime_seconds = pf.create_monitor_max_runtime_seconds_parameter(self, "ModelQuality") + self.kms_key_arn = pf.create_kms_key_arn_parameter(self) + self.baseline_job_name = pf.create_baseline_job_name_parameter(self) + self.monitoring_schedule_name = pf.create_monitoring_schedule_name_parameter(self) + self.data_capture_bucket = pf.create_data_capture_bucket_name_parameter(self) + self.baseline_output_bucket = pf.create_baseline_output_bucket_name_parameter(self) + self.data_capture_s3_location = pf.create_data_capture_location_parameter(self) + 
self.monitoring_output_location = pf.create_monitoring_output_location_parameter(self) + self.schedule_expression = pf.create_schedule_expression_parameter(self) + self.image_uri = pf.create_algorithm_image_uri_parameter(self) - # add ModelQuality specific parameters/conditions, and update self.baseline_attributes/self.monitor_attributes - if monitoring_type == "ModelQuality": - self._add_model_quality_resources() - - # conditions - kms_key_arn_provided = cf.create_kms_key_arn_provided_condition(self, kms_key_arn) + # common conditions + self.kms_key_arn_provided = cf.create_kms_key_arn_provided_condition(self, self.kms_key_arn) # Resources # - assets_bucket = s3.Bucket.from_bucket_name(self, "ImportedAssetsBucket", assets_bucket_name.value_as_string) + self.assets_bucket = s3.Bucket.from_bucket_name( + self, "ImportedAssetsBucket", self.assets_bucket_name.value_as_string + ) # getting blueprint bucket object from its name - will be used later in the stack - blueprint_bucket = s3.Bucket.from_bucket_name( - self, "ImportedBlueprintBucket", blueprint_bucket_name.value_as_string + self.blueprint_bucket = s3.Bucket.from_bucket_name( + self, "ImportedBlueprintBucket", self.blueprint_bucket_name.value_as_string ) - # create sagemaker layer - sm_layer = sagemaker_layer(self, blueprint_bucket) + # update common Baseline attributes + self._update_common_baseline_attributes() + + # add ModelQuality specific parameters/conditions, and update self.baseline_attributes/self.monitor_attributes + if self.monitoring_type in ["ModelQuality", "ModelBias", "ModelExplainability"]: + self._add_model_quality_resources() + + # add extra ModelBias/ModelExplainability + if self.monitoring_type in ["ModelBias", "ModelExplainability"]: + self._add_model_bias_explainability_extra_attributes() + + # create custom resource to invoke the baseline job lambda + invoke_lambda_custom_resource = self._create_invoke_lambda_custom_resource() + + # creating SageMaker monitor role + self.sagemaker_role = self._create_sagemaker_monitor_role() + + # update attributes + self._update_common_monitor_attributes() - # update Baseline attributes + # create SageMaker monitoring Schedule + sagemaker_monitor = SageMakerModelMonitor(self, f"{monitoring_type}Monitor", **self.monitor_attributes) + + # add job definition dependency on sagemaker role and invoke_lambda_custom_resource + # (so, the baseline job is created) + sagemaker_monitor.job_definition.node.add_dependency(self.sagemaker_role) + sagemaker_monitor.job_definition.node.add_dependency(invoke_lambda_custom_resource) + + # Outputs # + self._create_stack_outputs() + + def _update_common_baseline_attributes(self): self.baseline_attributes.update( dict( - monitoring_type=monitoring_type, - baseline_job_name=baseline_job_name.value_as_string, - baseline_data_location=baseline_data.value_as_string, - baseline_job_output_location=baseline_job_output_location.value_as_string, - endpoint_name=endpoint_name.value_as_string, - instance_type=instance_type.value_as_string, - instance_volume_size=instance_volume_size.value_as_string, - max_runtime_seconds=baseline_max_runtime_seconds.value_as_string, + monitoring_type=self.monitoring_type, + baseline_job_name=self.baseline_job_name.value_as_string, + baseline_data_location=self.baseline_data.value_as_string, + baseline_output_bucket=self.baseline_output_bucket.value_as_string, + baseline_job_output_location=self.baseline_job_output_location.value_as_string, + endpoint_name=self.endpoint_name.value_as_string, + 
instance_type=self.instance_type.value_as_string, + instance_volume_size=self.instance_volume_size.value_as_string, + max_runtime_seconds=self.baseline_max_runtime_seconds.value_as_string, kms_key_arn=core.Fn.condition_if( - kms_key_arn_provided.logical_id, kms_key_arn.value_as_string, core.Aws.NO_VALUE + self.kms_key_arn_provided.logical_id, self.kms_key_arn.value_as_string, core.Aws.NO_VALUE ).to_string(), - kms_key_arn_provided_condition=kms_key_arn_provided, + kms_key_arn_provided_condition=self.kms_key_arn_provided, stack_name=core.Aws.STACK_NAME, ) ) + + def _update_common_monitor_attributes(self): + self.monitor_attributes.update( + dict( + monitoring_schedule_name=self.monitoring_schedule_name.value_as_string, + endpoint_name=self.endpoint_name.value_as_string, + baseline_job_output_location=f"s3://{self.baseline_job_output_location.value_as_string}", + schedule_expression=self.schedule_expression.value_as_string, + monitoring_output_location=f"s3://{self.monitoring_output_location.value_as_string}", + instance_type=self.instance_type.value_as_string, + instance_count=self.instance_count.value_as_string, + instance_volume_size=self.instance_volume_size.value_as_string, + max_runtime_seconds=self.monitor_max_runtime_seconds.value_as_string, + kms_key_arn=core.Fn.condition_if( + self.kms_key_arn_provided.logical_id, self.kms_key_arn.value_as_string, core.Aws.NO_VALUE + ).to_string(), + role_arn=self.sagemaker_role.role_arn, + image_uri=self.image_uri.value_as_string, + monitoring_type=self.monitoring_type, + tags=[{"key": "stack-name", "value": core.Aws.STACK_NAME}], + ) + ) + + def _create_invoke_lambda_custom_resource(self): + # create sagemaker layer + sm_layer = sagemaker_layer(self, self.blueprint_bucket) + # create baseline job lambda action baseline_job_lambda = create_baseline_job_lambda( self, - blueprint_bucket=blueprint_bucket, - assets_bucket=assets_bucket, + blueprint_bucket=self.blueprint_bucket, + assets_bucket=self.assets_bucket, sm_layer=sm_layer, **self.baseline_attributes, ) @@ -118,13 +173,13 @@ def __init__(self, scope: core.Construct, id: str, monitoring_type: str, **kwarg id="InvokeBaselineLambda", lambda_function_arn=baseline_job_lambda.function_arn, lambda_function_name=baseline_job_lambda.function_name, - blueprint_bucket=blueprint_bucket, + blueprint_bucket=self.blueprint_bucket, # add baseline attributes to the invoke lambda custom resource, so any change to these attributes # (via template update) will re-invoke the baseline lambda and re-calculate the baseline custom_resource_properties={ "Resource": "InvokeLambda", "function_name": baseline_job_lambda.function_name, - "assets_bucket_name": assets_bucket_name.value_as_string, + "assets_bucket_name": self.assets_bucket_name.value_as_string, **self.baseline_attributes, }, ) @@ -132,158 +187,207 @@ def __init__(self, scope: core.Construct, id: str, monitoring_type: str, **kwarg # add dependency on baseline lambda invoke_lambda_custom_resource.node.add_dependency(baseline_job_lambda) - # creating monitoring schedule - sagemaker_role = create_sagemaker_monitor_role( + return invoke_lambda_custom_resource + + def _create_sagemaker_monitor_role(self): + return create_sagemaker_monitor_role( scope=self, id="MLOpsSagemakerMonitorRole", - kms_key_arn=kms_key_arn.value_as_string, - assets_bucket_name=assets_bucket_name.value_as_string, - data_capture_bucket=data_capture_bucket.value_as_string, - data_capture_s3_location=data_capture_s3_location.value_as_string, - 
baseline_output_bucket=baseline_output_bucket.value_as_string, - baseline_job_output_location=baseline_job_output_location.value_as_string, - output_s3_location=monitoring_output_location.value_as_string, - kms_key_arn_provided_condition=kms_key_arn_provided, - baseline_job_name=baseline_job_name.value_as_string, - monitoring_schedule_name=monitoring_schedule_name.value_as_string, - endpoint_name=endpoint_name.value_as_string, - model_monitor_ground_truth_input=None - if monitoring_type == "DataQuality" - else self.monitor_attributes["ground_truth_s3_uri"], + kms_key_arn=self.kms_key_arn.value_as_string, + assets_bucket_name=self.assets_bucket_name.value_as_string, + data_capture_bucket=self.data_capture_bucket.value_as_string, + data_capture_s3_location=self.data_capture_s3_location.value_as_string, + baseline_output_bucket=self.baseline_output_bucket.value_as_string, + baseline_job_output_location=self.baseline_job_output_location.value_as_string, + output_s3_location=self.monitoring_output_location.value_as_string, + kms_key_arn_provided_condition=self.kms_key_arn_provided, + baseline_job_name=self.baseline_job_name.value_as_string, + monitoring_schedule_name=self.monitoring_schedule_name.value_as_string, + endpoint_name=self.endpoint_name.value_as_string, + model_monitor_ground_truth_bucket=self.ground_truth_s3_bucket.value_as_string + if self.monitoring_type in ["ModelQuality", "ModelBias"] + else None, + model_monitor_ground_truth_input=self.ground_truth_s3_uri.value_as_string + if self.monitoring_type in ["ModelQuality", "ModelBias"] + else None, + monitoring_type=self.monitoring_type, ) - # resource tags - resource_tags = [{"key": "stack-name", "value": core.Aws.STACK_NAME}] + def _add_model_quality_resources(self): + """ + Adds ModelQuality specific parameters/conditions and updates + self.baseline_attributes/self.monitor_attributes. 
Most of these attributes are reused + by ModelBias and ModelExplainability monitors + """ + # add baseline job attributes (they are different from Monitor attributes) + if self.monitoring_type == "ModelQuality": + self.baseline_inference_attribute = pf.create_inference_attribute_parameter(self, "Baseline") + self.baseline_probability_attribute = pf.create_probability_attribute_parameter(self, "Baseline") + self.ground_truth_attribute = pf.create_ground_truth_attribute_parameter(self) + # add ModelQuality Baseline attributes + self.baseline_attributes.update( + dict( + ground_truth_attribute=self.ground_truth_attribute.value_as_string, + inference_attribute=self.baseline_inference_attribute.value_as_string, + probability_attribute=self.baseline_probability_attribute.value_as_string, + ) + ) + # add monitor attributes + self.monitor_inference_attribute = pf.create_inference_attribute_parameter(self, "Monitor") + self.monitor_probability_attribute = pf.create_probability_attribute_parameter(self, "Monitor") + # only create ground_truth_s3_url parameter for ModelQuality/Bias + if self.monitoring_type in ["ModelQuality", "ModelBias"]: + # ground_truth_s3_uri is only for ModelQuality/ModelBias + self.ground_truth_s3_bucket = pf.create_ground_truth_bucket_name_parameter(self) + self.ground_truth_s3_uri = pf.create_ground_truth_s3_uri_parameter(self) + self.monitor_attributes.update(dict(ground_truth_s3_uri=f"s3://{self.ground_truth_s3_uri.value_as_string}")) + # problem_type and probability_threshold_attribute are the same for both + self.problem_type = pf.create_problem_type_parameter(self) + self.probability_threshold_attribute = pf.create_probability_threshold_attribute_parameter(self) - # update attributes + # add conditions (used by monitor) + self.inference_attribute_provided = cf.create_attribute_provided_condition( + self, "InferenceAttributeProvided", self.monitor_inference_attribute + ) + + self.binary_classification_propability_attribute_provided = ( + cf.create_problem_type_binary_classification_attribute_provided_condition( + self, self.problem_type, self.monitor_probability_attribute, "ProbabilityAttribute" + ) + ) + self.binary_classification_propability_threshold_provided = ( + cf.create_problem_type_binary_classification_attribute_provided_condition( + self, self.problem_type, self.probability_threshold_attribute, "ProbabilityThreshold" + ) + ) + + # add shared Baseline attributes + self.baseline_attributes.update( + dict( + problem_type=self.problem_type.value_as_string, + probability_threshold_attribute=self.probability_threshold_attribute.value_as_string, + ) + ) + + # add ModelQuality Monitor attributes self.monitor_attributes.update( dict( - monitoring_schedule_name=monitoring_schedule_name.value_as_string, - endpoint_name=endpoint_name.value_as_string, - baseline_job_name=baseline_job_name.value_as_string, - baseline_job_output_location=baseline_job_output_location.value_as_string, - schedule_expression=schedule_expression.value_as_string, - monitoring_output_location=monitoring_output_location.value_as_string, - instance_type=instance_type.value_as_string, - instance_count=instance_count.value_as_string, - instance_volume_size=instance_volume_size.value_as_string, - max_runtime_seconds=monitor_max_runtime_seconds.value_as_string, - kms_key_arn=core.Fn.condition_if( - kms_key_arn_provided.logical_id, kms_key_arn.value_as_string, core.Aws.NO_VALUE + problem_type=self.problem_type.value_as_string, + # pass inference_attribute if provided + 
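# Editor's note: core.Fn.condition_if resolves to the given value when the named condition is true
# and to AWS::NoValue otherwise, so CloudFormation simply omits the optional property from the
# monitoring job definition when the corresponding attribute is not provided.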
inference_attribute=core.Fn.condition_if( + self.inference_attribute_provided.logical_id, + self.monitor_inference_attribute.value_as_string, + core.Aws.NO_VALUE, + ).to_string(), + # pass probability_attribute if provided and ProblemType is BinaryClassification + probability_attribute=core.Fn.condition_if( + self.binary_classification_propability_attribute_provided.logical_id, + self.monitor_probability_attribute.value_as_string, + core.Aws.NO_VALUE, + ).to_string(), + # pass probability_threshold_attribute if provided and ProblemType is BinaryClassification + probability_threshold_attribute=core.Fn.condition_if( + self.binary_classification_propability_threshold_provided.logical_id, + self.probability_threshold_attribute.value_as_string, + core.Aws.NO_VALUE, + ).to_string(), - role_arn=sagemaker_role.role_arn, - image_uri=image_uri.value_as_string, - monitoring_type=monitoring_type, - tags=resource_tags, ) ) - # create Sagemaker monitoring Schedule - sagemaker_monitor = SageMakerModelMonitor(self, f"{monitoring_type}Monitor", **self.monitor_attributes) - # add job definition dependency on sagemaker role and invoke_lambda_custom_resource (so, the baseline job is created) - sagemaker_monitor.job_definition.node.add_dependency(sagemaker_role) - sagemaker_monitor.job_definition.node.add_dependency(invoke_lambda_custom_resource) + def _add_model_bias_explainability_extra_attributes(self): + # create parameters/conditions + # create bias specific parameters + if self.monitoring_type == "ModelBias": + self.base_config = pf.create_bias_config_parameter(self) + self.model_predicted_label_config = pf.create_model_predicted_label_config_parameter(self) + self.model_predicted_label_config_provided = cf.create_attribute_provided_condition( + self, "PredictedLabelConfigProvided", self.model_predicted_label_config + ) + # update baseline attributes + self.baseline_attributes.update( + dict( + model_predicted_label_config=core.Fn.condition_if( + self.model_predicted_label_config_provided.logical_id, + self.model_predicted_label_config.value_as_string, + core.Aws.NO_VALUE, + ).to_string(), + bias_config=self.base_config.value_as_string, + ) + ) - # Outputs # + if self.monitoring_type == "ModelExplainability": + self.shap_config = pf.create_shap_config_parameter(self) + self.model_scores = pf.create_model_scores_parameter(self) + self.model_scores_provided = cf.create_attribute_provided_condition( + self, "ModelScoresProvided", self.model_scores + ) + # update baseline attributes + self.baseline_attributes.update( + dict( + shap_config=self.shap_config.value_as_string, + model_scores=core.Fn.condition_if( + self.model_scores_provided.logical_id, + self.model_scores.value_as_string, + core.Aws.NO_VALUE, + ).to_string(), + ) + ) + # common parameters + self.features_attribute = pf.create_features_attribute_parameter(self) + self.features_attribute_provided = cf.create_attribute_provided_condition( + self, "FeaturesAttributeProvided", self.features_attribute + ) + + # update monitor attributes + self.monitor_attributes.update( + dict( + features_attribute=core.Fn.condition_if( + self.features_attribute_provided.logical_id, + self.features_attribute.value_as_string, + core.Aws.NO_VALUE, + ).to_string(), + ) + ) + + def _create_stack_outputs(self): core.CfnOutput( self, id="BaselineName", - value=baseline_job_name.value_as_string, + value=self.baseline_job_name.value_as_string, ) core.CfnOutput( self, id="MonitoringScheduleJobName", - value=monitoring_schedule_name.value_as_string, +
value=self.monitoring_schedule_name.value_as_string, ) core.CfnOutput( self, id="MonitoringScheduleType", - value=monitoring_type, + value=self.monitoring_type, ) core.CfnOutput( self, id="BaselineJobOutput", - value=f"https://s3.console.aws.amazon.com/s3/buckets/{baseline_job_output_location.value_as_string}/", + value=f"https://s3.console.aws.amazon.com/s3/buckets/{self.baseline_job_output_location.value_as_string}/", ) core.CfnOutput( self, id="MonitoringScheduleOutput", value=( - f"https://s3.console.aws.amazon.com/s3/buckets/{monitoring_output_location.value_as_string}/" - f"{endpoint_name.value_as_string}/{monitoring_schedule_name.value_as_string}/" + f"https://s3.console.aws.amazon.com/s3/buckets/{self.monitoring_output_location.value_as_string}/" + f"{self.endpoint_name.value_as_string}/{self.monitoring_schedule_name.value_as_string}/" ), ) core.CfnOutput( self, id="MonitoredSagemakerEndpoint", - value=endpoint_name.value_as_string, + value=self.endpoint_name.value_as_string, ) core.CfnOutput( self, id="DataCaptureS3Location", value=( - f"https://s3.console.aws.amazon.com/s3/buckets/{data_capture_s3_location.value_as_string}" - f"/{endpoint_name.value_as_string}/" + f"https://s3.console.aws.amazon.com/s3/buckets/{self.data_capture_s3_location.value_as_string}" + f"/{self.endpoint_name.value_as_string}/" ), ) - - def _add_model_quality_resources(self): - """ - Adds ModelQuality specific parameters/conditions and updates self.baseline_attributes/self.monitor_attributes - """ - # add baseline job attributes (they are different from Monitor attributes) - baseline_inference_attribute = pf.create_inference_attribute_parameter(self, "Baseline") - baseline_probability_attribute = pf.create_probability_attribute_parameter(self, "Baseline") - ground_truth_attribute = pf.create_ground_truth_attribute_parameter(self) - # add monitor attributes - monitor_inference_attribute = pf.create_inference_attribute_parameter(self, "Monitor") - monitor_probability_attribute = pf.create_probability_attribute_parameter(self, "Monitor") - ground_truth_s3_uri = pf.create_ground_truth_s3_uri_parameter(self) - # problem_type and probability_threshold_attribute are the same for both - problem_type = pf.create_problem_type_parameter(self) - probability_threshold_attribute = pf.create_probability_threshold_attribute_parameter(self) - - # add conditions (used by monitor) - is_regression_or_multiclass_classification_problem = ( - cf.create_problem_type_regression_or_multiclass_classification_condition(self, problem_type) - ) - is_binary_classification_problem = cf.create_problem_type_binary_classification_condition(self, problem_type) - - # add ModelQuality Baseline attributes - self.baseline_attributes.update( - dict( - problem_type=problem_type.value_as_string, - ground_truth_attribute=ground_truth_attribute.value_as_string, - inference_attribute=baseline_inference_attribute.value_as_string, - probability_attribute=baseline_probability_attribute.value_as_string, - probability_threshold_attribute=probability_threshold_attribute.value_as_string, - ) - ) - - # add ModelQuality Monitor attributes - self.monitor_attributes.update( - dict( - problem_type=problem_type.value_as_string, - ground_truth_s3_uri=ground_truth_s3_uri.value_as_string, - # inference_attribute is required for Regression/Multiclass Classification problems - # probability_attribute/probability_threshold_attribute are not used - inference_attribute=core.Fn.condition_if( - is_regression_or_multiclass_classification_problem.logical_id, - 
monitor_inference_attribute.value_as_string, - core.Aws.NO_VALUE, - ).to_string(), - # for a Binary Classification problem, we use probability_attribute and probability_threshold_attribute. - # note: probability_attribute is the index of the predicted probability in the captured data by the - # SageMaker endpoint. Tepically, probability_attribute="0" and probability_threshold_attribute="0.5" - probability_attribute=core.Fn.condition_if( - is_binary_classification_problem.logical_id, - monitor_probability_attribute.value_as_string, - core.Aws.NO_VALUE, - ).to_string(), - probability_threshold_attribute=core.Fn.condition_if( - is_binary_classification_problem.logical_id, - probability_threshold_attribute.value_as_string, - core.Aws.NO_VALUE, - ).to_string(), - ) - ) diff --git a/source/lib/blueprints/byom/pipeline_definitions/deploy_actions.py b/source/lib/blueprints/byom/pipeline_definitions/deploy_actions.py index ec5dbdd..a833414 100644 --- a/source/lib/blueprints/byom/pipeline_definitions/deploy_actions.py +++ b/source/lib/blueprints/byom/pipeline_definitions/deploy_actions.py @@ -27,6 +27,8 @@ from lib.blueprints.byom.pipeline_definitions.iam_policies import ( create_service_role, sagemaker_baseline_job_policy, + sagemaker_model_bias_explainability_baseline_job_policy, + baseline_lambda_get_model_name_policy, sagemaker_logs_metrics_policy_document, batch_transform_policy, s3_policy_write, @@ -156,6 +158,7 @@ def create_baseline_job_lambda( monitoring_type, baseline_job_name, baseline_data_location, + baseline_output_bucket, baseline_job_output_location, endpoint_name, instance_type, @@ -170,6 +173,10 @@ def create_baseline_job_lambda( inference_attribute=None, probability_attribute=None, probability_threshold_attribute=None, + model_predicted_label_config=None, + bias_config=None, + shap_config=None, + model_scores=None, ): """ create_baseline_job_lambda creates a data/model baseline processing job in a lambda invoked codepipeline action @@ -200,12 +207,20 @@ def create_baseline_job_lambda( Used only with 'BinaryClassification' problem if 'inference_attribute' is not provided (default: None). :probability_threshold_attribute: threshold to convert probabilities to binaries (used with ModelQuality baseline). Used only with 'BinaryClassification' problem if 'inference_attribute' is not provided (default: None). + :model_predicted_label_config: Config of how to extract the predicted label + from the model output. Required for ModelBias monitor + :bias_config: Config object related to bias configurations of the input dataset. Required for ModelBias monitor + :shap_config: Config of the Shap explainability. Used by ModelExplainability monitor + :model_scores: Index or JSONPath location in the model output for the predicted scores to be explained. + This is not required if the model output is a single score. 
:return: codepipeline action in a form of a CDK object that can be attached to a codepipeline stage """ s3_read = s3_policy_read( [ f"arn:aws:s3:::{assets_bucket.bucket_name}", - f"arn:aws:s3:::{assets_bucket.bucket_name}/{baseline_data_location}", + f"arn:aws:s3:::{assets_bucket.bucket_name}/*", # give access to files used by different monitors + f"arn:aws:s3:::{baseline_output_bucket}", + f"arn:aws:s3:::{baseline_output_bucket}/*", ] ) s3_write = s3_policy_write( @@ -215,6 +230,7 @@ def create_baseline_job_lambda( ) create_baseline_job_policy = sagemaker_baseline_job_policy(baseline_job_name) + sagemaker_logs_policy = sagemaker_logs_metrics_policy_document(scope, "BaselineLogsMetrics") # Kms Key permissions @@ -244,6 +260,11 @@ def create_baseline_job_lambda( sagemaker_logs_policy.attach_to_role(sagemaker_role) sagemaker_role.add_to_policy(create_baseline_job_policy) + # add extra permissions for "ModelBias", "ModelExplainability" baselines + if monitoring_type in ["ModelBias", "ModelExplainability"]: + lambda_role.add_to_policy(baseline_lambda_get_model_name_policy(endpoint_name)) + sagemaker_role.add_to_policy(baseline_lambda_get_model_name_policy(endpoint_name)) + sagemaker_role.add_to_policy(sagemaker_model_bias_explainability_baseline_job_policy()) sagemaker_role.add_to_policy(s3_read) sagemaker_role.add_to_policy(s3_write) sagemaker_role_nodes = sagemaker_role.node.find_all() @@ -268,6 +289,13 @@ def create_baseline_job_lambda( "MAX_RUNTIME_SECONDS": max_runtime_seconds, "ROLE_ARN": sagemaker_role.role_arn, "KMS_KEY_ARN": kms_key_arn, + "ENDPOINT_NAME": endpoint_name, + "MODEL_PREDICTED_LABEL_CONFIG": model_predicted_label_config + if model_predicted_label_config + else core.Aws.NO_VALUE, + "BIAS_CONFIG": bias_config if bias_config else core.Aws.NO_VALUE, + "SHAP_CONFIG": shap_config if shap_config else core.Aws.NO_VALUE, + "MODEL_SCORES": model_scores if model_scores else core.Aws.NO_VALUE, "STACK_NAME": stack_name, "LOG_LEVEL": "INFO", } diff --git a/source/lib/blueprints/byom/pipeline_definitions/iam_policies.py b/source/lib/blueprints/byom/pipeline_definitions/iam_policies.py index 60266bf..058b433 100644 --- a/source/lib/blueprints/byom/pipeline_definitions/iam_policies.py +++ b/source/lib/blueprints/byom/pipeline_definitions/iam_policies.py @@ -21,7 +21,11 @@ def sagemaker_policy_statement(is_realtime_pipeline, endpoint_name, endpoint_name_provided): - actions = ["sagemaker:CreateModel", "sagemaker:DescribeModel", "sagemaker:DeleteModel"] + actions = [ + "sagemaker:CreateModel", + "sagemaker:DescribeModel", # NOSONAR: permission needs to be repeated for clarity + "sagemaker:DeleteModel", + ] resources = [f"{sagemaker_arn_prefix}:model/mlopssagemakermodel*"] if is_realtime_pipeline: @@ -29,10 +33,10 @@ def sagemaker_policy_statement(is_realtime_pipeline, endpoint_name, endpoint_nam actions.extend( [ "sagemaker:CreateEndpointConfig", - "sagemaker:DescribeEndpointConfig", + "sagemaker:DescribeEndpointConfig", # NOSONAR: permission needs to be repeated for clarity "sagemaker:DeleteEndpointConfig", "sagemaker:CreateEndpoint", - "sagemaker:DescribeEndpoint", + "sagemaker:DescribeEndpoint", # NOSONAR: permission needs to be repeated for clarity "sagemaker:DeleteEndpoint", ] ) @@ -55,15 +59,48 @@ def sagemaker_policy_statement(is_realtime_pipeline, endpoint_name, endpoint_nam ) -def sagemaker_baseline_job_policy(baseline_job_name): +def baseline_lambda_get_model_name_policy(endpoint_name): + # these permissions are required to get the ModelName used by the monitored endpoint return 
iam.PolicyStatement( actions=[ - "sagemaker:CreateProcessingJob", - "sagemaker:DescribeProcessingJob", - "sagemaker:StopProcessingJob", - "sagemaker:DeleteProcessingJob", + "sagemaker:DescribeModel", + "sagemaker:DescribeEndpointConfig", + "sagemaker:DescribeEndpoint", + ], + resources=[ + f"{sagemaker_arn_prefix}:model/mlopssagemakermodel*", + f"{sagemaker_arn_prefix}:endpoint-config/mlopssagemakerendpointconfig*", + f"{sagemaker_arn_prefix}:endpoint/{endpoint_name}", + ], + ) + + +def sagemaker_model_bias_explainability_baseline_job_policy(): + # required to create/delete a Shadow endpointConfig/Endpoint created by the sagemaker clarify + return iam.PolicyStatement( + actions=[ + "sagemaker:DescribeModel", + "sagemaker:DescribeEndpointConfig", + "sagemaker:DescribeEndpoint", + "sagemaker:CreateEndpointConfig", + "sagemaker:CreateEndpoint", + "sagemaker:DeleteEndpointConfig", + "sagemaker:DeleteEndpoint", + "sagemaker:InvokeEndpoint", + ], + resources=[ + f"{sagemaker_arn_prefix}:endpoint-config/sagemaker-clarify-endpoint-config*", + f"{sagemaker_arn_prefix}:endpoint/sagemaker-clarify-endpoint*", + ], + ) + + +def sagemaker_baseline_job_policy(baseline_job_name): + return iam.PolicyStatement( + actions=["sagemaker:CreateProcessingJob", "sagemaker:DescribeProcessingJob", "sagemaker:StopProcessingJob"], + resources=[ + f"{sagemaker_arn_prefix}:processing-job/{baseline_job_name}", ], - resources=[f"{sagemaker_arn_prefix}:processing-job/{baseline_job_name}"], ) @@ -85,31 +122,70 @@ def create_service_role(scope, id, service, description): ) -def sagemaker_monitor_policy_statement(baseline_job_name, monitoring_schedule_name, endpoint_name): +def sagemaker_monitor_policy_statement(baseline_job_name, monitoring_schedule_name, endpoint_name, monitoring_type): + # common permissions + actions = [ + "sagemaker:DescribeEndpointConfig", + "sagemaker:DescribeEndpoint", + "sagemaker:CreateMonitoringSchedule", + "sagemaker:DescribeMonitoringSchedule", + "sagemaker:StopMonitoringSchedule", + "sagemaker:DeleteMonitoringSchedule", + "sagemaker:DescribeProcessingJob", + ] + # common resources + resources = [ + f"{sagemaker_arn_prefix}:endpoint-config/mlopssagemakerendpointconfig*", + f"{sagemaker_arn_prefix}:endpoint/{endpoint_name}", + f"{sagemaker_arn_prefix}:monitoring-schedule/{monitoring_schedule_name}", + f"{sagemaker_arn_prefix}:processing-job/{baseline_job_name}", + ] + + # create a map of monitoring type -> required permissions/resources + type_permissions = { + "DataQuality": { + "permissions": [ + "sagemaker:CreateDataQualityJobDefinition", + "sagemaker:DescribeDataQualityJobDefinition", + "sagemaker:DeleteDataQualityJobDefinition", + ], + "resources": [f"{sagemaker_arn_prefix}:data-quality-job-definition/*"], + }, + "ModelQuality": { + "permissions": [ + "sagemaker:CreateModelQualityJobDefinition", + "sagemaker:DescribeModelQualityJobDefinition", + "sagemaker:DeleteModelQualityJobDefinition", + ], + "resources": [f"{sagemaker_arn_prefix}:model-quality-job-definition/*"], + }, + "ModelBias": { + "permissions": [ + "sagemaker:CreateModelBiasJobDefinition", + "sagemaker:DescribeModelBiasJobDefinition", + "sagemaker:DeleteModelBiasJobDefinition", + ], + "resources": [f"{sagemaker_arn_prefix}:model-bias-job-definition/*"], + }, + "ModelExplainability": { + "permissions": [ + "sagemaker:CreateModelExplainabilityJobDefinition", + "sagemaker:DescribeModelExplainabilityJobDefinition", + "sagemaker:DeleteModelExplainabilityJobDefinition", + ], + "resources": 
[f"{sagemaker_arn_prefix}:model-explainability-job-definition/*"], + }, + } + # add monitoring type's specific permissions + actions.extend(type_permissions[monitoring_type]["permissions"]) + + # add monitoring type's specific resources + resources.extend(type_permissions[monitoring_type]["resources"]) + + # create the policy statement return iam.PolicyStatement( - actions=[ - "sagemaker:DescribeEndpointConfig", - "sagemaker:DescribeEndpoint", - "sagemaker:CreateMonitoringSchedule", - "sagemaker:DescribeMonitoringSchedule", - "sagemaker:StopMonitoringSchedule", - "sagemaker:DeleteMonitoringSchedule", - "sagemaker:DescribeProcessingJob", - "sagemaker:CreateDataQualityJobDefinition", - "sagemaker:DescribeDataQualityJobDefinition", - "sagemaker:DeleteDataQualityJobDefinition", - "sagemaker:CreateModelQualityJobDefinition", - "sagemaker:DescribeModelQualityJobDefinition", - "sagemaker:DeleteModelQualityJobDefinition", - ], - resources=[ - f"{sagemaker_arn_prefix}:endpoint-config/mlopssagemakerendpointconfig*", - f"{sagemaker_arn_prefix}:endpoint/{endpoint_name}", - f"{sagemaker_arn_prefix}:monitoring-schedule/{monitoring_schedule_name}", - f"{sagemaker_arn_prefix}:processing-job/{baseline_job_name}", - f"{sagemaker_arn_prefix}:data-quality-job-definition/*", - f"{sagemaker_arn_prefix}:model-quality-job-definition/*", - ], + actions=actions, + resources=resources, ) diff --git a/source/lib/blueprints/byom/pipeline_definitions/sagemaker_model_monitor_construct.py b/source/lib/blueprints/byom/pipeline_definitions/sagemaker_model_monitor_construct.py index 7f9cef3..76e111f 100644 --- a/source/lib/blueprints/byom/pipeline_definitions/sagemaker_model_monitor_construct.py +++ b/source/lib/blueprints/byom/pipeline_definitions/sagemaker_model_monitor_construct.py @@ -23,7 +23,7 @@ class SageMakerModelMonitor(core.Construct): id (str): CDK resource's logical id monitoring_schedule_name (str): name of the monitoring job to be created endpoint_name (str): name of the deployed SageMaker endpoint to be monitored - baseline_job_name (str): name of the baseline job + baseline_job_output_location (str): the baseline job's output location / schedule_expression (str): cron job expression monitoring_output_location (str): S3 location where the output will be stored instance_type (str): compute instance type for the baseline job, in the form of a CDK CfnParameter object @@ -55,7 +55,6 @@ def __init__( id: str, monitoring_schedule_name: str, endpoint_name: str, - baseline_job_name: str, baseline_job_output_location: str, schedule_expression: str, monitoring_output_location: str, @@ -73,6 +72,8 @@ def __init__( inference_attribute: Optional[str] = None, probability_attribute: Optional[str] = None, probability_threshold_attribute: Optional[str] = None, + config_uri: Optional[str] = None, + features_attribute: Optional[str] = None, **kwargs, ) -> None: super().__init__(scope, id, **kwargs) @@ -80,7 +81,6 @@ def __init__( self.id = id self.monitoring_schedule_name = monitoring_schedule_name self.endpoint_name = endpoint_name - self.baseline_job_name = baseline_job_name self.baseline_job_output_location = baseline_job_output_location self.schedule_expression = schedule_expression self.monitoring_output_location = monitoring_output_location @@ -98,13 +98,14 @@ def __init__( self.inference_attribute = inference_attribute self.probability_attribute = probability_attribute self.probability_threshold_attribute = probability_threshold_attribute + self.features_attribute = features_attribute # validate the provided 
monitoring_type - if monitoring_type not in ["DataQuality", "ModelQuality"]: + if monitoring_type not in ["DataQuality", "ModelQuality", "ModelBias", "ModelExplainability"]: raise ValueError( ( f"The provided monitoring type: {monitoring_type} is not valid. " - + "It must be 'DataQuality'|'ModelQuality'" + + "It must be 'DataQuality'|'ModelQuality'|'ModelBias'|'ModelExplainability" ) ) @@ -121,20 +122,29 @@ def __init__( def _get_job_definition( self, monitoring_type: str, id: str - ) -> Union[sagemaker.CfnDataQualityJobDefinition, sagemaker.CfnModelQualityJobDefinition]: + ) -> Union[ + sagemaker.CfnDataQualityJobDefinition, + sagemaker.CfnModelQualityJobDefinition, + sagemaker.CfnModelBiasJobDefinition, + sagemaker.CfnModelExplainabilityJobDefinition, + ]: """ Gets the *JobDefinition based on the monitoring_type Args: - monitoring_type (str): possible values [DataQuality, ModelQuality] + monitoring_type (str): possible values ['DataQuality'|'ModelQuality'|'ModelBias'|'ModelExplainability'] id (str): CDK resource's logical id Returns: - sagemaker.CfnDataQualityJobDefinition or sagemaker.CfnModelQualityJobDefinition object + sagemaker.CfnDataQualityJobDefinition|sagemaker.CfnModelQualityJobDefinition object| + sagemaker.CfnModelBiasJobDefinition|sagemaker.CfnModelExplainabilityJobDefinition """ # create *JobDefinition MonitoringType->function_name map type_function_map = dict( - DataQuality="_create_data_quality_job_definition", ModelQuality="_create_model_quality_job_definition" + DataQuality="_create_data_quality_job_definition", + ModelQuality="_create_model_quality_job_definition", + ModelBias="_create_model_bias_job_definition", + ModelExplainability="_create_model_explainability_job_definition", ) # call the right function to create the *JobDefinition @@ -163,10 +173,10 @@ def _create_data_quality_job_definition( ), data_quality_baseline_config=sagemaker.CfnDataQualityJobDefinition.DataQualityBaselineConfigProperty( constraints_resource=sagemaker.CfnDataQualityJobDefinition.ConstraintsResourceProperty( - s3_uri=f"s3://{self.baseline_job_output_location}/constraints.json" + s3_uri=f"{self.baseline_job_output_location}/constraints.json" ), statistics_resource=sagemaker.CfnDataQualityJobDefinition.StatisticsResourceProperty( - s3_uri=f"s3://{self.baseline_job_output_location}/statistics.json" + s3_uri=f"{self.baseline_job_output_location}/statistics.json" ), ), data_quality_job_input=sagemaker.CfnDataQualityJobDefinition.DataQualityJobInputProperty( @@ -179,7 +189,7 @@ def _create_data_quality_job_definition( monitoring_outputs=[ sagemaker.CfnDataQualityJobDefinition.MonitoringOutputProperty( s3_output=sagemaker.CfnDataQualityJobDefinition.S3OutputProperty( - s3_uri=f"s3://{self.monitoring_output_location}", + s3_uri=self.monitoring_output_location, local_path="/opt/ml/processing/output/data_quality_output", s3_upload_mode="EndOfJob", ) @@ -227,7 +237,7 @@ def _create_model_quality_job_definition( ), model_quality_baseline_config=sagemaker.CfnModelQualityJobDefinition.ModelQualityBaselineConfigProperty( constraints_resource=sagemaker.CfnModelQualityJobDefinition.ConstraintsResourceProperty( - s3_uri=f"s3://{self.baseline_job_output_location}/constraints.json" + s3_uri=f"{self.baseline_job_output_location}/constraints.json" ), ), model_quality_job_input=sagemaker.CfnModelQualityJobDefinition.ModelQualityJobInputProperty( @@ -239,14 +249,14 @@ def _create_model_quality_job_definition( probability_threshold_attribute=core.Token.as_number(self.probability_threshold_attribute), ), 
ground_truth_s3_input=sagemaker.CfnModelQualityJobDefinition.MonitoringGroundTruthS3InputProperty( - s3_uri=f"s3://{self.ground_truth_s3_uri}" + s3_uri=self.ground_truth_s3_uri ), ), model_quality_job_output_config=sagemaker.CfnModelQualityJobDefinition.MonitoringOutputConfigProperty( monitoring_outputs=[ sagemaker.CfnModelQualityJobDefinition.MonitoringOutputProperty( s3_output=sagemaker.CfnModelQualityJobDefinition.S3OutputProperty( - s3_uri=f"s3://{self.monitoring_output_location}", + s3_uri=self.monitoring_output_location, local_path="/opt/ml/processing/output/model_quality_output", s3_upload_mode="EndOfJob", ) @@ -271,17 +281,153 @@ def _create_model_quality_job_definition( return model_quality_job_definition + def _create_model_bias_job_definition( + self, + id: str, + ) -> sagemaker.CfnModelBiasJobDefinition: + """ + Creates Amazon SageMaker's Model Bias Job Definition + + Args: + id (str): CDK resource's logical id + + Returns: + sagemaker.CfnModelBiasJobDefinition object + """ + # create the ModelBiasJobDefinition + model_bias_job_definition = sagemaker.CfnModelBiasJobDefinition( + self.scope, + id, + model_bias_app_specification=sagemaker.CfnModelBiasJobDefinition.ModelBiasAppSpecificationProperty( + config_uri=f"{self.baseline_job_output_location}/analysis_config.json", image_uri=self.image_uri + ), + model_bias_baseline_config=sagemaker.CfnModelBiasJobDefinition.ModelBiasBaselineConfigProperty( + constraints_resource=sagemaker.CfnModelBiasJobDefinition.ConstraintsResourceProperty( + s3_uri=f"{self.baseline_job_output_location}/analysis.json" + ), + ), + model_bias_job_input=sagemaker.CfnModelBiasJobDefinition.ModelBiasJobInputProperty( + endpoint_input=sagemaker.CfnModelBiasJobDefinition.EndpointInputProperty( + endpoint_name=self.endpoint_name, + local_path="/opt/ml/processing/input/model_bias_input", + features_attribute=self.features_attribute, + inference_attribute=self.inference_attribute, + probability_attribute=self.probability_attribute, + probability_threshold_attribute=core.Token.as_number(self.probability_threshold_attribute), + ), + ground_truth_s3_input=sagemaker.CfnModelBiasJobDefinition.MonitoringGroundTruthS3InputProperty( + s3_uri=self.ground_truth_s3_uri + ), + ), + model_bias_job_output_config=sagemaker.CfnModelBiasJobDefinition.MonitoringOutputConfigProperty( + monitoring_outputs=[ + sagemaker.CfnModelBiasJobDefinition.MonitoringOutputProperty( + s3_output=sagemaker.CfnModelBiasJobDefinition.S3OutputProperty( + s3_uri=self.monitoring_output_location, + local_path="/opt/ml/processing/output/model_bias_output", + s3_upload_mode="EndOfJob", + ) + ) + ], + kms_key_id=self.kms_key_arn, + ), + job_resources=sagemaker.CfnModelBiasJobDefinition.MonitoringResourcesProperty( + cluster_config=sagemaker.CfnModelBiasJobDefinition.ClusterConfigProperty( + instance_count=core.Token.as_number(self.instance_count), + instance_type=self.instance_type, + volume_size_in_gb=core.Token.as_number(self.instance_volume_size), + volume_kms_key_id=self.kms_key_arn, + ) + ), + stopping_condition=sagemaker.CfnModelBiasJobDefinition.StoppingConditionProperty( + max_runtime_in_seconds=core.Token.as_number(self.max_runtime_seconds) + ), + role_arn=self.role_arn, + tags=self.tags, + ) + + return model_bias_job_definition + + def _create_model_explainability_job_definition( + self, + id: str, + ) -> sagemaker.CfnModelExplainabilityJobDefinition: + """ + Creates Amazon SageMaker's Model Explainability Job Definition + + Args: + id (str): CDK resource's logical id + + Returns: + 
sagemaker.CfnModelExplainabilityJobDefinition object + """ + # create the ModelExplainabilityJobDefinition + model_explainability_job_definition = sagemaker.CfnModelExplainabilityJobDefinition( + self.scope, + id, + model_explainability_app_specification=sagemaker.CfnModelExplainabilityJobDefinition.ModelExplainabilityAppSpecificationProperty( + config_uri=f"{self.baseline_job_output_location}/analysis_config.json", image_uri=self.image_uri + ), + model_explainability_baseline_config=sagemaker.CfnModelExplainabilityJobDefinition.ModelExplainabilityBaselineConfigProperty( + constraints_resource=sagemaker.CfnModelExplainabilityJobDefinition.ConstraintsResourceProperty( + s3_uri=f"{self.baseline_job_output_location}/analysis.json" + ), + ), + model_explainability_job_input=sagemaker.CfnModelExplainabilityJobDefinition.ModelExplainabilityJobInputProperty( + endpoint_input=sagemaker.CfnModelExplainabilityJobDefinition.EndpointInputProperty( + endpoint_name=self.endpoint_name, + local_path="/opt/ml/processing/input/model_explainability_input", + features_attribute=self.features_attribute, + inference_attribute=self.inference_attribute, + probability_attribute=self.probability_attribute, + ) + ), + model_explainability_job_output_config=sagemaker.CfnModelExplainabilityJobDefinition.MonitoringOutputConfigProperty( + monitoring_outputs=[ + sagemaker.CfnModelExplainabilityJobDefinition.MonitoringOutputProperty( + s3_output=sagemaker.CfnModelExplainabilityJobDefinition.S3OutputProperty( + s3_uri=self.monitoring_output_location, + local_path="/opt/ml/processing/output/model_explainability_output", + s3_upload_mode="EndOfJob", + ) + ) + ], + kms_key_id=self.kms_key_arn, + ), + job_resources=sagemaker.CfnModelExplainabilityJobDefinition.MonitoringResourcesProperty( + cluster_config=sagemaker.CfnModelExplainabilityJobDefinition.ClusterConfigProperty( + instance_count=core.Token.as_number(self.instance_count), + instance_type=self.instance_type, + volume_size_in_gb=core.Token.as_number(self.instance_volume_size), + volume_kms_key_id=self.kms_key_arn, + ) + ), + stopping_condition=sagemaker.CfnModelExplainabilityJobDefinition.StoppingConditionProperty( + max_runtime_in_seconds=core.Token.as_number(self.max_runtime_seconds) + ), + role_arn=self.role_arn, + tags=self.tags, + ) + + return model_explainability_job_definition + def _create_sagemaker_monitoring_schedule( self, monitoring_schedule_name: str, - monitor_job_definition: Union[sagemaker.CfnDataQualityJobDefinition, sagemaker.CfnModelQualityJobDefinition], + monitor_job_definition: Union[ + sagemaker.CfnDataQualityJobDefinition, + sagemaker.CfnModelQualityJobDefinition, + sagemaker.CfnModelBiasJobDefinition, + sagemaker.CfnModelExplainabilityJobDefinition, + ], ) -> sagemaker.CfnMonitoringSchedule: """ Creates Amazon SageMaker's Monitoring Schedule object Args: monitoring_schedule_name (str): name of the monitoring job to be created - monitor_job_definition (sagemaker.CfnDataQualityJobDefinition or sagemaker.CfnModelQualityJobDefinition): + monitor_job_definition (sagemaker.CfnDataQualityJobDefinition|sagemaker.CfnModelQualityJobDefinition object| + sagemaker.CfnModelBiasJobDefinition|sagemaker.CfnModelExplainabilityJobDefinition): monitor job definition Returns: @@ -314,7 +460,14 @@ def _create_sagemaker_monitoring_schedule( return schedule @property - def job_definition(self) -> Union[sagemaker.CfnDataQualityJobDefinition, sagemaker.CfnModelQualityJobDefinition]: + def job_definition( + self, + ) -> Union[ + sagemaker.CfnDataQualityJobDefinition, + 
sagemaker.CfnModelQualityJobDefinition, + sagemaker.CfnModelBiasJobDefinition, + sagemaker.CfnModelExplainabilityJobDefinition, + ]: return self.__job_definition @property diff --git a/source/lib/blueprints/byom/pipeline_definitions/sagemaker_monitor_role.py b/source/lib/blueprints/byom/pipeline_definitions/sagemaker_monitor_role.py index 824a7dd..556b0a4 100644 --- a/source/lib/blueprints/byom/pipeline_definitions/sagemaker_monitor_role.py +++ b/source/lib/blueprints/byom/pipeline_definitions/sagemaker_monitor_role.py @@ -42,7 +42,9 @@ def create_sagemaker_monitor_role( baseline_job_name, monitoring_schedule_name, endpoint_name, + model_monitor_ground_truth_bucket, model_monitor_ground_truth_input, + monitoring_type, ): # create optional policies kms_policy = kms_policy_document(scope, "MLOpsKmsPolicy", kms_key_arn) @@ -54,7 +56,9 @@ def create_sagemaker_monitor_role( role = iam.Role(scope, id, assumed_by=iam.ServicePrincipal("sagemaker.amazonaws.com")) # permissions to create sagemaker resources - sagemaker_policy = sagemaker_monitor_policy_statement(baseline_job_name, monitoring_schedule_name, endpoint_name) + sagemaker_policy = sagemaker_monitor_policy_statement( + baseline_job_name, monitoring_schedule_name, endpoint_name, monitoring_type + ) # sagemaker tags permissions sagemaker_tags_policy = sagemaker_tags_policy_statement() @@ -75,9 +79,9 @@ def create_sagemaker_monitor_role( ) # add permissions to read ground truth data (only for ModelQuality monitor) - if model_monitor_ground_truth_input: + if model_monitor_ground_truth_bucket: s3_read_resources.extend( - [f"arn:aws:s3:::{model_monitor_ground_truth_input}", f"arn:aws:s3:::{model_monitor_ground_truth_input}/*"] + [f"arn:aws:s3:::{model_monitor_ground_truth_bucket}", f"arn:aws:s3:::{model_monitor_ground_truth_input}/*"] ) s3_read = s3_policy_read(s3_read_resources) s3_write = s3_policy_write( diff --git a/source/lib/blueprints/byom/pipeline_definitions/templates_parameters.py b/source/lib/blueprints/byom/pipeline_definitions/templates_parameters.py index a160129..d418c69 100644 --- a/source/lib/blueprints/byom/pipeline_definitions/templates_parameters.py +++ b/source/lib/blueprints/byom/pipeline_definitions/templates_parameters.py @@ -140,6 +140,16 @@ def create_assets_bucket_name_parameter(scope: core.Construct) -> core.CfnParame min_length=3, ) + @staticmethod + def create_ground_truth_bucket_name_parameter(scope: core.Construct) -> core.CfnParameter: + return core.CfnParameter( + scope, + "GroundTruthBucket", + type="String", + description="Bucket name where the ground truth data will be stored.", + min_length=3, + ) + @staticmethod def create_custom_algorithms_ecr_repo_arn_parameter(scope: core.Construct) -> core.CfnParameter: return core.CfnParameter( @@ -539,11 +549,72 @@ def create_probability_threshold_attribute_parameter(scope: core.Construct) -> c return core.CfnParameter( scope, "ProbabilityThresholdAttribute", - default="0.5", - type="Number", + type="String", description="Threshold to convert probabilities to binaries", ) + @staticmethod + def create_model_predicted_label_config_parameter(scope): + return core.CfnParameter( + scope, + "ModelPredictedLabelConfig", + type="String", + description=( + "Dictionary provided as a json of the" + " sagemaker.clarify.ModelPredictedLabelConfig attributes ({'label':...,}). " + "Optional for a regression problem." 
+ ), + ) + + @staticmethod + def create_bias_config_parameter(scope): + return core.CfnParameter( + scope, + "BiasConfig", + type="String", + description=( + "Dictionary provided as a json using " + "of the sagemaker.clarify.BiasConfig attributes ({'label_values_or_threshold':...,})." + ), + min_length=3, + ) + + @staticmethod + def create_shap_config_parameter(scope): + return core.CfnParameter( + scope, + "SHAPConfig", + type="String", + description=( + "Dictionary provided as a json " + "of the sagemaker.clarify.SHAPConfig attributes " + "({'baseline':...,})." + ), + min_length=3, + ) + + @staticmethod + def create_model_scores_parameter(scope): + return core.CfnParameter( + scope, + "ExplainabilityModelScores", + type="String", + description=( + "A Python int/str provided as a string (e.g., using json.dumps(5)) " + "Index or JSONPath location in the model output for the predicted " + "scores to be explained. This is not required if the model output is a single score." + ), + ) + + @staticmethod + def create_features_attribute_parameter(scope): + return core.CfnParameter( + scope, + "FeaturesAttribute", + type="String", + description="Index or JSONpath to locate features", + ) + class ConditionsFactory: @staticmethod @@ -655,15 +726,15 @@ def create_endpoint_name_provided_condition( ) @staticmethod - def create_problem_type_regression_or_multiclass_classification_condition( - scope: core.Construct, problem_type: core.CfnParameter + def create_problem_type_binary_classification_attribute_provided_condition( + scope: core.Construct, problem_type: core.CfnParameter, attribute: core.CfnParameter, attribute_name: str ) -> core.CfnCondition: return core.CfnCondition( scope, - "ProblemTypeRegressionOrMulticlassClassification", - expression=core.Fn.condition_or( - core.Fn.condition_equals(problem_type.value_as_string, "Regression"), - core.Fn.condition_equals(problem_type.value_as_string, "MulticlassClassification"), + f"ProblemTypeBinaryClassification{attribute_name}Provided", + expression=core.Fn.condition_and( + core.Fn.condition_equals(problem_type.value_as_string, "BinaryClassification"), + core.Fn.condition_not(core.Fn.condition_equals(attribute.value_as_string, "")), ), ) @@ -676,3 +747,11 @@ def create_problem_type_binary_classification_condition( "ProblemTypeBinaryClassification", expression=core.Fn.condition_equals(problem_type.value_as_string, "BinaryClassification"), ) + + @staticmethod + def create_attribute_provided_condition(scope, logical_id, attribute): + return core.CfnCondition( + scope, + logical_id, + expression=core.Fn.condition_not(core.Fn.condition_equals(attribute, "")), + ) diff --git a/source/lib/aws_mlops_stack.py b/source/lib/mlops_orchestrator_stack.py similarity index 100% rename from source/lib/aws_mlops_stack.py rename to source/lib/mlops_orchestrator_stack.py
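
As a supplement to the template parameters added above (ModelPredictedLabelConfig, BiasConfig, SHAPConfig, ExplainabilityModelScores), here is a minimal illustrative sketch, not part of the patch itself, of how the JSON strings those parameters are described as expecting might be assembled. The attribute names mirror the `sagemaker.clarify.BiasConfig`, `SHAPConfig`, and `ModelPredictedLabelConfig` attributes referenced in the parameter descriptions; the concrete values below are made-up examples, not defaults of the solution.

```python
# Illustrative sketch (not part of the patch): building the JSON strings expected by the
# new ModelBias/ModelExplainability template parameters. Attribute names follow the
# sagemaker.clarify classes named in the parameter descriptions; values are examples only.
import json

# BiasConfig parameter: sagemaker.clarify.BiasConfig attributes ({'label_values_or_threshold': ...,})
bias_config = json.dumps(
    {
        "label_values_or_threshold": [1],   # value(s) treated as the positive outcome
        "facet_name": "age",                # sensitive feature to monitor for bias
        "facet_values_or_threshold": [40],  # boundary defining the sensitive group
    }
)

# SHAPConfig parameter: sagemaker.clarify.SHAPConfig attributes ({'baseline': ...,})
shap_config = json.dumps(
    {
        "baseline": [[0.5, 0.5, 0.5]],  # baseline record(s) used for SHAP values
        "num_samples": 100,
        "agg_method": "mean_abs",
    }
)

# ModelPredictedLabelConfig parameter: sagemaker.clarify.ModelPredictedLabelConfig attributes
model_predicted_label_config = json.dumps({"probability": 0, "probability_threshold": 0.5})

# ExplainabilityModelScores parameter: a Python int/str provided as a string, e.g. via json.dumps
model_scores = json.dumps(0)
```

Under this assumption, the resulting strings would be supplied as the corresponding CloudFormation parameter values when provisioning the ModelBias or ModelExplainability monitor pipelines; the stack passes them through to the baseline job Lambda via the `BIAS_CONFIG`, `SHAP_CONFIG`, `MODEL_PREDICTED_LABEL_CONFIG`, and `MODEL_SCORES` environment variables shown earlier in the patch.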