.. _pigstreamingrst: ======================================================== PIG et JSON et streaming avec les données vélib - énoncé ======================================================== .. only:: html **Links:** :download:`notebook `, :downloadlink:`html `, :download:`PDF `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/pig_hive/pig_streaming.ipynb|*` Manipulation de données JSON en Map/Reduce avec `PIG `__. .. code:: ipython3 from jyquickhelper import add_notebook_menu add_notebook_menu() .. contents:: :local: Récupération des données ------------------------ Les données ne se présentent pas toujours sous forme de tables. Elles sont parfois non structurées comme c’est le cas des données vélib qui sont décrites au format `JSON `__. Le premier jeu de données est le décompte des vélos et places disponibles toutes les minutes le 11 novembre 2014 entre 22h et 23h à Paris. Ces données ont été accumulées grâce au programme `collect Velib data `__. .. code:: ipython3 import pyensae.datasource %load_ext pyensae %load_ext pyenbc import os, datetime .. code:: ipython3 if not os.path.exists("velib") : os.mkdir("velib") files=pyensae.datasource.download_data("data_velib_paris_2014-11-11_22-23.zip", website="xdtd", whereTo="velib") files[:2] .. parsed-literal:: ['velib\\paris.2014-11-11_22-00-18.331391.txt', 'velib\\paris.2014-11-11_22-01-17.859194.txt'] On regarde un extrait : .. code:: ipython3 with open("velib/paris.2014-11-11_22-00-18.331391.txt","r",encoding="utf-8")as f : text = f.read() text[:300] + "..." .. parsed-literal:: "[{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET', 'collect_date': datetime.datetime(2014, 11, 11, 22, 0, 18, 628226), 'lng': 2.416170724425901, 'contract_name': 'Paris', 'name': '31705 - CHAMPEAUX (BAGNOLET)', 'banking': 0, 'lat': 48.8645278209514, 'bonus': 0, 'status': '..." Les données ont été sauvegardées au format Python si bien que la fonction `eval `__ suffit à les récupérer. .. code:: ipython3 data = eval(text) data[:2] .. parsed-literal:: [{'name': '31705 - CHAMPEAUX (BAGNOLET)', 'available_bikes': 1, 'status': 'OPEN', 'number': 31705, 'lng': 2.416170724425901, 'available_bike_stands': 49, 'contract_name': 'Paris', 'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET', 'last_update': datetime.datetime(2014, 11, 11, 21, 55, 22), 'lat': 48.8645278209514, 'bike_stands': 50, 'collect_date': datetime.datetime(2014, 11, 11, 22, 0, 18, 628226), 'bonus': 0, 'banking': 0}, {'name': '10042 - POISSONNIÈRE - ENGHIEN', 'available_bikes': 32, 'status': 'OPEN', 'number': 10042, 'lng': 2.348395236282807, 'available_bike_stands': 1, 'contract_name': 'Paris', 'address': "52 RUE D'ENGHIEN / ANGLE RUE DU FAUBOURG POISSONIERE - 75010 PARIS", 'last_update': datetime.datetime(2014, 11, 11, 21, 59, 5), 'lat': 48.87242006305313, 'bike_stands': 33, 'collect_date': datetime.datetime(2014, 11, 11, 22, 0, 18, 628226), 'bonus': 0, 'banking': 0}] Et on peut les rassembler dans un DataFrame : .. code:: ipython3 import pandas df = pandas.DataFrame(data) df.head(n=2) .. raw:: html
address available_bike_stands available_bikes banking bike_stands bonus collect_date contract_name last_update lat lng name number status
0 RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) -... 49 1 0 50 0 2014-11-11 22:00:18.628226 Paris 2014-11-11 21:55:22 48.864528 2.416171 31705 - CHAMPEAUX (BAGNOLET) 31705 OPEN
1 52 RUE D'ENGHIEN / ANGLE RUE DU FAUBOURG POISS... 1 32 0 33 0 2014-11-11 22:00:18.628226 Paris 2014-11-11 21:59:05 48.872420 2.348395 10042 - POISSONNIÈRE - ENGHIEN 10042 OPEN
Chaque ligne donne le nombre de places ``available_bike_stands`` et le nombre de vélos ``available_bikes`` disponibles à la date ``collect_date``. La colone ``last_update`` contient l’heure du dernier retrait ou de la dernière pose de vélo à cette station. Il y plus d’un millier de stations à Paris (voir `carte `__). .. code:: ipython3 df.shape .. parsed-literal:: (1229, 14) .. code:: ipython3 df.plot(x="lng",y="lat",style=".",xlim=[2.20, 2.50]) .. parsed-literal:: .. image:: pig_streaming_13_1.png L’objectif est de calculer des statistiques sur ce petit jeu de données puis de calculer ces mêmes statistiques sur des jeux plus grands. Et on va essayer de ne pas transformer les données localement. Elles vont être uploadés sur le cluster au format JSON. Conversion des données en streaming ----------------------------------- Le format `JSON `__ telle que l’accepte PIG stipule que chaque ligne (ou observation) doit suivre le format ``{ id1: JSON, id2:valeur2, ... }``. Les chaînes de caractères doivent être entourées de guillements ``"``. Il faudrait donc modifier les fichiers précédents de telle sorte qu’ils suivent ce schéma. On pourrait faire quelque chose comme ceci : .. code:: ipython3 import json class DateTimeEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): encoded_object = "%04d-%02d-%02dT%02d:%02d:%02d"% (obj.timetuple()[:6] ) else: encoded_object =json.JSONEncoder.default(self, obj) return encoded_object files = [ os.path.join("velib",_) for _ in os.listdir("velib") if "paris" in _ and _.endswith(".txt") ] for f in files : print("*****",f) with open(f, "r", encoding="utf8") as h: for row in h: js = eval(row) sjs = json.dumps( { "minute":js }, cls = DateTimeEncoder ) # essayer sans le paramètre cls pour # voir l'erreur que cela produit print(sjs [:400] + "...") break .. parsed-literal:: ***** velib\paris.2014-11-11_22-00-18.331391.txt {"minute": [{"name": "31705 - CHAMPEAUX (BAGNOLET)", "available_bikes": 1, "status": "OPEN", "number": 31705, "lng": 2.416170724425901, "available_bike_stands": 49, "contract_name": "Paris", "address": "RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET", "last_update": "2014-11-11T21:55:22", "lat": 48.8645278209514, "bike_stands": 50, "collect_date": "2014-11-11T22:00:18", "bonus": 0, ... Toutefois, cette conversion pourrait tout aussi bien se produire sur le cluster. C’est ce qu’on va implémenter avec l’instruction `STREAM `__. Celle-ci permet d’insérer du code écrit en n’importe quel langage dans un script PIG. Côté python, le **streaming** se résume à l’écriture d’un programme qui écoute l’entrée standard et retourne des résultats sur la sorties standard. Il faut écrire selon une syntaxe acceptée par Python 2 et 3. Ce notebook marche avec Python 3, la version sur le cluster est le plus souvent 2.7. Cela signifie aussi que les modules en local ne sont pas forcément installés sur le cluster. .. code:: ipython3 %%PYTHON stream_json.py import sys, datetime cols = [ _ for _ in sys.argv if ".py" not in _ ] for row in sys.stdin: row = row.strip() if len(row) == 0 : continue js = eval(row) for station in js: vals = [ str(station[c]) for c in cols ] sys.stdout.write(",".join(vals)) sys.stdout.write("\n") sys.stdout.flush() On teste sur quelques exemples que le script fonctionne : .. code:: ipython3 %%runpy stream_json.py name status [{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET', 'collect_date': datetime.datetime(2014, 11, 11, 22, 2, 18, 47270), 'lng': 2.416170724425901, 'contract_name': 'Paris', 'name': '31705 - CHAMPEAUX (BAGNOLET)', 'banking': 0, 'lat': 48.8645278209514, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 1, 'last_update': datetime.datetime(2014, 11, 11, 21, 55, 22), 'number': 31705, 'available_bike_stands': 49, 'bike_stands': 50}] [{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET', 'collect_date': datetime.datetime(2014, 11, 11, 22, 2, 18, 47270), 'lng': 2.416170724425901, 'contract_name': 'Paris', 'name': '31705 - CHAMPEAUX (BAGNOLET)', 'banking': 0, 'lat': 48.8645278209514, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 1, 'last_update': datetime.datetime(2014, 11, 11, 21, 55, 22), 'number': 31705, 'available_bike_stands': 49, 'bike_stands': 50}] .. raw:: html
    31705 - CHAMPEAUX (BAGNOLET),OPEN
    31705 - CHAMPEAUX (BAGNOLET),OPEN

    
Ca fonctionne. On passe au cluster maintenant. Connexion au cluster -------------------- Dans un premier temps, on prend le cluster `Cloudera `__. Il faut exécuter ce script pour pouvoir notifier au notebook que la variable ``params`` existe. .. code:: ipython3 import pyensae from pyquickhelper.ipythonhelper import open_html_form params={"server":"df...fr", "username":"", "password":""} open_html_form(params=params,title="server + credentials", key_save="params") .. raw:: html
server + credentials
password
server
username
.. code:: ipython3 import pyensae %load_ext pyensae password = params["password"] server = params["server"] username = params["username"] client = %remote_open client .. parsed-literal:: Upload version -------------- On créé un répertoire sur le cluster : .. code:: ipython3 %dfs_mkdir velib_py .. parsed-literal:: ('', '') .. code:: ipython3 %dfs_ls . .. raw:: html
attributes code alias folder size date time name isdir
0 drwx------ - xavierdupre xavierdupre 0 2014-11-21 01:05 .Trash True
1 drwx------ - xavierdupre xavierdupre 0 2014-11-21 01:06 .staging True
2 -rw-r--r-- 3 xavierdupre xavierdupre 132727 2014-11-16 02:37 ConfLongDemo_JSI.small.example.txt False
3 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-16 02:38 ConfLongDemo_JSI.small.example2.walking.txt True
4 -rw-r--r-- 3 xavierdupre xavierdupre 461444 2014-11-20 01:33 paris.2014-11-11_22-00-18.331391.txt False
5 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 23:43 unitest2 True
6 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 22:29 unittest True
7 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 01:05 unittest2 True
8 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 01:53 velib_1hjs True
9 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 01:06 velib_py True
.. code:: ipython3 files = [ os.path.join("velib",_) for _ in os.listdir("velib") if "paris" in _ and _.endswith(".txt")] for i,f in enumerate(files[51:]) : if i % 10 == 0 : print(i,"/",len(files),f) filename = os.path.split(f)[-1] f = os.path.abspath(f) client.upload_cluster(f, "velib_py") .. parsed-literal:: 0 / 61 velib\paris.2014-11-11_22-51-17.300775.txt On crée le répertoire de résultats : .. code:: ipython3 %dfs_mkdir velib_py_results .. parsed-literal:: ('', '') PIG, pseudo JSON et streaming ----------------------------- On a créé plus haut le script python capable d’interpréter du python puis d’extraire les informations voulues. On va maintenant dire à PIG d’envoyer chaque ligne qu’il va charger (instruction ``LOAD``) au script python (instruction ``STREAM``). On lui dit de récuper les informations selon un certain format. La seconde ligne ``DEFINE`` définit la commande utilisée. Elle précise également que le script ``pystream.py`` devra être envoyé à toutes les machines du cluster participant au job. Et il faut attention, cette ligne ne comporte pas que des apostrophes, le caractère **’ incliné** est très important. Les premiers essais de streaming sont souvent douloureux. .. code:: ipython3 %%PIG json_velib.pig DEFINE pystream `python stream_json.py available_bike_stands available_bikes lat lng name status` SHIP ('stream_json.py') INPUT(stdin USING PigStreaming(',')) OUTPUT (stdout USING PigStreaming(',')) ; jspy = LOAD 'velib_py/*.txt' USING PigStorage('\t') AS (arow:chararray); matrice = STREAM jspy THROUGH pystream AS ( available_bike_stands:chararray, available_bikes:chararray, lat:double, lng:double, name:chararray, status:chararray) ; STORE matrice INTO 'velib_py_results/firstjob' USING PigStorage('\t') ; Pour supprimer les précédents résultats : .. code:: ipython3 if client.dfs_exists("velib_py_results/firstjob"): client.dfs_rm("velib_py_results/firstjob", recursive=True) %dfs_mkdir velib_py_results .. parsed-literal:: ("Moved: 'hdfs://nameservice1/user/xavierdupre/velib_py_results/firstjob' to trash at: hdfs://nameservice1/user/xavierdupre/.Trash/Current\n", '14/11/21 01:50:26 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 0 minutes.\n') On exécute le job : .. code:: ipython3 %pig_submit json_velib.pig stream_json.py -r redirection .. raw:: html

    
.. code:: ipython3 %remote_ls . .. raw:: html
attributes code alias folder size unit name isdir
-rw-rw-r-- 1 xavierdupre xavierdupre 0 Nov 21 01:43 dummy False
1 xavierdupre xavierdupre 650 Nov 21 01:50 json_velib.pig False
1 xavierdupre xavierdupre 523646 Nov 21 01:15 paris.2014-11-11_22-50-17.777867.txt False
1 xavierdupre xavierdupre 3077 Nov 21 01:24 pig_1416529443864.log False
1 xavierdupre xavierdupre 3297 Nov 21 01:37 pig_1416530241713.log False
1 xavierdupre xavierdupre 672 Nov 21 01:43 pystream.pig False
1 xavierdupre xavierdupre 382 Nov 21 01:43 pystream.py False
1 xavierdupre xavierdupre 860 Nov 21 01:50 redirection.err False
1 xavierdupre xavierdupre 0 Nov 21 01:50 redirection.out False
1 xavierdupre xavierdupre 356 Nov 21 01:50 stream_json.py False
S’il se produit des erreurs, il est recommandé d’afficher plus de lignes : .. code:: ipython3 %remote_cmd tail redirection.pig.err .. raw:: html
    Total bytes written : 4611956
    Spillable Memory Manager spill count : 0
    Total bags proactively spilled: 0
    Total records proactively spilled: 0

    Job DAG:
    job_1414491244634_0092


    2014-11-21 01:51:16,166 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

    
On vérifie que tout s’est bien passé. La taille devrait être équivalent à l’entrée. .. code:: ipython3 %dfs_ls velib_py_results .. raw:: html
attributes code alias folder size date time name isdir
0 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 01:51 velib_py_results/firstjob True
.. code:: ipython3 if os.path.exists("velib_hd.txt") : os.remove("velib_hd.txt") client.download_cluster("velib_py_results/firstjob","velib_hd.txt", merge=True) .. parsed-literal:: 'velib_py_results/firstjob' .. code:: ipython3 %head velib_hd.txt -n 5 .. raw:: html
    47	3	48.864527821	2.41617072443	31705 - CHAMPEAUX (BAGNOLET)	OPEN
    5	28	48.8724200631	2.34839523628	10042 - POISSONNIÈRE - ENGHIEN	OPEN
    42	1	48.8821489456	2.31986005477	08020 - METRO ROME	OPEN
    5	31	48.8682170168	2.3304935114	01022 - RUE DE LA PAIX	OPEN
    20	5	48.8932686647	2.41271573339	35014 - DE GAULLE (PANTIN)	OPEN

    
.. code:: ipython3 import pandas df = pandas.read_csv("velib_hd.txt", sep="\t",names=["available_bike_stands","available_bikes","lat","lng","name","status"]) df.head() .. raw:: html
available_bike_stands available_bikes lat lng name status
0 47 3 48.864528 2.416171 31705 - CHAMPEAUX (BAGNOLET) OPEN
1 5 28 48.872420 2.348395 10042 - POISSONNIÈRE - ENGHIEN OPEN
2 42 1 48.882149 2.319860 08020 - METRO ROME OPEN
3 5 31 48.868217 2.330494 01022 - RUE DE LA PAIX OPEN
4 20 5 48.893269 2.412716 35014 - DE GAULLE (PANTIN) OPEN
.. code:: ipython3 df.shape .. parsed-literal:: (73740, 6) Exercice 1 : convertir les valeurs numériques --------------------------------------------- Exercice 2 : stations fermées ----------------------------- Les stations fermées ne le sont pas tout le temps. On veut calculer le ratio vélo/minute/station fermées / total des vélo/minute/station. Exercice 3 : stations fermées, journée complète ----------------------------------------------- Appliquer cela à une journée complète. Exercice 4 : astuces -------------------- Les erreurs de PIG ne sont pas très explicite surtout si elles se produisent dans le script python. Un moyen simple de débugger est d’attraper les exceptions produites par python et de les récupérer sous PIG (le reste du job est enlevé). On peut tout-à-fait imaginer ajouter la version de python installée sur le cluster ainsi que la liste de modules