PIG et Paramètres sur Azure - énoncé

Links: notebook, html, PDF, python, slides, GitHub

Manipulation de données JSON en Map/Reduce avec PIG sur HDInsight.

from jyquickhelper import add_notebook_menu
add_notebook_menu()

Paramètres

Les sites web produisent des données en continu. On utilise fréquemment le même script pour traiter les donnéesd’un jour, du lendemain, de jour d’après… Tous les jours, on veut récupérer la fréquentation de la veille. La seule chose qui change est la date des données qu’on veut traiter. Plutôt que de recopier un script en entier pour changer une date qui apparaît parfois à plusieurs endroits, il est préférable d’écrire un script ou la date apparaît comme une variable.

Ce notebook va illustrer ce procédé sur la construction d’un histogramme. Le paramètre du script sera la largeur des barres de l’histogramme (ou bin en anglais).

Connexion au cluster

On prend le cluster Cloudera. Il faut exécuter ce script pour pouvoir notifier au notebook que la variable params existe.

import pyensae
from pyquickhelper.ipythonhelper import open_html_form
params={"blob_storage":"", "password1":"", "hadoop_server":"", "password2":"", "username":"alias"}
open_html_form(params=params,title="server + hadoop + credentials", key_save="blobhp")
server + hadoop + credentials
blob_storage
hadoop_server
password1
password2
username
import pyensae
%load_ext pyensae
%load_ext pyenbc
blobstorage = blobhp["blob_storage"]
blobpassword = blobhp["password1"]
hadoop_server = blobhp["hadoop_server"]
hadoop_password = blobhp["password2"]
username = blobhp["username"]
client, bs =  %hd_open
client, bs
(<pyensae.remote.azure_connection.AzureClient at 0xabaea50>,
 <azure.storage.blobservice.BlobService at 0xabaea90>)

Upload version

On commence par simuler des données.

import random
with open("random.sample.txt", "w") as f :
    for i in range(0,10000) :
        x = random.random()
        f.write(str(x)+"\n")

On uploade le fichier sur le cluster (il n’est pas besoin de créer le répertoire au préalable).

%blob_up random.sample.txt /$PSEUDO/random/random.sample.txt
'$PSEUDO/random/random.sample.txt'
%blob_ls /$PSEUDO/random
name last_modified content_type content_length blob_type
0 xavierdupre/random/random.sample.txt Thu, 27 Nov 2014 23:21:26 GMT application/octet-stream 202619 BlockBlob

PIG et paramètres

On indique un paramètre par le symbole : $bins. La valeur du paramètre est passé sous forme de chaîne de caractères au script et remplacée telle quelle dans le script. Il en va de même des constantes déclarées grâce au mot-clé %declare.

La sortie du script inclut le paramètre : cela permet de retrouver comment ces données ont été générées.

%%PIG histogram.pig

values = LOAD '$CONTAINER/$PSEUDO/random/random.sample.txt' USING PigStorage('\t') AS (x:double);

values_h = FOREACH values GENERATE x, ((int)(x / $bins)) * $bins AS h ;

hist_group = GROUP values_h BY h ;

hist = FOREACH hist_group GENERATE group, COUNT(values_h) AS nb ;

STORE hist INTO '$CONTAINER/$PSEUDO/random/histo_$bins.txt' USING PigStorage('\t') ;

Pour supprimer les précédents résultats :

if client.exists(bs, client.account_name, "$PSEUDO/random/histo_0.1.txt"):
    r = client.delete_folder (bs, client.account_name, "$PSEUDO/random/histo_0.1.txt")
    print(r)

On exécute le job. Comme la commande magique supportant les paramètres n’existe pas encore, il faut utiliser la variable client et sa méthode pig_submit qui fait la même chose. Elle upload le script puis le soumet.

jid = client.pig_submit(bs, client.account_name, "histogram.pig", params = dict(bins="0.1"), stop_on_failure=True )
{'id': 'job_1416874839254_0101'}
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["status"]["jobComplete"]
('job_1416874839254_0101', '100% complete', True)
%hd_tail_stderr jid["id"]

Job DAG:
job_1416874839254_0102


2014-11-27 23:29:00,446 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - No FileSystem for scheme: wasb. Not creating success file
2014-11-27 23:29:00,446 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.74.20.101:9010
2014-11-27 23:29:00,525 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2014-11-27 23:29:03,900 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!


On vérifie que tout s’est bien passé. La taille devrait être équivalent à l’entrée.

%blob_ls /$PSEUDO/random
name last_modified content_type content_length blob_type
0 xavierdupre/random/histo_0.1.txt Thu, 27 Nov 2014 23:28:55 GMT 0 BlockBlob
1 xavierdupre/random/histo_0.1.txt/_SUCCESS Thu, 27 Nov 2014 23:28:55 GMT application/octet-stream 0 BlockBlob
2 xavierdupre/random/histo_0.1.txt/part-r-00000 Thu, 27 Nov 2014 23:28:54 GMT application/octet-stream 131 BlockBlob
3 xavierdupre/random/random.sample.txt Thu, 27 Nov 2014 23:21:26 GMT application/octet-stream 202619 BlockBlob
import os
if os.path.exists("histo.txt") : os.remove("histo.txt")
%blob_downmerge /$PSEUDO/random/histo_0.1.txt histo.txt
'histo.txt'
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import pandas
df = pandas.read_csv("histo.txt", sep="\t",names=["bin","nb"])
df.plot(x="bin",y="nb",kind="bar")
<matplotlib.axes._subplots.AxesSubplot at 0x9b21d30>
../_images/pig_params_azure_23_1.png

Exercice 1 : min, max

Ajouter deux paramètres pour construire l’histogramme entre deux valeurs a,b.