Real-Time Sentiment Analysis with Docker, Kafka, and Spark Streaming

Clément Delteil 🌱
Published in Towards AI
12 min read · May 6, 2023

Enigmatic photo of a copper pipe representing the ETL process
Photo by Vladimir Kramer on Unsplash
Flow chart of the ETL pipeline implemented in this article — Image by author

Installing Docker Desktop

Screenshot of the Docker Desktop user interface — Image by author
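Before going any further, make sure the Docker daemon is actually up. Running docker --version and docker info in a terminal is enough; if you prefer checking from Python, here is a small optional sketch using the docker SDK (pip install docker), which is not otherwise used in this article:

import docker  # pip install docker

client = docker.from_env()            # connect to the local Docker daemon
print(client.ping())                  # True if Docker Desktop is running
print(client.version()["Version"])    # installed Docker engine version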

Container with Apache Kafka

docker-compose -f zk-single-kafka-single.yml up -d
docker-compose -f zk-single-kafka-single.yml ps
docker exec -it kafka1 /bin/bash
kafka-topics --version
Screenshot of the command prompt response after asking for Kafka's version — Image by author
kafka-topics --create --topic twitter --bootstrap-server localhost:9092
kafka-topics --describe --topic twitter --bootstrap-server localhost:9092
Description of the recently created Twitter topic — Image by author
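If you would rather double-check the topic from Python than from inside the container, the kafka-python library (the same one the producer uses below) can list the topics known to the broker:

from kafka import KafkaConsumer

# The 'twitter' topic created above should appear in this set
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())
consumer.close()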

Creating a Kafka Producer

import json
import logging

import tweepy
from decouple import config
from kafka import KafkaProducer

"""API ACCESS KEYS"""
consumerKey = config('CONSUMERKEY')
consumerSecret = config('CONSUMERSECRET')
accessToken = config('ACCESSTOKEN')
accessTokenSecret = config('ACCESSTOKENSECRET')
bearerToken = config('BEARERTOKEN')  # required by the v2 streaming client below

logging.basicConfig(level=logging.INFO)

# Create the producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

search_term = 'ChatGPT'  # Modify this string to decide what to analyze
topic_name = 'twitter'   # Modify this according to your topic name


def twitterAuth():
    # create the authentication object (only needed if you also call the v1.1 REST API)
    authenticate = tweepy.OAuthHandler(consumerKey, consumerSecret)
    # set the access token and the access token secret
    authenticate.set_access_token(accessToken, accessTokenSecret)
    # create the API object
    api = tweepy.API(authenticate, wait_on_rate_limit=True)
    return api


class TweetListener(tweepy.StreamingClient):

    def on_data(self, raw_data):
        # logging.info(raw_data)
        tweet = json.loads(raw_data)

        if tweet['data']:
            data = {
                'message': tweet['data']['text'].replace(',', '')
            }
            # Send the tweet text to the Kafka topic
            producer.send(topic_name, value=json.dumps(data).encode('utf-8'))

        return True

    @staticmethod
    def on_request_error(status_code):
        if status_code == 420:
            # returning False disconnects the stream
            return False

    def start_streaming_tweets(self, search_term):
        # Register a filtering rule on the search term, then open the stream
        self.add_rules(tweepy.StreamRule(search_term))
        self.filter()


if __name__ == '__main__':
    twitter_stream = TweetListener(bearerToken)
    twitter_stream.start_streaming_tweets(search_term)
GIF of the data stream sent by the Twitter API and received by the producer — Image by author
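Before plugging in Spark, it is worth confirming that the messages really land in the topic. A minimal check with kafka-python, reading the twitter topic from the beginning:

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'twitter',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # also read messages already stored in the topic
)

for msg in consumer:
    print(json.loads(msg.value)['message'])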

Creating a Kafka Consumer

import re

import findspark
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, udf
from pyspark.sql.types import StructType, StructField, StringType, ArrayType


if __name__ == "__main__":
    findspark.init()

    # Path to the pre-trained model
    path_to_model = r''

    # Config
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("TwitterSentimentAnalysis") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
        .getOrCreate()

    # Schema for the incoming data
    schema = StructType([StructField("message", StringType())])

    # Read the data from Kafka
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "twitter") \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value AS STRING) as message")

    # Parse the JSON payload and keep only the tweet text
    df = df.withColumn("value", from_json("message", schema))
    df = df.withColumn("message", df.value.getField("message"))

    # Pre-processing: lowercase, strip URLs and non-letters, split on whitespace
    pre_process = udf(
        lambda x: re.sub(r'[^A-Za-z\n ]|(http\S+)|(www.\S+)', '', x.lower().strip()).split(),
        ArrayType(StringType())
    )

    df = df.withColumn("cleaned_data", pre_process(df.message)).dropna()

    # Load the pre-trained model
    pipeline_model = PipelineModel.load(path_to_model)

    # Make predictions
    prediction = pipeline_model.transform(df)

    # Select the columns of interest
    prediction = prediction.select(prediction.message, prediction.prediction)

    # Print predictions to the console
    prediction \
        .writeStream \
        .format("console") \
        .outputMode("update") \
        .start() \
        .awaitTermination()
GIF of the results of the real-time sentiment analysis of tweets on ChatGPT — Image by author
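As a side note, the pre_process UDF simply lowercases the text, removes URLs and every character that is not a letter, and splits on whitespace. Outside of Spark, on a made-up tweet, it behaves like this:

import re

def pre_process(text):
    return re.sub(r'[^A-Za-z\n ]|(http\S+)|(www.\S+)', '', text.lower().strip()).split()

# Hypothetical tweet, just to illustrate the cleaning step
print(pre_process("ChatGPT is amazing!!! 🤯 https://t.co/xyz"))
# ['chatgpt', 'is', 'amazing']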
By default, the console sink truncates long rows, so only the beginning of each tweet is displayed. Setting the truncate option to False shows the full text:

prediction \
    .writeStream \
    .format("console") \
    .option("truncate", False) \
    .outputMode("update") \
    .start() \
    .awaitTermination()
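One thing the consumer takes for granted is the pre-trained model loaded with PipelineModel.load(path_to_model); training it is outside the scope of this article. Purely as an illustration, assuming a labelled CSV of tweets with text and label columns (the file name, column names, and stages below are hypothetical), a compatible pipeline could be fitted and saved like this:

import re

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.master("local[*]").appName("TrainSentimentModel").getOrCreate()

# Hypothetical training data: a 'text' column of raw tweets and a 0/1 'label' column
train_df = spark.read.csv("labelled_tweets.csv", header=True, inferSchema=True)

# Same cleaning step as in the streaming consumer, so the model expects a 'cleaned_data' column
pre_process = udf(
    lambda x: re.sub(r'[^A-Za-z\n ]|(http\S+)|(www.\S+)', '', x.lower().strip()).split(),
    ArrayType(StringType())
)
train_df = train_df.withColumn("cleaned_data", pre_process(train_df.text)).dropna()

pipeline = Pipeline(stages=[
    HashingTF(inputCol="cleaned_data", outputCol="tf"),
    IDF(inputCol="tf", outputCol="features"),
    LogisticRegression(featuresCol="features", labelCol="label"),
])

model = pipeline.fit(train_df)
model.save("sentiment_pipeline_model")  # the directory that path_to_model should point to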

What to do with those results?

Saving results locally

# saving_path and check_path are local directories of your choice
prediction.writeStream \
    .outputMode("append") \
    .format("json") \
    .option("path", saving_path) \
    .option("checkpointLocation", check_path) \
    .start() \
    .awaitTermination()
Screenshot of locally saved results in JSON format — Image by author

Loading results in MongoDB

def write_row_in_mongo(dataframe, epoch_id):
    # Called by foreachBatch for every micro-batch of predictions
    mongo_uri = "mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]"

    dataframe.write.format("mongo").mode("append").option("uri", mongo_uri).save()


spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("TwitterSentimentAnalysis") \
    .config("spark.mongodb.input.uri",
            "mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]") \
    .config("spark.mongodb.output.uri",
            "mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]") \
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()

# Load predictions in Mongo
query = prediction.writeStream.queryName("tweets") \
    .foreachBatch(write_row_in_mongo).start()

query.awaitTermination()
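To sanity-check what ends up in MongoDB, you can query the collection directly with pymongo. The URI, database, and collection names below are placeholders; use whatever your connection string actually points to:

from pymongo import MongoClient

# Placeholder connection details: reuse the URI, database and collection configured for Spark
client = MongoClient("mongodb://localhost:27017")
collection = client["twitter_db"]["tweets"]

for doc in collection.find().limit(5):
    print(doc["message"], doc["prediction"])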

Loading results in Delta Lake

def write_row_in_delta(df, epoch_id):
    # Called by foreachBatch for every micro-batch of predictions
    deltaPath = config('DELTAPATH')
    df.write.format("delta").mode("append").option("mergeSchema", "true").save(deltaPath)

# ...

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("TwitterSentimentAnalysis") \
    .config("spark.jars.packages",
            "io.delta:delta-core_2.12:1.1.0,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()

# ...

query = prediction.writeStream \
    .foreachBatch(write_row_in_delta) \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()
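Reading the Delta table back is then a one-liner, and because Delta keeps a transaction log you can also go back to an earlier version of the data. A short sketch, reusing the SparkSession and the DELTAPATH variable from above:

from decouple import config

deltaPath = config('DELTAPATH')

# Current contents of the table
spark.read.format("delta").load(deltaPath).show(truncate=False)

# Optional: the table as it looked at an earlier version of the transaction log
spark.read.format("delta").option("versionAsOf", 0).load(deltaPath).show(truncate=False)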

Conclusion

In this article, we assembled an end-to-end, real-time ETL pipeline: a Tweepy producer streams tweets matching a search term into a Kafka topic running in Docker, a Spark Structured Streaming consumer cleans the text and scores its sentiment with a pre-trained PipelineModel, and the resulting predictions can be printed to the console, saved locally as JSON, or loaded into MongoDB or Delta Lake for downstream use.
