from typing import Iterable, Union
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin, TransformerMixin
from ..multivariate.imputer import MultivariateImputer
from ._utils import interpolate_small_gaps
# (Sphinx "[docs]" source-link marker removed: documentation-export artifact, not code.)
class TimeSeriesImputer(BaseEstimator, TransformerMixin):
    """Imputes missing values in time series data.

    This class wraps the :class:`MultivariateImputer` to specifically handle
    time series data in pandas DataFrames. It automatically creates lagged and
    lead features based on the time series index, then uses these new
    features to impute missing values.

    Args:
        lags (Iterable[int], optional): An iterable of integers specifying
            the lags and leads to create as autoregressive features. Positive
            integers create lags (e.g., `t-1`), and negative integers create
            leads (e.g., `t+1`). Defaults to `(1,)`.
        regressor (RegressorMixin, optional): A scikit-learn compatible
            regressor used for numeric targets. Defaults to ``FastRidge``.
        classifier (ClassifierMixin, optional): A scikit-learn compatible
            classifier used for categorical or string targets. Defaults to
            ``DecisionTreeClassifier(max_depth=4)``.
        min_samples_train (int, optional): The minimum number of samples
            required to train a model. Defaults to `None`, which means that a
            model will be trained if at least one sample is available.
        rng (int, optional): A seed for the random number generator. This is
            used for reproducible feature sampling when `n_nearest_features`
            is not None. Defaults to None.
        verbose (int, optional): The verbosity level. Defaults to 0.
        scoring (str or callable, optional): The scoring function to use for
            feature selection. If 'default', the default scoring function is
            used. If a callable, it must take two arguments (the data matrix
            and the columns to impute) and return a score matrix.
            Defaults to 'default'.
        interpolate_gaps_less_than (int, optional): The maximum length of
            gaps to interpolate linearly. If None, no linear interpolation is
            performed. Defaults to None.

    Attributes:
        imputation_features_ (dict or None): A dictionary mapping each imputed
            column to the features used to impute it. This attribute is only
            populated when `n_nearest_features` is not None. The keys and
            values are the column names, which will include the lagged/lead
            features created during the imputation process.

    Example:
        .. code-block:: python

            import pandas as pd
            import numpy as np
            from datafiller import TimeSeriesImputer

            # Create a time series DataFrame with missing values
            rng = pd.date_range('2020-01-01', periods=10, freq='D')
            data = {'value': [1, 2, np.nan, 4, 5, 6, np.nan, 8, 9, 10]}
            df = pd.DataFrame(data, index=rng)

            # Create a time series imputer and fill missing values
            ts_imputer = TimeSeriesImputer(lags=[1, -1])
            df_imputed = ts_imputer(df)
            print(df_imputed)
    """

    def __init__(
        self,
        lags: Iterable[int] = (1,),
        regressor: RegressorMixin | None = None,
        classifier: ClassifierMixin | None = None,
        min_samples_train: int | None = None,
        rng: Union[int, None] = None,
        verbose: int = 0,
        scoring: Union[str, callable] = "default",
        interpolate_gaps_less_than: int | None = None,
    ):
        # A one-shot iterator (e.g. a generator) would be exhausted by the
        # validation below and then be empty when the lags are actually used,
        # so materialise it first. Re-iterable containers are stored as given.
        if isinstance(lags, Iterable) and iter(lags) is lags:
            lags = tuple(lags)
        if not isinstance(lags, Iterable) or not all(isinstance(i, (int, np.integer)) for i in lags):
            raise ValueError("lags must be an iterable of integers.")
        if 0 in lags:
            raise ValueError("lags cannot contain 0.")
        self.lags = lags
        self.regressor = regressor
        self.classifier = classifier
        self.min_samples_train = min_samples_train
        self.rng = rng
        self.verbose = verbose
        self.scoring = scoring
        self.interpolate_gaps_less_than = interpolate_gaps_less_than
        self._build_multivariate_imputer()
        self.imputation_features_ = None

    def _build_multivariate_imputer(self) -> None:
        """(Re)create the wrapped ``MultivariateImputer`` from the current parameters."""
        # ``None`` means "train as soon as one sample is available".
        min_samples_train = 1 if self.min_samples_train is None else self.min_samples_train
        self.multivariate_imputer = MultivariateImputer(
            regressor=self.regressor,
            classifier=self.classifier,
            verbose=self.verbose,
            min_samples_train=min_samples_train,
            rng=self.rng,
            scoring=self.scoring,
        )

    def fit(self, X: pd.DataFrame, y: None = None) -> "TimeSeriesImputer":
        """No-op fit for sklearn compatibility."""
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Impute missing values in X using stored configuration."""
        return self(X)

    def set_params(self, **params) -> "TimeSeriesImputer":
        """Set parameters and refresh dependent objects.

        The wrapped multivariate imputer is rebuilt whenever one of the
        parameters forwarded to it is passed by name.
        """
        rebuild_keys = {"regressor", "classifier", "min_samples_train", "rng", "verbose", "scoring"}
        rebuild = not rebuild_keys.isdisjoint(params)
        super().set_params(**params)
        if rebuild:
            self._build_multivariate_imputer()
        return self

    def __call__(
        self,
        df: pd.DataFrame,
        rows_to_impute: Union[None, int, Iterable[int]] = None,
        cols_to_impute: Union[None, int, str, Iterable[Union[int, str]]] = None,
        n_nearest_features: Union[None, float, int] = None,
        before: object = None,
        after: object = None,
    ) -> pd.DataFrame:
        """Imputes missing values in a time series DataFrame.

        Args:
            df: The input DataFrame with a `DatetimeIndex` and missing
                values (NaNs). The index must have a defined frequency.
            rows_to_impute: The rows to impute. Can be an integer position,
                an iterable of integer positions, a pandas DatetimeIndex, or
                None. If None, all rows are considered (subject to `before`
                and `after`). Defaults to None.
            cols_to_impute: The positions or names of columns to impute.
                Integer positions refer to the columns of `df`; negative
                positions count from the last column of `df`. If None, all
                columns are considered. Defaults to None.
            n_nearest_features: The number of features to use for
                imputation. If it's an int, it's the absolute number of
                features. If it's a float, it's the fraction of features to
                use. If None, all features are used. Defaults to None.
            before: A timestamp-like object. If specified, only rows
                before this timestamp are imputed. Can be anything that can be
                parsed by ``lambda x: pd.to_datetime(str(x))``. Only used when
                `rows_to_impute` is None. Defaults to None.
            after: A timestamp-like object. If specified, only rows
                after this timestamp are imputed. Can be anything that can be
                parsed by ``lambda x: pd.to_datetime(str(x))``. Only used when
                `rows_to_impute` is None. Defaults to None.

        Returns:
            The imputed DataFrame with the same columns as the original.
            Columns of `df` that contain no observed value at all are never
            handed to the imputer and are returned as all-NaN.

        Raises:
            TypeError: If the input is not a pandas DataFrame or if the index
                is not a DatetimeIndex.
            ValueError: If the DataFrame's index does not have a frequency, or
                if `cols_to_impute` contains something other than ints/strs.
            IndexError: If an integer in `cols_to_impute` is outside the
                columns of `df`.
            KeyError: If `rows_to_impute` is a pandas index containing labels
                that are not present in `df.index`.
        """
        if not isinstance(df, pd.DataFrame):
            raise TypeError("Input must be a pandas DataFrame.")
        if not isinstance(df.index, pd.DatetimeIndex):
            raise TypeError("DataFrame index must be a DatetimeIndex.")
        if df.index.freq is None:
            raise ValueError("DataFrame index must have a frequency.")

        if self.interpolate_gaps_less_than is not None:
            # Work on a copy so the caller's DataFrame is not modified.
            df = df.copy()
            for col in df.columns:
                df[col] = interpolate_small_gaps(df[col], self.interpolate_gaps_less_than)

        original_cols = df.columns
        n_original_cols = len(original_cols)

        # Create autoregressive features: one shifted copy of the frame per lag.
        shifted_frames = []
        for lag in self.lags:
            shifted = df.shift(lag)
            shifted.columns = [f"{col}_lag_{lag}" for col in original_cols]
            shifted_frames.append(shifted)
        df_with_lags = pd.concat([df, *shifted_frames], axis=1) if shifted_frames else df

        # Entirely-NaN columns are dropped before imputation. That may remove
        # *original* columns too, which shifts the position of every column
        # after them, so record which originals survive and where they end up.
        # Originals come first in the concatenation, so the survivors occupy
        # positions 0..n_kept_originals-1 in their original order.
        original_kept = df.notna().any(axis=0).to_numpy(dtype=bool)
        position_after_drop = np.cumsum(original_kept) - 1
        n_kept_originals = int(original_kept.sum())
        df_with_lags = df_with_lags.dropna(how="all", axis=1)

        # Process cols_to_impute into positions within the original columns.
        if cols_to_impute is None:
            requested = np.arange(n_original_cols)
        else:
            if isinstance(cols_to_impute, (int, np.integer, str)):
                cols_to_impute = [cols_to_impute]
            indices = []
            for c in cols_to_impute:
                if isinstance(c, (int, np.integer)):
                    position = int(c)
                    # Positions must address an original column; otherwise a
                    # lag feature column would silently be targeted instead.
                    if not -n_original_cols <= position < n_original_cols:
                        raise IndexError(
                            f"cols_to_impute position {position} is out of bounds "
                            f"for a DataFrame with {n_original_cols} columns."
                        )
                    indices.append(position % n_original_cols)
                elif isinstance(c, str):
                    indices.append(original_cols.get_loc(c))
                else:
                    raise ValueError("cols_to_impute must be an int, str, or an iterable of ints or strs.")
            requested = np.array(indices, dtype=int)

        # Skip requested columns that were dropped (nothing observed to learn
        # from) and translate the rest to their post-drop positions.
        requested = requested[original_kept[requested]]
        cols_to_impute_indices = position_after_drop[requested]

        # Process rows_to_impute
        if rows_to_impute is not None:
            if isinstance(rows_to_impute, (pd.DatetimeIndex, pd.TimedeltaIndex, pd.PeriodIndex)):
                positions = df.index.get_indexer(rows_to_impute)
                # get_indexer marks unknown labels with -1, which would
                # otherwise be read as "the last row".
                not_found = positions < 0
                if not_found.any():
                    missing = rows_to_impute[not_found]
                    raise KeyError(
                        f"rows_to_impute contains {len(missing)} label(s) not present "
                        f"in the DataFrame index, e.g. {missing[0]!r}."
                    )
                rows_to_impute = positions
            elif isinstance(rows_to_impute, (int, np.integer)):
                rows_to_impute = [rows_to_impute]
        elif before is not None or after is not None:
            # Restrict imputation to the (exclusive) time window.
            mask = np.ones(len(df.index), dtype=bool)
            if before is not None:
                mask &= df.index < pd.to_datetime(str(before))
            if after is not None:
                mask &= df.index > pd.to_datetime(str(after))
            rows_to_impute = np.where(mask)[0]

        # Impute the data
        imputed_data = self.multivariate_imputer(
            df_with_lags.values,
            rows_to_impute=rows_to_impute,
            cols_to_impute=cols_to_impute_indices,
            n_nearest_features=n_nearest_features,
        )

        # Translate the imputer's positional feature report into column names.
        self.imputation_features_ = self.multivariate_imputer.imputation_features_
        if self.imputation_features_ is not None:
            self.imputation_features_ = {
                df_with_lags.columns[col]: df_with_lags.columns[features].tolist()
                for col, features in self.imputation_features_.items()
            }

        # Return a DataFrame with the same columns as the original. The kept
        # originals are the leading columns of the imputed matrix.
        imputed_df = pd.DataFrame(imputed_data, index=df.index, columns=df_with_lags.columns)
        result = imputed_df.iloc[:, :n_kept_originals].copy()
        if n_kept_originals < n_original_cols:
            # Put back the all-NaN originals that were dropped before imputation.
            result = result.reindex(columns=original_cols)
        return result