{"cells": [{"cell_type": "markdown", "metadata": {}, "source": ["# 2A.i - Unstructured data, functional programming: dask\n", "\n", "[dask](https://dask.pydata.org/en/latest/) is a kind of mix between [pandas](http://pandas.pydata.org/) and map/reduce. The module implements a subset of [pandas](http://pandas.pydata.org/)'s features on data that does not necessarily fit in memory."]}, {"cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [{"data": {"text/html": ["
run previous cell, wait for 2 seconds
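Before diving into dask itself, the map/reduce style that this notebook builds on can be sketched with the standard library alone. This is a hedged illustration, not dask code: the three JSON lines below are made-up records standing in for lines of the `tw_users_all.json` file built later, and the `followers_count` field mirrors the one summed in the dask.bag example.

```python
import json
from functools import reduce
from operator import add

# Three fake user records (illustrative data, not the real Twitter dump).
lines = [
    '{"screen_name": "a", "followers_count": 10}',
    '{"screen_name": "b", "followers_count": 25}',
    '{"screen_name": "c", "followers_count": 7}',
]

# map: parse each line; pluck: extract one field; fold: reduce with +
records = map(json.loads, lines)
followers = map(lambda r: r["followers_count"], records)
total = reduce(add, followers)
print(total)  # 42
```

dask.bag exposes the same three steps (`map`, `pluck`, `fold`) but schedules them across partitions instead of running them sequentially.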
\n", ""], "text/plain": [""]}, "execution_count": 2, "metadata": {}, "output_type": "execute_result"}], "source": ["from jyquickhelper import add_notebook_menu\n", "add_notebook_menu()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Data: [twitter_for_network_100000.db.zip](https://drive.google.com/open?id=0B6jkqYitZ0uTQ3k1NDZmLUJBZVk) or [twitter_for_network_100000.db.zip](http://www.xavierdupre.fr/enseignement/complements/twitter_for_network_100000.db.zip) (xavierdupre.fr)."]}, {"cell_type": "markdown", "metadata": {}, "source": ["## Part three: dask"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Dask presents itself as a layer on top of [toolz](https://github.com/pytoolz/toolz/)/[cytoolz](https://github.com/pytoolz/cytoolz) and [numpy](http://www.numpy.org/). It does not offer new features, but it lets you parallelize the existing functions."]}, {"cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [{"data": {"text/plain": ["['.\\\\twitter_for_network_100000.db']"]}, "execution_count": 3, "metadata": {}, "output_type": "execute_result"}], "source": ["import pyensae.datasource\n", "pyensae.datasource.download_data(\"twitter_for_network_100000.db.zip\")"]}, {"cell_type": "code", "execution_count": 3, "metadata": {"collapsed": true}, "outputs": [], "source": ["import cytoolz as ct  # groupby, valmap, compose, ...\n", "import cytoolz.curried as ctc  # pipe, map, filter, get, ...\n", "import sqlite3\n", "import pprint\n", "try:\n", "    import ujson as json\n", "except ImportError:\n", "    import json\n", "\n", "conn_sqlite = sqlite3.connect(\"twitter_for_network_100000.db\")\n", "cursor_sqlite = conn_sqlite.cursor()"]}, {"cell_type": "code", "execution_count": 4, "metadata": {"collapsed": true}, "outputs": [], "source": ["import os, psutil, gc, sys\n", "if not sys.platform.startswith(\"win\"):\n", "    # resource is only available on Unix\n", "    import resource\n", "\n", "def 
memory_usage_psutil():\n", "    gc.collect()\n", "    process = psutil.Process(os.getpid())\n", "    mem = process.memory_info()[0] / float(2 ** 20)\n", "\n", "    print(\"Memory used : %i MB\" % mem)\n", "    if not sys.platform.startswith(\"win\"):\n", "        print(\"Max memory usage : %i MB\" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["We extract the data from the database and write it to a flat file."]}, {"cell_type": "code", "execution_count": 5, "metadata": {"collapsed": true}, "outputs": [], "source": ["cursor_sqlite.execute(\"SELECT content FROM tw_users\")\n", "\n", "with open(\"tw_users_all.json\", 'w') as f:\n", "    ct.count(ct.map(f.write, ct.interpose(\"\\n\", ct.pluck(0, cursor_sqlite))))"]}, {"cell_type": "markdown", "metadata": {}, "source": ["We split this flat file into 50 chunks."]}, {"cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [{"data": {"text/plain": ["100070"]}, "execution_count": 7, "metadata": {}, "output_type": "execute_result"}], "source": ["from tkinterquickhelper.funcwin.default_functions import file_split\n", "file_split(\"tw_users_all.json\", 50)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["We create a dask.bag object to handle this whole set of files."]}, {"cell_type": "code", "execution_count": 7, "metadata": {"collapsed": true}, "outputs": [], "source": ["import dask.bag as dbag\n", "try:\n", "    import ujson as json\n", "except ImportError:\n", "    import json\n", "from operator import add\n", "\n", "a = dbag.read_text('tw_users_all.json.split.*.json')"]}, {"cell_type": "markdown", "metadata": {}, "source": ["We can then use the cytoolz syntax on the dbag object. 
\n", "dask then automatically handles the parallelization across the different files."]}, {"cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["1 loop, best of 3: 3.34 s per loop\n"]}, {"data": {"text/plain": ["108086205"]}, "execution_count": 9, "metadata": {}, "output_type": "execute_result"}], "source": ["b = a.map(json.loads).pluck(\"followers_count\").fold(add).compute()\n", "%timeit -n1 b = a.map(json.loads).pluck(\"followers_count\").fold(add).compute()\n", "b"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Keep an eye on your memory usage when you use dask."]}, {"cell_type": "code", "execution_count": 9, "metadata": {"collapsed": true}, "outputs": [], "source": ["useless = [it**2 for it in range(25000000)]"]}, {"cell_type": "code", "execution_count": 10, "metadata": {"collapsed": true}, "outputs": [], "source": ["# del useless  # uncomment to free the memory"]}, {"cell_type": "markdown", "metadata": {}, "source": ["In the same way, dask lets you efficiently parallelize operations performed with numpy."]}, {"cell_type": "code", "execution_count": 11, "metadata": {"collapsed": true}, "outputs": [], "source": ["import numpy\n", "import dask.array as da\n", "\n", "big_random = da.random.normal(1000, 20000, size=(50000, 50000), chunks=(1000, 1000))"]}, {"cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [{"data": {"text/plain": ["1000.1712484654275"]}, "execution_count": 13, "metadata": {}, "output_type": "execute_result"}], "source": ["big_random.mean().compute()"]}, {"cell_type": "markdown", "metadata": {}, "source": ["Suppose you have data obtained in the classical numpy way (here randomly generated):"]}, {"cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["0\n", "5\n"]}], "source": 
["import numpy as np\n", "import h5py\n", "\n", "for it in range(10):\n", "    a = np.random.random(size=(5000, 5000))\n", "    h5f = h5py.File('data_{0:02d}.h5'.format(it), 'w')\n", "    h5f.create_dataset('dataset_1', data=a)\n", "    h5f.close()\n", "    if it % 5 == 0:\n", "        print(it)"]}, {"cell_type": "markdown", "metadata": {}, "source": ["[dask](http://dask.pydata.org/en/latest/) transparently handles this set of matrices as a single matrix of higher dimension. Below, ``x`` is a matrix of size 100 x 5000 x 5000, which would definitely not fit in memory."]}, {"cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["(100, 5000, 5000)\n", "48.9364660612\n", "1250010550.97\n"]}], "source": ["from glob import glob\n", "import h5py\n", "import dask.array as da\n", "\n", "files = list(sorted(glob('data_*.h5'))) * 10\n", "dsets = [h5py.File(fn, 'r')['dataset_1'] for fn in files]\n", "arrays = [da.from_array(dset, chunks=(1000, 1000)) for dset in dsets]\n", "x = da.stack(arrays, axis=0)\n", "print(x.shape)\n", "print(x[:, 0, 0].sum().compute())\n", "print(x[:, :, :].sum().compute())"]}, {"cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [{"name": "stdout", "output_type": "stream", "text": ["Memory used : 1273 MB\n"]}], "source": ["memory_usage_psutil()"]}, {"cell_type": "code", "execution_count": 16, "metadata": {"collapsed": true}, "outputs": [], "source": []}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4"}}, "nbformat": 4, "nbformat_minor": 2}
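As a rough illustration of how dask.array can compute a mean over chunks that are never all in memory at once, here is a standard-library-only sketch of the underlying idea: each chunk yields an independent `(sum, count)` pair, and the pairs are combined at the end. The one-dimensional chunking and the data below are purely illustrative, not what dask actually does internally.

```python
# A sequence we pretend does not fit in memory; its true mean is 50.5.
data = list(range(1, 101))

# Split into fixed-size chunks, as dask.array does with its "chunks" argument.
chunk_size = 10
chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

# Per-chunk partial results: (sum, count) pairs, computable independently
# (and therefore in parallel, which is what dask does across chunks).
partials = [(sum(c), len(c)) for c in chunks]

# Combine step: add the partial sums and counts, then finish the mean.
total, count = map(sum, zip(*partials))
mean = total / count
print(mean)  # 50.5
```

Because each `(sum, count)` pair only needs one chunk at a time, peak memory stays around one chunk plus the small list of partials, which is the property the dask.array examples above rely on.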