diff --git a/nannyml/drift/univariate/calculator.py b/nannyml/drift/univariate/calculator.py index 836a3b73..f072083e 100644 --- a/nannyml/drift/univariate/calculator.py +++ b/nannyml/drift/univariate/calculator.py @@ -111,7 +111,6 @@ def __init__( chunker : Chunker The `Chunker` used to split the data sets into a lists of chunks. thresholds: dict - Defaults to:: { @@ -136,8 +135,7 @@ def __init__( The `chi2` method does not support custom thresholds for now. Additional research is required to determine how to transition from its current p-value based implementation. - computation_params : dict - + computation_params: dict Defaults to:: { diff --git a/nannyml/drift/univariate/methods.py b/nannyml/drift/univariate/methods.py index b5710318..a635c831 100644 --- a/nannyml/drift/univariate/methods.py +++ b/nannyml/drift/univariate/methods.py @@ -2,7 +2,7 @@ # # License: Apache Software License 2.0 -""" This module contains the different drift detection method implementations. +"""This module contains the different drift detection method implementations. The :class:`~nannyml.drift.univariate.methods.MethodFactory` will convert the drift detection method names into an instance of the base :class:`~nannyml.drift.univariate.methods.Method` class. @@ -62,10 +62,8 @@ def __init__( computation_params : dict, default=None A dictionary specifying parameter names and values to be used in the computation of the drift method. - upper_threshold : float, default=None - An optional upper threshold for the data quality metric. - lower_threshold : float, default=None - An optional lower threshold for the data quality metric. + threshold : Threshold + Threshold class defining threshold strategy. upper_threshold_limit : float, default=None An optional upper threshold limit for the data quality metric. 
lower_threshold_limit : float, default=0 @@ -257,6 +255,7 @@ class JensenShannonDistance(Method): """ def __init__(self, **kwargs) -> None: + """Initialize Jensen-Shannon method.""" super().__init__( display_name='Jensen-Shannon distance', column_name='jensen_shannon', @@ -339,6 +338,7 @@ class KolmogorovSmirnovStatistic(Method): """ def __init__(self, **kwargs) -> None: + """Initialize Kolmogorov-Smirnov method.""" super().__init__( display_name='Kolmogorov-Smirnov statistic', column_name='kolmogorov_smirnov', @@ -405,7 +405,7 @@ def _calculate(self, data: pd.Series): chunk_rel_freqs = chunk_proba_in_qts / len(data) rel_freq_lower_than_edges = len(data[data < self._qts[0]]) / len(data) chunk_rel_freqs = rel_freq_lower_than_edges + np.cumsum(chunk_rel_freqs) - stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs)) + stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs)) # type: ignore else: stat, _ = ks_2samp(self._reference_data, data) @@ -420,6 +420,7 @@ class Chi2Statistic(Method): """ def __init__(self, **kwargs) -> None: + """Initialize Chi2-contingency method.""" super().__init__( display_name='Chi2 statistic', column_name='chi2', @@ -444,6 +445,16 @@ def __init__(self, **kwargs) -> None: self._fitted = False def fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self: + """Fits Chi2 Method on reference data. + + Parameters + ---------- + reference_data: pd.Series + The reference data used for fitting a Method. Must have target data available. + timestamps: Optional[pd.Series], default=None + A series containing the reference data timestamps. + + """ super().fit(reference_data, timestamps) # Thresholding is based on p-values. Ignoring all custom thresholding and disable plotting a threshold @@ -470,6 +481,16 @@ def _calculate(self, data: pd.Series): return stat def alert(self, value: float): + """Evaluates if an alert has occurred for Chi2 on the current chunk data. 
+ + For Chi2 alerts are based on p-values rather than the actual method values like + in all other Univariate drift methods. + + Parameters + ---------- + value: float + The method value for a given chunk + """ return self._p_value < 0.05 def _calc_chi2(self, data: pd.Series): @@ -491,6 +512,7 @@ class LInfinityDistance(Method): """ def __init__(self, **kwargs) -> None: + """Initialize L-Infinity Distance method.""" super().__init__( display_name='L-Infinity distance', column_name='l_infinity', @@ -537,6 +559,7 @@ class WassersteinDistance(Method): """ def __init__(self, **kwargs) -> None: + """Initialize Wasserstein Distance method.""" super().__init__( display_name='Wasserstein distance', column_name='wasserstein', @@ -559,6 +582,9 @@ def __init__(self, **kwargs) -> None: self._bin_width: float self._bin_edges: np.ndarray self._ref_rel_freqs: Optional[np.ndarray] = None + self._ref_min: float + self._ref_max: float + self._ref_cdf: np.ndarray self._fitted = False if ( (not kwargs) @@ -579,6 +605,9 @@ def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None reference_proba_in_bins, self._bin_edges = np.histogram(reference_data, bins=self.n_bins) self._ref_rel_freqs = reference_proba_in_bins / len(reference_data) self._bin_width = self._bin_edges[1] - self._bin_edges[0] + self._ref_min = self._bin_edges[0] + self._ref_max = self._bin_edges[-1] + self._ref_cdf = np.cumsum(self._ref_rel_freqs) self._fitted = True self._reference_size = len(reference_data) @@ -596,47 +625,49 @@ def _calculate(self, data: pd.Series): if ( self.calculation_method == 'auto' and self._reference_size >= 10_000 ) or self.calculation_method == 'estimated': - min_chunk = np.min(data) - - if min_chunk < self._bin_edges[0]: - extra_bins_left = (min_chunk - self._bin_edges[0]) / self._bin_width - extra_bins_left = np.ceil(extra_bins_left) + data_smaller = data[data < self._ref_min] + data_bigger = data[data > self._ref_max] + n_smaller = len(data_smaller) + n_bigger = 
len(data_bigger) + + if n_smaller > 0: + amount_smaller = (n_smaller + 1) / len(data) + smaller_with_first_ref_value = np.concatenate((data_smaller, [self._ref_min])) + x, y = self._ecdf(smaller_with_first_ref_value) + term_smaller = np.sum((y)[:-1] * np.diff(x)) + term_smaller = term_smaller * amount_smaller else: - extra_bins_left = 0 - - max_chunk = np.max(data) - - if max_chunk > self._bin_edges[-1]: - extra_bins_right = (max_chunk - self._bin_edges[-1]) / self._bin_width - extra_bins_right = np.ceil(extra_bins_right) + term_smaller, amount_smaller = 0, 0 + + if n_bigger > 0: + amount_bigger = (n_bigger + 1) / len(data) + bigger_with_last_ref_value = np.concatenate(([self._ref_max], data_bigger)) + x, y = self._ecdf(bigger_with_last_ref_value) + term_bigger = np.sum((1 - y)[:-1] * np.diff(x)) + term_bigger = term_bigger * amount_bigger else: - extra_bins_right = 0 + term_bigger, amount_bigger = 0, 0 - left_edges_to_prepand = np.arange( - min_chunk - self._bin_width, self._bin_edges[0] - self._bin_width, self._bin_width - ) - right_edges_to_append = np.arange( - self._bin_edges[-1] + self._bin_width, max_chunk + self._bin_width, self._bin_width - ) - - updated_edges = np.concatenate([left_edges_to_prepand, self._bin_edges, right_edges_to_append]) - updated_ref_binned_pdf = np.concatenate( - [np.zeros(len(left_edges_to_prepand)), self._ref_rel_freqs, np.zeros(len(right_edges_to_append))] - ) + data_histogram, _ = np.histogram(data, bins=self._bin_edges) + data_histogram = data_histogram / len(data) - chunk_histogram, _ = np.histogram(data, bins=updated_edges) + data_cdf = np.cumsum(data_histogram) + data_cdf = data_cdf + amount_smaller # if there's some data on the left-hand side + term_within = np.sum(np.abs(self._ref_cdf - data_cdf) * self._bin_width) - chunk_binned_pdf = chunk_histogram / len(data) - - ref_binned_cdf = np.cumsum(updated_ref_binned_pdf) - chunk_binned_cdf = np.cumsum(chunk_binned_pdf) - - distance = np.sum(np.abs(ref_binned_cdf - 
chunk_binned_cdf) * self._bin_width) + distance = term_within + term_smaller + term_bigger else: distance = wasserstein_distance(self._reference_data, data) return distance + def _ecdf(self, vec: np.ndarray): + """Custom implementation to calculate ECDF.""" + vec = np.sort(vec) + x, counts = np.unique(vec, return_counts=True) + cdf = np.cumsum(counts) / len(vec) + return x, cdf + @MethodFactory.register(key='hellinger', feature_type=FeatureType.CONTINUOUS) @MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL) @@ -644,6 +675,7 @@ class HellingerDistance(Method): """Calculates the Hellinger Distance between two distributions.""" def __init__(self, **kwargs) -> None: + """Initialize Hellinger Distance method.""" super().__init__( display_name='Hellinger distance', column_name='hellinger', diff --git a/nannyml/drift/univariate/result.py b/nannyml/drift/univariate/result.py index c1d31df1..7cce49fa 100644 --- a/nannyml/drift/univariate/result.py +++ b/nannyml/drift/univariate/result.py @@ -44,7 +44,8 @@ def __init__( analysis_data: pd.DataFrame = None, reference_data: pd.DataFrame = None, ): - """ + """Initialize results class. + Parameters ---------- results_data: pd.DataFrame @@ -112,6 +113,7 @@ def __init__( @property def methods(self) -> List[Method]: + """Methods used during calculation.""" return cast(List[Method], self.metrics) def _filter( @@ -167,9 +169,9 @@ def _get_result_property(self, property_name: str) -> List[pd.Series]: return continuous_values + categorical_values def keys(self) -> List[Key]: - """ - Creates a list of keys for continuos and categorial columns where each Key is a `namedtuple('Key', - 'properties display_names')` + """Creates a list of keys for continuous and categorical columns. 
+ + Each Key is a `namedtuple('Key', 'properties display_names')` """ continuous_keys = [ Key(properties=(column, method.column_name), display_names=(column, method.display_name)) @@ -204,6 +206,7 @@ def plot( - 'distribution' plots feature distribution per :class:`~nannyml.chunk.Chunk`. Joyplot for continuous features, stacked bar charts for categorical features. + Returns ------- fig: :class:`plotly.graph_objs._figure.Figure` diff --git a/tests/drift/test_univariate_drift_methods.py b/tests/drift/test_univariate_drift_methods.py index 7a797851..50d742c8 100644 --- a/tests/drift/test_univariate_drift_methods.py +++ b/tests/drift/test_univariate_drift_methods.py @@ -118,6 +118,16 @@ def test_wasserstein_both_continuous_analysis_with_neg_mean_medium_drift(): # n assert wass_dist == 3.99 +def test_wasserstein_both_continuous_analysis_estimate_with_out_of_reference_drift(): # noqa: D103 + np.random.seed(1) + reference = pd.Series(np.random.normal(0, 1, 15_000), name='A') + analysis = pd.Series(np.random.normal(0, 10, 1_000_000), name='A') + wass_dist = WassersteinDistance(chunker=chunker, threshold=threshold) + wass_dist = wass_dist.fit(reference).calculate(analysis) + wass_dist = np.round(wass_dist, 3) + assert wass_dist == 7.180 + + # ************* Hellinger Tests *************