Séries temporelles et map reduce

Links: notebook, html ., PDF, python, slides ., GitHub

from jyquickhelper import add_notebook_menu
add_notebook_menu()
run previous cell, wait for 2 seconds

On souhaite simplement calculer la dérivée de la série temporelle : \Delta Y_t = Y_t - Y_{t-1}.

%matplotlib inline

Data

from pyensae.finance import StockPrices
stock = StockPrices("NASDAQ:MSFT", folder=".")
stock.tail()
Date Open High Low Close Volume
Date
2017-05-24 2017-05-24 68.87 68.88 68.45 68.77 14666865
2017-05-25 2017-05-25 68.97 69.88 68.91 69.62 21854095
2017-05-26 2017-05-26 69.80 70.22 69.52 69.96 19827923
2017-05-30 2017-05-30 69.79 70.41 69.77 70.41 17072838
2017-05-31 2017-05-31 70.53 70.74 69.81 69.84 30291492

On crée une colonne supplémentaire pour l’indice t de la série temporelle :

dt = stock.df()
dt = dt[["Close"]]
dt = dt.reset_index(drop=True)
data = dt
dt.tail()
Close
3995 68.77
3996 69.62
3997 69.96
3998 70.41
3999 69.84

La fonction shift rend le calcul très simple avec pandas.

data_pandas = data.copy()
data_pandas["Close2"] = data.shift(1)
data_pandas["delta"] = data_pandas["Close"] - data_pandas["Close2"]
data_pandas.tail()
Close Close2 delta
3995 68.77 68.68 0.09
3996 69.62 68.77 0.85
3997 69.96 69.62 0.34
3998 70.41 69.96 0.45
3999 69.84 70.41 -0.57

La fonction est très rapide car elle utilise l’ordre des données et son index. Pour s’en passer, on ajoute une colonne qui contient l’index original puis on mélange pour simuler le fait qu’en map/reduce, l’ordre des informations n’est pas connu à l’avance.

import numpy
data = dt.reindex(numpy.random.permutation(data.index))
data = data.reset_index(drop=False)
data.columns = ["t", "close"]
data.tail()
t close
3995 548 28.34
3996 294 24.90
3997 3222 40.40
3998 478 24.87
3999 1112 27.75

Dérivée avec pandas

Le traitement de chaque ligne est indépendant et ne doit pas prendre en compte aucune autre ligne mais on a besoin de Y_{t-1} pour calculer \Delta Y_t.

data["tt"] = data["t"] - 1

méthode efficace

Lors d’une jointure, pandas va trier chaque côté de la jointure par ordre croissant de clé. S’il y a N observations, cela a un coût de N\ln N, il réalise ensuite une fusion des deux bases en ne considérant que les lignes partageant la même clé. Cette façon de faire ne convient que lorsqu’on fait une jointure avec une condition n’incluant que des ET logique et des égalités.

join = data.merge(data, left_on="t", right_on="tt", suffixes=("", "2"))
join.tail()
t close tt t2 close2 tt2
3994 548 28.34 547 549 28.36 548
3995 294 24.90 293 295 24.29 294
3996 3222 40.40 3221 3223 40.00 3222
3997 478 24.87 477 479 24.09 478
3998 1112 27.75 1111 1113 27.69 1112
derivee = join.copy()
derivee["derivee"] = derivee["close"] - derivee["close2"]
derivee.tail()
t close tt t2 close2 tt2 derivee
3994 548 28.34 547 549 28.36 548 -0.02
3995 294 24.90 293 295 24.29 294 0.61
3996 3222 40.40 3221 3223 40.00 3222 0.40
3997 478 24.87 477 479 24.09 478 0.78
3998 1112 27.75 1111 1113 27.69 1112 0.06

En résumé :

new_data = data.copy()
new_data["tt"] = new_data["t"] + 1  # MAP
new_data = new_data.merge(new_data, left_on="t", right_on="tt", suffixes=("", "2")) # JOIN = SORT + MAP + REDUCE
new_data["derivee"] = new_data["close"] - new_data["close2"] # MAP
print(new_data.shape)
new_data.tail()
(3999, 7)
t close tt t2 close2 tt2 derivee
3994 548 28.34 549 547 27.84 548 0.50
3995 294 24.90 295 293 24.35 294 0.55
3996 3222 40.40 3223 3221 40.51 3222 -0.11
3997 478 24.87 479 477 24.88 478 -0.01
3998 1112 27.75 1113 1111 27.69 1112 0.06

mesure de coût

La série n’est pas assez longue pour observer la relation entre le temps de calcul et le nombre d’observations.

def derive(data):
    new_data = data.copy()
    new_data["tt"] = new_data["t"] + 1  # MAP
    new_data = new_data.merge(new_data, left_on="t", right_on="tt", suffixes=("", "2")) # JOIN = MAP + REDUCE
    new_data["derivee"] = new_data["close"] - new_data["close2"] # MAP
    return new_data

On choisit une régression linéaire de type HuberRegressor pour régresser C(N) \sim a N + b \ln N + c N \ln N + d. Ce modèle est beaucoup moins sensible aux points aberrants que les autres formes de régressions. On utilise cette régression avec le module RobustLinearModels du module statsmodels pour calculer les p-values

import time, pandas, numpy, sklearn.linear_model as lin
import matplotlib.pyplot as plt
import statsmodels.api as smapi
from random import randint

def graph_cout(data, h=100, nb=10, add_n2=False, derive=derive):
    dh = len(data) // h
    res = []
    for n in range(max(dh, 500), len(data), dh):
        df = data[0:n+randint(-10,10)]
        mean = []
        for i in range(0, nb):
            t = time.clock()
            derive(df)
            dt = time.clock() - t
            mean.append(dt)
        res.append((n, mean[len(mean)//2]))
    # stat
    stat = pandas.DataFrame(res, columns=["N", "processing time"])
    stat["logN"] = numpy.log(stat["N"]) / numpy.log(10)
    stat["NlogN"] = stat["N"] * stat["logN"]
    stat["N2"] = stat["N"] * stat["N"]
    # statsmodels
    stat["one"] = 1
    X = stat[["logN", "N", "NlogN", "N2", "one"]]
    if not add_n2:
        X = X.drop("N2", axis=1)
    rlm_model = smapi.RLM(stat["processing time"], X, M=smapi.robust.norms.HuberT())
    rlm_results = rlm_model.fit()
    yp = rlm_results.predict(X)
    print(yp.shape, type(yp))
    stat["smooth"] = yp
    # graph
    fig, ax = plt.subplots()
    stat.plot(x="N", y=["processing time", "smooth"], ax=ax)
    return ax, rlm_results

ax, results = graph_cout(data)
print(results.summary())
ax
c:Python36_x64libsite-packagesstatsmodelscompatpandas.py:56: FutureWarning: The pandas.core.datetools module is deprecated and will be removed in a future version. Please use the pandas.tseries module instead.
  from pandas.core import datetools
(88,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                   88
Model:                            RLM   Df Residuals:                       84
Method:                          IRLS   Df Model:                            3
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Thu, 01 Jun 2017
Time:                        00:52:10
No. Iterations:                    42
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN           0.0048      0.002      2.635      0.008       0.001       0.008
N          -1.104e-05   4.52e-06     -2.445      0.015   -1.99e-05   -2.19e-06
NlogN       2.691e-06   1.09e-06      2.462      0.014    5.48e-07    4.83e-06
one           -0.0089      0.004     -2.081      0.037      -0.017      -0.001
==============================================================================

If the model instance has been used for another fit with different fit
parameters, then the fit options might not be the correct ones anymore .
<matplotlib.axes._subplots.AxesSubplot at 0x285f6b0c7f0>
../_images/map_reduce_timeseries_23_3.png

Le coût de l’algorithme est au moins linéaire. Cela signifie que le coût du calcul qu’on cherche à mesurer est noyé dans plein d’autres choses non négligeable. La valeur des coefficients n’indique pas grand chose car les variables N, Nlog N évoluent sur des échelles différentes.

N = 1000000
from random import random
data2 = pandas.DataFrame({"t":range(0,N), "close": [random() for i in range(0, N)]})
ax, stats = graph_cout(data2, h=25)
print(stats.summary())
ax
(24,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                   24
Model:                            RLM   Df Residuals:                       20
Method:                          IRLS   Df Model:                            3
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Thu, 01 Jun 2017
Time:                        00:52:49
No. Iterations:                    23
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN          -0.0111      0.067     -0.168      0.867      -0.142       0.119
N           2.533e-07   1.54e-06      0.165      0.869   -2.76e-06    3.27e-06
NlogN       1.538e-08   2.41e-07      0.064      0.949   -4.57e-07    4.87e-07
one            0.0503      0.298      0.169      0.866      -0.533       0.634
==============================================================================
If the model instance has been used for another fit with different fit
parameters, then the fit options might not be the correct ones anymore .
<matplotlib.axes._subplots.AxesSubplot at 0x285f8d24400>
../_images/map_reduce_timeseries_25_2.png

Les résultats sont assez volatiles. Il faut regarder les intervalles de confiance et regarder lesquels n’incluent pas 0.

méthode inefficace

Dans ce cas, on fait un produit en croix de toutes les lignes de la bases avec elles-même puis on filtre le résultat pour ne garder que les lignes qui vérifient la condition souhaitée quelle qu’elle soit. Le temps d’exécution est en O(N^2) et la différence est vite significative.

new_data2 = data.copy()
new_data2["tt"] = new_data2["t"] + 1  # MAP
new_data2["key"] = 1
new_data2 = new_data2.merge(new_data2, on="key", suffixes=("", "2")) # JOIN = MAP^2
print(new_data2.shape)
new_data2 = new_data2[new_data2.t == new_data2.t2] # MAP
print(new_data2.shape)
new_data2["derivee"] = new_data2["t"] - new_data2["t2"] # MAP
new_data2.tail()
(16000000, 7)
(4000, 7)
t close tt key t2 close2 tt2 derivee
15983995 548 28.34 549 1 548 28.34 549 0
15987996 294 24.90 295 1 294 24.90 295 0
15991997 3222 40.40 3223 1 3222 40.40 3223 0
15995998 478 24.87 479 1 478 24.87 479 0
15999999 1112 27.75 1113 1 1112 27.75 1113 0

mesure de coût inefficace

def derive_inefficace(data):
    new_data2 = data.copy()
    new_data2["tt"] = new_data2["t"] + 1
    new_data2["key"] = 1
    new_data2 = new_data2.merge(new_data2, on="key", suffixes=("", "2"))
    new_data2 = new_data2[new_data2.t == new_data2.t2]
    new_data2["derivee"] = new_data2["t"] - new_data2["t2"] # MAP
    return new_data2

ax, stats = graph_cout(data, h=10, nb=5, derive=derive_inefficace, add_n2=True)
print(stats.summary())
ax
(9,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                    9
Model:                            RLM   Df Residuals:                        4
Method:                          IRLS   Df Model:                            4
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Thu, 01 Jun 2017
Time:                        00:53:37
No. Iterations:                    50
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN          -4.0121      0.209    -19.179      0.000      -4.422      -3.602
N              0.0234      0.001     22.624      0.000       0.021       0.025
NlogN         -0.0066      0.000    -23.198      0.000      -0.007      -0.006
N2          6.854e-07   1.81e-08     37.819      0.000     6.5e-07    7.21e-07
one            7.9497      0.427     18.602      0.000       7.112       8.787
==============================================================================
If the model instance has been used for another fit with different fit
parameters, then the fit options might not be the correct ones anymore .
<matplotlib.axes._subplots.AxesSubplot at 0x285f6aed358>
../_images/map_reduce_timeseries_30_2.png

avec des itérateurs

version efficace

rows = data[["t", "close"]].itertuples()
list(rows)[:2]
[Pandas(Index=0, t=2511, close=26.0),
 Pandas(Index=1, t=3958, close=65.859999999999999)]
rows = data[["t", "close"]].itertuples()
rows = map(lambda r: (r.t, r.close, r.t+1), rows)
list(rows)[:2]
[(2511, 26.0, 2512), (3958, 65.859999999999999, 3959)]
import copy
from cytoolz.itertoolz import join
rows = data[["t", "close"]].itertuples()
rows = map(lambda r: (r.t, r.close, r.t+1), rows)
rows = join(lambda t: t[2], rows, lambda t: t[0], copy.deepcopy(rows))
rows = map(lambda tu: tu[0] + tu[1], rows)
rows = map(lambda row: (row[0], row[4] - row[1]), rows)
results = list(rows)
results[:2]
[(2510, 0.37999999999999901), (3957, 0.15000000000000568)]

version inefficace

La méthode inefficace avec pandas est particulièrement inefficace car elle nécessite de stocker un tableau intermédiaire qui contient N^2 lignes.

import copy
from cytoolz.itertoolz import join
rows = data[["t", "close"]].itertuples()
rows = map(lambda r: (r.t, r.close, r.t+1, 1), rows)   # on ajoute 1
rows = join(lambda t: t[-1], rows, lambda t: t[-1], copy.deepcopy(rows))  # on fait un produit croisé
rows = map(lambda tu: tu[0] + tu[1], rows)
rows = filter(lambda t: t[2] == t[4], rows)     # on filtre les lignes qui nous intéresse
rows = map(lambda row: (row[0], row[5] - row[1]), rows)
results = list(rows)   # c'est très lent
results[:4]
[(2510, 0.37999999999999901),
 (3957, 0.15000000000000568),
 (3161, -1.3600000000000065),
 (358, 0.32999999999999829)]

avec SQL

conversion du dataframe au format SQL

data = stock.df()
data.columns = [_.replace(" ", "_") for _ in data.columns]
data = data.reset_index(drop=True).reset_index(drop=False)
data.columns = ["t"] + list(data.columns[1:])
data.head()
t Date Open High Low Close Volume
0 0 2001-07-05 35.11 35.36 34.22 34.26 47680400
1 1 2001-07-06 34.15 34.20 32.76 33.03 64697400
2 2 2001-07-09 33.10 33.46 32.52 32.84 65217200
3 3 2001-07-10 32.95 33.12 32.04 32.24 64222600
4 4 2001-07-11 32.10 33.51 32.10 33.25 65988600
import sqlite3
con = sqlite3.connect(":memory:")
tbl = data.to_sql("stock", con, index=False)
%load_ext pyensae
c:Python36_x64libimportlib_bootstrap.py:205: ImportWarning: can't resolve package from __spec__ or __package__, falling back on __name__ and __path__
  return f(*args, **kwds)
c:Python36_x64libsite-packagesqgridgrid.py:224: DeprecationWarning: metadata {'sync': True} was set from the constructor. With traitlets 4.1, metadata should be set using the .tag() method, e.g., Int().tag(key1='value1', key2='value2')
  _view_module = Unicode("nbextensions/qgridjs/qgrid.widget", sync=True)
%SQL_tables -v con
['stock']
%%SQL -v con

SELECT * FROM stock LIMIT 5
t Date Open High Low Close Volume
0 0 2001-07-05 35.11 35.36 34.22 34.26 47680400
1 1 2001-07-06 34.15 34.20 32.76 33.03 64697400
2 2 2001-07-09 33.10 33.46 32.52 32.84 65217200
3 3 2001-07-10 32.95 33.12 32.04 32.24 64222600
4 4 2001-07-11 32.10 33.51 32.10 33.25 65988600

version efficace

L’instruction ON précise sur quelle ou quelles colonnes opérer la fusion.

%%SQL -v con

SELECT A.t AS t, A.Close AS Close, A.tt AS tt,
       B.t AS t2, B.Close AS Close2, B.tt AS tt2
FROM (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS A
INNER JOIN (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS B
ON A.tt == B.t
t Close tt t2 Close2 tt2
0 0 34.26 1 1 33.03 2
1 1 33.03 2 2 32.84 3
2 2 32.84 3 3 32.24 4
3 3 32.24 4 4 33.25 5
4 4 33.25 5 5 35.80 6
5 5 35.80 6 6 35.67 7
6 6 35.67 7 7 35.59 8
7 7 35.59 8 8 35.91 9
8 8 35.91 9 9 35.28 10
9 9 35.28 10 10 36.28 11

version inefficace

Pour la version inefficace, l’instruction JOIN effectue tous les combinaisons de lignes possibles, soit N^2. Le mot-clé WHERE garde les couples de lignes qui nous intéresse.

%%SQL -v con

SELECT A.t AS t, A.Close AS Close, A.tt AS tt,
       B.t AS t2, B.Close AS Close2, B.tt AS tt2
FROM (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS A
INNER JOIN (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS B
WHERE A.tt >= B.t AND A.tt <= B.t -- on écrit l'égalité comme ceci pour contourner les optimisations
                                  -- réalisée par SQLlite
t Close tt t2 Close2 tt2
0 0 34.26 1 1 33.03 2
1 1 33.03 2 2 32.84 3
2 2 32.84 3 3 32.24 4
3 3 32.24 4 4 33.25 5
4 4 33.25 5 5 35.80 6
5 5 35.80 6 6 35.67 7
6 6 35.67 7 7 35.59 8
7 7 35.59 8 8 35.91 9
8 8 35.91 9 9 35.28 10
9 9 35.28 10 10 36.28 11