Nicolas Perez

Spark Streaming and Twitter Sentiment Analysis

March 9, 2021

Editor’s Note: MapR products and solutions sold prior to the acquisition of such assets by Hewlett Packard Enterprise Company in 2019 may have older product names and model numbers that differ from current solutions. For information about current offerings, which are now part of HPE Ezmeral Data Fabric, please visit https://www.hpe.com/us/en/software/data-fabric.html

Original Post Information:

"authorDisplayName": "Nicolas A Perez",
"publish": "2016-04-19T07:00:00.000Z",
"tags": ["apache-spark","use-case"]

This blog post is the result of my efforts to show a coworker how to get the insights he needed by using the streaming capabilities and concise API of Apache Spark. In this post, you'll learn how to do some simple yet very interesting analytics that will help you solve real problems by analyzing specific areas of a social network.

A subset of the Twitter stream was the perfect choice for this demonstration, since it had everything we needed: an endless, continuous data source that was ready to be explored.

Spark Streaming, Minimized

Spark Streaming is very well explained here, so we are going to skip some of the details about the Streaming API and move on to setting up our app.

Setting Up Our App

Let’s see how to prepare our app before doing anything else.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))

System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", "accessToken")
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")

val stream = TwitterUtils.createStream(ssc, None)

Here, we have created the Spark Context sc and set the log level to WARN to silence the noisy logs Spark generates. We also created a Streaming Context ssc from sc. Then we set up the Twitter credentials (obtained beforehand by following these steps on the Twitter website). Now the real fun starts.

It is easy to find out what is trending on Twitter at any given moment; it is just a matter of counting the appearances of each tag on the stream. Let’s see how Spark allows us to do this operation.

val tags = stream.flatMap {
  status => status.getHashtagEntities.map(_.getText)  
}
tags.countByValue()
  .foreachRDD { rdd =>
    val now = org.joda.time.DateTime.now()

    rdd
      .sortBy(_._2)
      .map(x => (x, now))
      .saveAsTextFile(s"~/twitter/$now")
  }

First, we got the tags from the Tweets, counted how many times each one appeared, and sorted them by count. After that, we persisted the result so we could point Splunk (or any other tool, for that matter) at it. We could build some interesting dashboards with this information to track the most trending hashtags. Based on it, my coworker could create campaigns and use these popular tags to attract a bigger audience.

Analyzing Tweets

Now we want to add functionality to get an overall opinion of what people think about a set of topics. For the sake of this example, let’s say that we want to know the sentiment of Tweets about Big Data and Food, two very unrelated topics.

There are several APIs for analyzing sentiments from Tweets, but we are going to use an interesting library from The Stanford Natural Language Processing Group in order to extract the corresponding sentiments.

In our build.sbt file we need to add the corresponding dependencies.

libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
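
The Spark artifacts themselves also have to be on the classpath. A sketch of those entries, assuming a Spark 1.6.x build (adjust the version to match your cluster), could look like this:

// assumed Spark version; change it to whatever your cluster runs
val sparkVersion = "1.6.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion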

Now, we need to select only those Tweets we really care about by filtering the stream on certain hashtags. This filtering is quite easy, thanks to a unified Spark API.

Let’s see how.

val tweets = stream.filter { t =>
  val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)

  tags.contains("#bigdata") && tags.contains("#food")
}

Here, we get all the tags in each Tweet and keep only those Tweets that have been tagged with both #bigdata and #food.

Once we have our Tweets, extracting the corresponding sentiment is quite easy. Let's define a function that extracts the sentiment from the Tweet's content so we can plug it into our pipeline.

def detectSentiment(message: String): SENTIMENT_TYPE

We are going to use this function, assuming it does what it should, and we will put its implementation at the end, since it's not the focus of this post. In order to get an idea of how it works, let's build some tests around it.

it("should detect not understood sentiment") {
  detectSentiment("")should equal (NOT_UNDERSTOOD) 
}
it("should detect a negative sentiment") {
  detectSentiment("I am feeling very sad and frustrated.")should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
  detectSentiment("I'm watching a movie")should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
  detectSentiment("It was a nice experience.")should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
  detectSentiment("It was a very nice experience.")should equal (VERY_POSITIVE)
}

These tests should be enough to show how detectSentiment works.

Let’s see an example.

val data = tweets.map { status =>
  val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
  val tags = status.getHashtagEntities.map(_.getText.toLowerCase)

  (status.getText, sentiment.toString, tags)
}

Here, data represents a DStream of the Tweets we want, their associated sentiment, and the hashtags within each Tweet (among which we should find the tags we used to filter).

SQL Interoperability

Now we want to cross reference the sentiment data with an external dataset that we can query using SQL. For my coworker, it makes a lot of sense to be able to join the Twitter stream with his other dataset.

Let’s take a look at how we could achieve this.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

data.foreachRDD { rdd =>
  rdd.toDF().registerTempTable("sentiments")
}

We have transformed our stream into a different representation (a DataFrame), which keeps the same Spark properties (resilient, distributed, very fast), and exposed it as a table so my coworker can use his beloved SQL to query different sources.

The sentiments table (that we defined from our DataFrame) can be queried like any other table in his system. Another possibility is to query other data sources (Cassandra, XML files, or our own binary formats) using Spark SQL and cross them with the stream.

You can find out more information about this topic here and here.

An example of querying a DataFrame is shown next.

sqlContext.sql("select * from sentiments").show()

Windowed Operations

Spark Streaming has the ability to look back in the stream, a capability most streaming engines either lack or make very hard to use.

In order to implement a windowed operation, you'll need to checkpoint the stream, but this is an easy task. You'll find more information about this here.
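
Enabling checkpointing is a single call on the streaming context; the directory used here is just an assumed location:

// any reliable directory (e.g., on a distributed file system) works; this path is only illustrative
ssc.checkpoint("~/twitter/checkpoint")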

Here's a small example of this kind of operation:

tags
  .window(Minutes(1))
  . (...)
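
For instance, here is a sketch of a one-minute view of trending hashtags, recomputed on every five-second batch; the window length, slide interval, and the final print are illustrative choices rather than part of the original pipeline:

// hashtag counts over the last minute, refreshed with every 5-second batch
val trendingLastMinute = tags
  .window(Minutes(1), Seconds(5))
  .countByValue()
  .transform(rdd => rdd.sortBy(_._2, ascending = false))

trendingLastMinute.print()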

Conclusion

Even though our examples are quite simple, we were able to solve a real-life problem using Spark. We now have the ability to identify trending topics on Twitter, which helps us both target and increase our audience. At the same time, we are able to access different data sets using a single set of tools, such as SQL.

Very interesting results came back from #bigdata and #food at the same time. Perhaps people Tweet about big data at lunch time—who knows?
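
The detectSentiment Function

As promised, here is the sentiment extraction function. This is a minimal sketch built on Stanford CoreNLP's sentiment annotator (the library we declared in build.sbt); the enum values, the length-weighted averaging, and the score mapping are illustrative assumptions rather than the exact original implementation.

import java.util.Properties

import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations

import scala.collection.JavaConverters._

object SentimentAnalysisUtils {

  sealed trait SENTIMENT_TYPE
  case object VERY_NEGATIVE extends SENTIMENT_TYPE
  case object NEGATIVE extends SENTIMENT_TYPE
  case object NEUTRAL extends SENTIMENT_TYPE
  case object POSITIVE extends SENTIMENT_TYPE
  case object VERY_POSITIVE extends SENTIMENT_TYPE
  case object NOT_UNDERSTOOD extends SENTIMENT_TYPE

  // Building the CoreNLP pipeline is expensive, so create it once per JVM.
  private lazy val pipeline = {
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
    new StanfordCoreNLP(props)
  }

  def detectSentiment(message: String): SENTIMENT_TYPE = {
    if (message == null || message.trim.isEmpty) return NOT_UNDERSTOOD

    val annotation = pipeline.process(message)
    val sentences = annotation.get(classOf[CoreAnnotations.SentencesAnnotation]).asScala

    // CoreNLP scores each sentence from 0 (very negative) to 4 (very positive).
    // Weight each score by sentence length so longer sentences dominate the result.
    // Note: in CoreNLP 3.6+ this annotation class was renamed to SentimentAnnotatedTree.
    var weightedSum = 0.0
    var totalLength = 0

    for (sentence <- sentences) {
      val tree = sentence.get(classOf[SentimentCoreAnnotations.AnnotatedTree])
      val score = RNNCoreAnnotations.getPredictedClass(tree)
      val length = sentence.toString.length

      weightedSum += score * length
      totalLength += length
    }

    if (totalLength == 0) NOT_UNDERSTOOD
    else math.round(weightedSum / totalLength).toInt match {
      case 0 => VERY_NEGATIVE
      case 1 => NEGATIVE
      case 2 => NEUTRAL
      case 3 => POSITIVE
      case _ => VERY_POSITIVE
    }
  }
}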
