Source code for mlinsights.timeseries.agg

"""
Data aggregation for timeseries.


:githublink:`%|py|5`
"""
import datetime
import pandas
from pandas.tseries.frequencies import to_offset


def _get_column_name(df, name='agg'):
    """
    Builds a column name which is not already used by the dataframe.

    The candidate starts as *name* and receives one more trailing
    underscore for as long as it is found in ``df.columns``.

    :param df: dataframe
    :param name: prefix
    :return: new column name
    """
    candidate = name
    while True:
        if candidate not in df.columns:
            return candidate
        # Still clashing with an existing column: lengthen and retry.
        candidate = candidate + '_'
def aggregate_timeseries(df, index='time', values='y',
                         unit='half-hour', agg='sum',
                         per=None):
    """
    Aggregates timeseries assuming the data is in a dataframe.

    Timestamps are floored to the requested *unit*, optionally folded
    into an offset within a week or a month (*per*), then the value
    columns are summed for every distinct key.

    :param df: dataframe, if None, one is built from *index* and
        *values* which are then used as the data itself (*values* must
        expose ``shape`` and, when it has more than one dimension,
        support ``values[:, i]``)
    :param index: time column
    :param values: value or values column (a name or a list of names)
    :param unit: aggregate over a specific period,
        only ``'half-hour'`` is implemented
    :param agg: kind of aggregation, ``'sum'`` sums the values,
        ``'norm'`` sums them and divides every column by its total
        (columns whose total is zero are left unchanged)
    :param per: second aggregation, None to keep the floored timestamps,
        ``'week'`` or ``'month'`` to replace them by a timedelta made of
        the day in the period, the hour and the minute
    :return: aggregated values, a dataframe sorted by its first column
        (the aggregation key) followed by the aggregated value columns
    :raises ValueError: if *unit*, *per* or *agg* is not supported
    """
    # Reject unsupported options before any dataframe is built or copied.
    # The checks keep the precedence of the errors: unit, per, agg.
    if unit != 'half-hour':
        raise ValueError(  # pragma: no cover
            "Unknown time unit '{}'.".format(unit))
    if per is not None and per not in ('week', 'month'):
        raise ValueError(  # pragma: no cover
            "Unknown frequency '{}'.".format(per))
    if agg not in ('sum', 'norm'):
        raise ValueError(  # pragma: no cover
            "Unknown aggregation '{}'.".format(agg))

    if df is None:
        # No dataframe: *index* holds the timestamps, *values* the data.
        if len(values.shape) == 1:
            df = pandas.DataFrame(dict(time=index, y=values))
            values = 'y'
        else:
            df = pandas.DataFrame(dict(time=index))
            for i in range(values.shape[1]):
                df['y%d' % i] = values[:, i]
            values = list(df.columns)[1:]
        index = 'time'

    # Floors every timestamp to the beginning of its half-hour.
    freq = datetime.timedelta(minutes=30)
    floored = pandas.DatetimeIndex(  # pylint: disable=E1101
        df[index]).floor(to_offset(freq))

    if per is None:
        keys = floored
    else:
        pyres = floored.to_pydatetime()
        # 'week' uses the 0-based weekday, 'month' the 1-based day of the
        # month, exactly as the aggregation keys were defined so far.
        if per == 'week':
            days = [t.weekday() for t in pyres]
        else:
            days = [t.day for t in pyres]
        keys = pandas.to_timedelta([
            datetime.timedelta(days=d, hours=t.hour, minutes=t.minute)
            for d, t in zip(days, pyres)])

    # Works on a copy so that the caller's dataframe is left untouched.
    agg_name = _get_column_name(df)
    df = df.copy()
    df[agg_name] = keys

    if not isinstance(values, list):
        values = [values]

    # Both aggregations start from the sum per key.
    gr = df[[agg_name] + values].groupby(agg_name, as_index=False).sum()
    agg_name = _get_column_name(gr, 'week' + index)
    agg_cols = list(gr.columns[1:])
    gr.columns = [agg_name] + agg_cols

    if agg == 'norm':
        for c in agg_cols:
            su = gr[c].sum()
            # A null total cannot be normalised, the column is kept as is.
            if su != 0:
                gr[c] /= su

    return gr.sort_values(agg_name).reset_index(drop=True)