February 21, 2011

Last Friday was an InDay (a day at LinkedIn where you can take a break from your day-to-day work and build something cool). As part of the SNA team, we have been building some really cool distributed systems, from storage, to messaging, to search. So we thought it'd be cool on this InDay to put it all together and see what we could come up with.

The challenge we gave ourselves was to see how quickly we could build a scalable tweet search system. Some requirements:

  • The entire stack must be scalable, e.g. by adding commodity hardware it should, in theory, be able to handle all tweets at real internet traffic volumes.
  • Each part of the system should be able to scale independently, e.g. there is no reason the messaging system needs to scale the same way as the storage system.
  • Fault tolerant – The system should be able to tolerate system failures as well as data corruption.
  • Only 1 day to build it.

We scribbled together the following architecture:

Every component here is an open-source project we have deployed into our production environment with its own scaling characteristics.

Some design considerations and implementation details:

TweetStreamer is a Twitter streamer written in Scala that uses the Twitter API. We chose it because it is lightweight, simple and, frankly, does exactly what we wanted. It can be easily configured to handle different types of Twitter feeds.
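
For flavor, here is a rough sketch of what the streaming loop looks like. The real TweetStreamer is in the linked repo; the endpoint and basic-auth handling below reflect the 2011-era Twitter streaming API and are illustrative only.

```scala
import java.net.URL
import java.util.Base64
import scala.io.Source

// Illustrative sketch only: the real TweetStreamer lives in the linked repo.
// The endpoint and basic-auth usage reflect the 2011-era Twitter streaming API;
// today's API requires OAuth, so treat this as pseudocode for the data flow.
object TweetStreamerSketch {
  def stream(user: String, pass: String)(handle: String => Unit): Unit = {
    val conn = new URL("https://stream.twitter.com/1/statuses/sample.json").openConnection()
    val credentials = Base64.getEncoder.encodeToString(s"$user:$pass".getBytes("UTF-8"))
    conn.setRequestProperty("Authorization", s"Basic $credentials")

    // Each line of the streaming response is one tweet status as a JSON string.
    Source.fromInputStream(conn.getInputStream, "UTF-8")
      .getLines()
      .filter(_.nonEmpty)
      .foreach(handle)
  }
}
```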

Now that we have a data feed, the obvious thing to do is store it. Voldemort seemed like the obvious choice: key = tweet id, value = tweet status (JSON string).
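
A minimal sketch of the write path, assuming a Voldemort store named "tweets" and a local bootstrap URL (both illustrative; the real wiring is in the repo):

```scala
import voldemort.client.{ClientConfig, SocketStoreClientFactory, StoreClient}

// Sketch of the Voldemort write path. Store name and bootstrap URL are
// illustrative, not the production configuration.
object TweetStore {
  private val factory = new SocketStoreClientFactory(
    new ClientConfig().setBootstrapUrls("tcp://localhost:6666"))

  val client: StoreClient[String, String] = factory.getStoreClient("tweets")

  // Key = tweet id, value = the raw tweet status JSON string.
  def save(tweetId: String, statusJson: String): Unit =
    client.put(tweetId, statusJson)
}
```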

We also send the feed to a Kafka topic to be consumed by downstream services, e.g. Sensei (our search system). You can find the code here.
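
Conceptually, the publish step looks like the sketch below. It uses today's Kafka producer client for readability; the original code (linked above) used the 2011-era Kafka producer API, and the topic name and broker address here are illustrative.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch of publishing the raw tweet JSON to a Kafka topic for downstream
// consumers such as the Sensei indexing nodes. Topic and broker are illustrative.
object TweetPublisher {
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)

  def publish(tweetId: String, statusJson: String): Unit =
    producer.send(new ProducerRecord[String, String]("tweets", tweetId, statusJson))
}
```

In this sketch the streamer would do the Voldemort write before the Kafka publish; that ordering is the design decision discussed further down.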

The Sensei/search integration is more work (code here). We needed to set up the following pieces:

  • How we would parse the query.
  • How to consume indexing events, i.e. extract from a tweet JSON string the pieces of data we want to index. The search node consumes from Kafka and builds an indexable object from each JSON string. In this case, we are only interested in 3 pieces of data: the unique tweet id, the actual tweet text and the time (so we can sort on it): see (here). A sketch of this step follows the list.
  • Index data retention policy as we accumulate more and more tweets. For this case, we configured index retention to be 7 days, with the index rolling forward nightly. Here is a wiki on how this works under the hood.
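
As an illustration of the extraction step mentioned above, here is a sketch of turning a tweet JSON string into the three fields we index. The real version is a Sensei/Zoie indexing interpreter (see the linked code); the class and method names below are ours, not Sensei's.

```scala
import org.json.JSONObject
import java.text.SimpleDateFormat
import java.util.Locale

// Illustrative sketch of the per-tweet extraction step. Field names follow
// the Twitter status JSON of the time.
case class IndexableTweet(id: Long, text: String, timeMillis: Long)

object TweetInterpreter {
  // Twitter's "created_at" format, e.g. "Mon Feb 21 21:46:02 +0000 2011"
  private val createdAtFormat =
    new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy", Locale.ENGLISH)

  def fromJson(statusJson: String): IndexableTweet = {
    val json = new JSONObject(statusJson)
    IndexableTweet(
      id = json.getLong("id"),                // unique tweet id
      text = json.getString("text"),          // the tweet body to index
      timeMillis = createdAtFormat.parse(json.getString("created_at")).getTime) // sortable time
  }
}
```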

The result set contains an array of search hits, each hit containing the tweet id, relevance score and the time. For each tweet id, we then fetch the original tweet status object from Voldemort and build the result JSON object. This is done in a Scalatra servlet: see code here.
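
The servlet logic boils down to something like the sketch below. `searchTweets` and `tweetStore` are hypothetical stand-ins for the Sensei query call and the Voldemort client; the actual implementation is in the linked Scalatra code.

```scala
import org.scalatra.ScalatraServlet
import org.json.{JSONArray, JSONObject}

// Rough shape of the result-building servlet. `searchTweets` returns
// (tweetId, score, time) per hit and `tweetStore` looks up the raw tweet JSON
// by id; both are hypothetical stand-ins wired in at construction time.
class SearchServlet(searchTweets: String => Seq[(Long, Float, Long)],
                    tweetStore: String => String) extends ScalatraServlet {

  get("/search") {
    val hits = searchTweets(params("q"))
    val results = new JSONArray()
    for ((tweetId, score, time) <- hits) {
      // Fetch the original tweet status object from Voldemort for each hit.
      val status = new JSONObject(tweetStore(tweetId.toString))
      results.put(new JSONObject()
        .put("id", tweetId)
        .put("score", score.toDouble)
        .put("time", time)
        .put("status", status))
    }
    contentType = "application/json"
    new JSONObject().put("results", results).toString
  }
}
```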

This pretty much describes the system. Let’s walk through some of the design decisions that were made:

  • One design choice would have been to let the process that writes to Voldemort also be a Kafka consumer. Although this would be cleaner, we would risk a data race where search could return hits before the corresponding tweets have been added to Voldemort. By making sure a tweet is written to Voldemort first, we can rely on Voldemort being the authoritative store for our tweets.
  • You may have already realized that Kafka is acting as a proxy for the Twitter stream, and that we could have streamed tweets directly into the search system, bypassing the Kafka layer. What we would lose is the ability to play back tweet events from a specific checkpoint. One really nice feature of Kafka is that you can keep a consumption point and have data replayed, which makes reindexing possible in cases such as data corruption, schema changes, etc. (a sketch of the replay idea follows this list). Furthermore, to scale search we would have a growing number of search nodes consuming from the same Kafka stream. The fact that Kafka is written in a way where adding consumers does not affect the throughput of the system really helps in scaling the entire system.
  • Another important design decision was using Voldemort for storage. An alternative would be to store the tweets in the search index instead, e.g. as Lucene stored fields. The benefits of this approach would be stronger consistency between search and store, and the stored data would follow the retention policy defined by the search system. However, aside from the fact that Lucene stored fields are nowhere near as performant as a Voldemort cluster (an implementation issue), there are more convincing reasons:
    • First, the consistency benefit of having search and storage together is negligible. In fact, if we follow our assumption that tweets are append-only and we always write to Voldemort first, we really wouldn’t have consistency issues. Yet having data storage reside on the same search system would introduce disproportionate contention for IO bandwidth and OS cache; as data volume increases, search performance can be negatively impacted.
    • The point about retention is rather valid. Our decision ultimately came down to two points: 1) Voldemort’s growth factor is very different, e.g. adding new records to the system is much cheaper, so it is feasible to have a much longer data retention policy (see the retention setting for Voldemort stores). 2) Having a cluster of tweet storage allows us to integrate with other systems if desired, for analytics, display, etc.
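
To make the replay point concrete, here is a sketch of re-feeding the index from a checkpoint. It uses today's Kafka consumer API for readability (the 2011 system tracked its own consumption points); the topic, partition and offset handling here are illustrative.

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// Sketch of replaying retained tweets from a checkpoint into the index,
// e.g. after data corruption or a schema change. Names are illustrative.
object ReindexFromCheckpoint {
  def replay(fromOffset: Long)(index: String => Unit): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "sensei-reindex")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    val partition = new TopicPartition("tweets", 0)
    consumer.assign(java.util.Arrays.asList(partition))
    consumer.seek(partition, fromOffset)   // rewind to the checkpoint

    while (true) {
      // Re-feed every retained tweet from the checkpoint onward into the index.
      consumer.poll(Duration.ofMillis(500)).asScala.foreach(r => index(r.value()))
    }
  }
}
```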

We tossed what we built up on GitHub (the Chirper project). As you can see, the amount of code is rather minimal, and yes, we were able to build it in one day.

Here is a screenshot:

[chirper screenshot]

As you can see, the UI is rather fancy. Read on for part 2 of 2 of this post, where Alejandro will detail the UI portion of this application.
