.. _sparkfirststepsrst: ======================= Premiers pas avec Spark ======================= .. only:: html **Links:** :download:`notebook `, :downloadlink:`html `, :download:`PDF `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/spark_local/spark_first_steps.ipynb|*` Introduction à `Spark `__ et aux `RDD `__. .. code:: ipython3 %matplotlib inline .. code:: ipython3 from jyquickhelper import add_notebook_menu add_notebook_menu() .. contents:: :local: Deux ou trois petites choses à ne pas oublier --------------------------------------------- Local et cluster ~~~~~~~~~~~~~~~~ `Spark `__ n’est pas un langage de programmation mais un environnement de calcul distribué. L’installation en locale reproduit ce que `Spark `__ donnerait à grande échelle sur un cluster mais ce n’est pas rigoureusement identique. En particulier cela veut dire que si votre script tourne en local sur un petit jeu de données, il est possible qu’il échoue sur le cluster : - Les dépendances du script sont installées en local mais pas sur chaque machine du cluster `Spark `__. Cela peut se faire à l’installation du cluster pour des dépendances conséquentes ou juste avant l’exécution d’un *job* pour des dépendances ponctuelles. - Les données sur le cluster sont en plus grand nombre, il est fort probable que l’échantillon aléatoire local ne soit pas représentatif. - Les chemins locaux ne fonctionnent pas sur le cluster. Il faudra d’abord uploader les données sur le cluster pour faire tourner le script. - Débugger est compliqué : les print ne marchent pas souvent, surtout si c’est en distribué. Le print va s’exécuter sur une machine distance qui est à mille lieues de votre écran. Quand ça plante sur une machine distante, il faut s’accrocher. Le pire, c’est quand l’erreur arrive pour une observation toute bizarre après cinq heures de calcul. Si le message d’erreur n’est pas trop incompréhensible, on sen tire. En fait, le plus agaçant, c’est quand le calcul est carrément interrompu par le cluster au bout de cinq heures car il décrète que les probabilités d’aboutir sont quasi nulles. Là, on connaît l’erreur (skewed dataset) et on sait qu’on va souffrir pour construire la contournante. Spark et RDD ~~~~~~~~~~~~ `Spark `__ ne manipule pas des fichiers mais des `Resilient Distributed Dataset `__ ou *RDD*. En particulier : 1. Les *RDD* sont organisés en ligne : ce sont des blocs qui ne seront jamais *cassés* ni *modifié*. Ces lignes ne peuvent pas excéder 2 Go (voir `SPARK-6235 `__) mais il est conseillé de ne pas aller au-delà de quelques Mo. 2. Sauf exception, il est impossible d’accéder à une partie du fichier. Il faut le parcourir en entier (il n’y a pas d’index). 3. Les *RDD* fonctionnent comme des *flux* ou *stream*. On peut soit les lire, soit les écrire mais jamais les deux en même temps. Par conséquent, on ne peut pas modifier un *RDD*, il faut toujours en créer un autre. 4. Les *RDD* sont distribués. L’ordre des lignes qui le composent n’est pas prévisible. 5. Comme l’ordre est imprévisible, on ne stocke **jamais** les noms des colonnes dans les *RDD*. Les partitions ~~~~~~~~~~~~~~ Il existe une exception au point 2 : les `partitions `__. Une partition est un ensemble de lignes traitées par le même processus. La parallélisation ne peut excéder le nombre de partitions. Par défaut, c’est aléatoire (hash hash). Mais on peut tout-à-fait partionner selon une colonne, deux colonnes. D’ailleurs, c’est là-dessus qu’on joue pour optimiser la distribution. Si on réduit (ou grouper) selon une colonne, c’est d’autant plus rapide si le stream est déjà partitionnée sur cette colonne. Spark et Python ~~~~~~~~~~~~~~~ Spark est implémenté en Java. L’API `Python `__ permet de faire beaucoup de choses mais : - Elle ne sera jamais aussi complète que l’API `Java `__. - Elle sera plus lente que l’API `Java `__ ou `Scala `__ (car `Scala `__ est une surcouche fonctionnelle de `Java `__). Librairies sur Spark ~~~~~~~~~~~~~~~~~~~~ Un des succès de Spark est de proposer des extensions dédiées à certains usages comme `MLlib `__ qui implémente des algorihmes de machine learning distribués, `GraphX `__ pour des algorithmes sur des graphes. `MLlib `__ sera bientôt remplacé par ML qui s’appuie sur les `DataFrame `__. Erreur : Cannot run program “python” ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Il vous manque probablement ``PYSPARK_PYTHON``. .. code:: ipython3 import os for o, v in sorted(os.environ.items()): if "SPARK" in o.upper(): print("{0:25}= {1}".format(o, v.replace(os.environ["USERNAME"], ""))) .. parsed-literal:: LOCAL_PYSPARK = c:\rdupre\spark-2.2.0-bin-hadoop2.7 PYSPARK_DRIVER_PYTHON = jupyter-notebook PYSPARK_PYTHON = c:\Python36_x64\python PYSPARK_SUBMIT_ARGS = "--name" "PySparkShell" "pyspark-shell" SPARK_CMD = set PYSPARK_SUBMIT_ARGS="--name" "PySparkShell" "pyspark-shell" && jupyter-notebook SPARK_ENV_LOADED = 1 SPARK_HIVE = true SPARK_HOME = c:\rdupre\spark-2.2.0-bin-hadoop2.7\bin\.. SPARK_JARS_DIR = "c:\rdupre\spark-2.2.0-bin-hadoop2.7\bin\..\jars" SPARK_SCALA_VERSION = 2.10 _SPARK_CMD_USAGE = Usage: bin\pyspark.cmd [options] Erreur : Output directory file:/… already exists ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Spark n’aime pas écrire des données dans un RDD qui existe déjà. Il faut le supprimer. Tout dépend de l’environnement où on se trouve, sur Hadoop ou en local. Comme c’est en local, nous ferons : .. code:: ipython3 from pyquickhelper.filehelper import remove_folder def clean(folder): if os.path.exists(folder): return remove_folder(folder) else: return [] clean("fichier.out.txt") .. parsed-literal:: [] Vérifier que Spark en local fonctionne ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ On essaye le *“hello world”* en *Spark* qui consiste à compter les mots dans un fichier. On prend le fichier du notebook. .. code:: ipython3 text_file = sc.textFile("spark_first_steps.ipynb") counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) counts.saveAsTextFile("fichier.out.txt") .. code:: ipython3 os.listdir("fichier.out.txt/") .. parsed-literal:: ['.part-00000.crc', '.part-00001.crc', '._SUCCESS.crc', 'part-00000', 'part-00001', '_SUCCESS'] Sortie en plusieurs fichiers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Un *job* *Spark* est distribué. La sortie d’un *job* *Spark* s’effectue sous la forme de plusieurs stream dans un répertoire, un stream par processus. Cela explique la présence de *part-00000*, *part-00001*. Le fichier ``_SUCCESS`` indique le statut du job. .. code:: ipython3 %load_ext pyensae %head fichier.out.txt/part-00000 -n 3 .. raw:: html
    ('', 11686)
    ('[collect](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect)', 1)
    ('SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html)\\n",', 1)

    
Le format dépend du dernier résultat. Les opérations de bases ----------------------- Documentation : `programming-guide.html - transformations `__. Dans cette section, on considère les données comme un ensemble de lignes de texte. Rien de plus. Donc, pas d’information de type, des conversions quasiment tout le temps. Bref, c’est utile pour comprendre. On y revient quand le reste ne marche pas. En général, on commence par `Spark SQL `__. Ah oui j’oubliais, on s’en sert beaucoup quand les données ne sont pas structurées et sont décrites par du JSON, genre des logs d’un site internet. Chaque ligne est en fait un gros JSON. On utilise un jeu de données de machine learning `Adult `__ légèrement pré-traités et que vous pourrez trouver sur GitHub : `td3a_spark `__. .. code:: ipython3 import os if not os.path.exists("data_adult.txt"): from pyquickhelper.filehelper import unzip_files unzip_files("data_adult.zip", where_to=".") assert os.path.exists("data_adult.txt") .. code:: ipython3 import pandas df = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8") df.head() .. raw:: html
age workclass fnlwgt education education_num marital_status occupation relationship race sex capital_gain capital_loss hours_per_week native_country target
0 39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
1 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
2 38 Private 215646 HS-grad 9 Divorced Handlers-cleaners Not-in-family White Male 0 0 40 United-States <=50K
3 53 Private 234721 11th 7 Married-civ-spouse Handlers-cleaners Husband Black Male 0 0 40 United-States <=50K
4 28 Private 338409 Bachelors 13 Married-civ-spouse Prof-specialty Wife Black Female 0 0 40 Cuba <=50K
On enlève le nom des colonnes. .. code:: ipython3 df.to_csv("adult.txt", sep="\t", encoding="utf-8", index=False, header=None) .. code:: ipython3 %head adult.txt -n 2 .. raw:: html
    39	 State-gov	77516	 Bachelors	13	 Never-married	 Adm-clerical	 Not-in-family	 White	 Male	2174	0	40	 United-States	 <=50K
    50	 Self-emp-not-inc	83311	 Bachelors	13	 Married-civ-spouse	 Exec-managerial	 Husband	 White	 Male	0	0	13	 United-States	 <=50K

    
déclaration d’un RDD ~~~~~~~~~~~~~~~~~~~~ La déclaration déclare l’existence d’un *RDD* comme on déclare un fichier. Pour l’instant aucune manipulation. .. code:: ipython3 rdd = sc.textFile("adult.txt") enregistrement d’un RDD ~~~~~~~~~~~~~~~~~~~~~~~ .. code:: ipython3 import os if not os.path.exists("out"): os.mkdir("out") .. code:: ipython3 clean("out/copy_adult.txt") rdd.saveAsTextFile(os.path.abspath("out/copy_adult.txt")) .. code:: ipython3 %head out/copy_adult.txt/part-00000 -n 2 .. raw:: html
    39	 State-gov	77516	 Bachelors	13	 Never-married	 Adm-clerical	 Not-in-family	 White	 Male	2174	0	40	 United-States	 <=50K
    50	 Self-emp-not-inc	83311	 Bachelors	13	 Married-civ-spouse	 Exec-managerial	 Husband	 White	 Male	0	0	13	 United-States	 <=50K

    
lecture locale d’un RDD avec pandas ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ On lit chaque morceaux avant de les concaténer. .. code:: ipython3 import glob import pandas def read_rdd(path, **options): pat = os.path.join(path, "part*") all_files = glob.glob(pat) if len(all_files) == 0: raise Exception("No file to read in '{0}'".format(path)) merge = [] for f in all_files: try: df = pandas.read_csv(f, header=None, **options) except Exception as e: raise Exception("Unable to read '{0}'".format(f)) from e merge.append(df) if len(merge) == 0: raise Exception("No file to read in '{0}'".format(path)) concatenated_df = pandas.concat(merge, ignore_index=True) return concatenated_df data = read_rdd("out/copy_adult.txt", sep="\t", encoding="utf-8") data.head(n=2) .. raw:: html
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
0 39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
1 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
collect ~~~~~~~ Cette opération regroupe les deux précédentes en une seule. Il faut toute de même faire attention de ne pas l’exécuter sur un grand fichier sous peine de faire exploser la mémoire. .. code:: ipython3 res = rdd.collect() .. code:: ipython3 res[:2] .. parsed-literal:: ['39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K', '50\t Self-emp-not-inc\t83311\t Bachelors\t13\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t13\t United-States\t <=50K'] .. code:: ipython3 import pandas df = pandas.DataFrame([_.split("\t") for _ in res]) df.head(2) .. raw:: html
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
0 39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
1 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
map ~~~ Transformer une ligne en une autre ligne. Chaque ligne est traitée indépendemment des autres. .. code:: ipython3 def extract_column(cols, row): spl = row.split("\t") return [spl[i].strip() for i in cols] res = rdd.map(lambda row: extract_column([1,3], row)) res.collect()[:2] .. parsed-literal:: [['State-gov', 'Bachelors'], ['Self-emp-not-inc', 'Bachelors']] filter ~~~~~~ Garder ou jeter une ligne. Chaque ligne est traitée indépendemment des autres. .. code:: ipython3 def filter_column(row): spl = row.split("\t") return spl[-1].strip() != "<=50K" res = rdd.filter(lambda row: filter_column(row)) res.collect()[:2] .. parsed-literal:: ['52\t Self-emp-not-inc\t209642\t HS-grad\t9\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t45\t United-States\t >50K', '31\t Private\t45781\t Masters\t14\t Never-married\t Prof-specialty\t Not-in-family\t White\t Female\t14084\t0\t50\t United-States\t >50K'] On combine souvent les deux : .. code:: ipython3 def filter_column_split(row): return row[-1].strip() != "<=50K" res = rdd.map(lambda row: extract_column([1,3,-1], row)) \ .filter(lambda row: filter_column_split(row)) res.collect()[:2] .. parsed-literal:: [['Self-emp-not-inc', 'HS-grad', '>50K'], ['Private', 'Masters', '>50K']] Il faut faire attention aux transformations successives des lignes. flatMap ~~~~~~~ C’est la principale différence avec SQL. Une ligne peut devenir un nombre variable de lignes. .. code:: ipython3 def extract_column_and_multiply_row(n, row): spl = row.split("\t") return [tuple(_.strip() for _ in spl)] * n res = rdd.flatMap(lambda row: extract_column_and_multiply_row(2, row)) res.collect()[:3] .. parsed-literal:: [('39', 'State-gov', '77516', 'Bachelors', '13', 'Never-married', 'Adm-clerical', 'Not-in-family', 'White', 'Male', '2174', '0', '40', 'United-States', '<=50K'), ('39', 'State-gov', '77516', 'Bachelors', '13', 'Never-married', 'Adm-clerical', 'Not-in-family', 'White', 'Male', '2174', '0', '40', 'United-States', '<=50K'), ('50', 'Self-emp-not-inc', '83311', 'Bachelors', '13', 'Married-civ-spouse', 'Exec-managerial', 'Husband', 'White', 'Male', '0', '0', '13', 'United-States', '<=50K')] group / reduce + mapValues ~~~~~~~~~~~~~~~~~~~~~~~~~~ Petite moyenne ? .. code:: ipython3 def extract_age_rich(row): spl = row.split("\t") target = spl[-1].strip() age = float(spl[0]) return (age, target) def custom_agg(aggset): temp = list([_[0] for _ in aggset]) return len(temp), sum(temp) ave = rdd.map(extract_age_rich).groupBy(lambda row: row[1]).mapValues(custom_agg) fin = ave.collect() fin .. parsed-literal:: [('>50K', (7841, 346963.0)), ('<=50K', (24720, 909294.0))] sort ~~~~ Je n’en parle pas. Trier un gros jeu de données est à proscrire. On peut trier au sein d’un groupe mais **jamais** un stream entier. Ca fait presque dix ans que j’écris des jobs map/reduce, je n’ai jamais écrit un *sort* sur tout un jeu de données. Ca s’appelle flinguer de la CPU pour rien. join ~~~~ Et on remet la moyenne dans le stream initial. Il vaut mieux regarder la documentation de la méthode `join `__ avant de commencer à lire le code qui suit. .. code:: ipython3 add_key = rdd.map(lambda row: row.split("\t")).map(lambda row: (row[-1].strip(), row)) join = add_key.join(ave) join.collect()[:2] .. parsed-literal:: [('>50K', (['52', ' Self-emp-not-inc', '209642', ' HS-grad', '9', ' Married-civ-spouse', ' Exec-managerial', ' Husband', ' White', ' Male', '0', '0', '45', ' United-States', ' >50K'], (7841, 346963.0))), ('>50K', (['31', ' Private', '45781', ' Masters', '14', ' Never-married', ' Prof-specialty', ' Not-in-family', ' White', ' Female', '14084', '0', '50', ' United-States', ' >50K'], (7841, 346963.0)))] On commence à comprendre pourquoi `Spark SQL `__, ça risque d’être pas mal. le choix existentiel du join : le petit join ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ On fait souvent une opération qui consiste à garder les lignes pour lesquelles une certaine valeur appartient à un ensemble. On peut faire un join classique ou alors l’ensemble est petit, traiter ce join comme un map. On broadcaste l’ensemble à chaque processus exécutant le map. .. code:: ipython3 from pyspark.context import SparkContext ages = sc.broadcast([20, 30, 40]) ages.value .. parsed-literal:: [20, 30, 40] .. code:: ipython3 subset = rdd.filter(lambda row: int(row.split("\t")[0]) in ages.value ) subset.collect()[:2] .. parsed-literal:: ['30\t State-gov\t141297\t Bachelors\t13\t Married-civ-spouse\t Prof-specialty\t Husband\t Asian-Pac-Islander\t Male\t0\t0\t40\t India\t >50K', '40\t Private\t121772\t Assoc-voc\t11\t Married-civ-spouse\t Craft-repair\t Husband\t Asian-Pac-Islander\t Male\t0\t0\t40\t ?\t >50K'] les trucs qui servent parfois parce que … à l’usage ça sert ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Ce que font les méthodes associées aux `RDD `__, un peu comme les itérateurs, n’est pas toujours intuitif, mais il est à peu près sûr qu’elles vous serviront un jour (peut-être après avoir googlé ou bingé comme des fous). .. code:: ipython3 simple_rdd = sc.parallelize([2, 3, 4]) simple_rdd.collect() .. parsed-literal:: [2, 3, 4] .. code:: ipython3 simple_rdd.flatMap(lambda x: range(1, x)).collect() .. parsed-literal:: [1, 1, 2, 1, 2, 3] `histogram `__, `groupByKey `__ le truc à retenir ~~~~~~~~~~~~~~~~~ collect, collect… qu’est-ce que je voulais dire déjà… Ah oui… Un job map/reduce c’est : 1. La déclaration des flux d’entrées. 2. Le traitement à proprement parler. 3. La déclaration des flux de sorties. A moins d’écrire du java bas niveau, le job est transformé en un plan d’exécution qui n’est jamais exécuté si `collect `__ ou `save machin chouette `__ n’est jamais exécuté. Bref, c’est du `lazy `__. Spark DataFrame --------------- `Spark SQL `__ Au début, ça commence par… créer un dataframe. Et comme pour pandas, ces objets retienennt les noms et les types. .. code:: ipython3 import pandas data = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8") data.head(2) .. raw:: html
age workclass fnlwgt education education_num marital_status occupation relationship race sex capital_gain capital_loss hours_per_week native_country target
0 39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
1 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
.. code:: ipython3 if "spark" not in locals(): from pyspark.sql import SparkSession spark = SparkSession.builder.appName("nimportequoi").getOrCreate() # à ne faire qu'une fois .. code:: ipython3 # sdf = spark.createDataFrame(data) # ça marche sdf = spark.read.csv("data_adult.txt", sep="\t", encoding="utf-8") .. code:: ipython3 sdf.show() .. parsed-literal:: +---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+ |_c0| _c1| _c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10| _c11| _c12| _c13| _c14| +---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+ |age| workclass|fnlwgt| education|education_num| marital_status| occupation| relationship| race| sex|capital_gain|capital_loss|hours_per_week|native_country|target| | 39| State-gov| 77516| Bachelors| 13| Never-married| Adm-clerical| Not-in-family| White| Male| 2174| 0| 40| United-States| <=50K| | 50| Self-emp-not-inc| 83311| Bachelors| 13| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 13| United-States| <=50K| | 38| Private|215646| HS-grad| 9| Divorced| Handlers-cleaners| Not-in-family| White| Male| 0| 0| 40| United-States| <=50K| | 53| Private|234721| 11th| 7| Married-civ-spouse| Handlers-cleaners| Husband| Black| Male| 0| 0| 40| United-States| <=50K| | 28| Private|338409| Bachelors| 13| Married-civ-spouse| Prof-specialty| Wife| Black| Female| 0| 0| 40| Cuba| <=50K| | 37| Private|284582| Masters| 14| Married-civ-spouse| Exec-managerial| Wife| White| Female| 0| 0| 40| United-States| <=50K| | 49| Private|160187| 9th| 5| Married-spouse-a...| Other-service| Not-in-family| Black| Female| 0| 0| 16| Jamaica| <=50K| | 52| Self-emp-not-inc|209642| HS-grad| 9| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 45| United-States| >50K| | 31| Private| 45781| Masters| 14| Never-married| Prof-specialty| Not-in-family| White| Female| 14084| 0| 50| United-States| >50K| | 42| Private|159449| Bachelors| 13| Married-civ-spouse| Exec-managerial| Husband| White| Male| 5178| 0| 40| United-States| >50K| | 37| Private|280464| Some-college| 10| Married-civ-spouse| Exec-managerial| Husband| Black| Male| 0| 0| 80| United-States| >50K| | 30| State-gov|141297| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| Asian-Pac-Islander| Male| 0| 0| 40| India| >50K| | 23| Private|122272| Bachelors| 13| Never-married| Adm-clerical| Own-child| White| Female| 0| 0| 30| United-States| <=50K| | 32| Private|205019| Assoc-acdm| 12| Never-married| Sales| Not-in-family| Black| Male| 0| 0| 50| United-States| <=50K| | 40| Private|121772| Assoc-voc| 11| Married-civ-spouse| Craft-repair| Husband| Asian-Pac-Islander| Male| 0| 0| 40| ?| >50K| | 34| Private|245487| 7th-8th| 4| Married-civ-spouse| Transport-moving| Husband| Amer-Indian-Eskimo| Male| 0| 0| 45| Mexico| <=50K| | 25| Self-emp-not-inc|176756| HS-grad| 9| Never-married| Farming-fishing| Own-child| White| Male| 0| 0| 35| United-States| <=50K| | 32| Private|186824| HS-grad| 9| Never-married| Machine-op-inspct| Unmarried| White| Male| 0| 0| 40| United-States| <=50K| | 38| Private| 28887| 11th| 7| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 50| United-States| <=50K| +---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+ only showing top 20 rows Conversion à pandas ~~~~~~~~~~~~~~~~~~~ .. code:: ipython3 df = sdf.toPandas() .. code:: ipython3 df.head() .. raw:: html
_c0 _c1 _c2 _c3 _c4 _c5 _c6 _c7 _c8 _c9 _c10 _c11 _c12 _c13 _c14
0 age workclass fnlwgt education education_num marital_status occupation relationship race sex capital_gain capital_loss hours_per_week native_country target
1 39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
2 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
3 38 Private 215646 HS-grad 9 Divorced Handlers-cleaners Not-in-family White Male 0 0 40 United-States <=50K
4 53 Private 234721 11th 7 Married-civ-spouse Handlers-cleaners Husband Black Male 0 0 40 United-States <=50K
Retour aux RDD ~~~~~~~~~~~~~~ .. code:: ipython3 sdf.rdd .. parsed-literal:: MapPartitionsRDD[59] at javaToPython at null:-2 Récuperer le schéma ~~~~~~~~~~~~~~~~~~~ .. code:: ipython3 sdf.schema .. parsed-literal:: StructType(List(StructField(_c0,StringType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true),StructField(_c3,StringType,true),StructField(_c4,StringType,true),StructField(_c5,StringType,true),StructField(_c6,StringType,true),StructField(_c7,StringType,true),StructField(_c8,StringType,true),StructField(_c9,StringType,true),StructField(_c10,StringType,true),StructField(_c11,StringType,true),StructField(_c12,StringType,true),StructField(_c13,StringType,true),StructField(_c14,StringType,true))) .. code:: ipython3 sdf.printSchema() .. parsed-literal:: root |-- _c0: string (nullable = true) |-- _c1: string (nullable = true) |-- _c2: string (nullable = true) |-- _c3: string (nullable = true) |-- _c4: string (nullable = true) |-- _c5: string (nullable = true) |-- _c6: string (nullable = true) |-- _c7: string (nullable = true) |-- _c8: string (nullable = true) |-- _c9: string (nullable = true) |-- _c10: string (nullable = true) |-- _c11: string (nullable = true) |-- _c12: string (nullable = true) |-- _c13: string (nullable = true) |-- _c14: string (nullable = true) Utiliser pandas pour spécifer le format ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ On utilise pandas sur une partie du stream. .. code:: ipython3 import pandas df = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8") df.head(n=2) .. raw:: html
age workclass fnlwgt education education_num marital_status occupation relationship race sex capital_gain capital_loss hours_per_week native_country target
0 39 State-gov 77516 Bachelors 13 Never-married Adm-clerical Not-in-family White Male 2174 0 40 United-States <=50K
1 50 Self-emp-not-inc 83311 Bachelors 13 Married-civ-spouse Exec-managerial Husband White Male 0 0 13 United-States <=50K
.. code:: ipython3 sdf = spark.createDataFrame(df) .. code:: ipython3 sdf.printSchema() .. parsed-literal:: root |-- age: long (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: long (nullable = true) |-- education: string (nullable = true) |-- education_num: long (nullable = true) |-- marital_status: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: long (nullable = true) |-- capital_loss: long (nullable = true) |-- hours_per_week: long (nullable = true) |-- native_country: string (nullable = true) |-- target: string (nullable = true) .. code:: ipython3 fullsdf = spark.createDataFrame(sdf.rdd, sdf.schema) .. code:: ipython3 fullsdf.printSchema() .. parsed-literal:: root |-- age: long (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: long (nullable = true) |-- education: string (nullable = true) |-- education_num: long (nullable = true) |-- marital_status: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: long (nullable = true) |-- capital_loss: long (nullable = true) |-- hours_per_week: long (nullable = true) |-- native_country: string (nullable = true) |-- target: string (nullable = true) Enregistrement au format parquet ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code:: ipython3 fullsdf.write.parquet("data_adult.schema.parquet") Relecture du format parquet ~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code:: ipython3 newsdf = spark.read.parquet("data_adult.schema.parquet/") .. code:: ipython3 newsdf.printSchema() .. parsed-literal:: root |-- age: long (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: long (nullable = true) |-- education: string (nullable = true) |-- education_num: long (nullable = true) |-- marital_status: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital_gain: long (nullable = true) |-- capital_loss: long (nullable = true) |-- hours_per_week: long (nullable = true) |-- native_country: string (nullable = true) |-- target: string (nullable = true) Dataframe Spark VS Dataframe pandas ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Spark a reproduit la même interface que pandas pour ses dataframes excepté que le résultat n’est pas calculé tant qu’on ne choisit pas de sauvegarder le résultat. .. code:: ipython3 fifty = fullsdf [fullsdf.age > 50] .. code:: ipython3 fifty.show() .. parsed-literal:: +---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+ |age| workclass|fnlwgt| education|education_num| marital_status| occupation| relationship| race| sex|capital_gain|capital_loss|hours_per_week|native_country|target| +---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+ | 53| Private|234721| 11th| 7| Married-civ-spouse| Handlers-cleaners| Husband| Black| Male| 0| 0| 40| United-States| <=50K| | 52| Self-emp-not-inc|209642| HS-grad| 9| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 45| United-States| >50K| | 54| Private|302146| HS-grad| 9| Separated| Other-service| Unmarried| Black| Female| 0| 0| 20| United-States| <=50K| | 59| Private|109015| HS-grad| 9| Divorced| Tech-support| Unmarried| White| Female| 0| 0| 40| United-States| <=50K| | 56| Local-gov|216851| Bachelors| 13| Married-civ-spouse| Tech-support| Husband| White| Male| 0| 0| 40| United-States| >50K| | 54| ?|180211| Some-college| 10| Married-civ-spouse| ?| Husband| Asian-Pac-Islander| Male| 0| 0| 60| South| >50K| | 53| Self-emp-not-inc| 88506| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| White| Male| 0| 0| 40| United-States| <=50K| | 57| Federal-gov|337895| Bachelors| 13| Married-civ-spouse| Prof-specialty| Husband| Black| Male| 0| 0| 40| United-States| >50K| | 53| Private|144361| HS-grad| 9| Married-civ-spouse| Machine-op-inspct| Husband| White| Male| 0| 0| 38| United-States| <=50K| | 53| Private|169846| HS-grad| 9| Married-civ-spouse| Adm-clerical| Wife| White| Female| 0| 0| 40| United-States| >50K| | 79| Private|124744| Some-college| 10| Married-civ-spouse| Prof-specialty| Other-relative| White| Male| 0| 0| 20| United-States| <=50K| | 67| ?|212759| 10th| 6| Married-civ-spouse| ?| Husband| White| Male| 0| 0| 2| United-States| <=50K| | 52| Private|276515| Bachelors| 13| Married-civ-spouse| Other-service| Husband| White| Male| 0| 0| 40| Cuba| <=50K| | 59| Private|159937| HS-grad| 9| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 48| United-States| <=50K| | 53| Private|346253| HS-grad| 9| Divorced| Sales| Own-child| White| Female| 0| 0| 35| United-States| <=50K| | 57| Private|249977| Assoc-voc| 11| Married-civ-spouse| Prof-specialty| Husband| White| Male| 0| 0| 40| United-States| <=50K| | 76| Private|124191| Masters| 14| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 40| United-States| >50K| | 56| Self-emp-not-inc|335605| HS-grad| 9| Married-civ-spouse| Other-service| Husband| White| Male| 0| 1887| 50| Canada| >50K| | 53| Private| 95647| 9th| 5| Married-civ-spouse| Handlers-cleaners| Husband| White| Male| 0| 0| 50| United-States| <=50K| | 56| Self-emp-inc|303090| Some-college| 10| Married-civ-spouse| Sales| Husband| White| Male| 0| 0| 50| United-States| <=50K| +---+-----------------+------+-------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+ only showing top 20 rows