Séries temporelles et map reduce

Links: notebook, html, PDF, python, slides, GitHub

Map/Reduce est un concept qui permet de distribuer les données facilement si elles sont indépendantes. C’est une condition qu’une série temporelle ne vérifie pas du fait de la dépendance temporelle. Voyons ce que cela change.

from jyquickhelper import add_notebook_menu
add_notebook_menu()
# Répare une incompatibilité entre scipy 1.0 et statsmodels 0.8.
from pymyinstall.fix import fix_scipy10_for_statsmodels08
fix_scipy10_for_statsmodels08()

On souhaite simplement calculer la dérivée de la série temporelle : \Delta Y_t = Y_t - Y_{t-1}.

%matplotlib inline

Data

from pyensae.finance import StockPrices
stock = StockPrices("MSFT", folder=".", url="yahoo")
stock.tail()
c:python372_x64libsite-packagespandas_datareadercompat__init__.py:7: FutureWarning: pandas.util.testing is deprecated. Use the functions in the public API at pandas.testing instead.
  from pandas.util.testing import assert_frame_equal
Date High Low Open Close Volume Adj Close
Date
2020-07-06 2020-07-06 211.130005 208.089996 208.830002 210.699997 31897600.0 210.699997
2020-07-07 2020-07-07 214.669998 207.990005 210.449997 208.250000 33600700.0 208.250000
2020-07-08 2020-07-08 213.259995 208.690002 210.070007 212.830002 33600000.0 212.830002
2020-07-09 2020-07-09 216.380005 211.470001 216.330002 214.320007 33121700.0 214.320007
2020-07-10 2020-07-10 214.080002 211.080002 213.619995 213.669998 26149700.0 213.669998

On crée une colonne supplémentaire pour l’indice t de la série temporelle :

dt = stock.df()
dt = dt[["Close"]]
dt = dt.reset_index(drop=True)
data = dt
dt.tail()
Close
5158 210.699997
5159 208.250000
5160 212.830002
5161 214.320007
5162 213.669998

La fonction shift rend le calcul très simple avec pandas.

data_pandas = data.copy()
data_pandas["Close2"] = data.shift(1)
data_pandas["delta"] = data_pandas["Close"] - data_pandas["Close2"]
data_pandas.tail()
Close Close2 delta
5158 210.699997 206.259995 4.440002
5159 208.250000 210.699997 -2.449997
5160 212.830002 208.250000 4.580002
5161 214.320007 212.830002 1.490005
5162 213.669998 214.320007 -0.650009

La fonction est très rapide car elle utilise l’ordre des données et son index. Pour s’en passer, on ajoute une colonne qui contient l’index original puis on mélange pour simuler le fait qu’en map/reduce, l’ordre des informations n’est pas connu à l’avance.

import numpy
data = dt.reindex(numpy.random.permutation(data.index))
data = data.reset_index(drop=False)
data.columns = ["t", "close"]
data.tail()
t close
5158 4063 50.880001
5159 2429 24.650000
5160 451 30.080000
5161 592 27.375000
5162 822 24.760000

Dérivée avec pandas

Le traitement de chaque ligne est indépendant et ne doit pas prendre en compte aucune autre ligne mais on a besoin de Y_{t-1} pour calculer \Delta Y_t.

data["tt"] = data["t"] - 1

méthode efficace

Lors d’une jointure, pandas va trier chaque côté de la jointure par ordre croissant de clé. S’il y a N observations, cela a un coût de N\ln N, il réalise ensuite une fusion des deux bases en ne considérant que les lignes partageant la même clé. Cette façon de faire ne convient que lorsqu’on fait une jointure avec une condition n’incluant que des ET logique et des égalités.

join = data.merge(data, left_on="t", right_on="tt", suffixes=("", "2"))
join.tail()
t close tt t2 close2 tt2
5157 4063 50.880001 4062 4064 52.580002 4063
5158 2429 24.650000 2428 2430 24.000000 2429
5159 451 30.080000 450 452 30.215000 451
5160 592 27.375000 591 593 27.870001 592
5161 822 24.760000 821 823 24.600000 822
derivee = join.copy()
derivee["derivee"] = derivee["close"] - derivee["close2"]
derivee.tail()
t close tt t2 close2 tt2 derivee
5157 4063 50.880001 4062 4064 52.580002 4063 -1.700001
5158 2429 24.650000 2428 2430 24.000000 2429 0.650000
5159 451 30.080000 450 452 30.215000 451 -0.135000
5160 592 27.375000 591 593 27.870001 592 -0.495001
5161 822 24.760000 821 823 24.600000 822 0.160000

En résumé :

new_data = data.copy()
new_data["tt"] = new_data["t"] + 1  # MAP
new_data = new_data.merge(new_data, left_on="t", right_on="tt", suffixes=("", "2")) # JOIN = SORT + MAP + REDUCE
new_data["derivee"] = new_data["close"] - new_data["close2"] # MAP
print(new_data.shape)
new_data.tail()
(5162, 7)
t close tt t2 close2 tt2 derivee
5157 4063 50.880001 4064 4062 51.299999 4063 -0.419998
5158 2429 24.650000 2430 2428 24.680000 2429 -0.030001
5159 451 30.080000 452 450 28.950001 451 1.129999
5160 592 27.375000 593 591 27.440001 592 -0.065001
5161 822 24.760000 823 821 24.200001 822 0.559999

mesure de coût

La série n’est pas assez longue pour observer la relation entre le temps de calcul et le nombre d’observations.

def derive(data):
    new_data = data.copy()
    new_data["tt"] = new_data["t"] + 1  # MAP
    new_data = new_data.merge(new_data, left_on="t", right_on="tt", suffixes=("", "2")) # JOIN = MAP + REDUCE
    new_data["derivee"] = new_data["close"] - new_data["close2"] # MAP
    return new_data

On choisit une régression linéaire de type HuberRegressor pour régresser C(N) \sim a N + b \ln N + c N \ln N + d. Ce modèle est beaucoup moins sensible aux points aberrants que les autres formes de régressions. On utilise cette régression avec le module RobustLinearModels du module statsmodels pour calculer les p-values

import time, pandas, numpy, sklearn.linear_model as lin
import matplotlib.pyplot as plt
import statsmodels.api as smapi
from random import randint

def graph_cout(data, h=100, nb=10, add_n2=False, derive=derive):
    dh = len(data) // h
    res = []
    for n in range(max(dh, 500), len(data), dh):
        df = data[0:n+randint(-10,10)]
        mean = []
        for i in range(0, nb):
            t = time.perf_counter()
            derive(df)
            dt = time.perf_counter() - t
            mean.append(dt)
        res.append((n, mean[len(mean)//2]))
    # stat
    stat = pandas.DataFrame(res, columns=["N", "processing time"])
    stat["logN"] = numpy.log(stat["N"]) / numpy.log(10)
    stat["NlogN"] = stat["N"] * stat["logN"]
    stat["N2"] = stat["N"] * stat["N"]
    # statsmodels
    stat["one"] = 1
    X = stat[["logN", "N", "NlogN", "N2", "one"]]
    if not add_n2:
        X = X.drop("N2", axis=1)
    rlm_model = smapi.RLM(stat["processing time"], X, M=smapi.robust.norms.HuberT())
    rlm_results = rlm_model.fit()
    yp = rlm_results.predict(X)
    print(yp.shape, type(yp))
    stat["smooth"] = yp
    # graph
    fig, ax = plt.subplots()
    stat.plot(x="N", y=["processing time", "smooth"], ax=ax)
    return ax, rlm_results

ax, results = graph_cout(data)
print(results.summary())
ax;
(92,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                   92
Model:                            RLM   Df Residuals:                       88
Method:                          IRLS   Df Model:                            3
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Sat, 11 Jul 2020
Time:                        11:27:22
No. Iterations:                    50
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN           0.0066      0.002      3.251      0.001       0.003       0.011
N          -1.539e-05   4.27e-06     -3.608      0.000   -2.38e-05   -7.03e-06
NlogN       3.752e-06   1.01e-06      3.706      0.000    1.77e-06    5.74e-06
one           -0.0122      0.005     -2.529      0.011      -0.022      -0.003
==============================================================================
If the model instance has been used for another fit with different fit
parameters, then the fit options might not be the correct ones anymore .
../_images/map_reduce_timeseries_24_1.png

Le coût de l’algorithme est au moins linéaire. Cela signifie que le coût du calcul qu’on cherche à mesurer est noyé dans plein d’autres choses non négligeable. La valeur des coefficients n’indique pas grand chose car les variables N, Nlog N évoluent sur des échelles différentes.

N = 200000
from random import random
data2 = pandas.DataFrame({"t":range(0,N), "close": [random() for i in range(0, N)]})
ax, stats = graph_cout(data2, h=25)
print(stats.summary())
ax;
(24,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                   24
Model:                            RLM   Df Residuals:                       20
Method:                          IRLS   Df Model:                            3
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Sat, 11 Jul 2020
Time:                        11:27:31
No. Iterations:                    20
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN          -0.0127      0.011     -1.210      0.226      -0.033       0.008
N           2.962e-06   1.08e-06      2.733      0.006    8.38e-07    5.09e-06
NlogN      -4.718e-07    1.9e-07     -2.479      0.013   -8.45e-07   -9.87e-08
one            0.0446      0.040      1.123      0.261      -0.033       0.122
==============================================================================
If the model instance has been used for another fit with different fit
parameters, then the fit options might not be the correct ones anymore .
../_images/map_reduce_timeseries_26_1.png

Les résultats sont assez volatiles. Il faut regarder les intervalles de confiance et regarder lesquels n’incluent pas 0.

méthode inefficace

Dans ce cas, on fait un produit en croix de toutes les lignes de la bases avec elles-même puis on filtre le résultat pour ne garder que les lignes qui vérifient la condition souhaitée quelle qu’elle soit. Le temps d’exécution est en O(N^2) et la différence est vite significative.

new_data2 = data.copy()
new_data2["tt"] = new_data2["t"] + 1  # MAP
new_data2["key"] = 1
new_data2 = new_data2.merge(new_data2, on="key", suffixes=("", "2")) # JOIN = MAP^2
new_data2.shape
(26656569, 7)
mapind = new_data2.t == new_data2.t2
new_data2 = new_data2[mapind] # MAP
new_data2.shape
(5163, 7)
new_data2["derivee"] = new_data2["t"] - new_data2["t2"] # MAP
new_data2.tail()
t close tt key t2 close2 tt2 derivee
26635912 4063 50.880001 4064 1 4063 50.880001 4064 0
26641076 2429 24.650000 2430 1 2429 24.650000 2430 0
26646240 451 30.080000 452 1 451 30.080000 452 0
26651404 592 27.375000 593 1 592 27.375000 593 0
26656568 822 24.760000 823 1 822 24.760000 823 0

mesure de coût inefficace

def derive_inefficace(data):
    new_data2 = data.copy()
    new_data2["tt"] = new_data2["t"] + 1
    new_data2["key"] = 1
    new_data2 = new_data2.merge(new_data2, on="key", suffixes=("", "2"))
    new_data2 = new_data2[new_data2.t == new_data2.t2]
    new_data2["derivee"] = new_data2["t"] - new_data2["t2"] # MAP
    return new_data2

ax, stats = graph_cout(data, h=10, nb=5, derive=derive_inefficace, add_n2=True)
print(stats.summary())
ax;
(10,) <class 'pandas.core.series.Series'>
                    Robust linear Model Regression Results
==============================================================================
Dep. Variable:        processing time   No. Observations:                   10
Model:                            RLM   Df Residuals:                        5
Method:                          IRLS   Df Model:                            4
Norm:                          HuberT
Scale Est.:                       mad
Cov Type:                          H1
Date:                Sat, 11 Jul 2020
Time:                        11:29:01
No. Iterations:                    50
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
logN          -3.2810      1.768     -1.856      0.063      -6.746       0.184
N              0.0173      0.007      2.359      0.018       0.003       0.032
NlogN         -0.0048      0.002     -2.428      0.015      -0.009      -0.001
N2          4.344e-07   9.68e-08      4.486      0.000    2.45e-07    6.24e-07
one            6.5873      3.749      1.757      0.079      -0.760      13.934
==============================================================================
If the model instance has been used for another fit with different fit
parameters, then the fit options might not be the correct ones anymore .
../_images/map_reduce_timeseries_33_1.png

avec des itérateurs

version efficace

rows = data[["t", "close"]].itertuples()
list(rows)[:2]
[Pandas(Index=0, t=4275, close=62.9000015258789),
 Pandas(Index=1, t=4747, close=106.94000244140624)]
rows = data[["t", "close"]].itertuples()
rows = map(lambda r: (r.t, r.close, r.t+1), rows)
list(rows)[:2]
[(4275, 62.9000015258789, 4276), (4747, 106.94000244140624, 4748)]
import copy
from cytoolz.itertoolz import join
rows = data[["t", "close"]].itertuples()
rows = map(lambda r: (r.t, r.close, r.t+1), rows)
rows = join(lambda t: t[2], rows, lambda t: t[0], copy.deepcopy(rows))
rows = map(lambda tu: tu[0] + tu[1], rows)
rows = map(lambda row: (row[0], row[4] - row[1]), rows)
results = list(rows)
results[:2]
[(4274, -0.09000015258789773), (4746, 0.06999969482419033)]

version inefficace

La méthode inefficace avec pandas est particulièrement inefficace car elle nécessite de stocker un tableau intermédiaire qui contient N^2 lignes.

import copy
from cytoolz.itertoolz import join
rows = data[["t", "close"]].itertuples()
rows = map(lambda r: (r.t, r.close, r.t+1, 1), rows)   # on ajoute 1
rows = join(lambda t: t[-1], rows, lambda t: t[-1], copy.deepcopy(rows))  # on fait un produit croisé
rows = map(lambda tu: tu[0] + tu[1], rows)
rows = filter(lambda t: t[2] == t[4], rows)     # on filtre les lignes qui nous intéresse
rows = map(lambda row: (row[0], row[5] - row[1]), rows)
results = list(rows)   # c'est très lent
results[:4]
[(4274, -0.09000015258789773),
 (4746, 0.06999969482419033),
 (200, 0.65625),
 (2324, -0.019998550415042615)]

avec SQL

conversion du dataframe au format SQL

data = stock.df()
data.columns = [_.replace(" ", "_") for _ in data.columns]
data = data.reset_index(drop=True).reset_index(drop=False)
data.columns = ["t"] + list(data.columns[1:])
data.head()
t Date High Low Open Close Volume Adj_Close
0 0 2000-01-03 59.3125 56.00000 58.68750 58.28125 53228400.0 37.289700
1 1 2000-01-04 58.5625 56.12500 56.78125 56.31250 54119000.0 36.030037
2 2 2000-01-05 58.1875 54.68750 55.56250 56.90625 64059600.0 36.409924
3 3 2000-01-06 56.9375 54.18750 56.09375 55.00000 54976600.0 35.190277
4 4 2000-01-07 56.1250 53.65625 54.31250 55.71875 62013600.0 35.650139
import sqlite3
con = sqlite3.connect(":memory:")
tbl = data.to_sql("stock", con, index=False)
%load_ext pyensae
%SQL_tables -v con
['stock']
%%SQL -v con

SELECT * FROM stock LIMIT 5
t Date High Low Open Close Volume Adj_Close
0 0 2000-01-03 59.3125 56.00000 58.68750 58.28125 53228400.0 37.289700
1 1 2000-01-04 58.5625 56.12500 56.78125 56.31250 54119000.0 36.030037
2 2 2000-01-05 58.1875 54.68750 55.56250 56.90625 64059600.0 36.409924
3 3 2000-01-06 56.9375 54.18750 56.09375 55.00000 54976600.0 35.190277
4 4 2000-01-07 56.1250 53.65625 54.31250 55.71875 62013600.0 35.650139

version efficace

L’instruction ON précise sur quelle ou quelles colonnes opérer la fusion.

%%SQL -v con

SELECT A.t AS t, A.Close AS Close, A.tt AS tt,
       B.t AS t2, B.Close AS Close2, B.tt AS tt2
FROM (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS A
INNER JOIN (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS B
ON A.tt == B.t
t Close tt t2 Close2 tt2
0 0 58.28125 1 1 56.31250 2
1 1 56.31250 2 2 56.90625 3
2 2 56.90625 3 3 55.00000 4
3 3 55.00000 4 4 55.71875 5
4 4 55.71875 5 5 56.12500 6
5 5 56.12500 6 6 54.68750 7
6 6 54.68750 7 7 52.90625 8
7 7 52.90625 8 8 53.90625 9
8 8 53.90625 9 9 56.12500 10
9 9 56.12500 10 10 57.65625 11

version inefficace

Pour la version inefficace, l’instruction JOIN effectue tous les combinaisons de lignes possibles, soit N^2. Le mot-clé WHERE garde les couples de lignes qui nous intéresse.

%%SQL -v con

SELECT A.t AS t, A.Close AS Close, A.tt AS tt,
       B.t AS t2, B.Close AS Close2, B.tt AS tt2
FROM (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS A
INNER JOIN (
    SELECT t, Close, t+1 AS tt FROM stock
    ) AS B
WHERE A.tt >= B.t AND A.tt <= B.t -- on écrit l'égalité comme ceci pour contourner les optimisations
                                  -- réalisée par SQLlite
t Close tt t2 Close2 tt2
0 0 58.28125 1 1 56.31250 2
1 1 56.31250 2 2 56.90625 3
2 2 56.90625 3 3 55.00000 4
3 3 55.00000 4 4 55.71875 5
4 4 55.71875 5 5 56.12500 6
5 5 56.12500 6 6 54.68750 7
6 6 54.68750 7 7 52.90625 8
7 7 52.90625 8 8 53.90625 9
8 8 53.90625 9 9 56.12500 10
9 9 56.12500 10 10 57.65625 11