Correction.
from jyquickhelper import add_notebook_menu
add_notebook_menu()
The following code downloads the required data: data_velib_paris_2014-11-11_22-23.zip.
import pyensae.datasource
import os, datetime
if not os.path.exists("velib"):
    os.mkdir("velib")
files=pyensae.datasource.download_data("data_velib_paris_2014-11-11_22-23.zip", website="xdtd", whereTo="velib")
files[:2]
['velib\\paris.2014-11-11_22-00-18.331391.txt', 'velib\\paris.2014-11-11_22-01-17.859194.txt']
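The archive expands to one snapshot file per minute, each holding the full station list. A quick local sanity check (a sketch, assuming the download above succeeded):

import glob
# one paris.*.txt file per minute of collection
snapshots = glob.glob("velib/paris.*.txt")
len(snapshots)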
import pyensae
%load_ext pyensae
%load_ext pyenbc
from pyquickhelper.ipythonhelper import open_html_form
params={"server":"df...fr", "username":"", "password":""}
open_html_form(params=params,title="server + credentials", key_save="params")
import pyensae
%load_ext pyensae
password = params["password"]
server = params["server"]
username = params["username"]
client = %remote_open
client
<pyensae.remote.remote_connection.ASSHClient at 0x5b43bb0>
The following program takes as arguments the columns to extract from the text files, which are stored as Python literals.
%%PYTHON stream_json.py
import sys, datetime
cols = [_ for _ in sys.argv if ".py" not in _]
for row in sys.stdin:
    row = row.strip()
    if len(row) == 0:
        continue
    # each row is a Python literal: a list of station dictionaries
    js = eval(row)
    for station in js:
        vals = [str(station[c]) for c in cols]
        sys.stdout.write(",".join(vals))
        sys.stdout.write("\n")
        sys.stdout.flush()
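Each row is a Python literal containing datetime.datetime(...) calls, which is why the script relies on eval rather than ast.literal_eval (the latter rejects function calls). If the input were not fully trusted, a slightly safer variant would evaluate the row with a restricted namespace; a sketch with a hypothetical parse_row helper:

import datetime

def parse_row(row):
    # expose only the datetime module and disable builtins; this limits what
    # a malicious row could call, though it is not a true sandbox
    return eval(row, {"__builtins__": {}, "datetime": datetime})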
We check that it works by feeding it the following two rows:
%%runpy stream_json.py name status
[{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET', 'collect_date': datetime.datetime(2014, 11, 11, 22, 2, 18, 47270), 'lng': 2.416170724425901, 'contract_name': 'Paris', 'name': '31705 - CHAMPEAUX (BAGNOLET)', 'banking': 0, 'lat': 48.8645278209514, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 1, 'last_update': datetime.datetime(2014, 11, 11, 21, 55, 22), 'number': 31705, 'available_bike_stands': 49, 'bike_stands': 50}]
[{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET', 'collect_date': datetime.datetime(2014, 11, 11, 22, 2, 18, 47270), 'lng': 2.416170724425901, 'contract_name': 'Paris', 'name': '31705 - CHAMPEAUX (BAGNOLET)', 'banking': 0, 'lat': 48.8645278209514, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 1, 'last_update': datetime.datetime(2014, 11, 11, 21, 55, 22), 'number': 31705, 'available_bike_stands': 49, 'bike_stands': 50}]
31705 - CHAMPEAUX (BAGNOLET),OPEN
31705 - CHAMPEAUX (BAGNOLET),OPEN
We now write the PIG script, which uses more columns:
%%PIG json_velib.pig
DEFINE pystream `python stream_json.py available_bike_stands available_bikes lat lng name status`
SHIP ('stream_json.py') INPUT(stdin USING PigStreaming(',')) OUTPUT (stdout USING PigStreaming(',')) ;
jspy = LOAD 'velib_py/*.txt' USING PigStorage('\t') AS (arow:chararray);
matrice = STREAM jspy THROUGH pystream AS
( available_bike_stands:int,
available_bikes:int,
lat:double,
lng:double,
name:chararray,
status:chararray) ;
STORE matrice INTO 'velib_py_results/firstjob' USING PigStorage('\t') ;
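Before submitting the job, one can reproduce locally what STREAM ... THROUGH does on each node: pipe rows into the script's stdin and read comma-separated rows from its stdout (PIG additionally SHIPs the script to every node). A minimal sketch, assuming the files list from the download cell above:

import subprocess, sys

# pipe one snapshot file into the streaming script, as PIG does on each node
with open(files[0], "rb") as f:
    out = subprocess.check_output(
        [sys.executable, "stream_json.py",
         "available_bike_stands", "available_bikes", "lat", "lng", "name", "status"],
        stdin=f)
print(out.decode("utf-8", "replace").splitlines()[:2])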
If needed, we remove the output of a previous run, then check that the results directory is empty:
if client.dfs_exists("velib_py_results/firstjob"):
    client.dfs_rm("velib_py_results/firstjob", recursive=True)
%dfs_mkdir velib_py_results
("Moved: 'hdfs://nameservice1/user/xavierdupre/velib_py_results/firstjob' to trash at: hdfs://nameservice1/user/xavierdupre/.Trash/Current\n", '14/11/23 20:35:18 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 0 minutes.\n')
%dfs_ls velib_py_results/
attributes | code | alias | folder | size | date | time | name |
---|---|---|---|---|---|---|---|
%pig_submit json_velib.pig stream_json.py -r redirection
%remote_cmd tail redirection.pig.err
Total bytes written : 4611956
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG: job_1414491244634_0119
2014-11-23 20:36:50,180 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
We download the results and display them as a dataframe:
if os.path.exists("velib_exo1.txt") : os.remove("velib_exo1.txt")
client.download_cluster("velib_py_results/firstjob","velib_exo1.txt", merge=True)
'velib_py_results/firstjob'
import pandas
df = pandas.read_csv("velib_hd.txt", sep="\t",names=["available_bike_stands","available_bikes","lat","lng","name","status"])
df.head()
 | available_bike_stands | available_bikes | lat | lng | name | status |
---|---|---|---|---|---|---|
0 | 47 | 3 | 48.864528 | 2.416171 | 31705 - CHAMPEAUX (BAGNOLET) | OPEN |
1 | 5 | 28 | 48.872420 | 2.348395 | 10042 - POISSONNIÈRE - ENGHIEN | OPEN |
2 | 42 | 1 | 48.882149 | 2.319860 | 08020 - METRO ROME | OPEN |
3 | 5 | 31 | 48.868217 | 2.330494 | 01022 - RUE DE LA PAIX | OPEN |
4 | 20 | 5 | 48.893269 | 2.412716 | 35014 - DE GAULLE (PANTIN) | OPEN |
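The aggregations computed by the next job can be prototyped on this one-hour sample; a minimal sketch of the equivalent GROUP BY logic in pandas:

# equivalent of the PIG GROUP BY ... GENERATE SUM/COUNT below, in pandas
sums = df.groupby("status")[["available_bikes", "available_bike_stands"]].sum()
dist_bikes = df.groupby("available_bikes").size()        # like dist_av
dist_stands = df.groupby("available_bike_stands").size() # like dist_pl
sums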
Closed stations are not closed all the time. We want to compute the ratio of closed station-minutes over the total number of station-minutes. The Python script reading the data does not change; we only have to declare the numeric outputs as such. A few details about the tables:

- jspy: holds the raw data, loaded from a list of files
- matrice: as in the previous job, one row per station and per minute; each row describes the station's status
- grstation: table matrice grouped by status
- fermees: for each group, the aggregated number of minutes multiplied by the number of bikes (or stands)
- gr*, dist*: distribution of the number of stations (Y) as a function of the number of available bikes or stands

In case of previous runs:
%dfs_rmr velib_py_results/fermees.txt
%dfs_rmr velib_py_results/distribution_bikes.txt
%dfs_rmr velib_py_results/distribution_stands.txt
("Moved: 'hdfs://nameservice1/user/xavierdupre/velib_py_results/distribution_stands.txt' to trash at: hdfs://nameservice1/user/xavierdupre/.Trash/Current\n", '14/11/23 21:33:49 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 0 minutes.\n')
%%PIG json_fermee.pig
DEFINE pystream `python stream_json.py available_bike_stands available_bikes lat lng name status`
SHIP ('stream_json.py') INPUT(stdin USING PigStreaming(',')) OUTPUT (stdout USING PigStreaming(',')) ;
jspy = LOAD 'velib_py/*.txt' USING PigStorage('\t') AS (arow:chararray);
matrice = STREAM jspy THROUGH pystream AS
( available_bike_stands:int,
available_bikes:int,
lat:double,
lng:double,
name:chararray,
status:chararray) ;
grstation = GROUP matrice BY status ;
fermees = FOREACH grstation GENERATE
group
,SUM(matrice.available_bikes) AS available_bikes
,SUM(matrice.available_bike_stands) AS available_bike_stands
;
gr_av = GROUP matrice BY available_bikes ;
dist_av = FOREACH gr_av GENERATE group, COUNT(matrice) ;
gr_pl = GROUP matrice BY available_bike_stands ;
dist_pl = FOREACH gr_pl GENERATE group, COUNT(matrice) ;
STORE fermees INTO 'velib_py_results/fermees.txt' USING PigStorage('\t') ;
STORE dist_av INTO 'velib_py_results/distribution_bikes.txt' USING PigStorage('\t') ;
STORE dist_pl INTO 'velib_py_results/distribution_stands.txt' USING PigStorage('\t') ;
%pig_submit json_fermee.pig stream_json.py -r redirection
%remote_cmd tail redirection.pig.err
Total bytes written : 1009
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG: job_1414491244634_0124
2014-11-23 21:35:28,238 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
%dfs_ls velib_py_results
 | attributes | code | alias | folder | size | date | time | name | isdir |
---|---|---|---|---|---|---|---|---|---|
0 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 20:54 | velib_py_results/distribution_bikes.txt | True |
1 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 20:54 | velib_py_results/distribution_stands.txt | True |
2 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 20:36 | velib_py_results/firstjob | True |
if os.path.exists("distribution_bikes.txt") : os.remove("distribution_bikes.txt")
%remote_down_cluster_merge velib_py_results/distribution_bikes.txt distribution_bikes.txt
'distribution_bikes.txt'
import pandas
df = pandas.read_csv("distribution_bikes.txt", sep="\t", names=["nb_velos", "nb_stations_minutes"])
df.head()
 | nb_velos | nb_stations_minutes |
---|---|---|
0 | 0 | 8445 |
1 | 1 | 6589 |
2 | 2 | 4825 |
3 | 3 | 3793 |
4 | 4 | 2839 |
df.plot(x="nb_velos",y="nb_stations_minutes",kind="bar",figsize=(16,4))
<matplotlib.axes.AxesSubplot at 0x9f3c4d0>
if os.path.exists("distribution_stands.txt") : os.remove("distribution_stands.txt")
%remote_down_cluster_merge velib_py_results/distribution_stands.txt distribution_stands.txt
'distribution_stands.txt'
df = pandas.read_csv("distribution_stands.txt", sep="\t", names=["nb_places", "nb_stations_minutes"])
df.plot(x="nb_places",y="nb_stations_minutes",kind="bar",figsize=(16,4))
<matplotlib.axes.AxesSubplot at 0xa04d390>
if os.path.exists("fermees.txt") : os.remove("fermees.txt")
%remote_down_cluster_merge velib_py_results/fermees.txt fermees.txt
'fermees.txt'
df = pandas.read_csv("fermees.txt", sep="\t", names=["status", "nb_velos_stations_minutes", "nb_places_stations_minutes"])
df=df.set_index("status")
df = df.T
df["%close"] = df.CLOSED / (df.CLOSED + df.OPEN)
df
status | OPEN | CLOSED | %close |
---|---|---|---|
nb_velos_stations_minutes | 1048654 | 3060 | 0.002910 |
nb_places_stations_minutes | 1255146 | 120 | 0.000096 |
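A quick check of the %close column from the raw sums:

# closed station-minutes weighted by bikes, then by stands
print(3060 / (3060 + 1048654))   # ~0.002910
print(120 / (120 + 1255146))     # ~0.000096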
This last table is only meaningful insofar as the reported numbers are reliable while the stations are closed.
Applying this to a full day amounts to running the same job on larger data. We will soon see how to avoid copying the job a second time (which introduces a potential source of error).
%dfs_ls /user/xavierdupre/
 | attributes | code | alias | folder | size | date | time | name | isdir |
---|---|---|---|---|---|---|---|---|---|
0 | drwx------ | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 17:55 | /user/xavierdupre/.Trash | True |
1 | drwx------ | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:41 | /user/xavierdupre/.staging | True |
2 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 132727 | 2014-11-16 | 02:37 | /user/xavierdupre/ConfLongDemo_JSI.small.examp... | False |
3 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-16 | 02:38 | /user/xavierdupre/ConfLongDemo_JSI.small.examp... | True |
4 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 461444 | 2014-11-20 | 01:33 | /user/xavierdupre/paris.2014-11-11_22-00-18.33... | False |
5 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-20 | 23:43 | /user/xavierdupre/unitest2 | True |
6 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:42 | /user/xavierdupre/unittest | True |
7 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:41 | /user/xavierdupre/unittest2 | True |
8 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-20 | 01:53 | /user/xavierdupre/velib_1hjs | True |
9 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-21 | 01:17 | /user/xavierdupre/velib_py | True |
10 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:34 | /user/xavierdupre/velib_py_results | True |
11 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-21 | 11:08 | /user/xavierdupre/velib_several_days | True |
%%PIG json_fermee.pig
DEFINE pystream `python stream_json.py available_bike_stands available_bikes lat lng name status`
SHIP ('stream_json.py') INPUT(stdin USING PigStreaming(',')) OUTPUT (stdout USING PigStreaming(',')) ;
jspy = LOAD '/user/xavierdupre/velib_several_days/*.txt' USING PigStorage('\t') AS (arow:chararray);
matrice = STREAM jspy THROUGH pystream AS
( available_bike_stands:int,
available_bikes:int,
lat:double,
lng:double,
name:chararray,
status:chararray) ;
grstation = GROUP matrice BY status ;
fermees = FOREACH grstation GENERATE
group
,SUM(matrice.available_bikes) AS available_bikes
,SUM(matrice.available_bike_stands) AS available_bike_stands
;
gr_av = GROUP matrice BY available_bikes ;
dist_av = FOREACH gr_av GENERATE group, COUNT(matrice) ;
gr_pl = GROUP matrice BY available_bike_stands ;
dist_pl = FOREACH gr_pl GENERATE group, COUNT(matrice) ;
STORE fermees INTO 'velib_py_results_3days/fermees.txt' USING PigStorage('\t') ;
STORE dist_av INTO 'velib_py_results_3days/distribution_bikes.txt' USING PigStorage('\t') ;
STORE dist_pl INTO 'velib_py_results_3days/distribution_stands.txt' USING PigStorage('\t') ;
%pig_submit json_fermee.pig stream_json.py -r redirection
%remote_cmd tail redirection.pig.err
Total bytes written : 1306
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG: job_1414491244634_0128
2014-11-23 21:55:06,336 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
%dfs_ls velib_py_results_3days/*
 | attributes | code | alias | folder | size | date | time | name | isdir |
---|---|---|---|---|---|---|---|---|---|
0 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:54 | velib_py_results_3days/distribution_bikes.txt/... | False |
1 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 210 | 2014-11-23 | 21:54 | velib_py_results_3days/distribution_bikes.txt/... | False |
2 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 208 | 2014-11-23 | 21:54 | velib_py_results_3days/distribution_bikes.txt/... | False |
3 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 216 | 2014-11-23 | 21:54 | velib_py_results_3days/distribution_bikes.txt/... | False |
4 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:54 | velib_py_results_3days/distribution_stands.txt... | False |
5 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 210 | 2014-11-23 | 21:54 | velib_py_results_3days/distribution_stands.txt... | False |
6 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 208 | 2014-11-23 | 21:54 | velib_py_results_3days/distribution_stands.txt... | False |
7 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 210 | 2014-11-23 | 21:54 | velib_py_results_3days/distribution_stands.txt... | False |
8 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:54 | velib_py_results_3days/fermees.txt/_SUCCESS | False |
9 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 20 | 2014-11-23 | 21:54 | velib_py_results_3days/fermees.txt/part-r-00000 | False |
10 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:54 | velib_py_results_3days/fermees.txt/part-r-00001 | False |
11 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 24 | 2014-11-23 | 21:54 | velib_py_results_3days/fermees.txt/part-r-00002 | False |
We can see from the three part files per output that the job was distributed over three machines.
PIG error messages are not very explicit, especially when the error occurs inside the Python script. A simple way to debug is to catch the exceptions raised by Python and retrieve them through PIG (the rest of the job is stripped out). One could just as well output the Python version installed on the cluster, along with the list of available modules.
%%PYTHON stream_json_err.py
import sys, os
for row in sys.stdin:
    sys.stdout.write(sys.executable + "\n")
    sys.stdout.write(str(sys.version) + "\n")
    sys.stdout.write(row + "\n")
    # to print the environment variables as well:
    #for k in os.environ:
    #    sys.stdout.write("%s=%s\n" % (k, os.environ[k]))
    sys.stdout.flush()
%%runpy stream_json_err.py
n'importe quoi
C:\Python34\python.exe
3.4.1 (v3.4.1:c0e311e010fc, May 18 2014, 10:38:22) [MSC v.1600 32 bit (Intel)]
n'importe quoi
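stream_json_err.py only reports the interpreter; to actually trap exceptions as described above, the processing loop of stream_json.py can be wrapped in a try/except that writes the traceback to stdout, so the error comes back with the job output instead of being buried in the Hadoop logs. A sketch (hypothetical stream_json_safe.py, reusing the column logic of stream_json.py):

import sys, datetime, traceback

cols = [_ for _ in sys.argv if ".py" not in _]
for row in sys.stdin:
    row = row.strip()
    if not row:
        continue
    try:
        js = eval(row)
        for station in js:
            sys.stdout.write(",".join(str(station[c]) for c in cols))
            sys.stdout.write("\n")
    except Exception:
        # emit the traceback as a regular output row, prefixed with ERROR,
        # so it is easy to spot in the stored results
        sys.stdout.write("ERROR," + traceback.format_exc().replace("\n", " ") + "\n")
    sys.stdout.flush()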
%%PIG test.pig
DEFINE pystream `python stream_json_err.py`
SHIP ('stream_json_err.py') INPUT(stdin USING PigStreaming(',')) OUTPUT (stdout USING PigStreaming(',')) ;
jspy = LOAD 'velib_py/*.txt' USING PigStorage('\t') AS (arow:chararray);
one = LIMIT jspy 1 ;
info = STREAM one THROUGH pystream AS (row:chararray) ;
STORE info INTO 'python_info2.txt' ;
%pig_submit test.pig stream_json_err.py -r redirection
%remote_cmd tail redirection.pig.err -n 5
2014-11-23 22:07:20,132 [main] INFO org.apache.hadoop.ipc.Client - Retrying connect to server: dn05.dzr323.dza.datazoomr.com/10.58.223.25:59338. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
2014-11-23 22:07:21,134 [main] INFO org.apache.hadoop.ipc.Client - Retrying connect to server: dn05.dzr323.dza.datazoomr.com/10.58.223.25:59338. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
2014-11-23 22:07:21,251 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2014-11-23 22:07:21,471 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2014-11-23 22:07:21,574 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
if os.path.exists("python_info2.txt") : os.remove("python_info2.txt")
%remote_down_cluster_merge python_info2.txt python_info2.txt
'python_info2.txt'
%head python_info2.txt
/usr/bin/python
2.6.6 (r266:84292 [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)]
[{'address': 'RUE DES CHAMPEAUX (PRES DE LA GARE ROUTIERE) - 93170 BAGNOLET'
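Note that the cluster runs Python 2.6.6 while the local tests above used Python 3.4.1: the streaming scripts must therefore stick to syntax accepted by both interpreters.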