.. _pigreservoirsamplingazurecorrectionrst:

==============================================================
3A.mr - Distributed Reservoir Sampling - exercise - correction
==============================================================

.. only:: html

    **Links:** :download:`notebook `, :downloadlink:`html `, :download:`PDF `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/pig_hive/pig_reservoir_sampling_azure_correction.ipynb|*`

Correction.

.. code:: ipython3

    from jyquickhelper import add_notebook_menu
    add_notebook_menu()

.. contents::
    :local:

creating a file to sample
-------------------------

.. code:: ipython3

    with open("sample4.txt", "w", encoding="utf8") as f:
        for i in range(0, 100000):
            f.write("{0}\t{1}{0}\n".format(i, chr(i % 26 + 65)))
        f.write("100001\tAAAAAA")

.. code:: ipython3

    %load_ext pyensae
    %head sample4.txt

.. parsed-literal::

    0	A0
    1	B1
    2	C2
    3	D3
    4	E4
    5	F5
    6	G6
    7	H7
    8	I8
    9	J9

    
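A quick sanity check, not in the original notebook: the file should contain
100,001 lines (100,000 generated lines plus the final one).

.. code:: ipython3

    # 100000 generated lines plus the final "100001" line (no trailing newline)
    with open("sample4.txt", "r", encoding="utf8") as f:
        print(sum(1 for _ in f))  # expected: 100001
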
connection
----------

.. code:: ipython3

    import os
    blobhp = {}
    if "HDCREDENTIALS" in os.environ:
        blobhp["blob_storage"], blobhp["password1"], blobhp["hadoop_server"], \
            blobhp["password2"], blobhp["username"] = \
            os.environ["HDCREDENTIALS"].split("**")
        r = type(blobhp)
    else:
        from pyquickhelper.ipythonhelper import open_html_form
        params = {"blob_storage": "", "password1": "",
                  "hadoop_server": "", "password2": "",
                  "username": "axavier"}
        r = open_html_form(params=params,
                           title="server + hadoop + credentials",
                           key_save="blobhp")
    r

.. parsed-literal::

    dict

.. code:: ipython3

    import pyensae
    %load_ext pyensae
    %load_ext pyenbc
    blobstorage = blobhp["blob_storage"]
    blobpassword = blobhp["password1"]
    hadoop_server = blobhp["hadoop_server"]
    hadoop_password = blobhp["password2"]
    username = blobhp["username"]

.. code:: ipython3

    client, bs = %hd_open
    client, bs

.. parsed-literal::

    (, )

uploading the file
------------------

.. code:: ipython3

    %blob_up sample4.txt /$PSEUDO/sampling/sample4.txt

.. parsed-literal::

    '$PSEUDO/sampling/sample4.txt'

.. code:: ipython3

    %blob_ls /$PSEUDO/sampling

.. parsed-literal::

        name                                                last_modified                  content_type              content_length  blob_type
    0   axavier/sampling/datafu-1.2.0.jar                   Fri, 13 Nov 2015 00:03:49 GMT  application/octet-stream  1600826         BlockBlob
    1   axavier/sampling/out_sampled_rs4_2015.txt           Fri, 13 Nov 2015 01:08:22 GMT                            0               BlockBlob
    2   axavier/sampling/out_sampled_rs4_2015.txt/_SUC...   Fri, 13 Nov 2015 01:08:22 GMT  application/octet-stream  0               BlockBlob
    3   axavier/sampling/out_sampled_rs4_2015.txt/part...   Fri, 13 Nov 2015 01:08:21 GMT  application/octet-stream  12785           BlockBlob
    4   axavier/sampling/sample.txt                         Fri, 13 Nov 2015 00:02:50 GMT  application/octet-stream  1377780         BlockBlob
    5   axavier/sampling/sample2.txt                        Fri, 13 Nov 2015 00:35:55 GMT  application/octet-stream  1377793         BlockBlob
    6   axavier/sampling/sample3.txt                        Fri, 13 Nov 2015 00:39:40 GMT  application/octet-stream  1377793         BlockBlob
    7   axavier/sampling/sample4.txt                        Sun, 15 Nov 2015 12:24:22 GMT  application/octet-stream  1377793         BlockBlob
    8   axavier/sampling/sample4_hash.txt                   Fri, 13 Nov 2015 14:50:39 GMT                            0               BlockBlob
    9   axavier/sampling/sample4_hash.txt/_SUCCESS          Fri, 13 Nov 2015 14:50:39 GMT  application/octet-stream  0               BlockBlob
    10  axavier/sampling/sample4_hash.txt/part-r-00000      Fri, 13 Nov 2015 14:50:38 GMT  application/octet-stream  4771358         BlockBlob
    11  axavier/sampling/sampled4_2015.txt                  Fri, 13 Nov 2015 00:50:20 GMT                            0               BlockBlob
    12  axavier/sampling/sampled4_2015.txt/_SUCCESS         Fri, 13 Nov 2015 00:50:20 GMT  application/octet-stream  0               BlockBlob
    13  axavier/sampling/sampled4_2015.txt/part-m-00000     Fri, 13 Nov 2015 00:50:19 GMT  application/octet-stream  1277794         BlockBlob
    14  axavier/sampling/sampled_rs4_2015.txt               Fri, 13 Nov 2015 01:04:51 GMT                            0               BlockBlob
    15  axavier/sampling/sampled_rs4_2015.txt/_SUCCESS      Fri, 13 Nov 2015 01:04:51 GMT  application/octet-stream  0               BlockBlob
    16  axavier/sampling/sampled_rs4_2015.txt/part-m-0...   Fri, 13 Nov 2015 01:04:50 GMT  application/octet-stream  1277794         BlockBlob
    17  axavier/sampling/sampled_srs4_2015.txt              Fri, 13 Nov 2015 00:56:09 GMT                            0               BlockBlob
    18  axavier/sampling/sampled_srs4_2015.txt/_SUCCESS     Fri, 13 Nov 2015 00:56:09 GMT  application/octet-stream  0               BlockBlob
    19  axavier/sampling/sampled_srs4_2015.txt/part-m-...   Fri, 13 Nov 2015 00:56:09 GMT  application/octet-stream  1277794         BlockBlob
    20  axavier/sampling/sampled_srs_2015.txt               Fri, 13 Nov 2015 00:52:34 GMT                            0               BlockBlob
    21  axavier/sampling/sampled_srs_2015.txt/_SUCCESS      Fri, 13 Nov 2015 00:52:34 GMT  application/octet-stream  0               BlockBlob
    22  axavier/sampling/sampled_srs_2015.txt/part-m-0...   Fri, 13 Nov 2015 00:52:34 GMT  application/octet-stream  1277794         BlockBlob

Python code for reservoir sampling
----------------------------------

.. code:: ipython3

    ensemble = ["%d%s" % (i, chr(i % 26 + 97)) for i in range(0, 10000)]
    ensemble[:5]

.. parsed-literal::

    ['0a', '1b', '2c', '3d', '4e']

.. code:: ipython3

    import random

    def reservoir_sampling(ensemble, k):
        echantillon = []
        for i, e in enumerate(ensemble):
            if len(echantillon) < k:
                # fill the reservoir with the first k elements
                echantillon.append(e)
            else:
                # element i is kept with probability k / (i+1)
                j = random.randint(0, i)
                if j < k:
                    echantillon[j] = e
        return echantillon

    reservoir_sampling(ensemble, 10)

.. parsed-literal::

    ['8681x', '8356k', '5490e', '4405l', '5890o', '2689l', '8672o', '3603p', '8599t', '6086c']

from python to jython
---------------------

We make sure the previous code also works with Jython (Python compiled to
Java), following the *jython-udfs* section of the Pig documentation.

We first create a PIG script to retrieve the schema and the first rows
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code:: ipython3

    %%PIG sample_explore.pig

    ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
               USING PigStorage('\t')
               AS (x:int, v:chararray);
    DESCRIBE ensemble;

    ens_group = GROUP ensemble ALL;
    DESCRIBE ens_group;

    sampled = FOREACH ens_group GENERATE FLATTEN(ensemble);
    DESCRIBE sampled;

    --ens10 = LIMIT ensemble 10;
    --ens_group10 = LIMIT ens_group 10;
    --DUMP ens10;
    --DUMP ens_group10;

If the next command fails with::

    AzureException: STATUS: 403, JSON: Expecting value: line 1 column 1 (char 0)
    unable to submit job: sample_explore.pig

check the credentials used for the connection.

.. code:: ipython3

    jid = %hd_pig_submit sample_explore.pig
    jid

.. parsed-literal::

    {'id': 'job_1446540516812_0185'}

.. code:: ipython3

    st = %hd_job_status jid["id"]
    (st["id"], st["percentComplete"], st["completed"],
     st["status"]["jobComplete"], st["status"]["state"])

.. parsed-literal::

    ('job_1446540516812_0185', None, None, False, 'RUNNING')

The error stream contains the information we are after:

.. code:: ipython3

    %hd_tail_stderr jid["id"] -n 5

.. parsed-literal::

    2015-11-15 12:33:06,608 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
    2015-11-15 12:33:06,608 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-1@hdblobstorage.blob.core.windows.net
    2015-11-15 12:33:08,233 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
    2015-11-15 12:33:09,374 [main] INFO  org.apache.pig.Main - Pig script completed in 4 seconds and 578 milliseconds (4578 ms)

    

    OUT:
    ensemble: {x: int,v: chararray}
    ens_group: {group: chararray,ensemble: {(x: int,v: chararray)}}
    sampled: {ensemble::x: int,ensemble::v: chararray}

    
And the output of the second dump::

    (all,{(100001,AAAAAA),(99999,D99999),(99998,C99998)...

the Jython code
~~~~~~~~~~~~~~~

.. code:: ipython3

    import pyensae

.. code:: ipython3

    %%PYTHON reservoir_sampling.py

    import random

    @outputSchemaFunction("rsSchema")
    def reservoir_sampling(ensemble):
        ensemble = eval(ensemble)
        k = 10
        echantillon = []
        for i, e in enumerate(ensemble):
            if len(echantillon) < k:
                echantillon.append(e)
            else:
                j = random.randint(0, i)
                if j < k:
                    echantillon[j] = e
        return echantillon

    @schemaFunction("rsSchema")
    def rsSchema(input):
        return input

.. code:: ipython3

    %%jython reservoir_sampling.py reservoir_sampling
    {(100001,"AAAAAA"),(99999,"D99999"),(99998,"C99998")}

.. parsed-literal::

    [(99998, 'C99998'), (99999, 'D99999'), (100001, 'AAAAAA')]

    
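Whether it runs with CPython or Jython, we can check empirically that the
algorithm is uniform: over many runs, every element should be kept with
probability k/N. A minimal sketch, not part of the original job:

.. code:: ipython3

    import random
    from collections import Counter

    def reservoir_sampling(ensemble, k):
        # same algorithm as above
        echantillon = []
        for i, e in enumerate(ensemble):
            if len(echantillon) < k:
                echantillon.append(e)
            else:
                j = random.randint(0, i)
                if j < k:
                    echantillon[j] = e
        return echantillon

    counts = Counter()
    n, k, runs = 100, 10, 5000
    for _ in range(runs):
        counts.update(reservoir_sampling(range(n), k))

    # every element should be drawn about runs * k / n = 500 times
    print(min(counts.values()), max(counts.values()))
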
We now add the Jython code to the previous script:

.. code:: ipython3

    %%PIG sample_explore_complete.pig

    REGISTER '$CONTAINER/$SCRIPTPIG/reservoir_sampling.py' using jython as myrs;

    ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
               USING PigStorage('\t')
               AS (x:int, v:chararray);
    DESCRIBE ensemble;

    ens_group = GROUP ensemble ALL;
    DESCRIBE ens_group;

    sampled = FOREACH ens_group GENERATE FLATTEN(myrs.reservoir_sample(ensemble));
    DESCRIBE sampled;

    STORE sampled INTO '$CONTAINER/$PSEUDO/sampling/sample_rs.txt' USING PigStorage();

.. code:: ipython3

    jid = %hd_pig_submit sample_explore_complete.pig -d reservoir_sampling.py
    jid

.. parsed-literal::

    {'id': 'job_1446540516812_0229'}

.. code:: ipython3

    st = %hd_job_status jid["id"]
    (st["id"], st["percentComplete"], st["completed"],
     st["status"]["jobComplete"], st["status"]["state"])

.. parsed-literal::

    ('job_1446540516812_0229', None, 'done', False, 'RUNNING')

.. code:: ipython3

    %hd_tail_stderr jid["id"] -n 100

.. parsed-literal::

    15/11/15 18:43:49 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
    15/11/15 18:43:49 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
    15/11/15 18:43:49 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
    2015-11-15 18:43:49,598 [main] INFO  org.apache.pig.Main - Apache Pig version 0.14.0.2.2.7.1-33 (r: unknown) compiled Oct 13 2015, 04:18:06
    2015-11-15 18:43:49,598 [main] INFO  org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.6.0.2.2.7.1-33\logs\pig_1447613029598.log
    2015-11-15 18:43:50,848 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found
    2015-11-15 18:43:51,145 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
    2015-11-15 18:43:51,145 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
    2015-11-15 18:43:51,145 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-1@hdblobstorage.blob.core.windows.net
    2015-11-15 18:43:51,879 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
    2015-11-15 18:43:52,192 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - created tmp python.cachedir=D:\Users\hdp\AppData\Local\Temp\pig_jython_3357684506669481882
    2015-11-15 18:43:54,817 [main] WARN  org.apache.pig.scripting.jython.JythonScriptEngine - pig.cmd.args.remainders is empty. This is not expected unless on testing.
    2015-11-15 18:43:57,645 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myrs.reservoir_sampling
    2015-11-15 18:43:58,535 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
    2015-11-15 18:43:59,660 [main] ERROR org.apache.pig.PigServer - exception during parsing: Error during parsing. Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    Failed to parse: Pig script failed to parse: 
     Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    	at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:199)
    	at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1735)
    	at org.apache.pig.PigServer$Graph.access$000(PigServer.java:1443)
    	at org.apache.pig.PigServer.parseAndBuild(PigServer.java:387)
    	at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:300)
    	at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:412)
    	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
    	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
    	at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
    	at org.apache.pig.Main.run(Main.java:495)
    	at org.apache.pig.Main.main(Main.java:170)
    Caused by: 
     Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    	at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1572)
    	at org.apache.pig.parser.LogicalPlanGenerator.func_eval(LogicalPlanGenerator.java:9372)
    	at org.apache.pig.parser.LogicalPlanGenerator.projectable_expr(LogicalPlanGenerator.java:11051)
    	at org.apache.pig.parser.LogicalPlanGenerator.var_expr(LogicalPlanGenerator.java:10810)
    	at org.apache.pig.parser.LogicalPlanGenerator.expr(LogicalPlanGenerator.java:10159)
    	at org.apache.pig.parser.LogicalPlanGenerator.flatten_clause(LogicalPlanGenerator.java:7629)
    	at org.apache.pig.parser.LogicalPlanGenerator.flatten_generated_item(LogicalPlanGenerator.java:7452)
    	at org.apache.pig.parser.LogicalPlanGenerator.generate_clause(LogicalPlanGenerator.java:17590)
    	at org.apache.pig.parser.LogicalPlanGenerator.foreach_plan(LogicalPlanGenerator.java:15982)
    	at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15849)
    	at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933)
    	at org.apache.pig.parser.LogicalPlanGenerator.general_statement(LogicalPlanGenerator.java:1102)
    	at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
    	at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
    	at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191)
    	... 10 more
    Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    	at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:677)
    	at org.apache.pig.impl.PigContext.getClassForAlias(PigContext.java:793)
    	at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1569)
    	... 24 more
    2015-11-15 18:43:59,707 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    2015-11-15 18:43:59,707 [main] ERROR org.apache.pig.tools.grunt.Grunt - org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1000: Error during parsing. Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    	at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1748)
    	at org.apache.pig.PigServer$Graph.access$000(PigServer.java:1443)
    	at org.apache.pig.PigServer.parseAndBuild(PigServer.java:387)
    	at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:300)
    	at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:412)
    	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
    	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
    	at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
    	at org.apache.pig.Main.run(Main.java:495)
    	at org.apache.pig.Main.main(Main.java:170)
    Caused by: Failed to parse: Pig script failed to parse: 
     Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    	at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:199)
    	at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1735)
    	... 9 more
    Caused by: 
     Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    	at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1572)
    	at org.apache.pig.parser.LogicalPlanGenerator.func_eval(LogicalPlanGenerator.java:9372)
    	at org.apache.pig.parser.LogicalPlanGenerator.projectable_expr(LogicalPlanGenerator.java:11051)
    	at org.apache.pig.parser.LogicalPlanGenerator.var_expr(LogicalPlanGenerator.java:10810)
    	at org.apache.pig.parser.LogicalPlanGenerator.expr(LogicalPlanGenerator.java:10159)
    	at org.apache.pig.parser.LogicalPlanGenerator.flatten_clause(LogicalPlanGenerator.java:7629)
    	at org.apache.pig.parser.LogicalPlanGenerator.flatten_generated_item(LogicalPlanGenerator.java:7452)
    	at org.apache.pig.parser.LogicalPlanGenerator.generate_clause(LogicalPlanGenerator.java:17590)
    	at org.apache.pig.parser.LogicalPlanGenerator.foreach_plan(LogicalPlanGenerator.java:15982)
    	at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15849)
    	at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933)
    	at org.apache.pig.parser.LogicalPlanGenerator.general_statement(LogicalPlanGenerator.java:1102)
    	at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
    	at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
    	at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191)
    	... 10 more
    Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    	at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:677)
    	at org.apache.pig.impl.PigContext.getClassForAlias(PigContext.java:793)
    	at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1569)
    	... 24 more

    Details also at logfile: C:\apps\dist\hadoop-2.6.0.2.2.7.1-33\logs\pig_1447613029598.log
    2015-11-15 18:43:59,754 [main] INFO  org.apache.pig.Main - Pig script completed in 10 seconds and 453 milliseconds (10453 ms)

    

    OUT:
    ensemble: {x: int,v: chararray}
    ens_group: {group: chararray,ensemble: {(x: int,v: chararray)}}

    
The job fails while parsing: the script calls ``myrs.reservoir_sample``
whereas the registered Jython function is named ``reservoir_sampling``,
hence the ``ERROR 1070: Could not resolve myrs.reservoir_sample`` above.
To be fixed later. For now, we use the *datafu* library instead. If the
cluster does not recognize the library, see the java section below which
explains how to import it. It is declared in the script with the
``REGISTER`` instruction.

.. code:: ipython3

    %%PIG sample_explore_datafu.pig

    REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
    DEFINE RS datafu.pig.sampling.ReservoirSample('1000');

    ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
               USING PigStorage('\t')
               AS (x:int, v:chararray);
    DESCRIBE ensemble;

    ens_group = GROUP ensemble ALL;
    DESCRIBE ens_group;

    sampled = FOREACH ens_group GENERATE FLATTEN(RS(ensemble));
    DESCRIBE sampled;

    STORE sampled INTO '$CONTAINER/$PSEUDO/sampling/sample_datafu_rs.txt' USING PigStorage();

.. code:: ipython3

    jid = %hd_pig_submit sample_explore_datafu.pig
    jid

.. parsed-literal::

    {'id': 'job_1446540516812_0193'}

.. code:: ipython3

    st = %hd_job_status jid["id"]
    (st["id"], st["percentComplete"], st["completed"],
     st["status"]["jobComplete"], st["status"]["state"])

.. parsed-literal::

    ('job_1446540516812_0193', '50% complete', None, False, 'RUNNING')

.. code:: ipython3

    %hd_tail_stderr jid["id"] -n 100

The error output is empty this time. We check the files were produced:

.. code:: ipython3

    %blob_ls /$PSEUDO/sampling/sample_datafu

.. parsed-literal::

       name                                                last_modified                  content_type              content_length  blob_type
    0  axavier/sampling/sample_datafu_rs.txt               Sun, 15 Nov 2015 13:23:40 GMT                            0               BlockBlob
    1  axavier/sampling/sample_datafu_rs.txt/_SUCCESS      Sun, 15 Nov 2015 13:23:40 GMT  application/octet-stream  0               BlockBlob
    2  axavier/sampling/sample_datafu_rs.txt/part-r-0...   Sun, 15 Nov 2015 13:23:38 GMT  application/octet-stream  12780           BlockBlob

distributed version
-------------------

The trick: we distribute the sampling, then recombine the samples with a
last, weighted reservoir sampling. How do we distribute? Here, each row is
assigned to one of ten blocks through the key ``x % 10``. The second
sampling is replaced by a classical sampling method because weighted
reservoir sampling is not available in datafu 1.2.0.

.. code:: ipython3

    %%PIG sample_explore_datafu_dist.pig

    REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
    DEFINE RS datafu.pig.sampling.ReservoirSample('1000');
    DEFINE WeightedSample datafu.pig.sampling.WeightedSample();

    ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
               USING PigStorage('\t')
               AS (x:int, v:chararray);
    DESCRIBE ensemble;

    keys = FOREACH ensemble GENERATE x, v, x % 10 AS key;
    DESCRIBE keys;

    ens_group = GROUP keys BY key;
    DESCRIBE ens_group;

    sampled = FOREACH ens_group GENERATE COUNT(keys) AS weigth, FLATTEN(RS(keys));
    DESCRIBE sampled;

    wsampled = FOREACH (GROUP sampled ALL)
               GENERATE FLATTEN(WeightedSample(sampled, 0, 1000));
    DESCRIBE wsampled;

    STORE wsampled INTO '$CONTAINER/$PSEUDO/sampling/sample_datafu_rs_dist2.txt' USING PigStorage();

.. code:: ipython3

    jid = %hd_pig_submit sample_explore_datafu_dist.pig
    jid

.. parsed-literal::

    {'id': 'job_1446540516812_0238'}

.. code:: ipython3

    st = %hd_job_status jid["id"]
    (st["id"], st["percentComplete"], st["completed"],
     st["status"]["jobComplete"], st["status"]["state"])

.. parsed-literal::

    ('job_1446540516812_0238', '100% complete', 'done', True, 'SUCCEEDED')

.. code:: ipython3

    %hd_tail_stderr jid["id"] -n 10

.. parsed-literal::

    2015-11-15 19:22:17,553 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
    2015-11-15 19:22:17,553 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
    2015-11-15 19:22:17,615 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2015-11-15 19:22:17,803 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2015-11-15 19:22:17,803 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
    2015-11-15 19:22:17,803 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
    2015-11-15 19:22:17,865 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2015-11-15 19:22:17,943 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
    2015-11-15 19:22:17,975 [main] INFO  org.apache.pig.Main - Pig script completed in 1 minute, 42 seconds and 839 milliseconds (102839 ms)

    

    OUT:
    ensemble: {x: int,v: chararray}
    keys: {x: int,v: chararray,key: int}
    ens_group: {group: int,keys: {(x: int,v: chararray,key: int)}}
    sampled: {weigth: long,datafu.pig.sampling.reservoirsample_keys_4::x: int,datafu.pig.sampling.reservoirsample_keys_4::v: chararray,datafu.pig.sampling.reservoirsample_keys_4::key: int}
    wsampled: {datafu.pig.sampling.weightedsample_sampled_12::weigth: long,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::x: int,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::v: chararray,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::key: int}

    
.. code:: ipython3

    %blob_ls /$PSEUDO/sampling/sample_datafu_rs_dist2

.. parsed-literal::

       name                                                last_modified                  content_type              content_length  blob_type
    0  axavier/sampling/sample_datafu_rs_dist2.txt         Sun, 15 Nov 2015 19:22:05 GMT                            0               BlockBlob
    1  axavier/sampling/sample_datafu_rs_dist2.txt/_S...   Sun, 15 Nov 2015 19:22:06 GMT  application/octet-stream  0               BlockBlob
    2  axavier/sampling/sample_datafu_rs_dist2.txt/pa...   Sun, 15 Nov 2015 19:22:05 GMT  application/octet-stream  20770           BlockBlob

.. code:: ipython3

    df = %blob_head /$PSEUDO/sampling/sample_datafu_rs_dist2.txt -m

.. code:: ipython3

    df.head()

.. parsed-literal::

       10001  21260  S21260  0
    0  10000  25191  X25191  1
    1  10000  73760  Y73760  0
    2  10000  90105  P90105  5
    3  10000  46070  Y46070  0
    4  10001  58590  M58590  0

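To make the two steps explicit, here is a local Python sketch of the same
scheme. It is a simplification: the final draw uses ``random.choices``,
which samples with replacement and with probability proportional to the
block size, which is not necessarily what datafu's ``WeightedSample`` does.

.. code:: ipython3

    import random
    from collections import defaultdict

    def reservoir_sampling(ensemble, k):
        echantillon = []
        for i, e in enumerate(ensemble):
            if len(echantillon) < k:
                echantillon.append(e)
            else:
                j = random.randint(0, i)
                if j < k:
                    echantillon[j] = e
        return echantillon

    def distributed_sampling(rows, k, nblocks=10):
        # map step: one block per value of x % nblocks, as in the PIG script
        blocks = defaultdict(list)
        for x, v in rows:
            blocks[x % nblocks].append((x, v))
        # one reservoir sample per block, each candidate weighted by its block size
        candidates, weights = [], []
        for block in blocks.values():
            for row in reservoir_sampling(block, k):
                candidates.append(row)
                weights.append(len(block))
        # recombination: weighted draw among the sampled candidates
        return random.choices(candidates, weights=weights, k=k)

    rows = [(i, "%s%d" % (chr(i % 26 + 65), i)) for i in range(100000)]
    distributed_sampling(rows, 5)
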
improved distributed version
----------------------------

The problem with the previous version: each block is processed in a single
pass with a pseudo-random sequence we know little about. If the same
*seeds* are used, the sequences, even though they simulate randomness, may
be highly correlated from one block to the next. This must be fixed. We
must also make sure no block is *skewed*; here the partition key becomes
the first character of the MD5 hash of the value, which gives deterministic
and well-mixed blocks (see the sketch after the outputs below).

.. code:: ipython3

    %%PIG_azure script_rs.pig

    REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
    DEFINE MD5 datafu.pig.hash.MD5();
    DEFINE RS datafu.pig.sampling.ReservoirSample('1000');
    DEFINE WeightedSample datafu.pig.sampling.WeightedSample();

    ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
               USING PigStorage('\t')
               AS (x:int, v:chararray);
    DESCRIBE ensemble;

    ens_group = GROUP ensemble BY (x, v);
    DESCRIBE ens_group;

    compte_group = FOREACH ens_group
                   GENERATE group.x AS x, group.v AS v, COUNT(ensemble) AS nb_ligne;
    DESCRIBE compte_group;

    hash_group = FOREACH compte_group
                 GENERATE x, v, nb_ligne, SUBSTRING(MD5(v), 0, 1) AS val;
    DESCRIBE hash_group;

    group_hash = GROUP hash_group BY val;
    DESCRIBE group_hash;

    rs_parall = FOREACH group_hash
                GENERATE COUNT(hash_group) AS nb_hash, FLATTEN(RS(hash_group));
    DESCRIBE rs_parall;

    wsampled = FOREACH (GROUP rs_parall ALL)
               GENERATE FLATTEN(WeightedSample(rs_parall, 0, 1000));
    DESCRIBE wsampled;

    STORE wsampled INTO '$CONTAINER/$PSEUDO/sampling/sample_distributed_hash.txt' USING PigStorage();

.. code:: ipython3

    jid = %hd_pig_submit script_rs.pig
    jid

.. parsed-literal::

    {'id': 'job_1446540516812_0244'}

.. code:: ipython3

    st = %hd_job_status jid["id"]
    (st["id"], st["percentComplete"], st["completed"],
     st["status"]["jobComplete"], st["status"]["state"])

.. parsed-literal::

    ('job_1446540516812_0244', '100% complete', None, False, 'RUNNING')

.. code:: ipython3

    %hd_tail_stderr jid["id"] -n 10

.. parsed-literal::

    2015-11-15 19:52:05,138 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
    2015-11-15 19:52:05,138 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
    2015-11-15 19:52:05,200 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2015-11-15 19:52:05,435 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
    2015-11-15 19:52:05,435 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
    2015-11-15 19:52:05,435 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
    2015-11-15 19:52:05,513 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
    2015-11-15 19:52:05,560 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
    2015-11-15 19:52:05,607 [main] INFO  org.apache.pig.Main - Pig script completed in 2 minutes, 29 seconds and 962 milliseconds (149962 ms)

    

    OUT:
    ensemble: {x: int,v: chararray}
    ens_group: {group: (x: int,v: chararray),ensemble: {(x: int,v: chararray)}}
    compte_group: {x: int,v: chararray,nb_ligne: long}
    hash_group: {x: int,v: chararray,nb_ligne: long,val: chararray}
    group_hash: {group: chararray,hash_group: {(x: int,v: chararray,nb_ligne: long,val: chararray)}}
    rs_parall: {nb_hash: long,datafu.pig.sampling.reservoirsample_hash_group_4::x: int,datafu.pig.sampling.reservoirsample_hash_group_4::v: chararray,datafu.pig.sampling.reservoirsample_hash_group_4::nb_ligne: long,datafu.pig.sampling.reservoirsample_hash_group_4::val: chararray}
    wsampled: {datafu.pig.sampling.weightedsample_rs_parall_12::nb_hash: long,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::x: int,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::v: chararray,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::nb_ligne: long,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::val: chararray}

    
.. code:: ipython3

    %blob_ls /$PSEUDO/sampling/sample_distributed_hash.txt

.. parsed-literal::

       name                                                last_modified                  content_type              content_length  blob_type
    0  axavier/sampling/sample_distributed_hash.txt        Sun, 15 Nov 2015 19:51:56 GMT                            0               BlockBlob
    1  axavier/sampling/sample_distributed_hash.txt/_...   Sun, 15 Nov 2015 19:51:56 GMT  application/octet-stream  0               BlockBlob
    2  axavier/sampling/sample_distributed_hash.txt/p...   Sun, 15 Nov 2015 19:51:55 GMT  application/octet-stream  21750           BlockBlob

.. code:: ipython3

    df = %blob_head /$PSEUDO/sampling/sample_distributed_hash.txt -m
    df.head()

.. parsed-literal::

       6693   27244  W27244  1  6
    0  6749   51104  O51104  1  1
    1  6605   91527  H91527  1  6
    2  6630   75027  R75027  1  4
    3  6789   58148  M58148  1  1
    4  6659   71659  D71659  1  5

.. code:: ipython3

    %blob_downmerge /$PSEUDO/sampling/sample_distributed_hash.txt sample_distributed_hash.txt

.. parsed-literal::

    'sample_distributed_hash.txt'

.. code:: ipython3

    %head sample_distributed_hash.txt

.. parsed-literal::

    6693	27244	W27244	1	6
    6749	51104	O51104	1	1
    6605	91527	H91527	1	6
    6630	75027	R75027	1	4
    6789	58148	M58148	1	1
    6659	71659	D71659	1	5
    6811	74380	U74380	1	9
    6749	20125	B20125	1	2
    6587	33466	E33466	1	5
    6587	21645	N21645	1	5

    
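The effect of the MD5-based key can be checked locally: the first
hexadecimal digit of the hash splits the values into 16 deterministic and
roughly balanced blocks, whatever the distribution of the original keys. A
small sketch, with ``hashlib`` standing in for datafu's ``MD5`` UDF (whose
exact encoding of the hash may differ):

.. code:: ipython3

    import hashlib
    from collections import Counter

    values = ["%s%d" % (chr(i % 26 + 65), i) for i in range(100000)]
    # partition key = first hexadecimal digit of the MD5 hash of the value
    keys = [hashlib.md5(v.encode("utf8")).hexdigest()[0] for v in values]
    # 16 deterministic blocks of roughly equal size
    print(sorted(Counter(keys).items()))
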
java version
------------

We follow the *Sampling* example from the datafu documentation and download
datafu 1.2 from Maven. It is not the latest version; for a more recent one,
follow the instructions to *build* datafu (see its documentation). In
particular, the weighted version of reservoir sampling is not available
(see the project history: version 1.2.0 was released in December 2013). The
java implementation does not seem to address an issue which can arise when
the requested sample size is too large; see the next section.

.. code:: ipython3

    import pyensae.datasource
    pyensae.datasource.download_data(
        "datafu-1.2.0.jar",
        url="http://central.maven.org/maven2/com/linkedin/datafu/datafu/1.2.0/")

.. parsed-literal::

    'datafu-1.2.0.jar'

.. code:: ipython3

    %blob_up datafu-1.2.0.jar /$PSEUDO/sampling/datafu-1.2.0.jar

.. parsed-literal::

    '$PSEUDO/sampling/datafu-1.2.0.jar'

.. code:: ipython3

    %blob_ls /$PSEUDO/sampling

.. parsed-literal::

       name                               last_modified                  content_type              content_length  blob_type
    0  axavier/sampling/datafu-1.2.0.jar  Fri, 13 Nov 2015 00:03:49 GMT  application/octet-stream  1600826         BlockBlob
    1  axavier/sampling/sample.txt        Fri, 13 Nov 2015 00:02:50 GMT  application/octet-stream  1377780         BlockBlob
    2  axavier/sampling/sample2.txt       Fri, 13 Nov 2015 00:35:55 GMT  application/octet-stream  1377793         BlockBlob
    3  axavier/sampling/sample3.txt       Fri, 13 Nov 2015 00:39:40 GMT  application/octet-stream  1377793         BlockBlob
    4  axavier/sampling/sample4.txt       Fri, 13 Nov 2015 00:41:49 GMT  application/octet-stream  1377793         BlockBlob

.. code:: ipython3

    %%PIG_azure sample.pig

    REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
    DEFINE RS datafu.pig.sampling.ReservoirSample('1000');

    dset = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
           USING PigStorage('\t')
           AS (x:int, v:chararray);
    sampled = FOREACH (GROUP dset ALL) GENERATE FLATTEN(RS(dset));
    STORE sampled INTO '$CONTAINER/$PSEUDO/sampling/out_sampled_rs4_2015.txt' USING PigStorage();

.. code:: ipython3

    jid = %hd_pig_submit sample.pig

.. code:: ipython3

    st = %hd_job_status jid["id"]
    (st["id"], st["percentComplete"], st["completed"],
     st["status"]["jobComplete"], st["status"]["state"])

.. parsed-literal::

    ('job_1446540516812_0136', None, None, False, 'RUNNING')

.. code:: ipython3

    %hd_tail_stderr jid["id"] -n 10

.. code:: ipython3

    %blob_ls /$PSEUDO/sampling

.. parsed-literal::

        name                                                last_modified                  content_type              content_length  blob_type
    0   axavier/sampling/datafu-1.2.0.jar                   Fri, 13 Nov 2015 00:03:49 GMT  application/octet-stream  1600826         BlockBlob
    1   axavier/sampling/out_sampled_rs4_2015.txt           Fri, 13 Nov 2015 01:08:22 GMT                            0               BlockBlob
    2   axavier/sampling/out_sampled_rs4_2015.txt/_SUC...   Fri, 13 Nov 2015 01:08:22 GMT  application/octet-stream  0               BlockBlob
    3   axavier/sampling/out_sampled_rs4_2015.txt/part...   Fri, 13 Nov 2015 01:08:21 GMT  application/octet-stream  12785           BlockBlob
    4   axavier/sampling/sample.txt                         Fri, 13 Nov 2015 00:02:50 GMT  application/octet-stream  1377780         BlockBlob
    5   axavier/sampling/sample2.txt                        Fri, 13 Nov 2015 00:35:55 GMT  application/octet-stream  1377793         BlockBlob
    6   axavier/sampling/sample3.txt                        Fri, 13 Nov 2015 00:39:40 GMT  application/octet-stream  1377793         BlockBlob
    7   axavier/sampling/sample4.txt                        Fri, 13 Nov 2015 00:41:49 GMT  application/octet-stream  1377793         BlockBlob
    8   axavier/sampling/sampled4_2015.txt                  Fri, 13 Nov 2015 00:50:20 GMT                            0               BlockBlob
    9   axavier/sampling/sampled4_2015.txt/_SUCCESS         Fri, 13 Nov 2015 00:50:20 GMT  application/octet-stream  0               BlockBlob
    10  axavier/sampling/sampled4_2015.txt/part-m-00000     Fri, 13 Nov 2015 00:50:19 GMT  application/octet-stream  1277794         BlockBlob
    11  axavier/sampling/sampled_rs4_2015.txt               Fri, 13 Nov 2015 01:04:51 GMT                            0               BlockBlob
    12  axavier/sampling/sampled_rs4_2015.txt/_SUCCESS      Fri, 13 Nov 2015 01:04:51 GMT  application/octet-stream  0               BlockBlob
    13  axavier/sampling/sampled_rs4_2015.txt/part-m-0...   Fri, 13 Nov 2015 01:04:50 GMT  application/octet-stream  1277794         BlockBlob
    14  axavier/sampling/sampled_srs4_2015.txt              Fri, 13 Nov 2015 00:56:09 GMT                            0               BlockBlob
    15  axavier/sampling/sampled_srs4_2015.txt/_SUCCESS     Fri, 13 Nov 2015 00:56:09 GMT  application/octet-stream  0               BlockBlob
    16  axavier/sampling/sampled_srs4_2015.txt/part-m-...   Fri, 13 Nov 2015 00:56:09 GMT  application/octet-stream  1277794         BlockBlob
    17  axavier/sampling/sampled_srs_2015.txt               Fri, 13 Nov 2015 00:52:34 GMT                            0               BlockBlob
    18  axavier/sampling/sampled_srs_2015.txt/_SUCCESS      Fri, 13 Nov 2015 00:52:34 GMT  application/octet-stream  0               BlockBlob
    19  axavier/sampling/sampled_srs_2015.txt/part-m-0...   Fri, 13 Nov 2015 00:52:34 GMT  application/octet-stream  1277794         BlockBlob

.. code:: ipython3

    %blob_downmerge /$PSEUDO/sampling/out_sampled_rs4_2015.txt out_sampled_rs4_2015.txt -o

.. parsed-literal::

    'out_sampled_rs4_2015.txt'

.. code:: ipython3

    %head out_sampled_rs4_2015.txt

.. parsed-literal::

    90648	M90648
    49678	S49678
    41434	Q41434
    30149	P30149
    15836	C15836
    61110	K61110
    3838	Q3838
    81515	F81515
    48052	E48052
    16332	E16332

    
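As noted above, the requested sample size can exceed the dataset size. The
pure-Python version handles this case gracefully by returning the whole
dataset; a quick check, not in the original notebook:

.. code:: ipython3

    import random

    def reservoir_sampling(ensemble, k):
        # same algorithm as in the python section above
        echantillon = []
        for i, e in enumerate(ensemble):
            if len(echantillon) < k:
                echantillon.append(e)
            else:
                j = random.randint(0, i)
                if j < k:
                    echantillon[j] = e
        return echantillon

    # k larger than the dataset: the whole dataset comes back
    print(reservoir_sampling(list(range(5)), 10))
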
the end
-------

.. code:: ipython3

    %blob_close

.. parsed-literal::

    True

version with an iterator
------------------------