.. _pigstreamingclouderacorrectionrst:

========================================================================
PIG, JSON and streaming with the vélib data - correction with Cloudera
========================================================================

.. only:: html

    **Links:** :download:`notebook `, :downloadlink:`html `,
    :download:`PDF `, :download:`python `, :downloadlink:`slides `,
    :githublink:`GitHub|_doc/notebooks/pig_hive/pig_streaming_cloudera_correction.ipynb|*`

Correction.

.. code:: ipython3

    from jyquickhelper import add_notebook_menu
    add_notebook_menu()

.. contents::
    :local:

Retrieving the data
-------------------

The following code downloads the required data
`data_velib_paris_2014-11-11_22-23.zip `__.

.. code:: ipython3

    import pyensae.datasource
    import os, datetime
    if not os.path.exists("velib"):
        os.mkdir("velib")
    files = pyensae.datasource.download_data("data_velib_paris_2014-11-11_22-23.zip",
                                             website="xdtd", whereTo="velib")
    files[:2]

.. parsed-literal::

    ['velib\\paris.2014-11-11_22-00-18.331391.txt',
     'velib\\paris.2014-11-11_22-01-17.859194.txt']

Connecting to the cluster
-------------------------

.. code:: ipython3

    import pyensae
    %load_ext pyensae
    %load_ext pyenbc
    from pyquickhelper.ipythonhelper import open_html_form
    params = {"server": "df...fr", "username": "", "password": ""}
    open_html_form(params=params, title="server + credentials", key_save="params")

.. raw:: html
    <p>server + credentials form (fields: server, username, password)</p>
.. code:: ipython3

    import pyensae
    %load_ext pyensae
    password = params["password"]
    server = params["server"]
    username = params["username"]
    client = %remote_open
    client

Exercise 1: converting the numerical values
-------------------------------------------

The following program takes as arguments the columns to extract from the
text files, which are stored in “python” format (each line is the
``repr`` of a list of dictionaries).

.. code:: ipython3

    %%PYTHON stream_json.py
    import sys, datetime
    cols = [_ for _ in sys.argv if ".py" not in _]
    for row in sys.stdin:
        row = row.strip()
        if len(row) == 0:
            continue
        js = eval(row)
        for station in js:
            vals = [str(station[c]) for c in cols]
            sys.stdout.write(",".join(vals))
            sys.stdout.write("\n")
    sys.stdout.flush()

We check that it works by feeding it the two following lines:

.. code:: ipython3

    %%runpy stream_json.py name status
    [{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET', 'collect_date': datetime.datetime(2014, 11, 11, 22, 2, 18, 47270), 'lng': 2.416170724425901, 'contract_name': 'Paris', 'name': '31705 - CHAMPEAUX (BAGNOLET)', 'banking': 0, 'lat': 48.8645278209514, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 1, 'last_update': datetime.datetime(2014, 11, 11, 21, 55, 22), 'number': 31705, 'available_bike_stands': 49, 'bike_stands': 50}]
    [{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET', 'collect_date': datetime.datetime(2014, 11, 11, 22, 2, 18, 47270), 'lng': 2.416170724425901, 'contract_name': 'Paris', 'name': '31705 - CHAMPEAUX (BAGNOLET)', 'banking': 0, 'lat': 48.8645278209514, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 1, 'last_update': datetime.datetime(2014, 11, 11, 21, 55, 22), 'number': 31705, 'available_bike_stands': 49, 'bike_stands': 50}]

.. raw:: html
    31705 - CHAMPEAUX (BAGNOLET),OPEN
    31705 - CHAMPEAUX (BAGNOLET),OPEN

    
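As a local sanity check (not part of the original notebook), the core of
``stream_json.py`` can be exercised on a single sample row. It also shows why
the script imports ``datetime`` even though it never calls it directly:
``eval`` needs the name in scope to rebuild the ``datetime`` objects embedded
in each row. The row below is a shortened version of the example above.

```python
import datetime

# Reproduce stream_json.py's per-row logic on one sample row.
cols = ["name", "status"]
row = ("[{'name': '31705 - CHAMPEAUX (BAGNOLET)', 'status': 'OPEN', "
       "'available_bikes': 1, "
       "'collect_date': datetime.datetime(2014, 11, 11, 22, 2, 18)}]")
js = eval(row)  # eval needs datetime in scope, hence the import above
lines = [",".join(str(station[c]) for c in cols) for station in js]
print("\n".join(lines))  # → 31705 - CHAMPEAUX (BAGNOLET),OPEN
```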
We write the PIG script, which uses more columns:

.. code:: ipython3

    %%PIG json_velib.pig

    DEFINE pystream `python stream_json.py available_bike_stands available_bikes lat lng name status`
        SHIP ('stream_json.py')
        INPUT(stdin USING PigStreaming(','))
        OUTPUT (stdout USING PigStreaming(',')) ;

    jspy = LOAD 'velib_py/*.txt' USING PigStorage('\t') AS (arow:chararray);

    matrice = STREAM jspy THROUGH pystream AS
                (   available_bike_stands:int, available_bikes:int,
                    lat:double, lng:double, name:chararray, status:chararray) ;

    STORE matrice INTO 'velib_py_results/firstjob' USING PigStorage('\t') ;

We delete the previous run if needed, then check that the directory that
will hold the results is empty:

.. code:: ipython3

    if client.dfs_exists("velib_py_results/firstjob"):
        client.dfs_rm("velib_py_results/firstjob", recursive=True)
    %dfs_mkdir velib_py_results

.. parsed-literal::

    ("Moved: 'hdfs://nameservice1/user/xavierdupre/velib_py_results/firstjob' to trash at: hdfs://nameservice1/user/xavierdupre/.Trash/Current\n",
     '14/11/23 20:35:18 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 0 minutes.\n')

.. code:: ipython3

    %dfs_ls velib_py_results/

.. raw:: html
attributes code alias folder size date time name
.. code:: ipython3

    %pig_submit json_velib.pig stream_json.py -r redirection

.. raw:: html

    
.. code:: ipython3

    %remote_cmd tail redirection.pig.err

.. raw:: html
    Total bytes written : 4611956
    Spillable Memory Manager spill count : 0
    Total bags proactively spilled: 0
    Total records proactively spilled: 0

    Job DAG:
    job_1414491244634_0119


    2014-11-23 20:36:50,180 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

    
We retrieve the results and display them as a dataframe:

.. code:: ipython3

    if os.path.exists("velib_exo1.txt"):
        os.remove("velib_exo1.txt")
    client.download_cluster("velib_py_results/firstjob", "velib_exo1.txt", merge=True)

.. parsed-literal::

    'velib_py_results/firstjob'

.. code:: ipython3

    import pandas
    df = pandas.read_csv("velib_exo1.txt", sep="\t",
                         names=["available_bike_stands", "available_bikes",
                                "lat", "lng", "name", "status"])
    df.head()

.. raw:: html
available_bike_stands available_bikes lat lng name status
0 47 3 48.864528 2.416171 31705 - CHAMPEAUX (BAGNOLET) OPEN
1 5 28 48.872420 2.348395 10042 - POISSONNIÈRE - ENGHIEN OPEN
2 42 1 48.882149 2.319860 08020 - METRO ROME OPEN
3 5 31 48.868217 2.330494 01022 - RUE DE LA PAIX OPEN
4 20 5 48.893269 2.412716 35014 - DE GAULLE (PANTIN) OPEN
Exercise 2: closed stations
---------------------------

The closed stations are not closed all the time. We want to compute the
ratio of closed station-minutes to the total number of station-minutes.
The Python script reading the data does not change; it is enough to
declare the numerical outputs as such. A few details about the tables:

- ``jspy``: the raw data, as a list of files
- ``matrice``: according to the previous job, the table contains one row
  per station and per minute; each row describes the status of the station
- ``grstation``: table ``matrice`` grouped by status
- ``fermees``: for each group, the aggregated number of minutes multiplied
  by the number of bikes
- ``gr*``, ``dist*``: distribution of the number of stations (Y) as a
  function of the number of available bikes or stands

In case of previous runs:

.. code:: ipython3

    %dfs_rmr velib_py_results/fermees.txt
    %dfs_rmr velib_py_results/distribution_bikes.txt
    %dfs_rmr velib_py_results/distribution_stands.txt

.. parsed-literal::

    ("Moved: 'hdfs://nameservice1/user/xavierdupre/velib_py_results/distribution_stands.txt' to trash at: hdfs://nameservice1/user/xavierdupre/.Trash/Current\n",
     '14/11/23 21:33:49 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 0 minutes.\n')

.. code:: ipython3

    %%PIG json_fermee.pig

    DEFINE pystream `python stream_json.py available_bike_stands available_bikes lat lng name status`
        SHIP ('stream_json.py')
        INPUT(stdin USING PigStreaming(','))
        OUTPUT (stdout USING PigStreaming(',')) ;

    jspy = LOAD 'velib_py/*.txt' USING PigStorage('\t') AS (arow:chararray);

    matrice = STREAM jspy THROUGH pystream AS
                (   available_bike_stands:int, available_bikes:int,
                    lat:double, lng:double, name:chararray, status:chararray) ;

    grstation = GROUP matrice BY status ;
    fermees = FOREACH grstation GENERATE group
                    , SUM(matrice.available_bikes) AS available_bikes
                    , SUM(matrice.available_bike_stands) AS available_bike_stands ;

    gr_av = GROUP matrice BY available_bikes ;
    dist_av = FOREACH gr_av GENERATE group, COUNT(matrice) ;

    gr_pl = GROUP matrice BY available_bike_stands ;
    dist_pl = FOREACH gr_pl GENERATE group, COUNT(matrice) ;

    STORE fermees INTO 'velib_py_results/fermees.txt' USING PigStorage('\t') ;
    STORE dist_av INTO 'velib_py_results/distribution_bikes.txt' USING PigStorage('\t') ;
    STORE dist_pl INTO 'velib_py_results/distribution_stands.txt' USING PigStorage('\t') ;

.. code:: ipython3

    %pig_submit json_fermee.pig stream_json.py -r redirection

.. raw:: html

    
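As a local sanity check (not part of the original notebook), the
``GROUP BY`` / ``COUNT`` statements of ``json_fermee.pig`` can be mimicked
with pandas on a toy dataframe; the column names match those of ``matrice``,
the data is made up for illustration.

```python
import pandas as pd

# Toy equivalent of:
#   gr_av   = GROUP matrice BY available_bikes ;
#   dist_av = FOREACH gr_av GENERATE group, COUNT(matrice) ;
matrice = pd.DataFrame({"available_bikes": [0, 0, 1, 2, 2, 2],
                        "status": ["OPEN"] * 5 + ["CLOSED"]})
# number of (station, minute) rows per bike count, keys sorted ascending
dist_av = matrice.groupby("available_bikes").size()
print(dist_av.tolist())  # → [2, 1, 3]
```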
.. code:: ipython3

    %remote_cmd tail redirection.pig.err

.. raw:: html
    Total bytes written : 1009
    Spillable Memory Manager spill count : 0
    Total bags proactively spilled: 0
    Total records proactively spilled: 0

    Job DAG:
    job_1414491244634_0124


    2014-11-23 21:35:28,238 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

    
.. code:: ipython3

    %dfs_ls velib_py_results

.. raw:: html
attributes code alias folder size date time name isdir
0 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 20:54 velib_py_results/distribution_bikes.txt True
1 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 20:54 velib_py_results/distribution_stands.txt True
2 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 20:36 velib_py_results/firstjob True
.. code:: ipython3

    if os.path.exists("distribution_bikes.txt"):
        os.remove("distribution_bikes.txt")
    %remote_down_cluster_merge velib_py_results/distribution_bikes.txt distribution_bikes.txt

.. parsed-literal::

    'distribution_bikes.txt'

.. code:: ipython3

    import pandas
    df = pandas.read_csv("distribution_bikes.txt", sep="\t",
                         names=["nb_velos", "nb_stations_minutes"])
    df.head()

.. raw:: html
nb_velos nb_stations_minutes
0 0 8445
1 1 6589
2 2 4825
3 3 3793
4 4 2839
.. code:: ipython3

    df.plot(x="nb_velos", y="nb_stations_minutes", kind="bar", figsize=(16,4))

.. image:: pig_streaming_cloudera_correction_30_1.png

.. code:: ipython3

    if os.path.exists("distribution_stands.txt"):
        os.remove("distribution_stands.txt")
    %remote_down_cluster_merge velib_py_results/distribution_stands.txt distribution_stands.txt

.. parsed-literal::

    'distribution_stands.txt'

.. code:: ipython3

    df = pandas.read_csv("distribution_stands.txt", sep="\t",
                         names=["nb_places", "nb_stations_minutes"])
    df.plot(x="nb_places", y="nb_stations_minutes", kind="bar", figsize=(16,4))

.. image:: pig_streaming_cloudera_correction_32_1.png

.. code:: ipython3

    if os.path.exists("fermees.txt"):
        os.remove("fermees.txt")
    %remote_down_cluster_merge velib_py_results/fermees.txt fermees.txt

.. parsed-literal::

    'fermees.txt'

.. code:: ipython3

    df = pandas.read_csv("fermees.txt", sep="\t",
                         names=["status", "nb_velos_stations_minutes",
                                "nb_places_stations_minutes"])
    df = df.set_index("status")
    df = df.T
    df["%close"] = df.CLOSED / (df.CLOSED + df.OPEN)
    df

.. raw:: html
status OPEN CLOSED %close
nb_velos_stations_minutes 1048654 3060 0.002910
nb_places_stations_minutes 1255146 120 0.000096
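As a quick sanity check (not in the original notebook), the ``%close`` column
can be recomputed directly from the numbers shown in the table above:

```python
import pandas as pd

# Recompute %close from the fermees.txt totals displayed above.
df = pd.DataFrame({"OPEN": [1048654, 1255146], "CLOSED": [3060, 120]},
                  index=["nb_velos_stations_minutes",
                         "nb_places_stations_minutes"])
df["%close"] = df.CLOSED / (df.CLOSED + df.OPEN)
print(df["%close"].round(6).tolist())
```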
This last table is only meaningful insofar as the reported numbers remain
reliable while the stations are closed.

Exercise 3: closed stations, full day
-------------------------------------

Applying this to a full day amounts to running the same job on larger data.
We will soon see how to avoid copying the job a second time (which
introduces a potential source of errors).

.. code:: ipython3

    %dfs_ls /user/xavierdupre/

.. raw:: html
attributes code alias folder size date time name isdir
0 drwx------ - xavierdupre xavierdupre 0 2014-11-23 17:55 /user/xavierdupre/.Trash True
1 drwx------ - xavierdupre xavierdupre 0 2014-11-23 21:41 /user/xavierdupre/.staging True
2 -rw-r--r-- 3 xavierdupre xavierdupre 132727 2014-11-16 02:37 /user/xavierdupre/ConfLongDemo_JSI.small.examp... False
3 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-16 02:38 /user/xavierdupre/ConfLongDemo_JSI.small.examp... True
4 -rw-r--r-- 3 xavierdupre xavierdupre 461444 2014-11-20 01:33 /user/xavierdupre/paris.2014-11-11_22-00-18.33... False
5 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 23:43 /user/xavierdupre/unitest2 True
6 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 21:42 /user/xavierdupre/unittest True
7 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 21:41 /user/xavierdupre/unittest2 True
8 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 01:53 /user/xavierdupre/velib_1hjs True
9 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 01:17 /user/xavierdupre/velib_py True
10 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 21:34 /user/xavierdupre/velib_py_results True
11 drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 11:08 /user/xavierdupre/velib_several_days True
.. code:: ipython3

    %%PIG json_fermee.pig

    DEFINE pystream `python stream_json.py available_bike_stands available_bikes lat lng name status`
        SHIP ('stream_json.py')
        INPUT(stdin USING PigStreaming(','))
        OUTPUT (stdout USING PigStreaming(',')) ;

    jspy = LOAD '/user/xavierdupre/velib_several_days/*.txt' USING PigStorage('\t') AS (arow:chararray);

    matrice = STREAM jspy THROUGH pystream AS
                (   available_bike_stands:int, available_bikes:int,
                    lat:double, lng:double, name:chararray, status:chararray) ;

    grstation = GROUP matrice BY status ;
    fermees = FOREACH grstation GENERATE group
                    , SUM(matrice.available_bikes) AS available_bikes
                    , SUM(matrice.available_bike_stands) AS available_bike_stands ;

    gr_av = GROUP matrice BY available_bikes ;
    dist_av = FOREACH gr_av GENERATE group, COUNT(matrice) ;

    gr_pl = GROUP matrice BY available_bike_stands ;
    dist_pl = FOREACH gr_pl GENERATE group, COUNT(matrice) ;

    STORE fermees INTO 'velib_py_results_3days/fermees.txt' USING PigStorage('\t') ;
    STORE dist_av INTO 'velib_py_results_3days/distribution_bikes.txt' USING PigStorage('\t') ;
    STORE dist_pl INTO 'velib_py_results_3days/distribution_stands.txt' USING PigStorage('\t') ;

.. code:: ipython3

    %pig_submit json_fermee.pig stream_json.py -r redirection

.. raw:: html

    
.. code:: ipython3

    %remote_cmd tail redirection.pig.err

.. raw:: html
    Total bytes written : 1306
    Spillable Memory Manager spill count : 0
    Total bags proactively spilled: 0
    Total records proactively spilled: 0

    Job DAG:
    job_1414491244634_0128


    2014-11-23 21:55:06,336 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

    
.. code:: ipython3

    %dfs_ls velib_py_results_3days/*

.. raw:: html
attributes code alias folder size date time name isdir
0 -rw-r--r-- 3 xavierdupre xavierdupre 0 2014-11-23 21:54 velib_py_results_3days/distribution_bikes.txt/... False
1 -rw-r--r-- 3 xavierdupre xavierdupre 210 2014-11-23 21:54 velib_py_results_3days/distribution_bikes.txt/... False
2 -rw-r--r-- 3 xavierdupre xavierdupre 208 2014-11-23 21:54 velib_py_results_3days/distribution_bikes.txt/... False
3 -rw-r--r-- 3 xavierdupre xavierdupre 216 2014-11-23 21:54 velib_py_results_3days/distribution_bikes.txt/... False
4 -rw-r--r-- 3 xavierdupre xavierdupre 0 2014-11-23 21:54 velib_py_results_3days/distribution_stands.txt... False
5 -rw-r--r-- 3 xavierdupre xavierdupre 210 2014-11-23 21:54 velib_py_results_3days/distribution_stands.txt... False
6 -rw-r--r-- 3 xavierdupre xavierdupre 208 2014-11-23 21:54 velib_py_results_3days/distribution_stands.txt... False
7 -rw-r--r-- 3 xavierdupre xavierdupre 210 2014-11-23 21:54 velib_py_results_3days/distribution_stands.txt... False
8 -rw-r--r-- 3 xavierdupre xavierdupre 0 2014-11-23 21:54 velib_py_results_3days/fermees.txt/_SUCCESS False
9 -rw-r--r-- 3 xavierdupre xavierdupre 20 2014-11-23 21:54 velib_py_results_3days/fermees.txt/part-r-00000 False
10 -rw-r--r-- 3 xavierdupre xavierdupre 0 2014-11-23 21:54 velib_py_results_3days/fermees.txt/part-r-00001 False
11 -rw-r--r-- 3 xavierdupre xavierdupre 24 2014-11-23 21:54 velib_py_results_3days/fermees.txt/part-r-00002 False
We can see that the job was distributed over three machines (three
``part-r-*`` files per output).

Exercise 4: tips
----------------

PIG error messages are not very explicit, especially when the error happens
inside the Python script. A simple way to debug is to catch the exceptions
raised by Python and retrieve them through PIG (the rest of the job is
stripped out). One could very well also print the version of Python
installed on the cluster as well as the list of modules:

.. code:: ipython3

    %%PYTHON stream_json_err.py
    import sys, os
    for row in sys.stdin:
        sys.stdout.write(sys.executable + "\n")
        sys.stdout.write(str(sys.version) + "\n")
        sys.stdout.write(row + "\n")
        # to get the environment variables:
        # for k in os.environ:
        #     sys.stdout.write("%s=%s\n" % (k, os.environ[k]))
    sys.stdout.flush()

.. code:: ipython3

    %%runpy stream_json_err.py
    n'importe quoi

.. raw:: html
    C:\Python34\python.exe
    3.4.1 (v3.4.1:c0e311e010fc, May 18 2014, 10:38:22) [MSC v.1600 32 bit (Intel)]
    n'importe quoi

    
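Before switching to PIG, the catch-and-forward idea can be sketched locally;
``safe_stream`` and ``process`` below are hypothetical names, not part of the
notebook, and stand in for the real streaming loop and per-row logic.

```python
import sys
import traceback

# Sketch: wrap the per-row logic so any Python exception travels through
# stdout and becomes visible in the PIG job output instead of a cryptic
# stream failure.
def safe_stream(rows, process, out=sys.stdout):
    for row in rows:
        try:
            out.write(process(row) + "\n")
        except Exception:
            # one line per error so PigStreaming can still parse the output
            out.write("ERROR: " + traceback.format_exc().replace("\n", " | ") + "\n")
    out.flush()

# None triggers an AttributeError, which is forwarded as an ERROR line
safe_stream(["ok", None], lambda r: r.upper())
```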
.. code:: ipython3

    %%PIG test.pig

    DEFINE pystream `python stream_json_err.py`
        SHIP ('stream_json_err.py')
        INPUT(stdin USING PigStreaming(','))
        OUTPUT (stdout USING PigStreaming(',')) ;

    jspy = LOAD 'velib_py/*.txt' USING PigStorage('\t') AS (arow:chararray);
    one = LIMIT jspy 1 ;
    info = STREAM one THROUGH pystream AS (row:chararray) ;
    STORE info INTO 'python_info2.txt' ;

.. code:: ipython3

    %pig_submit test.pig stream_json_err.py -r redirection

.. raw:: html

    
.. code:: ipython3

    %remote_cmd tail redirection.pig.err -n 5

.. raw:: html
    2014-11-23 22:07:20,132 [main] INFO  org.apache.hadoop.ipc.Client - Retrying connect to server: dn05.dzr323.dza.datazoomr.com/10.58.223.25:59338. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
    2014-11-23 22:07:21,134 [main] INFO  org.apache.hadoop.ipc.Client - Retrying connect to server: dn05.dzr323.dza.datazoomr.com/10.58.223.25:59338. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
    2014-11-23 22:07:21,251 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2014-11-23 22:07:21,471 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2014-11-23 22:07:21,574 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

    
.. code:: ipython3

    if os.path.exists("python_info2.txt"):
        os.remove("python_info2.txt")
    %remote_down_cluster_merge python_info2.txt python_info2.txt

.. parsed-literal::

    'python_info2.txt'

.. code:: ipython3

    %head python_info2.txt

.. raw:: html
    /usr/bin/python
    2.6.6 (r266:84292
    [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)]
    [{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET'