From 9e6e32f0f8393fdbcfe5aab14edbb5e745b55e8c Mon Sep 17 00:00:00 2001
From: Apurva Koti
Date: Mon, 6 May 2024 11:22:25 -0700
Subject: [PATCH] test-impl

Signed-off-by: Apurva Koti
---
 mlflow/metrics/genai/genai_metric.py | 206 +++++++++++++++++++--------
 1 file changed, 150 insertions(+), 56 deletions(-)

diff --git a/mlflow/metrics/genai/genai_metric.py b/mlflow/metrics/genai/genai_metric.py
index b555dadcfa5a6..8ff23fb6ddefc 100644
--- a/mlflow/metrics/genai/genai_metric.py
+++ b/mlflow/metrics/genai/genai_metric.py
@@ -3,12 +3,13 @@
 import re
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from inspect import Parameter, Signature
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 from mlflow.exceptions import MlflowException
 from mlflow.metrics.base import MetricValue
 from mlflow.metrics.genai import model_utils
 from mlflow.metrics.genai.base import EvaluationExample
+from mlflow.metrics.genai.prompt_template import PromptTemplate
 from mlflow.metrics.genai.utils import _get_default_model, _get_latest_metric_version
 from mlflow.models import EvaluationMetric, make_metric
 from mlflow.protos.databricks_pb2 import (
@@ -26,6 +27,14 @@
 _logger = logging.getLogger(__name__)
 
+_PROMPT_FORMATTING_WRAPPER = """
+You must return the following fields in your response in two lines, one below the other:
+score: Your numerical score for the model's {name} based on the rubric
+justification: Your reasoning about the model's {name} score
+
+Do not add additional new lines. Do not add any other fields.
+"""
+
 
 def _format_args_string(grading_context_columns: Optional[List[str]], eval_values, indx) -> str:
     import pandas as pd
@@ -84,11 +93,132 @@ def _extract_score_and_justification(text):
     return None, None
 
 
+def _score_model_on_one_payload(
+    payload,
+    eval_model,
+    parameters,
+):
+    try:
+        raw_result = model_utils.score_model_on_payload(eval_model, payload, parameters)
+        return _extract_score_and_justification(raw_result)
+    except ImportError:
+        raise
+    except MlflowException as e:
+        if e.error_code in [
+            ErrorCode.Name(BAD_REQUEST),
+            ErrorCode.Name(UNAUTHENTICATED),
+            ErrorCode.Name(INVALID_PARAMETER_VALUE),
+        ]:
+            raise
+        else:
+            return None, f"Failed to score model on payload. Error: {e!s}"
+    except Exception as e:
+        return None, f"Failed to score model on payload. Error: {e!s}"
+
+
+def _score_model_on_payloads(
+    grading_payloads, model, parameters, max_workers
+) -> Tuple[List[int], List[str]]:
+    scores = [None] * len(grading_payloads)
+    justifications = [None] * len(grading_payloads)
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = {
+            executor.submit(
+                _score_model_on_one_payload,
+                payload,
+                model,
+                parameters,
+            ): indx
+            for indx, payload in enumerate(grading_payloads)
+        }
+
+        as_comp = as_completed(futures)
+        try:
+            from tqdm.auto import tqdm
+
+            as_comp = tqdm(as_comp, total=len(futures))
+        except ImportError:
+            pass
+
+        for future in as_comp:
+            indx = futures[future]
+            score, justification = future.result()
+            scores[indx] = score
+            justifications[indx] = justification
+
+    return scores, justifications
+
+
+def _get_aggregate_results(scores, aggregations):
+    # loop over the aggregations and compute the aggregate results on the scores
+    def aggregate_function(aggregate_option, scores):
+        import numpy as np
+
+        options = {
+            "min": np.min,
+            "max": np.max,
+            "mean": np.mean,
+            "median": np.median,
+            "variance": np.var,
+            "p90": lambda x: np.percentile(x, 90) if x else None,
+        }
+
+        if aggregate_option not in options:
+            raise MlflowException(
+                message=f"Invalid aggregate option {aggregate_option}.",
+                error_code=INVALID_PARAMETER_VALUE,
+            )
+
+        return options[aggregate_option](scores)
+
+    scores_for_aggregation = [score for score in scores if score is not None]
+
+    return (
+        {option: aggregate_function(option, scores_for_aggregation) for option in aggregations}
+        if aggregations is not None
+        else {}
+    )
+
+
+def _make_custom_genai_metric(
+    name: str,
+    judge_prompt: Optional[str] = None,
+    model: Optional[str] = _get_default_model(),
+    parameters: Optional[Dict[str, Any]] = None,
+    aggregations: Optional[List[str]] = ["mean", "variance", "p90"],  # noqa: B006
+    greater_is_better: bool = True,
+    max_workers: int = 10,
+) -> EvaluationMetric:
+    """Create a genai metric that grades with a user-provided judge prompt."""
+
+    def eval_fn(
+        *args,
+        **kwargs,
+    ) -> MetricValue:
+        """
+        This is the function that is called when the metric is evaluated.
+        """
+        import pandas as pd
+
+        # Fill the metric name into the formatting wrapper up front; the remaining
+        # template variables are filled from the evaluation data, one payload per row.
+        prompt_template = PromptTemplate(
+            [judge_prompt, _PROMPT_FORMATTING_WRAPPER.format(name=name)]
+        )
+        grading_payloads = pd.DataFrame(kwargs).to_dict(orient="records")
+        arg_strings = [prompt_template.format(**payload) for payload in grading_payloads]
+        scores, justifications = _score_model_on_payloads(
+            arg_strings, model, parameters, max_workers
+        )
+
+        aggregate_scores = _get_aggregate_results(scores, aggregations)
+
+        return MetricValue(scores, justifications, aggregate_scores)
+
+    return make_metric(
+        eval_fn=eval_fn,
+        greater_is_better=greater_is_better,
+        name=name,
+    )
+
+
 @experimental
 def make_genai_metric(
     name: str,
-    definition: str,
-    grading_prompt: str,
+    definition: Optional[str] = None,
+    grading_prompt: Optional[str] = None,
+    judge_prompt: Optional[str] = None,
     examples: Optional[List[EvaluationExample]] = None,
     version: Optional[str] = _get_latest_metric_version(),
     model: Optional[str] = _get_default_model(),
@@ -107,6 +237,9 @@ def make_genai_metric(
         name: Name of the metric.
         definition: Definition of the metric.
         grading_prompt: Grading criteria of the metric.
+        judge_prompt: (Optional) The full prompt to be used for the judge model, in place of the
+            prompt normally built from ``definition`` and ``grading_prompt``. This is useful when a
+            use case or system prompt is not covered by the standard grading prompt structure. If
+            provided, ``definition``, ``grading_prompt``, and ``examples`` are ignored.
         examples: (Optional) Examples of the metric.
         version: (Optional) Version of the metric. Currently supported versions are: v1.
         model: (Optional) Model uri of an openai, gateway, or deployments judge model in the
@@ -196,6 +329,15 @@ def make_genai_metric(
         greater_is_better=True,
     )
     """
+    if judge_prompt is not None:
+        return _make_custom_genai_metric(
+            name=name,
+            judge_prompt=judge_prompt,
+            model=model,
+            parameters=parameters,
+            aggregations=aggregations,
+            greater_is_better=greater_is_better,
+            max_workers=max_workers,
+        )
+
+    if definition is None or grading_prompt is None:
+        raise MlflowException(
+            "Both definition and grading_prompt must be provided when judge_prompt is not specified.",
+            error_code=INVALID_PARAMETER_VALUE,
+        )
+
     if not isinstance(grading_context_columns, list):
         grading_context_columns = [grading_context_columns]
@@ -306,38 +448,16 @@ def eval_fn(
             )
         )
 
-        def score_model_on_one_payload(
-            payload,
-            eval_model,
-        ):
-            try:
-                raw_result = model_utils.score_model_on_payload(
-                    eval_model, payload, eval_parameters
-                )
-                return _extract_score_and_justification(raw_result)
-            except ImportError:
-                raise
-            except MlflowException as e:
-                if e.error_code in [
-                    ErrorCode.Name(BAD_REQUEST),
-                    ErrorCode.Name(UNAUTHENTICATED),
-                    ErrorCode.Name(INVALID_PARAMETER_VALUE),
-                ]:
-                    raise
-                else:
-                    return None, f"Failed to score model on payload. Error: {e!s}"
-            except Exception as e:
-                return None, f"Failed to score model on payload. Error: {e!s}"
-
         scores = [None] * len(inputs)
         justifications = [None] * len(inputs)
 
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
             futures = {
                 executor.submit(
-                    score_model_on_one_payload,
+                    _score_model_on_one_payload,
                     payload,
                     eval_model,
+                    eval_parameters,
                 ): indx
                 for indx, payload in enumerate(grading_payloads)
             }
@@ -356,35 +476,7 @@ def score_model_on_one_payload(
             scores[indx] = score
             justifications[indx] = justification
 
-        # loop over the aggregations and compute the aggregate results on the scores
-        def aggregate_function(aggregate_option, scores):
-            import numpy as np
-
-            options = {
-                "min": np.min,
-                "max": np.max,
-                "mean": np.mean,
-                "median": np.median,
-                "variance": np.var,
-                "p90": lambda x: np.percentile(x, 90) if x else None,
-            }
-
-            if aggregate_option not in options:
-                raise MlflowException(
-                    message=f"Invalid aggregate option {aggregate_option}.",
-                    error_code=INVALID_PARAMETER_VALUE,
-                )
-
-            return options[aggregate_option](scores)
-
-        scores_for_aggregation = [score for score in scores if score is not None]
-
-        aggregate_results = (
-            {option: aggregate_function(option, scores_for_aggregation) for option in aggregations}
-            if aggregations is not None
-            else {}
-        )
-
+        aggregate_results = _get_aggregate_results(scores, aggregations)
         return MetricValue(scores, justifications, aggregate_results)
 
     signature_parameters = [
@@ -406,3 +498,5 @@ def aggregate_function(aggregate_option, scores):
         version=version,
         metric_details=evaluation_context["eval_prompt"].__str__(),
     )
+
+
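
Reviewer note (not part of the patch): a minimal sketch of how the new judge_prompt path could be
exercised against this branch. The template variable names ("inputs", "outputs"), the judge model URI,
and the parameters dict below are illustrative assumptions; the template variables are expected to
match the column names supplied in the evaluation data.

    from mlflow.metrics.genai import make_genai_metric

    # Hypothetical metric that supplies the whole grading prompt directly. Because
    # judge_prompt is given, definition, grading_prompt, and examples are skipped and
    # make_genai_metric delegates to _make_custom_genai_metric.
    helpfulness = make_genai_metric(
        name="helpfulness",
        judge_prompt=(
            "Evaluate how helpful the response is for answering the question.\n"
            "Question: {inputs}\n"
            "Response: {outputs}\n"
            "Give a score from 1 (not helpful) to 5 (very helpful)."
        ),
        model="openai:/gpt-4",
        parameters={"temperature": 0.0},
        aggregations=["mean", "variance", "p90"],
        greater_is_better=True,
    )

At evaluation time the judge prompt is combined with _PROMPT_FORMATTING_WRAPPER, formatted once per
row of the evaluation data, and each formatted payload is scored concurrently by
_score_model_on_payloads before the per-row scores are aggregated.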