Spark and MLlib - ML

Logistic regression with Spark.

from jyquickhelper import add_notebook_menu
add_notebook_menu()

MLlib is the distributed machine learning library implemented on Spark, and it partly explains Spark's success. The first incarnation of this kind of library was Mahout, implemented on Hadoop. MLlib has since become the standard. ML is the latest version and relies on DataFrames. It reuses concepts found in scikit-learn, such as Pipeline.

Data

import os
if not os.path.exists("data_adult.txt"):
    from pyquickhelper.filehelper import unzip_files
    unzip_files("data_adult.zip", where_to=".")
assert os.path.exists("data_adult.txt")
import pandas
df = pandas.read_csv("data_adult.txt", sep="\t", encoding="utf-8")
df.head()
   age  workclass         fnlwgt  education  education_num  marital_status      occupation         relationship   race   sex     capital_gain  capital_loss  hours_per_week  native_country  target
0  39   State-gov         77516   Bachelors  13             Never-married       Adm-clerical       Not-in-family  White  Male    2174          0             40              United-States   <=50K
1  50   Self-emp-not-inc  83311   Bachelors  13             Married-civ-spouse  Exec-managerial    Husband        White  Male    0             0             13              United-States   <=50K
2  38   Private           215646  HS-grad    9              Divorced            Handlers-cleaners  Not-in-family  White  Male    0             0             40              United-States   <=50K
3  53   Private           234721  11th       7              Married-civ-spouse  Handlers-cleaners  Husband        Black  Male    0             0             40              United-States   <=50K
4  28   Private           338409  Bachelors  13             Married-civ-spouse  Prof-specialty     Wife           Black  Female  0             0             40              Cuba            <=50K
df.dtypes
age                int64
workclass         object
fnlwgt             int64
education         object
education_num      int64
marital_status    object
occupation        object
relationship      object
race              object
sex               object
capital_gain       int64
capital_loss       int64
hours_per_week     int64
native_country    object
target            object
dtype: object
# Keep (position, dtype) for every column that is not of object dtype
cols = [(i, dt) for i, dt in enumerate(df.dtypes) if dt != object]
cols
[(0, dtype('int64')),
 (2, dtype('int64')),
 (4, dtype('int64')),
 (10, dtype('int64')),
 (11, dtype('int64')),
 (12, dtype('int64'))]
column_keep = set(_[0] for _ in cols)
column_keep
{0, 2, 4, 10, 11, 12}
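# Write the data without index or header so that every line is a record
df.to_csv("adult.txt", sep="\t", index=False, header=False)
# `sc` is the SparkContext provided by the pyspark notebook session
data = sc.textFile("adult.txt")
col = data.take(2)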
col
['39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K',
 '50\t Self-emp-not-inc\t83311\t Bachelors\t13\t Married-civ-spouse\t Exec-managerial\t Husband\t White\t Male\t0\t0\t13\t United-States\t <=50K']

Logistic regression (RDD)

This section reuses the example from the documentation: Linear Methods - RDD-based API. Categorical variables are excluded to keep the example concise.

from pyspark.mllib.regression import LabeledPoint

def parsePoint(line):
    # Keep only the numeric columns; the label is 1.0 when the target is "<=50K"
    spl = line.split('\t')
    values = [float(x) for i, x in enumerate(spl) if i in column_keep]
    target = float(spl[-1].strip() == "<=50K")
    return LabeledPoint(target, values)

# We prepare the training data
parsedData = data.map(parsePoint)
parsedData.take(2)
[LabeledPoint(1.0, [39.0,77516.0,13.0,2174.0,0.0,40.0]),
 LabeledPoint(1.0, [50.0,83311.0,13.0,0.0,0.0,13.0])]
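To make the parsing step easier to follow outside Spark, here is a minimal sketch of the same logic in plain Python. The names `COLUMN_KEEP` and `parse_line` are assumptions introduced for illustration (they mirror `column_keep` and `parsePoint` above), and the function returns a plain tuple rather than a `LabeledPoint`.

```python
# Positions of the numeric columns, copied from column_keep above.
COLUMN_KEEP = {0, 2, 4, 10, 11, 12}

def parse_line(line):
    """Return (label, features) for one tab-separated record.

    Hypothetical stand-in for parsePoint: it returns a tuple instead of a
    LabeledPoint so that it can be run without Spark.
    """
    fields = line.split("\t")
    features = [float(x) for i, x in enumerate(fields) if i in COLUMN_KEEP]
    # The raw target carries a leading space, hence the strip().
    label = float(fields[-1].strip() == "<=50K")
    return label, features

# First record of the file, as displayed by data.take(2).
sample = ("39\t State-gov\t77516\t Bachelors\t13\t Never-married\t Adm-clerical"
          "\t Not-in-family\t White\t Male\t2174\t0\t40\t United-States\t <=50K")
label, features = parse_line(sample)
print(label, features)
```

The result agrees with the first LabeledPoint shown above: label 1.0 and the six numeric features.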
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

While this runs, watch the terminal window that displays the notebook server's messages.

model.numClasses, model.numFeatures, model.weights
(2, 6, DenseVector([0.0045, 0.0, 0.0086, -0.0003, -0.0008, 0.009]))
from pyquickhelper.filehelper import remove_folder
def clean(folder):
    if os.path.exists(folder):
        return remove_folder(folder)
    else:
        return []
clean("target/pythonLogisticRegressionWithLBFGSModel")

# Save and load model
model.save(sc, "target/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc, "target/pythonLogisticRegressionWithLBFGSModel")
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
def filter_error(ys):
    return ys[0] != ys[1]
trainErr = labelsAndPreds.filter(filter_error).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
Training Error = 0.20217438039372254
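To put this figure in context, the sketch below recomputes an error rate from (label, prediction) pairs in plain Python and compares it with a constant predictor. The function name `error_rate` and the toy pairs are assumptions for illustration; the class counts (24720 records with label 1.0, 7841 with label 0.0) are those reported by the `groupby("Y")` call in the DataFrame section further down.

```python
def error_rate(pairs):
    """Fraction of (label, prediction) pairs that disagree."""
    pairs = list(pairs)
    return sum(1 for y, p in pairs if y != p) / float(len(pairs))

# Toy pairs, only to show the computation (not taken from the data set).
toy = [(1.0, 1.0), (0.0, 1.0), (1.0, 1.0), (0.0, 0.0)]
print(error_rate(toy))  # 1 mismatch out of 4

# Baseline: always predict the majority class 1.0 ("<=50K").
n_pos, n_neg = 24720, 7841
baseline = n_neg / float(n_pos + n_neg)
print(round(baseline, 4))
```

Under these counts, always answering "<=50K" is wrong about 24% of the time, so the training error of about 0.202 obtained above is only a modest improvement over that baseline.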

Logistic regression (DataFrame)

This section follows the example: Logistic Regression. The code changes, and so does the data preparation. Models take as input a vector column created by a VectorAssembler.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()
from pyspark.ml.feature import VectorAssembler
training = spark.createDataFrame(df)
# The raw values keep their leading space, hence " <=50K"
training = training.withColumn('Y', training.target == " <=50K")
training = training.withColumn("Y", training.Y.astype('float'))
training = training.select(["age", "fnlwgt", "education_num", "capital_gain", "capital_loss",
                           "hours_per_week", "Y"])
assembler = VectorAssembler(
    inputCols=["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"],
    outputCol="features")
training = assembler.transform(training)
training.explain()
== Physical Plan ==
*Project [age#496L, fnlwgt#498L, education_num#500L, capital_gain#506L, capital_loss#507L, hours_per_week#508L, cast((target#510 =  <=50K) as float) AS Y#545, UDF(struct(cast(age#496L as double) AS age_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#571, cast(fnlwgt#498L as double) AS fnlwgt_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#572, cast(education_num#500L as double) AS education_num_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#573, cast(capital_gain#506L as double) AS capital_gain_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#574, cast(capital_loss#507L as double) AS capital_loss_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#575, cast(hours_per_week#508L as double) AS hours_per_week_double_VectorAssembler_4b1a9ef2a7fdcd07c46f#576)) AS features#577]
+- Scan ExistingRDD[age#496L,workclass#497,fnlwgt#498L,education#499,education_num#500L,marital_status#501,occupation#502,relationship#503,race#504,sex#505,capital_gain#506L,capital_loss#507L,hours_per_week#508L,native_country#509,target#510]
head = training.take(2)
head
[Row(age=39, fnlwgt=77516, education_num=13, capital_gain=2174, capital_loss=0, hours_per_week=40, Y=1.0, features=DenseVector([39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0])),
 Row(age=50, fnlwgt=83311, education_num=13, capital_gain=0, capital_loss=0, hours_per_week=13, Y=1.0, features=DenseVector([50.0, 83311.0, 13.0, 0.0, 0.0, 13.0]))]
training.schema
StructType(List(StructField(age,LongType,true),StructField(fnlwgt,LongType,true),StructField(education_num,LongType,true),StructField(capital_gain,LongType,true),StructField(capital_loss,LongType,true),StructField(hours_per_week,LongType,true),StructField(Y,FloatType,true),StructField(features,VectorUDT,true)))
training.groupby("Y").count().collect()
[Row(Y=1.0, count=24720), Row(Y=0.0, count=7841)]
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol='Y', featuresCol="features")

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
Coefficients: (6,[],[])
Intercept: 1.1482462553407051
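The printed coefficients, `(6,[],[])`, are a sparse vector of size 6 with no non-zero entry: with these regularisation settings every coefficient was shrunk to zero and only the intercept remains. A model reduced to its intercept can only reproduce the class proportions, which the short check below illustrates in plain Python. It assumes the class counts returned by `groupby("Y")` above; the variable names are introduced here for illustration.

```python
import math

# Class counts from training.groupby("Y").count() above.
n_pos, n_neg = 24720, 7841

# Log-odds of the positive class; with all coefficients at zero the fitted
# intercept is expected to be close to this value.
log_odds = math.log(n_pos / float(n_neg))
print(round(log_odds, 4))

# Sigmoid of the reported intercept gives the predicted P(Y = 1).
intercept = 1.1482462553407051
p_one = 1.0 / (1.0 + math.exp(-intercept))
print(round(p_one, 4), round(n_pos / float(n_pos + n_neg), 4))
```

The log-odds agree with the reported intercept to about four decimals, and the resulting probability is roughly 0.759 for every row, whatever its features. A smaller `regParam` would be one thing to try in order to obtain non-zero coefficients.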
prediction = lrModel.transform(training)
prediction.take(2)
[Row(age=39, fnlwgt=77516, education_num=13, capital_gain=2174, capital_loss=0, hours_per_week=40, Y=1.0, features=DenseVector([39.0, 77516.0, 13.0, 2174.0, 0.0, 40.0]), rawPrediction=DenseVector([-1.1482, 1.1482]), probability=DenseVector([0.2408, 0.7592]), prediction=1.0),
 Row(age=50, fnlwgt=83311, education_num=13, capital_gain=0, capital_loss=0, hours_per_week=13, Y=1.0, features=DenseVector([50.0, 83311.0, 13.0, 0.0, 0.0, 13.0]), rawPrediction=DenseVector([-1.1482, 1.1482]), probability=DenseVector([0.2408, 0.7592]), prediction=1.0)]
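
The introduction mentions that ML offers a `Pipeline` similar to scikit-learn's. As a hedged sketch, the two steps used above (a `VectorAssembler` followed by a `LogisticRegression`) could be chained as follows. The names `FEATURE_COLS` and `build_pipeline` are assumptions introduced here, and the function must be called once a Spark session is active, as in this notebook.

```python
# Numeric columns used as features throughout this notebook.
FEATURE_COLS = ["age", "fnlwgt", "education_num",
                "capital_gain", "capital_loss", "hours_per_week"]

def build_pipeline(feature_cols=FEATURE_COLS, label_col="Y"):
    """Chain the assembler and the logistic regression in one Pipeline.

    Hypothetical helper: the imports are local so that the definition can be
    loaded without Spark; calling it requires an active Spark session.
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import VectorAssembler

    assembler = VectorAssembler(inputCols=list(feature_cols),
                                outputCol="features")
    # Same hyperparameters as the LogisticRegression fitted above.
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,
                            labelCol=label_col, featuresCol="features")
    return Pipeline(stages=[assembler, lr])
```

A possible use would be `build_pipeline().fit(raw)`, where `raw` stands for the DataFrame that already has the `Y` column but not yet the `features` column (that is, before `assembler.transform` is applied); the fitted object then exposes `transform`, like `lrModel` does.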