.. _examplesparkrst:

======================
First steps on pyspark
======================

.. only:: html

    **Links:** :download:`notebook `, :downloadlink:`html `,
    :download:`python `, :downloadlink:`slides `,
    :githublink:`GitHub|_doc/notebooks/notebook_eleves/2022-2023/example_spark.ipynb|*`

Initialization
--------------

The data comes from the *Répertoire national des élus* (the French national
register of elected officials):
https://www.data.gouv.fr/fr/datasets/repertoire-national-des-elus-1/

.. code:: ipython3

    !pip install pandas

.. code:: ipython3

    import pandas
    import pyspark

Kubernetes
----------

.. code:: ipython3

    from pyspark import SparkConf

    conf = SparkConf()
    conf.getAll()

.. code:: ipython3

    !kubectl api-resources

.. code:: ipython3

    !kubectl api-resources --help

.. code:: ipython3

    !kubectl get pods

.. code:: ipython3

    !kubectl exec sparksql-4fe7618772624ef3-exec-6 -- ls /

.. code:: ipython3

    !kubectl exec --help

.. code:: ipython3

    !kubectl cluster-info

Dummy example
-------------

.. code:: ipython3

    %%writefile --help

.. code:: ipython3

    %%writefile dummy.py

    import random
    import string
    import datetime
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, rand, randn
    from pyspark.sql.types import StructType, StructField, DoubleType, LongType

    sparkConf = SparkConf()
    spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
    sc = spark.sparkContext


    def randomString(stringLength=10):
        """Generates a random string of fixed length."""
        letters = string.ascii_lowercase
        return ''.join(random.choice(letters) for i in range(stringLength))


    r = randomString()
    print("Random String is", r)

    path = "/opt/spark/work-dir/" + r

    start = datetime.datetime.now()
    print("spark job submitted: " + str(start))

    # The last argument is the number of partitions,
    # hence the number of chunks in the output file.
    dfrange = spark.range(0, int(10e7) + 1, 1, 32)
    df_to_file = dfrange.select(
        "id",
        rand(seed=5).alias("uniform"), randn(seed=5).alias("normal"),
        rand(seed=5).alias("uniform2"), randn(seed=5).alias("normal2"),
        rand(seed=5).alias("uniform3"), randn(seed=5).alias("normal3"))
    df_to_file.write.format("csv").option(
        "header", "true").mode("overwrite").save(path)
    print("spark write finished: " + str(datetime.datetime.now() - start))

    schema = StructType([StructField("id", LongType(), True),
                         StructField("field1", DoubleType(), True),
                         StructField("field2", DoubleType(), True),
                         StructField("field3", DoubleType(), True),
                         StructField("field4", DoubleType(), True),
                         StructField("field5", DoubleType(), True),
                         StructField("field6", DoubleType(), True)])

    df_from_file = spark.read.load(path, format="csv", schema=schema, header=True)
    print("spark read finished: " + str(datetime.datetime.now() - start))

    df_from_file.count()
    print("spark count finished: " + str(datetime.datetime.now() - start))

.. code:: ipython3

    !spark-submit --master k8s://https://kubernetes.default.svc:443 --deploy-mode cluster \
        --name sparksql --conf spark.executor.instances=2 \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=default \
        --conf spark.kubernetes.namespace=default \
        --py-files my-pyspark-app.zip \
        dummy.py

Data
----

.. code:: ipython3

    df = pandas.read_csv(
        "https://www.data.gouv.fr/fr/datasets/r/d5f400de-ae3f-4966-8cb6-a85c70c6c24a",
        sep="\t")
    df.head()

.. code:: ipython3

    df.shape

.. code:: ipython3

    data = list(map(tuple, df.itertuples()))
    setlen = set(map(len, data))
    setlen

.. code:: ipython3

    data[0]

From pandas to spark
--------------------

.. code:: ipython3

    import pyspark
    from pyspark.sql import SparkSession
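
Before creating the cluster session below, note that Spark can also infer a
schema straight from a pandas DataFrame, an alternative to the explicit
``StructType`` built by hand in the next cells. A minimal sketch, assuming a
plain local pyspark install; the ``local[2]`` master, the session and app
names, and the 100-row sample are illustrative choices, not part of the
original notebook:

.. code:: ipython3

    from pyspark.sql import SparkSession

    # Hypothetical local session, just to illustrate schema inference
    # without the Kubernetes cluster used in this notebook.
    local = SparkSession.builder.master("local[2]").appName("infer").getOrCreate()

    # createDataFrame can infer the schema from a pandas DataFrame directly;
    # a small sample is enough to inspect the inferred types.
    inferred = local.createDataFrame(df.head(100))
    inferred.printSchema()

    local.stop()  # release it before creating the session used below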
.. code:: ipython3

    spark = SparkSession.builder.appName("sparksql").getOrCreate()

.. code:: ipython3

    from pyspark.sql.types import (
        StructType, StructField, StringType, DoubleType, IntegerType)

    cols = [StructField("index", IntegerType(), True)]
    for name, typ in zip(df.columns, df.dtypes):
        if "float64" in str(typ):
            cols.append(StructField(name, DoubleType(), True))
        else:
            cols.append(StructField(name, StringType(), True))

    schema = StructType(cols)
    encoded_name = schema.fields[0].name.encode('utf-8')
    schema

.. code:: ipython3

    dfs = spark.createDataFrame(data, schema=schema)
    dfs.show()

.. code:: ipython3

    dfs.columns

.. code:: ipython3

    dfs.printSchema()

.. code:: ipython3

    dfs.write.mode("overwrite").format("parquet").save("data/elu.parquet")

.. code:: ipython3

    dfs.write.csv("data/elu.csv")

.. code:: ipython3

    !hdfs dfs -ls

.. code:: ipython3

    !hdfs dfs -ls data/

HDFS
----

.. code:: ipython3

    !hdfs --help

.. code:: ipython3

    !hdfs envvars

.. code:: ipython3

    !ls /opt/spark/conf

.. code:: ipython3

    !cat /opt/spark/conf/spark-defaults.conf

.. code:: ipython3

    !kubectl --help

.. code:: ipython3

    !kubectl get pods -n spark

.. code:: ipython3

    !hdfs dfs --help

.. code:: ipython3

    !hdfs dfs -du

Reading
-------

.. code:: ipython3

    df2 = spark.read.csv("data/elu.csv/")

Group by
--------

.. code:: ipython3

    dfs.printSchema()

.. code:: ipython3

    from pyspark.sql.functions import col, split

    # The birth year is the third part of the "Date de naissance" column;
    # it is cast to an integer so that it can be plotted on a numeric axis.
    dfs_new = dfs.withColumn(
        "année_naissance",
        split(col("Date de naissance"), "/")[2].cast("int"))

.. code:: ipython3

    gr = dfs_new.groupBy("année_naissance").count()
    gr

.. code:: ipython3

    grexe = gr.collect()
    grexe

.. code:: ipython3

    pdf = gr.toPandas()
    pdf

.. code:: ipython3

    !pip install matplotlib

.. code:: ipython3

    pdf.plot.scatter(x="année_naissance", y="count")

MLlib
-----

.. code:: ipython3

    !pip install scikit-learn

.. code:: ipython3

    from pyspark.ml.regression import LinearRegression

.. code:: ipython3

    from sklearn.datasets import make_regression

    X, y = make_regression(10000, 10)
    X.shape, y.shape

.. code:: ipython3

    df = pandas.DataFrame(X)
    df.columns = [f"f{i}" for i in range(len(df.columns))]
    df["Y"] = y
    df.head()

.. code:: ipython3

    dfs = spark.createDataFrame(df.itertuples())

.. code:: ipython3

    dfs.head()

.. code:: ipython3

    from pyspark.ml.feature import VectorAssembler

    assembler = VectorAssembler(
        inputCols=[f"f{i}" for i in range(10)], outputCol="features")
    data = assembler.transform(dfs)

.. code:: ipython3

    trainingData, testData = data.randomSplit([0.7, 0.3])

.. code:: ipython3

    reg = LinearRegression(labelCol="Y", featuresCol="features")
    reg_train = reg.fit(trainingData)

.. code:: ipython3

    reg_train

.. code:: ipython3

    from pyspark.ml.evaluation import RegressionEvaluator

    predictions = reg_train.transform(testData)
    evaluator = RegressionEvaluator(
        predictionCol="prediction", labelCol="Y", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("RMSE = {}".format(rmse))

.. code:: ipython3

    reg_train.save("linreg")

.. code:: ipython3

    !ls -l linreg

Stop
----

.. code:: ipython3

    spark.stop()
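
The model saved above in the ``linreg`` directory can be reloaded in a later
session. A minimal sketch, assuming the directory is still reachable; the
``spark2`` name and the ``reload-linreg`` app name are illustrative:

.. code:: ipython3

    from pyspark.sql import SparkSession
    from pyspark.ml.regression import LinearRegressionModel

    # The previous session was stopped, so a new one is needed.
    spark2 = SparkSession.builder.appName("reload-linreg").getOrCreate()

    # A fitted estimator is reloaded through its *Model* class,
    # not through the LinearRegression estimator used for training.
    model = LinearRegressionModel.load("linreg")
    print(model.coefficients, model.intercept)

    spark2.stop()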