Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wasserstein update #393

Merged
merged 4 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions nannyml/drift/univariate/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def __init__(
chunker : Chunker
The `Chunker` used to split the data sets into a lists of chunks.
thresholds: dict

Defaults to::

{
Expand All @@ -136,8 +135,7 @@ def __init__(
The `chi2` method does not support custom thresholds for now. Additional research is required to determine
how to transition from its current p-value based implementation.

computation_params : dict

computation_params: dict
Defaults to::

{
Expand Down
103 changes: 66 additions & 37 deletions nannyml/drift/univariate/methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# License: Apache Software License 2.0

""" This module contains the different drift detection method implementations.
"""This module contains the different drift detection method implementations.

The :class:`~nannyml.drift.univariate.methods.MethodFactory` will convert the drift detection method names
into an instance of the base :class:`~nannyml.drift.univariate.methods.Method` class.
Expand Down Expand Up @@ -62,10 +62,8 @@ def __init__(
computation_params : dict, default=None
A dictionary specifying parameter names and values to be used in the computation of the
drift method.
upper_threshold : float, default=None
An optional upper threshold for the data quality metric.
lower_threshold : float, default=None
An optional lower threshold for the data quality metric.
threshold : Threshold
Threshold class defining threshold strategy.
upper_threshold_limit : float, default=None
An optional upper threshold limit for the data quality metric.
lower_threshold_limit : float, default=0
Expand Down Expand Up @@ -257,6 +255,7 @@ class JensenShannonDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Jensen-Shannon method."""
super().__init__(
display_name='Jensen-Shannon distance',
column_name='jensen_shannon',
Expand Down Expand Up @@ -339,6 +338,7 @@ class KolmogorovSmirnovStatistic(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Kolmogorov-Smirnov method."""
super().__init__(
display_name='Kolmogorov-Smirnov statistic',
column_name='kolmogorov_smirnov',
Expand Down Expand Up @@ -405,7 +405,7 @@ def _calculate(self, data: pd.Series):
chunk_rel_freqs = chunk_proba_in_qts / len(data)
rel_freq_lower_than_edges = len(data[data < self._qts[0]]) / len(data)
chunk_rel_freqs = rel_freq_lower_than_edges + np.cumsum(chunk_rel_freqs)
stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs))
stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs)) # type: ignore
else:
stat, _ = ks_2samp(self._reference_data, data)

Expand All @@ -420,6 +420,7 @@ class Chi2Statistic(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Chi2-contingency method."""
super().__init__(
display_name='Chi2 statistic',
column_name='chi2',
Expand All @@ -444,6 +445,16 @@ def __init__(self, **kwargs) -> None:
self._fitted = False

def fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self:
"""Fits Chi2 Method on reference data.

Parameters
----------
reference_data: pd.DataFrame
The reference data used for fitting a Method. Must have target data available.
timestamps: Optional[pd.Series], default=None
A series containing the reference data Timestamps

"""
super().fit(reference_data, timestamps)

# Thresholding is based on p-values. Ignoring all custom thresholding and disable plotting a threshold
Expand All @@ -470,6 +481,16 @@ def _calculate(self, data: pd.Series):
return stat

def alert(self, value: float):
"""Evaluates if an alert has occurred for Chi2 on the current chunk data.

For Chi2 alerts are based on p-values rather than the actual method values like
in all other Univariate drift methods.

Parameters
----------
value: float
The method value for a given chunk
"""
return self._p_value < 0.05

def _calc_chi2(self, data: pd.Series):
Expand All @@ -491,6 +512,7 @@ class LInfinityDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize L-Infinity Distance method."""
super().__init__(
display_name='L-Infinity distance',
column_name='l_infinity',
Expand Down Expand Up @@ -537,6 +559,7 @@ class WassersteinDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Wasserstein Distance method."""
super().__init__(
display_name='Wasserstein distance',
column_name='wasserstein',
Expand Down Expand Up @@ -579,6 +602,9 @@ def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None
reference_proba_in_bins, self._bin_edges = np.histogram(reference_data, bins=self.n_bins)
self._ref_rel_freqs = reference_proba_in_bins / len(reference_data)
self._bin_width = self._bin_edges[1] - self._bin_edges[0]
self._ref_min = self._bin_edges[0]
self._ref_max = self._bin_edges[-1]
self._ref_cdf = np.cumsum(self._ref_rel_freqs)

self._fitted = True
self._reference_size = len(reference_data)
Expand All @@ -596,54 +622,57 @@ def _calculate(self, data: pd.Series):
if (
self.calculation_method == 'auto' and self._reference_size >= 10_000
) or self.calculation_method == 'estimated':
min_chunk = np.min(data)

if min_chunk < self._bin_edges[0]:
extra_bins_left = (min_chunk - self._bin_edges[0]) / self._bin_width
extra_bins_left = np.ceil(extra_bins_left)
data_smaller = data[data < self._ref_min]
data_bigger = data[data > self._ref_max]
n_smaller = len(data_smaller)
n_bigger = len(data_bigger)

if n_smaller > 0:
amount_smaller = (n_smaller + 1) / len(data)
smaller_with_first_ref_value = np.concatenate((data_smaller, [self._ref_min]))
x, y = self._ecdf(smaller_with_first_ref_value)
term_smaller = np.sum((y)[:-1] * np.diff(x))
term_smaller = term_smaller * amount_smaller
else:
extra_bins_left = 0

max_chunk = np.max(data)

if max_chunk > self._bin_edges[-1]:
extra_bins_right = (max_chunk - self._bin_edges[-1]) / self._bin_width
extra_bins_right = np.ceil(extra_bins_right)
term_smaller, amount_smaller = 0, 0

if n_bigger > 0:
amount_bigger = (n_bigger + 1) / len(data)
bigger_with_last_ref_value = np.concatenate(([self._ref_max], data_bigger))
x, y = self._ecdf(bigger_with_last_ref_value)
term_bigger = np.sum((1 - y)[: -1] * np.diff(x))
term_bigger = term_bigger * amount_bigger
else:
extra_bins_right = 0
term_bigger, amount_bigger = 0, 0

left_edges_to_prepand = np.arange(
min_chunk - self._bin_width, self._bin_edges[0] - self._bin_width, self._bin_width
)
right_edges_to_append = np.arange(
self._bin_edges[-1] + self._bin_width, max_chunk + self._bin_width, self._bin_width
)

updated_edges = np.concatenate([left_edges_to_prepand, self._bin_edges, right_edges_to_append])
updated_ref_binned_pdf = np.concatenate(
[np.zeros(len(left_edges_to_prepand)), self._ref_rel_freqs, np.zeros(len(right_edges_to_append))]
)
data_histogram, _ = np.histogram(data, bins=self._bin_edges)
data_histogram = data_histogram / len(data)

chunk_histogram, _ = np.histogram(data, bins=updated_edges)
data_cdf = np.cumsum(data_histogram)
data_cdf = data_cdf + amount_smaller # if there's some data on the left-hand side
term_within = np.sum(np.abs(self._ref_cdf - data_cdf) * self._bin_width)

chunk_binned_pdf = chunk_histogram / len(data)

ref_binned_cdf = np.cumsum(updated_ref_binned_pdf)
chunk_binned_cdf = np.cumsum(chunk_binned_pdf)

distance = np.sum(np.abs(ref_binned_cdf - chunk_binned_cdf) * self._bin_width)
distance = term_within + term_smaller + term_bigger
else:
distance = wasserstein_distance(self._reference_data, data)

return distance

def _ecdf(self, vec: np.ndarray):
"""Custom implementation to calculate ECDF."""
vec = np.sort(vec)
x, counts = np.unique(vec, return_counts=True)
cdf = np.cumsum(counts) / len(vec)
return x, cdf


@MethodFactory.register(key='hellinger', feature_type=FeatureType.CONTINUOUS)
@MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL)
class HellingerDistance(Method):
"""Calculates the Hellinger Distance between two distributions."""

def __init__(self, **kwargs) -> None:
"""Initialize Hellinger Distance method."""
super().__init__(
display_name='Hellinger distance',
column_name='hellinger',
Expand Down
11 changes: 7 additions & 4 deletions nannyml/drift/univariate/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def __init__(
analysis_data: pd.DataFrame = None,
reference_data: pd.DataFrame = None,
):
"""
"""Initialize resuts class.

Parameters
----------
results_data: pd.DataFrame
Expand Down Expand Up @@ -112,6 +113,7 @@ def __init__(

@property
def methods(self) -> List[Method]:
"""Methods used during calculation."""
return cast(List[Method], self.metrics)

def _filter(
Expand Down Expand Up @@ -167,9 +169,9 @@ def _get_result_property(self, property_name: str) -> List[pd.Series]:
return continuous_values + categorical_values

def keys(self) -> List[Key]:
"""
Creates a list of keys for continuos and categorial columns where each Key is a `namedtuple('Key',
'properties display_names')`
"""Creates a list of keys for continuos and categorial columns.

Each Key is a `namedtuple('Key', 'properties display_names')`
"""
continuous_keys = [
Key(properties=(column, method.column_name), display_names=(column, method.display_name))
Expand Down Expand Up @@ -204,6 +206,7 @@ def plot(
- 'distribution'
plots feature distribution per :class:`~nannyml.chunk.Chunk`.
Joyplot for continuous features, stacked bar charts for categorical features.

Returns
-------
fig: :class:`plotly.graph_objs._figure.Figure`
Expand Down
10 changes: 10 additions & 0 deletions tests/drift/test_univariate_drift_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ def test_wasserstein_both_continuous_analysis_with_neg_mean_medium_drift(): # n
assert wass_dist == 3.99


def test_wasserstein_both_continuous_analysis_estimate_with_out_of_reference_drift(): # noqa: D103
np.random.seed(1)
reference = pd.Series(np.random.normal(0, 1, 15_000), name='A')
analysis = pd.Series(np.random.normal(0, 10, 1_000_000), name='A')
nnansters marked this conversation as resolved.
Show resolved Hide resolved
wass_dist = WassersteinDistance(chunker=chunker, threshold=threshold)
wass_dist = wass_dist.fit(reference).calculate(analysis)
wass_dist = np.round(wass_dist, 3)
assert wass_dist == 7.180


# ************* Hellinger Tests *************


Expand Down
Loading