.. _pigazurecorrectionrst:

==========================================
Map/Reduce with PIG on Azure - correction
==========================================

.. only:: html

    **Links:** :download:`notebook `, :downloadlink:`html `, :download:`PDF `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/pig_hive/pig_azure_correction.ipynb|*`

.. code:: ipython3

    from jyquickhelper import add_notebook_menu
    add_notebook_menu()

.. contents::
    :local:

Data
----

We consider the following dataset:
`Localization Data for Person Activity Data Set `__,
retrieved as described in the exercise notebook.

.. code:: ipython3

    from pyquickhelper.ipythonhelper import open_html_form
    params = {"blob_storage": "", "password1": "",
              "hadoop_server": "", "password2": "",
              "username": "xavierdupre"}
    open_html_form(params=params,
                   title="server + hadoop + credentials",
                   key_save="blobhp")
.. code:: ipython3

    blobstorage = blobhp["blob_storage"]
    blobpassword = blobhp["password1"]
    hadoop_server = blobhp["hadoop_server"]
    hadoop_password = blobhp["password2"]
    username = blobhp["username"]

.. code:: ipython3

    import pyensae
    %load_ext pyensae
    %load_ext pyenbc
    %hd_open

.. parsed-literal::

    (, )

Exercise 1: GROUP BY
--------------------

.. code:: ipython3

    import pandas, sqlite3
    con = sqlite3.connect("ConfLongDemo_JSI.db3")
    df = pandas.read_sql("""SELECT activity, count(*) as nb
                            FROM person GROUP BY activity""", con)
    con.close()
    df.head()
.. parsed-literal::

            activity     nb
    0        falling   2973
    1          lying  54480
    2     lying down   6168
    3   on all fours   5210
    4        sitting  27244
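For comparison, the same aggregation can be sketched in pandas; the small DataFrame below is made up for illustration and stands in for the real ``person`` table:

```python
import pandas as pd

# Toy stand-in for the person table (only the activity column matters here)
df = pd.DataFrame({"activity": ["walking", "lying", "walking",
                                "sitting", "lying", "lying"]})

# Equivalent of: SELECT activity, count(*) as nb FROM person GROUP BY activity
counts = df.groupby("activity").size().reset_index(name="nb")
print(counts)
```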
We check that the file we want to process is there:

.. code:: ipython3

    %blob_ls /testensae/ConfLongDemo_JSI.small.txt
.. parsed-literal::

       name                                  last_modified                  content_type              content_length  blob_type
    0  testensae/ConfLongDemo_JSI.small.txt  Thu, 29 Oct 2015 00:23:00 GMT  application/octet-stream  132727          BlockBlob
Now the same thing has to be done with PIG.

.. code:: ipython3

    %%PIG_azure solution_groupby.pig

    myinput = LOAD '$CONTAINER/testensae/ConfLongDemo_JSI.small.txt'
              using PigStorage(',')
              AS (index:long, sequence, tag, timestamp:long, dateformat,
                  x:double, y:double, z:double, activity) ;

    gr = GROUP myinput BY activity ;
    avgact = FOREACH gr GENERATE group, COUNT(myinput) ;

    STORE avgact INTO '$CONTAINER/$PSEUDO/testensae/ConfLongDemo_JSI.small.group.2015.txt' USING PigStorage() ;

We submit the job:

.. code:: ipython3

    jid = %hd_pig_submit solution_groupby.pig
    jid

.. parsed-literal::

    {'id': 'job_1445989166328_0009'}

We check the job status:

.. code:: ipython3

    st = %hd_job_status jid["id"]
    st["id"], st["percentComplete"], st["completed"], st["status"]["jobComplete"], st["status"]["state"]

.. parsed-literal::

    ('job_1445989166328_0009', '100% complete', None, False, 'RUNNING')

We check that the compilation went well:

.. code:: ipython3

    %hd_tail_stderr jid["id"]

.. parsed-literal::

    Job DAG:
    job_1445989166328_0010


    2015-10-29 00:55:14,395 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2015-10-29 00:55:14,395 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.100.164:9010
    2015-10-29 00:55:14,395 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.100.164:10200
    2015-10-29 00:55:14,473 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2015-10-29 00:55:14,676 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2015-10-29 00:55:14,676 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.100.164:9010
    2015-10-29 00:55:14,676 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.100.164:10200
    2015-10-29 00:55:14,754 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2015-10-29 00:55:14,957 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2015-10-29 00:55:14,957 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.100.164:9010
    2015-10-29 00:55:14,957 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.100.164:10200
    2015-10-29 00:55:15,020 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2015-10-29 00:55:15,082 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
    2015-10-29 00:55:15,113 [main] INFO  org.apache.pig.Main - Pig script completed in 49 seconds and 706 milliseconds (49706 ms)

    

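``%hd_job_status`` only reports the state at the moment it is called, so the cell may have to be re-run until the job completes. A small polling helper can automate this; the ``get_status`` callable below is a hypothetical wrapper around the magic command, and the fake status source is only there to illustrate the loop:

```python
import time

def wait_for_job(get_status, timeout=3600, period=30):
    """Poll get_status() until the job reports completion.

    get_status is any callable returning a dictionary shaped like the
    output of %hd_job_status (assumed structure, for illustration).
    """
    start = time.time()
    while time.time() - start < timeout:
        st = get_status()
        if st["status"]["jobComplete"]:
            return st
        time.sleep(period)
    raise TimeoutError("job still not complete after %d seconds" % timeout)

# Fake status source that completes on the second call
calls = {"n": 0}
def fake_status():
    calls["n"] += 1
    done = calls["n"] >= 2
    return {"status": {"jobComplete": done,
                       "state": "SUCCEEDED" if done else "RUNNING"}}

st = wait_for_job(fake_status, timeout=10, period=0)
print(st["status"]["state"])  # SUCCEEDED
```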
We look at the content of the directory on the blob storage:

.. code:: ipython3

    df = %blob_ls /$PSEUDO/testensae
    list(df["name"])

.. parsed-literal::

    ['xavierdupre/testensae',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/part-r-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/part-r-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/part-r-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/part-m-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt/part-m-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt/part-m-00000']

.. code:: ipython3

    import os
    if os.path.exists("results.group.2015.txt"):
        os.remove("results.group.2015.txt")
    %blob_downmerge /$PSEUDO/testensae/ConfLongDemo_JSI.small.group.2015.txt results.group.2015.txt

.. parsed-literal::

    'results.group.2015.txt'

.. code:: ipython3

    %lsr res.*[.]txt
.. parsed-literal::

       directory  last_modified               name                      size
    0  False      2015-10-29 01:56:11.025867  .\results.group.2015.txt  89
    1  False      2015-10-29 01:46:45.425028  .\results.txt             21.65 Kb
    2  False      2015-10-29 01:46:46.705466  .\results_allfiles.txt    21.65 Kb
.. code:: ipython3

    %head results.group.2015.txt

.. parsed-literal::
    lying	267
    falling	30
    sitting	435
    walking	170
    sitting down	56
    standing up from sitting	42

    
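``PigStorage()`` with no argument writes one tab-separated record per line, so the merged result file is trivial to parse. A minimal sketch, using an inline sample instead of the downloaded ``results.group.2015.txt``:

```python
# PigStorage() output: one "activity<TAB>count" record per line
sample = "lying\t267\nfalling\t30\nsitting\t435\nwalking\t170\n"

counts = {}
for line in sample.splitlines():
    activity, nb = line.split("\t")
    counts[activity] = int(nb)

print(counts["lying"])  # 267
```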
Exercise 2: JOIN
----------------

.. code:: ipython3

    con = sqlite3.connect("ConfLongDemo_JSI.db3")
    df = pandas.read_sql("""SELECT person.*, A.nb FROM person
                            INNER JOIN (
                                SELECT activity, count(*) as nb
                                FROM person
                                GROUP BY activity) AS A
                            ON person.activity == A.activity""", con)
    con.close()
    df.head()
.. parsed-literal::

       index  sequence  tag              timestamp           dateformat               x         y         z         activity  nb
    0  0      A01       010-000-024-033  633790226051280329  27.05.2009 14:03:25:127  4.062931  1.892434  0.507425  walking   32710
    1  1      A01       020-000-033-111  633790226051820913  27.05.2009 14:03:25:183  4.291954  1.781140  1.344495  walking   32710
    2  2      A01       020-000-032-221  633790226052091205  27.05.2009 14:03:25:210  4.359101  1.826456  0.968821  walking   32710
    3  3      A01       010-000-024-033  633790226052361498  27.05.2009 14:03:25:237  4.087835  1.879999  0.466983  walking   32710
    4  4      A01       010-000-030-096  633790226052631792  27.05.2009 14:03:25:263  4.324462  2.072460  0.488065  walking   32710
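The same join can be sketched in pandas with ``merge``; again the toy DataFrame below is made up for illustration and stands in for the ``person`` table:

```python
import pandas as pd

# Toy stand-in for the person table
person = pd.DataFrame({"index": range(5),
                       "activity": ["walking", "walking", "lying",
                                    "sitting", "lying"]})

# Subquery: per-activity counts
nb = person.groupby("activity").size().reset_index(name="nb")

# INNER JOIN ... ON person.activity == A.activity
joined = person.merge(nb, on="activity", how="inner")
print(joined)
```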
Same as before, now it has to be done with PIG.

.. code:: ipython3

    %%PIG_azure solution_groupby_join.pig

    myinput = LOAD '$CONTAINER/testensae/ConfLongDemo_JSI.small.txt'
              using PigStorage(',')
              AS (index:long, sequence, tag, timestamp:long, dateformat,
                  x:double, y:double, z:double, activity) ;

    gr = GROUP myinput BY activity ;
    avgact = FOREACH gr GENERATE group, COUNT(myinput) ;
    joined = JOIN myinput BY activity, avgact BY group ;

    STORE joined INTO '$CONTAINER/$PSEUDO/testensae/ConfLongDemo_JSI.small.group.join.2015.txt' USING PigStorage() ;

.. code:: ipython3

    jid = %hd_pig_submit solution_groupby_join.pig
    jid

.. parsed-literal::

    {'id': 'job_1445989166328_0011'}

.. code:: ipython3

    st = %hd_job_status jid["id"]
    st["id"], st["percentComplete"], st["completed"], st["status"]["jobComplete"], st["status"]["state"], st["userargs"]["file"]

.. parsed-literal::

    ('job_1445989166328_0011', '100% complete', 'done', True, 'SUCCEEDED',
     'wasb://hdblobstorage@hdblobstorage.blob.core.windows.net/xavierdupre/scripts/pig/solution_groupby_join.pig')

.. code:: ipython3

    df = %blob_ls /$PSEUDO/testensae
    df
.. parsed-literal::

        name                                               last_modified                  content_type              content_length  blob_type
    0   xavierdupre/testensae                              Tue, 25 Nov 2014 00:50:34 GMT  application/octet-stream  0               BlockBlob
    1   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Thu, 29 Oct 2015 00:55:09 GMT                            0               BlockBlob
    2   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Thu, 29 Oct 2015 00:55:09 GMT  application/octet-stream  0               BlockBlob
    3   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Thu, 29 Oct 2015 00:55:08 GMT  application/octet-stream  89              BlockBlob
    4   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Thu, 29 Oct 2015 00:58:43 GMT                            0               BlockBlob
    5   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Thu, 29 Oct 2015 00:58:43 GMT  application/octet-stream  0               BlockBlob
    6   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Thu, 29 Oct 2015 00:58:42 GMT  application/octet-stream  144059          BlockBlob
    7   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Tue, 25 Nov 2014 01:16:11 GMT                            0               BlockBlob
    8   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Tue, 25 Nov 2014 01:16:11 GMT  application/octet-stream  0               BlockBlob
    9   xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Tue, 25 Nov 2014 01:16:10 GMT  application/octet-stream  144059          BlockBlob
    10  xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Tue, 25 Nov 2014 01:12:49 GMT                            0               BlockBlob
    11  xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Tue, 25 Nov 2014 01:12:49 GMT  application/octet-stream  0               BlockBlob
    12  xavierdupre/testensae/ConfLongDemo_JSI.small.g...  Tue, 25 Nov 2014 01:12:49 GMT  application/octet-stream  89              BlockBlob
    13  xavierdupre/testensae/ConfLongDemo_JSI.small.k...  Tue, 25 Nov 2014 00:50:45 GMT                            0               BlockBlob
    14  xavierdupre/testensae/ConfLongDemo_JSI.small.k...  Tue, 25 Nov 2014 00:50:46 GMT  application/octet-stream  0               BlockBlob
    15  xavierdupre/testensae/ConfLongDemo_JSI.small.k...  Tue, 25 Nov 2014 00:50:45 GMT  application/octet-stream  22166           BlockBlob
    16  xavierdupre/testensae/ConfLongDemo_JSI.small.w...  Thu, 29 Oct 2015 00:28:30 GMT                            0               BlockBlob
    17  xavierdupre/testensae/ConfLongDemo_JSI.small.w...  Thu, 29 Oct 2015 00:28:30 GMT  application/octet-stream  0               BlockBlob
    18  xavierdupre/testensae/ConfLongDemo_JSI.small.w...  Thu, 29 Oct 2015 00:28:30 GMT  application/octet-stream  22166           BlockBlob
    19  xavierdupre/testensae/ConfLongDemo_JSI.small.w...  Thu, 29 Oct 2015 00:46:05 GMT                            0               BlockBlob
    20  xavierdupre/testensae/ConfLongDemo_JSI.small.w...  Thu, 29 Oct 2015 00:46:05 GMT  application/octet-stream  0               BlockBlob
    21  xavierdupre/testensae/ConfLongDemo_JSI.small.w...  Thu, 29 Oct 2015 00:46:04 GMT  application/octet-stream  22166           BlockBlob
.. code:: ipython3

    set(df.name)

.. parsed-literal::

    {'xavierdupre/testensae',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/part-r-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt/part-r-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/part-r-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/part-r-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/part-m-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt/part-m-00000',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt/_SUCCESS',
     'xavierdupre/testensae/ConfLongDemo_JSI.small.walking_2015.txt/part-m-00000'}

.. code:: ipython3

    if os.path.exists("results.join.2015.txt"):
        os.remove("results.join.2015.txt")
    %blob_downmerge /$PSEUDO/testensae/ConfLongDemo_JSI.small.group.join.2015.txt results.join.2015.txt

.. parsed-literal::

    'results.join.2015.txt'

.. code:: ipython3

    %head results.join.2015.txt

.. parsed-literal::
    999	A01	010-000-024-033	633790226379871138	27.05.2009 14:03:57:987	3.198556661605835	1.1257659196853638	0.3567752242088318	lying	lying	267
    998	A01	020-000-032-221	633790226379600847	27.05.2009 14:03:57:960	4.3730292320251465	1.3821170330047607	0.38861045241355896	lying	lying	267
    997	A01	020-000-033-111	633790226379330550	27.05.2009 14:03:57:933	4.7574005126953125	1.285519003868103	-0.08946932852268219	lying	lying	267
    996	A01	010-000-030-096	633790226379060251	27.05.2009 14:03:57:907	3.182415008544922	1.1020996570587158	0.29104289412498474	lying	lying	267
    995	A01	010-000-024-033	633790226378789954	27.05.2009 14:03:57:880	3.0784008502960205	1.0197675228118896	0.6061218976974487	lying	lying	267
    994	A01	020-000-032-221	633790226378519655	27.05.2009 14:03:57:853	4.36382532119751	1.4307395219802856	0.3206148743629456	lying	lying	267
    993	A01	010-000-024-033	633790226377708776	27.05.2009 14:03:57:770	3.0621800422668457	1.0790562629699707	0.6795752048492432	lying	lying	267
    992	A01	020-000-032-221	633790226377438480	27.05.2009 14:03:57:743	4.371500492095946	1.4781558513641355	0.5384233593940735	lying	lying	267
    991	A01	020-000-033-111	633790226377168187	27.05.2009 14:03:57:717	4.918898105621338	1.1530661582946775	0.19635945558547974	lying	lying	267
    990	A01	010-000-030-096	633790226376897895	27.05.2009 14:03:57:690	3.208510637283325	1.1156394481658936	0.3381773829460144	lying	lying	267

    
Going further
-------------
`PIG `__ is not the only way to run Map/Reduce jobs.
`Hive `__ is a language whose syntax is very close to SQL.
The article `Comparing Pig Latin and SQL for Constructing Data Processing
Pipelines `__ details the differences between the two approaches.

**high-level language**

The key point is that PIG is a high-level language. The program is compiled
into a sequence of Map/Reduce operations that is transparent to the user.
Development time is much shorter than for the same program written in Java.
The compiler builds an execution plan
(`a few examples here `__) and infers the number of machines required to
distribute the job. This is enough for most needs.

**small datasets**

Some jobs can run for hours, so it is advisable to try them on small
datasets before running them on the real data. It is always frustrating to
discover that a job crashed after two hours because a string was empty and
that case had not been handled. With these small datasets, it is possible,
and recommended, to first run the job on the gateway
(`local execution `__) before launching it on the cluster. With pyensae,
add the option ``-local`` to the command `hd_pig_submit `__.

**merging partitioned files**

A PIG program does not produce a single file but several files in a
directory. The command `getmerge `__ downloads these files to the gateway
and merges them into one.

**row order**

Jobs are distributed: even when doing nothing (LOAD + STORE), there is no
guarantee that the row order is preserved. The probability that it is
preserved is close to zero.
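To make the "compiled into a sequence of Map/Reduce operations" point concrete, the GROUP BY job of exercise 1 can be simulated in plain Python as a map / shuffle / reduce pipeline; the three CSV lines below are made up for illustration:

```python
from collections import defaultdict

def mapper(line):
    # emit (activity, 1): the activity is the last field of each record
    yield line.strip().split(",")[-1], 1

def reducer(activity, values):
    # sum the counts for one activity
    return activity, sum(values)

lines = ["0,A01,010-000-024-033,633790,27.05.2009,4.06,1.89,0.50,walking",
         "1,A01,020-000-033-111,633791,27.05.2009,4.29,1.78,1.34,walking",
         "2,A01,020-000-032-221,633792,27.05.2009,3.10,1.10,0.30,lying"]

# shuffle: group the mapped pairs by key before reducing
groups = defaultdict(list)
for line in lines:
    for key, value in mapper(line):
        groups[key].append(value)

result = dict(reducer(k, vs) for k, vs in groups.items())
print(result)  # {'walking': 2, 'lying': 1}
```

On a cluster, the shuffle step is what moves data between machines, which is also why the row order of the input is not preserved.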