diff --git a/nannyml/drift/multivariate/data_reconstruction/calculator.py b/nannyml/drift/multivariate/data_reconstruction/calculator.py
index 9d0d61fce..465e4815c 100644
--- a/nannyml/drift/multivariate/data_reconstruction/calculator.py
+++ b/nannyml/drift/multivariate/data_reconstruction/calculator.py
@@ -14,7 +14,7 @@
 """

-from typing import List, Optional, Tuple, Union
+from typing import List, Optional, Tuple, Union, Dict

 import numpy as np
 import pandas as pd
@@ -29,12 +29,12 @@
 from nannyml.drift.multivariate.data_reconstruction.result import Result
 from nannyml.exceptions import InvalidArgumentsException
 from nannyml.sampling_error import SAMPLING_ERROR_RANGE
-from nannyml.thresholds import StandardDeviationThreshold, Threshold
+from nannyml.thresholds import StandardDeviationThreshold, Threshold, calculate_threshold_values
 from nannyml.usage_logging import UsageEvent, log_usage


 class DataReconstructionDriftCalculator(AbstractCalculator):
-    """BaseDriftCalculator implementation using Reconstruction Error as a measure of drift."""
+    """Multivariate Drift Calculator using PCA Reconstruction Error as a measure of drift."""

     def __init__(
         self,
@@ -69,7 +69,7 @@ def __init__(
         chunk_period: str, default=None
             Splits the data according to the given period. Only one of `chunk_size`, `chunk_number` or `chunk_period`
             should be given.
-        chunker : Chunker, default=None
+        chunker: Chunker, default=None
             The `Chunker` used to split the data sets into a lists of chunks.
         imputer_categorical: SimpleImputer, default=None
             The SimpleImputer used to impute categorical features in the data.
@@ -80,15 +80,18 @@ def __init__(
         imputer_continuous: SimpleImputer, default=None
             The SimpleImputer used to impute continuous features in the data.
         threshold: Threshold, default=StandardDeviationThreshold
            The threshold you wish to evaluate values on. Defaults to a StandardDeviationThreshold with default
            options. The other allowed value is ConstantThreshold.
-
        Examples:
        >>> import nannyml as nml
        >>> # Load synthetic data
        >>> reference, analysis, _ = nml.load_synthetic_car_loan_dataset()
-        >>> non_feature_columns = ['timestamp', 'y_pred_proba', 'y_pred', 'repaid']
        >>> feature_column_names = [
-        ...     col for col in reference.columns
-        ...     if col not in non_feature_columns
+        ...     'car_value',
+        ...     'salary_range',
+        ...     'debt_to_income_ratio',
+        ...     'loan_length',
+        ...     'repaid_loan_on_prev_car',
+        ...     'size_of_downpayment',
+        ...     'driver_tenure',
        >>> ]
        >>> calc = nml.DataReconstructionDriftCalculator(
        ...     column_names=feature_column_names,
@@ -106,17 +109,13 @@ def __init__(
         self.column_names = column_names
         self.continuous_column_names: List[str] = []
         self.categorical_column_names: List[str] = []
-
+        self.column_name = 'reconstruction_error'

         self._n_components = n_components
-
         self.threshold = threshold
-        self._scaler = None
-        self._encoder = None
-        self._pca = None
-
-        self._upper_alert_threshold: Optional[float]
-        self._lower_alert_threshold: Optional[float]
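+        # alert thresholds are computed from the reference chunk results the first
+        # time _calculate() runs, i.e. while the calculator is being fitted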
+        self.lower_threshold_value: Optional[float]
+        self.upper_threshold_value: Optional[float]
+        self.lower_threshold_value_limit: float = 0

         if imputer_categorical:
             if not isinstance(imputer_categorical, SimpleImputer):
@@ -126,19 +125,12 @@ def __init__(
         else:
             imputer_categorical = SimpleImputer(missing_values=np.nan, strategy='most_frequent')
         self._imputer_categorical = imputer_categorical
-
         if imputer_continuous:
             if not isinstance(imputer_continuous, SimpleImputer):
                 raise TypeError("imputer_continuous needs to be an instantiated SimpleImputer object.")
         else:
             imputer_continuous = SimpleImputer(missing_values=np.nan, strategy='mean')
         self._imputer_continuous = imputer_continuous
-
-        # sampling error
-        self._sampling_error_components: Tuple = ()
-
-        self.previous_reference_results: Optional[pd.DataFrame] = None
         self.result: Optional[Result] = None

     @log_usage(UsageEvent.MULTIVAR_DRIFT_CALC_FIT)
@@ -154,49 +146,27 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs):
         )

         # TODO: We duplicate the reference data 3 times, here. Improve to something more memory efficient?
-        imputed_reference_data = reference_data.copy(deep=True)
+        data = reference_data.copy(deep=True)
         if self.categorical_column_names:
-            imputed_reference_data[self.categorical_column_names] = self._imputer_categorical.fit_transform(
-                imputed_reference_data[self.categorical_column_names]
+            data[self.categorical_column_names] = self._imputer_categorical.fit_transform(
+                data[self.categorical_column_names]
             )
         if self.continuous_column_names:
-            imputed_reference_data[self.continuous_column_names] = self._imputer_continuous.fit_transform(
-                imputed_reference_data[self.continuous_column_names]
+            data[self.continuous_column_names] = self._imputer_continuous.fit_transform(
+                data[self.continuous_column_names]
             )

         encoder = CountEncoder(cols=self.categorical_column_names, normalize=True)
-        encoded_reference_data = imputed_reference_data.copy(deep=True)
-        encoded_reference_data[self.column_names] = encoder.fit_transform(encoded_reference_data[self.column_names])
+        data = encoder.fit_transform(data[self.column_names]).to_numpy()

         scaler = StandardScaler()
-        scaled_reference_data = pd.DataFrame(
-            scaler.fit_transform(encoded_reference_data[self.column_names]), columns=self.column_names
-        )
-
+        data = scaler.fit_transform(data)

         pca = PCA(n_components=self._n_components, random_state=16)
-        pca.fit(scaled_reference_data[self.column_names])
+        pca.fit(data)

-        self._encoder = encoder
-        self._scaler = scaler
-        self._pca = pca
-
-        # Calculate thresholds
-        self._lower_alert_threshold, self._upper_alert_threshold = self._calculate_alert_thresholds(reference_data)
-
-        # Reference stability
-        self._sampling_error_components = (
-            _calculate_reconstruction_error_for_data(
-                column_names=self.column_names,
-                categorical_column_names=self.categorical_column_names,
-                continuous_column_names=self.continuous_column_names,
-                data=reference_data,  # TODO: check with Nikos if this needs to be chunked or not?
-                encoder=self._encoder,
-                scaler=self._scaler,
-                pca=self._pca,
-                imputer_categorical=self._imputer_categorical,
-                imputer_continuous=self._imputer_continuous,
-            ).std(),
-        )
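+        # keep the fitted encoder, scaler and PCA; analysis chunks are transformed
+        # with these same preprocessing steps in _calculate_dre_results()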
+        self._encoder: CountEncoder = encoder
+        self._scaler: StandardScaler = scaler
+        self._pca: PCA = pca

         self.result = self._calculate(data=reference_data)
         self.result.data[('chunk', 'period')] = 'reference'
@@ -211,8 +181,6 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:

         _list_missing(self.column_names, data)

-        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(data, self.column_names)
-
         chunks = self.chunker.split(data, columns=self.column_names)

         res = pd.DataFrame.from_records(
@@ -225,33 +193,18 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
                     'start_date': chunk.start_datetime,
                     'end_date': chunk.end_datetime,
                     'period': 'analysis',
-                    'sampling_error': sampling_error(self._sampling_error_components, chunk.data),
-                    'reconstruction_error': _calculate_reconstruction_error_for_data(
-                        column_names=self.column_names,
-                        categorical_column_names=self.categorical_column_names,
-                        continuous_column_names=self.continuous_column_names,
-                        data=chunk.data,
-                        encoder=self._encoder,
-                        scaler=self._scaler,
-                        pca=self._pca,
-                        imputer_categorical=self._imputer_categorical,
-                        imputer_continuous=self._imputer_continuous,
-                    ).mean(),
+                    **self._calculate_chunk_record(chunk.data)
                 }
                 for chunk in chunks
             ]
         )

-        res['upper_confidence_bound'] = res['reconstruction_error'] + SAMPLING_ERROR_RANGE * res['sampling_error']
-        res['lower_confidence_bound'] = res['reconstruction_error'] - SAMPLING_ERROR_RANGE * res['sampling_error']
-        res['upper_threshold'] = [self._upper_alert_threshold] * len(res)
-        res['lower_threshold'] = [self._lower_alert_threshold] * len(res)
-        res['alert'] = _add_alert_flag(res, self._upper_alert_threshold, self._lower_alert_threshold)
-
         multilevel_index = _create_multilevel_index()
         res.columns = multilevel_index
         res = res.reset_index(drop=True)

         if self.result is None:
+            self._set_thresholds(results=res)
+            res = self._populate_thresholds(results=res)
             self.result = Result(
                 results_data=res,
                 timestamp_column_name=self.timestamp_column_name,
@@ -261,163 +214,90 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
             )
         else:
             self.result = self.result.filter(period='reference')
+            res = self._populate_thresholds(results=res)
             self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
-
         return self.result

-    def _calculate_alert_thresholds(self, reference_data) -> Tuple[Optional[float], Optional[float]]:
-        reference_chunks = self.chunker.split(reference_data)
-        reference_reconstruction_error = np.asarray(
-            [
-                _calculate_reconstruction_error_for_data(
-                    column_names=self.column_names,
-                    categorical_column_names=self.categorical_column_names,
-                    continuous_column_names=self.continuous_column_names,
-                    data=chunk.data,
-                    encoder=self._encoder,
-                    scaler=self._scaler,
-                    pca=self._pca,
-                    imputer_categorical=self._imputer_categorical,
-                    imputer_continuous=self._imputer_continuous,
-                ).mean()
-                for chunk in reference_chunks
-            ]
-        )
+    def _calculate_chunk_record(self, data: pd.DataFrame) -> Dict[str, float]:
+        _size = data.shape[0]
+        rcerr_mean, rcerr_std = self._calculate_dre_results(data)
+        # sampling error based on the chunk's own data distribution: the standard error of the mean
+        sampling_error = rcerr_std / np.sqrt(_size)
+        record = {}
+        try:
+            record['reconstruction_error'] = rcerr_mean
+            record['sampling_error'] = sampling_error
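+            # confidence band: mean reconstruction error +/- SAMPLING_ERROR_RANGE standard
+            # errors, with the lower bound clipped at the lower threshold value limit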
+            record['upper_confidence_bound'] = rcerr_mean + SAMPLING_ERROR_RANGE * sampling_error
+            record['lower_confidence_bound'] = np.maximum(
+                rcerr_mean - SAMPLING_ERROR_RANGE * sampling_error,
+                self.lower_threshold_value_limit,
+            )
+        except Exception as exc:
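+            # fall back to NaNs so the chunk record keeps a consistent set of keys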
+            record['reconstruction_error'] = np.nan
+            record['sampling_error'] = np.nan
+            record['upper_confidence_bound'] = np.nan
+            record['lower_confidence_bound'] = np.nan
+            self._logger.error(
+                f"An unexpected error occurred while calculating reconstruction error, returning NaN's: {exc}"
+            )
+        finally:
+            return record
+
-        return self.threshold.thresholds(reference_reconstruction_error)
-
-
-def _calculate_reconstruction_error_for_data(
-    column_names: List[str],
-    categorical_column_names: List[str],
-    continuous_column_names: List[str],
-    data: pd.DataFrame,
-    encoder: CountEncoder,
-    scaler: StandardScaler,
-    pca: PCA,
-    imputer_categorical: SimpleImputer,
-    imputer_continuous: SimpleImputer,
-) -> pd.Series:
-    """Calculates reconstruction error for a single Chunk.
-
-    Parameters
-    ----------
-    column_names : List[str]
-        Subset of features to be included in calculation.
-    categorical_column_names : List[str]
-        Subset of categorical features to be included in calculation.
-    continuous_column_names : List[str]
-        Subset of continuous features to be included in calculation.
-    data : pd.DataFrame
-        The dataset to calculate reconstruction error on
-    encoder : category_encoders.CountEncoder
-        Encoder used to transform categorical features into a numerical representation
-    scaler : sklearn.preprocessing.StandardScaler
-        Standardize features by removing the mean and scaling to unit variance
-    pca : sklearn.decomposition.PCA
-        Linear dimensionality reduction using Singular Value Decomposition of the
-        data to project it to a lower dimensional space.
-    imputer_categorical: SimpleImputer
-        The SimpleImputer fitted to impute categorical features in the data.
-    imputer_continuous: SimpleImputer
-        The SimpleImputer fitted to impute continuous features in the data.
-
-    Returns
-    -------
-    rce_for_chunk: pd.DataFrame
-        A pandas.DataFrame containing the Chunk key and reconstruction error for the given Chunk data.
-
-    """
-    # encode categorical features
-    data = data.copy(deep=True).reset_index(drop=True)
-
-    # Impute missing values
-    if categorical_column_names:
-        data[categorical_column_names] = imputer_categorical.transform(data[categorical_column_names])
-    if continuous_column_names:
-        data[continuous_column_names] = imputer_continuous.transform(data[continuous_column_names])
-
-    data[column_names] = encoder.transform(data[column_names])
-
-    # scale all features
-    data[column_names] = scaler.transform(data[column_names])
-
-    # perform dimensionality reduction
-    reduced_data = pca.transform(data[column_names])
-
-    # perform reconstruction
-    reconstructed = pca.inverse_transform(reduced_data)
-    reconstructed_feature_column_names = [f'rf_{col}' for col in column_names]
-    reconstructed_data = pd.DataFrame(reconstructed, columns=reconstructed_feature_column_names)
-
-    # combine preprocessed rows with reconstructed rows
-    data = pd.concat([data, reconstructed_data], axis=1)
-
-    # calculate reconstruction error using euclidian norm (row-wise between preprocessed and reconstructed value)
-    data = data.assign(rc_error=lambda x: _calculate_distance(data, column_names, reconstructed_feature_column_names))
-
-    return data['rc_error']
-
-
-def _calculate_distance(df: pd.DataFrame, features_preprocessed: List[str], features_reconstructed: List[str]):
-    """Calculate row-wise euclidian distance between preprocessed and reconstructed feature values."""
-    x1 = df[features_preprocessed]
-    x2 = df[features_reconstructed]
-    x2.columns = x1.columns
-
-    x = x1.subtract(x2)
-
-    x['rc_error'] = x.apply(lambda row: np.linalg.norm(row), axis=1)
-    return x['rc_error']
-
-
-def _add_alert_flag(
-    drift_result: pd.DataFrame, upper_threshold: Optional[float], lower_threshold: Optional[float]
-) -> pd.Series:
-    alert = drift_result.apply(
-        lambda row: True
-        if (
-            (upper_threshold is not None and row['reconstruction_error'] > upper_threshold)
-            or (lower_threshold is not None and row['reconstruction_error'] < lower_threshold)
+    def _calculate_dre_results(self, data: pd.DataFrame) -> Tuple[float, float]:
+        # Impute missing values
+        if self.categorical_column_names:
+            data[self.categorical_column_names] = self._imputer_categorical.transform(data[self.categorical_column_names])  # noqa: E501
+        if self.continuous_column_names:
+            data[self.continuous_column_names] = self._imputer_continuous.transform(data[self.continuous_column_names])
+
+        data = self._encoder.transform(data[self.column_names]).to_numpy()
+        data = self._scaler.transform(data)
+
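+        # project the scaled data onto the principal components and map it back to the
+        # original feature space; the reconstruction error of each row is the Euclidean
+        # distance between the original and the reconstructed representation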
+        tmp = self._pca.transform(data)
+        tmp = self._pca.inverse_transform(tmp)
+        tmp = data - tmp
+        tmp = np.linalg.norm(tmp, axis=1)
+
+        # std returns nan when there is only 1 row
+        return (np.mean(tmp), np.std(tmp, ddof=1))
+
+    def _set_thresholds(self, results: pd.DataFrame):
+        lower, upper = calculate_threshold_values(
+            threshold=self.threshold,
+            data=results[(self.column_name, 'value')].to_numpy(),
+            lower_threshold_value_limit=self.lower_threshold_value_limit,
+            upper_threshold_value_limit=None,
+            override_using_none=True,
+            logger=self._logger,
+            metric_name=self.column_name
         )
-        else False,
-        axis=1,
-    )
-
-    return alert
-
-
-def sampling_error(components: Tuple, data: pd.DataFrame) -> float:
-    """Calculates the sampling error with respect to the reference data for a given chunk of data.
-
-    Parameters
-    ----------
-    components: Tuple
-    data: pd.DataFrame
-        The data to calculate the sampling error on, with respect to the reference data.
-
-    Returns
-    -------
-    sampling_error: float
-        The expected sampling error.
-    """
-    return components[0] / np.sqrt(len(data))
+        self.lower_threshold_value = lower
+        self.upper_threshold_value = upper
+
+    def _populate_thresholds(self, results: pd.DataFrame):
+        results[(self.column_name, 'upper_threshold')] = self.upper_threshold_value
+        results[(self.column_name, 'lower_threshold')] = self.lower_threshold_value
+        results[(self.column_name, 'alert')] = results.apply(
+            lambda row: True
+            if (
+                (self.upper_threshold_value is not None and row[(self.column_name, 'value')] > self.upper_threshold_value)  # noqa: E501
+                or (self.lower_threshold_value is not None and row[(self.column_name, 'value')] < self.lower_threshold_value)  # noqa: E501
+            )
+            else False,
+            axis=1,
+        )
+        return results


 def _create_multilevel_index():
     chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period']
     method_column_names = [
-        'sampling_error',
         'value',
+        'sampling_error',
         'upper_confidence_boundary',
         'lower_confidence_boundary',
-        'upper_threshold',
-        'lower_threshold',
-        'alert',
     ]
     chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names]
-    reconstruction_tuples = [('reconstruction_error', column_name) for column_name in method_column_names]
-
+    reconstruction_tuples = [('reconstruction_error', method_column_name) for method_column_name in method_column_names]
     tuples = chunk_tuples + reconstruction_tuples
-
     return MultiIndex.from_tuples(tuples)
diff --git a/tests/drift/test_multiv_pca.py b/tests/drift/test_multiv_pca.py
index 63e0cf35a..d4fca9c37 100644
--- a/tests/drift/test_multiv_pca.py
+++ b/tests/drift/test_multiv_pca.py
@@ -470,23 +470,6 @@ def test_data_reconstruction_drift_result_filter_period(reconstruction_drift_res
     assert filtered_result.data.equals(ref_period)


-def test_data_reconstruction_drift_chunked_by_size_has_fixed_sampling_error(sample_drift_data):  # noqa: D103
-    ref_data = sample_drift_data.loc[sample_drift_data['period'] == 'reference']
-
-    chunker = SizeBasedChunker(chunk_size=2500, incomplete='drop')
-
-    calc = DataReconstructionDriftCalculator(
-        column_names=['f1', 'f2', 'f3', 'f4'], timestamp_column_name='timestamp', chunker=chunker
-    ).fit(ref_data)
-    results = calc.calculate(data=sample_drift_data)
-
-    assert ('reconstruction_error', 'sampling_error') in results.data.columns
-    assert np.array_equal(
-        np.round(results.to_df().loc[:, ('reconstruction_error', 'sampling_error')], 4),
-        np.round([0.01164 for _ in range(len(results.data))], 4),
-    )
-
-
 def test_data_reconstruction_drift_chunked_by_period_has_variable_sampling_error(sample_drift_data):  # noqa: D103
     ref_data = sample_drift_data.loc[sample_drift_data['period'] == 'reference']

@@ -498,7 +481,7 @@ def test_data_reconstruction_drift_chunked_by_period_has_variable_sampling_error
     assert ('reconstruction_error', 'sampling_error') in results.data.columns
     assert np.array_equal(
         np.round(results.filter(period='analysis').to_df().loc[:, ('reconstruction_error', 'sampling_error')], 4),
-        np.round([0.009511, 0.009005, 0.008710, 0.008854, 0.009899], 4),
+        [0.0095, 0.0090, 0.0086, 0.0086, 0.0092],
     )