from jyquickhelper import add_notebook_menu
add_notebook_menu()
Données : twitter_for_network_100000.db.zip or twitter_for_network_100000.db.zip (xavierdupre.fr).
import pyensae.datasource
pyensae.datasource.download_data("twitter_for_network_100000.db.zip")
['.\\twitter_for_network_100000.db']
import cytoolz as ct # import groupby, valmap, compose
import cytoolz.curried as ctc ## pipe, map, filter, get
import sqlite3
import pprint
try:
import ujson as json
except:
import json
conn_sqlite = sqlite3.connect("twitter_for_network_100000.db")
cursor_sqlite = conn_sqlite.cursor()
import os, psutil, gc, sys
if sys.platform.startswith("win"):
pass
else:
# only on Linux
import resource
def memory_usage_psutil():
gc.collect()
process = psutil.Process(os.getpid())
mem = process.memory_info()[0] / float(2 ** 20)
print( "Memory used : %i MB" % mem )
if not sys.platform.startswith("win"):
print( "Max memory usage : %i MB" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss//1024) )
On extrait les données de la base et on les écrit dans un fichiers plat.
cursor_sqlite.execute("select content FROM tw_users")
with open("tw_users_all.json", 'w') as f:
ct.count( ct.map( f.write, ct.interpose( "\n", ct.pluck(0, cursor_sqlite ) ) ) )
On sépare ce fichier plat en 50 morceaux.
from tkinterquickhelper.funcwin.default_functions import file_split
file_split("tw_users_all.json", 50 )
100070
On crée un objet de type dask.bag pour gérer l'ensemble de ces fichiers.
import dask.bag as dbag
try:
import ujson as json
except:
import json
from operator import add
a = dbag.read_text('tw_users_all.json.split.*.json')
On va ensuite pouvoir utiliser la syntaxe cytoolz sur l'objet dbag.
dask va alors automatiquement gérer la parallélisation sur les différents fichiers.
b = a.map(json.loads).pluck("followers_count").fold(add).compute()
%timeit -n1 b = a.map(json.loads).pluck("followers_count").fold(add).compute()
b
1 loop, best of 3: 3.34 s per loop
108086205
Attention à l'état de votre mémoire quand vous utilisez dask.
useless = [it**2 for it in range(25000000)]
#del useless
De la même façon, dask vous permet de paralléliser efficacement des opérations effectuées avec numpy.
import numpy
import dask.array as da
big_random = da.random.normal( 1000, 20000, size = (50000,50000), chunks = (1000,1000) )
big_random.mean().compute()
1000.1712484654275
Si vous avez des données obtenues de façon classique sur numpy (ici générées de façon aléatoires)
import numpy as np
import h5py
for it in range(10):
a = np.random.random(size=(5000,5000))
h5f = h5py.File('data_{0:02d}.h5'.format(it), 'w')
h5f.create_dataset('dataset_1', data=a)
h5f.close()
if it % 5 == 0:
print(it)
0 5
dask gère de façon transparente cet ensemble de matrice comme une seule matrice de dimension supérieure. Ci-dessous x
est une matrice de taille 100 X 5000 X 5000, ce qui ne tiendrait absolument pas en mémoire.
from glob import glob
import h5py
import dask.array as da
files = list(sorted(glob('data_*.h5'))) * 10
dsets = [h5py.File(fn)['dataset_1'] for fn in files]
arrays = [da.from_array(dset, chunks=(1000, 1000)) for dset in dsets]
x = da.stack(arrays, axis=0)
print(x.shape)
print( x[:,0,0].sum().compute() )
print( x[:,:,:].sum().compute() )
(100, 5000, 5000) 48.9364660612 1250010550.97
memory_usage_psutil()
Memory used : 1273 MB