Séries temporelles et map reduce#

Links: notebook, html, PDF, python, slides, GitHub

Map/Reduce est un concept qui permet de distribuer les données facilement si elles sont indépendantes. C’est une condition qu’une série temporelle ne vérifie pas du fait de la dépendance temporelle. Voyons ce que cela change.

from jyquickhelper import add_notebook_menu
add_notebook_menu()
# Répare une incompatibilité entre scipy 1.0 et statsmodels 0.8.
from pymyinstall.fix import fix_scipy10_for_statsmodels08
fix_scipy10_for_statsmodels08()

On souhaite simplement calculer la dérivée de la série temporelle : \Delta Y_t = Y_t - Y_{t-1}.

%matplotlib inline

Data#

from pyensae.finance import StockPrices
stock = StockPrices("MSFT", folder=".", url="yahoo")
stock.tail()
Date High Low Open Close Volume Adj Close
Date
2020-12-24 2020-12-24 223.610001 221.199997 221.419998 222.750000 10550600.0 222.750000
2020-12-28 2020-12-28 226.029999 223.020004 224.449997 224.960007 17933500.0 224.960007
2020-12-29 2020-12-29 227.179993 223.580002 226.309998 224.149994 17403200.0 224.149994
2020-12-30 2020-12-30 225.630005 221.470001 225.229996 221.679993 20272300.0 221.679993
2020-12-31 2020-12-31 223.000000 219.679993 221.699997 222.419998 20926900.0 222.419998

On crée une colonne supplémentaire pour l’indice t de la série temporelle :

dt = stock.df()
dt = dt[["Close"]]
dt = dt.reset_index(drop=True)
data = dt
dt.tail()
Close
5279 222.750000
5280 224.960007
5281 224.149994
5282 221.679993
5283 222.419998

La fonction shift rend le calcul très simple avec pandas.

data_pandas = data.copy()
data_pandas["Close2"] = data.shift(1)
data_pandas["delta"] = data_pandas["Close"] - data_pandas["Close2"]
data_pandas.tail()
Close Close2 delta
5279 222.750000 221.020004 1.729996
5280 224.960007 222.750000 2.210007
5281 224.149994 224.960007 -0.810013
5282 221.679993 224.149994 -2.470001
5283 222.419998 221.679993 0.740005

La fonction est très rapide car elle utilise l’ordre des données et son index. Pour s’en passer, on ajoute une colonne qui contient l’index original puis on mélange pour simuler le fait qu’en map/reduce, l’ordre des informations n’est pas connu à l’avance.

import numpy
data = dt.reindex(numpy.random.permutation(data.index))
data = data.reset_index(drop=False)
data.columns = ["t", "close"]
data.tail()
t close
5279 4660 104.190002
5280 476 31.870001
5281 3522 36.910000
5282 2867 24.670000
5283 4348 65.480003

Dérivée avec pandas#

Le traitement de chaque ligne est indépendant et ne doit pas prendre en compte aucune autre ligne mais on a besoin de Y_{t-1} pour calculer \Delta Y_t.

data["tt"] = data["t"] - 1

méthode efficace#

Lors d’une jointure, pandas va trier chaque côté de la jointure par ordre croissant de clé. S’il y a N observations, cela a un coût de N\ln N, il réalise ensuite une fusion des deux bases en ne considérant que les lignes partageant la même clé. Cette façon de faire ne convient que lorsqu’on fait une jointure avec une condition n’incluant que des ET logique et des égalités.

join = data.merge(data, left_on="t", right_on="tt", suffixes=("", "2"))
join.tail()
t close tt t2 close2 tt2
5278 4660 104.190002 4659 4661 105.430000 4660
5279 476 31.870001 475 477 31.400000 476
5280 3522 36.910000 3521 3523 36.130001 3522
5281 2867 24.670000 2866 2868 24.760000 2867
5282 4348 65.480003 4347 4349 65.389999 4348
derivee = join.copy()
derivee["derivee"] = derivee["close"] - derivee["close2"]
derivee.tail()
t close tt t2 close2 tt2 derivee
5278 4660 104.190002 4659 4661 105.430000 4660 -1.239998
5279 476 31.870001 475 477 31.400000 476 0.470001
5280 3522 36.910000 3521 3523 36.130001 3522 0.779999
5281 2867 24.670000 2866 2868 24.760000 2867 -0.090000
5282 4348 65.480003 4347 4349 65.389999 4348 0.090004

En résumé :

new_data = data.copy()
new_data["tt"] = new_data["t"] + 1  # MAP
new_data = new_data.merge(new_data, left_on="t", right_on="tt", suffixes=("", "2")) # JOIN = SORT + MAP + REDUCE
new_data["derivee"] = new_data["close"] - new_data["close2"] # MAP
print(new_data.shape)
new_data.tail()
(5283, 7)
t close tt t2 close2 tt2 derivee
5278 4660 104.190002 4661 4659 101.980003 4660 2.209999
5279 476 31.870001 477 475 32.570000 476 -0.699999
5280 3522 36.910000 3523 3521 37.160000 3522 -0.250000
5281 2867 24.670000 2868 2866 24.190001 2867 0.480000
5282 4348 65.480003 4349 4347 64.949997 4348 0.530006

mesure de coût#

La série n’est pas assez longue pour observer la relation entre le temps de calcul et le nombre d’observations.

def derive(data):
    new_data = data.copy()
    new_data["tt"] = new_data["t"] + 1  # MAP
    new_data = new_data.merge(new_data, left_on="t", right_on="tt", suffixes=("", "2")) # JOIN = MAP + REDUCE
    new_data["derivee"] = new_data["close"] - new_data["close2"] # MAP
    return new_data

On choisit une régression linéaire de type HuberRegressor pour régresser C(N) \sim a N + b \ln N + c N \ln N + d. Ce modèle est beaucoup moins sensible aux points aberrants que les autres formes de régressions. On utilise cette régression avec le module RobustLinearModels du module statsmodels pour calculer les p-values

import time, pandas, numpy, sklearn.linear_model as lin
import matplotlib.pyplot as plt
import statsmodels.api as smapi
from random import randint

def graph_cout(data, h=100, nb=10, add_n2=False, derive=derive):
    dh = len(data) // h
    res = []
    for n in range(max(dh, 500), len(data), dh):
        df = data[0:n+randint(-10,10)]
        mean = []
        for i in range(0, nb):
            t = time.perf_counter()
            derive(df)
            dt = time.perf_counter() - t
            mean.append(dt)
        res.append((n, mean[len(mean)//2]))
    # stat
    stat = pandas.DataFrame(res, columns=["N", "processing time"])
    stat["logN"] = numpy.log(stat["N"]) / numpy.log(10)
    stat["NlogN"] = stat["N"] * stat["logN"]
    stat["N2"] = stat["N"] * stat["N"]
    # statsmodels
    stat["one"] = 1
    X = stat[["logN", "N", "NlogN", "N2", "one"]]
    if not add_n2:
        X = X.drop("N2", axis=1)
    rlm_model = smapi.RLM(stat["processing time"], X, M=smapi.robust.norms.HuberT())
    rlm_results = rlm_model.fit()
    yp = rlm_results.predict(X)
    print(yp.shape, type(yp))
    stat["smooth"] = yp
    # graph
    fig, ax = plt.subplots()
    stat.plot(x="N", y=["processing time", "smooth"], ax=ax)
    return ax, rlm_results

ax, results = graph_cout(data)
print(results.summary())
ax;
(92,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                   92
Model:                            RLM   Df Residuals:                       88
Method:                          IRLS   Df Model:                            3
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Fri, 01 Jan 2021
Time:                        03:27:54
No. Iterations:                    50
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN           0.0035      0.001      2.661      0.008       0.001       0.006
N          -8.829e-06   2.77e-06     -3.191      0.001   -1.43e-05   -3.41e-06
NlogN       2.117e-06   6.56e-07      3.228      0.001    8.31e-07     3.4e-06
one           -0.0049      0.003     -1.544      0.123      -0.011       0.001
==============================================================================
If the model instance has been used for another fit with different fit parameters, then the fit options might not be the correct ones anymore .
../_images/map_reduce_timeseries_24_1.png

Le coût de l’algorithme est au moins linéaire. Cela signifie que le coût du calcul qu’on cherche à mesurer est noyé dans plein d’autres choses non négligeable. La valeur des coefficients n’indique pas grand chose car les variables N, Nlog N évoluent sur des échelles différentes.

N = 200000
from random import random
data2 = pandas.DataFrame({"t":range(0,N), "close": [random() for i in range(0, N)]})
ax, stats = graph_cout(data2, h=25)
print(stats.summary())
ax;
(24,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                   24
Model:                            RLM   Df Residuals:                       20
Method:                          IRLS   Df Model:                            3
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Fri, 01 Jan 2021
Time:                        03:28:04
No. Iterations:                    47
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN          -0.0396      0.017     -2.327      0.020      -0.073      -0.006
N            5.63e-06   1.75e-06      3.213      0.001     2.2e-06    9.06e-06
NlogN      -9.342e-07   3.08e-07     -3.036      0.002   -1.54e-06   -3.31e-07
one            0.1434      0.064      2.232      0.026       0.017       0.269
==============================================================================
If the model instance has been used for another fit with different fit parameters, then the fit options might not be the correct ones anymore .
../_images/map_reduce_timeseries_26_1.png

Les résultats sont assez volatiles. Il faut regarder les intervalles de confiance et regarder lesquels n’incluent pas 0.

méthode inefficace#

Dans ce cas, on fait un produit en croix de toutes les lignes de la bases avec elles-même puis on filtre le résultat pour ne garder que les lignes qui vérifient la condition souhaitée quelle qu’elle soit. Le temps d’exécution est en O(N^2) et la différence est vite significative.

new_data2 = data.copy()
new_data2["tt"] = new_data2["t"] + 1  # MAP
new_data2["key"] = 1
new_data2 = new_data2.merge(new_data2, on="key", suffixes=("", "2")) # JOIN = MAP^2
new_data2.shape
(27920656, 7)
mapind = new_data2.t == new_data2.t2
new_data2 = new_data2[mapind] # MAP
new_data2.shape
(5284, 7)
new_data2["derivee"] = new_data2["t"] - new_data2["t2"] # MAP
new_data2.tail()
t close tt key t2 close2 tt2 derivee
27899515 4660 104.190002 4661 1 4660 104.190002 4661 0
27904800 476 31.870001 477 1 476 31.870001 477 0
27910085 3522 36.910000 3523 1 3522 36.910000 3523 0
27915370 2867 24.670000 2868 1 2867 24.670000 2868 0
27920655 4348 65.480003 4349 1 4348 65.480003 4349 0

mesure de coût inefficace#

def derive_inefficace(data):
    new_data2 = data.copy()
    new_data2["tt"] = new_data2["t"] + 1
    new_data2["key"] = 1
    new_data2 = new_data2.merge(new_data2, on="key", suffixes=("", "2"))
    new_data2 = new_data2[new_data2.t == new_data2.t2]
    new_data2["derivee"] = new_data2["t"] - new_data2["t2"] # MAP
    return new_data2

ax, stats = graph_cout(data, h=10, nb=5, derive=derive_inefficace, add_n2=True)
print(stats.summary())
ax;
(10,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                   10
Model:                            RLM   Df Residuals:                        5
Method:                          IRLS   Df Model:                            4
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Fri, 01 Jan 2021
Time:                        03:29:44
No. Iterations:                    50
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN          -3.5525      6.675     -0.532      0.595     -16.635       9.530
N              0.0226      0.027      0.837      0.403      -0.030       0.076
NlogN         -0.0064      0.007     -0.889      0.374      -0.021       0.008
N2          6.411e-07   3.49e-07      1.836      0.066   -4.32e-08    1.33e-06
one            6.8518     14.218      0.482      0.630     -21.015      34.718
==============================================================================
If the model instance has been used for another fit with different fit parameters, then the fit options might not be the correct ones anymore .
../_images/map_reduce_timeseries_33_1.png

avec des itérateurs#

version efficace#

rows = data[["t", "close"]].itertuples()
list(rows)[:2]
[Pandas(Index=0, t=1726, close=29.350000381469727),
 Pandas(Index=1, t=1244, close=27.11000061035156)]
rows = data[["t", "close"]].itertuples()
rows = map(lambda r: (r.t, r.close, r.t+1), rows)
list(rows)[:2]
[(1726, 29.350000381469727, 1727), (1244, 27.11000061035156, 1245)]
import copy
from cytoolz.itertoolz import join

def get_iterrows():
    rows_as_tuple = data[["t", "close"]].itertuples()
    rows = map(lambda r: (r.t, r.close, r.t+1), rows_as_tuple)
    return rows

rows1 = get_iterrows()
rows2 = get_iterrows()
rows = join(lambda t: t[2], rows1, lambda t: t[0], rows2)
rows = map(lambda tu: tu[0] + tu[1], rows)
rows = map(lambda row: (row[0], row[4] - row[1]), rows)
results = list(rows)
results[:4]
[(1725, 0.1100006103515625),
 (1243, -0.11999893188476918),
 (3128, -0.75),
 (4095, 0.29000091552734375)]

version inefficace#

La méthode inefficace avec pandas est particulièrement inefficace car elle nécessite de stocker un tableau intermédiaire qui contient N^2 lignes.

import copy
from cytoolz.itertoolz import join

def get_iterrows():
    rows_as_tuple = data[["t", "close"]].itertuples()
    rows = map(lambda r: (r.t, r.close, r.t+1, 1), rows_as_tuple)   # on ajoute 1
    return rows

rows1 = get_iterrows()
rows2 = get_iterrows()
rows = join(lambda t: t[-1], rows1, lambda t: t[-1], rows2)  # on fait un produit croisé
rows = map(lambda tu: tu[0] + tu[1], rows)
rows = filter(lambda t: t[2] == t[4], rows)     # on filtre les lignes qui nous intéresse
rows = map(lambda row: (row[0], row[5] - row[1]), rows)
results = list(rows)   # c'est très lent
results[:4]
[(1725, 0.1100006103515625),
 (1243, -0.11999893188476918),
 (3128, -0.75),
 (4095, 0.29000091552734375)]

avec SQL#

conversion du dataframe au format SQL#

data = stock.df()
data.columns = [_.replace(" ", "_") for _ in data.columns]
data = data.reset_index(drop=True).reset_index(drop=False)
data.columns = ["t"] + list(data.columns[1:])
data.head()
t Date High Low Open Close Volume Adj_Close
0 0 2000-01-03 59.3125 56.00000 58.68750 58.28125 53228400.0 37.102634
1 1 2000-01-04 58.5625 56.12500 56.78125 56.31250 54119000.0 35.849308
2 2 2000-01-05 58.1875 54.68750 55.56250 56.90625 64059600.0 36.227283
3 3 2000-01-06 56.9375 54.18750 56.09375 55.00000 54976600.0 35.013741
4 4 2000-01-07 56.1250 53.65625 54.31250 55.71875 62013600.0 35.471302
import sqlite3
con = sqlite3.connect(":memory:")
tbl = data.to_sql("stock", con, index=False)
%load_ext pyensae
%SQL_tables -v con
['stock']
%%SQL -v con

SELECT * FROM stock LIMIT 5
t Date High Low Open Close Volume Adj_Close
0 0 2000-01-03 59.3125 56.00000 58.68750 58.28125 53228400.0 37.102634
1 1 2000-01-04 58.5625 56.12500 56.78125 56.31250 54119000.0 35.849308
2 2 2000-01-05 58.1875 54.68750 55.56250 56.90625 64059600.0 36.227283
3 3 2000-01-06 56.9375 54.18750 56.09375 55.00000 54976600.0 35.013741
4 4 2000-01-07 56.1250 53.65625 54.31250 55.71875 62013600.0 35.471302

version efficace#

L’instruction ON précise sur quelle ou quelles colonnes opérer la fusion.

%%SQL -v con

SELECT A.t AS t, A.Close AS Close, A.tt AS tt,
       B.t AS t2, B.Close AS Close2, B.tt AS tt2
FROM (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS A
INNER JOIN (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS B
ON A.tt == B.t
t Close tt t2 Close2 tt2
0 0 58.28125 1 1 56.31250 2
1 1 56.31250 2 2 56.90625 3
2 2 56.90625 3 3 55.00000 4
3 3 55.00000 4 4 55.71875 5
4 4 55.71875 5 5 56.12500 6
5 5 56.12500 6 6 54.68750 7
6 6 54.68750 7 7 52.90625 8
7 7 52.90625 8 8 53.90625 9
8 8 53.90625 9 9 56.12500 10
9 9 56.12500 10 10 57.65625 11

version inefficace#

Pour la version inefficace, l’instruction JOIN effectue tous les combinaisons de lignes possibles, soit N^2. Le mot-clé WHERE garde les couples de lignes qui nous intéresse.

%%SQL -v con

SELECT A.t AS t, A.Close AS Close, A.tt AS tt,
       B.t AS t2, B.Close AS Close2, B.tt AS tt2
FROM (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS A
INNER JOIN (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS B
WHERE A.tt >= B.t AND A.tt <= B.t -- on écrit l'égalité comme ceci pour contourner les optimisations
                                  -- réalisée par SQLlite
t Close tt t2 Close2 tt2
0 0 58.28125 1 1 56.31250 2
1 1 56.31250 2 2 56.90625 3
2 2 56.90625 3 3 55.00000 4
3 3 55.00000 4 4 55.71875 5
4 4 55.71875 5 5 56.12500 6
5 5 56.12500 6 6 54.68750 7
6 6 54.68750 7 7 52.90625 8
7 7 52.90625 8 8 53.90625 9
8 8 53.90625 9 9 56.12500 10
9 9 56.12500 10 10 57.65625 11