Remember me

Register  |   Lost password?


 

StatAlgo Logo


Stream Processing with Messaging Systems

Wed, 28 May 2014 20:47:40 GMT

Stream processing involves taking a stream of messages and running a computation on each message. How can we manage high throughput given a large number of messages and potentially complicated computations? Messaging systems (EMS) are middleware which serve to pass messages reliably from one process to another providing a means for asynchronous communication, which means that processes are not blocked from operating to wait while another process is running (data is not synchronized between the producer and consumer).

One standard design pattern that uses messaging queues is called the publish-subscribe pattern (or pub-sub) (related to the observer pattern and the client-server model). This is a division of labour which dictates that there is a publisher which is responsible for producing output, while the subscriber is responsible for consuming and performing some kind of computation. This allows to optimize each component to strictly suit its need.

Why use a Messaging System?

A real-time system requires that the analysis keeps up the pace with data input.

Underlying the publish-subscribe pattern is a fundamental issue with data streams: the data can be produced at a higher rate than it can be consumed. The consumer is performing some kind of analysis, which may in some cases be quite complex, while the producer is often simply passing along a stream of data with minimal processing overhead.

When there are very high data rates, it is necessary both to ensure that data can get from the producer to the consumer, but also to allow for scaling: that means that there may need to be many consumers per producer, and depending on the analytic requirements, it may be that these cannot be co-located on the same physical machine.

Messaging systems also allow for diverse applications and languages to communicate with each other, which provides a further means for creating specific applications for specific tasks, without having to worry as much about integration issues.

Needless to say, using a messaging layer should be done only as needed: it adds an extra level of complexity and introduces additional when processing data.

What is a Data Tuple?

For clarity, we will refer to the data that gets passed around between different parts of the application as tuples, although you will also frequently see these referred to as messages.

What is a tuple? A tuple is an ordered list of elements, generally written as a list with ( , ). As an example, (a, b, 1, 2, c) is a 5-tuple. In the context of stream processes, a tuple can be considered any structured data that needs to get moved from one function to another (where an function in this context is simply a block of executable code), although it is uncommon to move whole objects explicitly. It is typical to write applications that pass tuples within the same process, but this design can introduce bottlenecks in larger systems.

What is a Queue?

A queue is one of the most basic data structures in computer science. This implements the First-In-First-Out (FIFO) method, which means that entries are pushed in one side and popped off the other side.

The queue is a sensible data structure to use for messaging systems since it is lightweight and we typically want to keep the temporal order of events.

Advanced Messaging Queuing Protocol (AMQP)

AMQP mandates the behavior of the messaging provider and client to the extent that implementations from different vendors are truly interoperable, in the same way as SMTP, HTTP, FTP, etc. have created interoperable systems.

The Advanced Message Queuing Protocol (AMQP) is an open standard for message-oriented middleware. Many popular open-source messaging systems now use AMQP as a standard.

AMQP originated in 2003 by John O'Hara at JPMorgan Chase, but by 2005 had expanded to a much larger working group. This now forms the basis for some of the most popular messaging systems.

Systems

Messaging systems have been around for a long time, although the open-source implementations started mostly within the last 10-15 years. IBM and Tibco created some of the major popular commercial products historically.

Open source implementations have gained significant traction over the last decade (a nice comparison can be found on stackoverflow). I have personally used several over time, but mostly use RabbitMQ to start because of ease of implementation and features, unless performance is a significant factor (in which case I have defaulted to ZeroMQ):

  • ActiveMQ is generally considered to be one of the most flexible and popular message queues, is written in Java, and can be deployed using different topologies including P2P, and supports AMQP.
  • Kafka was developed at LinkedIn, open sourced in 2011, and is written in Scala (nice discussion comparing Kafka to Rabbit on quora).
  • RabbitMQ implements AMQP, uses a broker model (single point), and is written in Erlang.
  • ZeroMq was developed by Pieter Hintjens (starting around 2007), is often considered to be one of the fastest message queues, offers flexible topologies, is written in C++, and does not support AMQP.

Also interesting is NSQ, which was created by bit.ly.

There are many different language interfaces to these queues.

Messaging Example: The Good vs. The Bad of Obama on Twitter

Let us design a simple toy example to demonstrate the usage of a message queue. Suppose we want to connect to streaming Twitter data and simply count all the tweets that contain the word "Obama" and that use the word "good" and/or "bad". We will use Python with RabbitMQ. I base this example heavily from this tweepy streaming example and the RabbitMQ python tutorial.


Install RabbitMQ following the directions: http://www.rabbitmq.com/download.html. Start running the RabbitMQ server by executing: rabbitmq-server. Now you will need to get a consumer key and access token to use the Twitter API by following the directions on https://apps.twitter.com/.

Now we're ready to start pushing data onto the queue (note that you need to provide your twitter keys below):

 import pika from tweepy.streaming import StreamListener from tweepy import OAuthHandler from tweepy import Stream # Go to http://dev.twitter.com and create an app. consumer_key="" consumer_secret="" access_token="" access_token_secret="" # Create a connection to ActiveMQ on the localhost connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # Create a queue called ObamaTweets channel.queue_declare(queue='ObamaTweets') # Create a listener class that will get called from the tweet stream # This will post tweets directly onto the queue class RabbitListener(StreamListener):   """ A listener handles tweets are the received from the stream.   """   def on_data(self, data):     print data     channel.basic_publish(exchange='', routing_key='ObamaTweets', body=data)     return True      def on_error(self, status):     print status # Initialize the tweet stream and start running everything: l = RabbitListener() auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) stream = Stream(auth, l) stream.filter(track=['Obama']) connection.close() 

Then, in a separate python session, start to consume tweets off the queue:

 import pika import json # Connect to the ObamaTweets queue connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='ObamaTweets') # Track some count variables channel.count_all = 0 channel.count_good = 0 channel.count_bad = 0 print ' [*] Waiting for messages. To exit press CTRL+C' # Define function to count tweets by type def callback(ch, method, properties, body):   tweet_text = json.loads(body).get('text')   ch.count_all += 1   if "good" in tweet_text:     ch.count_good += 1   if "bad" in tweet_text:     ch.count_bad += 1   print " [x] Received %s: Counts - Good = %s Bad = %s All = %s" % (tweet_text, ch.count_good, ch.count_bad, ch.count_all) # Run consumer channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming() connection.close() 

You should see all Obama tweets now being produced in one session and consumed in another. This should create output that looks roughly like:

  [x] Received RT @benshapiro: It's always fun when Obama uses troops for a photo op in which he poses as a centrist between two positions that don't exis…: Counts - Good = 3 Bad = 1 All = 580  [x] Received Heroic Obama hides/ducks behind a child to avoid being hit by catapult via Pamela Geller, Atlas ... http://t.co/YEu9zAa8vt: Counts - Good = 3 Bad = 1 All = 581  [x] Received @nafisa2020 @seunfakze for not doing his job? Obama that is the world strongest president is being abused how many has he rented shame u: Counts - Good = 3 Bad = 1 All = 582  [x] Received RT @zerohedge: President Obama Defends His Foreign Policy In West Point Speech - Live Webcast http://t.co/QbSh7SNId8: Counts - Good = 3 Bad = 1 All = 583  [x] Received RT @ConNewsNow: Barack Obama and His Urban Parasites Declare War on the Constitution http://t.co/cpPq3QXSh2 #tcot #UniteBlue #p2: Counts - Good = 3 Bad = 1 All = 584 

This provides a very simple example of pushing data onto a queue and consuming it off in another process, but it doesn't give any indication for how we might go about creating more sophisticated distributed applications. We extend this in later posts are review issues related to throughput, reliability, latency, and general topology design.

In the next post, we will cover complex event processing and look at Esper as an example. We will then review general stream processing systems including Storm and Druid to see how they solve some of these issues and how they can be used for large scale stream mining applications.

, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ,