.. _mapreducetimeseriesrst: ================================ Séries temporelles et map reduce ================================ .. only:: html **Links:** :download:`notebook `, :downloadlink:`html `, :download:`PDF `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/sql/map_reduce_timeseries.ipynb|*` Map/Reduce est un concept qui permet de distribuer les données facilement si elles sont indépendantes. C’est une condition qu’une série temporelle ne vérifie pas du fait de la dépendance temporelle. Voyons ce que cela change. .. code:: ipython3 from jyquickhelper import add_notebook_menu add_notebook_menu() .. contents:: :local: .. code:: ipython3 # Répare une incompatibilité entre scipy 1.0 et statsmodels 0.8. from pymyinstall.fix import fix_scipy10_for_statsmodels08 fix_scipy10_for_statsmodels08() On souhaite simplement calculer la dérivée de la série temporelle : :math:`\Delta Y_t = Y_t - Y_{t-1}`. .. code:: ipython3 %matplotlib inline Data ---- .. code:: ipython3 from pyensae.finance import StockPrices stock = StockPrices("MSFT", folder=".", url="yahoo") stock.tail() .. raw:: html
Date High Low Open Close Volume Adj Close
Date
2020-12-24 2020-12-24 223.610001 221.199997 221.419998 222.750000 10550600.0 222.750000
2020-12-28 2020-12-28 226.029999 223.020004 224.449997 224.960007 17933500.0 224.960007
2020-12-29 2020-12-29 227.179993 223.580002 226.309998 224.149994 17403200.0 224.149994
2020-12-30 2020-12-30 225.630005 221.470001 225.229996 221.679993 20272300.0 221.679993
2020-12-31 2020-12-31 223.000000 219.679993 221.699997 222.419998 20926900.0 222.419998
On crée une colonne supplémentaire pour l’indice :math:`t` de la série temporelle : .. code:: ipython3 dt = stock.df() dt = dt[["Close"]] dt = dt.reset_index(drop=True) data = dt dt.tail() .. raw:: html
Close
5279 222.750000
5280 224.960007
5281 224.149994
5282 221.679993
5283 222.419998
La fonction `shift `__ rend le calcul très simple avec pandas. .. code:: ipython3 data_pandas = data.copy() data_pandas["Close2"] = data.shift(1) data_pandas["delta"] = data_pandas["Close"] - data_pandas["Close2"] data_pandas.tail() .. raw:: html
Close Close2 delta
5279 222.750000 221.020004 1.729996
5280 224.960007 222.750000 2.210007
5281 224.149994 224.960007 -0.810013
5282 221.679993 224.149994 -2.470001
5283 222.419998 221.679993 0.740005
La fonction est très rapide car elle utilise l’ordre des données et son index. Pour s’en passer, on ajoute une colonne qui contient l’index original puis on mélange pour simuler le fait qu’en map/reduce, l’ordre des informations n’est pas connu à l’avance. .. code:: ipython3 import numpy data = dt.reindex(numpy.random.permutation(data.index)) data = data.reset_index(drop=False) data.columns = ["t", "close"] data.tail() .. raw:: html
t close
5279 4660 104.190002
5280 476 31.870001
5281 3522 36.910000
5282 2867 24.670000
5283 4348 65.480003
Dérivée avec pandas ------------------- Le traitement de chaque ligne est indépendant et ne doit pas prendre en compte aucune autre ligne mais on a besoin de :math:`Y_{t-1}` pour calculer :math:`\Delta Y_t`. .. code:: ipython3 data["tt"] = data["t"] - 1 méthode efficace ~~~~~~~~~~~~~~~~ Lors d’une jointure, `pandas `__ va trier chaque côté de la jointure par ordre croissant de clé. S’il y a :math:`N` observations, cela a un coût de :math:`N\ln N`, il réalise ensuite une fusion des deux bases en ne considérant que les lignes partageant la même clé. Cette façon de faire ne convient que lorsqu’on fait une jointure avec une condition n’incluant que des ET logique et des égalités. .. code:: ipython3 join = data.merge(data, left_on="t", right_on="tt", suffixes=("", "2")) join.tail() .. raw:: html
t close tt t2 close2 tt2
5278 4660 104.190002 4659 4661 105.430000 4660
5279 476 31.870001 475 477 31.400000 476
5280 3522 36.910000 3521 3523 36.130001 3522
5281 2867 24.670000 2866 2868 24.760000 2867
5282 4348 65.480003 4347 4349 65.389999 4348
.. code:: ipython3 derivee = join.copy() derivee["derivee"] = derivee["close"] - derivee["close2"] derivee.tail() .. raw:: html
t close tt t2 close2 tt2 derivee
5278 4660 104.190002 4659 4661 105.430000 4660 -1.239998
5279 476 31.870001 475 477 31.400000 476 0.470001
5280 3522 36.910000 3521 3523 36.130001 3522 0.779999
5281 2867 24.670000 2866 2868 24.760000 2867 -0.090000
5282 4348 65.480003 4347 4349 65.389999 4348 0.090004
En résumé : .. code:: ipython3 new_data = data.copy() new_data["tt"] = new_data["t"] + 1 # MAP new_data = new_data.merge(new_data, left_on="t", right_on="tt", suffixes=("", "2")) # JOIN = SORT + MAP + REDUCE new_data["derivee"] = new_data["close"] - new_data["close2"] # MAP print(new_data.shape) new_data.tail() .. parsed-literal:: (5283, 7) .. raw:: html
t close tt t2 close2 tt2 derivee
5278 4660 104.190002 4661 4659 101.980003 4660 2.209999
5279 476 31.870001 477 475 32.570000 476 -0.699999
5280 3522 36.910000 3523 3521 37.160000 3522 -0.250000
5281 2867 24.670000 2868 2866 24.190001 2867 0.480000
5282 4348 65.480003 4349 4347 64.949997 4348 0.530006
mesure de coût ~~~~~~~~~~~~~~ La série n’est pas assez longue pour observer la relation entre le temps de calcul et le nombre d’observations. .. code:: ipython3 def derive(data): new_data = data.copy() new_data["tt"] = new_data["t"] + 1 # MAP new_data = new_data.merge(new_data, left_on="t", right_on="tt", suffixes=("", "2")) # JOIN = MAP + REDUCE new_data["derivee"] = new_data["close"] - new_data["close2"] # MAP return new_data On choisit une régression linéaire de type `HuberRegressor `__ pour régresser :math:`C(N) \sim a N + b \ln N + c N \ln N + d`. Ce modèle est beaucoup moins sensible aux points aberrants que les autres formes de régressions. On utilise cette régression avec le module `RobustLinearModels `__ du module *statsmodels* pour calculer les p-values .. code:: ipython3 import time, pandas, numpy, sklearn.linear_model as lin import matplotlib.pyplot as plt import statsmodels.api as smapi from random import randint def graph_cout(data, h=100, nb=10, add_n2=False, derive=derive): dh = len(data) // h res = [] for n in range(max(dh, 500), len(data), dh): df = data[0:n+randint(-10,10)] mean = [] for i in range(0, nb): t = time.perf_counter() derive(df) dt = time.perf_counter() - t mean.append(dt) res.append((n, mean[len(mean)//2])) # stat stat = pandas.DataFrame(res, columns=["N", "processing time"]) stat["logN"] = numpy.log(stat["N"]) / numpy.log(10) stat["NlogN"] = stat["N"] * stat["logN"] stat["N2"] = stat["N"] * stat["N"] # statsmodels stat["one"] = 1 X = stat[["logN", "N", "NlogN", "N2", "one"]] if not add_n2: X = X.drop("N2", axis=1) rlm_model = smapi.RLM(stat["processing time"], X, M=smapi.robust.norms.HuberT()) rlm_results = rlm_model.fit() yp = rlm_results.predict(X) print(yp.shape, type(yp)) stat["smooth"] = yp # graph fig, ax = plt.subplots() stat.plot(x="N", y=["processing time", "smooth"], ax=ax) return ax, rlm_results ax, results = graph_cout(data) print(results.summary()) ax; .. parsed-literal:: (92,) Robust linear Model Regression Results ============================================================================== Dep. Variable: processing time No. Observations: 92 Model: RLM Df Residuals: 88 Method: IRLS Df Model: 3 Norm: HuberT Scale Est.: mad Cov Type: H1 Date: Fri, 01 Jan 2021 Time: 03:27:54 No. Iterations: 50 ============================================================================== coef std err z P>|z| [0.025 0.975] ------------------------------------------------------------------------------ logN 0.0035 0.001 2.661 0.008 0.001 0.006 N -8.829e-06 2.77e-06 -3.191 0.001 -1.43e-05 -3.41e-06 NlogN 2.117e-06 6.56e-07 3.228 0.001 8.31e-07 3.4e-06 one -0.0049 0.003 -1.544 0.123 -0.011 0.001 ============================================================================== If the model instance has been used for another fit with different fit parameters, then the fit options might not be the correct ones anymore . .. image:: map_reduce_timeseries_24_1.png Le coût de l’algorithme est au moins linéaire. Cela signifie que le coût du calcul qu’on cherche à mesurer est noyé dans plein d’autres choses non négligeable. La valeur des coefficients n’indique pas grand chose car les variables :math:`N`, :math:`Nlog N` évoluent sur des échelles différentes. .. code:: ipython3 N = 200000 from random import random data2 = pandas.DataFrame({"t":range(0,N), "close": [random() for i in range(0, N)]}) ax, stats = graph_cout(data2, h=25) print(stats.summary()) ax; .. parsed-literal:: (24,) Robust linear Model Regression Results ============================================================================== Dep. Variable: processing time No. Observations: 24 Model: RLM Df Residuals: 20 Method: IRLS Df Model: 3 Norm: HuberT Scale Est.: mad Cov Type: H1 Date: Fri, 01 Jan 2021 Time: 03:28:04 No. Iterations: 47 ============================================================================== coef std err z P>|z| [0.025 0.975] ------------------------------------------------------------------------------ logN -0.0396 0.017 -2.327 0.020 -0.073 -0.006 N 5.63e-06 1.75e-06 3.213 0.001 2.2e-06 9.06e-06 NlogN -9.342e-07 3.08e-07 -3.036 0.002 -1.54e-06 -3.31e-07 one 0.1434 0.064 2.232 0.026 0.017 0.269 ============================================================================== If the model instance has been used for another fit with different fit parameters, then the fit options might not be the correct ones anymore . .. image:: map_reduce_timeseries_26_1.png Les résultats sont assez volatiles. Il faut regarder les intervalles de confiance et regarder lesquels n’incluent pas 0. méthode inefficace ~~~~~~~~~~~~~~~~~~ Dans ce cas, on fait un produit en croix de toutes les lignes de la bases avec elles-même puis on filtre le résultat pour ne garder que les lignes qui vérifient la condition souhaitée quelle qu’elle soit. Le temps d’exécution est en :math:`O(N^2)` et la différence est vite significative. .. code:: ipython3 new_data2 = data.copy() new_data2["tt"] = new_data2["t"] + 1 # MAP new_data2["key"] = 1 new_data2 = new_data2.merge(new_data2, on="key", suffixes=("", "2")) # JOIN = MAP^2 new_data2.shape .. parsed-literal:: (27920656, 7) .. code:: ipython3 mapind = new_data2.t == new_data2.t2 new_data2 = new_data2[mapind] # MAP new_data2.shape .. parsed-literal:: (5284, 7) .. code:: ipython3 new_data2["derivee"] = new_data2["t"] - new_data2["t2"] # MAP new_data2.tail() .. raw:: html
t close tt key t2 close2 tt2 derivee
27899515 4660 104.190002 4661 1 4660 104.190002 4661 0
27904800 476 31.870001 477 1 476 31.870001 477 0
27910085 3522 36.910000 3523 1 3522 36.910000 3523 0
27915370 2867 24.670000 2868 1 2867 24.670000 2868 0
27920655 4348 65.480003 4349 1 4348 65.480003 4349 0
mesure de coût inefficace ~~~~~~~~~~~~~~~~~~~~~~~~~ .. code:: ipython3 def derive_inefficace(data): new_data2 = data.copy() new_data2["tt"] = new_data2["t"] + 1 new_data2["key"] = 1 new_data2 = new_data2.merge(new_data2, on="key", suffixes=("", "2")) new_data2 = new_data2[new_data2.t == new_data2.t2] new_data2["derivee"] = new_data2["t"] - new_data2["t2"] # MAP return new_data2 ax, stats = graph_cout(data, h=10, nb=5, derive=derive_inefficace, add_n2=True) print(stats.summary()) ax; .. parsed-literal:: (10,) Robust linear Model Regression Results ============================================================================== Dep. Variable: processing time No. Observations: 10 Model: RLM Df Residuals: 5 Method: IRLS Df Model: 4 Norm: HuberT Scale Est.: mad Cov Type: H1 Date: Fri, 01 Jan 2021 Time: 03:29:44 No. Iterations: 50 ============================================================================== coef std err z P>|z| [0.025 0.975] ------------------------------------------------------------------------------ logN -3.5525 6.675 -0.532 0.595 -16.635 9.530 N 0.0226 0.027 0.837 0.403 -0.030 0.076 NlogN -0.0064 0.007 -0.889 0.374 -0.021 0.008 N2 6.411e-07 3.49e-07 1.836 0.066 -4.32e-08 1.33e-06 one 6.8518 14.218 0.482 0.630 -21.015 34.718 ============================================================================== If the model instance has been used for another fit with different fit parameters, then the fit options might not be the correct ones anymore . .. image:: map_reduce_timeseries_33_1.png avec des itérateurs ------------------- version efficace ~~~~~~~~~~~~~~~~ .. code:: ipython3 rows = data[["t", "close"]].itertuples() list(rows)[:2] .. parsed-literal:: [Pandas(Index=0, t=1726, close=29.350000381469727), Pandas(Index=1, t=1244, close=27.11000061035156)] .. code:: ipython3 rows = data[["t", "close"]].itertuples() rows = map(lambda r: (r.t, r.close, r.t+1), rows) list(rows)[:2] .. parsed-literal:: [(1726, 29.350000381469727, 1727), (1244, 27.11000061035156, 1245)] .. code:: ipython3 import copy from cytoolz.itertoolz import join def get_iterrows(): rows_as_tuple = data[["t", "close"]].itertuples() rows = map(lambda r: (r.t, r.close, r.t+1), rows_as_tuple) return rows rows1 = get_iterrows() rows2 = get_iterrows() rows = join(lambda t: t[2], rows1, lambda t: t[0], rows2) rows = map(lambda tu: tu[0] + tu[1], rows) rows = map(lambda row: (row[0], row[4] - row[1]), rows) results = list(rows) results[:4] .. parsed-literal:: [(1725, 0.1100006103515625), (1243, -0.11999893188476918), (3128, -0.75), (4095, 0.29000091552734375)] version inefficace ~~~~~~~~~~~~~~~~~~ La méthode inefficace avec *pandas* est particulièrement inefficace car elle nécessite de stocker un tableau intermédiaire qui contient :math:`N^2` lignes. .. code:: ipython3 import copy from cytoolz.itertoolz import join def get_iterrows(): rows_as_tuple = data[["t", "close"]].itertuples() rows = map(lambda r: (r.t, r.close, r.t+1, 1), rows_as_tuple) # on ajoute 1 return rows rows1 = get_iterrows() rows2 = get_iterrows() rows = join(lambda t: t[-1], rows1, lambda t: t[-1], rows2) # on fait un produit croisé rows = map(lambda tu: tu[0] + tu[1], rows) rows = filter(lambda t: t[2] == t[4], rows) # on filtre les lignes qui nous intéresse rows = map(lambda row: (row[0], row[5] - row[1]), rows) results = list(rows) # c'est très lent results[:4] .. parsed-literal:: [(1725, 0.1100006103515625), (1243, -0.11999893188476918), (3128, -0.75), (4095, 0.29000091552734375)] avec SQL -------- conversion du dataframe au format SQL ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code:: ipython3 data = stock.df() data.columns = [_.replace(" ", "_") for _ in data.columns] data = data.reset_index(drop=True).reset_index(drop=False) data.columns = ["t"] + list(data.columns[1:]) data.head() .. raw:: html
t Date High Low Open Close Volume Adj_Close
0 0 2000-01-03 59.3125 56.00000 58.68750 58.28125 53228400.0 37.102634
1 1 2000-01-04 58.5625 56.12500 56.78125 56.31250 54119000.0 35.849308
2 2 2000-01-05 58.1875 54.68750 55.56250 56.90625 64059600.0 36.227283
3 3 2000-01-06 56.9375 54.18750 56.09375 55.00000 54976600.0 35.013741
4 4 2000-01-07 56.1250 53.65625 54.31250 55.71875 62013600.0 35.471302
.. code:: ipython3 import sqlite3 con = sqlite3.connect(":memory:") tbl = data.to_sql("stock", con, index=False) .. code:: ipython3 %load_ext pyensae .. code:: ipython3 %SQL_tables -v con .. parsed-literal:: ['stock'] .. code:: ipython3 %%SQL -v con SELECT * FROM stock LIMIT 5 .. raw:: html
t Date High Low Open Close Volume Adj_Close
0 0 2000-01-03 59.3125 56.00000 58.68750 58.28125 53228400.0 37.102634
1 1 2000-01-04 58.5625 56.12500 56.78125 56.31250 54119000.0 35.849308
2 2 2000-01-05 58.1875 54.68750 55.56250 56.90625 64059600.0 36.227283
3 3 2000-01-06 56.9375 54.18750 56.09375 55.00000 54976600.0 35.013741
4 4 2000-01-07 56.1250 53.65625 54.31250 55.71875 62013600.0 35.471302
version efficace ~~~~~~~~~~~~~~~~ L’instruction ``ON`` précise sur quelle ou quelles colonnes opérer la fusion. .. code:: ipython3 %%SQL -v con SELECT A.t AS t, A.Close AS Close, A.tt AS tt, B.t AS t2, B.Close AS Close2, B.tt AS tt2 FROM ( SELECT t, Close, t+1 AS tt FROM stock ) AS A INNER JOIN ( SELECT t, Close, t+1 AS tt FROM stock ) AS B ON A.tt == B.t .. raw:: html
t Close tt t2 Close2 tt2
0 0 58.28125 1 1 56.31250 2
1 1 56.31250 2 2 56.90625 3
2 2 56.90625 3 3 55.00000 4
3 3 55.00000 4 4 55.71875 5
4 4 55.71875 5 5 56.12500 6
5 5 56.12500 6 6 54.68750 7
6 6 54.68750 7 7 52.90625 8
7 7 52.90625 8 8 53.90625 9
8 8 53.90625 9 9 56.12500 10
9 9 56.12500 10 10 57.65625 11
version inefficace ~~~~~~~~~~~~~~~~~~ Pour la version inefficace, l’instruction ``JOIN`` effectue tous les combinaisons de lignes possibles, soit :math:`N^2`. Le mot-clé ``WHERE`` garde les couples de lignes qui nous intéresse. .. code:: ipython3 %%SQL -v con SELECT A.t AS t, A.Close AS Close, A.tt AS tt, B.t AS t2, B.Close AS Close2, B.tt AS tt2 FROM ( SELECT t, Close, t+1 AS tt FROM stock ) AS A INNER JOIN ( SELECT t, Close, t+1 AS tt FROM stock ) AS B WHERE A.tt >= B.t AND A.tt <= B.t -- on écrit l'égalité comme ceci pour contourner les optimisations -- réalisée par SQLlite .. raw:: html
t Close tt t2 Close2 tt2
0 0 58.28125 1 1 56.31250 2
1 1 56.31250 2 2 56.90625 3
2 2 56.90625 3 3 55.00000 4
3 3 55.00000 4 4 55.71875 5
4 4 55.71875 5 5 56.12500 6
5 5 56.12500 6 6 54.68750 7
6 6 54.68750 7 7 52.90625 8
7 7 52.90625 8 8 53.90625 9
8 8 53.90625 9 9 56.12500 10
9 9 56.12500 10 10 57.65625 11