Merge pull request #393 from NannyML/upd_wass
Wasserstein update
nnansters authored Jun 6, 2024
2 parents b8b237f + 878dc60 commit c8fde0e
Showing 4 changed files with 87 additions and 44 deletions.
4 changes: 1 addition & 3 deletions nannyml/drift/univariate/calculator.py
@@ -111,7 +111,6 @@ def __init__(
chunker : Chunker
The `Chunker` used to split the data sets into lists of chunks.
thresholds: dict
Defaults to::
{
@@ -136,8 +135,7 @@ def __init__(
The `chi2` method does not support custom thresholds for now. Additional research is required to determine
how to transition from its current p-value based implementation.
computation_params : dict
computation_params: dict
Defaults to::
{
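The hunk above documents the `thresholds` and `computation_params` dictionaries. For orientation, a hedged usage sketch: parameter names follow NannyML's public `UnivariateDriftCalculator`, but the per-method keys and values below are illustrative assumptions, not taken from this diff.

import nannyml as nml
from nannyml.thresholds import ConstantThreshold

# A minimal sketch; both dicts are keyed per method and the values shown
# here are illustrative, not the library defaults.
calc = nml.UnivariateDriftCalculator(
    column_names=['feature_1'],
    continuous_methods=['wasserstein'],
    thresholds={'wasserstein': ConstantThreshold(upper=0.2)},
    computation_params={'wasserstein': {'calculation_method': 'estimated'}},
)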
106 changes: 69 additions & 37 deletions nannyml/drift/univariate/methods.py
@@ -2,7 +2,7 @@
#
# License: Apache Software License 2.0

""" This module contains the different drift detection method implementations.
"""This module contains the different drift detection method implementations.
The :class:`~nannyml.drift.univariate.methods.MethodFactory` will convert the drift detection method names
into an instance of the base :class:`~nannyml.drift.univariate.methods.Method` class.
@@ -62,10 +62,8 @@ def __init__(
computation_params : dict, default=None
A dictionary specifying parameter names and values to be used in the computation of the
drift method.
upper_threshold : float, default=None
An optional upper threshold for the data quality metric.
lower_threshold : float, default=None
An optional lower threshold for the data quality metric.
threshold : Threshold
Threshold class defining threshold strategy.
upper_threshold_limit : float, default=None
An optional upper threshold limit for the data quality metric.
lower_threshold_limit : float, default=0
@@ -257,6 +255,7 @@ class JensenShannonDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Jensen-Shannon method."""
super().__init__(
display_name='Jensen-Shannon distance',
column_name='jensen_shannon',
@@ -339,6 +338,7 @@ class KolmogorovSmirnovStatistic(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Kolmogorov-Smirnov method."""
super().__init__(
display_name='Kolmogorov-Smirnov statistic',
column_name='kolmogorov_smirnov',
@@ -405,7 +405,7 @@ def _calculate(self, data: pd.Series):
chunk_rel_freqs = chunk_proba_in_qts / len(data)
rel_freq_lower_than_edges = len(data[data < self._qts[0]]) / len(data)
chunk_rel_freqs = rel_freq_lower_than_edges + np.cumsum(chunk_rel_freqs)
stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs))
stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs)) # type: ignore
else:
stat, _ = ks_2samp(self._reference_data, data)

@@ -420,6 +420,7 @@ class Chi2Statistic(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Chi2-contingency method."""
super().__init__(
display_name='Chi2 statistic',
column_name='chi2',
@@ -444,6 +445,16 @@ def __init__(self, **kwargs) -> None:
self._fitted = False

def fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self:
"""Fits Chi2 Method on reference data.
Parameters
----------
reference_data: pd.DataFrame
The reference data used for fitting a Method. Must have target data available.
timestamps: Optional[pd.Series], default=None
A series containing the reference data Timestamps
"""
super().fit(reference_data, timestamps)

# Thresholding is based on p-values. Ignoring all custom thresholding and disable plotting a threshold
@@ -470,6 +481,16 @@ def _calculate(self, data: pd.Series):
return stat

def alert(self, value: float):
"""Evaluates if an alert has occurred for Chi2 on the current chunk data.
For Chi2 alerts are based on p-values rather than the actual method values like
in all other Univariate drift methods.
Parameters
----------
value: float
The method value for a given chunk
"""
return self._p_value < 0.05
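A small hedged sketch of that alerting rule (the contingency counts are made up for illustration; this is not the library's internal code path):

import numpy as np
from scipy.stats import chi2_contingency

# Hypothetical category counts for reference vs. chunk (rows = datasets).
contingency = np.array([[500, 300, 200],
                        [400, 350, 250]])
stat, p_value, _, _ = chi2_contingency(contingency)

# The chi2 statistic is reported as the drift value, but the alert fires
# on the p-value, mirroring Chi2Statistic.alert above.
alert = p_value < 0.05
print(stat, p_value, alert)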

def _calc_chi2(self, data: pd.Series):
@@ -491,6 +512,7 @@ class LInfinityDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize L-Infinity Distance method."""
super().__init__(
display_name='L-Infinity distance',
column_name='l_infinity',
@@ -537,6 +559,7 @@ class WassersteinDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Wasserstein Distance method."""
super().__init__(
display_name='Wasserstein distance',
column_name='wasserstein',
@@ -559,6 +582,9 @@ def __init__(self, **kwargs) -> None:
self._bin_width: float
self._bin_edges: np.ndarray
self._ref_rel_freqs: Optional[np.ndarray] = None
self._ref_min: float
self._ref_max: float
self._ref_cdf: np.ndarray
self._fitted = False
if (
(not kwargs)
@@ -579,6 +605,9 @@ def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None
reference_proba_in_bins, self._bin_edges = np.histogram(reference_data, bins=self.n_bins)
self._ref_rel_freqs = reference_proba_in_bins / len(reference_data)
self._bin_width = self._bin_edges[1] - self._bin_edges[0]
self._ref_min = self._bin_edges[0]
self._ref_max = self._bin_edges[-1]
self._ref_cdf = np.cumsum(self._ref_rel_freqs)

self._fitted = True
self._reference_size = len(reference_data)
@@ -596,54 +625,57 @@ def _calculate(self, data: pd.Series):
if (
self.calculation_method == 'auto' and self._reference_size >= 10_000
) or self.calculation_method == 'estimated':
min_chunk = np.min(data)

if min_chunk < self._bin_edges[0]:
extra_bins_left = (min_chunk - self._bin_edges[0]) / self._bin_width
extra_bins_left = np.ceil(extra_bins_left)
data_smaller = data[data < self._ref_min]
data_bigger = data[data > self._ref_max]
n_smaller = len(data_smaller)
n_bigger = len(data_bigger)

if n_smaller > 0:
amount_smaller = (n_smaller + 1) / len(data)
smaller_with_first_ref_value = np.concatenate((data_smaller, [self._ref_min]))
x, y = self._ecdf(smaller_with_first_ref_value)
term_smaller = np.sum((y)[:-1] * np.diff(x))
term_smaller = term_smaller * amount_smaller
else:
extra_bins_left = 0

max_chunk = np.max(data)

if max_chunk > self._bin_edges[-1]:
extra_bins_right = (max_chunk - self._bin_edges[-1]) / self._bin_width
extra_bins_right = np.ceil(extra_bins_right)
term_smaller, amount_smaller = 0, 0

if n_bigger > 0:
amount_bigger = (n_bigger + 1) / len(data)
bigger_with_last_ref_value = np.concatenate(([self._ref_max], data_bigger))
x, y = self._ecdf(bigger_with_last_ref_value)
term_bigger = np.sum((1 - y)[:-1] * np.diff(x))
term_bigger = term_bigger * amount_bigger
else:
extra_bins_right = 0
term_bigger, amount_bigger = 0, 0

left_edges_to_prepand = np.arange(
min_chunk - self._bin_width, self._bin_edges[0] - self._bin_width, self._bin_width
)
right_edges_to_append = np.arange(
self._bin_edges[-1] + self._bin_width, max_chunk + self._bin_width, self._bin_width
)

updated_edges = np.concatenate([left_edges_to_prepand, self._bin_edges, right_edges_to_append])
updated_ref_binned_pdf = np.concatenate(
[np.zeros(len(left_edges_to_prepand)), self._ref_rel_freqs, np.zeros(len(right_edges_to_append))]
)
data_histogram, _ = np.histogram(data, bins=self._bin_edges)
data_histogram = data_histogram / len(data)

chunk_histogram, _ = np.histogram(data, bins=updated_edges)
data_cdf = np.cumsum(data_histogram)
data_cdf = data_cdf + amount_smaller # if there's some data on the left-hand side
term_within = np.sum(np.abs(self._ref_cdf - data_cdf) * self._bin_width)

chunk_binned_pdf = chunk_histogram / len(data)

ref_binned_cdf = np.cumsum(updated_ref_binned_pdf)
chunk_binned_cdf = np.cumsum(chunk_binned_pdf)

distance = np.sum(np.abs(ref_binned_cdf - chunk_binned_cdf) * self._bin_width)
distance = term_within + term_smaller + term_bigger
else:
distance = wasserstein_distance(self._reference_data, data)

return distance

def _ecdf(self, vec: np.ndarray):
"""Custom implementation to calculate ECDF."""
vec = np.sort(vec)
x, counts = np.unique(vec, return_counts=True)
cdf = np.cumsum(counts) / len(vec)
return x, cdf
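The changes above replace the old bin-extension estimate with a decomposition of the Wasserstein-1 distance, W1 = ∫ |F_ref(x) − F_chunk(x)| dx, into a binned term over the reference histogram range plus exact ECDF tail integrals for chunk values below `_ref_min` or above `_ref_max`. A minimal standalone sketch of that logic follows, assuming an illustrative bin count (NannyML's actual default may differ):

import numpy as np
from scipy.stats import wasserstein_distance

rng = np.random.default_rng(0)
reference = rng.normal(0, 1, 20_000)
analysis = rng.normal(0, 3, 20_000)  # wider than the reference

n_bins = 100  # illustrative; not necessarily the library default
ref_counts, bin_edges = np.histogram(reference, bins=n_bins)
ref_rel_freqs = ref_counts / len(reference)
bin_width = bin_edges[1] - bin_edges[0]
ref_cdf = np.cumsum(ref_rel_freqs)
ref_min, ref_max = bin_edges[0], bin_edges[-1]

def ecdf(vec):
    # Mirrors Method._ecdf above: unique sorted values, cumulative frequencies.
    x, counts = np.unique(np.sort(vec), return_counts=True)
    return x, np.cumsum(counts) / len(vec)

data_smaller = analysis[analysis < ref_min]
data_bigger = analysis[analysis > ref_max]

# Tail below the reference range: integrate the tail ECDF up to ref_min.
if len(data_smaller) > 0:
    amount_smaller = (len(data_smaller) + 1) / len(analysis)
    x, y = ecdf(np.concatenate((data_smaller, [ref_min])))
    term_smaller = np.sum(y[:-1] * np.diff(x)) * amount_smaller
else:
    term_smaller, amount_smaller = 0.0, 0.0

# Tail above the reference range: integrate 1 - ECDF beyond ref_max.
if len(data_bigger) > 0:
    amount_bigger = (len(data_bigger) + 1) / len(analysis)
    x, y = ecdf(np.concatenate(([ref_max], data_bigger)))
    term_bigger = np.sum((1 - y)[:-1] * np.diff(x)) * amount_bigger
else:
    term_bigger = 0.0

# Within the reference range: binned |CDF_ref - CDF_chunk| integral.
# np.histogram ignores values outside bin_edges, so the tails above are
# not double-counted; the chunk CDF is shifted up by the left-tail mass.
data_hist, _ = np.histogram(analysis, bins=bin_edges)
data_cdf = np.cumsum(data_hist / len(analysis)) + amount_smaller
term_within = np.sum(np.abs(ref_cdf - data_cdf) * bin_width)

print('estimated:', term_within + term_smaller + term_bigger)
print('exact:    ', wasserstein_distance(reference, analysis))

The two printed values should agree closely; the tail terms are what keeps the estimate accurate when, as in the new test below, chunk data drifts far outside the reference histogram's range.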


@MethodFactory.register(key='hellinger', feature_type=FeatureType.CONTINUOUS)
@MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL)
class HellingerDistance(Method):
"""Calculates the Hellinger Distance between two distributions."""

def __init__(self, **kwargs) -> None:
"""Initialize Hellinger Distance method."""
super().__init__(
display_name='Hellinger distance',
column_name='hellinger',
11 changes: 7 additions & 4 deletions nannyml/drift/univariate/result.py
@@ -44,7 +44,8 @@ def __init__(
analysis_data: pd.DataFrame = None,
reference_data: pd.DataFrame = None,
):
"""
"""Initialize resuts class.
Parameters
----------
results_data: pd.DataFrame
@@ -112,6 +113,7 @@ def __init__(

@property
def methods(self) -> List[Method]:
"""Methods used during calculation."""
return cast(List[Method], self.metrics)

def _filter(
@@ -167,9 +169,9 @@ def _get_result_property(self, property_name: str) -> List[pd.Series]:
return continuous_values + categorical_values

def keys(self) -> List[Key]:
"""
Creates a list of keys for continuos and categorial columns where each Key is a `namedtuple('Key',
'properties display_names')`
"""Creates a list of keys for continuos and categorial columns.
Each Key is a `namedtuple('Key', 'properties display_names')`
"""
continuous_keys = [
Key(properties=(column, method.column_name), display_names=(column, method.display_name))
@@ -204,6 +206,7 @@ def plot(
- 'distribution'
plots feature distribution per :class:`~nannyml.chunk.Chunk`.
Joyplot for continuous features, stacked bar charts for categorical features.
Returns
-------
fig: :class:`plotly.graph_objs._figure.Figure`
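A short hedged usage sketch for the documented plot kinds (it assumes a fitted `UnivariateDriftCalculator` named `calc` and an `analysis_df` frame; both names, and the 'drift' kind, are assumptions not shown in this diff):

# Compute drift results on analysis data, then render per-chunk plots.
results = calc.calculate(analysis_df)
results.filter(period='analysis').plot(kind='drift').show()
results.filter(period='analysis').plot(kind='distribution').show()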
10 changes: 10 additions & 0 deletions tests/drift/test_univariate_drift_methods.py
@@ -118,6 +118,16 @@ def test_wasserstein_both_continuous_analysis_with_neg_mean_medium_drift(): # n
assert wass_dist == 3.99


def test_wasserstein_both_continuous_analysis_estimate_with_out_of_reference_drift(): # noqa: D103
np.random.seed(1)
reference = pd.Series(np.random.normal(0, 1, 15_000), name='A')
analysis = pd.Series(np.random.normal(0, 10, 1_000_000), name='A')
wass_dist = WassersteinDistance(chunker=chunker, threshold=threshold)
wass_dist = wass_dist.fit(reference).calculate(analysis)
wass_dist = np.round(wass_dist, 3)
assert wass_dist == 7.180
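This new test drives the estimated path deliberately: the 15,000-row reference crosses the 10,000-row auto threshold seen in `_calculate`, while the N(0, 10) analysis sample spills far outside the reference histogram's range, so both `term_smaller` and `term_bigger` contribute to the asserted 7.180.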


# ************* Hellinger Tests *************


