Introduction to PySpark via AWS EMR and Hands-on EDA
Performing EDA on NY Taxi Fare Dataset to see PySpark in action — because cloud computing is the next big thing!
Introducing the Technologies
What is Spark and PySpark — Spark SQL and Spark MLlib?
According to Wikipedia, Apache Spark is an open-source analytics engine for large-scale data processing. It lets programmers work on data distributed across a cluster of machines, with implicit data parallelism and fault tolerance.
At the base of the Spark engine are Resilient Distributed Datasets (RDDs): collections of data items maintained across a cluster of machines in a fault-tolerant manner. RDDs were developed to overcome a limitation of the MapReduce paradigm, which forces a linear data flow on programs: read from disk, map, reduce, and write back to disk. RDDs, on the other hand, function as a working set across the cluster and offer a restricted form of shared memory during execution of the program.
Spark Core is the foundation of the overall project. It provides distributed task dispatching, scheduling, and basic I/O functionality, exposed through an API centered on the RDD abstraction discussed above. Spark SQL is a component on top of Spark Core that introduced a data abstraction called DataFrames, which provide support for structured and semi-structured data. It offers both a domain-specific language (similar to pandas) for manipulating DataFrames and basic SQL support, with command-line interfaces and ODBC/JDBC servers. Similarly, Spark MLlib is a distributed machine learning framework on top of Spark Core.
PySpark is the Python API for Spark, which is itself written in Scala. The API contains libraries and wrappers that let the programmer use the features of the underlying Spark SQL and Spark MLlib modules, among others.
What is AWS EMR?
In AWS's own words, Amazon EMR (Elastic MapReduce) is a cloud big data platform for processing vast amounts of data using open source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink and Apache Hudi. Amazon EMR makes it easy to set up, operate, and scale big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters.
In short, it lets us run distributed data processing with Spark in the cloud.
Setting it Up — AWS EMR vs Local PySpark Instance
A programmer has two options for using the Spark framework: install it on a local machine, or use a managed service from a cloud provider.
With the first option, the cores of the local processor act as the workers, and parallelism is achieved at the core level. The problem with this approach is that part of the RAM is already reserved as working memory for combining the results once the distributed processing is complete. This, along with the RAM available on personal laptops today (8 GB or 16 GB), makes the option unattractive for a dataset of this size; processing is understandably very slow. Clusters in the cloud on a pay-as-you-use model, on the other hand, offer a wide range of machine configurations and are the better choice here.
A brief outline of how to set up a cluster and a notebook on AWS EMR follows:
- Open the Amazon EMR console, then click on ‘Clusters’ in the sidebar.
- Click on the blue ‘Create Cluster’ button and go to ‘Advanced Options’.
After the cluster has been created, go to ‘Notebooks’ in the sidebar and create a notebook associated with the cluster. And you are good to go!
The Dataset
Now that we have had a brief look at the different technologies we will be using and decided upon a configuration option, let’s look at the dataset we will be working on.
The dataset can be downloaded on Kaggle and contains 55 million records of trips of taxis in New York — perfect for big data and Spark. It has the following features:
- key: a unique string identifying each trip, made up of the pickup timestamp plus a unique integer
- fare_amount: the fare for the trip (in US dollars)
- pickup_datetime: the timestamp of the start of the trip
- pickup_longitude: the longitude co-ordinates of the pickup
- pickup_latitude: the latitude co-ordinates of the pickup
- dropoff_longitude: the longitude co-ordinates of the drop
- dropoff_latitude: the latitude co-ordinates of the drop
- passenger_count: the number of passengers in the taxi for the trip
The objective of this problem statement is to apply appropriate EDA and feature engineering to predict taxi fares from a particular pickup point to dropoff point as closely as possible (as compared against the test set ground truth).
Finally, the EDA
Now, let’s dive into the EDA for this dataset. As mentioned earlier, using a cloud provider for distributed processing pays off here: at 55 million rows, the NY Taxi dataset would be impractical to process on a typical PC.
Initial Analysis
We import the libraries and read in the data as given below. Next, we perform very basic analysis as to the size of the dataset, schema and the columns present in it.
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# all data manipulation functions
import pyspark.sql.functions as F
from pyspark.sql import Row

# PySpark does not have visualization capabilities
from pyspark_dist_explore import Histogram, hist, distplot, pandas_histogram

train_data_path = 's3://ny-taxi-emr/train.csv'
test_data_path = 's3://ny-taxi-emr/test.csv'

train_data = spark.read.csv(train_data_path, header=True, inferSchema=True, mode="DROPMALFORMED")
test_data = spark.read.csv(test_data_path, header=True, inferSchema=True, mode="DROPMALFORMED")
Next, we convert the columns holding date-time values into the timestamp data type. Here, withColumn() creates a new column or replaces an existing one, while col() refers to a column by name.
train_data = train_data.withColumn("pickup_datetime", F.to_timestamp(F.col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))

def create_date_columns():
    """
    Create new columns from pickup_datetime,
    namely, date, day, hour, day_of_week, month and year
    """
    # Get datetime.date objects and create a new column pickup_date
    new_train_data = train_data.withColumn("pickup_date", F.to_date(F.col("pickup_datetime")))
    new_train_data = new_train_data.withColumn("pickup_day", F.dayofmonth(F.col("pickup_datetime")))
    new_train_data = new_train_data.withColumn("pickup_hour", F.hour(F.col("pickup_datetime")))
    new_train_data = new_train_data.withColumn("pickup_day_of_week", F.dayofweek(F.col("pickup_datetime")))
    new_train_data = new_train_data.withColumn("pickup_month", F.month(F.col("pickup_datetime")))
    new_train_data = new_train_data.withColumn("pickup_year", F.year(F.col("pickup_datetime")))
    return new_train_data

# the function reads the global train_data; call it to get the enriched DataFrame
new_train_data = create_date_columns()
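To see what these column functions return for a single value, here is a plain-Python sketch of the same extraction (it is not PySpark code). It relies on Spark's documented convention that dayofweek() numbers days from 1 (Sunday) to 7 (Saturday), which differs from Python's own weekday numbering. The helper name `date_parts` and the sample timestamp are illustrative only.

```python
from datetime import datetime

def date_parts(ts: datetime) -> dict:
    """Mirror the pickup_* columns derived from pickup_datetime (illustrative helper)."""
    return {
        "pickup_date": ts.date(),
        "pickup_day": ts.day,
        "pickup_hour": ts.hour,
        # Spark: 1 = Sunday ... 7 = Saturday
        # Python isoweekday(): 1 = Monday ... 7 = Sunday
        "pickup_day_of_week": ts.isoweekday() % 7 + 1,
        "pickup_month": ts.month,
        "pickup_year": ts.year,
    }

# an arbitrary sample timestamp in the same format the article parses
sample = datetime.strptime("2009-06-15 17:26:21", "%Y-%m-%d %H:%M:%S")
parts = date_parts(sample)
print(parts)
```

The day-of-week convention matters later if the column is read as "1 = Monday", so it is worth checking before drawing conclusions from weekday plots.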
Next, we count the null values in each column and drop the rows that contain any.
new_train_data.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in new_train_data.columns]).show()
new_train_data = new_train_data.dropna()  # drop rows that contain nulls
Visualizing Dimension Distributions
We start by visualizing the distributions of the relevant dimensions, beginning with the fare. We keep only fares between 2.5 and 100 dollars, which removes both the negative values and the implausibly high ones. collect() retrieves the results from all the worker nodes to the driver node.
print("Maximum Fare: {}\nMinimum Fare: {}".format(
    new_train_data.agg({"fare_amount": "max"}).collect()[0][0],
    new_train_data.agg({"fare_amount": "min"}).collect()[0][0]))

train_data_fare_filtered = new_train_data.filter(F.col("fare_amount").between(2.5, 100))
Similarly, we analyze the passenger count distribution and find that the maximum value is 208, which is not plausible for a taxi. We cap the passenger count at 7, the capacity of a 7-seater car.
We apply the same boundary analysis to the co-ordinate fields and remove trips whose co-ordinates lie more than about 2 degrees from the centre of NY (40.771133, -73.974187).
# the passenger filter is not shown in the article; assumed here from the 7-passenger cap in the text
train_data_passenger_filtered = train_data_fare_filtered.filter(F.col("passenger_count") <= 7)
train_data_filtered = train_data_passenger_filtered.filter(F.col("pickup_latitude").between(38.5, 42.5))
train_data_filtered = train_data_filtered.filter(F.col("dropoff_latitude").between(38.5, 42.5))
train_data_filtered = train_data_filtered.filter(F.col("pickup_longitude").between(-76, -72))
train_data_filtered = train_data_filtered.filter(F.col("dropoff_longitude").between(-76, -72))
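As a quick check on these bounds, here is a small plain-Python sketch that derives a box of plus or minus 2 degrees around the centre point quoted above. The helper names are hypothetical. The hard-coded limits in the filters (38.5 to 42.5 and -76 to -72) approximate this box rather than match it exactly.

```python
NYC_CENTER = (40.771133, -73.974187)  # (latitude, longitude) quoted in the text

def bounding_box(center, margin_deg):
    """Build a min/max latitude-longitude box around a centre point."""
    lat, lng = center
    return {
        "min_lat": lat - margin_deg, "max_lat": lat + margin_deg,
        "min_lng": lng - margin_deg, "max_lng": lng + margin_deg,
    }

def in_box(lat, lng, box):
    """Return True when the point lies inside the box (edges included)."""
    return (box["min_lat"] <= lat <= box["max_lat"]
            and box["min_lng"] <= lng <= box["max_lng"])

box = bounding_box(NYC_CENTER, 2.0)
print(box)
print(in_box(40.6413, -73.7781, box))  # JFK airport lies inside the box
print(in_box(0.0, 0.0, box))           # a (0, 0) placeholder co-ordinate does not
```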
Airport Fare Analysis
Plotting the trip co-ordinates on a map, we see that there are many pickups and drops from and to airports in NY. We go on to analyze this after obtaining the co-ordinates of the airports in NY.
1. Coordinates of Newark Airport = 40.6895° N, 74.1745° W
2. Coordinates of JFK Airport = 40.6413° N, 73.7781° W
3. Coordinates of La Guardia Airport = 40.7769° N, 73.8740° W
We can see that the pickup and dropoff fares for JFK fall mostly in the $40–$60 range, which suggests fixed pricing. To confirm, let’s compare this with the average fare for all trips.
We find that the average fare is much higher when trips are related to airports. So, differentiating these records by feature engineering will help us build a better model as it will be able to differentiate and recognize the difference in pattern between airport trips and non-airport trips. So, we create features that specify whether a record (trip) is to any of the 3 airports.
Feature Engineering
For feature engineering and building the additional airport-related dimensions, we use a User-Defined Function (UDF) in PySpark. A UDF lets us run our own Python functions inside the Spark framework.
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import BooleanType

# user-defined Python function; nyc_airports maps each airport name to its bounding box
def isTripRelatedToAirport(latitude, longitude, airport_name='JFK'):
    if (latitude >= nyc_airports[airport_name]['min_lat'] and
            latitude <= nyc_airports[airport_name]['max_lat'] and
            longitude >= nyc_airports[airport_name]['min_lng'] and
            longitude <= nyc_airports[airport_name]['max_lng']):
        return True
    else:
        return False

# wrap the Python function as a Spark UDF that returns a boolean
featureEngineeringUDF = udf(lambda x, y, z: isTripRelatedToAirport(x, y, z), BooleanType())

# register the Python UDF with Spark
spark.udf.register("featureEngineeringUDF", featureEngineeringUDF)
Using this UDF, we build the airport-related dimensions. lit() wraps a hardcoded string as a literal column so that it can be passed into the UDF.
train_data_filtered = train_data_filtered.withColumn("is_pickup_JFK", featureEngineeringUDF(F.col("pickup_latitude"), F.col("pickup_longitude"), lit("JFK")))
train_data_filtered = train_data_filtered.withColumn("is_dropoff_JFK", featureEngineeringUDF(F.col("dropoff_latitude"), F.col("dropoff_longitude"), lit("JFK")))
train_data_filtered = train_data_filtered.withColumn("is_pickup_EWR", featureEngineeringUDF(F.col("pickup_latitude"), F.col("pickup_longitude"), lit("EWR")))
train_data_filtered = train_data_filtered.withColumn("is_dropoff_EWR", featureEngineeringUDF(F.col("dropoff_latitude"), F.col("dropoff_longitude"), lit("EWR")))
train_data_filtered = train_data_filtered.withColumn("is_pickup_LaGuardia", featureEngineeringUDF(F.col("pickup_latitude"), F.col("pickup_longitude"), lit("LaGuardia")))
train_data_filtered = train_data_filtered.withColumn("is_dropoff_LaGuardia", featureEngineeringUDF(F.col("dropoff_latitude"), F.col("dropoff_longitude"), lit("LaGuardia")))
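The UDF above reads from a `nyc_airports` dictionary that is not shown in the article. The following plain-Python sketch shows one shape such a dictionary could take, built from the airport co-ordinates listed earlier. The 0.03 degree margin is a hypothetical value chosen for illustration, and the boxes actually used for the results here may differ.

```python
# Airport co-ordinates as listed in the article: (latitude, longitude)
AIRPORT_CENTERS = {
    "JFK": (40.6413, -73.7781),
    "EWR": (40.6895, -74.1745),
    "LaGuardia": (40.7769, -73.8740),
}

MARGIN_DEG = 0.03  # hypothetical half-width of each box, in degrees

# Same shape as the dictionary the UDF reads: min/max latitude and longitude per airport
nyc_airports = {
    name: {
        "min_lat": lat - MARGIN_DEG, "max_lat": lat + MARGIN_DEG,
        "min_lng": lng - MARGIN_DEG, "max_lng": lng + MARGIN_DEG,
    }
    for name, (lat, lng) in AIRPORT_CENTERS.items()
}

def is_trip_related_to_airport(latitude, longitude, airport_name="JFK"):
    """Return True when the point falls inside the named airport's box."""
    box = nyc_airports[airport_name]
    return (box["min_lat"] <= latitude <= box["max_lat"]
            and box["min_lng"] <= longitude <= box["max_lng"])

print(is_trip_related_to_airport(40.6413, -73.7781, "JFK"))  # the JFK point itself
print(is_trip_related_to_airport(40.7580, -73.9855, "JFK"))  # a midtown Manhattan point
```

A wider margin catches more trips that start or end near the terminals, but also starts to include unrelated neighbourhood trips, so the value is worth tuning against a map.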
Next, we calculate the trip distance in miles from the co-ordinates using the Haversine formula.
from pyspark.sql.types import DoubleType, ArrayType, DecimalType
import math

def trip_distance(lat1, lat2, lon1, lon2):
    """
    Calculate trip distance based on Haversine formula
    Args:
        lat1: Latitude of first point
        lat2: Latitude of second point
        lon1: Longitude of first point
        lon2: Longitude of second point
    Returns:
        Distance between the two points in miles
    """
    # pi / 180
    p = 0.017453292519943295
    a = 0.5 - math.cos((lat2 - lat1) * p)/2 + math.cos(lat1 * p) * math.cos(lat2 * p) * (1 - math.cos((lon2 - lon1) * p)) / 2
    # 12742 km is the Earth's diameter; 0.6213712 converts kilometers to miles
    return 0.6213712 * 12742 * math.asin(math.sqrt(a))

haversineUDF = udf(lambda a, b, c, d: trip_distance(a, b, c, d), DoubleType())
spark.udf.register("haversineUDF", haversineUDF)

train_data_filtered = train_data_filtered.withColumn("trip_distance", haversineUDF(F.col("pickup_latitude"), F.col("dropoff_latitude"), F.col("pickup_longitude"), F.col("dropoff_longitude")))
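The same calculation can be checked outside Spark. The sketch below is a plain-Python haversine in its textbook form, assuming a mean Earth radius of 6371 km (half of the 12742 km diameter used above). The function names are illustrative, and the argument order here is (lat1, lon1, lat2, lon2), which differs from the UDF above. It returns kilometers and converts to miles in a separate step so that the unit stays explicit.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean radius; the 12742 km used above is the diameter
KM_TO_MILES = 0.6213712

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

def haversine_miles(lat1, lon1, lat2, lon2):
    """Same distance, converted to miles."""
    return KM_TO_MILES * haversine_km(lat1, lon1, lat2, lon2)

# JFK to LaGuardia, using the airport co-ordinates listed earlier
jfk_lga_km = haversine_km(40.6413, -73.7781, 40.7769, -73.8740)
print(round(jfk_lga_km, 1), "km,", round(KM_TO_MILES * jfk_lga_km, 1), "miles")
```

Running a known pair of points through both this function and the UDF is a cheap way to confirm the unit of the trip_distance column before using it as a feature.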
Borough-wise Analysis
We also do a borough-wise trip fare analysis, similar to the airport analysis by obtaining the co-ordinates of all the boroughs in NY — Queens, Brooklyn, Bronx, Manhattan, Staten Island.
def getBorough(lat, lng):
    """
    Get the borough based on latitude and longitude
    Args:
        lat: Latitude of the place
        lng: Longitude of the place
    Returns:
        A string representing the name of the borough
    """
    locations = nyc_boroughs.keys()
    for location in locations:
        if (lat >= nyc_boroughs[location]['min_lat'] and
                lat <= nyc_boroughs[location]['max_lat'] and
                lng >= nyc_boroughs[location]['min_lng'] and
                lng <= nyc_boroughs[location]['max_lng']):
            return location
    return 'others'

boroughUDF = udf(lambda a, b: getBorough(a, b))
spark.udf.register("boroughUDF", boroughUDF)

train_data_filtered = train_data_filtered.withColumn("pickup_borough", boroughUDF(F.col("pickup_latitude"), F.col("pickup_longitude")))
train_data_filtered = train_data_filtered.withColumn("dropoff_borough", boroughUDF(F.col("dropoff_latitude"), F.col("dropoff_longitude")))
We conclude the following from the above borough-wise graphs:
- Pickup and dropoff fare amounts are distributed almost identically for Manhattan.
- Dropoff fares are higher for the Bronx, Brooklyn and Staten Island, meaning people travel into these boroughs from far away.
- For Queens, both averages are higher, meaning people usually take longer trips in general.
Time-based Analysis
Lastly, we do some time-based analysis before going on to predictions with Spark MLlib. To make visualization easier, the aggregated results from the PySpark DataFrames are converted into pandas DataFrames using toPandas().
trips_by_year = train_data_filtered.groupBy("pickup_year").count().toPandas()
trips_by_year = trips_by_year.sort_values(by="pickup_year")
trips_by_year

trips_by_month = train_data_filtered.groupBy("pickup_month").count().toPandas()
trips_by_month = trips_by_month.sort_values(by="pickup_month")
trips_by_month

# similar for other granularities of time
Building an ML Pipeline
First of all, we drop the date-time columns, because using them directly would require time series concepts. We also encode the boroughs associated with each trip. StringIndexer maps each category to a numeric index, while OneHotEncoderEstimator (renamed OneHotEncoder in Spark 3.0) one-hot encodes the resulting indices.
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

train_data_filtered = train_data_filtered.drop("pickup_datetime", "pickup_date", "key")

# create object of StringIndexer class and specify input and output column
SI_pickup = StringIndexer(inputCol='pickup_borough', outputCol='pickup_borough_encoded')
SI_dropoff = StringIndexer(inputCol='dropoff_borough', outputCol='dropoff_borough_encoded')
# transform the data
train_data_filtered = SI_pickup.fit(train_data_filtered).transform(train_data_filtered)
train_data_filtered = SI_dropoff.fit(train_data_filtered).transform(train_data_filtered)
# create object and specify input and output column
OHE = OneHotEncoderEstimator(inputCols=['pickup_borough_encoded', 'dropoff_borough_encoded'], outputCols=['pickup_borough_OHE', 'dropoff_borough_OHE'])
# transform the data
train_data_filtered = OHE.fit(train_data_filtered).transform(train_data_filtered)
train_data_filtered = train_data_filtered.drop("pickup_borough", "dropoff_borough", "pickup_borough_encoded", "dropoff_borough_encoded")
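To make the two encoding steps concrete, here is a plain-Python sketch of what they do conceptually; it is not Spark's implementation. It assumes Spark's default settings: StringIndexer gives index 0 to the most frequent label, and the one-hot encoder drops the last category so that it is represented by an all-zero vector. The function names and the sample labels are made up for illustration, and Spark returns sparse vectors rather than Python lists.

```python
from collections import Counter

def string_index(labels):
    """Map each label to an index, most frequent first (ties broken alphabetically here)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

# made-up sample labels, not taken from the dataset
boroughs = ["manhattan", "queens", "manhattan", "brooklyn", "manhattan", "queens"]
mapping = string_index(boroughs)
encoded = {label: one_hot(idx, len(mapping)) for label, idx in mapping.items()}
print(mapping)
print(encoded)
```

Dropping the last category keeps the encoded columns from summing to one for every row, which avoids a redundant feature in linear models; tree-based models such as the ones used below are less sensitive to that choice.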
Next, VectorAssembler is used to combine all the features into a single feature vector, which is the input format the Spark MLlib algorithms expect for training and prediction.
featureColumns = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count', 'pickup_day', 'pickup_hour', 'pickup_day_of_week', 'pickup_month', 'pickup_year', 'is_pickup_JFK', 'is_dropoff_JFK', 'is_pickup_EWR', 'is_dropoff_EWR', 'is_pickup_LaGuardia', 'is_dropoff_LaGuardia', 'trip_distance', 'is_pickup_in_lower_manhattan', 'is_drop_in_lower_manhattan', 'pickup_borough_OHE', 'dropoff_borough_OHE']
assembler = VectorAssembler(inputCols= featureColumns, outputCol="features")
full_set = assembler.transform(train_data_filtered)
Lastly, we apply a train-test split and train a Random Forest model and a Decision Tree model.
train_set, test_set = full_set.randomSplit([0.9, 0.1], seed=0)

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(labelCol="fare_amount", featuresCol="features")
model = rf.fit(train_set)
predictions = model.transform(test_set)

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="fare_amount", metricName="rmse")
print("RMSE Error on test set: ", evaluator.evaluate(predictions))
The RMSE for the Random Forest model is 4.4201.
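The evaluator above reports RMSE, an error measure where lower is better, expressed in the same unit as the label (dollars here). A minimal plain-Python sketch of the metric follows; the fares and predictions are made-up numbers purely for illustration, not values from the dataset or from either model.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

fares = [6.5, 12.0, 45.0, 8.0]        # illustrative values, not from the dataset
predictions = [7.0, 10.0, 40.0, 8.5]  # illustrative values, not model output
print(rmse(fares, predictions))
```

Because the errors are squared before averaging, a few badly predicted long trips can dominate the score, which is one reason the airport and distance features matter.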
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

dt = DecisionTreeRegressor(labelCol="fare_amount", featuresCol="features")
model = dt.fit(train_set)
predictions = model.transform(test_set)

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="fare_amount", metricName="rmse")
print("RMSE Error on test set: ", evaluator.evaluate(predictions))
The best RMSE we get is 4.2814, for the Decision Tree model. You can refer to the full code (including the visualizations) and output on GitHub.