Coverage for mlinsights/timeseries/agg.py: 96%
49 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-28 08:46 +0100
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-28 08:46 +0100
1"""
2@file
3@brief Data aggregation for timeseries.
4"""
5import datetime
6import pandas
7from pandas.tseries.frequencies import to_offset
def _get_column_name(df, name='agg'):
    """
    Builds a column name which does not clash with the columns
    already present in the dataframe.

    @param      df      dataframe
    @param      name    prefix, returned unchanged if no column
                        already uses it
    @return             new column name, the prefix followed by as many
                        ``'_'`` as needed to make it unique
    """
    taken = df.columns
    candidate = name
    # Appends underscores until the candidate is free.
    while candidate in taken:
        candidate = candidate + '_'
    return candidate
def aggregate_timeseries(df, index='time', values='y',
                         unit='half-hour', agg='sum',
                         per=None):
    """
    Aggregates timeseries assuming the data is in a dataframe.

    @param      df      dataframe, if None, a dataframe is built
                        from *index* (the time values) and *values*
                        (an object exposing ``shape``, with one or
                        two dimensions)
    @param      index   time column
    @param      values  value or values column
    @param      unit    aggregate over a specific period,
                        only ``'half-hour'`` is implemented
    @param      agg     kind of aggregation, ``'sum'`` sums the values
                        of every period, ``'norm'`` does the same and then
                        divides every aggregated column by its total
                        (columns whose total is zero are left as is)
    @param      per     second aggregation, None to keep the floored
                        timestamps, ``'week'`` or ``'month'`` to replace
                        them by an offset (days, hours, minutes)
    @return             aggregated values, sorted by the time column

    The time column of the result is always named ``'week' + index``
    (followed by underscores if that name is already taken),
    whatever the value of *per* is.
    Raises *ValueError* if *unit*, *agg* or *per* is not supported.
    """
    if df is None:
        # No dataframe: builds one from the raw time and value arrays.
        if len(values.shape) == 1:
            df = pandas.DataFrame(dict(time=index, y=values))
            values = 'y'
        else:
            df = pandas.DataFrame(dict(time=index))
            for i in range(values.shape[1]):
                df['y%d' % i] = values[:, i]
            values = list(df.columns)[1:]
        index = 'time'

    def round_(serie, freq, per):
        # Floors every timestamp to the beginning of its period.
        floored = pandas.DatetimeIndex(serie).floor(  # pylint: disable=E1101
            to_offset(freq))
        if per is None:
            return floored
        if per not in ('week', 'month'):
            raise ValueError(  # pragma: no cover
                f"Unknown frequency '{per}'.")
        # 'week' counts the days from Monday (weekday() is 0 on Monday),
        # 'month' uses the day of the month, which starts at 1.
        offsets = [
            datetime.timedelta(
                days=t.weekday() if per == 'week' else t.day,
                hours=t.hour, minutes=t.minute)
            for t in floored.to_pydatetime()]
        return pandas.to_timedelta(offsets)

    agg_name = _get_column_name(df)
    # Works on a copy, the caller's dataframe is not modified.
    df = df.copy()
    if unit == 'half-hour':
        freq = datetime.timedelta(minutes=30)
    else:
        raise ValueError(  # pragma: no cover
            f"Unknown time unit '{unit}'.")
    df[agg_name] = round_(df[index], freq, per)

    if not isinstance(values, list):
        values = [values]
    if agg not in ('sum', 'norm'):
        raise ValueError(  # pragma: no cover
            f"Unknown aggregation '{agg}'.")

    # Both aggregations start from the sum of every period.
    gr = df[[agg_name] + values].groupby(agg_name, as_index=False).sum()
    agg_cols = list(gr.columns[1:])
    # f-string instead of 'week' + index: same name for a string label,
    # and it does not fail when the label is not a string.
    agg_name = _get_column_name(gr, f'week{index}')
    gr.columns = [agg_name] + agg_cols
    if agg == 'norm':
        for c in agg_cols:
            su = gr[c].sum()
            # A null total cannot be normalized, the column is kept as is.
            if su != 0:
                gr[c] = gr[c] / su
    return gr.sort_values(agg_name).reset_index(drop=True)