We're the Data Team

The data team at LinkedIn works on LinkedIn's information retrieval systems, the social graph system, data driven features, and supporting data infrastructure.

January 15, 2014

Apache DataFu now has a new website with plenty of documentation and HowTo guides. Check it out!

September 5, 2013

Author: William Vaughan

DataFu is an open-source collection of user-defined functions for working with large-scale data in Hadoop and Pig.

About two years ago, we recognized a need for a stable, well-tested library of Pig UDFs that could assist in common data mining and statistics tasks. Over the years, we had developed several routines that were used across LinkedIn and were thrown together into an internal package we affectionately called “littlepiggy.” The unfortunate part, and this is true of many such efforts, is that the UDFs were ill-documented, ill-organized, and easily broken when someone made a change. Along came PigUnit, which made UDF testing practical, so we spent the time to clean up these routines by adding documentation and rigorous unit tests. We figured this “datafoo” package could help the community at large, and there you have the initial release of DataFu.

Since then, the project has continued to evolve. We have accepted contributions from a number of sources, improved the style and quality of testing, and adapted to the changing features and versions of Pig. During this time DataFu has been used extensively at LinkedIn for many of our data-driven products like "People You May Know" and "Skills and Endorsements." The library is used at numerous companies, and it has also been included in Cloudera's Hadoop distribution (CDH) as well as the Apache BigTop project. DataFu has matured, and we are proud to announce the 1.0 release.

Download DataFu 1.0

An extended example

This release of DataFu has a number of new features that can make writing Pig easier, cleaner, and more efficient. In the remainder of this post, we are going to highlight some of these new features. So, let's start with an extended example.

Counting events

Let's consider a hypothetical recommendation system. In this system, a user is recommended an item (an impression). The user can then accept that recommendation, explicitly reject it, or simply ignore it. A common task in such a system is to count how many times users have seen and acted on items. How could we construct a Pig script to implement this task?

Setup

Before we start, it's best to define exactly what we want to do and what our inputs and outputs are. The task is to generate, for each user, a list of all items that user has seen, with counts of how many times each item was seen, accepted, and rejected.

In summary, our desired output schema is:

features: {user_id:int, items:{(item_id:int, impression_count:int, accept_count:int, reject_count:int)}}

For input, we can load a record for each event:
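For instance, the three event logs might be loaded like this (the paths are placeholders; the schema matches the grouped output shown further below):

impressions = LOAD '$impressions' AS (user_id:int, item_id:int, timestamp:long);
accepts     = LOAD '$accepts'     AS (user_id:int, item_id:int, timestamp:long);
rejects     = LOAD '$rejects'     AS (user_id:int, item_id:int, timestamp:long);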

A naive approach

The straightforward approach to this problem generates each of the counts we want, joins all of these counts together, and then groups them by user to produce the desired output:
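A sketch of what that looks like (relation names are illustrative, and the null accept/reject counts produced by the outer joins would still need to be converted to zeros):

-- one count per event type, per (user_id, item_id) pair
impression_counts = FOREACH (GROUP impressions BY (user_id, item_id)) GENERATE
    FLATTEN(group) AS (user_id, item_id),
    COUNT_STAR(impressions) AS impression_count;

accept_counts = FOREACH (GROUP accepts BY (user_id, item_id)) GENERATE
    FLATTEN(group) AS (user_id, item_id),
    COUNT_STAR(accepts) AS accept_count;

reject_counts = FOREACH (GROUP rejects BY (user_id, item_id)) GENERATE
    FLATTEN(group) AS (user_id, item_id),
    COUNT_STAR(rejects) AS reject_count;

-- join the counts back together (outer joins, since not every impression is accepted or rejected)
joined1 = JOIN impression_counts BY (user_id, item_id) LEFT OUTER,
               accept_counts BY (user_id, item_id);
joined1 = FOREACH joined1 GENERATE
    impression_counts::user_id AS user_id,
    impression_counts::item_id AS item_id,
    impression_count, accept_count;

joined2 = JOIN joined1 BY (user_id, item_id) LEFT OUTER,
               reject_counts BY (user_id, item_id);
joined2 = FOREACH joined2 GENERATE
    joined1::user_id AS user_id,
    joined1::item_id AS item_id,
    impression_count, accept_count, reject_count;

-- finally, group everything up by user
features = FOREACH (GROUP joined2 BY user_id) GENERATE
    group AS user_id,
    joined2.(item_id, impression_count, accept_count, reject_count) AS items;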

Unfortunately, this approach is not very efficient. It generates six MapReduce jobs during execution and streams a lot of the same data through these jobs.

A better approach

Recognizing that we can combine the outer joins and group operations into a single cogroup allows us to reduce the number of mapreduce jobs.
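A sketch of that version (again illustrative; the empty bags produced by the COGROUP conveniently count as zero):

counted = FOREACH (COGROUP impressions BY (user_id, item_id),
                           accepts     BY (user_id, item_id),
                           rejects     BY (user_id, item_id)) GENERATE
    FLATTEN(group) AS (user_id, item_id),
    COUNT_STAR(impressions) AS impression_count,
    COUNT_STAR(accepts)     AS accept_count,
    COUNT_STAR(rejects)     AS reject_count;

features = FOREACH (GROUP counted BY user_id) GENERATE
    group AS user_id,
    counted.(item_id, impression_count, accept_count, reject_count) AS items;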

However, we still have to perform an extra group operation to bring everything together by user_id, for a total of two MapReduce jobs.

Best... Enter the DataFu

The two grouping operations in the last example operate on the same set of data. It would be great if we could just get rid of one of them somehow.

One thing that we have noticed is that even very big data frequently gets reasonably small once you segment it sufficiently. In this case, we have to segment down to the user level for our output, and that's small enough to fit in memory. With a little bit of DataFu, we can group up all of the data for each user and process it in one pass.

So, let's step through this example and see how it works and what our data looks like along the way.

Group the features

First we group all of the data together by user, producing one bag per event type that holds all of that user's events.

features_grouped = COGROUP impressions BY user_id, accepts BY user_id, rejects BY user_id;

--features_grouped: {group: int,impressions: {(user_id: int,item_id: int,timestamp: long)},accepts: {(user_id: int,item_id: int,timestamp: long)},rejects: {(user_id: int,item_id: int,timestamp: long)}}

CountEach

Next we count the occurrences of each item in the impression, accept, and reject bags.

DEFINE CountEach datafu.pig.bags.CountEach('flatten');

features_counted = FOREACH features_grouped GENERATE 
    group as user_id,
    CountEach(impressions.item_id) as impressions,
    CountEach(accepts.item_id) as accepts,
    CountEach(rejects.item_id) as rejects;

--features_counted: {user_id: int,impressions: {(item_id: int,count: int)},accepts: {(item_id: int,count: int)},rejects: {(item_id: int,count: int)}}

CountEach is a new UDF in DataFu that iterates through a bag counting the number of occurrences of each distinct tuple. In this case, we want to count occurrences of items, so we project the inner tuples of the bag to contain just the item_id. Since we specified the optional 'flatten' argument in the constructor, the output of the UDF will be a bag of each distinct input tuple (item_id) with a count field appended.

BagLeftOuterJoin

Now, we want to combine all of the separate counts for each type of event together into one tuple per item.

DEFINE BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin();

features_joined = FOREACH features_counted GENERATE
    user_id,
    BagLeftOuterJoin(
        impressions, 'item_id',
        accepts, 'item_id',
        rejects, 'item_id'
    ) as items;

--features_joined: {user_id: int,items: {(impressions::item_id: int,impressions::count: int,accepts::item_id: int,accepts::count: int,rejects::item_id: int,rejects::count: int)}}

This is a join operation, but unfortunately, the only join operation that pig allows on bags (in a nested foreach) is CROSS. DataFu provides the BagLeftOuterJoin UDF to make up for this limitation. This UDF performs an in-memory hash join of each bag using the specified field as the join key. The output of this UDF mimics what you would expect from this bit of not (yet) valid Pig:

features_joined = FOREACH features_counted {
  items = JOIN impressions BY item_id LEFT OUTER, accepts BY item_id, rejects BY item_id;
  GENERATE
    user_id, items;
}

Because BagLeftOuterJoin is a UDF and works in memory, a separate map-reduce job is not launched. This fact will save us some time as we'll see later on in the analysis.

Coalesce

Finally, we have our data in about the right shape. We just need to clean up the schema and put some default values in place.

DEFINE Coalesce datafu.pig.util.Coalesce();

features = FOREACH features_joined {
    projected = FOREACH items GENERATE
        impressions::item_id as item_id,
        impressions::count as impression_count,
        Coalesce(accepts::count, 0) as accept_count,
        Coalesce(rejects::count, 0) as reject_count;
  GENERATE user_id, projected as items;
}

--features: {user_id: int,items: {(item_id: int,impression_count: int,accept_count: int,reject_count: int)}}

The various counts were joined together using an outer join in the previous step because a user has not necessarily accepted or rejected every item they have seen. If they have not acted, those fields will be null. Coalesce returns its first non-null parameter, which allows us to cleanly replace those nulls with zeros, avoiding the need for a bincond operator and maintaining the correct schema. Done!

Analysis

Ok great, we now have three ways to write the same script. We know that the naive way will trigger six MapReduce jobs, the better way two, and the DataFu way one, but does that really equate to a difference in performance?

Since we happened to have a dataset with a few billion records lying around, we decided to compare the three. We looked at two metrics for evaluation. One is the best-case wall clock time, which is basically the sum of the slowest map and reduce task for each job (using Pig's default parallelism estimates). The other is total compute time, which is the sum of all map and reduce task durations.

Version   Wall clock time %   Total compute time %
naive     100.0%              100.0%
better    30.2%               62.6%
datafu    10.5%               43.0%

As we can see, the DataFu version provides a noticeable improvement in both metrics. Glad to know that work wasn't all for naught.

Creating a custom purpose UDF

Many UDFs, such as those presented in the previous section, are general purpose. DataFu serves to collect these UDFs and make sure they are tested and easily available. If you are writing such a UDF, then we will happily accept contributions. However, frequently when you sit down to write a UDF, it is because you need to insert some sort of custom business logic or calculation into your pig script. These types of UDFs can easily become complex, involving a large number of parameters or nested structures.

Positional notation is bad

Even once the code is written, you are not done. You have to maintain it.

One of the difficult parts of this maintenance is that, as the Pig script that uses the UDF changes, a developer has to be careful to keep the parameters passed to the UDF in sync. Worse, because a standard UDF references fields by position, it's very easy to introduce a subtle change with an unintended side effect that does not trigger any errors at runtime; for example, when two fields of the same type swap positions.

Aliases can be better

Using aliases instead of positions makes it easier to maintain a consistent mapping between the UDF and the Pig script. If an alias is removed, the UDF will fail with an error. If an alias changes position in a tuple, the UDF does not need to care. The alias also has some semantic meaning to the developer, which can aid in the maintenance process.

AliasableEvalFunc

Unfortunately, there is a problem with using aliases. As of Pig 0.11.1 they are not available when the UDF is executing on the back-end; they are only available on the front-end. The solution is to capture a mapping of alias to position on the front-end, store that mapping in the UDF context, retrieve it on the back-end, and use it to look up each position by alias. You also need to handle a few issues with complex schemas (nested tuples and bags), keeping track of UDF instances, etc. To make this process simpler, DataFu provides AliasableEvalFunc, an extension to the standard EvalFunc with all of this behavior included.

Mortgage payment example

Using AliasableEvalFunc is pretty simple; the primary difference is that you need to override getOutputSchema instead of outputSchema, and you have access to the alias-to-position map through a number of convenience methods. Consider the following example:

In this script we retrieve by alias from the input tuple a couple of different types of fields. One of these fields is a bag, and we also want to get values from the tuples in that bag. To avoid having namespace collisions among the different levels of nested tuples, AliasableEvalFunc prepends the name of the enclosing bag or tuple. Thus, we use getPrefixedAliasName to find the field interest_rate inside the bag named interest_rates. That's all there is to using aliases in a UDF. As an added benefit, being able to dump schema information on errors helps in developing and debugging the UDF (see datafu.pig.util.DataFuException).

LinearRegression example

Having access to the schema opens up UDF development possibilities. Let's look back at the recommendation system example from the first part. The script in that part generated a bunch of features about the items that users saw and clicked. That's a good start to a recommendation workflow, but the end goal is to select which items to recommend. A common way to do this is to assign a score to each item based on some sort of machine learning algorithm; a simple algorithm for this task is linear regression. Ok, let's say we've trained our first linear regression model and are ready to plug it into our workflow to produce our scores.

We could develop a custom UDF for this model that computes the score. It is just a weighted sum of the features. So, using AliasableEvalFunc we could retrieve each field that we need, multiply by the correct coefficient, and then sum these together. But, then every time we change the model, we are going to have to change the UDF to update the fields and coefficients. We know that our first model is not going to be very good and want to make it easy to plug in new models.

The model for a linear regression is pretty simple; it's just a mapping of fields to coefficient values. The only things that will change between models are which fields we are interested in and what the coefficient for those fields will be. So, let's just pass in a string representation of the model and then let the UDF do the work of figuring out how to apply it.
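For example (the UDF class name and model string here are hypothetical, just to illustrate the usage):

-- hypothetical scoring UDF; the constructor argument maps feature names to coefficients
DEFINE ScoreItem com.example.recsys.LinearRegressionScore(
    'impression_count:-0.1,accept_count:1.5,reject_count:-2.0');

items_flat = FOREACH features GENERATE
    user_id,
    FLATTEN(items) AS (item_id, impression_count, accept_count, reject_count);

scored = FOREACH items_flat GENERATE
    user_id,
    item_id,
    ScoreItem(impression_count, accept_count, reject_count) AS score;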

Nice, that's clean, and we could even pass that model string in as a parameter so we don't have to change the pig script to change the model either -- very reusable.

Now, the hard work, writing the UDF:

Ok, maybe not that hard... The UDF parses out the mapping of field to coefficient in the constructor and then looks up the specified fields by name in the exec function. So, what happens when we change the model? If we decide to drop a field from our model, it just gets ignored, even if it is in the input tuple. If we add a new feature that's already available in the data, it will just work. If we try to use a model with a new feature and forget to update the Pig script, it will throw an error and tell us which feature does not exist (as part of the behavior of getDouble()).

Combining this example with the feature counting example presented earlier, we have the basis for a recommendation system that was easy to write, will execute quickly, and will be simple to maintain.

Sampling the data

Working with big data can be a bit overwhelming and time-consuming. Sometimes you want to avoid some of this hassle and just look at a portion of the data. Pig has built-in support for random sampling with the SAMPLE operator, but sometimes a random percentage of the records is not quite what you need. Fortunately, DataFu has a few sampling UDFs that help in these situations, and as always, we would be happy to accept contributions of additional sampling UDFs if you happen to have some lying around.

These things always are easier to understand with a bit of code, so let's go back to our recommendation system context and look at a few more examples.

Example 1. Generate training data

We had mentioned previously that we were going to use a machine learning algorithm, linear regression, to generate scores for our items. We waved our hands earlier and it just happened, but in practice this task involves some work. One of the first steps is to generate the training data set for the learning algorithm. In order to make this training efficient, we only want to use a sample of all of our raw data.

Setup

Given impressions, accepts, rejects, and some pre-computed features about users and items, we'd like to generate a training set that has all of this information for each (user_id, item_id) pair, for some sample of users.

So, from this input:
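Concretely, the inputs might look something like this (the shape of the feature data is an assumption for the sake of the example; paths are placeholders):

impressions   = LOAD '$impressions'   AS (user_id:int, item_id:int, timestamp:long);
accepts       = LOAD '$accepts'       AS (user_id:int, item_id:int, timestamp:long);
rejects       = LOAD '$rejects'       AS (user_id:int, item_id:int, timestamp:long);

-- pre-computed features, assumed to be one row per (user_id, item_id) pair
item_features = LOAD '$item_features' AS (user_id:int, item_id:int, feature_1:double, feature_2:double);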

We want to produce this type of output:

{user_id, item_id, is_impressed, is_accepted, is_rejected, feature_1, feature_2}

One key point about sampling here: we want to sample by user_id. This means that if we choose a user_id to be included in the sample, all the data for that user_id should be included as well. This requirement preserves the original per-user characteristics of the raw data in the sample.

Naive approach

The straightforward solution for this task is to group the tracking data for each (user, item) pair, then group it by user_id, sample this grouped data, and then flatten it all out again:
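A sketch of that naive version (using the assumed inputs above and a 1% sample of users):

-- bring all the signals together per (user_id, item_id) pair
by_pair = COGROUP impressions   BY (user_id, item_id),
                  accepts       BY (user_id, item_id),
                  rejects       BY (user_id, item_id),
                  item_features BY (user_id, item_id);

rows = FOREACH by_pair GENERATE
    FLATTEN(group) AS (user_id, item_id),
    (IsEmpty(impressions) ? 0 : 1) AS is_impressed,
    (IsEmpty(accepts)     ? 0 : 1) AS is_accepted,
    (IsEmpty(rejects)     ? 0 : 1) AS is_rejected,
    -- assumes every pair has exactly one feature row
    FLATTEN(item_features.(feature_1, feature_2)) AS (feature_1, feature_2);

-- group by user, sample whole users, then flatten everything back out
by_user  = GROUP rows BY user_id;
sampled  = SAMPLE by_user 0.01;
training = FOREACH sampled GENERATE FLATTEN(rows);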

This job includes two group operations, which translates to two map-reduce jobs. Also, the group operation is being done on the full data even though we will sample it down later. Can we do any better than this?

A sample of DataFu -- SampleByKey

Yep.

We can use the SampleByKey FilterFunc to do this with only one group operation. And, since the group operates on the already sampled (and significantly smaller) data, this job will be far more efficient.
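A sketch, assuming SampleByKey's constructor takes a salt and a sampling probability:

DEFINE SampleByKey datafu.pig.sampling.SampleByKey('a_salt', '0.01');

impressions   = FILTER impressions   BY SampleByKey(user_id);
accepts       = FILTER accepts       BY SampleByKey(user_id);
rejects       = FILTER rejects       BY SampleByKey(user_id);
item_features = FILTER item_features BY SampleByKey(user_id);

-- a single COGROUP on the already-sampled (much smaller) data, as in the previous version,
-- then produces the training rows with only one group operation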

SampleByKey lets you designate which fields to use as keys for the sampling, and guarantees that if a key is selected, all other records with that key are selected as well, which is exactly what we want. Another characteristic of SampleByKey is that it is deterministic, as long as the same salt is given at initialization. Thanks to this, we can sample each input separately before joining them, as in the example above.

Example 2. Recommending your output

Ok, we've now created some training data that we used to build a model that produces a score for each recommendation. So now we've got to pick which items to show the user. But we have a bit of a problem: there is only limited real estate on the screen for our recommendations, so how do we select which ones to show? We have a score from our model, so we could just always pick the top-scoring items. But then we might show the same recommendations all the time, and we want to shake things up a bit so things aren't so static (OK, yes, I admit this is a contrived example; you wouldn't do it this way in real life). So let's take a sample of the output.

Setup

With this input:
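For the sake of the example, assume the scored recommendations have this shape:

recs: {user_id: int, items: {(item_id: int, score: double)}}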

We want to produce the exact same output, but with fewer items per user -- let's say no more than 10.

Naive approach

We can randomize using Pig's built-in SAMPLE operator.
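For instance (using the assumed recs relation above; note that this ignores the score and gives no per-user guarantee):

items_flat    = FOREACH recs GENERATE user_id, FLATTEN(items) AS (item_id, score);
items_sampled = SAMPLE items_flat 0.1;
recs_sampled  = FOREACH (GROUP items_sampled BY user_id) GENERATE
    group AS user_id,
    items_sampled.(item_id, score) AS items;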

The problem with this approach is that results are sampled from the population uniformly at random. The score you created with your learning algorithm has no effect on the final results.

The DataFu you most likely need -- WeightedSample

We should use that score we generated to help bias our sample.

Fortunately, WeightedSample can do exactly that. It randomly selects from the candidates, but each candidate's score is used to weight its probability of being selected. So tuples with higher weight have a higher chance of being included in the sample - perfect.
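A sketch, assuming WeightedSample takes the bag, the index of the weight field, and an optional cap on the number of samples:

DEFINE WeightedSample datafu.pig.sampling.WeightedSample();

recs_sampled = FOREACH recs GENERATE
    user_id,
    -- weight by score (field index 1), keep at most 10 items per user
    WeightedSample(items, 1, 10) AS items;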

Additional Examples

If you've made it this far into the post, you deserve an encore. So here are two more examples of how DataFu can make writing pig a bit simpler for you:

Filtering with In

One case where conditional logic can be painful is filtering based on a set of values. Suppose you want to filter tuples based on a field equalling one of a list of values. In Pig this can be achieved by joining a list of conditional checks with OR:
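For example, keeping impressions for a hand-picked set of items:

filtered = FILTER impressions BY item_id == 11 OR item_id == 42 OR item_id == 107 OR item_id == 238;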

However, as the number of items to check for grows, this becomes very verbose. The In filter function solves this and makes the resulting code very concise:
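A sketch, assuming the DataFu 1.0 class name datafu.pig.util.In and that the field to test comes first, followed by the allowed values:

DEFINE In datafu.pig.util.In();

filtered = FILTER impressions BY In(item_id, 11, 42, 107, 238);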

Left Outer Join of three or more relations with EmptyBagToNullFields

Pig's JOIN operator supports performing left outer joins on two relations only. If you want to perform a join on more than two relations you have two options. One is to perform a sequence of joins.
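For instance, with three relations joined on val1, val2, and val3 (matching the macro call at the end of this section):

joined = JOIN input1 BY val1 LEFT, input2 BY val2;
joined = JOIN joined BY input1::val1 LEFT, input3 BY val3;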

However this can be inefficient as it requires multiple MapReduce jobs. For many situations, a better option is to use a single COGROUP which requires only a single MapReduce job. However the code gets pretty ugly.
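Something like the following sketch, which assumes input2 and input3 each carry their join key plus one value field:

joined = FOREACH (COGROUP input1 BY val1, input2 BY val2, input3 BY val3) GENERATE
    FLATTEN(input1),  -- when input1 is empty the whole record disappears, which is what we want
    FLATTEN((IsEmpty(input2) ? TOBAG(TOTUPLE((int)null, (chararray)null)) : input2))
        AS (input2::val2, input2::name2),
    FLATTEN((IsEmpty(input3) ? TOBAG(TOTUPLE((int)null, (chararray)null)) : input3))
        AS (input3::val3, input3::name3);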

This code uses the insight that the input1 bag will be empty when there is no match, so flattening it removes the entire record. If the input2 or input3 bags are empty we don't want flattening them to remove the record, though, so we replace them with a bag holding a single tuple of null elements. When these are flattened we get a single tuple with null fields. But we want our output to have the correct schema, so we have to specify it manually. Once we do all of this, the approach successfully replicates the left outer join behavior. It's more efficient, but it's really ugly to type and read.

To clean up this code we have created EmptyBagToNullFields, which replicates the same logic as in the example above, but in a much more concise and readable fashion.
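A sketch of the same join using EmptyBagToNullFields:

DEFINE EmptyBagToNullFields datafu.pig.bags.EmptyBagToNullFields();

joined = FOREACH (COGROUP input1 BY val1, input2 BY val2, input3 BY val3) GENERATE
    FLATTEN(input1),
    FLATTEN(EmptyBagToNullFields(input2)),
    FLATTEN(EmptyBagToNullFields(input3));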

Notice that we do not need to specify the schema as with the previous COGROUP example. The reason is that EmptyBagToNullFields produces the same schema as the input bag. So much cleaner.

Final Example

Ok, a second encore, but no more. If you are doing a lot of these, you can turn this into a macro:
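A sketch of such a macro (reusing the EmptyBagToNullFields definition from above):

DEFINE left_outer_join(relation1, key1, relation2, key2, relation3, key3) RETURNS joined {
    cogrouped = COGROUP $relation1 BY $key1, $relation2 BY $key2, $relation3 BY $key3;
    $joined = FOREACH cogrouped GENERATE
        FLATTEN($relation1),
        FLATTEN(EmptyBagToNullFields($relation2)),
        FLATTEN(EmptyBagToNullFields($relation3));
};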

Then all you need to do is call your macro:

features = left_outer_join(input1, val1, input2, val2, input3, val3);

Wrap-up

So, that's a lot to digest, but it's just a highlight into a few interesting pieces of DataFu. Check out the DataFu 1.0 release as there's even more in store.

We hope that it proves valuable to you and as always welcome any contributions. Please let us know how you're using the library — we would love to hear from you.

Additional Credits

Thanks to Matt Hayes, Evion Kim, and Sam Shah for contributions to this post.

March 21, 2013

As the use of Hadoop grows in an organization, scheduling, capacity planning, and billing become critical concerns. These are all open problems in the Hadoop space, and today, we’re happy to announce we’re open sourcing LinkedIn’s solution: White Elephant.

At LinkedIn, we use Hadoop for product development (e.g., predictive analytics applications like People You May Know and Endorsements), descriptive statistics for powering our internal dashboards, ad-hoc analysis by data scientists, and ETL. To better understand the usage of our Hadoop cluster across all of our use cases, we created White Elephant.

While tools like Ganglia provide system-level metrics, we wanted to be able to understand what resources were being used by each user and at what times. White Elephant parses Hadoop logs to provide visual drill downs and rollups of task statistics for your Hadoop cluster, including total task time, slots used, CPU time, and failed job counts.

White Elephant fills several needs:

  • Scheduling: when you have a handful of periodic jobs, it's easy to reason about when they should run, but that quickly becomes unmanageable as the number of jobs grows. The ability to schedule jobs at periods of low utilization helps maximize cluster efficiency.
  • Capacity planning: to plan for future hardware needs, operations needs to understand the resource usage growth of jobs.
  • Billing: Hadoop clusters have finite capacity, so in a multi-tenant environment it's important to weigh the resources used by a product feature against its business value.

In this post, we'll go over White Elephant's architecture and showcase some of the visualizations it offers. While you're reading, feel free to head over to the GitHub page to check out the code and try it out yourself!

Architecture

Here's a diagram outlining the White Elephant architecture:

[Diagram: White Elephant architecture]

There are three Hadoop Grids, A, B, and C, for which White Elephant will compute statistics as follows:

  1. Upload Task: a task that periodically runs on the Job Tracker for each grid and incrementally copies new log files into a Hadoop grid for analysis.
  2. Compute: a sequence of MapReduce jobs coordinated by a Job Executor parses the uploaded logs and computes aggregate statistics.
  3. Viewer: a viewer app incrementally loads the aggregate statistics, caches them locally, and exposes a web interface which can be used to slice and dice statistics for your Hadoop clusters.

Example

Let’s go through a real use case: we've noticed an increase in cluster usage the last few months, but we don't know who is responsible. We can use White Elephant to investigate this issue.

The graph below shows a sample data set with aggregate hours used per week for a cluster over the last several months. You'll notice that since mid-January, weekly cluster usage increased by about 4k hours from a baseline of about 6k hours.

[screenshot]

In the graph above, Aggregate selected is checked, so the data for all users is grouped together. Let’s instead look at a stacked graph of the top 20 users by unchecking Aggregate selected and setting Max to graph to 20.

[screenshot]

Now we can see individual usage per week by the top 20 users. The remaining 46 users have been grouped together into a single metric. Several users stand out suspiciously in terms of cluster usage, so we'll dig deeper.

We can highlight one of these users by hovering over the username in the legend.

[screenshot]

Using drag and drop we can rearrange the list so these users appear at the bottom.

[screenshot]

Looks like 4 users have shown significant usage increases: User-1 and User-2 began increasing in mid-January, while User-43 and User-65 began a steady climb in usage around December.

Did we miss anyone? If we want to see what cluster usage would look like without these users we can deselect them in the legend.

[screenshot]

Once we exclude these users, we can see that cluster usage has not significantly changed during this time period, so we've identified all of our culprits.

Let's drill down to just these four users. Users can be selected with a multi-select control. A filter makes it easy to search for particular users by name.

[screenshot]

How do these 4 compare with everyone else? For convenience, the remaining users are aggregated together and included as well: just select the aggregate metric and move it to the top.

[screenshot]

And there you have it: with White Elephant, we've tracked down the problem with ease thanks to the visibility into our Hadoop usage. We even get a table of the data that was queried, which we can export as a CSV.

[screenshot]

Open Source

White Elephant is open source and freely available here under the Apache 2 license. As always, we welcome contributions, so send us your pull requests.


January 24, 2013

This is a crosspost from the Hortonworks blog.

If Pig is the “duct tape for big data”, then DataFu is the WD-40. Or something.

No, seriously, DataFu is a collection of Pig UDFs for data analysis on Hadoop. DataFu includes routines for common statistics tasks (e.g., median, variance), PageRank, set operations, and bag operations.

It’s helpful to understand the history of the library. Over the years, we developed several routines that were used across LinkedIn and were thrown together into an internal package we affectionately called “littlepiggy.” The unfortunate part, and this is true of many such efforts, is that the UDFs were ill-documented, ill-organized, and easily got broken when someone made a change. Along came PigUnit, which allowed UDF testing, so we spent the time to clean up these routines by adding documentation and rigorous unit tests. From this “datafoo” package, we thought this would help the community at large, and there you have DataFu.

So what can this library do for you? Let's look at one of the classical examples that showcase the power and flexibility of Pig: sessionizing a click stream.

A = load 'clicks' as (user, timestamp, url);
B = group A by user;
C = foreach B {
  C1 = order A by timestamp;
  generate flatten(Sessionize(C1)) as (user, timestamp, url, session_id);
}
D = group C by session_id;
E = foreach D generate group as session_id, (MAX(C.timestamp) - MIN(C.timestamp)) as session_length;
F = group E all;
G = foreach F generate
  AVG(E.session_length) as avg_session_length,
  SQRT(VAR(E.session_length)) as sd_session_length,
  MEDIAN(E.session_length) as median_session_length,
  Q75(E.session_length) as session_length_75pct,
  Q90(E.session_length) as session_length_90pct,
  Q95(E.session_length) as session_length_95pct;

(In fact, this is basically the example for the Accumulator interface that was added in Pig 0.6.)

Here, we’re just computing some summary statistics on a sessionized click stream. Pig does the heavy lifting of transforming your query into MapReduce goodness, but DataFu fills in the gaps by providing the missing routines.

You can download DataFu at http://data.linkedin.com/opensource/datafu.

Below you can grab sample data and code for this sessionization example that you can run on your own.

Sessionization Example

Suppose that we have a stream of page views from which we have extracted a member ID and UNIX timestamp. It might look something like this:

memberId timestamp      url
1        1357718725941  /
1        1357718871442  /profile
1        1357719038706  /inbox
1        1357719110742  /groups
...
2        1357752955401  /inbox
2        1357752982385  /profile
...

The full data set for this example can be found here.

Using DataFu we can assign session IDs to each of these events and group by session ID in order to compute the length of each session. From there we can complete the exercise by simply applying the statistics UDFs provided by DataFu.

REGISTER piggybank.jar;
REGISTER datafu-0.0.6.jar;
REGISTER guava-13.0.1.jar; -- needed by StreamingQuantile

DEFINE UnixToISO   org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE Sessionize  datafu.pig.sessions.Sessionize('10m');
DEFINE Median      datafu.pig.stats.Median();
DEFINE Quantile    datafu.pig.stats.StreamingQuantile('0.75','0.90','0.95');
DEFINE VAR         datafu.pig.stats.VAR();

pv = LOAD 'clicks.csv' USING PigStorage(',') AS (memberId:int, time:long, url:chararray);

pv = FOREACH pv
     -- Sessionize expects an ISO string
     GENERATE UnixToISO(time) as isoTime,
              time,
              memberId;

pv_sessionized = FOREACH (GROUP pv BY memberId) {
  ordered = ORDER pv BY isoTime;
  GENERATE FLATTEN(Sessionize(ordered)) AS (isoTime, time, memberId, sessionId);
};

pv_sessionized = FOREACH pv_sessionized GENERATE sessionId, time;

-- compute length of each session in minutes
session_times = FOREACH (GROUP pv_sessionized BY sessionId)
                GENERATE group as sessionId,
                         (MAX(pv_sessionized.time)-MIN(pv_sessionized.time))
                            / 1000.0 / 60.0 as session_length;

-- compute stats on session length
session_stats = FOREACH (GROUP session_times ALL) {
  ordered = ORDER session_times BY session_length;
  GENERATE
    AVG(ordered.session_length) as avg_session,
    SQRT(VAR(ordered.session_length)) as std_dev_session,
    Median(ordered.session_length) as median_session,
    Quantile(ordered.session_length) as quantiles_session;
};

DUMP session_stats;
--(15.737532575757575,31.29552045993877,(2.848041666666667),(14.648516666666666,31.88788333333333,86.69525))

This is just a taste. There’s plenty more in the library for you to peruse. Take a look here. DataFu is freely available under the Apache 2 license. We welcome contributions, so please send us your pull requests!

--- Matthew Hayes & Sam Shah

October 31, 2012

LinkedIn has a diverse ecosystem of specialized data storage and serving systems. This leads to the need to reliably capture changes from primary data sources and transfer them consistently through the rest of the data architecture. In response to this need, we developed Databus, a source-agnostic distributed change data capture system, which has since become an integral part of LinkedIn's data processing pipeline. The Databus transport layer provides end-to-end latencies in milliseconds and handles throughput of thousands of change events per second per server, while supporting infinite lookback capabilities and rich subscription functionality.

In the past we have written about the motivation, design and architecture of Databus and we have recently published a paper at SOCC 2012 that describes in further detail the design decisions and implementation of Databus. In this post, we will outline our experiences in operating Databus as a Service in production.

Change-Data-Capture As a Service

The Databus system comprises several components. The Relay fetches the changes from the source database and stores the events in a high-performance transient log store. The Bootstrap Service stores a moving snapshot of the source database by periodically applying the change stream from the Relay. Applications use the Databus Client Library to pull the change stream from the Relay or Bootstrap and process the change events in Consumers that implement a callback API defined by the library. The change stream infrastructure, i.e., the Relay and the Bootstrap Service, is owned and operated centrally by us, the Databus team. Applications are developed by various teams and run on separate infrastructure. This ownership model lets application teams manage the scaling of their change-stream processing and allows us to focus on producing and managing reliable change streams from the databases.

The sources (database tables or views) of interest are identified by application teams. We provision a source by running a series of scripts that instrument the database (setting up the necessary triggers), generate Avro schemas (describing the change events by inspecting the database schemas), and publish those schemas to a Schema Registry. We make the appropriate entries in the global configuration to identify the machines on which the change stream will live. The seeding process, which is a one-time effort, consists of generating and loading a consistent snapshot of the source into the Bootstrap's snapshot store. We then enable specialized consumers that continuously apply the change stream from the Relay to the corresponding snapshot store.

We’ve been running the latest generation of Databus technology for a year. We have provisioned tens of databases that store user and advertiser data and managed their change streams. We have also assisted and supported several applications in consuming these change streams in reliable and scalable ways. We will now describe what we learned running the change capture service and outline its impact on applications.

Applications (Clients)

  • Launched Faster
  • We have seen new applications that need to read the entire source database before they begin listening to the Databus change stream. The new bootstrap process has made it unnecessary for applications to find custom methods of feeding the source data into their processing systems, significantly improving their time to launch. Applications achieve this by consuming the Databus stream from 'time=0'. In these cases, we found that the Databus Client Library required tuning to adjust to the large event windows that occur while consuming the initial snapshot. To improve processing times, some applications used library configurations that increased the number of available threads and consumed events in parallel within this large window. In general, total event processing time was often limited by available network bandwidth when the application itself was relatively quick to process the change events.

  • Scaled with Partitioned Change Stream
  • We have observed that a number of applications are themselves partition-aware. For example, our distributed search system has many hundreds of nodes, and each node only indexes a fraction of the complete data set. Often, different consumers want different partitioning functions or axes. For example, our search engine uses range-based partitioning, while our relevance engine uses mod-based partitioning. Earlier, all the individual machines would pull the entire stream and filter it client-side, dropping the events they were not interested in. We added server-side filtering to Databus, which allows consumers to specify their filtering function (mod or range) when subscribing to the sources. This has resulted in huge network savings, decreasing bandwidth requirements by up to 40 times.

  • Used Cluster-Awareness
  • Some applications have used the Databus Client Library to build fault tolerance among their application's processing nodes. Applications have utilized the library's built-in cluster-aware functionality, for instance, to elect a single writer (from among many nodes) to process updates received on Databus. In other cases, they have used the library to perform partition assignments as nodes entered and left the designated application cluster. We have leveraged Helix to implement the cluster-aware functionality of Databus.

  • Monitored Health of the Pipeline
  • While there are many metrics exposed by the Databus system, none has proved to be as effective as 'timeSinceLastUpdate' at answering the question 'How far behind are we from the source database?' It is defined as the time elapsed (in milliseconds) since the update time of the most recently processed event. It has turned out to be a valid proxy for detecting failures in the pipeline. For example, a continuously increasing value of this metric indicates that, for one reason or another, the application has stopped processing new updates. In other cases, this metric can effectively capture the 'lag' between an update occurring in the database and the application receiving it, such as measuring the delay after which a newly added member (in the member database) becomes searchable.

Change-Data-Capture Service

Looking back, there are a few things we have clearly done well and some pains we’re still addressing.
  • Achieved Source Database Isolation
  • With the advent of the Bootstrap Service, applications that 'fall behind' the Relay no longer have to read changes directly from the database. We have seen in practice that whenever an application attempts to read updates from a point older than the Relay's retention period, it seamlessly switches to reading updates from the Bootstrap Service until it has caught up. In some cases, we have seen new applications become consistent with the entire source database by reading the Databus stream from 'time=0', without adding load on the source database.

  • Enabled Seamless Schema Evolution
  • As and when the source database schemas change, corresponding new Avro Schemas are generated. The Relays consume data from the modified table/view and generate events conforming to the latest schema. As long as the schemas are backward compatible, only the latest change stream needs to be provisioned irrespective of the number of versions that are being actively consumed. The Applications are free to continue to use the older versions of schema until they are ready to process the new fields. The conversion of events from the newer to the older schema is done automatically by the Client Library.

  • Relay Deployment and Scale-Out
  • While it's typical for the change stream of a source to reside on more than one relay server for fault tolerance, we have also added relay servers to handle the steady increase in the number of consumers for a given source. However, we observed that adding relay servers only scales as long as the source database can handle the incremental load imparted by each relay. We have handled scaling of the 'most popular sources' (high application load + high write rate at the database) by deploying relays in 'leader-follower' clusters, where the leader connects directly to the database and the followers are 'chained' to the leader relay, thereby presenting a constant and manageable load on the source database.

  • Improved Hardware Utilization
    • Increased Change Stream Retention Period
    • Our database sources are highly heterogeneous, varying both in row size and in update rate. With the choice of Avro and our high-performance log store, we can easily configure the system to retain the change stream at the relay for 2-7 days for any source. Using a binary protocol (such as Avro) has reduced our storage footprint by a significant factor compared to earlier versions of Databus that used Java serialized objects.

    • Resource Sharing amongst Change Streams
    • A single relay or bootstrap server has been able to host change streams and snapshots of several sources, enabling us to efficiently utilize available memory and network bandwidth.
  • Bootstrap Seeding Pains
  • Seeding involves generating and loading a consistent snapshot of the source data into the Bootstrap Service's Snapshot Store. This can be a challenge for large data sets. Since stopping the writes to the primary store to seed the Bootstrap store is rarely an option, we either have to procure additional hardware to load a stable backup or devise an efficient restartable algorithm that can read the data out of the primary store in small chunks while guaranteeing that no updates are going to be missed and that all transactions that happen during the seeding process are fully applied at the end. We found that sources with complex joins and/or large BLOBs can negatively affect seeding performance. In some cases with complex joins, we have used dumps of the source tables and computed the joins offline (e.g. in Hadoop). With such an approach, the main challenge was to ensure that the offline join produces exactly the same results as if it was performed by the primary store, because the two table dumps may not be consistent with each other.

Next Play

While we have made significant progress in launching and managing several change streams within Linkedin, there are miles to go before we sleep. A few tasks on which we are spending time:
  • Improvement of Change Capture from Oracle
  • We are working on solutions that will pull changes from Oracle and avoid the overhead of triggers.

  • Automatic Provisioning of Relays and Bootstrap Servers
  • In the current setup, the decision to add new relay or bootstrap servers to a cluster, or to define the clusters themselves, is made by humans with access to operating metrics such as utilization (memory, CPU) and incoming load. In the future, we want to automate provisioning of Databus clusters when we observe increased demand.

  • Scaling Bootstrap DB to handle rapidly growing databases
  • Large database sources will benefit from compression and partitioning as we anticipate their steady growth.

In future posts we will delve into the details behind the challenges of change capture from Oracle and how we are continuing to overcome them, take a closer look at the Databus Client Library APIs, and talk in detail about the challenges of dealing with the infinite lookback capability, the Bootstrap Service. Until then, as always, we are eager to hear your ideas and thoughts.

--- Sunil Nagaraj, Staff Software Engineer
