Large-Scale Sentiment Analysis with PySpark

Comparative study of classification algorithms and feature extraction functions implemented in PySpark on 1,600,000 Tweets.

Clément Delteil 🌱
Towards AI

--

Scared Lego character on a white background
Photo by Nik on Unsplash

As entities become more interconnected, the volume of data to be processed grows exponentially. These entities, whether governmental or for-profit, have a real interest in our feelings, and our opinions. This data is sometimes referred to as the new black gold, which tells you how crucial it has become. They are now the basis for all decisions.

Before taking a public position on a subject, such an entity has to sound out the opinion of its members. It usually knows its audience well but this can be complicated when its content, whatever it is, is exposed on social networks. At that point, they are thrown into the lion’s den, at the mercy of algorithms. That’s why it has become essential for these stakeholders to track public sentiment.

A popular platform for tracking sentimental polarity on a given topic is the social network Twitter. Many actors publicly express their opinions on products, political decisions, etc.

In this article, we will perform a comparative study of the classification algorithms currently implemented in PySpark. For those who don’t know, PySpark is a Python API of Apache Spark which is itself known for its ability to handle data-intensive applications such as natural language processing.

We will perform this analysis on a dataset very well known in the scientific literature: Sentiment140 [1]. It contains 1,600,000 tweets that express three different polarities: positive, neutral, and negative.

In this natural language processing context, we will compare the selected algorithms according to different metrics and features created from the dataset. Our approach will consist of the following steps:

  1. Data visualization: A quick exploration of the dataset to see what we are dealing with.
  2. Features extraction: Based on our exploration, we’ll extract features for classification.
  3. Training of the different models: Then, we’ll see why I chose PySpark and train the selected models.
  4. Results: Finally, we will analyze the results keeping in mind the target exposed in the introduction.

In a second article, once we’ve determined which model is best suited for our task, I’ll show you how to deploy it by connecting it to the Twitter API via Apache Kafka for real-time predictions.

The whole code of this project is available on my GitHub.

Data Visualization

Let’s start by importing the libraries we need.

# Necessary imports 
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

When using Spark, you need to create a session. Below, we create a local session with the maximum number of possible cores (the * symbol) to maximize the efficiency of our next operations. The optimization of the models was done in a Google Cloud cluster, for those interested you can find the necessary code on GitHub.

spark1 = SparkSession.builder\
.master("local[*]")\
.appName("Twitter")\
.getOrCreate()

Since I know the dataset, I defined a StructType. This allows us when reading the CSV containing the data, to tell Spark to load the data according to the schema defined above.

path = "../resources/training_noemoticon.csv"  # Path dataset

# Data schema
schema = StructType([
StructField("target", IntegerType(), True),
StructField("id", StringType(), True),
StructField("date", StringType(), True),
StructField("query", StringType(), True),
StructField("author", StringType(), True),
StructField("tweet", StringType(), True)])

df = spark1.read.csv(path,
inferSchema=True, # Spark uses the defined schema
header=False,
schema=schema)

df.dropna() # Drop rows containing NaN values for simplicity

As you can see in the schema we’ve defined, our dataset contains 6 fields:

  • target: the polarity of the tweet (0 = negative, 2 = neutral, 4 = positive)
  • id: the id of the tweet (1235)
  • date: the date of the tweet (Sat May 16 23:58:44 UTC 2009)
  • query: the API query used to get the data
  • author: the user that tweeted
  • tweet: the text of the tweet (what we are most interested in)

This can be checked by displaying a sample of the dataset.

Screenshot of a data sample
Sample of data — Image by author

As you can see, the data is already cleaned up a bit. There is no more spam and emojis. That’s the advantage of known datasets from the scientific literature, they are easy to use 😉. However, we still have to remove empty words, punctuation, #hashtag, and @Username tags. Empty words are words that do not carry sentimental value, and that could noise the sentimental analysis such as: the, a, so, etc.

def pre_process(text):
# Remove links
text = re.sub('http://\S+|https://\S+', '', text)
text = re.sub('http[s]?://\S+', '', text)
text = re.sub(r"http\S+", "", text)

# Convert HTML references
text = re.sub('&amp', 'and', text)
text = re.sub('&lt', '<', text)
text = re.sub('&gt', '>', text)
text = re.sub('\xao', ' ', text)

# Remove new line characters
text = re.sub('[\r\n]+', ' ', text)

# Remove mentions
text = re.sub(r'@\w+', '', text)

# Remove hashtags
text = re.sub(r'#\w+', '', text)

# Remove multiple space characters
text = re.sub('\s+',' ', text)

# Convert to lowercase
text = text.lower()
return text

We can apply it to all the rows of our dataset.

df['tweet'] = df['tweet'].apply(pre_process)

If you ever want to see how to deal with this noise with real-world data, I wrote an article about it.

Now that we have a good idea of the data structure, we can extract information from it. Indeed, we have to transform our textual data into numerical data in order to give them to our models.

Features Extraction

Since we are looking for the best results, we will not be satisfied with a single method of feature extraction. In this section, we will use the following functions that are implemented natively in PySpark: Tokenizer, HashingTF, CountVectorizer, TF-IDF, N-Gram, ChisQSelector, etc.

If you don’t know one of the above methods, don’t panic! I will take the time to explain the purpose of each of them.

Basic Pipeline

Since we will be training different models on different combinations of features, we need to create Pipelines to automate the process. Let’s start by tokenizing our text. This will have the effect of dividing our text into tokens, where each token represents a word.

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer, NGram, VectorAssembler, ChiSqSelector

tokenizer = Tokenizer(inputCol="tweet", outputCol="words")

Then, we will transform these tokens into numerical feature vectors using HashingTF: Hashing Term Frequency. The output of this function is a sparse vector of term frequency counts for each string.

hashtf = HashingTF(inputCol="words", outputCol='tf')

After obtaining this frequency vector, we need to pass it into the IDF function. This function will add weights to each word. The more frequent the word is, the lower its weight, and vice versa. This allows us to compensate for the bias in a large corpus of text like ours.

idf = IDF(inputCol='tf', outputCol="features")

Finally, all we have to do is index our labels.

label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")

If we start with logistic regression, we can put all these pre-processing steps in a pipeline to make it easier to run.

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()

pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, lr])

With these different steps, we have just seen a basic Pipeline for an NLP project. Of course, it is not with so few steps and the default parameters of the functions used that we will obtain the best results. Below, I will present a more complex Pipeline.

Complex Pipeline

In this more complex pipeline, we will make three changes.

  1. Replace HashingTF with CountVectorizer

They can both be used to generate term frequency vectors. However, they differ on a few points. On the one hand, HashingTF uses a hash function to map the terms into indices which may cause collisions. It also uses a fixed size for feature vectors (default is 262,144). On the other hand, CountVectorize is an exact method. It generates a vocabulary and selects the top N words ranked by term frequency.

In a nutshell, HashingTF is less expensive, as it uses a hash function, but is irreversible and can cause collisions, while CountVectorizer is deterministic and reversible, but is much more expensive.

2. Add N-Grams

N-grams are contiguous sequences of N items from a given text. They allow us to keep more context about the group of words rather than just considering each word independently.

3. Use ChisQSelector

After the creation of N-grams, we have many features (nearly 49,000 🤯). We certainly kept a lot of contexts but it is good to reduce this number to reduce the training time on the one hand and on the other hand not to keep useless information. ChisQSelector is a feature selection technique that uses the Chi-Squared test. The features with the highest test value are considered to be the most important and are therefore kept for future reference.

4. In code

These new steps can now be put together in a Pipeline.

def build_trigrams(inputCol=["tweet","target"], n=3):

tokenizer = [Tokenizer(inputCol="tweet", outputCol="words")]

ngrams = [
NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
for i in range(1, n + 1)
]

cv = [
CountVectorizer(vocabSize=2**14,inputCol="{0}_grams".format(i),
outputCol="{0}_tf".format(i))
for i in range(1, n + 1)
]

idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

assembler = [VectorAssembler(
inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
outputCol="rawFeatures"
)]

label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]

selector = [ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")]

lr = [LogisticRegression()]

return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + selector + lr)

So here are all the different feature extraction methods that were tested for this project. In the next section, I will introduce the training of one of these Pipelines and the models chosen.

Training

Using a Pipeline

Here is how to train one of these Pipelines and get accurate results.

%%time

st = datetime.utcnow()
pipelineFit = pipeline.fit(train_set)
print('Training time:', datetime.utcnow() - st)

predictions = pipelineFit.transform(test_set)

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

# Print the results
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

For your information, this Pipeline is the first one presented (the simplest). It allows us to obtain the following results.

Screenshot of the first results obtained
Sample results — Image by author

Models

For the models, I have tested almost all those available in MLlib (DataFrame-based), and that worked in a reasonable amount of time. Indeed, as the No Free Lunch Theorem states, there is no one-size-fits-all algorithm or model that performs best on all tasks 😉.

More precisely, I tested the following models.

  • Logistic Regression
  • Support Vector Machines (Linear Kernel)
  • Naive Bayes Classifier

Grid Search

In addition to testing different models, I obviously tested hyperparameter grids for each of them. You can find the code for each model in the GitHub repo.

In this article, we will continue our example with logistic regression.

tokenizer = Tokenizer(inputCol="tweet", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")

lr = LogisticRegression()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.01, 0.1, 1.0, 10.0]) \
.addGrid(lr.maxIter, [20, 50, 100, 500, 1000]) \
.addGrid(lr.elasticNetParam, [0, 0.5, 1.0]) \
.build()

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)

After defining the hyperparameter grid, all that remains is to launch it!

%%time
pipelineFit = cv.fit(train_set)

predictions = pipelineFit.transform(test_set)

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

# Print the results
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

bestModel = pipelineFit.bestModel
pipelineFit.getEstimatorParamMaps()[np.argmax(pipelineFit.avgMetrics)]

(Be careful if you run the code with this dataset you could wait a long time in front of your screen… You can’t say I didn’t warn you)

Results

Global

The moment you are all waiting for, the results! As mentioned throughout the article, I tested different models on different scenarios. Below is the summary table of the results.

Screenshot of the summary table of the results of the classification of all models in all scenarios according to accuracy.
Summary table of model results in different scenarios by accuracy — Image by author

As the experimental results show, it is the Logistic Regression model that is the most adapted to the data with an accuracy of 80.8% in the last scenario.

We can also notice that we systematically get better results with CountVectorizer than with HashingTF. When I explained the difference between these two functions earlier, I had said that the second one could cause collisions. It is this difference that I think explains the contrast in performance between the scenarios.

In the last scenario, here are the parameters of the feature extraction functions.

  • CountVectorizer : Vocabulary Size = 2¹⁴
  • IDF: Minimum Frequency = 5
  • N-Gram : 1,2,3-Gram
  • ChiSqSelector : Top 2¹⁴ features i.e. 16 384 features

That is a total of 49,152 features before the selection and 16,384 after the passage through ChiSqSelector in the pipeline.

In details

By focusing on our best model, more information can be derived from these results.

First, we’re going to check the ROC curves for each scenario with the logistic regression model.

import matplotlib.pyplot as plt

plt.figure(figsize=(5,5))

plt.plot(pipelineFit1.stages[-1].summary.roc.select('FPR').collect(),
pipelineFit1.stages[-1].summary.roc.select('TPR').collect(),
label="AUC=" +str(round(pipelineFit1.stages[-1].summary.areaUnderROC, 3)))

plt.plot(pipelineFit2.stages[-1].summary.roc.select('FPR').collect(),
pipelineFit2.stages[-1].summary.roc.select('TPR').collect(),
label="AUC=" +str(round(pipelineFit2.stages[-1].summary.areaUnderROC, 3)))

plt.plot(pipelineFit3.stages[-1].summary.roc.select('FPR').collect(),
pipelineFit3.stages[-1].summary.roc.select('TPR').collect(),
label="AUC=" +str(round(pipelineFit3.stages[-1].summary.areaUnderROC, 3)))

plt.plot(pipelineFit4.stages[-1].summary.roc.select('FPR').collect(),
pipelineFit4.stages[-1].summary.roc.select('TPR').collect(),
label="AUC=" +str(round(pipelineFit4.stages [-1].summary.areaUnderROC, 3)))


plt.plot(pipelineFit5.stages[-1].summary.roc.select('FPR').collect(),
pipelineFit5.stages[-1].summary.roc.select('TPR').collect(),
label="AUC=" +str(round(pipelineFit5.stages[-1].summary.areaUnderROC, 3)))

plt.plot([0, 1], [0, 1], "r--", label="Guess")

plt.xlabel('FPR')
plt.ylabel('TPR')
plt.legend()
plt.show()

The above code retrieves the training information from each of the pipelines and displays the AUC curves.

Graph of the AUC curves of the different logistic regression models
AUC Curves in each scenario — Image by author

You can find the False Positive Rate on the X-axis and the True Positive Rate on the Y-axis. The models shown in the legend are in the same order as in the table. We can notice that it is not the model with the highest accuracy that obtains the highest AUC value.

If you are not familiar with AUC and ROC Curves, I can recommend this excellent article by Sarang Narkhede on this subject.

Back to our problem, we can also look at the confusion matrix of the logistic regression model predictions on the test data.

Confusion matrix of the best logistic regression model
Confusion Matrix of the logistic regression model in the last scenario — Image by author

From this confusion matrix, we can see that this model is rather optimistic about the emotion of a tweet. Indeed, it labeled 36,862 tweets as positive when they were negative, against only 25,837 positive tweets predicted as negative on the other side of the matrix.

Conclusions

In this article, we demonstrated the quality of some simple machine learning models for predicting the sentimental polarity of Tweets in a Big Data context. We explored different feature extraction scenarios specific to NLP, which allowed us to greatly improve our results and identify the Logistic Regression model as the best for our dataset. In the next article of this series, we will see how to deploy this model to analyze topics of interest in real-time using Apache Kafka and Spark Streaming.

The different code snippets presented can be executed locally if the dataset is of reasonable size. But keep in mind that even though the dataset may seem small at first since it contains only text, extracting the features will take a lot of time and will strongly slow down the training of your models.

In conclusion, considering the quality of the results obtained in such a short time, it is important for everyone to be aware of the power of these tools for advanced and precise analyses. Imagine what can be done with state-of-the-art models! So be careful what you post on social networks 😌.

--

--

Machine Learning Engineer 🌱 | French CS Engineer | Canadian MSc in AI | Data is my anchor in exploring all realms 🌍📊 | linkedin.com/in/clementdelteil/