from .compat import pd_DataFrame, pd_Series, pd_read_parquet
import numpy as np
from numpy.random import RandomState
import blosc
import dill
from typing import Union, List, Dict, Optional
_t_var_list = Union[List[str], List[int]]
_t_var_dict = Union[Dict[str, List[str]], Dict[int, List[int]]]
_t_var_sub = Dict[Union[str, int], Union[int, float]]
_t_dat = Union[pd_DataFrame, np.ndarray]
_t_random_state = Union[int, RandomState, None]
def ampute_data(
data: _t_dat,
variables: Optional[_t_var_list] = None,
perc: float = 0.1,
random_state: _t_random_state = None,
):
"""
Ampute Data
Returns a copy of data with specified variables amputed.
Parameters
----------
    data : pandas DataFrame or numpy ndarray
        The data to ampute.
    variables : None or list
        If None, all variables are amputed.
    perc : float
        The percentage of the data to ampute.
    random_state : None, int, or np.random.RandomState
        The random state to use.
Returns
-------
pandas DataFrame
The amputed data
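
    Examples
    --------
    A minimal sketch with synthetic data (any numeric frame works):

    >>> import pandas as pd
    >>> df = pd.DataFrame({"a": range(100), "b": range(100)})
    >>> amped = ampute_data(df, perc=0.2, random_state=42)
    >>> int(amped["a"].isna().sum())
    20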
"""
amputed_data = data.copy()
data_shape = amputed_data.shape
amp_rows = int(perc * data_shape[0])
random_state = ensure_rng(random_state)
if len(data_shape) > 1:
if variables is None:
variables = [i for i in range(amputed_data.shape[1])]
elif isinstance(variables, list):
if isinstance(variables[0], str):
assert isinstance(
data, pd_DataFrame
), "np array was passed but variables are strings"
variables = [data.columns.tolist().index(i) for i in variables]
if isinstance(amputed_data, pd_DataFrame):
for v in variables:
na_ind = random_state.choice(
np.arange(data_shape[0]), replace=False, size=amp_rows
)
                amputed_data.iloc[na_ind, v] = np.nan
if isinstance(amputed_data, np.ndarray):
amputed_data = amputed_data.astype("float64")
for v in variables:
na_ind = random_state.choice(
np.arange(data_shape[0]), replace=False, size=amp_rows
)
                amputed_data[na_ind, v] = np.nan
else:
na_ind = random_state.choice(
np.arange(data_shape[0]), replace=False, size=amp_rows
)
        amputed_data[na_ind] = np.nan
return amputed_data
def load_kernel(filepath: str, n_threads: Optional[int] = None):
"""
Loads a kernel that was saved using save_kernel().
Parameters
----------
    filepath : str
        The filepath of the saved kernel.
    n_threads : int, optional
        The number of threads to use for decompression. By default, all
        available cores are used.
Returns
-------
ImputationKernel
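
    Examples
    --------
    A sketch, assuming ``kernel.pkl`` was previously written by
    ``save_kernel()`` (the path here is hypothetical)::

        kernel = load_kernel("kernel.pkl", n_threads=4)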
"""
n_threads = blosc.detect_number_of_cores() if n_threads is None else n_threads
blosc.set_nthreads(n_threads)
with open(filepath, "rb") as f:
kernel = dill.loads(blosc.decompress(dill.load(f)))
if kernel.original_data_class == "pd_DataFrame":
kernel.working_data = pd_read_parquet(kernel.working_data)
for col in kernel.working_data.columns:
kernel.working_data[col] = kernel.working_data[col].astype(
kernel.working_dtypes[col]
)
return kernel
def stratified_subset(y, size, groups, cat, seed):
"""
Subsample y using stratification. y is divided into quantiles,
and then elements are randomly chosen from each quantile to
come up with the subsample.
Parameters
----------
y: np.ndarray
The variable to use for stratification
size: int
How large the subset should be
groups: int
How many groups to break y into. The more groups, the more
balanced (but less random) y will be
cat: bool
Is y already categorical? If so, we can skip the group creation
seed: int
The random seed to use.
Returns
-------
The indices of y that have been chosen.
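
    Examples
    --------
    A minimal sketch on synthetic data:

    >>> import numpy as np
    >>> y = np.arange(100, dtype="float64")
    >>> sub = stratified_subset(y, size=10, groups=5, cat=False, seed=0)
    >>> len(sub)
    10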
"""
rs = RandomState(seed)
if isinstance(y, pd_Series):
if y.dtype.name == "category":
y = y.cat.codes
y = y.values
if cat:
digits = y
else:
q = [x / groups for x in range(1, groups)]
bins = np.quantile(y, q)
digits = np.digitize(y, bins, right=True)
digits_v, digits_c = np.unique(digits, return_counts=True)
digits_i = np.arange(digits_v.shape[0])
digits_p = digits_c / digits_c.sum()
digits_s = (digits_p * size).round(0).astype("int32")
diff = size - digits_s.sum()
if diff != 0:
digits_fix = rs.choice(digits_i, size=abs(diff), p=digits_p, replace=False)
if diff < 0:
for d in digits_fix:
digits_s[d] -= 1
else:
for d in digits_fix:
digits_s[d] += 1
sub = np.zeros(shape=size).astype("int32")
added = 0
for d_i in digits_i:
d_v = digits_v[d_i]
n = digits_s[d_i]
ind = np.where(digits == d_v)[0]
choice = rs.choice(ind, size=n, replace=False)
sub[added : (added + n)] = choice
added += n
sub.sort()
return sub
def stratified_continuous_folds(y, nfold):
"""
Create primitive stratified folds for continuous data.
Should be digestible by lightgbm.cv function.
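
    Examples
    --------
    A minimal sketch; each fold is a (train_index, test_index) pair:

    >>> import numpy as np
    >>> y = np.arange(10, dtype="float64")
    >>> folds = list(stratified_continuous_folds(y, nfold=2))
    >>> [len(test) for _, test in folds]
    [5, 5]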
"""
if isinstance(y, pd_Series):
y = y.values
elements = len(y)
    assert elements >= nfold, "more splits than elements."
    sorted_indices = np.argsort(y)
    val = [sorted_indices[range(i, len(y), nfold)] for i in range(nfold)]
for v in val:
yield (np.setdiff1d(range(elements), v), v)
def stratified_categorical_folds(y, nfold):
"""
Create primitive stratified folds for categorical data.
Should be digestible by lightgbm.cv function.
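
    Examples
    --------
    A minimal sketch; each class appears in every fold:

    >>> import numpy as np
    >>> y = np.array([0, 0, 0, 1, 1, 1])
    >>> folds = list(stratified_categorical_folds(y, nfold=3))
    >>> [len(test) for _, test in folds]
    [2, 2, 2]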
"""
if isinstance(y, pd_Series):
y = y.values
    y = y.reshape(-1).copy()
elements = len(y)
uniq, inv, counts = np.unique(y, return_counts=True, return_inverse=True)
    assert elements >= nfold, "more splits than elements."
if any(counts < nfold):
print("Decreasing nfold to lowest categorical level count...")
nfold = min(counts)
    sorted_indices = np.argsort(inv)
    val = [sorted_indices[range(i, len(y), nfold)] for i in range(nfold)]
for v in val:
yield (np.setdiff1d(range(elements), v), v)
# https://stackoverflow.com/questions/664014/what-integer-hash-function-are-good-that-accepts-an-integer-hash-key
# We don't really need to worry that much about diffusion
# since we take % n at the end, and n (mmc) is usually
# very small. This hash performs well enough in testing.
def hash_int32(x):
"""
A hash function which generates random uniform (enough)
int32 integers. Used in mean matching and initialization.
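
    Examples
    --------
    A minimal sketch on synthetic input; the output stays int32:

    >>> import numpy as np
    >>> out = hash_int32(np.array([1, 2, 3], dtype="int32"))
    >>> out.shape, out.dtype
    ((3,), dtype('int32'))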
"""
assert isinstance(x, np.ndarray)
assert x.dtype == "int32", "x must be int32"
x = ((x >> 16) ^ x) * 0x45D9F3B
x = ((x >> 16) ^ x) * 0x45D9F3B
x = (x >> 16) ^ x
return x
def _draw_random_int32(random_state, size):
nums = random_state.randint(
low=0, high=np.iinfo("int32").max, size=size, dtype="int32"
)
return nums
def ensure_rng(random_state) -> RandomState:
"""
Creates a random number generator based on an optional seed. This can be
an integer or another random state for a seeded rng, or None for an
unseeded rng.
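
    Examples
    --------
    A quick demonstration; an existing RandomState passes through as-is:

    >>> from numpy.random import RandomState
    >>> rng = ensure_rng(42)
    >>> isinstance(rng, RandomState)
    True
    >>> ensure_rng(rng) is rng
    True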
"""
if random_state is None:
random_state = RandomState()
elif isinstance(random_state, int):
random_state = RandomState(random_state)
else:
assert isinstance(random_state, RandomState)
return random_state
def _ensure_iterable(x):
"""
If the object is iterable, return the object.
Else, return the object in a length 1 list.
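
    Examples
    --------
    >>> _ensure_iterable([1, 2])
    [1, 2]
    >>> _ensure_iterable(5)
    [5]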
"""
return x if hasattr(x, "__iter__") else [x]
def _assert_dataset_equivalent(ds1: _t_dat, ds2: _t_dat):
if isinstance(ds1, pd_DataFrame):
assert isinstance(ds2, pd_DataFrame)
assert ds1.equals(ds2)
else:
assert isinstance(ds2, np.ndarray)
np.testing.assert_array_equal(ds1, ds2)
def _ensure_np_array(x):
if isinstance(x, np.ndarray):
return x
    if isinstance(x, (pd_DataFrame, pd_Series)):
return x.values
else:
raise ValueError("Can't cast to numpy array")
def _interpret_ds(val, avail_can):
if isinstance(val, int):
assert val <= avail_can, "data subset is more than available candidates"
elif isinstance(val, float):
assert (val <= 1.0) and (val > 0.0), "if float, 0.0 < data_subset <= 1.0"
val = int(val * avail_can)
else:
raise ValueError("malformed data_subset passed")
return val
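# A doctest-style sketch of _interpret_ds: an int passes through after a
# bounds check, while a float is read as a fraction of the available
# candidates.
#
#   >>> _interpret_ds(50, avail_can=100)
#   50
#   >>> _interpret_ds(0.5, avail_can=100)
#   50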
def _dict_set_diff(iter1, iter2) -> Dict[int, List[int]]:
"""
Returns a dict, where the elements in iter1 are
the keys, and the values are the set differences
between the key and the values of iter2.
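
    Examples
    --------
    A quick demonstration:

    >>> _dict_set_diff([0, 1], [0, 1, 2])
    {0: [1, 2], 1: [0, 2]}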
"""
ret = {int(y): [int(x) for x in iter2 if int(x) != int(y)] for y in iter1}
return ret
def _slice(dat, row_slice=slice(None), col_slice=slice(None)):
"""
Returns a view of the subset data if possible.
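
    Examples
    --------
    A minimal sketch on a synthetic array:

    >>> import numpy as np
    >>> _slice(np.arange(9).reshape(3, 3), row_slice=slice(0, 2))
    array([[0, 1, 2],
           [3, 4, 5]])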
"""
if isinstance(dat, pd_DataFrame):
return dat.iloc[row_slice, col_slice]
elif isinstance(dat, np.ndarray):
return dat[row_slice, col_slice]
else:
raise ValueError("Unknown data class passed.")
def _assign_col_values_without_copy(dat, row_ind, col_ind, val):
"""
Insert values into different data frame objects.
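
    Examples
    --------
    A minimal sketch on a synthetic array; the assignment mutates
    ``dat`` in place:

    >>> import numpy as np
    >>> arr = np.zeros((3, 2))
    >>> _assign_col_values_without_copy(arr, row_ind=[0, 2], col_ind=1, val=np.array([1.0, 2.0]))
    >>> arr[:, 1]
    array([1., 0., 2.])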
"""
row_ind = _ensure_iterable(row_ind)
if isinstance(dat, pd_DataFrame):
# Remove iterable attribute if
# we are only assigning 1 value
if len(val) == 1:
val = val[0]
dat.iloc[row_ind, col_ind] = val
elif isinstance(dat, np.ndarray):
val.shape = -1
dat[row_ind, col_ind] = val
else:
raise ValueError("Unknown data class passed.")
def _subset_data(dat, row_ind=None, col_ind=None, return_1d=False):
"""
Can subset data along 2 axis.
Explicitly returns a copy.
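
    Examples
    --------
    A minimal sketch on a synthetic array:

    >>> import numpy as np
    >>> arr = np.arange(12).reshape(4, 3)
    >>> _subset_data(arr, row_ind=[0, 1], col_ind=[2], return_1d=True)
    array([2, 5])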
"""
row_ind = range(dat.shape[0]) if row_ind is None else row_ind
col_ind = range(dat.shape[1]) if col_ind is None else col_ind
if isinstance(dat, pd_DataFrame):
data_copy = dat.iloc[row_ind, col_ind]
return data_copy.to_numpy().flatten() if return_1d else data_copy
elif isinstance(dat, np.ndarray):
row_ind = _ensure_iterable(row_ind)
col_ind = _ensure_iterable(col_ind)
data_copy = dat[np.ix_(row_ind, col_ind)]
return data_copy.flatten() if return_1d else data_copy
else:
raise ValueError("Unknown data class passed.")
def logodds(probability):
    """
    Convert probabilities to log-odds. Probabilities of exactly
    0.0 or 1.0 are rejected, since their log-odds are infinite.
    """
    probability = np.asarray(probability)
    # Division and np.log never raise ZeroDivisionError on numpy arrays;
    # they return inf / -inf instead, so check the bounds explicitly.
    if np.any(probability <= 0.0) or np.any(probability >= 1.0):
        raise ValueError(
            "lightgbm output a probability of 1.0 or 0.0. "
            "This is usually because of rare classes. "
            "Try adjusting min_data_in_leaf."
        )
    odds_ratio = probability / (1 - probability)
    return np.log(odds_ratio)
def logistic_function(log_odds):
return 1 / (1 + np.exp(-log_odds))
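
# Round-trip sketch: logistic_function inverts logodds (synthetic value):
#
#   >>> float(np.round(logistic_function(logodds(0.8)), 6))
#   0.8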