2A.i - Données non structurées, programmation fonctionnelle : dask#

Links: notebook, html, python, slides, GitHub

dask est une sorte de mélange entre pandas et map/reduce. Le module implémente un sous-ensemble des possibilités de pandas sur des données qui ne tiennent pas nécessairement en mémoire.

from jyquickhelper import add_notebook_menu
add_notebook_menu()

Données : twitter_for_network_100000.db.zip or twitter_for_network_100000.db.zip (xavierdupre.fr).

Troisième partie : dask#

Dask se présente comme une surcouche à toolz/cytoolz et numpy. Il n’offre pas de nouvelles fonctionnalités, mais vous permet de paralléliser les fonctions existantes.

import pyensae.datasource
pyensae.datasource.download_data("twitter_for_network_100000.db.zip")
['.\twitter_for_network_100000.db']
import cytoolz as ct # import groupby, valmap, compose
import cytoolz.curried as ctc ## pipe, map, filter, get
import sqlite3
import pprint
try:
    import ujson as json
except:
    import json

conn_sqlite = sqlite3.connect("twitter_for_network_100000.db")
cursor_sqlite = conn_sqlite.cursor()
import os, psutil, gc, sys
if sys.platform.startswith("win"):
    pass
else:
    # only on Linux
    import resource

def memory_usage_psutil():
    gc.collect()
    process = psutil.Process(os.getpid())
    mem = process.memory_info()[0] / float(2 ** 20)

    print( "Memory used : %i MB" % mem )
    if not sys.platform.startswith("win"):
        print( "Max memory usage : %i MB" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss//1024) )

On extrait les données de la base et on les écrit dans un fichiers plat.

cursor_sqlite.execute("select content FROM tw_users")

with open("tw_users_all.json", 'w') as f:
    ct.count( ct.map( f.write, ct.interpose( "\n", ct.pluck(0, cursor_sqlite ) ) ) )

On sépare ce fichier plat en 50 morceaux.

from tkinterquickhelper.funcwin.default_functions import file_split
file_split("tw_users_all.json", 50 )
100070

On crée un objet de type dask.bag pour gérer l’ensemble de ces fichiers.

import dask.bag as dbag
try:
    import ujson as json
except:
    import json
from operator import add

a = dbag.read_text('tw_users_all.json.split.*.json')
On va ensuite pouvoir utiliser la syntaxe cytoolz sur l’objet dbag.
dask va alors automatiquement gérer la parallélisation sur les différents fichiers.
b = a.map(json.loads).pluck("followers_count").fold(add).compute()
%timeit -n1 b = a.map(json.loads).pluck("followers_count").fold(add).compute()
b
1 loop, best of 3: 3.34 s per loop
108086205

Attention à l’état de votre mémoire quand vous utilisez dask.

useless = [it**2 for it in range(25000000)]
#del useless

De la même façon, dask vous permet de paralléliser efficacement des opérations effectuées avec numpy.

import numpy
import dask.array as da

big_random = da.random.normal( 1000, 20000, size = (50000,50000), chunks = (1000,1000) )
big_random.mean().compute()
1000.1712484654275

Si vous avez des données obtenues de façon classique sur numpy (ici générées de façon aléatoires)

import numpy as np
import h5py

for it in range(10):
    a = np.random.random(size=(5000,5000))
    h5f = h5py.File('data_{0:02d}.h5'.format(it), 'w')
    h5f.create_dataset('dataset_1', data=a)
    h5f.close()
    if it % 5 == 0:
        print(it)
0
5

dask gère de façon transparente cet ensemble de matrice comme une seule matrice de dimension supérieure. Ci-dessous x est une matrice de taille 100 X 5000 X 5000, ce qui ne tiendrait absolument pas en mémoire.

from glob import glob
import h5py
import dask.array as da

files = list(sorted(glob('data_*.h5'))) * 10
dsets = [h5py.File(fn)['dataset_1'] for fn in files]
arrays = [da.from_array(dset, chunks=(1000, 1000)) for dset in dsets]
x = da.stack(arrays, axis=0)
print(x.shape)
print( x[:,0,0].sum().compute() )
print( x[:,:,:].sum().compute() )
(100, 5000, 5000)
48.9364660612
1250010550.97
memory_usage_psutil()
Memory used : 1273 MB