# First steps with PySpark

## Initialization

Répertoire national des élus
https://www.data.gouv.fr/fr/datasets/repertoire-national-des-elus-1/

In [1]:
!pip install pandas

In [2]:
import pandas
import pyspark

## Kubernetes

In [3]:
from pyspark import SparkConf
# Create a SparkConf and display the (key, value) settings it currently holds.
conf = SparkConf()
conf.getAll()

In [4]:
# List the API resource types supported by the cluster.
!kubectl api-resources

In [5]:
# Show the usage help of `kubectl api-resources`.
!kubectl api-resources --help

In [6]:
# List the pods of the current namespace.
!kubectl get pods

In [7]:
# Run `ls /` inside a pod.
# NOTE(review): the pod name below is hardcoded and looks specific to one Spark run;
# replace it with a name reported by `kubectl get pods` before re-running.
!kubectl exec sparksql-4fe7618772624ef3-exec-6 -- ls /

In [8]:
# Show the usage help of `kubectl exec`.
!kubectl exec --help

In [9]:
# Display the endpoints of the cluster's control plane and services.
!kubectl cluster-info

## Dummy example

In [10]:
%%writefile --help

In [11]:
%%writefile dummpy.py

import random
import string
import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, randn
from pyspark.sql.types import StructType, StructField, DoubleType, LongType
# Build (or reuse) the Spark session from a SparkConf that receives no extra
# settings here, and keep a handle on the underlying SparkContext.
sparkConf = SparkConf()
session_builder = SparkSession.builder.config(conf=sparkConf)
spark = session_builder.getOrCreate()
sc = spark.sparkContext
def randomString(stringLength=10):
    """Return a string of ``stringLength`` random lowercase ASCII letters.

    Each character is drawn independently with ``random.choice``.
    """
    alphabet = string.ascii_lowercase
    picked = []
    for _ in range(stringLength):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
# Pick a random directory name for the output of this run.
r = randomString()
print("Random String is ", r)
path = "/opt/spark/work-dir/" + r

# Every duration printed below is measured from this instant.
start = datetime.datetime.now()
print("spark job submitted: " + str(start))

# ids 0..10**8 inclusive (the end bound is exclusive), split into 32 partitions.
# An integer bound replaces the float expression 10e7+1.
dfrange = spark.range(0, 10**8 + 1, 1, 32)

# NOTE(review): all six random columns use seed=5, so the three uniform columns
# (and the three normal ones) presumably hold identical values — confirm intended.
df_to_file = dfrange.select(
    "id",
    rand(seed=5).alias("uniform"),
    randn(seed=5).alias("normal"),
    rand(seed=5).alias("uniform2"),
    randn(seed=5).alias("normal2"),
    rand(seed=5).alias("uniform3"),
    randn(seed=5).alias("normal3"),
)

# "csv" is the name of Spark's built-in CSV data source; the destination
# directory is given by `path`, not by the format name.
df_to_file.write.format("csv").option("header", "true").mode('overwrite').save(path)
print("spark write finished: " + str(datetime.datetime.now() - start))

# Explicit schema used to read the files back: one long id and six doubles.
schema = StructType([StructField("id", LongType(), True),
                   StructField("field1", DoubleType(), True),
                   StructField("field2", DoubleType(), True),
                   StructField("field3", DoubleType(), True),
                   StructField("field4", DoubleType(), True),
                   StructField("field5", DoubleType(), True),
                   StructField("field6", DoubleType(), True)])
df_from_file = spark.read.load(path,
                     format="csv",
                     schema=schema,
                     header=True)
print("spark read finished: " + str(datetime.datetime.now() - start))

# count() is an action: it forces the written files to be scanned.
df_from_file.count()
print("spark count finished: " + str(datetime.datetime.now() - start))

In [12]:
# Submit dummy.py to the Kubernetes API server in cluster deploy mode, with two
# executors, using the "default" namespace and the "default" service account.
# NOTE(review): my-pyspark-app.zip is not created anywhere in this notebook —
# confirm it exists in the working directory before running.
!spark-submit --master k8s://https://kubernetes.default.svc:443 --deploy-mode cluster \
        --name sparksql --conf spark.executor.instances=2 \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=default \
        --conf spark.kubernetes.namespace=default \
        --py-files my-pyspark-app.zip \
        dummy.py

## Data

In [13]:
# Download the tab-separated export from data.gouv.fr into a pandas frame
# and preview its first rows.
url = "https://www.data.gouv.fr/fr/datasets/r/d5f400de-ae3f-4966-8cb6-a85c70c6c24a"
df = pandas.read_csv(url, sep="\t")
df.head()

In [14]:
# Display the (rows, columns) dimensions of the pandas frame.
df.shape

In [15]:
# Turn every row (index included, as yielded by itertuples) into a plain tuple,
# then collect the distinct tuple lengths as a sanity check.
data = [tuple(row) for row in df.itertuples()]
setlen = {len(row) for row in data}
setlen

In [16]:
# Display the first row tuple.
data[0]

## From pandas to spark

In [17]:
import pyspark
from pyspark.sql import SparkSession

In [18]:
# Create the Spark session named "sparksql", or reuse the one already running.
spark = (
    SparkSession.builder
    .appName("sparksql")
    .getOrCreate()
)

In [19]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# The row tuples start with the pandas index, so the schema starts with it too.
cols = [StructField("index", IntegerType(), True)]
# float64 columns become doubles; every other dtype is declared as a string.
cols.extend(
    StructField(name, DoubleType() if "float64" in str(typ) else StringType(), True)
    for name, typ in zip(df.columns, df.dtypes)
)
schema = StructType(cols)
schema

In [20]:
# Build the Spark DataFrame from the list of row tuples with the explicit schema.
dfs = spark.createDataFrame(data, schema=schema)
# Print the first rows as a text table.
dfs.show()

In [21]:
# Display the column names of the Spark DataFrame.
dfs.columns

In [22]:
# Print the column names and types as a tree.
dfs.printSchema()

In [23]:
# Save the DataFrame as Parquet, replacing any previous output at that path.
(
    dfs.write
    .format("parquet")
    .mode("overwrite")
    .save("data/elu.parquet")
)

In [24]:
# Save the DataFrame as CSV. "overwrite" makes the cell re-runnable: the default
# write mode raises an error when the target path already exists.
dfs.write.mode("overwrite").csv("data/elu.csv")

In [25]:
# List the default HDFS directory (no path given).
!hdfs dfs -ls

In [26]:
# List the content of the relative HDFS path data/.
!hdfs dfs -ls data/

## HDFS

In [27]:
# Show the usage help of the hdfs command.
!hdfs --help

In [28]:
# Display the Hadoop environment variables as computed by the hdfs script.
!hdfs envvars

In [29]:
# List the files of the Spark configuration directory.
!ls /opt/spark/conf

In [30]:
cat /opt/spark/conf/spark-defaults.conf

In [31]:
# Show the top-level usage help of kubectl.
!kubectl --help

In [32]:
# List the pods of the "spark" namespace.
!kubectl get pods -n spark

In [33]:
!hdfs dfs --help

In [34]:
# Show the disk usage of the entries in the default HDFS directory (no path given).
!hdfs dfs -du

## Lecture

In [35]:
# Read back the CSV directory data/elu.csv/. No schema or header option is passed,
# so Spark assigns default column names and reads every column as a string.
df2 = spark.read.csv("data/elu.csv/")

## Group by

In [36]:
# Print the schema again to locate the columns used for the aggregation.
dfs.printSchema()

In [37]:
from pyspark.sql.functions import col, split

# Split the birth date on "/" and keep the third piece as the birth year
# (assumes day/month/year dates — TODO confirm against the data).
date_parts = split(col("Date de naissance"), "/")
dfs_new = dfs.withColumn("année_naissance", date_parts.getItem(2))

In [38]:
# Count the rows for each value of année_naissance.
gr = dfs_new.groupBy("année_naissance").count()
gr

In [39]:
# Run the aggregation and bring the result back to the driver as a list of rows.
grexe = gr.collect()
grexe

In [40]:
# Convert the aggregated result to a pandas frame for plotting.
pdf = gr.toPandas()
pdf

In [41]:
!pip install matplotlib

In [42]:
# The birth year was extracted with split(), so it is text. Convert it to numbers
# to get an ordered numeric x axis; missing or unparseable years become NaN.
pdf_plot = pdf.assign(
    **{"année_naissance": pandas.to_numeric(pdf["année_naissance"], errors="coerce")}
)
pdf_plot.plot.scatter(x="année_naissance", y="count")

## Mllib

In [43]:
!pip install scikit-learn

In [44]:
from pyspark.ml.regression import LinearRegression

In [45]:
from sklearn.datasets import make_regression
# Synthetic regression problem: 10000 samples, 10 features. The fixed
# random_state makes the generated data identical on every run.
X, y = make_regression(n_samples=10000, n_features=10, random_state=42)
X.shape, y.shape

In [46]:
# Wrap the feature matrix in a pandas frame with columns f0, f1, ... and
# append the target as column "Y".
feature_names = [f"f{i}" for i in range(X.shape[1])]
df = pandas.DataFrame(X, columns=feature_names)
df["Y"] = y
df.head()

In [47]:
# Build a Spark DataFrame from the pandas row tuples; itertuples() also yields
# the pandas index as the first field of each row.
# NOTE: this rebinds `dfs`, which held the elected-officials data until now.
dfs = spark.createDataFrame(df.itertuples())

In [48]:
# Display the first row of the Spark DataFrame.
dfs.head()

In [49]:
from pyspark.ml.feature import VectorAssembler

# Pack the ten feature columns f0..f9 into a single vector column "features".
input_names = [f"f{i}" for i in range(10)]
assembler = VectorAssembler(outputCol="features", inputCols=input_names)
data = assembler.transform(dfs)

In [50]:
# 70 % / 30 % train/test split; the fixed seed makes the split reproducible.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

In [51]:
# Linear regression of "Y" on the assembled "features" vector, fitted on the
# training split.
reg = LinearRegression(labelCol="Y", featuresCol="features")
reg_train = reg.fit(trainingData)

In [52]:
# Display the fitted model object.
reg_train

In [53]:
from pyspark.ml.evaluation import RegressionEvaluator

# Predict on the held-out split, then compute the root-mean-square error
# between the "prediction" column and the label "Y".
predictions = reg_train.transform(testData)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Y", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print(f"RMSE = {rmse}")

In [54]:
# Persist the fitted model. overwrite() makes the cell re-runnable: a plain
# save() raises an error when the target path already exists.
reg_train.write().overwrite().save("linreg")

In [55]:
# List the saved model directory on the local file system.
# NOTE(review): earlier cells inspect Spark output with `hdfs dfs -ls`; if the model
# was written to HDFS, this local `ls` will not see it — confirm.
!ls -l linreg

## stop

In [56]:
# Stop the Spark session.
spark.stop()