First steps on pyspark#

Links: notebook, html, python, slides, GitHub

Initialization#

Répertoire national des élus (national register of French elected officials): https://www.data.gouv.fr/fr/datasets/repertoire-national-des-elus-1/

!pip install pandas
import pandas
import pyspark
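
A quick sanity check on the installed versions (not required by the rest of the notebook, just informative):

pyspark.__version__, pandas.__version__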

Kubernetes#

from pyspark import SparkConf
conf = SparkConf()
conf.getAll()
!kubectl api-resources
!kubectl api-resources --help
!kubectl get pods
!kubectl exec sparksql-4fe7618772624ef3-exec-6 -- ls /
!kubectl exec --help
!kubectl cluster-info
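
The full configuration is verbose; a minimal sketch, assuming conf is the SparkConf created above, keeps only the Kubernetes-related entries to see how the session talks to the cluster:

[(k, v) for k, v in conf.getAll() if "kubernetes" in k]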

Dummy example#

%%writefile --help
%%writefile dummy.py

import random
import string
import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, randn
from pyspark.sql.types import StructType, StructField, DoubleType, LongType
sparkConf = SparkConf()
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
def randomString(stringLength=10):
    """Generate a random string of fixed length """
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))
r = randomString()
print ("Random String is ", r )
path = "/opt/spark/work-dir/" + r
start = datetime.datetime.now()
print("spark job submitted: " + str(start))
dfrange = spark.range(0, 10e7 + 1, 1, 32)  # the last argument is the number of partitions: it drives the parallelism and the number of output files
df_to_file = dfrange.select("id",
              rand(seed=5).alias("uniform"),
              randn(seed=5).alias("normal"),
              rand(seed=5).alias("uniform2"),
              randn(seed=5).alias("normal2"),
              rand(seed=5).alias("uniform3"),
              randn(seed=5).alias("normal3"))
df_to_file.write.format("csv").option("header", "true").mode("overwrite").save(path)
print("spark write finished: " + str(datetime.datetime.now() - start))
schema = StructType([StructField("id", LongType(), True),
                   StructField("field1", DoubleType(), True),
                   StructField("field2", DoubleType(), True),
                   StructField("field3", DoubleType(), True),
                   StructField("field4", DoubleType(), True),
                   StructField("field5", DoubleType(), True),
                   StructField("field6", DoubleType(), True)])
df_from_file = spark.read.load(path,
                     format="csv",
                     schema=schema,
                     header=True)
print("spark read finished: " + str(datetime.datetime.now() - start))
df_from_file.count()
print("spark count finished: " + str(datetime.datetime.now() - start))
!spark-submit --master k8s://https://kubernetes.default.svc:443 --deploy-mode cluster \
        --name sparksql --conf spark.executor.instances=2 \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=default \
        --conf spark.kubernetes.namespace=default \
        --py-files my-pyspark-app.zip \
        dummy.py
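
Once the job is submitted in cluster mode, the driver and the executors run in their own pods. Spark on Kubernetes labels them with spark-role, which makes them easy to list; the pod name below is a placeholder to replace with the one returned by kubectl get pods:

!kubectl get pods -l spark-role=driver
!kubectl get pods -l spark-role=executor
# <driver-pod-name> is a placeholder, copy the name listed above
!kubectl logs <driver-pod-name>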

Data#

df = pandas.read_csv("https://www.data.gouv.fr/fr/datasets/r/d5f400de-ae3f-4966-8cb6-a85c70c6c24a", sep="\t")
df.head()
df.shape
data = list(map(tuple, df.itertuples()))
setlen = set(map(len, data))
setlen
data[0]
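
Before building a Spark schema, it helps to know which pandas dtypes are present in the file; a quick check with plain pandas:

df.dtypes.value_counts()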

From pandas to Spark#

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparksql").getOrCreate()
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

cols = [StructField("index", IntegerType(), True)]
for name, typ in zip(df.columns, df.dtypes):
    if "float64" in str(typ):
        cols.append(StructField(name, DoubleType(), True))
    else:
        cols.append(StructField(name, StringType(), True))
schema = StructType(cols)
encoded_name = schema.fields[0].name.encode('utf-8')  # the first column name, as UTF-8 bytes
schema
dfs = spark.createDataFrame(data, schema=schema)
dfs.show()
dfs.columns
dfs.printSchema()
dfs.write.mode("overwrite").format("parquet").save("data/elu.parquet")
dfs.write.mode("overwrite").csv("data/elu.csv")
!hdfs dfs -ls
!hdfs dfs -ls data/
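
A quick sanity check: read the Parquet output back and make sure the row and column counts match the pandas DataFrame (dfp is a throwaway name):

dfp = spark.read.parquet("data/elu.parquet")
dfp.count(), len(dfp.columns)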

HDFS#

!hdfs --help
!hdfs envvars
!ls /opt/spark/conf
!cat /opt/spark/conf/spark-defaults.conf
!kubectl --help
!kubectl get pods -n spark
!hdfs dfs -help
!hdfs dfs -du
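
To compare the footprint of the Parquet and CSV copies of the same data, -du also accepts a path and a human-readable flag:

!hdfs dfs -du -h data/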

Reading#

df2 = spark.read.csv("data/elu.csv/")
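
Read this way, every column comes back as a string with generic names (_c0, _c1, ...) because the CSV was written without a header. A sketch, assuming the schema built above is still in scope, to get typed columns back:

df2_typed = spark.read.csv("data/elu.csv/", schema=schema)
df2_typed.printSchema()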

Group by#

dfs.printSchema()
from pyspark.sql.functions import split
from pyspark.sql.functions import col

dfs_new = dfs.withColumn("année_naissance", split(col("Date de naissance"), "/")[2])
gr = dfs_new.groupBy("année_naissance").count()
gr
grexe = gr.collect()
grexe
pdf = gr.toPandas()
pdf
!pip install matplotlib
pdf["année_naissance"] = pdf["année_naissance"].astype(float)  # the split produced strings, the scatter plot needs numbers
pdf.plot.scatter(x="année_naissance", y="count")
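
The same aggregation can also be written in SQL once the DataFrame is registered as a temporary view; a minimal sketch (the view name elus is arbitrary):

dfs_new.createOrReplaceTempView("elus")
spark.sql("SELECT `année_naissance`, COUNT(*) AS n FROM elus GROUP BY `année_naissance`").show(5)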

MLlib#

!pip install scikit-learn
from pyspark.ml.regression import LinearRegression
from sklearn.datasets import make_regression
X, y = make_regression(10000, 10)
X.shape, y.shape
df = pandas.DataFrame(X)
df.columns = [f"f{i}" for i in range(len(df.columns))]
df["Y"] = y
df.head()
dfs = spark.createDataFrame(df.itertuples())
dfs.head()
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=[f"f{i}" for i in range(10)], outputCol="features")
data = assembler.transform(dfs)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
reg = LinearRegression(labelCol="Y", featuresCol="features")
reg_train = reg.fit(trainingData)
reg_train
from pyspark.ml.evaluation import RegressionEvaluator
predictions = reg_train.transform(testData)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Y", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print("RMSE = {}".format(rmse))
reg_train.save("linreg")
!ls -l linreg
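
The saved model can be reloaded in a later session with the matching model class; a minimal sketch:

from pyspark.ml.regression import LinearRegressionModel
reg_loaded = LinearRegressionModel.load("linreg")
reg_loaded.coefficients, reg_loaded.intercept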

Stop#

spark.stop()