# Statistiques Wikipedia - énoncé

Parallélisation de la récupération de fichiers de données depuis wikipédia.

In [1]:
from jyquickhelper import add_notebook_menu
add_notebook_menu()

## Exercice 1 : parallélisation du téléchargement

On peut paralléliser le téléchargement de différentes façons :

* avec des [threads](https://en.wikipedia.org/wiki/Thread_(computing)) (librairie [threading](https://docs.python.org/3/library/threading.html) : synchronisation rapide mais parfois délicate et mémoire partagée entre threads
* avec des [processus](https://fr.wikipedia.org/wiki/Processus_(informatique)) (librairie [multiprocessing](https://docs.python.org/3.5/library/multiprocessing.html), [joblib](https://pythonhosted.org/joblib/), [jupyter](http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx/notebooks/td2a_cenonce_session_2D.html) : synchronisation lente, pas de mémoire partagée
* avec un [cluster](https://fr.wikipedia.org/wiki/Grappe_de_serveurs), [jupyter](http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx/notebooks/td2a_cenonce_session_2D.html) : synchronisation lente, pas de mémoire partagée, parallélisme en grande dimension

La page [ParallelProcessing](https://wiki.python.org/moin/ParallelProcessing) recense des modules qui implémente cela mais elle n'est pas très à jour. Il faut vérifier si les modules proposés sont encore maintenus.

### Approche avec des threads

In [2]:
import threading, time, os
from datetime import datetime, timedelta
from mlstatpy.data.wikipedia import download_pageviews
folder = "d:\\wikipv"
if not os.path.exists(folder):
    os.mkdir(folder)

class DownloadThread(threading.Thread) :
    """thread definition, it downloads a stream one after another
    until a queue is empty"""
    def __init__ (self, qu, name, folder) :
        threading.Thread.__init__ (self)
        self.qu = qu
        self.name = name
        self.folder = folder
       
    def run (self) :
        while not self.qu.empty():
            date = self.qu.get(False)
            if date is None:
                break
            print(self.name, "download", date, "len(qu)", self.qu.qsize())
            try:
                download_pageviews(date, folder=self.folder)
            except Exception as e:
                print("skipping dt", dt, "rerun to get it", e)
            # On doit le faire à chaque fois.
            self.qu.task_done()
            
# on créé des files et les threads associés
import queue       
queues = [queue.Queue() for i in range(0, 3)]
m = [DownloadThread(q, "thread %d" % i, folder) for i, q in enumerate(queues)]

# on remplit les files
dt = datetime.now() - timedelta(15)
hour = timedelta(hours=1)
for h in range(0, 24*7):
    queues[h%3].put(dt)
    dt += hour
    
# on démarre les threads
for t in m:
    t.start()
    
# on attend qu'elles se vident
for i, q in enumerate(queues):
    print("attendre file", i, [q.qsize() for q in queues])
    q.join()
    
    # On ne peut pas utiliser quelque chose comme ceci :
    while not q.empty():
        time.sleep(1)
    # Le programme s'arrête dès que les files sont vides.
    # Ceci arrive avec l'instruction q.get()
    # avant que le téléchargement soit fini.
    # Le programme s'arrête et interrompt les threads en cours.

thread 0 download 2016-08-28 05:27:45.899868 len(qu) 55
thread 0 download 2016-08-28 08:27:45.899868 len(qu) 54
thread 0 download 2016-08-28 11:27:45.899868 len(qu) 53
thread 0 download 2016-08-28 14:27:45.899868 len(qu) 52
thread 0 download 2016-08-28 17:27:45.899868 len(qu) 51
thread 0 download 2016-08-28 20:27:45.899868 len(qu) 50
thread 0 download 2016-08-28 23:27:45.899868 len(qu) 49
thread 0 download 2016-08-29 02:27:45.899868 len(qu) 48
thread 0 download 2016-08-29 05:27:45.899868 len(qu) 47
thread 1 download 2016-08-28 06:27:45.899868 len(qu) 55
thread 0 download 2016-08-29 08:27:45.899868 len(qu) 46
thread 1 download 2016-08-28 09:27:45.899868 len(qu) 54
thread 1 download 2016-08-28 12:27:45.899868 len(qu) 53
thread 0 download 2016-08-29 11:27:45.899868 len(qu) 45
thread 1 download 2016-08-28 15:27:45.899868 len(qu) 52
thread 1 download 2016-08-28 18:27:45.899868 len(qu) 51
thread 1 download 2016-08-28 21:27:45.899868 len(qu) 50
thread 1 download 2016-08-29 00:27:45.899868 len

### Parallélisation avec des processus

Il n'est pas toujours évident de comprendre ce qu'il se passe quand l'erreur se produit dans un processus différent. Si on change le *backend* pour ``"threading"``, l'erreur devient visible. Voir [Parallel](https://pythonhosted.org/joblib/generated/joblib.Parallel.html?highlight=parallel). Le code ne fonctionne pas toujours lorsque ``n_jobs > 1`` sous Windows et que le backend est celui par défaut (processus). Lire [Embarrassingly Parallel For Loops](https://pythonhosted.org/joblib/parallel.html#embarrassingly-parallel-for-loops).

In [3]:
from joblib import Parallel, delayed
from datetime import datetime, timedelta
import os
folder = "d:\\wikipv"
if not os.path.exists(folder):
    os.mkdir(folder)
    
# on remplit les files
dt = datetime.now() - timedelta(14)
hour = timedelta(hours=1)
dates = [dt + hour*i for i in range(0,24)]
    
def downloadp2(dt, folder):
    from mlstatpy.data.wikipedia import download_pageviews
    download_pageviews(dt, folder=folder)

# L'instruction ne marche pas depuis un notebook lorsque le backend est "muliprocessing".
# Dans ce cas, il faut exécuter un programme.
if __name__ == "__main__":
    Parallel(n_jobs=3, verbose=5)(delayed(downloadp2)(dt, folder) for dt in dates)

## Filtrage pour ne garder que les lignes avec fr

In [4]:
def filtre(input, country):
    import os
    print(input)
    output = input + "." + country
    if not os.path.exists(output):
        with open(input, "r", encoding="utf-8") as f:
            with open(output, "w", encoding="utf-8") as g:
                for line in f:
                    if line.startswith(country):
                        g.write(line)

import os
from joblib import Parallel, delayed
folder = "wikipv"
files = os.listdir(folder)  
files = [os.path.join(folder, _) for _ in files if _.startswith("pageviews") and _.endswith("0000")]

Parallel(n_jobs=3, verbose=5, backend="threading")(delayed(filtre)(name, "fr") for name in files)

wikipv\pageviews-20160827-210000wikipv\pageviews-20160827-220000
wikipv\pageviews-20160827-230000

wikipv\pageviews-20160828-000000
wikipv\pageviews-20160828-010000
wikipv\pageviews-20160828-020000
wikipv\pageviews-20160828-030000
wikipv\pageviews-20160828-040000
wikipv\pageviews-20160828-050000
wikipv\pageviews-20160828-060000
wikipv\pageviews-20160828-070000
wikipv\pageviews-20160828-080000
wikipv\pageviews-20160828-090000
wikipv\pageviews-20160828-100000
wikipv\pageviews-20160828-110000


[Parallel(n_jobs=3)]: Done  12 tasks      | elapsed:   53.4s


wikipv\pageviews-20160828-120000
wikipv\pageviews-20160828-130000
wikipv\pageviews-20160828-140000
wikipv\pageviews-20160828-150000
wikipv\pageviews-20160828-160000
wikipv\pageviews-20160828-170000
wikipv\pageviews-20160828-180000
wikipv\pageviews-20160828-190000
wikipv\pageviews-20160828-200000
wikipv\pageviews-20160828-210000
wikipv\pageviews-20160828-220000
wikipv\pageviews-20160828-230000
wikipv\pageviews-20160829-000000
wikipv\pageviews-20160829-010000
wikipv\pageviews-20160829-020000
wikipv\pageviews-20160829-030000
wikipv\pageviews-20160829-040000
wikipv\pageviews-20160829-050000
wikipv\pageviews-20160829-060000
wikipv\pageviews-20160829-070000
wikipv\pageviews-20160829-080000


## Insérer le fichier dans une base de données SQL

In [5]:
import pandas
df = pandas.read