Source code for mlinsights.timeseries.preprocessing
"""
Timeseries preprocessing.
:githublink:`%|py|5`
"""
import numpy
from .base import BaseReciprocalTimeSeriesTransformer
[docs]class TimeSeriesDifference(BaseReciprocalTimeSeriesTransformer):
"""
Computes timeseries differences.
:githublink:`%|py|12`
"""
[docs] def __init__(self, degree=1):
"""
:param degree: number of differences
:githublink:`%|py|17`
"""
BaseReciprocalTimeSeriesTransformer.__init__(self, degree)
@property
def degree(self):
"""
Returns the degree.
:githublink:`%|py|24`
"""
return self.context_length
[docs] def fit(self, X, y, sample_weight=None):
"""
Stores the first values.
:githublink:`%|py|30`
"""
self.X_ = X[:self.degree].copy()
self.y_ = y[:self.degree].copy()
for n in range(1, self.degree):
self.y_[n:] -= self.y_[n - 1:-1]
return self
[docs] def transform(self, X, y, sample_weight=None):
"""
Transforms both *X* and *y*.
Returns *X* and *y*, returns
*sample_weight* as well if not None.
:githublink:`%|py|42`
"""
for _ in range(self.degree):
y = y[1:] - y[:-1]
X = X[1:]
if sample_weight is None:
return X, y
return X, y, sample_weight[1:]
[docs] def get_fct_inv(self):
"""
Returns the reverse tranform.
:githublink:`%|py|53`
"""
return TimeSeriesDifferenceInv(self).fit()
[docs]class TimeSeriesDifferenceInv(BaseReciprocalTimeSeriesTransformer):
"""
Computes the reverse of :class:`TimeSeriesDifference <mlinsights.timeseries.preprocessing.TimeSeriesDifference>`.
:githublink:`%|py|60`
"""
[docs] def __init__(self, estimator):
"""
:param estimator: of type :class:`TimeSeriesDifference <mlinsights.timeseries.preprocessing.TimeSeriesDifference>`
:githublink:`%|py|65`
"""
BaseReciprocalTimeSeriesTransformer.__init__(
self, estimator.context_length)
if not isinstance(estimator, TimeSeriesDifference):
raise TypeError( # pragma: no cover
"estimator must be of type TimeSeriesDifference not {}"
"".format(type(estimator)))
self.estimator = estimator
[docs] def fit(self, X=None, y=None, sample_weight=None):
"""
Checks that estimator is fitted.
:githublink:`%|py|77`
"""
if not hasattr(self.estimator, 'X_'):
raise RuntimeError( # pragma: no cover
"Estimator is not fitted.")
self.estimator_ = self.estimator
return self
[docs] def transform(self, X, y, sample_weight=None):
"""
Transforms both *X* and *y*.
Returns *X* and *y*, returns
*sample_weight* as well if not None.
:githublink:`%|py|89`
"""
if len(y.shape) == 1:
y = y.reshape((y.shape[0], 1))
squeeze = True
else:
squeeze = False
if len(self.estimator_.y_.shape) == 1:
y0 = self.estimator_.y_.reshape((y.shape[0], 1))
else:
y0 = self.estimator_.y_
r0 = self.estimator_.X_.shape[0]
nx = numpy.empty((r0 + X.shape[0], X.shape[1]), dtype=X.dtype)
nx[:r0, :] = self.estimator_.X_
nx[r0:, :] = X
ny = numpy.empty((r0 + X.shape[0], y.shape[1]), dtype=X.dtype)
ny[:r0, :] = y0
ny[r0:, :] = y
for i in range(self.estimator_.degree):
numpy.cumsum(ny[r0 - i - 1:, :], axis=0, out=ny[r0 - i - 1:, :])
if squeeze:
ny = numpy.squeeze(ny)
if sample_weight is None:
return nx, ny
nw = numpy.zeros(ny.shape[0])
de = nw.shape[0] - sample_weight.shape[0]
nw[de:] = sample_weight
return nx, ny, nw