Source code for mlinsights.timeseries.utils

"""
Timeseries data manipulations.


:githublink:`%|py|5`
"""
import numpy
from sklearn import get_config


def build_ts_X_y(model, X, y, weights=None, same_rows=False):
    """
    Builds a standard supervised dataset *(X, y)* from a timeseries.

    Past values of *y* are appended to the features as extra columns and
    future values of *y* become the targets.

    :param model: a timeseries model
        (:class:`BaseTimeSeries <mlinsights.timeseries.base.BaseTimeSeries>`),
        only the attributes *past*, *delay1*, *delay2* and *use_all_past*
        are read
    :param X: times series, used as features, [n_obs, n_features],
        X may be empty (None)
    :param y: timeseries (one single vector), [n_obs]
    :param weights: weights None or array [n_obs]
    :param same_rows: keep the same number of rows as the original datasets,
        use nan when no value is available
    :return: *(X, y, weights)*, with
        `nrows = n_obs - model.delay2 - model.past + 2`:

        * *X* has `n_features + past` columns, or
          `n_features * past + past` columns if `model.use_all_past` is True,
        * *y* has `model.delay2 - model.delay1` columns,
        * if *same_rows* is False, *X* and *y* have *nrows* rows and
          *weights* is None or `weights[past - 1:past - 1 + nrows]`,
        * if *same_rows* is True, *X* and *y* have *n_obs* rows, positions
          with no available value hold nan and *weights* is returned
          unchanged.

    The outputs use the dtype of *y*. When *same_rows* is True and *y* has
    an integer or boolean dtype, the outputs are float64 instead because
    these dtypes cannot store nan.

    :raises TypeError: if *model* has no attribute *use_all_past* or *past*

    .. runpython::
        :showcode:

        import numpy
        from mlinsights.timeseries import build_ts_X_y
        from mlinsights.timeseries.base import BaseTimeSeries
        X = numpy.arange(10).reshape(5, 2)
        y = numpy.arange(5) * 100
        weights = numpy.arange(5) * 1000
        bs = BaseTimeSeries(past=2)
        nx, ny, nw = build_ts_X_y(bs, X, y, weights)
        print('X=', X)
        print('y=', y)
        print('nx=', nx)
        print('ny=', ny)

    With ``use_all_past=True``:

    .. runpython::
        :showcode:

        import numpy
        from mlinsights.timeseries.base import BaseTimeSeries
        from mlinsights.timeseries import build_ts_X_y
        X = numpy.arange(10).reshape(5, 2)
        y = numpy.arange(5) * 100
        weights = numpy.arange(5) * 1000
        bs = BaseTimeSeries(past=2, use_all_past=True)
        nx, ny, nw = build_ts_X_y(bs, X, y, weights)
        print('X=', X)
        print('y=', y)
        print('nx=', nx)
        print('ny=', ny)
    """
    if not hasattr(model, "use_all_past") or not hasattr(model, "past"):
        raise TypeError(  # pragma: no cover
            "model must be of type BaseTimeSeries not {}".format(type(model)))

    past = model.past
    delay1 = model.delay1
    delay2 = model.delay2
    n_obs = y.shape[0]

    # Quantities shared by the four layouts below.
    ncol = X.shape[1] if X is not None else 0
    nrow = n_obs - delay2 - past + 2
    n_targets = delay2 - delay1

    if same_rows:
        # Rows without a value are padded with nan. Integer and boolean
        # dtypes cannot hold nan, so the buffers are promoted to float64
        # in that case only; other dtypes are kept as they are.
        pad_dtype = numpy.float64 if y.dtype.kind in "biu" else y.dtype
        first = n_obs - nrow  # index of the first row with a target

        if model.use_all_past:
            new_X = numpy.full(
                (n_obs, ncol * past + past), numpy.nan, dtype=pad_dtype)
            # NOTE(review): X[i:] is written starting at row i and the
            # lagged y columns start at row `first - i`, which is not the
            # alignment used when same_rows is False (X[i:i + nrow]).
            # Kept as is -- confirm this is the intended layout.
            if X is not None:
                for i in range(past):
                    begin = i * ncol
                    new_X[i:, begin:begin + ncol] = X[i:]
            for i in range(past):
                end = n_obs + i + delay1 - 1 - delay2
                start = first - i
                new_X[start:start + end - i, i + ncol * past] = y[i:end]
            new_y = numpy.full(
                (n_obs, n_targets), numpy.nan, dtype=pad_dtype)
            for i in range(delay1, delay2):
                new_y[first:, i - delay1] = y[i + 1:i + nrow + 1]
        else:
            new_X = numpy.full(
                (n_obs, ncol + past), numpy.nan, dtype=pad_dtype)
            if X is not None:
                new_X[first:, :X.shape[1]] = (
                    X[past - 1:X.shape[0] - delay2 + 1])
            for i in range(past):
                end = n_obs + i + delay1 - 1 - delay2 - past + 2
                new_X[first:, i + ncol] = y[i:end]
            new_y = numpy.full(
                (n_obs, n_targets), numpy.nan, dtype=pad_dtype)
            dec = past - 1
            for i in range(delay1, delay2):
                new_y[first:, i - delay1] = y[i + dec:i + nrow + dec]

        # Every input row is kept, the weights need no trimming.
        return new_X, new_y, weights

    if model.use_all_past:
        new_X = numpy.empty((nrow, ncol * past + past), dtype=y.dtype)
        if X is not None:
            # One group of ncol columns per lag of X.
            for i in range(past):
                begin = i * ncol
                new_X[:, begin:begin + ncol] = X[i:i + nrow]
        for i in range(past):
            end = n_obs + i + delay1 - 1 - delay2
            new_X[:, i + ncol * past] = y[i:end]
        new_y = numpy.empty((nrow, n_targets), dtype=y.dtype)
        for i in range(delay1, delay2):
            new_y[:, i - delay1] = y[i + 1:i + nrow + 1]
    else:
        new_X = numpy.empty((nrow, ncol + past), dtype=y.dtype)
        if X is not None:
            new_X[:, :X.shape[1]] = X[past - 1:X.shape[0] - delay2 + 1]
        for i in range(past):
            end = n_obs + i + delay1 - 1 - delay2 - past + 2
            new_X[:, i + ncol] = y[i:end]
        new_y = numpy.empty((nrow, n_targets), dtype=y.dtype)
        dec = past - 1
        for i in range(delay1, delay2):
            new_y[:, i - delay1] = y[i + dec:i + nrow + dec]

    new_weights = (None if weights is None
                   else weights[past - 1:past - 1 + nrow])
    return new_X, new_y, new_weights
def check_ts_X_y(model, X, y):
    """
    Checks that datasets *(X, y)* was built with function
    :func:`build_ts_X_y <mlinsights.timeseries.utils.build_ts_X_y>`.

    The checks are skipped when the scikit-learn configuration returned by
    ``get_config()`` has a true value for ``'assume_finite'`` (or does not
    define that key).

    :param model: timeseries model, attribute *past* is read and, if
        attribute *preprocessing_* exists and is not None,
        *preprocessing_.context_length* is added to the number of required
        observations
    :param X: features, an array with a *dtype* and a *shape*
    :param y: targets, an array with a *dtype* and a *shape*, or None
    :raises TypeError: if *X* or *y* is not float32 or float64
    :raises AssertionError: if *y* is None while past observations are
        required, if *X* and *y* do not have the same number of rows,
        if *y* has more than one column, or if *y* has fewer rows than
        the number of required observations
    """
    cfg = get_config()
    if cfg.get('assume_finite', True):
        return  # pragma: no cover

    float_types = (numpy.float32, numpy.float64)
    if X.dtype not in float_types:
        raise TypeError(
            "Features must be of type float32 and float64 not {}.".format(X.dtype))
    if y is not None and y.dtype not in float_types:
        # This message reports on y, not on the features.
        raise TypeError(  # pragma: no cover
            "Targets must be of type float32 and float64 not {}.".format(y.dtype))

    # Number of past observations the model needs to make a prediction.
    cst = model.past
    if (hasattr(model, 'preprocessing_') and
            model.preprocessing_ is not None):
        cst += model.preprocessing_.context_length

    if y is None:
        if cst > 0:
            raise AssertionError(  # pragma: no cover
                "y must be specified to give the model past data to predict, "
                "it requires at least {} observations.".format(cst))
        return  # pragma: no cover

    if y.shape[0] != X.shape[0]:
        raise AssertionError(  # pragma: no cover
            "X and y must have the same number of rows {} != {}.".format(
                X.shape[0], y.shape[0]))
    if len(y.shape) > 1 and y.shape[1] != 1:
        raise AssertionError(  # pragma: no cover
            "y must be 1-dimensional not has shape {}.".format(y.shape))
    if y.shape[0] < cst:
        raise AssertionError(  # pragma: no cover
            "y is not enough past data to predict, "
            "it requires at least {} observations.".format(cst))