Implement 'weights' and 'axis' in sample at DataFrame and Series #1893

Open · wants to merge 6 commits into base: master · changes from 5 commits
144 changes: 115 additions & 29 deletions databricks/koalas/frame.py
@@ -53,6 +53,7 @@
else:
from pandas.core.dtypes.common import _get_dtype_from_object as infer_dtype_from_object
from pandas.core.accessor import CachedAccessor
import pandas.core.common as com
from pandas.core.dtypes.inference import is_sequence
import pyspark
from pyspark import StorageLevel
@@ -7194,7 +7195,9 @@ def sample(
n: Optional[int] = None,
frac: Optional[float] = None,
replace: bool = False,
weights: Optional[Any] = None,
random_state: Optional[int] = None,
axis: Optional[Any] = None,
) -> "DataFrame":
"""
Return a random sample of items from an axis of object.
@@ -7215,14 +7218,34 @@
Fraction of axis items to return.
replace : bool, default False
Sample with or without replacement.
weights : str or ndarray-like, optional
Default 'None' results in equal probability weighting.
If passed a Series, will align with target object on index. Index
values in weights not found in sampled object will be ignored and
index values in sampled object not in weights will be assigned
weights of zero.
If called on a DataFrame, will accept the name of a column
when axis = 0.
Unless weights are a Series, weights must be same length as axis
being sampled.
If weights do not sum to 1, they will be normalized to sum to 1.
Missing values in the weights column will be treated as zero.
Infinite values not allowed.
random_state : int, optional
Seed for the random number generator (if int).
axis : {0 or 'index', 1 or 'columns', None}, default None
Axis to sample. Accepts axis number or name. Default is stat axis
for given data type (0 for Series and DataFrames).

Returns
-------
Series or DataFrame
A new object of same type as caller containing the sampled items.

Notes
-----
If `frac` > 1, `replace` should be set to `True`.
Contributor commented on lines +7246 to +7248:
+1
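A hypothetical illustration of this note, not part of the original diff (`ks` and a 4-row frame are assumed here):

import databricks.koalas as ks
df = ks.DataFrame({'a': [1, 2, 3, 4]})
# frac=1.5 asks for ~6 rows out of 4, which is only possible with replacement.
df.sample(frac=1.5, replace=True, random_state=1)
# df.sample(frac=1.5, replace=False)  # would raise ValueError per the check in this PR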
Examples
--------
>>> df = ks.DataFrame({'num_legs': [2, 4, 8, 0],
@@ -7237,46 +7260,109 @@
spider 8 0 1
fish 0 0 8

A random 25% sample of the ``DataFrame``.
Extract 3 random elements from the ``Series`` ``df['num_legs']``:
Note that we use `random_state` to ensure the reproducibility of
the examples.

>>> df.sample(frac=0.25, random_state=1) # doctest: +SKIP
num_legs num_wings num_specimen_seen
falcon 2 2 10
fish 0 0 8

Extract 25% random elements from the ``Series`` ``df['num_legs']``, with replacement,
so the same items could appear more than once.

>>> df['num_legs'].sample(frac=0.4, replace=True, random_state=1) # doctest: +SKIP
>>> df['num_legs'].sample(n=3, random_state=1).sort_index()
falcon 2
spider 8
fish 0
spider 8
Name: num_legs, dtype: int64

Specifying the exact number of items to return is not supported at the moment.
A random 50% sample of the ``DataFrame`` with replacement:

>>> df.sample(n=5) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
NotImplementedError: Function sample currently does not support specifying ...
>>> df.sample(frac=0.5, replace=True, random_state=1).sort_index()
num_legs num_wings num_specimen_seen
dog 4 0 2
fish 0 0 8
"""
# Note: we don't run any of the doctests because the result can change depending on the
# system's core count.
if n is not None:
raise NotImplementedError(
"Function sample currently does not support specifying "
"exact number of items to return. Use frac instead."
)
if axis in ("index", "rows", 0, None):
axis = 0
elif axis in ("columns", 1):
raise NotImplementedError("Function sample currently does not support axis=1.")
else:
raise ValueError("No axis named %s for object type %s." % (axis, type(axis)))

if frac is None:
raise ValueError("frac must be specified.")
axis_length = self.shape[axis]

sdf = self._internal.resolved_copy.spark_frame.sample(
withReplacement=replace, fraction=frac, seed=random_state
)
return DataFrame(self._internal.with_new_sdf(sdf))
# Process random_state argument
if LooseVersion(pd.__version__) >= LooseVersion("0.24"):
rs = com.random_state(random_state)
else:
rs = com._random_state(random_state)
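# For context: pandas' com.random_state normalizes its input. An int seed
# becomes a numpy RandomState, an existing RandomState passes through
# unchanged, and None falls back to the global np.random. The
# underscore-prefixed _random_state is the pre-0.24 name, hence the
# version check above.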

# Check weights for compliance
if weights is not None:

# If a series, align with frame
if isinstance(weights, ks.Series):
weights = weights.reindex(self.axes[axis])

# A string is acceptable for a DataFrame when axis = 0
if isinstance(weights, str):
if isinstance(self, ks.DataFrame):
if axis == 0:
try:
weights = self[weights]
except KeyError as err:
raise KeyError("String passed to weights not a valid column") from err

# Because ks.Series does not support the Series.__iter__ method yet,
# it cannot be passed to the pandas Series constructor directly, so convert via to_pandas().
if isinstance(weights, ks.Series):
weights = pd.Series(weights.to_pandas(), dtype="float64")
Contributor commented:
Is weights always expected to be small enough that using to_pandas() is fine? If not, I think we'd better not support weights as a Series for now, since it could cause serious performance degradation.

Author replied:
You are right! If the Series is relatively large, it may indeed cause performance problems. Series weights are not supported here for now.
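A minimal sketch of the path that stays cheap, passing array-like weights instead of a ks.Series (the `kdf` frame here is an assumed 4-row Koalas DataFrame):

import numpy as np
# Array-likes fall through to the else branch below and become a pandas Series.
w = np.array([0.1, 0.2, 0.3, 0.4])  # one weight per row; must match axis_length
kdf.sample(frac=0.5, weights=w, random_state=1)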

else:
weights = pd.Series(weights, dtype="float64")

if len(weights) != axis_length:
raise ValueError("Weights and axis to be sampled must be of same length")

if (weights == np.inf).any() or (weights == -np.inf).any():
raise ValueError("weight vector may not include `inf` values")

if (weights < 0).any():
raise ValueError("weight vector many not include negative values")
Contributor commented:
nit: many -> may. Of course I understand that you've just followed pandas' message 👍, but it looks like an obvious typo.
# If weights contain NaN, treat them as zero.
weights = weights.fillna(0)

# Renormalize if weights don't sum to 1
if weights.sum() != 1:
if weights.sum() != 0:
weights = weights / weights.sum()
else:
raise ValueError("Invalid weights: weights sum to zero")

weights = weights._values

# If no frac or n, default to n=1.
if n is None and frac is None:
n = 1
elif frac is not None and frac > 1 and not replace:
raise ValueError(
"Replace has to be set to `True` when upsampling the population `frac` > 1."
)
elif n is not None and frac is None and n % 1 != 0:
raise ValueError("Only integers accepted as `n` values")
elif n is None and frac is not None:
n = int(round(frac * axis_length))
elif n is not None and frac is not None:
raise ValueError("Please enter a value for `frac` OR `n`, not both")

# Check for negative sizes
if n < 0:
raise ValueError("A negative number of rows requested. Please provide positive value.")

# Duplicated row selection is not currently supported,
# so if frac > 1, fall back to the PySpark implementation.
if frac is not None and frac > 1:
sdf = self._internal.resolved_copy.spark_frame.sample(
withReplacement=replace, fraction=float(frac), seed=random_state
)
return DataFrame(self._internal.with_new_sdf(sdf))
locs = rs.choice(axis_length, size=n, replace=replace, p=weights)
Collaborator commented:
What's the estimated size of locs? Will it also be huge, e.g., for the case "A random 50% sample of the DataFrame with replacement"?

Author replied:
The rs.choice method behaves like numpy.random.RandomState.choice: it returns an array whose length is the size parameter, which here is n. So locs has length n, either the n passed in or n = int(round(frac * axis_length)) when frac is given. For the 50%-with-replacement case, locs will be half the length of the DataFrame, precisely int(round(0.5 * DataFrame.nrows)). If n or frac is large, locs will be large. Performance then depends on the take method, which is effectively the performance of iloc.
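To make the size semantics concrete, a small standalone sketch of the underlying NumPy call (standard NumPy behavior, not code from this PR):

import numpy as np
rs = np.random.RandomState(1)
# 4 rows with frac=0.5 gives n = int(round(0.5 * 4)) = 2 row positions.
locs = rs.choice(4, size=2, replace=False, p=[0.1, 0.2, 0.3, 0.4])
print(locs)  # e.g. [3 2]; the length always equals `size`, i.e. n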
Collaborator (@ueshin, Nov 12, 2020) commented:
In that case, I wouldn't recommend using take or iloc for this. In a Koalas workload the DataFrame can be huge, and then locs will be too large for a single driver node. Also, row access by row number is inherently heavy on Spark (to be exact, Spark doesn't provide a way to access rows by row number), so iloc is heavy in Koalas, especially when locs is huge.

Author (@chi2liu, Nov 16, 2020) replied:
Yeah, I agree: because Spark has no concept of a row index, iloc is a heavy operation. But the weights parameter of sample, like iloc, must depend on a row index, since it assigns a weight to each specific row, so rows have to be accessed by row number. There may be no other way to support weights right now; like iloc, it has to be implemented on top of a row index for the time being.
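For reference, a tiny pandas-level illustration of the positional semantics that take(locs) must reproduce (hypothetical data):

import pandas as pd
pdf = pd.DataFrame({'x': [10, 20, 30, 40]}, index=list('abcd'))
pdf.take([3, 1])  # positional access: rows 'd' and 'b', regardless of index labels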

return self.take(locs, axis=axis)

def astype(self, dtype) -> "DataFrame":
"""
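Taken together, a hedged end-to-end sketch of the new parameters as implemented on this branch (the exact sampled rows depend on the seed):

import databricks.koalas as ks
df = ks.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0],
                   'num_specimen_seen': [10, 2, 1, 8]},
                  index=['falcon', 'dog', 'spider', 'fish'])
# Column-name weights with the default axis=0: rows with more specimens
# are drawn more often (weights are normalized to sum to 1 internally).
df.sample(n=2, weights='num_specimen_seen', random_state=1)
# axis=1 is explicitly unsupported and raises NotImplementedError.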
11 changes: 10 additions & 1 deletion databricks/koalas/series.py
@@ -2855,10 +2855,19 @@ def sample(
n: Optional[int] = None,
frac: Optional[float] = None,
replace: bool = False,
weights: Optional[Any] = None,
random_state: Optional[int] = None,
axis: Optional[Any] = None,
) -> "Series":
return first_series(
self.to_frame().sample(n=n, frac=frac, replace=replace, random_state=random_state)
self.to_frame().sample(
n=n,
frac=frac,
replace=replace,
weights=weights,
random_state=random_state,
axis=axis,
)
).rename(self.name)

sample.__doc__ = DataFrame.sample.__doc__
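On the Series side the call simply round-trips through to_frame(), so the same parameters apply; a brief assumed usage:

import databricks.koalas as ks
s = ks.Series([2, 4, 8, 0], name='num_legs')
# Array-like weights, one per element; NaN weights would be treated as zero.
s.sample(n=2, weights=[0.1, 0.4, 0.4, 0.1], random_state=1)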