import numpy as np
from .compat import pd_DataFrame
from .utils import (
_t_dat,
_t_var_list,
_t_var_dict,
_ensure_iterable,
_dict_set_diff,
_assign_col_values_without_copy,
_slice,
_subset_data,
)
from itertools import combinations
from typing import Dict, List, Union, Any, Optional
from warnings import warn
class ImputedData:
"""
Imputed Data
This class should not be instantiated directly.
Instead, it is returned when ImputationKernel.impute_new_data() is called.
For parameter arguments, see ImputationKernel documentation.
"""
    def __init__(
self,
impute_data: _t_dat,
datasets: int = 5,
variable_schema: Union[_t_var_list, _t_var_dict, None] = None,
imputation_order: Union[str, _t_var_list] = "ascending",
train_nonmissing: bool = False,
categorical_feature: Union[str, _t_var_list] = "auto",
save_all_iterations: bool = True,
copy_data: bool = True,
):
# All references to the data should be through self.
self.working_data = impute_data.copy() if copy_data else impute_data
data_shape = self.working_data.shape
        # Choose the smallest unsigned integer type that can index every row.
        int_storage_types = ["uint64", "uint32", "uint16", "uint8"]
        na_where_type = "uint64"
        for st in int_storage_types:
            if data_shape[0] <= np.iinfo(st).max:
                na_where_type = st
# Collect metadata and format data
if isinstance(self.working_data, pd_DataFrame):
if len(self.working_data.shape) != 2 or self.working_data.shape[0] < 1:
raise ValueError("Input data must be 2 dimensional and non empty.")
original_data_class = "pd_DataFrame"
column_names: List[str] = [str(x) for x in self.working_data.columns]
self.column_names = column_names
pd_dtypes_orig = self.working_data.dtypes
if any([x.name == "object" for x in pd_dtypes_orig]):
raise ValueError(
"Please convert object columns to categorical or some numeric type."
)
            # Assume categorical variables already carry the pandas 'category' dtype.
if categorical_feature == "auto":
categorical_variables = [
column_names.index(var)
for var in pd_dtypes_orig.index
if pd_dtypes_orig[var].name in ["category"]
]
elif isinstance(categorical_feature, list):
                if any([x.name == "category" for x in pd_dtypes_orig]):
                    raise ValueError(
                        "If categories are already encoded as such, "
                        "set categorical_feature to 'auto'."
                    )
categorical_variables = self._get_var_ind_from_list(categorical_feature)
else:
raise ValueError("Unknown categorical_feature")
# Collect category counts.
            category_counts = {}
            for cat in categorical_variables:
                cat_dat = self.working_data.iloc[:, cat]
                uniq = set(cat_dat.dropna())
                category_counts[cat] = len(uniq)
# Collect info about what data is missing.
na_where: Dict[int, np.ndarray] = {
col: np.where(self.working_data.iloc[:, col].isnull())[0].astype(
na_where_type
)
for col in range(data_shape[1])
}
na_counts = {col: len(nw) for col, nw in na_where.items()}
            vars_with_any_missing = [
                col for col, ind in na_where.items() if len(ind) > 0
            ]
# if len(vars_with_any_missing) == 0:
# raise ValueError("No missing values to impute.")
# Keep track of datatypes. Needed for loading kernels.
self.working_dtypes = self.working_data.dtypes
elif isinstance(self.working_data, np.ndarray):
if len(self.working_data.shape) != 2 or self.working_data.shape[0] < 1:
raise ValueError("Input data must be 2 dimensional and non empty.")
original_data_class = "np_ndarray"
            # DATASET ALTERATION: cast to float32 if not already a float type.
            if (
                self.working_data.dtype != np.float32
                and self.working_data.dtype != np.float64
            ):
                self.working_data = self.working_data.astype(np.float32)
# Collect information about dataset
column_names = [str(x) for x in range(self.working_data.shape[1])]
self.column_names = column_names
na_where = {
col: np.where(np.isnan(self.working_data[:, col]))[0].astype(
na_where_type
)
for col in range(data_shape[1])
}
na_counts = {col: len(nw) for col, nw in na_where.items()}
            vars_with_any_missing = [
                int(col) for col, ind in na_where.items() if len(ind) > 0
            ]
if categorical_feature == "auto":
categorical_variables = []
elif isinstance(categorical_feature, list):
categorical_variables = self._get_var_ind_from_list(categorical_feature)
assert (
max(categorical_variables) < self.working_data.shape[1]
), "categorical_feature not in dataset"
else:
raise ValueError("categorical_feature not recognized")
# Collect category counts.
category_counts = {}
for cat in categorical_variables:
cat_dat = self.working_data[:, cat]
cat_dat = cat_dat[~np.isnan(cat_dat)]
uniq = set(cat_dat)
category_counts[cat] = len(uniq)
# Keep track of datatype.
self.working_dtypes = self.working_data.dtype
else:
raise ValueError("impute_data not recognized.")
        # Formatting of variable_schema.
        if variable_schema is None:
            # Default schema: model every variable using all other variables.
            variable_schema = _dict_set_diff(range(data_shape[1]), range(data_shape[1]))
else:
            if isinstance(variable_schema, list):
                variable_schema_inds = self._get_var_ind_from_list(variable_schema)
                variable_schema = _dict_set_diff(
                    variable_schema_inds, range(data_shape[1])
                )
elif isinstance(variable_schema, dict):
variable_schema = self._get_var_ind_from_dict(variable_schema)
# Check for any self-impute attempts
self_impute_attempt = [
var for var, prd in variable_schema.items() if var in prd
]
            if len(self_impute_attempt) > 0:
                raise ValueError(
                    "These variables cannot be used to impute themselves: "
                    + ", ".join(self._get_var_name_from_list(self_impute_attempt))
                )
# Format imputation order
if isinstance(imputation_order, list):
imputation_order = self._get_var_ind_from_list(imputation_order)
assert set(imputation_order).issubset(
variable_schema
), "variable_schema does not include all variables to be imputed."
imputation_order = [i for i in imputation_order if na_counts[i] > 0]
elif isinstance(imputation_order, str):
if imputation_order in ["ascending", "descending"]:
imputation_order = self._get_var_ind_from_list(
np.argsort(list(na_counts.values())).tolist()
if imputation_order == "ascending"
else np.argsort(list(na_counts.values()))[::-1].tolist()
)
imputation_order = [
int(i)
for i in imputation_order
if na_counts[i] > 0 and i in list(variable_schema)
]
elif imputation_order == "roman":
imputation_order = list(variable_schema).copy()
elif imputation_order == "arabic":
imputation_order = list(variable_schema).copy()
imputation_order.reverse()
else:
raise ValueError("imputation_order not recognized.")
self.imputation_order = imputation_order
self.variable_schema = variable_schema
self.unimputed_variables = list(
np.setdiff1d(np.arange(data_shape[1]), imputation_order)
)
if train_nonmissing:
self.variable_training_order = [
v
for v in self.imputation_order + self.unimputed_variables
if v in list(self.variable_schema)
]
else:
self.variable_training_order = self.imputation_order
        # Flatten the schema values into the unique list of predictors,
        # preserving first-seen order.
        predictor_vars = list(variable_schema.values())
        self.predictor_vars = list(
            dict.fromkeys([item for sublist in predictor_vars for item in sublist])
        )
self.categorical_feature = categorical_feature
self.categorical_variables = categorical_variables
self.category_counts = category_counts
self.original_data_class = original_data_class
self.save_all_iterations = save_all_iterations
self.data_shape = data_shape
self.na_counts = na_counts
self.na_where = na_where
self.vars_with_any_missing = vars_with_any_missing
self.imputed_variable_count = len(imputation_order)
self.modeled_variable_count = len(self.variable_training_order)
        self.iterations = np.zeros(
            shape=(datasets, self.modeled_variable_count), dtype=int
        )
# Create structure to store imputation values.
# These will be initialized by an ImputationKernel.
self.imputation_values: Dict[Any, np.ndarray] = {}
self.initialized = False
# Sanity checks
# if self.imputed_variable_count == 0:
# raise ValueError("Something went wrong. No variables to impute.")
# Subsetting allows us to get to the imputation values:
    def __getitem__(self, tup):
        ds, var, iteration = tup
        return self.imputation_values[ds, var, iteration]
    def __setitem__(self, tup, newitem):
        ds, var, iteration = tup
        self.imputation_values[ds, var, iteration] = newitem
    def __delitem__(self, tup):
        ds, var, iteration = tup
        del self.imputation_values[ds, var, iteration]
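    # A hedged usage sketch of the (dataset, variable, iteration) indexing
    # above (hypothetical values; assumes `imp` is an initialized ImputedData
    # with at least one saved iteration):
    #     vals = imp[0, 2, 1]   # imputations for dataset 0, variable 2, iteration 1
    #     imp[0, 2, 1] = vals   # overwrite the stored values
    #     del imp[0, 2, 1]      # discard them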
def __repr__(self):
summary_string = f'\n{" " * 14}Class: ImputedData\n{self._ids_info()}'
return summary_string
def _ids_info(self):
summary_string = f"""\
Datasets: {self.dataset_count()}
Iterations: {self.iteration_count()}
Data Samples: {self.data_shape[0]}
Data Columns: {self.data_shape[1]}
Imputed Variables: {len(self.imputation_order)}
save_all_iterations: {self.save_all_iterations}"""
return summary_string
    def dataset_count(self):
"""
Return the number of datasets.
Datasets are defined by how many different sets of imputation
values we have accumulated.
"""
return self.iterations.shape[0]
    def _get_var_name_from_scalar(self, ind: Union[str, int]) -> str:
        """
        Gets the variable name from an index.
        If a string is passed, it is returned unchanged.
        Otherwise, the name is looked up in self.column_names.
        """
if isinstance(ind, str):
return ind
else:
return self.column_names[ind]
def _get_var_name_from_list(self, variable_list: _t_var_list) -> List[str]:
ret = [
self.column_names[x] if isinstance(x, int) else str(x)
for x in variable_list
]
return ret
def _get_var_ind_from_dict(self, variable_dict) -> Dict[int, List[int]]:
indx: Dict[int, List[int]] = {}
for variable, value in variable_dict.items():
if isinstance(variable, str):
variable = self.column_names.index(variable)
variable = int(variable)
val = [
int(self.column_names.index(v)) if isinstance(v, str) else int(v)
for v in value
]
indx[variable] = sorted(val)
return indx
def _get_var_ind_from_list(self, variable_list) -> List[int]:
ret = [
int(self.column_names.index(x)) if isinstance(x, str) else int(x)
for x in variable_list
]
return ret
def _get_var_ind_from_scalar(self, variable) -> int:
if isinstance(variable, str):
variable = self.column_names.index(variable)
variable = int(variable)
return variable
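    # A hedged sketch of the name/index helpers above (hypothetical column
    # layout, e.g. self.column_names == ["age", "height", "weight"]):
    #     self._get_var_ind_from_list(["age", 2])              # -> [0, 2]
    #     self._get_var_name_from_list([0, "weight"])          # -> ["age", "weight"]
    #     self._get_var_ind_from_dict({"age": ["height", 2]})  # -> {0: [1, 2]}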
def _get_nonmissing_indx(self, var):
non_missing_ind = np.setdiff1d(
np.arange(self.data_shape[0]), self.na_where[var]
)
return non_missing_ind
def _insert_new_data(self, dataset, variable_index, new_data):
current_iter = self.iteration_count(datasets=dataset, variables=variable_index)
# We need to insert the categories if the raw data is stored as a category.
# Otherwise, pandas won't let us insert.
view = _slice(self.working_data, col_slice=variable_index)
if view.dtype.name == "category":
new_data = np.array(view.cat.categories)[new_data]
_assign_col_values_without_copy(
dat=self.working_data,
row_ind=self.na_where[variable_index],
col_ind=variable_index,
val=new_data,
)
self[dataset, variable_index, current_iter + 1] = new_data
if not self.save_all_iterations:
del self[dataset, variable_index, current_iter]
def _ampute_original_data(self):
"""Need to put self.working_data back in its original form"""
for c in self.imputation_order:
_assign_col_values_without_copy(
dat=self.working_data,
row_ind=self.na_where[c],
col_ind=c,
                val=np.array([np.nan]),
)
def _get_num_vars(self, subset: Optional[List] = None):
"""Returns the non-categorical imputed variable indexes."""
num_vars = [
v for v in self.imputation_order if v not in self.categorical_variables
]
if subset is not None:
subset = self._get_var_ind_from_list(subset)
num_vars = [v for v in num_vars if v in subset]
return num_vars
    def _prep_multi_plot(
        self,
        variables,
    ):
        # Arrange plots in a near-square grid, e.g. 5 plots -> 3 rows x 2 columns.
        plots = len(variables)
        plotrows = int(np.ceil(np.sqrt(plots)))
        plotcols = int(np.ceil(plots / plotrows))
        return plots, plotrows, plotcols
    def iteration_count(self, datasets=None, variables=None):
"""
        Grabs the iteration count for the specified datasets and variables.
        If the iteration count is not consistent across the provided
        datasets/variables, an error is raised. Providing None
        will use all datasets/variables.
        This ensures the process is in a consistent state whenever
        the iteration count is needed.
Parameters
----------
datasets: int or list[int]
The datasets to check the iteration count for.
        variables: int, str, list[int], or list[str]
The variables to check the iteration count for.
Variables can be specified by their names or indexes.
Returns
-------
An integer representing the iteration count.
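
        Examples
        --------
        A hypothetical sketch (assumes ``imp`` is an ImputedData instance and
        ``"age"`` is a modeled variable)::

            imp.iteration_count()                 # across all datasets/variables
            imp.iteration_count(datasets=0)       # a single dataset
            imp.iteration_count(variables="age")  # a single variable, by name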
"""
ds = (
list(range(self.dataset_count()))
if datasets is None
else _ensure_iterable(datasets)
)
if variables is None:
var = self.variable_training_order
else:
variables = _ensure_iterable(variables)
var = self._get_var_ind_from_list(variables)
assert set(var).issubset(self.variable_training_order)
iter_indx = [self.variable_training_order.index(v) for v in var]
ds_uniq = np.unique(self.iterations[np.ix_(ds, iter_indx)])
if len(ds_uniq) == 0:
return -1
if len(ds_uniq) > 1:
raise ValueError(
"iterations were not consistent across provided datasets, variables."
)
return ds_uniq[0]
    def complete_data(
self,
dataset: int = 0,
iteration: Optional[int] = None,
inplace: bool = False,
variables: Optional[_t_var_list] = None,
):
"""
Return dataset with missing values imputed.
Parameters
----------
dataset: int
The dataset to complete.
        iteration: int
            Impute data with values obtained at this iteration.
            If None, returns the most up-to-date iterations,
            even if different between variables. If not None,
            the iteration must have been saved in the imputed values.
        inplace: bool
            Should the data be completed in place? If True,
            self.working_data is imputed, and nothing is returned.
            This is useful if the dataset is very large. If
            False, a copy of the data is returned, with missing
            values imputed.
        variables: None or list
            The variables to impute. If None, all variables
            in the imputation order are imputed.
Returns
-------
The completed data, with values imputed for specified variables.
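
        Examples
        --------
        A hypothetical sketch (assumes ``imp`` is an ImputedData instance)::

            completed = imp.complete_data(dataset=0)     # latest iterations
            completed_2 = imp.complete_data(dataset=0, iteration=2)
            imp.complete_data(dataset=0, inplace=True)   # fills working_data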
"""
# Return a copy if not inplace.
impute_data = self.working_data if inplace else self.working_data.copy()
# Figure out which variables we need to impute.
# Never impute variables that are not in imputation_order.
imp_vars = self.imputation_order if variables is None else variables
imp_vars = self._get_var_ind_from_list(imp_vars)
imp_vars = [v for v in imp_vars if v in self.imputation_order]
        for var in imp_vars:
            # Resolve the iteration per variable; the latest saved iteration
            # is allowed to differ between variables.
            if iteration is None:
                itr = self.iteration_count(datasets=dataset, variables=var)
            else:
                itr = iteration
            _assign_col_values_without_copy(
                dat=impute_data,
                row_ind=self.na_where[var],
                col_ind=var,
                val=self[dataset, var, itr],
            )
if not inplace:
return impute_data
    def get_means(self, datasets, variables=None):
"""
Return a dict containing the average imputation value
for specified variables at each iteration.
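
        Examples
        --------
        A hypothetical sketch (assumes ``imp`` has saved iterations and that
        variable index ``2`` is a numeric imputed variable)::

            means = imp.get_means(datasets=[0, 1])
            means[0][2][3]  # mean imputation for dataset 0, variable 2, iteration 3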
"""
num_vars = self._get_num_vars(variables)
        # For every dataset and variable, compute the mean imputation value
        # at each iteration.
curr_iteration = self.iteration_count(datasets=datasets)
if self.save_all_iterations:
iter_range = list(range(curr_iteration + 1))
else:
iter_range = [curr_iteration]
mean_dict = {
ds: {
var: {itr: np.mean(self[ds, var, itr]) for itr in iter_range}
for var in num_vars
}
for ds in datasets
}
return mean_dict
    def plot_mean_convergence(self, datasets=None, variables=None, **adj_args):
"""
Plots the average value of imputations over each iteration.
Parameters
----------
        datasets: None or list[int]
            The datasets to plot.
        variables: None or list
            The variables to plot. Must be numeric.
adj_args
Passed to matplotlib.pyplot.subplots_adjust()
"""
# Move this to .compat at some point.
try:
import matplotlib.pyplot as plt
from matplotlib import gridspec
except ImportError:
raise ImportError("matplotlib must be installed to plot mean convergence")
        if self.iteration_count() < 2 or not self.save_all_iterations:
            raise ValueError(
                "At least two saved iterations are required to plot convergence."
            )
if datasets is None:
datasets = list(range(self.dataset_count()))
else:
datasets = _ensure_iterable(datasets)
num_vars = self._get_num_vars(variables)
mean_dict = self.get_means(datasets=datasets, variables=variables)
plots, plotrows, plotcols = self._prep_multi_plot(num_vars)
gs = gridspec.GridSpec(plotrows, plotcols)
fig, ax = plt.subplots(plotrows, plotcols, squeeze=False)
for v in range(plots):
axr, axc = next(iter(gs[v].rowspan)), next(iter(gs[v].colspan))
var = num_vars[v]
for d in mean_dict.values():
ax[axr, axc].plot(list(d[var].values()), color="black")
ax[axr, axc].set_title(var)
ax[axr, axc].set_xlabel("Iteration")
ax[axr, axc].set_ylabel("mean")
plt.subplots_adjust(**adj_args)
    def plot_imputed_distributions(
self, datasets=None, variables=None, iteration=None, **adj_args
):
"""
Plot the imputed value distributions.
        Red lines are the distribution of the original data.
        Black lines are the distributions of the imputed values.
Parameters
----------
        datasets: None, int, or list[int]
            The datasets to plot.
        variables: None, str, int, list[str], or list[int]
            The variables to plot. If None, all numeric variables
            are plotted.
iteration: None, int
The iteration to plot the distribution for.
If None, the latest iteration is plotted.
save_all_iterations must be True if specifying
an iteration.
adj_args
Additional arguments passed to plt.subplots_adjust()
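
        Examples
        --------
        A hypothetical sketch (``wspace``/``hspace`` are standard
        matplotlib.pyplot.subplots_adjust arguments)::

            imp.plot_imputed_distributions(wspace=0.35, hspace=0.4)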
"""
# Move this to .compat at some point.
try:
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import gridspec
except ImportError:
raise ImportError(
"matplotlib and seaborn must be installed to plot distributions."
)
if datasets is None:
datasets = list(range(self.dataset_count()))
else:
datasets = _ensure_iterable(datasets)
if iteration is None:
iteration = self.iteration_count(datasets=datasets, variables=variables)
num_vars = self._get_num_vars(variables)
plots, plotrows, plotcols = self._prep_multi_plot(num_vars)
gs = gridspec.GridSpec(plotrows, plotcols)
fig, ax = plt.subplots(plotrows, plotcols, squeeze=False)
for v in range(plots):
var = num_vars[v]
axr, axc = next(iter(gs[v].rowspan)), next(iter(gs[v].colspan))
iteration_level_imputations = {
ds: self[ds, var, iteration] for ds in datasets
}
plt.sca(ax[axr, axc])
non_missing_ind = self._get_nonmissing_indx(var)
nonmissing_values = _subset_data(
self.working_data, row_ind=non_missing_ind, col_ind=var, return_1d=True
)
ax[axr, axc] = sns.kdeplot(nonmissing_values, color="red", linewidth=2)
for imparray in iteration_level_imputations.values():
ax[axr, axc] = sns.kdeplot(
imparray, color="black", linewidth=1, warn_singular=False
)
ax[axr, axc].set(xlabel=self._get_var_name_from_scalar(var))
plt.subplots_adjust(**adj_args)
    def get_correlations(
self, datasets: List[int], variables: Union[List[int], List[str]]
):
"""
Return the correlations between datasets for
the specified variables.
Parameters
----------
        datasets: list[int]
            The datasets to compare.
        variables: list[str], list[int]
            The variables to return the correlations for.
Returns
-------
dict
The correlations at each iteration for the specified
variables.
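
        Examples
        --------
        A hypothetical sketch (assumes at least 3 datasets and that ``"age"``
        is a modeled variable; result keys are variable indexes)::

            corrs = imp.get_correlations(datasets=[0, 1, 2], variables=["age"])
            # corrs[var_index][iteration] -> pairwise dataset correlations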
"""
if self.dataset_count() < 3:
raise ValueError(
"Not enough datasets to calculate correlations between them"
)
curr_iteration = self.iteration_count()
var_indx = self._get_var_ind_from_list(variables)
# For every variable, get the correlations between every dataset combination
# at each iteration
correlation_dict = {}
if self.save_all_iterations:
iter_range = list(range(1, curr_iteration + 1))
else:
            # Make this iterable for code tidiness
iter_range = [curr_iteration]
for var in var_indx:
# Get a dict of variables and imputations for all datasets for this iteration
iteration_level_imputations = {
iteration: {ds: self[ds, var, iteration] for ds in datasets}
for iteration in iter_range
}
combination_correlations = {
iteration: [
round(np.corrcoef(impcomb)[0, 1], 3)
for impcomb in list(combinations(varimps.values(), 2))
]
for iteration, varimps in iteration_level_imputations.items()
}
correlation_dict[var] = combination_correlations
return correlation_dict
    def plot_correlations(self, datasets=None, variables=None, **adj_args):
"""
Plot the correlations between datasets.
See get_correlations() for more details.
Parameters
----------
        datasets: None or list[int]
            The datasets to plot.
        variables: None or list
            The variables to plot.
adj_args
Additional arguments passed to plt.subplots_adjust()
"""
# Move this to .compat at some point.
try:
import matplotlib.pyplot as plt
from matplotlib import gridspec
except ImportError:
raise ImportError("matplotlib must be installed to plot importance")
if self.dataset_count() < 4:
raise ValueError("Not enough datasets to make box plot")
if datasets is None:
datasets = list(range(self.dataset_count()))
else:
datasets = _ensure_iterable(datasets)
        var_indx = None if variables is None else self._get_var_ind_from_list(variables)
        num_vars = self._get_num_vars(var_indx)
plots, plotrows, plotcols = self._prep_multi_plot(num_vars)
correlation_dict = self.get_correlations(datasets=datasets, variables=num_vars)
gs = gridspec.GridSpec(plotrows, plotcols)
fig, ax = plt.subplots(plotrows, plotcols, squeeze=False)
for v in range(plots):
axr, axc = next(iter(gs[v].rowspan)), next(iter(gs[v].colspan))
var = list(correlation_dict)[v]
ax[axr, axc].boxplot(
list(correlation_dict[var].values()),
labels=range(len(correlation_dict[var])),
)
ax[axr, axc].set_title(self._get_var_name_from_scalar(var))
ax[axr, axc].set_xlabel("Iteration")
ax[axr, axc].set_ylabel("Correlations")
ax[axr, axc].set_ylim([-1, 1])
plt.subplots_adjust(**adj_args)