How To Get Streaming Data From Twitter

I occasionally receive requests to share my Twitter analysis tools. After a few recent requests, it finally occurred to me that it would make sense to create a series of articles that describe how to use Python and the Twitter API to perform basic analytical tasks. Teach a man to fish, and all that.

In this blog post, I’ll describe how to obtain streaming data using Python and the Twitter API.

I’m using twarc instead of tweepy to gather data from Twitter streams. I recently switched to twarc because it has a simpler interface than tweepy, and it handles most network errors and Twitter errors automatically.

In this article, I’ll provide two examples. The first one covers the simplest way to get streaming data from Twitter. Let’s start by importing our dependencies.

from twarc import Twarc
import sys

Next, create a twarc session. For this, you’ll need to create a Twitter application in order to obtain the relevant authentication keys and fill in those empty strings. You can find many guides on the Internet for this. Here’s one.

if __name__ == '__main__':
  consumer_key=""
  consumer_secret=""
  access_token=""
  access_token_secret=""

  twarc = Twarc(consumer_key, consumer_secret, access_token, access_token_secret)

For the sake of brevity, let’s assume search terms will be passed as a list on the command line. We’ll simply accept that list without checking its validity. Your own implementation should probably do more.

  target_list = []
  if (len(sys.argv) > 1):
    target_list = sys.argv[1:]
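As a rough idea of what "doing more" could look like, here is a minimal sketch of a helper that tidies the command-line terms before they are used. The name clean_terms is hypothetical and not part of twarc; it only strips whitespace, drops empty entries, splits arguments that already contain commas, and removes case-insensitive duplicates. It uses nothing version-specific, so it should run under Python 2 or 3.

```python
def clean_terms(args):
    """Return non-empty, de-duplicated search terms in their original order.

    Hypothetical helper, not part of twarc.
    """
    seen = set()
    terms = []
    for arg in args:
        # The terms are later joined with commas, so a comma inside a single
        # argument would act as a separator anyway. Split on it explicitly.
        for term in arg.split(","):
            term = term.strip()
            key = term.lower()
            if term and key not in seen:
                seen.add(key)
                terms.append(term)
    return terms
```

In the script above, this could be used as target_list = clean_terms(sys.argv[1:]).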

Finally, we’ll check if we have any search targets. If we do, we’ll create a search query. If not, we’ll attach to the sample stream.

  if len(target_list) > 0:
    query = ",".join(target_list)
    print "Search: " + query
    for tweet in twarc.filter(track = query):
      print_tweet(tweet)
  else:
    print "Getting 1% sample."
    for tweet in twarc.sample():
      print_tweet(tweet)

Here’s a function to print the “text” field of each tweet we receive from the stream.

def print_tweet(status):
  if "text" in status:
    print status["text"]
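The "text" check matters because a stream can deliver messages that are not Tweets (delete notices, for example). The sketch below is one hedged way to pull the text out of a message. It assumes the v1.1 streaming payload layout, in which longer Tweets carry their complete text under an "extended_tweet" object with a "full_text" field; verify that against the payloads you actually receive. The name tweet_text is hypothetical.

```python
def tweet_text(status):
    """Return the text of a stream message, or None if it has none.

    Assumes the v1.1 streaming layout: long Tweets keep their complete
    text in status["extended_tweet"]["full_text"].
    """
    if not isinstance(status, dict):
        return None
    extended = status.get("extended_tweet")
    if isinstance(extended, dict) and "full_text" in extended:
        return extended["full_text"]
    # Falls back to the plain "text" field; None for non-Tweet messages
    return status.get("text")
```

print_tweet() could then call tweet_text(status) and print the result only when it is not None.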

And that’s it. In just over 20 lines of code, you can attach to a Twitter stream, receive Tweets, and process (or in this case, print) them.

In my second example, the main thread pushes incoming Tweet objects onto a queue, while a second thread pulls those objects off the queue and processes them. Separating gathering from processing keeps a slow processing step from blocking the code that reads the stream. Simply printing a Tweet’s text is unlikely to block under normal circumstances, but blocking becomes more likely as your processing code grows more complex. By offloading processing to a separate thread, your script should be able to handle things such as heavy Tweet volume spikes, writing to disk, communicating over the network, using machine learning models, and working with large frequency distribution maps.

As before, we’ll start by importing dependencies. We’re including threading (for multithreading), Queue (to manage a queue), and time (for time.sleep).

from twarc import Twarc
import Queue
import threading
import sys
import time

The following two functions will run in our processing thread. One will process a Tweet object. In this case, we’ll do exactly the same as in our previous example, and simply print the Tweet’s text out.

# Processing thread
def process_tweet(status):
  if "text" in status:
    print status["text"]

The other function that will run in the context of the processing thread is a function to get items that were pushed into the queue. Here’s what it looks like.

def tweet_processing_thread():
  while True:
    item = tweet_queue.get()
    process_tweet(item)
    tweet_queue.task_done()
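The loop above never exits on its own, which is fine for a daemon thread. If you would rather let the processing thread finish its backlog and stop cleanly, one common pattern is to push a sentinel object onto the queue. The following is only a sketch of that pattern, not part of the original script; STOP, worker and run_demo are hypothetical names, and the import shim is there so it runs on both Python 2 and 3.

```python
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

STOP = object()  # hypothetical sentinel meaning "no more items"

def worker(q, handle):
    """Pull items off q and pass them to handle() until STOP arrives."""
    while True:
        item = q.get()
        try:
            if item is STOP:
                return
            handle(item)
        finally:
            # Mark every item (including STOP) as done so q.join() can return
            q.task_done()

def run_demo(items):
    """Feed items through a worker thread and return them in processed order."""
    q = queue.Queue()
    processed = []
    thread = threading.Thread(target=worker, args=(q, processed.append))
    thread.start()
    for item in items:
        q.put(item)
    q.put(STOP)
    thread.join()
    return processed
```

Because the queue is first-in, first-out and there is a single worker, items are processed in the order they were queued.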

There are also two functions in our main thread. This one implements the same logic for attaching to a Twitter stream as in our first example. However, instead of calling process_tweet() directly, it pushes tweets onto the queue.

# Main thread
def get_tweet_stream(target_list, twarc):
  if len(target_list) > 0:
    query = ",".join(target_list)
    print "Search: " + query
    for tweet in twarc.filter(track = query):
      tweet_queue.put(tweet)
  else:
    print "Getting 1% sample."
    for tweet in twarc.sample():
      tweet_queue.put(tweet)

Now for our main function. We’ll start by creating a twarc object, and getting command-line args (as before):

if __name__ == '__main__':
  consumer_key=""
  consumer_secret=""
  access_token=""
  access_token_secret=""

  twarc = Twarc(consumer_key, consumer_secret, access_token, access_token_secret)

  target_list = []
  if (len(sys.argv) > 1):
    target_list = sys.argv[1:]

Next, let’s create the queue and start our processing thread.

  tweet_queue = Queue.Queue()
  thread = threading.Thread(target=tweet_processing_thread)
  thread.daemon = True
  thread.start()

Since listening to a Twitter stream is essentially an endless loop, let’s add the ability to catch ctrl-c and clean up if needed.

  while True:
    try:
      get_tweet_stream(target_list, twarc)
    except KeyboardInterrupt:
      print "Keyboard interrupt..."
      # Handle cleanup (save data, etc)
      sys.exit(0)
    except Exception:
      # Any other error: wait briefly, then reconnect to the stream
      print("Error. Restarting...")
      time.sleep(5)
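The loop above waits a fixed five seconds before every restart. If restarts start happening repeatedly, a delay that grows with each consecutive failure is gentler on the remote service. Here is a minimal sketch of such a delay calculation. The function name backoff_delay and the base, cap and jitter values are assumptions chosen for illustration, not values taken from twarc or from Twitter’s documentation.

```python
import random

def backoff_delay(attempt, base=5.0, cap=320.0, jitter=0.0):
    """Return seconds to wait before restart number `attempt` (0-based).

    The delay doubles with each consecutive failure, up to `cap`.
    `jitter` adds up to that many random seconds so that several
    clients do not all reconnect at the same moment.
    """
    delay = min(cap, base * (2 ** attempt))
    return delay + random.uniform(0, jitter)
```

In the restart loop, you would keep a counter of consecutive failures, call time.sleep(backoff_delay(failures)) in the error handler, and reset the counter once the stream has been running normally again.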

If you want to observe a queue buildup, add a sleep into the process_tweet() function, and attach to a stream with high enough volume (such as passing “trump” as a command-line parameter). Have fun listening to Twitter streams!
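To see the same buildup without connecting to Twitter at all, here is a small self-contained simulation. It is a sketch under stated assumptions: simulate is a hypothetical name, the integers stand in for Tweets, and time.sleep stands in for slow processing. Note that qsize() is only an approximate snapshot while another thread is consuming.

```python
import threading
import time
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def simulate(n_items, process_delay):
    """Queue n_items quickly while a slow consumer drains them.

    Returns (backlog right after producing, size after everything is done).
    """
    q = queue.Queue()

    def consumer():
        while True:
            q.get()
            time.sleep(process_delay)  # stand-in for slow processing
            q.task_done()

    thread = threading.Thread(target=consumer)
    thread.daemon = True
    thread.start()

    for i in range(n_items):
        q.put(i)  # stand-in for an incoming Tweet

    backlog = q.qsize()  # approximate: the consumer is running concurrently
    q.join()             # block until every queued item has been processed
    return backlog, q.qsize()
```

Calling simulate(20, 0.01) should report a backlog close to the number of items queued, followed by an empty queue once processing catches up.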


