.. _pigclouderacorrectionrst: ============================================= Map/Reduce avec PIG sur cloudera - correction ============================================= .. only:: html **Links:** :download:`notebook `, :downloadlink:`html `, :download:`PDF `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/pig_hive/pig_cloudera_correction.ipynb|*` Correction. .. code:: ipython3 from jyquickhelper import add_notebook_menu add_notebook_menu() .. contents:: :local: Données ------- On considère le jeu de données suivant : `Localization Data for Person Activity Data Set `__ qui ont déjà été récupéré avec le notebook de l’énoncé. .. code:: ipython3 import os if "CRTERALAB" in os.environ: spl = os.environ["CRTERALAB"].split("**") params=dict(server=spl[0], password=spl[1], username=spl[2]) r = dict else: from pyquickhelper.ipythonhelper import open_html_form params={"server":"ws...fr", "username":"x...e", "password":""} r = open_html_form(params=params,title="server + credentials", key_save="params") r .. raw:: html
server + credentials
password
server
username
.. code:: ipython3 password = params["password"] server = params["server"] username = params["username"] .. code:: ipython3 %load_ext pyensae %load_ext pyenbc %remote_open .. parsed-literal:: .. raw:: html

Exercice 1 : GROUP BY .. raw:: html

.. code:: ipython3 import pandas, sqlite3 con = sqlite3.connect("ConfLongDemo_JSI.db3") df = pandas.read_sql("""SELECT activity, count(*) as nb FROM person GROUP BY activity""", con) con.close() df.head() .. raw:: html
activity nb
0 falling 2973
1 lying 54480
2 lying down 6168
3 on all fours 5210
4 sitting 27244
Il faut maintenant le faire avec PIG. .. code:: ipython3 %%PIG solution_groupby.pig myinput = LOAD 'ConfLongDemo_JSI.small.example.txt' using PigStorage(',') AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ; gr = GROUP myinput BY activity ; avgact = FOREACH gr GENERATE group, COUNT(myinput) ; STORE avgact INTO 'ConfLongDemo_JSI.small.group.txt' USING PigStorage() ; .. code:: ipython3 %pig_submit solution_groupby.pig -r groupby.redirection .. raw:: html

    
.. code:: ipython3 %remote_cmd tail groupby.redirection.err .. raw:: html
    Total bytes written : 89
    Spillable Memory Manager spill count : 0
    Total bags proactively spilled: 0
    Total records proactively spilled: 0

    Job DAG:
    job_1444669880271_0038


    2015-10-29 01:10:53,383 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

    
.. code:: ipython3 %remote_cmd hdfs dfs -ls ConfLongDemo_JSI.small.group.txt .. raw:: html
    Found 2 items
    -rw-r--r--   3 xavierdupre xavierdupre          0 2015-10-29 01:10 ConfLongDemo_JSI.small.group.txt/_SUCCESS
    -rw-r--r--   3 xavierdupre xavierdupre         89 2015-10-29 01:10 ConfLongDemo_JSI.small.group.txt/part-r-00000

    
.. code:: ipython3 %remote_cmd hdfs dfs -tail ConfLongDemo_JSI.small.group.txt/part-r-00000 .. raw:: html
    lying	267
    falling	30
    sitting	435
    walking	170
    sitting down	56
    standing up from sitting	42

    
.. raw:: html

Exercice 2 : JOIN .. raw:: html

.. code:: ipython3 con = sqlite3.connect("ConfLongDemo_JSI.db3") df = pandas.read_sql("""SELECT person.*, A.nb FROM person INNER JOIN ( SELECT activity, count(*) as nb FROM person GROUP BY activity) AS A ON person.activity == A.activity""", con) con.close() df.head() .. raw:: html
index sequence tag timestamp dateformat x y z activity nb
0 0 A01 010-000-024-033 633790226051280329 27.05.2009 14:03:25:127 4.062931 1.892434 0.507425 walking 32710
1 1 A01 020-000-033-111 633790226051820913 27.05.2009 14:03:25:183 4.291954 1.781140 1.344495 walking 32710
2 2 A01 020-000-032-221 633790226052091205 27.05.2009 14:03:25:210 4.359101 1.826456 0.968821 walking 32710
3 3 A01 010-000-024-033 633790226052361498 27.05.2009 14:03:25:237 4.087835 1.879999 0.466983 walking 32710
4 4 A01 010-000-030-096 633790226052631792 27.05.2009 14:03:25:263 4.324462 2.072460 0.488065 walking 32710
Idem, maintenant il faut le faire avec PIG. .. code:: ipython3 %%PIG solution_groupby_join.pig myinput = LOAD 'ConfLongDemo_JSI.small.example.txt' using PigStorage(',') AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ; gr = GROUP myinput BY activity ; avgact = FOREACH gr GENERATE group, COUNT(myinput) ; joined = JOIN myinput BY activity, avgact BY group ; STORE joined INTO 'ConfLongDemo_JSI.small.group.join.txt' USING PigStorage() ; .. code:: ipython3 %pig_submit solution_groupby_join.pig -r groupby.join.redirection .. raw:: html

    
.. code:: ipython3 %remote_cmd tail groupby.join.redirection.err .. raw:: html
    2015-10-29 01:15:15,416 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
    2015-10-29 01:15:15,416 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
    2015-10-29 01:15:15,416 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://nameservice1
    2015-10-29 01:15:17,285 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: HASH_JOIN,GROUP_BY
    2015-10-29 01:15:17,348 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
    2015-10-29 01:15:17,404 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.textoutputformat.separator is deprecated. Instead, use mapreduce.output.textoutputformat.separator
    2015-10-29 01:15:17,426 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 6000: 
     Output Location Validation Failed for: 'hdfs://nameservice1/user/xavierdupre/ConfLongDemo_JSI.small.group.join.txt More info to follow:
    Output directory hdfs://nameservice1/user/xavierdupre/ConfLongDemo_JSI.small.group.join.txt already exists
    Details at logfile: /home/xavierdupre/pig_1446077714461.log

    
.. code:: ipython3 %remote_cmd hdfs dfs -ls ConfLongDemo_JSI.small.group.join.txt .. raw:: html
    Found 2 items
    -rw-r--r--   3 xavierdupre xavierdupre          0 2015-10-29 01:13 ConfLongDemo_JSI.small.group.join.txt/_SUCCESS
    -rw-r--r--   3 xavierdupre xavierdupre     144059 2015-10-29 01:13 ConfLongDemo_JSI.small.group.join.txt/part-r-00000

    
.. code:: ipython3 %remote_cmd hdfs dfs -tail ConfLongDemo_JSI.small.group.join.txt/part-r-00000 .. raw:: html
    26262834000	27.05.2009 14:03:46:283	3.3038318157196045	1.938292145729065	0.7622964978218079	standing up from sitting	standing up from sitting	42
    652	A01	020-000-033-111	633790226262563704	27.05.2009 14:03:46:257	3.2363295555114746	2.00623106956482	1.1472841501235962	standing up from sitting	standing up from sitting	42
    651	A01	010-000-030-096	633790226262293413	27.05.2009 14:03:46:230	3.275949239730835	1.7746492624282837	0.3117055296897888	standing up from sitting	standing up from sitting	42
    650	A01	010-000-024-033	633790226262023117	27.05.2009 14:03:46:203	3.2498104572296143	1.878917098045349	0.13854867219924927	standing up from sitting	standing up from sitting	42
    649	A01	020-000-032-221	633790226261752823	27.05.2009 14:03:46:177	3.352446317672729	1.950886845588684	0.8281049728393555	standing up from sitting	standing up from sitting	42
    648	A01	020-000-033-111	633790226261482530	27.05.2009 14:03:46:147	3.2220029830932617	2.0042579174041752	1.032345414161682	standing up from sitting	standing up from sitting	42

    
.. raw:: html

Prolongements .. raw:: html

`PIG `__ n’est pas la seule façon d’exécuter des jobs Map/Reduce. `Hive `__ est un langage dont la syntaxe est très proche de celle du SQL. L’article `Comparing Pig Latin and SQL for Constructing Data Processing Pipelines `__ explicite les différences des deux approches. **langage haut niveau** Ce qu’il faut retenir est que le langage PIG est un langage haut niveau. Le programme est compilé en une séquence d’opérations Map/Reduce transparente pour l’utilisateur. Le temps de développement est très réduit lorsqu’on le compare au même programme écrit en Java. Le compilateur construit un plan d’exécution (`quelques exemples ici `__) et infère le nombre de machines requises pour distribuer le job. Cela suffit pour la plupart des besoins, cela nécessite. **petits jeux** Certains jobs peuvent durer des heures, il est conseillée de les essayer sur des petits jeux de données avant de les faire tourner sur les vrais données. Il est toujours frustrant de s’apercevoir qu’un job a planté au bout de deux heures car une chaîne de caractères est vide et que ce cas n’a pas été prévu. Avec ces petits jeux, il est possible de faire tourner et conseillé de tester le job d’abord sur la passerelle (`exécution local `__) avant de le lancer sur le cluster. Avec pyensae, il faut ajouter l’option ``-local`` à la commande `pig_submit `__. **concaténer les fichiers divisés** Un programme PIG ne produit pas un fichier mais plusieurs fichiers dans un répertoire. La commande `getmerge `__ télécharge ces fichiers sur la passerelle et les fusionne en un seul. **ordre des lignes** Les jobs sont distribués, même en faisant rien (LOAD + STORE), il n’est pas garanti que l’ordre des lignes soit préservé. La probabilié que ce soit le cas est quasi nulle.