"""Module for preprocessing_functions subpackage."""
from __future__ import annotations
from typing import TYPE_CHECKING, cast, Union
from typing_extensions import Literal
import numpy as np
import pandas as pd
from mypythontools.system import check_library_is_available
from ...types import DataFrameOrArrayGeneric, Numeric
from ...helpers import get_copy_or_view
if TYPE_CHECKING:
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
ScalerType = Union[MinMaxScaler, RobustScaler, StandardScaler]
# Lazy load - the libraries below are imported inside the functions that use them, not at module level
# import scipy.signal
# import scipy.stats
# from sklearn import preprocessing
def remove_the_outliers(
    data: DataFrameOrArrayGeneric,
    threshold: Numeric = 3,
) -> DataFrameOrArrayGeneric:
    """Deprecated alias of `remove_outliers`.

    Historically `remove_outliers` was also the name of a pipeline parameter in the same module, so the
    function needed a different name. Use `remove_outliers` if possible. This alias will be removed in a
    new major version.

    Args:
        data (DataFrameOrArrayGeneric): Data passed through to `remove_outliers` unchanged.
        threshold (Numeric, optional): Threshold passed through to `remove_outliers` unchanged.
            Defaults to 3.

    Returns:
        DataFrameOrArrayGeneric: The result of `remove_outliers(data, threshold)`.
    """
    cleaned = remove_outliers(data, threshold)
    return cleaned
def remove_outliers(
    data: DataFrameOrArrayGeneric,
    threshold: Numeric = 3,
) -> DataFrameOrArrayGeneric:
    """Remove values far from mean - probably errors.

    If there are more columns, only rows that have an outlier in the predicted column are deleted. The
    predicted column (the column where outliers are searched for) is supposed to be the first one (index 0).

    A row is removed only if its distance from the mean is strictly greater than `threshold` times the
    standard deviation. Rows for which that comparison cannot be decided (NaN value, or NaN standard
    deviation) are kept, so a constant column or a single-row input is returned unchanged.

    Note:
        The standard deviation is computed with the library defaults: population std for numpy arrays and
        sample std for DataFrames, so the two input types can differ slightly on the same data.

    Args:
        data (DataFrameOrArrayGeneric): Time series data. Must have ndim = 2, so if univariate, reshape.
        threshold (Numeric, optional): How many times must be standard deviation from mean to be ignored.
            Defaults to 3.

    Returns:
        DataFrameOrArrayGeneric: Cleaned data. Any other input type is returned untouched.

    Examples:
        >>> data = np.array(
        ...     [
        ...         [1, 7],
        ...         [66, 3],
        ...         [5, 5],
        ...         [2, 3],
        ...         [2, 3],
        ...         [3, 9],
        ...     ]
        ... )
        >>> remove_outliers(data, threshold=2)
        array([[1, 7],
               [5, 5],
               [2, 3],
               [2, 3],
               [3, 9]])
    """
    if isinstance(data, np.ndarray):
        predicted = data[:, 0]
        is_outlier = np.abs(predicted - predicted.mean()) > threshold * predicted.std()
        # Boolean indexing returns a new array, the input is never modified.
        return data[~is_outlier]

    if isinstance(data, pd.DataFrame):
        # Positional access, so the first column is used even if column labels are not unique.
        predicted = data.iloc[:, 0]
        is_outlier = (predicted - predicted.mean()).abs() > threshold * predicted.std()
        # Same rule as for arrays: drop only what is provably beyond the limit. Keeping rows with `<` would
        # drop everything when std is 0 (constant column) or NaN (single row).
        return data[~is_outlier]

    return data
def do_difference(data: DataFrameOrArrayGeneric) -> DataFrameOrArrayGeneric:
    """Replace the values with differences between neighbors.

    The result is one element (row) shorter than the input, because the first value has no predecessor.

    Args:
        data (DataFrameOrArrayGeneric): Data. A numpy array, DataFrame or Series.

    Returns:
        DataFrameOrArrayGeneric: Differenced data in the same format as the input.

    Raises:
        TypeError: If data is not a DataFrame, Series or numpy array.

    Examples:
        >>> data = np.array([1, 3, 5, 2])
        >>> print(do_difference(data))
        [ 2  2 -3]
    """
    if isinstance(data, (pd.DataFrame, pd.Series)):
        differenced = data.diff()
        # The first row of a pandas diff is always NaN, so it is dropped.
        return differenced.iloc[1:]

    if isinstance(data, np.ndarray):
        return np.diff(data, axis=0)

    raise TypeError("Only DataFrame, Series or numpy array supported.")
def inverse_difference(data: np.ndarray, last_undiff_value: Numeric) -> np.ndarray:
    """Transform do_difference transform back.

    Args:
        data (np.ndarray): One dimensional differenced data from do_difference function.
        last_undiff_value (Numeric): Last known value before differencing. It is the starting point the
            differences are accumulated on to compute the rest.

    Returns:
        np.ndarray: Normal data, not the additive series. Same length as `data`.

    Examples:
        >>> data = np.array([1, 1, 1, 1])
        >>> print(inverse_difference(data, 1))
        [2 3 4 5]
    """
    assert data.ndim == 1, "Data input must be one-dimensional."

    # The start value is added after accumulation instead of being inserted into `data`. Inserting would
    # cast it to the dtype of `data`, so e.g. 1.5 would be silently truncated to 1 for integer differences.
    return np.cumsum(data) + last_undiff_value
def standardize(
    data: DataFrameOrArrayGeneric, used_scaler: Literal["standardize", "01", "-11", "robust"] = "standardize"
) -> tuple[DataFrameOrArrayGeneric, "ScalerType"]:
    """Standardize or normalize data.

    More standardize methods available. Predicted column is supposed to be 0.

    Args:
        data (DataFrameOrArrayGeneric): Time series data.
        used_scaler (Literal['standardize', '01', '-11', 'robust'], optional): '01' and '-11' means scope
            from to for normalization. 'robust' use RobustScaler and 'standardize' use StandardScaler - mean
            is 0 and std is 1. Defaults to 'standardize'.

    Returns:
        tuple[DataFrameOrArrayGeneric, ScalerType]: Standardized data and scaler for inverse transformation.
        The scaler is fitted on the first (predicted) column only. A DataFrame input results in a new
        DataFrame with the same index and columns.

    Raises:
        TypeError: If `used_scaler` is not one of the supported options.
    """
    check_library_is_available("sklearn")

    from sklearn import preprocessing

    if used_scaler == "01":
        scaler = preprocessing.MinMaxScaler(feature_range=(0, 1))
    elif used_scaler == "-11":
        scaler = preprocessing.MinMaxScaler(feature_range=(-1, 1))
    elif used_scaler == "robust":
        scaler = preprocessing.RobustScaler()
    elif used_scaler == "standardize":
        scaler = preprocessing.StandardScaler()
    else:
        raise TypeError(
            f"Your scaler {used_scaler} not in options. Use one of ['01', '-11', 'robust', 'standardize']"
        )

    # First normalized values are calculated, then scaler just for predicted value is computed again so no
    # full matrix is necessary for inverse
    if isinstance(data, pd.DataFrame):
        # A new frame is built from the scaled values. Writing floats into a copy of the input via
        # `iloc[:, :] = ...` would have to change the dtype of integer columns during assignment.
        normalized = pd.DataFrame(scaler.fit_transform(data.values), index=data.index, columns=data.columns)
        predicted_column = data.values[:, 0]
    else:
        normalized = scaler.fit_transform(data)
        predicted_column = data[:, 0]

    # Refitting reuses the same scaler object, now fitted on the single predicted column.
    final_scaler = scaler.fit(predicted_column.reshape(-1, 1))

    return normalized, final_scaler  # type: ignore
def standardize_one_way(
    data: DataFrameOrArrayGeneric,
    minimum: float,
    maximum: float,
    axis: Literal[0, 1] = 0,
    inplace: bool = False,
) -> DataFrameOrArrayGeneric:
    """Rescale data linearly into the range from `minimum` to `maximum`. No inverse transformation available.

    Own implementation, so that built applications do not have to carry sklearn with the build. NaN values
    are ignored when the current minimum and maximum are searched for.

    Args:
        data (DataFrameOrArrayGeneric): Data. Two dimensional.
        minimum (float): Lower bound of the values after transformation.
        maximum (float): Upper bound of the values after transformation.
        axis (Literal[0, 1], optional): 0 scales every column separately, 1 scales every row separately.
            Any other value leaves the values untouched. Defaults to 0.
        inplace (bool, optional): If True, then original data are edited. If False, copy is created.
            Defaults to False.

    Returns:
        DataFrameOrArrayGeneric: Rescaled data. If input in numpy array, then also output in array, if
        DataFrame input, then DataFrame output.
    """
    data = get_copy_or_view(data, inplace)

    # NOTE(review): for a DataFrame this relies on `.values` exposing the underlying data, so that
    # writing into it changes the frame - confirm for the supported pandas versions.
    values = data.values if isinstance(data, pd.DataFrame) else data

    if axis == 0:
        oriented = values
    elif axis == 1:
        # Rows are scaled by transposing, so the same column-wise computation can be used.
        oriented = values.T
    else:
        return data

    lowest = np.nanmin(oriented, axis=0)
    highest = np.nanmax(oriented, axis=0)
    scaled = (oriented - lowest) / (highest - lowest) * (maximum - minimum) + minimum

    # Results are written back into the existing array, so its dtype is kept.
    # NOTE(review): presumably integer input gets truncated here - confirm with callers.
    values[:, :] = scaled if axis == 0 else scaled.T

    return data
def binning(
    data: DataFrameOrArrayGeneric, bins: int, binning_type: Literal["cut", "qcut"] = "cut"
) -> DataFrameOrArrayGeneric:
    """Discretize value on defined number of bins.

    It will return the same shape of data, where middle (average) values of bins interval returned.
    Missing values stay missing. The input data are never modified.

    Args:
        data (DataFrameOrArrayGeneric): Data for preprocessing. ndim = 2 (n_samples, n_features).
        bins (int): Number of bins - unique values.
        binning_type (Literal["cut", "qcut"], optional): "cut" for equal size of bins intervals (different
            number of members in bins) or "qcut" for equal number of members in bins and various size of bins.
            It uses pandas cut or qcut function. Defaults to "cut".

    Returns:
        DataFrameOrArrayGeneric: Discretized data of same type as input. If input in numpy
        array, then also output in array, if DataFrame input, then DataFrame output.

    Raises:
        TypeError: If `binning_type` is not one of 'cut' or 'qcut'.

    Example:
        >>> binning(np.array(range(5)), bins=3, binning_type="cut")
        array([[0.6645],
               [0.6645],
               [2.    ],
               [3.3335],
               [3.3335]])
    """
    if binning_type == "qcut":
        discretize = pd.qcut
    elif binning_type == "cut":
        discretize = pd.cut
    else:
        raise TypeError("`binning_type` has to be one of ['cut', 'qcut'].")

    # Explicit copy, because wrapping an existing DataFrame may share its data with the caller and the
    # column assignments below would then leak into the original frame.
    df = pd.DataFrame(data).copy()

    for column in df:
        intervals = discretize(df[column].values, bins)
        # Only the categories (intervals) are converted to their middle value, so missing values are
        # never passed to the conversion and stay missing.
        df[column] = intervals.rename_categories(lambda interval: interval.mid)

    if isinstance(data, np.ndarray):
        return df.values

    return df
def smooth(
    data: DataFrameOrArrayGeneric,
    window=101,
    polynomial_order=2,
    inplace: bool = False,
) -> DataFrameOrArrayGeneric:
    """Smooth data (reduce noise) with Savitzky-Golay filter. For more info on filter check scipy docs.

    Every column is filtered separately.

    Args:
        data (DataFrameOrArrayGeneric): Input data. Two dimensional.
        window (int, optional): Length of sliding window. Must be odd. Defaults to 101.
        polynomial_order (int, optional): Must be smaller than window. Defaults to 2.
        inplace (bool, optional): If True, then original data are edited. If False, copy is created.
            Defaults to False.

    Returns:
        DataFrameOrArrayGeneric: Cleaned data with less noise.
    """
    check_library_is_available("scipy")

    import scipy.signal

    data = get_copy_or_view(data, inplace)

    def denoise(column):
        return scipy.signal.savgol_filter(column, window, polynomial_order)

    # Filtered values are written back into the existing columns.
    # NOTE(review): presumably integer columns cannot hold the float results exactly - confirm.
    if isinstance(data, pd.DataFrame):
        for position in range(data.shape[1]):
            data.iloc[:, position] = denoise(data.values[:, position])

    elif isinstance(data, np.ndarray):
        for position in range(data.shape[1]):
            data[:, position] = denoise(data[:, position])

    return data