Data Analysis, Programming

Introduction to PySpark via AWS EMR and Hands-on EDA

Performing EDA on NY Taxi Fare Dataset to see PySpark in action — because cloud computing is the next big thing!

Kunj Mehta · Published in Towards AI · 10 min read · Feb 28, 2021


Photo by C Dustin on Unsplash

Introducing the Technologies

What is Spark and PySpark — Spark SQL and Spark MLlib?

According to Wikipedia, Apache Spark is an open-source analytics engine for large-scale data processing. It provides programmers with an interface for working on data spread across a cluster of machines, with implicit data parallelism and fault tolerance.

At the base of the Spark engine are Resilient Distributed Datasets (RDDs), collections of data items partitioned across a cluster of machines and maintained in a fault-tolerant manner. RDDs were developed to overcome a limitation of the MapReduce paradigm, which forces a linear flow of data through a program: read from disk, map, reduce, and write back to disk. RDDs, on the other hand, function as a working set distributed across the cluster and offer a deliberately restricted form of shared memory during the execution of the program.

Spark Core is the foundation of the overall project. It provides distributed task dispatching, scheduling, and basic I/O functionality, exposed through an API centered on the RDD abstraction discussed above. Spark SQL is a component on top of Spark Core that introduced a data abstraction called DataFrames, which provides support for structured and semi-structured data. It offers both a domain-specific language (similar to pandas) for manipulating DataFrames and basic SQL support, along with command-line interfaces and an ODBC/JDBC server. Similarly, Spark MLlib is a distributed machine learning framework on top of Spark Core.

In short, PySpark is the Python API for the Scala-based Spark framework. It provides libraries and wrappers that let the programmer leverage the functionality of the underlying Spark SQL and Spark MLlib modules, among others.
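To see how the DataFrame DSL and the SQL interface relate, here is a minimal, self-contained sketch (the data and column names are made up for illustration):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# on an EMR notebook a `spark` session already exists; locally we create one
spark = SparkSession.builder.appName("pyspark-demo").getOrCreate()

# a tiny in-memory DataFrame with hypothetical columns
df = spark.createDataFrame(
    [("airport", 52.0), ("city", 11.5), ("airport", 48.0)],
    ["trip_type", "fare"],
)

# DataFrame DSL, similar in spirit to pandas
df.groupBy("trip_type").agg(F.avg("fare").alias("avg_fare")).show()

# the same aggregation expressed in SQL
df.createOrReplaceTempView("trips")
spark.sql("SELECT trip_type, AVG(fare) AS avg_fare FROM trips GROUP BY trip_type").show()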

What is AWS EMR?

Amazon Elastic MapReduce (EMR) is the industry-leading cloud big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink and Apache Hudi. Amazon EMR makes it easy to set up, operate, and scale your big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters.

Basically, it is distributed data processing using Spark on the cloud.

Setting it Up — AWS EMR vs Local PySpark Instance

There are two options for using the Spark framework: install it on a local system, or use a managed service from a cloud provider.

With the first option, the cores of the local processor act as the cluster and parallelism is achieved at the core level. The problem with this approach is that part of the RAM is reserved as working (driver) memory, used to combine the results once the distributed processing of the data is complete. This, along with the RAM found in personal laptops today (8 GB or 16 GB), makes the option very unattractive; the data processing is understandably very slow. Clusters in the cloud, on a pay-as-you-use model, offer a practically unlimited choice of system configurations and are clearly the better option.
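For reference, a local PySpark session can be spun up as below (a sketch; 'local[*]' runs one worker thread per local core, and the memory value is purely illustrative):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")                    # use every local core as a worker thread
    .appName("local-eda")
    .config("spark.driver.memory", "8g")   # illustrative; bounded by the laptop's RAM
    .getOrCreate()
)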

A brief walkthrough of how to set up a cluster and a notebook on AWS EMR follows:

  1. Open the AWS EMR console and click on ‘Clusters’ in the sidebar.
  2. Click the blue ‘Create Cluster’ button and go to ‘Advanced Options’.
  3. Check the Spark framework so that it is installed on the cluster. The software settings need a config (JSON) file stored in an S3 bucket that specifies the amount of working memory and core memory (a sample is sketched after this list).
  4. In ‘Hardware’, select the type and number of instances in the cluster as required for the task. Everything else can be kept as is.
  5. The defaults in ‘General Cluster Settings’ can be kept as is. In ‘Security’, an EC2 key pair has to be specified before the cluster is finally created.
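For step 3, a sample software-settings JSON (an illustrative sketch, not the article's actual configuration) can be written to an S3 bucket with boto3; the maximizeResourceAllocation option asks EMR to size Spark's executors to the chosen instance types. The bucket and key names below are hypothetical.

import json
import boto3  # assumes AWS credentials are already configured

# illustrative EMR software-settings JSON for the Spark classification
emr_spark_config = [
    {
        "classification": "spark",
        "properties": {"maximizeResourceAllocation": "true"}
    }
]

# upload it so it can be referenced in the 'Software settings' box while creating the cluster
boto3.client("s3").put_object(
    Bucket="ny-taxi-emr",                 # hypothetical bucket (reused from later in the article)
    Key="configs/emr-spark-config.json",  # hypothetical key
    Body=json.dumps(emr_spark_config),
)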

After the cluster has been created, go to ‘Notebooks’ on the sidebar and create a notebook attached to the cluster. And you are good to go!

The Dataset

Now that we have had a brief look at the different technologies we will be using and decided upon a configuration option, let’s look at the dataset we will be working on.

The dataset can be downloaded from Kaggle and contains 55 million records of taxi trips in New York, which makes it perfect for big data and Spark. It has the following features:

  • key: this is the timestamp of the start of the trip
  • fare_amount: the fare for the trip (in dollars)
  • pickup_datetime: the same as the key, this is the timestamp of the start of the trip
  • pickup_longitude: the longitude co-ordinates of the pickup
  • pickup_latitude: the latitude co-ordinates of the pickup
  • dropoff_longitude: the longitude co-ordinates of the drop
  • dropoff_latitude: the latitude co-ordinates of the drop
  • passenger_count: the number of passengers in the taxi for the trip

The objective of this problem statement is to apply appropriate EDA and feature engineering to predict taxi fares from a particular pickup point to dropoff point as closely as possible (as compared against the test set ground truth).

Finally, the EDA

Now, let’s dive into the EDA done for this dataset. As mentioned in passing earlier, using cloud provider services for distributed data processing is beneficial, and that view is vindicated by the size of the NY Taxi dataset, which could not realistically have been processed on a PC.

Initial Analysis

We import the libraries and read in the data as shown below. Next, we perform some very basic analysis of the size, schema, and columns of the dataset.

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
# all data manipulation functions
import pyspark.sql.functions as F
from pyspark.sql import Row
#PySpark does not have visualization capabilities
from pyspark_dist_explore import Histogram, hist, distplot, pandas_histogram
train_data_path = 's3://ny-taxi-emr/train.csv'
test_data_path = 's3://ny-taxi-emr/test.csv'
train_data = spark.read.csv(train_data_path, header = True, inferSchema = True, mode="DROPMALFORMED")
test_data = spark.read.csv(test_data_path, header = True, inferSchema = True, mode="DROPMALFORMED")
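The basic size, schema and column checks mentioned above are not shown in the snippet; a short sketch:

# number of records, inferred schema and column names
print(train_data.count())
train_data.printSchema()
print(train_data.columns)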

Next, we convert the columns holding date-time values to the timestamp data type. Here, withColumn() is used to create a new column or replace an existing one, while col() is used to access the column values.

train_data = train_data.withColumn("pickup_datetime", F.to_timestamp(F.col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))def create_date_columns():
"""
Create new rows from pickup_datetime,
namely, date, day, hour, day_of_week, month and year
"""
# Get datetime.date objects and create a new column pickup_date
new_train_data = train_data.withColumn("pickup_date", F.to_date(F.col("pickup_datetime")))
new_train_data = new_train_data.withColumn("pickup_day", F.dayofmonth(F.col("pickup_datetime")))
new_train_data = new_train_data.withColumn("pickup_hour", F.hour(F.col("pickup_datetime")))
new_train_data = new_train_data.withColumn("pickup_day_of_week", F.dayofweek(F.col("pickup_datetime")))
new_train_data = new_train_data.withColumn("pickup_month", F.month(F.col("pickup_datetime")))
new_train_data = new_train_data.withColumn("pickup_year", F.year(F.col("pickup_datetime")))

return new_train_data

Next, we check for null values and drop rows with null values.

new_train_data.select([F.count(F.when(F.isnull(col), col)).alias(col) for col in new_train_data.columns]).show()
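The rows containing nulls are then dropped; a one-line sketch of that step:

# drop every row that has a null in any column
new_train_data = new_train_data.dropna(how="any")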

Visualizing Dimension Distributions

We start by visualizing the distributions of the appropriate dimensions, beginning with the fare dimension, and remove fares that are either too high or negative. collect() is used to retrieve the end results from the worker (executor) nodes to the driver node.

print("Maximum Fare:{}\n Minimum Fare: {}".format(new_train_data.agg({"fare_amount": "max"}).collect()[0],\
new_train_data.agg({"fare_amount": "min"}).collect()[0]))
train_data_fare_filtered = new_train_data.filter(F.col("fare_amount").between(2.5, 100))
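The binned fare histogram below can be produced with the pyspark_dist_explore helpers imported earlier; a minimal sketch, assuming hist follows its documented (ax, dataframe, **kwargs) calling pattern (the bin count and color are illustrative):

# histogram computed on the cluster; only the bin counts come back to the driver
fig, ax = plt.subplots()
hist(ax, train_data_fare_filtered.select("fare_amount"), bins=50, color=['lightblue'])
ax.set_xlabel("fare_amount ($)")
ax.set_ylabel("number of trips")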
Distribution of fare amount (binned). Source: Image by the author.

Similarly, we analyze the passenger count distribution and find that the maximum value is 208, which does not make sense. We cap the passenger count at 7, treating a 7-seater car as the largest plausible vehicle (the corresponding filter is sketched after the figure below).

Distribution of passenger count. Source: Image by the author.
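The passenger-count filter itself is not shown in the article; a minimal sketch consistent with the train_data_passenger_filtered variable used in the next snippet (the 1-to-7 range is an assumption based on the 7-seater reasoning above):

# keep trips with a plausible passenger count (1 up to a 7-seater)
train_data_passenger_filtered = train_data_fare_filtered.filter(
    F.col("passenger_count").between(1, 7)
)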

We then do the same boundary analysis for the coordinate fields and remove trips whose coordinates lie more than roughly 2 degrees from the centre of NY (40.771133, -73.974187).

train_data_filtered = train_data_passenger_filtered.filter((F.col("pickup_latitude").between(38.5, 42.5)))
train_data_filtered = train_data_filtered.filter((F.col("dropoff_latitude").between(38.5, 42.5)))
train_data_filtered = train_data_filtered.filter((F.col("pickup_longitude").between(-76, -72)))
train_data_filtered = train_data_filtered.filter((F.col("dropoff_longitude").between(-76, -72)))

Airport Fare Analysis

Plotting the trip coordinates on a map, we see that there are many pickups and drop-offs at the airports serving NY. We go on to analyze these after obtaining the coordinates of the airports.

1. Coordinates of Newark Airport = 40.6895° N, 74.1745° W

2. Coordinates of JFK Airport = 40.6413° N, 73.7781° W

3. Coordinates of La Guardia Airport = 40.7769° N, 73.8740° W
Pickup fare distribution at JFK. Source: Image by the author.
Dropoff fare distribution at JFK. Source: Image by the author.

We can see that the pickup and dropoff fares for JFK are mostly in the $40–$60 range, which suggests fixed pricing. To confirm, let’s compare this with the fare distribution across all trips.

Distribution of fare amount. Source: Image by the author.

We find that the average fare is much higher for airport trips. Differentiating these records through feature engineering will therefore help us build a better model, since it can then recognize the different fare pattern of airport trips versus non-airport trips. So, we create features that specify whether a trip starts or ends at any of the three airports.

Feature Engineering

For feature engineering and creating the additional airport-related dimensions, we use a User-Defined Function (UDF) in PySpark. UDFs allow us to use user-defined Python functions within the Spark framework.

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# user-defined Python function
def isTripRelatedToAirport(latitude, longitude, airport_name='JFK'):
    if ((latitude >= nyc_airports[airport_name]['min_lat']) &
        (latitude <= nyc_airports[airport_name]['max_lat']) &
        (longitude >= nyc_airports[airport_name]['min_lng']) &
        (longitude <= nyc_airports[airport_name]['max_lng'])):
        return True
    else:
        return False

# name the UDF using lambdas
featureEngineeringUDF = udf(lambda x, y, z: isTripRelatedToAirport(x, y, z), BooleanType())

# register the Python UDF with Spark
spark.udf.register("featureEngineeringUDF", featureEngineeringUDF)

Using this UDF, we build the airport-related dimensions. F.lit() is used to pass a hardcoded string (the airport name) into the UDF.

train_data_filtered = train_data_filtered.withColumn("is_pickup_JFK", featureEngineeringUDF(F.col("pickup_latitude"),F.col("pickup_longitude"), lit("JFK")))train_data_filtered = train_data_filtered.withColumn("is_dropoff_JFK", featureEngineeringUDF(F.col("dropoff_latitude"),F.col("dropoff_longitude"), lit("JFK")))train_data_filtered = train_data_filtered.withColumn("is_pickup_EWR", featureEngineeringUDF(F.col("pickup_latitude"),F.col("pickup_longitude"), lit("EWR")))train_data_filtered = train_data_filtered.withColumn("is_dropoff_EWR", featureEngineeringUDF(F.col("dropoff_latitude"),F.col("dropoff_longitude"), lit("EWR")))train_data_filtered = train_data_filtered.withColumn("is_pickup_LaGuardia", featureEngineeringUDF(F.col("pickup_latitude"),F.col("pickup_longitude"), lit("LaGuardia")))train_data_filtered =  train_data_filtered.withColumn("is_dropoff_LaGuardia", featureEngineeringUDF(F.col("dropoff_latitude"),F.col("dropoff_longitude"), lit("LaGuardia")))

Next, we calculate the trip distance (in miles, as the conversion factor in the code below indicates) from the coordinates using the Haversine formula.

from pyspark.sql.types import DoubleType, ArrayType, DecimalType
import math

def trip_distance(lat1, lat2, lon1, lon2):
    """
    Calculate trip distance based on the Haversine formula
    Args:
        lat1: Latitude of first point
        lat2: Latitude of second point
        lon1: Longitude of first point
        lon2: Longitude of second point
    Returns:
        Distance between the two points in miles
    """
    # pi / 180
    p = 0.017453292519943295
    a = 0.5 - math.cos((lat2 - lat1) * p) / 2 + math.cos(lat1 * p) * math.cos(lat2 * p) * (1 - math.cos((lon2 - lon1) * p)) / 2
    # 12742 km is the Earth's diameter; 0.6213712 converts kilometres to miles
    return 0.6213712 * 12742 * math.asin(math.sqrt(a))

haversineUDF = udf(lambda a, b, c, d: trip_distance(a,b,c,d), DoubleType())
spark.udf.register("haversineUDF", haversineUDF)
train_data_filtered = train_data_filtered.withColumn("trip_distance", haversineUDF(F.col("pickup_latitude"), F.col("dropoff_latitude"), F.col("pickup_longitude"), F.col("dropoff_longitude")))

Borough-wise Analysis

We also do a borough-wise trip fare analysis, similar to the airport analysis, after obtaining the bounding coordinates of the five NY boroughs: Queens, Brooklyn, Bronx, Manhattan, and Staten Island.

def getBorough(lat, lng):
    """
    Get the borough based on latitude and longitude
    Args:
        lat: Latitude of the place
        lng: Longitude of the place
    Returns:
        A string representing the name of the borough
    """
    locations = nyc_boroughs.keys()
    for location in locations:
        if (lat >= nyc_boroughs[location]['min_lat'] and
            lat <= nyc_boroughs[location]['max_lat'] and
            lng >= nyc_boroughs[location]['min_lng'] and
            lng <= nyc_boroughs[location]['max_lng']):
            return location
    return 'others'

boroughUDF = udf(lambda a, b: getBorough(a,b))
spark.udf.register("boroughUDF", boroughUDF)

train_data_filtered = train_data_filtered.withColumn("pickup_borough", boroughUDF(F.col("pickup_latitude"), F.col("pickup_longitude")))
train_data_filtered = train_data_filtered.withColumn("dropoff_borough", boroughUDF(F.col("dropoff_latitude"), F.col("dropoff_longitude")))
Fare distribution for Manhattan. Darker red is for pickup fares and lighter red for drop-off fares. Source: Image by the author.
Fare distribution for Bronx. Darker red is for pickup fares and lighter red for drop-off fares. Source: Image by the author.
Fare distribution for Queens. Darker red is for pickup fares and lighter red for drop-off fares. Source: Image by the author.
Fare distribution for Brooklyn. Darker red is for pickup fares and lighter red for drop-off fares. Source: Image by the author.
Fare distribution for Staten Island. Darker red is for pickup fares and lighter red for drop-off fares. Source: Image by the author.

We conclude the following from the above borough-wise graphs:

  1. Pickup and dropoff fare amounts are distributed almost identically for Manhattan.
  2. Dropoff fares are higher for the Bronx, Brooklyn and Staten Island, meaning people tend to take longer (and costlier) trips into these boroughs than out of them.
  3. For Queens, both pickup and dropoff averages are higher, meaning trips to and from Queens are generally longer.

Time-based Analysis

Lastly, we do some time-based analysis before moving on to predictions with Spark MLlib. To help with visualization, the aggregated results from the PySpark DataFrames are converted to pandas DataFrames using toPandas().

trips_by_year = train_data_filtered.groupBy("pickup_year").count().toPandas()
trips_by_year = trips_by_year.sort_values(by = "pickup_year")
trips_by_year
trips_by_month = train_data_filtered.groupBy("pickup_month").count().toPandas()
trips_by_month = trips_by_month.sort_values(by = "pickup_month")
trips_by_month
#similar for other granularities of time
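The bar charts below can be produced from these pandas DataFrames; a minimal sketch for the yearly counts (the plot styling is illustrative):

# plot the aggregated counts with pandas/matplotlib on the driver
fig, ax = plt.subplots()
trips_by_year.plot.bar(x="pickup_year", y="count", ax=ax, legend=False)
ax.set_xlabel("Year")
ax.set_ylabel("Number of trips")
plt.show()  # in an EMR (Sparkmagic) notebook, the "%matplot plt" magic renders the figure instead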
Trips by year. Source: Image by the author.
Trips by month. Source: Image by the author.
Trips by day of week. Source: Image by the author.
Trips by hour. Source: Image by the author.

Building an ML Pipeline

First of all, we drop the columns with the date-time data type because using them would require time-series techniques. We also encode the boroughs associated with each trip: StringIndexer converts the categorical borough names into numerical indices, and OneHotEncoderEstimator then one-hot encodes those indices.

train_data_filtered = train_data_filtered.drop("pickup_datetime", "pickup_date", "key")# create object of StringIndexer class and specify input and output column
SI_pickup = StringIndexer(inputCol='pickup_borough',outputCol='pickup_borough_encoded')
SI_dropoff = StringIndexer(inputCol='dropoff_borough',outputCol='dropoff_borough_encoded')

# transform the data
train_data_filtered = SI_pickup.fit(train_data_filtered).transform(train_data_filtered)
train_data_filtered = SI_dropoff.fit(train_data_filtered).transform(train_data_filtered)

# create object and specify input and output column
OHE = OneHotEncoderEstimator(inputCols=['pickup_borough_encoded', 'dropoff_borough_encoded'],outputCols=['pickup_borough_OHE', 'dropoff_borough_OHE'])

# transform the data
train_data_filtered = OHE.fit(train_data_filtered).transform(train_data_filtered)

train_data_filtered = train_data_filtered.drop("pickup_borough", "dropoff_borough", "pickup_borough_encoded", dropoff_borough_encoded")

Next, VectorAssembler is used to combine all the features into a single feature vector, which simplifies training and prediction for the machine learning algorithms.

featureColumns = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count', 'pickup_day', 'pickup_hour', 'pickup_day_of_week', 'pickup_month', 'pickup_year', 'is_pickup_JFK', 'is_dropoff_JFK', 'is_pickup_EWR', 'is_dropoff_EWR', 'is_pickup_LaGuardia', 'is_dropoff_LaGuardia', 'trip_distance', 'is_pickup_in_lower_manhattan', 'is_drop_in_lower_manhattan', 'pickup_borough_OHE', 'dropoff_borough_OHE']

assembler = VectorAssembler(inputCols= featureColumns, outputCol="features")

full_set = assembler.transform(train_data_filtered)

Lastly, we apply a train-test split and train Decision Tree and Random Forest regression models.

train_set, test_set = full_set.randomSplit([0.9, 0.1], seed=0)

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(labelCol="fare_amount", featuresCol= "features")
model = rf.fit(train_set)

predictions = model.transform(test_set)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="fare_amount",metricName="rmse")
print("RMSE Error on test set: ", evaluator.evaluate(predictions)

The RMSE for the Random Forest model on the test set is 4.4201.

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
dt = DecisionTreeRegressor(labelCol="fare_amount", featuresCol= "features")
model = dt.fit(train_set)

predictions = model.transform(test_set)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="fare_amount",metricName="rmse")
print("RMSE Error on test set: ", evaluator.evaluate(predictions))

The best (lowest) RMSE is for the Decision Tree model: 4.2814. You can refer to the full code (including the visualizations) and output on GitHub.

I would love to connect with you on LinkedIn!


MS @ Rutgers 2023 | Writing on AI transformation, AI in finance, climate and logistics. linkedin.com/in/kunjmehta