import pyspark
import numpy as np
import pandas as pd
kakamana
April 9, 2023
The next step is to build two types of classification models: Decision Trees and Logistic Regression. Additionally, you will learn about a few approaches to data preparation.
This Classification chapter is part of the DataCamp course Machine Learning with PySpark. Spark is a powerful, general-purpose tool for working with large data sets. Spark transparently distributes compute tasks across a cluster, so operations are fast and you can focus on the analysis rather than worry about technical details. This course teaches you how to get data into Spark and then dives into three fundamental Spark Machine Learning algorithms: Linear Regression, Logistic Regression/Classifiers, and creating pipelines. Along the way you will analyze a large dataset of flight delays and spam text messages. With this background, you will be able to harness the power of Spark and apply it to your own Machine Learning projects.
This is my learning experience of data science through DataCamp. These repository contributions are part of my learning journey through my graduate program, the Master of Applied Data Science (MADS) at the University of Michigan, DeepLearning.AI, Coursera & DataCamp. You can find my similar articles & more stories on my Medium & LinkedIn profiles. I am also available on Kaggle & GitHub (blogs & repos). Thank you for your motivation, support & valuable feedback.
These include projects, coursework & notebooks which I created through my data science journey. They are intended for reproducibility & future reference only. All source code, slides and screenshots are the intellectual property of their respective content authors. If you find this content beneficial, kindly consider a learning subscription from DeepLearning.AI, Coursera or DataCamp.
You previously loaded airline flight data from a CSV file. You're going to develop a model which will predict whether or not a given flight will be delayed.
In this exercise you need to trim those data down by:
- removing an uninformative column ('flight'), and
- removing rows which do not have information about whether or not a flight was delayed.
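The session setup and initial load are not shown in the original output, so the snippet below is a minimal sketch; the file path 'dataset/flights.csv' and the read options are assumptions, not taken from the original run.
from pyspark.sql import SparkSession
# Create a local Spark session (a sketch; the original configuration is not shown)
spark = SparkSession.builder.master('local[*]').appName('flights').getOrCreate()
# Read the flight data from CSV, treating 'NA' strings as missing values (an assumption)
flights = spark.read.csv('dataset/flights.csv', sep=',', header=True, inferSchema=True, nullValue='NA')
flights.show(5)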
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10| 1| OO| 5836|ORD| 157| 8.18| 51| 27|
| 1| 4| 1| OO| 5866|ORD| 466| 15.5| 102| null|
| 11| 22| 1| OO| 6016|ORD| 738| 7.17| 127| -19|
| 2| 14| 5| B6| 199|JFK|2248| 21.17| 365| 60|
| 5| 25| 3| WN| 1675|SJC| 386| 12.92| 85| 22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows
# Remove the uninformative 'flight' column
flights_drop_column = flights.drop('flight')
# Number of records with missing 'delay' values
flights_drop_column.filter('delay IS NULL').count()
# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')
# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()
print(flights_none_missing.count())
258289
The Federal Aviation Administration (FAA) considers a flight to be “delayed” when it arrives 15 minutes or more after its scheduled time.
The next step of preparing the flight data has two parts:
- converting the units of distance, replacing the 'mile' column with a 'km' column; and
- creating a Boolean 'label' column indicating whether or not the flight was delayed.
from pyspark.sql.functions import round
# Convert 'mile' to 'km' and drop 'mile' column
flights_km = flights_none_missing.withColumn('km', round(flights_none_missing.mile * 1.60934, 0)).drop('mile')
# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))
# Check first five records
flights_km.show(5)
+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay| km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10| 1| OO|ORD| 8.18| 51| 27| 253.0| 1|
| 11| 22| 1| OO|ORD| 7.17| 127| -19|1188.0| 0|
| 2| 14| 5| B6|JFK| 21.17| 365| 60|3618.0| 1|
| 5| 25| 3| WN|SJC| 12.92| 85| 22| 621.0| 1|
| 3| 28| 1| B6|LGA| 13.33| 182| 70|1732.0| 1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows
In the flights data there are two columns, carrier and org, which hold categorical data. You need to transform those columns into indexed numerical values. By default, StringIndexer assigns indices in descending order of frequency, so the most common category gets index 0.0.
from pyspark.ml.feature import StringIndexer
# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')
# Indexer identifies categories in the data
indexer_model = indexer.fit(flights_km)
# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights_km)
# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)
flights_indexed.show(5)
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay| km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10| 1| OO|ORD| 8.18| 51| 27| 253.0| 1| 2.0| 0.0|
| 11| 22| 1| OO|ORD| 7.17| 127| -19|1188.0| 0| 2.0| 0.0|
| 2| 14| 5| B6|JFK| 21.17| 365| 60|3618.0| 1| 4.0| 2.0|
| 5| 25| 3| WN|SJC| 12.92| 85| 22| 621.0| 1| 3.0| 5.0|
| 3| 28| 1| B6|LGA| 13.33| 182| 70|1732.0| 1| 4.0| 3.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows
The final stage of data preparation is to consolidate all of the predictor columns into a single column.
At present our data has the following predictor columns:
- mon, dom and dow
- carrier_idx (derived from carrier)
- org_idx (derived from org)
- km, depart and duration
from pyspark.ml.feature import VectorAssembler
# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow',
    'carrier_idx',
    'org_idx',
    'km', 'depart', 'duration'
], outputCol='features')
# Consolidate predictor columns
flights_assembled = assembler.transform(flights_indexed)
# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)
+-----------------------------------------+-----+
|features |delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0] |27 |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19 |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60 |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0] |22 |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70 |
+-----------------------------------------+-----+
only showing top 5 rows
To objectively assess a Machine Learning model you need to be able to test it on an independent set of data. You can’t use the same data that you used to train the model: of course the model will perform (relatively) well on those data!
You will split the data into two components:
- training data (used to train the model), and
- testing data (used to test the model).
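The split itself is not shown in the original output; here is a minimal sketch, assuming an 80:20 ratio and an arbitrary seed for reproducibility (both are assumptions).
# Split into training and testing sets in (roughly) an 80:20 ratio
# (the exact ratio and seed used in the original run are assumptions)
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=17)
# Check that the training set holds roughly 80% of the records
print(flights_train.count() / flights_assembled.count())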
Now that you've split the flights data into training and testing sets, you can use the training set to fit a Decision Tree model.
from pyspark.ml.classification import DecisionTreeClassifier
# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)
# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)
+-----+----------+----------------------------------------+
|label|prediction|probability |
+-----+----------+----------------------------------------+
|1 |0.0 |[0.6215102328863796,0.3784897671136203] |
|1 |0.0 |[0.6215102328863796,0.3784897671136203] |
|0 |1.0 |[0.3180768185524051,0.6819231814475949] |
|1 |1.0 |[0.3180768185524051,0.6819231814475949] |
|0 |1.0 |[0.36809815950920244,0.6319018404907976]|
+-----+----------+----------------------------------------+
only showing top 5 rows
You can assess the quality of your model by evaluating how well it performs on the testing data. Because the model was not trained on these data, this represents an objective assessment of the model.
A confusion matrix gives a useful breakdown of predictions versus known values. It has four cells which represent the counts of:
- True Negatives (TN): predicted negative, actually negative
- True Positives (TP): predicted positive, actually positive
- False Negatives (FN): predicted negative, actually positive
- False Positives (FP): predicted positive, actually negative
prediction.groupBy('label', 'prediction').count().show()
# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label = 1').count()
FP = prediction.filter('prediction = 1 AND label = 0').count()
# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
| 1| 0.0| 9580|
| 0| 0.0|16414|
| 1| 1.0|16535|
| 0| 1.0| 9210|
+-----+----------+-----+
0.6368310172210518
You’ve already built a Decision Tree model using the flights data. Now you’re going to create a Logistic Regression model on the same data.
The objective is to predict whether a flight is likely to be delayed by at least 15 minutes (label 1) or not (label 0).
Although you have a variety of predictors at your disposal, you’ll only use the mon, depart and duration columns for the moment. These are numerical features which can immediately be used for a Logistic Regression model. You’ll need to do a little more work before you can include categorical features.
from pyspark.ml.classification import LogisticRegression
# Select the numeric columns (together with the assembled 'features' vector and the label)
flights_train_num = flights_train.select('mon', 'depart', 'duration', 'features', 'label')
flights_test_num = flights_test.select('mon', 'depart', 'duration', 'features', 'label')
# Create a classifier object and train on the training data
logistic = LogisticRegression().fit(flights_train_num)
# Create predictions for the test data and show the confusion matrix
prediction = logistic.transform(flights_test_num)
prediction.groupBy("label", "prediction").count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
| 1| 0.0| 9455|
| 0| 0.0|14931|
| 1| 1.0|16660|
| 0| 1.0|10693|
+-----+----------+-----+
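One caveat: LogisticRegression reads its inputs from the 'features' column by default, and that column still contains the full eight-element vector assembled earlier, so the model above is effectively trained on all of the predictors despite the select(). Below is a minimal sketch of how training could be restricted to just mon, depart and duration; the re-assembly step and the 'features_num' column name are illustrative assumptions, not part of the original exercise.
from pyspark.ml.feature import VectorAssembler
# Re-assemble a feature vector from only the three numeric columns (a sketch)
assembler_num = VectorAssembler(inputCols=['mon', 'depart', 'duration'], outputCol='features_num')
flights_train_3 = assembler_num.transform(flights_train)
flights_test_3 = assembler_num.transform(flights_test)
# Point the classifier at the reduced feature column
logistic_3 = LogisticRegression(featuresCol='features_num').fit(flights_train_3)
prediction_3 = logistic_3.transform(flights_test_3)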
Accuracy is generally not a very reliable metric because it can be biased by the most common target class.
There are two other useful metrics:
Precision is the proportion of positive predictions which are correct. For all flights which are predicted to be delayed, what proportion is actually delayed?
Recall is the proportion of positive outcomes which are correctly predicted. For all delayed flights, what proportion is correctly predicted by the model?
Precision and recall are generally formulated in terms of the positive target class. But it's also possible to calculate weighted versions of these metrics which take both target classes into account.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
# Recompute the confusion matrix elements for the logistic regression predictions
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label = 1').count()
FP = prediction.filter('prediction = 1 AND label = 0').count()
# Calculate precision and recall
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall = {:.2f}'.format(precision, recall))
# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})
# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})
precision = 0.61
recall = 0.64
At the end of the previous chapter you loaded a dataset of SMS messages which had been labeled as either “spam” (label 1) or “ham” (label 0). You’re now going to use those data to build a classifier model.
But first you'll need to prepare the SMS messages as follows:
- remove punctuation and numbers, then
- tokenize, i.e. split into individual words.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])
# Load data from a delimited file
sms = spark.read.csv('dataset/sms.csv', sep=';', header=False, schema=schema)
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer
# Remove punctuation and numbers
wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]', ' '))
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))
# Merge multiple spaces
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))
# Split the text into words
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)
wrangled.show(4, truncate=False)
+---+----------------------------------+-----+------------------------------------------+
|id |text |label|words |
+---+----------------------------------+-----+------------------------------------------+
|1 |Sorry I'll call later in meeting |0 |[sorry, i'll, call, later, in, meeting] |
|2 |Dont worry I guess he's busy |0 |[dont, worry, i, guess, he's, busy] |
|3 |Call FREEPHONE now |1 |[call, freephone, now] |
|4 |Win a cash prize or a prize worth |1 |[win, a, cash, prize, or, a, prize, worth]|
+---+----------------------------------+-----+------------------------------------------+
only showing top 4 rows
The next steps will be to remove stop words and then apply the hashing trick, converting the results into a TF-IDF representation.
A quick reminder about these concepts:
- Stop words are common words (such as 'the' and 'and') which carry little useful information and are usually removed.
- The hashing trick provides a fast and space-efficient way to map a very large (possibly infinite) set of items (here, words) onto a smaller, finite set of feature indices.
- TF-IDF (term frequency-inverse document frequency) weights term counts so that terms appearing in many documents are down-weighted relative to rarer, more informative terms.
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF
sms = wrangled.select('id', 'words', 'label')
# Remove stop words.
wrangled = StopWordsRemover(inputCol='words', outputCol='terms').transform(sms)
# Apply the hashing trick
wrangled = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024).transform(wrangled)
# Convert hashed symbols to TF-IDF
tf_idf = IDF(inputCol='hash', outputCol='features').fit(wrangled).transform(wrangled)
tf_idf.select('terms', 'features').show(4, truncate=False)
+--------------------------------+----------------------------------------------------------------------------------------------------+
|terms |features |
+--------------------------------+----------------------------------------------------------------------------------------------------+
|[sorry, call, later, meeting] |(1024,[138,384,577,996],[2.273418200008753,3.6288353225642043,3.5890949939146903,4.104259019279279])|
|[dont, worry, guess, busy] |(1024,[215,233,276,329],[3.9913186080986836,3.3790235241678332,4.734227298217693,4.58299632849377]) |
|[call, freephone] |(1024,[133,138],[5.367951058306837,2.273418200008753]) |
|[win, cash, prize, prize, worth]|(1024,[31,47,62,389],[3.6632029660684124,4.754846585420428,4.072170704727778,7.064594791043114]) |
+--------------------------------+----------------------------------------------------------------------------------------------------+
only showing top 4 rows
The SMS data have now been prepared for building a classifier. Specifically, this is what you have done:
- removed punctuation and numbers
- tokenized, i.e. split into individual words
- removed stop words
- applied the hashing trick, and
- converted the hashed term counts to a TF-IDF representation.
Next you’ll need to split the TF-IDF data into training and testing sets. Then you’ll use the training data to fit a Logistic Regression model and finally evaluate the performance of that model on the testing data.
sms = tf_idf.select('label', 'features')
# Split the data into training and test sets
sms_train, sms_test = sms.randomSplit([0.8, 0.2], seed=13)
# Fit a Logistic Regression model to the training data
logistic = LogisticRegression(regParam=0.2).fit(sms_train)
# Make predictions on the test data
prediction = logistic.transform(sms_test)
# Create a confusion matrix, comparing predictions to known labels
prediction.groupBy('label', 'prediction').count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
| 1| 0.0| 39|
| 0| 0.0| 932|
| 1| 1.0| 121|
| 0| 1.0| 4|
+-----+----------+-----+
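From the confusion matrix, the spam classifier looks strong: only 43 of the 1096 test messages are misclassified. As a quick worked check, here is the accuracy computed from the counts shown above:
# Accuracy from the confusion matrix counts shown above
TN, TP, FN, FP = 932, 121, 39, 4
accuracy = (TN + TP) / (TN + TP + FN + FP)
print('accuracy = {:.2f}'.format(accuracy))  # accuracy = 0.96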