Real-Time Topic Modeling of Microblogs

by Yogesh Tewari and Rajesh Kawad

This article explores the challenge of real-time extraction of topics from a continuous stream of incoming microblogs or tweets. Learn how to apply LDA to tweets for topic modeling.

Published March 2013


This article explores the challenge of extracting topics, in real time, from a continuous stream of incoming microblogs or tweets consumed by an application we created. Given simple tweet text, our application should suggest a relevant topic being discussed in that tweet and provide, in real time, a timeline of topics generated from the tweet stream. This is a complex task for three reasons: first, a tweet, considered as a text corpus, contains at most 140 characters; second, given their brevity, tweets may not provide much useful information and may contain noisy text such as "smileys" and shortened URLs; and third, tweets are generated in real time.

We apply LDA (latent Dirichlet allocation) to topic-model tweets and use the Machine Learning for Language Toolkit (MALLET) API as the implementation of LDA in a Java environment. Topic models, algorithms that uncover the hidden thematic structure of document collections, train themselves according to the posterior probability distribution of various modeling parameters. This article doesn't discuss the parameters internal to LDA and the API; it focuses instead on those we provide as input to our application.

The LDA implementation is encapsulated within the MALLET API, which functions as a command line–based Java tool. Our targets are the actual Java classes that perform the LDA logic, whose methods we invoke with the required input in real time. For stream processing we chose Storm, a free and open source distributed real-time computation engine implemented in Java and run in a distributed mode. Storm is highly scalable and easily capable of handling incoming tweet streams. We use Twitter4J to stream tweets, which requires valid Twitter authentication. So our task is to design a topology that consumes tweet streams and outputs a timeline of topics.

Rationale

Topic modeling, as a machine learning discipline, provides a simple use case for labeling and classifying a collection of texts and documents. LDA, the most commonly used topic model, is implemented by various applications, such as Apache Mahout, MALLET, LDA-c, and GENSIM. These implementations span various programming platforms such as C, C++, Python, and Java.

Various applications perform topic modeling on microblogs, which are simply text. Although these applications offer different approaches to topic modeling, they all rely on batch processing, whereas microblogs are texts that stream in real time. Various applications and papers discuss topic modeling on tweets, but their approach is to collect a batch of tweets and then apply a topic model to that group. So, in a simple instance of this scenario, if we need to know the topics currently being discussed in our company's Twitter account, we must wait for a reasonable number of tweets to accumulate and then apply our topic model to that group.

How It's Done

The MALLET API feeds on vectors of texts/documents for topic extraction. Our first job is to convert the incoming tweet streams to vectors, write them to files, and provide the directory path to the MALLET method for topic extraction. We create a sample application in our Twitter account at http://dev.twitter.com/ and generate the "OAuth settings" and "access token" that are necessary for Twitter authorization. (See the Twitter developer documentation for more information; a Twitter4J configuration sketch appears after the list below.) The actual workflow steps are as follows:

  1. Filter tweets from the Twitter stream.
  2. Save these tweets to a file. (In our case, one tweet per file per directory, because we are applying topic modeling to every tweet in real time.)
  3. Apply text-to-vector conversion to each tweet file, and write to the vector file.
  4. Apply vector-to-topic conversion to each vector file.

These four simple steps constitute the main functionality of the Storm topology that performs this whole process in real time. A simple Storm topology is shown in Figure 1. It consists of spouts (the sources of streams) and bolts (the processing engines/consumers of streams). Our topology contains one spout to handle incoming tweet streams; one bolt to filter and persist tweets to files; and one bolt to perform topic modeling, including text-to-vector conversion of those files.
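Before step 1 can run, the spout needs the OAuth credentials mentioned above. The following is a minimal sketch of how those credentials might be wired into Twitter4J; the TwitterAuthSketch class name and the placeholder credential strings are ours, not part of the original application.

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

public class TwitterAuthSketch {
    public static void main(String[] args) {
        // Hypothetical placeholder credentials; substitute the values
        // generated for your application at http://dev.twitter.com/.
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setOAuthConsumerKey("YOUR_CONSUMER_KEY")
          .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
          .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
          .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET");

        // An authenticated stream instance like this is what the spout uses.
        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
    }
}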


Figure 1. Real-time topic modeling Storm topology

What Is LDA?

As stated in the paper "Latent Dirichlet Allocation" by David M. Blei, Andrew Y. Ng, and Michael I. Jordan, "LDA is a generative probabilistic model for collections of discrete data such as text corpora."

Corpora is the plural of corpus, which essentially means a large and structured set of texts. This article is a text corpus. LDA learns topic distributions through Bayesian inference, using an approximate inference technique based on variational methods and an expectation-maximization (EM) algorithm for empirical Bayes parameter estimation. As mentioned earlier, we can modulate our results with various modeling parameters. Some of these, which we provide to our application as input, are listed below (a command-line sketch follows the list):

  • Stop words: Provided as a stop word file, these words are to be ignored in topic modeling. In our tweets application, we use smileys, URLs, and common Twitter jargon such as LOL as stop words.
  • Number of iterations: This value defines the number of iterations of Gibbs sampling (used in Bayesian inference). Its ideal value should range between 200 and 1,000, depending on the number of topics required from the given input text.
  • Number of topics: This value states how many topics are required from the input text.
  • Document topic threshold: This value states the threshold proportion of a topic relative to the text. A topic is valid if its occurrence in the text is above the given threshold value. Any topics below the threshold are discarded.
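Because MALLET also runs as a command-line tool, these parameters map directly onto its CLI options. The following is a hedged sketch of the equivalent two-step invocation (the directory paths are placeholders); our application invokes the underlying Java classes instead, as shown later in Listing 5.

$ bin/mallet import-dir --input <tweet directory> --output topic-input.mallet \
      --keep-sequence --remove-stopwords
$ bin/mallet train-topics --input topic-input.mallet --num-topics 1 \
      --num-iterations 200 --num-top-words 3 --doc-topics-threshold 0.26 \
      --output-topic-keys output_topic_keys --output-doc-topics output_doc_topics.txt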

Using Storm

Storm was developed and is maintained by Nathan Marz. According to Marz, "Storm is a free and open source distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use."

Our Storm cluster includes two nodes, stormnode1 and stormnode2, both running CentOS 5.6. The stormnode1 node runs Nimbus (the master), and both nodes run a Supervisor (the workers). Here are the steps we took:

  1. Set up the Storm cluster. Make sure ZooKeeper is running (required by Storm), and create an entry for ZooKeeper in the Storm configuration file storm.yaml. Both of the nodes should have proper entries. Listing 1 is a sample storm.yaml file.

    java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
    # storm.* configs are general configurations
    # the local dir is where jars are kept
    storm.local.dir: "/home/storm/local"
    storm.zookeeper.servers:
        - "localhost"
    storm.zookeeper.port: 2181
    storm.zookeeper.root: "/home/storm/zookeeper"
    storm.cluster.mode: "distributed" # can be distributed or local
    storm.local.mode.zmq: false
    
    # nimbus.* configs are for the master
    nimbus.host: "localhost"
    nimbus.thrift.port: 6627
    
    transactional.zookeeper.root: "/home/storm/transactional/zookeeper"
    transactional.zookeeper.servers: null
    transactional.zookeeper.port: null
    
    # supervisor.* configs are for node supervisors
    # Define the numbers of workers that can be run on this machine.
    # Each worker is assigned a port to use for communication
    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703
    

    Listing 1. Sample storm.yaml file

    We change the nimbus.host entry to stormnode1 in the storm.yaml file of stormnode2. Now the following commands go into separate terminals on stormnode1. (We have not tried to run these commands in the background, so we have one terminal per command. Please explore this if you wish to.)

    $ /home/storm/bin/storm nimbus
    $ /home/storm/bin/storm supervisor
    $ /home/storm/bin/storm ui
    

    For stormnode2, only Supervisor needs to run:

    $ /home/storm/bin/storm supervisor
    

    Once the Storm UI is up, we can open http://localhost:8080 (the default; the port can be changed) on stormnode1 to get Storm statistics, as shown in Figure 2.


    Figure 2. Storm UI dashboard

  2. Submit the topology in the form of a JAR file from a new terminal. Include the main class and topology name arguments along with the topology JAR file.

    $ /home/storm/bin/storm jar /home/storm/data/jars/ldatopo.jar \
        com.infy.ilabs.LDATopology LDATopology
    


    Figure 3. Storm UI, topology submitted


    Figure 4. Storm UI spouts and bolts

  3. Output is usually directed to a file that can later be referenced for topic timelines.

Topology

A Storm topology usually consists of spouts and bolts. The spouts are the sources, and the bolts are the consumers. Here is a typical Storm topology, as illustrated by Nathan Marz:


Figure 5. Typical Storm topology

Our topology, with its one spout and two bolts, is simpler. The spout uses the Twitter4J API to stream tweets in real time and directs them to the preprocessing bolt, which filters the tweets and persists them as discussed previously. This preprocessing bolt then streams the path of persisted tweets to the LDA-performing bolt. This bolt invokes MALLET API methods, with the required parameters, which perform topic modeling on a per-tweet basis. The standard Storm code that goes into the topology, spouts, and bolts is as follows.

/*
This is simple topology-building code. Please take care of imports,
exceptions, and other required code components in your actual class.
*/

// Create a new topology and set its spout and bolts.
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("TweetsSpout", new TweetsSpout(), 4);
topologyBuilder.setBolt("TweetsBolt", new TweetsInFileBolt(), 2)
    .shuffleGrouping("TweetsSpout");
topologyBuilder.setBolt("LDABolt", new DoLDABolt(), 1)
    .shuffleGrouping("TweetsBolt");

// Submit the topology to Storm with args[0] as the topology name.
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, topologyBuilder.createTopology());

Listing 2. Storm topology class
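For quick iteration before submitting to the cluster, Storm also offers an in-process local mode. The following minimal sketch (not part of our POC) would run the same topology locally using Storm's standard LocalCluster test harness:

// Local-mode sketch: run the topology in-process instead of on the cluster.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LDATopology", conf, topologyBuilder.createTopology());
// ... let the topology run, then shut the local cluster down.
cluster.shutdown();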

/*
This is simple spout-building code using a Twitter4J UserStreamListener
implementation. Please take care of imports, exceptions, and other required
code components in your actual class.
*/
class TweetsSpout extends BaseRichSpout {
  ..
  ..
  SpoutOutputCollector _collector;
  LinkedBlockingQueue<Status> queue = null;
  TwitterStream _twitterStream;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    queue = new LinkedBlockingQueue<Status>(1000);
    _collector = collector;
    ..
    ..
    UserStreamListener listener = new UserStreamListener() {
      // UserStreamListener is an interface.
      // Instantiate its implementation accordingly.
      // Refer to the Twitter4J examples.
      ..
      ..
      @Override
      public void onStatus(Status status) {
        System.out.println("onStatus @" + status.getUser().getScreenName() +
            " - " + status.getText());
        queue.offer(status); // enqueue the tweet for nextTuple() to emit
      }
      ..
      ..
    };
    ..
    ..
    _twitterStream.addListener(listener);
    _twitterStream.user();
    ..
  }

  @Override
  public void nextTuple() {
    ..
    ..
    Status status = queue.poll();
    if (status == null) {
      Utils.sleep(50); // no tweets present; back off briefly
    } else {
      _collector.emit(new Values(status.getText()));
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("tweet"));
  }
}

Listing 3. Storm spout class

/*
This is simple bolt-building code that reads input from the spout, persists
filtered tweets to files, and emits the file path. Please take care of
imports, exceptions, and other required code components in your actual class.
*/
class TweetsInFileBolt extends BaseBasicBolt {
  ..
  ..
  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    ..
    ..
    String tweet = input.getString(0);
    ..
    // Read the tweet as a string and process it according to your needs.
    // Persist this tweet to a file and emit the file path as
    // input to the next bolt.
    ..
    // Use File.createTempFile() to create a unique directory and
    // file per tweet or per collection of tweets.
    ..
    String filePath = tweetFile.getPath();
    collector.emit(new Values(filePath));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("tweetFilePath"));
  }
}

Listing 4. Preprocessor bolt class—TweetsInFileBolt.java
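The persistence step elided in Listing 4 might look like the following sketch, assuming one temporary directory and one file per tweet (Java SE 7's java.nio.file API; the writeTweetToFile helper name is ours, not part of the original code):

import java.io.File;
import java.io.PrintWriter;
import java.nio.file.Files;

// Hedged sketch of the per-tweet persistence described in the comments above.
static File writeTweetToFile(String tweet) throws Exception {
    File tweetDir = Files.createTempDirectory("tweet").toFile(); // unique directory per tweet
    File tweetFile = File.createTempFile("tweet", ".txt", tweetDir); // unique file within it
    try (PrintWriter out = new PrintWriter(tweetFile, "UTF-8")) {
        out.println(tweet); // one tweet per file per directory
    }
    return tweetFile;
}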

/*
This is simple bolt-building code that reads input from the previous bolt,
performs LDA using the MALLET API, and persists the output. Please take care
of imports, exceptions, and other required code components in your actual class.
*/
class DoLDABolt extends BaseBasicBolt {
  ..
  ..
  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    ..
    ..
    Text2Vectors t2v = new Text2Vectors();
    ..
    // Form a string array of arguments and invoke main() of Text2Vectors.
    String[] t2vArgs = {"--remove-stopwords", "true",
        "--preserve-case", "false",
        "--input", <input directory path>,
        "--output", <output directory path> + "/topic-input.mallet",
        "--keep-sequence"
        };
    t2v.main(t2vArgs);
    ..
    Vectors2Topics v2t = new Vectors2Topics();
    ..
    // Form a string array of arguments and invoke main() of Vectors2Topics.
    String[] v2tArgs = {"--num-iterations", "200",
        "--num-top-words", "3",
        "--doc-topics-threshold", "0.26",
        "--input", <output directory path> + "/topic-input.mallet",
        "--num-topics", "1",
        "--output-state", <output directory path> + "/output_state.gz",
        "--output-topic-keys", <output directory path> + "/output_topic_keys",
        "--output-doc-topics", <output directory path> + "/output_doc_topics.txt"
        };
    v2t.main(v2tArgs);
    ..
    // The output directory path will contain files related to the topics
    // generated. Use them as you wish.
  }
}

Listing 5. LDA bolt class—DoLDABolt.java
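To build the topic timeline mentioned in step 3 of the Storm workflow, the output files above can be read back. The following is a hedged sketch (the readTopicKeys helper is ours); each line of MALLET's output_topic_keys file holds a topic ID, its Dirichlet parameter, and the top topic words:

import java.io.BufferedReader;
import java.io.FileReader;

// Hedged sketch: print each topic and its top words from output_topic_keys.
static void readTopicKeys(String path) throws Exception {
    BufferedReader reader = new BufferedReader(new FileReader(path));
    String line;
    while ((line = reader.readLine()) != null) {
        // Line format: <topic id> <dirichlet parameter> <top words ...>
        String[] parts = line.split("\\s+", 3);
        if (parts.length == 3) {
            System.out.println("Topic " + parts[0] + ": " + parts[2]);
        }
    }
    reader.close();
}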

Results

We executed our proof of concept (POC) on some sample tweets about Infosys' purchase of Lodestone. Table 1 shows two of the sample inputs and outputs.

Table 1. Sample inputs and outputs

Sample 1
Input tweet: RT @rwang0: News Analysis: Infosys Buys Lodestone for $350M #techoptimization #BPO #SAP
Filtered tweet: RT @rwang0: News Analysis: Infosys Buys Lodestone for $350M [hashtags removed]
Result (two topics per tweet, two topic words each):
              Topic Word 1   Topic Word 2
    Topic 1   lodestone      Infosys
    Topic 2   buys           News

Sample 2
Input tweet: RT @ForbesTech News Analysis: Infosys Buys Lodestone for $350M: Global outsourcing and Bangalore In... #Philippines
Filtered tweet: RT @ForbesTech News Analysis: Infosys Buys Lodestone for $350M: Global outsourcing and Bangalore In... [hashtags removed]
Result (two topics per tweet, two topic words each):
              Topic Word 1   Topic Word 2
    Topic 1   bangalore      outsourcing
    Topic 2   lodestone      infosys

As is evident from the table, even though the inputs were short tweets, our POC was able to produce topics related to Infosys, Lodestone, and a buy/acquisition. This experiment was performed with two topics per tweet, a document topic threshold in the range 0.20 < T < 0.50, and two topic words printed per topic.

With a real-time feed of the tweet stream, we obtained a timeline of topic words produced as shown above.

Conclusion

This POC focused mainly on using Twitter as the source of microblog streams. We can enhance this use case by encapsulating other sources such as Facebook, identi.ca, and company-internal blogs and bulletin boards. With initial topic timelines at the source, we can later perform statistical analysis of various real-world and social events, such as the 2012 Olympics, the Brangelina marriage, and disasters such as the Fukushima nuclear disaster or Hurricane Sandy. This statistical analysis can later be put to use for identifying trending topics on the timeline, shifts in trends, events over time, and correlations of various topics with real-world events.

From this POC, we concluded that we can apply the LDA topic model, known primarily for working well with large text corpora, to a small collection of texts such as tweets and extract topics in real time. We can further extend our use case to implement statistical analyses of these topics in real time.

About the Authors

Yogesh Tewari is a technology analyst with Infosys Limited who has four and a half years of development experience in Java/J2EE technologies. He is currently involved in big-data research and development at Infosys Labs in Bangalore, India. He has a bachelor of technology degree in electronics and communication engineering. He is a Linux and big-data enthusiast with an interest in Hadoop- and Storm-related technologies.

Rajesh Kawad is a systems engineer with Infosys Limited who has two and a half years of development experience in Java/J2EE technologies. He is currently involved in big-data research and development at Infosys Labs in Bangalore, India. He holds a master of computer applications degree. He has worked on emerging technologies such as Hadoop, Hive, Oozie, and Storm.
