.. _seance5daskrst: ================================================================== 2A.i - Données non structurées, programmation fonctionnelle : dask ================================================================== .. only:: html **Links:** :download:`notebook `, :downloadlink:`html `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/2a/seance_5_dask.ipynb|*` `dask `__ est une sorte de mélange entre `pandas `__ et map/reduce. Le module implémente un sous-ensemble des possibilités de `pandas `__ sur des données qui ne tiennent pas nécessairement en mémoire. .. code:: ipython3 from jyquickhelper import add_notebook_menu add_notebook_menu() .. contents:: :local: Données : `twitter_for_network_100000.db.zip `__ or `twitter_for_network_100000.db.zip `__ (xavierdupre.fr). Troisième partie : dask ----------------------- Dask se présente comme une surcouche à `toolz `__/`cytoolz `__ et `numpy `__. Il n’offre pas de nouvelles fonctionnalités, mais vous permet de paralléliser les fonctions existantes. .. code:: ipython3 import pyensae.datasource pyensae.datasource.download_data("twitter_for_network_100000.db.zip") .. parsed-literal:: ['.\\twitter_for_network_100000.db'] .. code:: ipython3 import cytoolz as ct # import groupby, valmap, compose import cytoolz.curried as ctc ## pipe, map, filter, get import sqlite3 import pprint try: import ujson as json except: import json conn_sqlite = sqlite3.connect("twitter_for_network_100000.db") cursor_sqlite = conn_sqlite.cursor() .. code:: ipython3 import os, psutil, gc, sys if sys.platform.startswith("win"): pass else: # only on Linux import resource def memory_usage_psutil(): gc.collect() process = psutil.Process(os.getpid()) mem = process.memory_info()[0] / float(2 ** 20) print( "Memory used : %i MB" % mem ) if not sys.platform.startswith("win"): print( "Max memory usage : %i MB" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss//1024) ) On extrait les données de la base et on les écrit dans un fichiers plat. .. code:: ipython3 cursor_sqlite.execute("select content FROM tw_users") with open("tw_users_all.json", 'w') as f: ct.count( ct.map( f.write, ct.interpose( "\n", ct.pluck(0, cursor_sqlite ) ) ) ) On sépare ce fichier plat en 50 morceaux. .. code:: ipython3 from tkinterquickhelper.funcwin.default_functions import file_split file_split("tw_users_all.json", 50 ) .. parsed-literal:: 100070 On crée un objet de type dask.bag pour gérer l’ensemble de ces fichiers. .. code:: ipython3 import dask.bag as dbag try: import ujson as json except: import json from operator import add a = dbag.read_text('tw_users_all.json.split.*.json') | On va ensuite pouvoir utiliser la syntaxe cytoolz sur l’objet dbag. | dask va alors automatiquement gérer la parallélisation sur les différents fichiers. .. code:: ipython3 b = a.map(json.loads).pluck("followers_count").fold(add).compute() %timeit -n1 b = a.map(json.loads).pluck("followers_count").fold(add).compute() b .. parsed-literal:: 1 loop, best of 3: 3.34 s per loop .. parsed-literal:: 108086205 Attention à l’état de votre mémoire quand vous utilisez dask. .. code:: ipython3 useless = [it**2 for it in range(25000000)] .. code:: ipython3 #del useless De la même façon, dask vous permet de paralléliser efficacement des opérations effectuées avec numpy. .. code:: ipython3 import numpy import dask.array as da big_random = da.random.normal( 1000, 20000, size = (50000,50000), chunks = (1000,1000) ) .. code:: ipython3 big_random.mean().compute() .. parsed-literal:: 1000.1712484654275 Si vous avez des données obtenues de façon classique sur numpy (ici générées de façon aléatoires) .. code:: ipython3 import numpy as np import h5py for it in range(10): a = np.random.random(size=(5000,5000)) h5f = h5py.File('data_{0:02d}.h5'.format(it), 'w') h5f.create_dataset('dataset_1', data=a) h5f.close() if it % 5 == 0: print(it) .. parsed-literal:: 0 5 `dask `__ gère de façon transparente cet ensemble de matrice comme une seule matrice de dimension supérieure. Ci-dessous ``x`` est une matrice de taille 100 X 5000 X 5000, ce qui ne tiendrait absolument pas en mémoire. .. code:: ipython3 from glob import glob import h5py import dask.array as da files = list(sorted(glob('data_*.h5'))) * 10 dsets = [h5py.File(fn)['dataset_1'] for fn in files] arrays = [da.from_array(dset, chunks=(1000, 1000)) for dset in dsets] x = da.stack(arrays, axis=0) print(x.shape) print( x[:,0,0].sum().compute() ) print( x[:,:,:].sum().compute() ) .. parsed-literal:: (100, 5000, 5000) 48.9364660612 1250010550.97 .. code:: ipython3 memory_usage_psutil() .. parsed-literal:: Memory used : 1273 MB