Parallélisation de la récupération de fichiers de données depuis wikipédia.
# Build the clickable table of contents for this notebook.
from jyquickhelper import add_notebook_menu
add_notebook_menu()
On peut paralléliser le téléchargement de différentes façons :
La page ParallelProcessing recense des modules qui implémentent cela mais elle n'est pas très à jour. Il faut vérifier si les modules proposés sont encore maintenus.
import threading, time, os
from datetime import datetime, timedelta
from mlstatpy.data.wikipedia import download_pageviews

# Destination folder for the downloaded pageview dumps (Windows path).
folder = "d:\\wikipv"
if not os.path.exists(folder):
    os.mkdir(folder)
class DownloadThread(threading.Thread):
    """Worker thread: pops datetimes from a queue and downloads the
    corresponding Wikipedia pageview dump, one after another,
    until the queue is empty.

    :param qu: ``queue.Queue`` of ``datetime`` objects to process
    :param name: display name used in progress messages
    :param folder: destination folder passed to ``download_pageviews``
    """

    def __init__(self, qu, name, folder):
        threading.Thread.__init__(self)
        self.qu = qu
        self.name = name
        self.folder = folder

    def run(self):
        while not self.qu.empty():
            date = self.qu.get(False)
            if date is None:
                # None acts as a sentinel to stop the thread early.
                break
            print(self.name, "download", date, "len(qu)", self.qu.qsize())
            try:
                download_pageviews(date, folder=self.folder)
            except Exception as e:
                # Best effort: log and keep going, a re-run fetches the
                # missing hours.  BUG FIX: the original printed the
                # module-level ``dt`` instead of the local ``date``, so
                # the message always showed the wrong timestamp.
                print("skipping dt", date, "rerun to get it", e)
            # Must be called once per successful get() so that
            # Queue.join() can return in the main program.
            self.qu.task_done()
# Create the queues and one download thread per queue.
import queue
queues = [queue.Queue() for i in range(0, 3)]
m = [DownloadThread(q, "thread %d" % i, folder) for i, q in enumerate(queues)]

# Fill the queues: one datetime per hour over a full week,
# starting 15 days ago, distributed round-robin over the queues.
dt = datetime.now() - timedelta(15)
hour = timedelta(hours=1)
for h in range(0, 24*7):
    queues[h%3].put(dt)
    dt += hour

# Start the threads.
for t in m:
    t.start()

# Wait until every queue has been fully processed
# (task_done() must be called for each item for join() to return).
for i, q in enumerate(queues):
    print("attendre file", i, [q.qsize() for q in queues])
    q.join()
# One cannot use something like the following instead of q.join():
while not q.empty():
    time.sleep(1)
# This loop exits as soon as the queues are empty.
# That happens right after the q.get() call,
# i.e. before the download itself is finished.
# The program would then stop and interrupt the running threads.
thread 0 download 2016-08-28 05:27:45.899868 len(qu) 55 thread 0 download 2016-08-28 08:27:45.899868 len(qu) 54 thread 0 download 2016-08-28 11:27:45.899868 len(qu) 53 thread 0 download 2016-08-28 14:27:45.899868 len(qu) 52 thread 0 download 2016-08-28 17:27:45.899868 len(qu) 51 thread 0 download 2016-08-28 20:27:45.899868 len(qu) 50 thread 0 download 2016-08-28 23:27:45.899868 len(qu) 49 thread 0 download 2016-08-29 02:27:45.899868 len(qu) 48 thread 0 download 2016-08-29 05:27:45.899868 len(qu) 47 thread 1 download 2016-08-28 06:27:45.899868 len(qu) 55 thread 0 download 2016-08-29 08:27:45.899868 len(qu) 46 thread 1 download 2016-08-28 09:27:45.899868 len(qu) 54 thread 1 download 2016-08-28 12:27:45.899868 len(qu) 53 thread 0 download 2016-08-29 11:27:45.899868 len(qu) 45 thread 1 download 2016-08-28 15:27:45.899868 len(qu) 52 thread 1 download 2016-08-28 18:27:45.899868 len(qu) 51 thread 1 download 2016-08-28 21:27:45.899868 len(qu) 50 thread 1 download 2016-08-29 00:27:45.899868 len(qu) 49 thread 1 download 2016-08-29 03:27:45.899868 len(qu) 48 thread 1 download 2016-08-29 06:27:45.899868 len(qu) 47 thread 1 download 2016-08-29 09:27:45.899868 len(qu) 46 thread 1 download 2016-08-29 12:27:45.899868 len(qu) 45 thread 0 download 2016-08-29 14:27:45.899868 len(qu) 44 thread 2 download 2016-08-28 07:27:45.899868 len(qu) 55 thread 2 download 2016-08-28 10:27:45.899868 len(qu) 54 thread 1 download 2016-08-29 15:27:45.899868 len(qu) 44 thread 1 download 2016-08-29 18:27:45.899868 len(qu) 43 thread 2 download 2016-08-28 13:27:45.899868 len(qu) 53 thread 1 download 2016-08-29 21:27:45.899868 len(qu) 42 thread 2 download 2016-08-28 16:27:45.899868 len(qu) 52 thread 1 download 2016-08-30 00:27:45.899868 len(qu) 41 thread 1 download 2016-08-30 03:27:45.899868 len(qu) 40 thread 2 download 2016-08-28 19:27:45.899868 len(qu) 51 attendre file 0 [44, 40, 51] thread 0 download 2016-08-29 17:27:45.899868 len(qu) 43 thread 1 download 2016-08-30 06:27:45.899868 len(qu) 39 thread 0 
download 2016-08-29 20:27:45.899868 len(qu) 42 thread 0 download 2016-08-29 23:27:45.899868 len(qu) 41 thread 0 download 2016-08-30 02:27:45.899868 len(qu) 40 thread 0 download 2016-08-30 05:27:45.899868 len(qu) 39 thread 0 download 2016-08-30 08:27:45.899868 len(qu) 38 thread 1 download 2016-08-30 09:27:45.899868 len(qu) 38 thread 0 download 2016-08-30 11:27:45.899868 len(qu) 37 thread 0 download 2016-08-30 14:27:45.899868 len(qu) 36 thread 1 download 2016-08-30 12:27:45.899868 len(qu) 37 thread 0 download 2016-08-30 17:27:45.899868 len(qu) 35 thread 1 download 2016-08-30 15:27:45.899868 len(qu) 36 thread 0 download 2016-08-30 20:27:45.899868 len(qu) 34 thread 1 download 2016-08-30 18:27:45.899868 len(qu) 35 thread 0 download 2016-08-30 23:27:45.899868 len(qu) 33 thread 1 download 2016-08-30 21:27:45.899868 len(qu) 34 thread 0 download 2016-08-31 02:27:45.899868 len(qu) 32 thread 1 download 2016-08-31 00:27:45.899868 len(qu) 33 thread 0 download 2016-08-31 05:27:45.899868 len(qu) 31 thread 1 download 2016-08-31 03:27:45.899868 len(qu) 32 thread 0 download 2016-08-31 08:27:45.899868 len(qu) 30 thread 1 download 2016-08-31 06:27:45.899868 len(qu) 31 thread 0 download 2016-08-31 11:27:45.899868 len(qu) 29 thread 1 download 2016-08-31 09:27:45.899868 len(qu) 30 thread 0 download 2016-08-31 14:27:45.899868 len(qu) 28 thread 1 download 2016-08-31 12:27:45.899868 len(qu) 29 thread 0 download 2016-08-31 17:27:45.899868 len(qu) 27 thread 1 download 2016-08-31 15:27:45.899868 len(qu) 28 thread 1 download 2016-08-31 18:27:45.899868 len(qu) 27 thread 0 download 2016-08-31 20:27:45.899868 len(qu) 26 thread 2 download 2016-08-28 22:27:45.899868 len(qu) 50 thread 1 download 2016-08-31 21:27:45.899868 len(qu) 26 thread 2 download 2016-08-29 01:27:45.899868 len(qu) 49 thread 0 download 2016-08-31 23:27:45.899868 len(qu) 25 thread 1 download 2016-09-01 00:27:45.899868 len(qu) 25 thread 0 download 2016-09-01 02:27:45.899868 len(qu) 24 thread 2 download 2016-08-29 04:27:45.899868 
len(qu) 48 thread 0 download 2016-09-01 05:27:45.899868 len(qu) 23 thread 2 download 2016-08-29 07:27:45.899868 len(qu) 47 thread 0 download 2016-09-01 08:27:45.899868 len(qu) 22 thread 2 download 2016-08-29 10:27:45.899868 len(qu) 46 thread 0 download 2016-09-01 11:27:45.899868 len(qu) 21 thread 0 download 2016-09-01 14:27:45.899868 len(qu) 20 thread 1 download 2016-09-01 03:27:45.899868 len(qu) 24 thread 1 download 2016-09-01 06:27:45.899868 len(qu) 23 thread 1 download 2016-09-01 09:27:45.899868 len(qu) 22 thread 2 download 2016-08-29 13:27:45.899868 len(qu) 45 thread 2 download 2016-08-29 16:27:45.899868 len(qu) 44 thread 2 download 2016-08-29 19:27:45.899868 len(qu) 43 thread 2 download 2016-08-29 22:27:45.899868 len(qu) 42 thread 2 download 2016-08-30 01:27:45.899868 len(qu) 41 thread 2 download 2016-08-30 04:27:45.899868 len(qu) 40 thread 2 download 2016-08-30 07:27:45.899868 len(qu) 39 thread 2 download 2016-08-30 10:27:45.899868 len(qu) 38 thread 2 download 2016-08-30 13:27:45.899868 len(qu) 37 thread 2 download 2016-08-30 16:27:45.899868 len(qu) 36 thread 2 download 2016-08-30 19:27:45.899868 len(qu) 35 thread 2 download 2016-08-30 22:27:45.899868 len(qu) 34 thread 2 download 2016-08-31 01:27:45.899868 len(qu) 33 thread 2 download 2016-08-31 04:27:45.899868 len(qu) 32 thread 2 download 2016-08-31 07:27:45.899868 len(qu) 31 thread 2 download 2016-08-31 10:27:45.899868 len(qu) 30 thread 1 download 2016-09-01 12:27:45.899868 len(qu) 21 thread 2 download 2016-08-31 13:27:45.899868 len(qu) 29 thread 0 download 2016-09-01 17:27:45.899868 len(qu) 19 thread 1 download 2016-09-01 15:27:45.899868 len(qu) 20 thread 0 download 2016-09-01 20:27:45.899868 len(qu) 18 thread 2 download 2016-08-31 16:27:45.899868 len(qu) 28 thread 0 download 2016-09-01 23:27:45.899868 len(qu) 17 thread 1 download 2016-09-01 18:27:45.899868 len(qu) 19 thread 0 download 2016-09-02 02:27:45.899868 len(qu) 16 thread 1 download 2016-09-01 21:27:45.899868 len(qu) 18 thread 1 download 2016-09-02 
00:27:45.899868 len(qu) 17 thread 1 download 2016-09-02 03:27:45.899868 len(qu) 16 thread 2 download 2016-08-31 19:27:45.899868 len(qu) 27 thread 0 download 2016-09-02 05:27:45.899868 len(qu) 15 thread 0 download 2016-09-02 08:27:45.899868 len(qu) 14 thread 0 download 2016-09-02 11:27:45.899868 len(qu) 13 thread 0 download 2016-09-02 14:27:45.899868 len(qu) 12 thread 2 download 2016-08-31 22:27:45.899868 len(qu) 26 thread 0 download 2016-09-02 17:27:45.899868 len(qu) 11 thread 2 download 2016-09-01 01:27:45.899868 len(qu) 25 thread 0 download 2016-09-02 20:27:45.899868 len(qu) 10 thread 2 download 2016-09-01 04:27:45.899868 len(qu) 24 thread 2 download 2016-09-01 07:27:45.899868 len(qu) 23 thread 1 download 2016-09-02 06:27:45.899868 len(qu) 15 thread 1 download 2016-09-02 09:27:45.899868 len(qu) 14 thread 1 download 2016-09-02 12:27:45.899868 len(qu) 13 thread 0 download 2016-09-02 23:27:45.899868 len(qu) 9 thread 0 download 2016-09-03 02:27:45.899868 len(qu) 8 thread 2 download 2016-09-01 10:27:45.899868 len(qu) 22 thread 1 download 2016-09-02 15:27:45.899868 len(qu) 12 thread 2 download 2016-09-01 13:27:45.899868 len(qu) 21 thread 0 download 2016-09-03 05:27:45.899868 len(qu) 7 thread 2 download 2016-09-01 16:27:45.899868 len(qu) 20 thread 1 download 2016-09-02 18:27:45.899868 len(qu) 11 thread 2 download 2016-09-01 19:27:45.899868 len(qu) 19 thread 0 download 2016-09-03 08:27:45.899868 len(qu) 6 thread 1 download 2016-09-02 21:27:45.899868 len(qu) 10 thread 2 download 2016-09-01 22:27:45.899868 len(qu) 18 thread 0 download 2016-09-03 11:27:45.899868 len(qu) 5 thread 1 download 2016-09-03 00:27:45.899868 len(qu) 9 thread 2 download 2016-09-02 01:27:45.899868 len(qu) 17 thread 1 download 2016-09-03 03:27:45.899868 len(qu) 8 thread 2 download 2016-09-02 04:27:45.899868 len(qu) 16 thread 1 download 2016-09-03 06:27:45.899868 len(qu) 7 thread 1 download 2016-09-03 09:27:45.899868 len(qu) 6 thread 2 download 2016-09-02 07:27:45.899868 len(qu) 15 thread 2 download 
2016-09-02 10:27:45.899868 len(qu) 14 thread 1 download 2016-09-03 12:27:45.899868 len(qu) 5 thread 2 download 2016-09-02 13:27:45.899868 len(qu) 13 thread 2 download 2016-09-02 16:27:45.899868 len(qu) 12 thread 2 download 2016-09-02 19:27:45.899868 len(qu) 11 thread 2 download 2016-09-02 22:27:45.899868 len(qu) 10 thread 2 download 2016-09-03 01:27:45.899868 len(qu) 9 thread 2 download 2016-09-03 04:27:45.899868 len(qu) 8 thread 2 download 2016-09-03 07:27:45.899868 len(qu) 7 thread 2 download 2016-09-03 10:27:45.899868 len(qu) 6 skipping dt 2016-09-04 05:27:45.899868 rerun to get it [Errno 28] No space left on device thread 2 download 2016-09-03 13:27:45.899868 len(qu) 5 skipping dt 2016-09-04 05:27:45.899868 rerun to get it [Errno 28] No space left on device thread 0 download 2016-09-03 14:27:45.899868 len(qu) 4 skipping dt 2016-09-04 05:27:45.899868 rerun to get it [Errno 28] No space left on device thread 1 download 2016-09-03 15:27:45.899868 len(qu) 4 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-130000.gz, exc=[Errno 28] No space left on device thread 2 download 2016-09-03 16:27:45.899868 len(qu) 4 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-140000.gz, exc=[Errno 28] No space left on device thread 0 download 2016-09-03 17:27:45.899868 len(qu) 3 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-150000.gz, exc=[Errno 28] No space left on device thread 1 download 2016-09-03 18:27:45.899868 len(qu) 3 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-160000.gz, exc=[Errno 28] No space left on device thread 2 
download 2016-09-03 19:27:45.899868 len(qu) 3 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-170000.gz, exc=[Errno 28] No space left on device thread 0 download 2016-09-03 20:27:45.899868 len(qu) 2 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-180000.gz, exc=[Errno 28] No space left on device thread 1 download 2016-09-03 21:27:45.899868 len(qu) 2 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-190000.gz, exc=[Errno 28] No space left on device thread 2 download 2016-09-03 22:27:45.899868 len(qu) 2 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-200000.gz, exc=[Errno 28] No space left on device thread 0 download 2016-09-03 23:27:45.899868 len(qu) 1 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-210000.gz, exc=[Errno 28] No space left on device thread 1 download 2016-09-04 00:27:45.899868 len(qu) 1 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-220000.gz, exc=[Errno 28] No space left on device thread 2 download 2016-09-04 01:27:45.899868 len(qu) 1 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160903-230000.gz, exc=[Errno 28] No space left on device thread 0 download 2016-09-04 02:27:45.899868 len(qu) 0 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve 
content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-000000.gz, exc=[Errno 28] No space left on device thread 1 download 2016-09-04 03:27:45.899868 len(qu) 0 attendre file 1 [0, 0, 1] attendre file 2 [0, 0, 1] skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-010000.gz, exc=[Errno 28] No space left on device thread 2 download 2016-09-04 04:27:45.899868 len(qu) 0 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-020000.gz, exc=[Errno 28] No space left on device done thread 0 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-030000.gz, exc=[Errno 28] No space left on device done thread 1 skipping dt 2016-09-04 05:27:45.899868 rerun to get it unable to retrieve content, url=https://dumps.wikimedia.org/other/pageviews/2016/2016-09/pageviews-20160904-040000.gz, exc=[Errno 28] No space left on device done thread 2
Il n'est pas toujours évident de comprendre ce qui se passe quand l'erreur se produit dans un processus différent. Si on change le backend pour "threading", l'erreur devient visible. Voir Parallel. Le code ne fonctionne pas toujours lorsque n_jobs > 1
sous Windows et que le backend est celui par défaut (processus). Lire Embarrassingly Parallel For Loops.
from joblib import Parallel, delayed
from datetime import datetime, timedelta
import os

# Destination folder for the downloaded pageview dumps (Windows path).
folder = "d:\\wikipv"
if not os.path.exists(folder):
    os.mkdir(folder)

# Build the work list: one datetime per hour over a single day,
# starting two weeks ago.
dt = datetime.now() - timedelta(14)
hour = timedelta(hours=1)
dates = [dt + hour*i for i in range(0,24)]
def downloadp2(dt, folder):
    """Download the Wikipedia pageview dump for hour *dt* into *folder*.

    The import is done inside the function, presumably so the function
    is self-contained when joblib executes it in a worker process —
    TODO confirm this is required for the multiprocessing backend.
    """
    from mlstatpy.data.wikipedia import download_pageviews
    download_pageviews(dt, folder=folder)
# The instruction below does not work from a notebook when the backend
# is "multiprocessing"; in that case it must be run as a standalone program.
if __name__ == "__main__":
    Parallel(n_jobs=3, verbose=5)(delayed(downloadp2)(dt, folder) for dt in dates)
def filtre(input, country):
    """Keep only the lines of *input* starting with *country* and write
    them to ``input + "." + country``.

    If the filtered file already exists, nothing is done, which makes
    the function cheap to re-run on an existing folder.
    """
    import os
    print(input)
    output = input + "." + country
    if os.path.exists(output):
        # Already filtered on a previous run.
        return
    with open(input, "r", encoding="utf-8") as src, \
         open(output, "w", encoding="utf-8") as dst:
        dst.writelines(row for row in src if row.startswith(country))
import os
from joblib import Parallel, delayed

folder = "wikipv"
# Select the raw hourly dump files (names like "pageviews-YYYYMMDD-HH0000"),
# skipping any already-filtered ".fr" companions.
files = [os.path.join(folder, name)
         for name in os.listdir(folder)
         if name.startswith("pageviews") and name.endswith("0000")]
# The "threading" backend keeps errors visible from the notebook.
Parallel(n_jobs=3, verbose=5, backend="threading")(
    delayed(filtre)(name, "fr") for name in files)
wikipv\pageviews-20160827-210000wikipv\pageviews-20160827-220000 wikipv\pageviews-20160827-230000 wikipv\pageviews-20160828-000000 wikipv\pageviews-20160828-010000 wikipv\pageviews-20160828-020000 wikipv\pageviews-20160828-030000 wikipv\pageviews-20160828-040000 wikipv\pageviews-20160828-050000 wikipv\pageviews-20160828-060000 wikipv\pageviews-20160828-070000 wikipv\pageviews-20160828-080000 wikipv\pageviews-20160828-090000 wikipv\pageviews-20160828-100000 wikipv\pageviews-20160828-110000
[Parallel(n_jobs=3)]: Done 12 tasks | elapsed: 53.4s
wikipv\pageviews-20160828-120000 wikipv\pageviews-20160828-130000 wikipv\pageviews-20160828-140000 wikipv\pageviews-20160828-150000 wikipv\pageviews-20160828-160000 wikipv\pageviews-20160828-170000 wikipv\pageviews-20160828-180000 wikipv\pageviews-20160828-190000 wikipv\pageviews-20160828-200000 wikipv\pageviews-20160828-210000 wikipv\pageviews-20160828-220000 wikipv\pageviews-20160828-230000 wikipv\pageviews-20160829-000000 wikipv\pageviews-20160829-010000 wikipv\pageviews-20160829-020000 wikipv\pageviews-20160829-030000 wikipv\pageviews-20160829-040000 wikipv\pageviews-20160829-050000 wikipv\pageviews-20160829-060000 wikipv\pageviews-20160829-070000 wikipv\pageviews-20160829-080000
import pandas
df = pandas.read