We're the Data Team

The data team at LinkedIn works on LinkedIn's information retrieval systems, the social graph system, data-driven features, and supporting data infrastructure.

March 21, 2013

As the use of Hadoop grows in an organization, scheduling, capacity planning, and billing become critical concerns. These are all open problems in the Hadoop space, and today, we’re happy to announce we’re open sourcing LinkedIn’s solution: White Elephant.

At LinkedIn, we use Hadoop for product development (e.g., predictive analytics applications like People You May Know and Endorsements), descriptive statistics for powering our internal dashboards, ad-hoc analysis by data scientists, and ETL. To better understand the usage of our Hadoop cluster across all of our use cases, we created White Elephant.

While tools like Ganglia provide system-level metrics, we wanted to understand which resources each user was consuming and when. White Elephant parses Hadoop logs to provide visual drill-downs and rollups of task statistics for your Hadoop cluster, including total task time, slots used, CPU time, and failed job counts.

White Elephant fills several needs:

  • Scheduling: when you have a handful of periodic jobs, it’s easy to reason about when they should run, but that approach quickly stops scaling as the number of jobs grows. The ability to schedule jobs during periods of low utilization helps maximize cluster efficiency.
  • Capacity planning: to plan for future hardware needs, operations needs to understand the resource usage growth of jobs.
  • Billing: Hadoop clusters have finite capacity, so in a multi-tenant environment it’s important to weigh the resources used by a product feature against its business value.

In this post, we'll go over White Elephant's architecture and showcase some of the visualizations it offers. While you're reading, feel free to head over to the GitHub page to check out the code and try it out yourself!

Architecture

Here's a diagram outlining the White Elephant architecture:

[Figure: White Elephant architecture diagram]

There are three Hadoop grids, A, B, and C, for which White Elephant computes statistics as follows:

  1. Upload Task: a task that periodically runs on the Job Tracker for each grid and incrementally copies new log files into a Hadoop grid for analysis.
  2. Compute: a sequence of MapReduce jobs coordinated by a Job Executor parses the uploaded logs and computes aggregate statistics.
  3. Viewer: a viewer app incrementally loads the aggregate statistics, caches them locally, and exposes a web interface which can be used to slice and dice statistics for your Hadoop clusters.
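
The Compute step is essentially a group-by over parsed task records. Here's a minimal Python sketch of one such rollup, assuming a hypothetical, already-parsed `TaskRecord` (user, task start time, task duration) and bucketing by ISO week in UTC. The real pipeline runs MapReduce jobs over raw Hadoop logs, and this is not White Elephant's code.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class TaskRecord:
    """Hypothetical parsed log entry: one task attempt from a job history log."""
    user: str
    start_ms: int     # task start time, UNIX epoch milliseconds
    duration_ms: int  # task run time in milliseconds


def week_key(epoch_ms: int) -> str:
    """Bucket a timestamp into an ISO year-week label such as '2013-W02' (UTC)."""
    year, week, _ = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).isocalendar()
    return f"{year}-W{week:02d}"


def weekly_task_hours(records):
    """Sum task hours per (user, week): the kind of rollup the viewer charts."""
    totals = defaultdict(float)
    for r in records:
        totals[(r.user, week_key(r.start_ms))] += r.duration_ms / 3_600_000
    return dict(totals)


records = [
    TaskRecord("alice", 1357718725941, 3_600_000),
    TaskRecord("alice", 1357718871442, 1_800_000),
    TaskRecord("bob", 1357718725941, 7_200_000),
]
print(weekly_task_hours(records))
# {('alice', '2013-W02'): 1.5, ('bob', '2013-W02'): 2.0}
```

Keying by (user, week) keeps the output small enough for a viewer to cache locally and re-slice on demand.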

Example

Let’s go through a real use case: we've noticed an increase in cluster usage over the last few months, but we don't know who is responsible. We can use White Elephant to investigate.

The graph below shows a sample data set with aggregate hours used per week for a cluster over the last several months. You'll notice that since mid-January, weekly cluster usage increased by about 4k hours from a baseline of about 6k hours.

[Figure: aggregate hours used per week for the cluster]

In the graph above, "Aggregate selected" is checked, so the data for all users is grouped together. Let’s instead look at a stacked graph of the top 20 users by unchecking "Aggregate selected" and setting "Max to graph" to 20.

[Figure: stacked weekly usage for the top 20 users]

Now we can see individual usage per week by the top 20 users. The remaining 46 users have been grouped together into a single metric. Several users stand out suspiciously in terms of cluster usage, so we'll dig deeper.
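
The "top N users, everyone else grouped" view can be sketched in a few lines. This assumes users are ranked by total hours over the displayed period and that weekly usage arrives as a hypothetical `user -> {week: hours}` mapping; how White Elephant actually ranks and groups users isn't described here.

```python
def top_n_with_rest(usage, n):
    """Keep the n heaviest users as separate series and fold everyone else into one.

    usage maps user -> {week: hours}; users are ranked by total hours across all weeks.
    """
    ranked = sorted(usage, key=lambda u: sum(usage[u].values()), reverse=True)
    top, rest = ranked[:n], ranked[n:]
    series = {u: dict(usage[u]) for u in top}
    if rest:
        other = {}
        for u in rest:
            for week, hours in usage[u].items():
                other[week] = other.get(week, 0.0) + hours
        series[f"{len(rest)} other users"] = other
    return series


usage = {
    "user-a": {"2013-W01": 5.0, "2013-W02": 5.0},
    "user-b": {"2013-W01": 1.0},
    "user-c": {"2013-W01": 2.0, "2013-W02": 1.0},
}
print(top_n_with_rest(usage, 1))
# {'user-a': {'2013-W01': 5.0, '2013-W02': 5.0}, '2 other users': {'2013-W01': 3.0, '2013-W02': 1.0}}
```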

We can highlight one of these users by hovering over the username in the legend.

[Figure: one user highlighted by hovering over the legend]

Using drag and drop, we can rearrange the list so these users appear at the bottom.

[Figure: legend reordered by drag and drop]

It looks like four users have shown significant usage increases: User-1 and User-2 began increasing in mid-January, while User-43 and User-65 began a steady climb around December.

Did we miss anyone? To see what cluster usage would look like without these users, we can deselect them in the legend.

[Figure: cluster usage with the four users deselected]

Once we exclude these users, we can see that cluster usage has not significantly changed during this time period, so we've identified all of our culprits.

Let's drill down to just these four users. Users can be selected with a multi-select control. A filter makes it easy to search for particular users by name.

[Figure: multi-select user filter]

How do these four compare with everyone else? For convenience, the remaining users are aggregated together and included as well: just select the aggregate metric and move it to the top.

[Figure: the four users compared with the aggregate of all other users]

And there you have it: with White Elephant, we've tracked down the problem with ease, thanks to unprecedented visibility into our Hadoop usage. We even get a table of the queried data, which we can export as a CSV file.

[Figure: table of queried data with CSV export]

Open Source

White Elephant is open source and freely available here under the Apache 2 license. As always, we welcome contributions, so send us your pull requests.

Co-authors:

January 24, 2013

This is a crosspost from the Hortonworks blog.

If Pig is the “duct tape for big data”, then DataFu is the WD-40. Or something.

No, seriously, DataFu is a collection of Pig UDFs for data analysis on Hadoop. DataFu includes routines for common statistics tasks (e.g., median, variance), PageRank, set operations, and bag operations.

It’s helpful to understand the history of the library. Over the years, we developed several routines that were used across LinkedIn and were thrown together into an internal package we affectionately called “littlepiggy.” The unfortunate part, and this is true of many such efforts, is that the UDFs were ill-documented, ill-organized, and easily broken when someone made a change. Then along came PigUnit, which allowed UDF testing, so we spent the time to clean up these routines by adding documentation and rigorous unit tests. We thought the resulting “datafoo” package would help the community at large, and there you have DataFu.

So what can this library do for you? Let’s look at one of the classic examples that showcase the power and flexibility of Pig: sessionizing a click stream.

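-- Sketch only: Sessionize, VAR, MEDIAN, Q75, Q90, and Q95 stand in for DataFu UDFs (DEFINEs omitted).
-- Simplified: the runnable version below first converts timestamps to ISO strings, as Sessionize expects.
A = load 'clicks' as (timestamp, user);
B = group A by user;
C = foreach B {
  C1 = order A by timestamp;
  -- Sessionize appends a session ID to each click in the ordered bag
  generate flatten(Sessionize(C1)) as (timestamp, user, session_id);
}
D = group C by session_id;
E = foreach D generate group as session_id, (MAX(C.timestamp) - MIN(C.timestamp)) as session_length;
F = group E all;
G = foreach F generate
  AVG(E.session_length) as avg_session_length,
  SQRT(VAR(E.session_length)) as sd_session_length,
  MEDIAN(E.session_length) as median_session_length,
  Q75(E.session_length) as session_length_75pct,
  Q90(E.session_length) as session_length_90pct,
  Q95(E.session_length) as session_length_95pct;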

(In fact, this is basically the example for the Accumulator interface that was added in Pig 0.6.)

Here, we’re just computing some summary statistics on a sessionized click stream. Pig does the heavy lifting of transforming your query into MapReduce goodness, but DataFu fills in the gaps by providing the missing routines.

You can download DataFu at http://data.linkedin.com/opensource/datafu.

Below, you can grab sample data and code for this sessionization example to run on your own.

Sessionization Example

Suppose that we have a stream of page views from which we have extracted a member ID, a UNIX timestamp in milliseconds, and the URL. It might look something like this:

memberId timestamp      url
1        1357718725941  /
1        1357718871442  /profile
1        1357719038706  /inbox
1        1357719110742  /groups
...
2        1357752955401  /inbox
2        1357752982385  /profile
...

The full data set for this example can be found here.

Using DataFu, we can assign session IDs to each of these events and group by session ID to compute the length of each session. From there, we can complete the exercise by simply applying the statistics UDFs provided by DataFu.

REGISTER piggybank.jar;
REGISTER datafu-0.0.6.jar;
REGISTER guava-13.0.1.jar; -- needed by StreamingQuantile

DEFINE UnixToISO   org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE Sessionize  datafu.pig.sessions.Sessionize('10m');
DEFINE Median      datafu.pig.stats.Median();
DEFINE Quantile    datafu.pig.stats.StreamingQuantile('0.75','0.90','0.95');
DEFINE VAR         datafu.pig.stats.VAR();

pv = LOAD 'clicks.csv' USING PigStorage(',') AS (memberId:int, time:long, url:chararray);

pv = FOREACH pv
     -- Sessionize expects an ISO string
     GENERATE UnixToISO(time) as isoTime,
              time,
              memberId;

pv_sessionized = FOREACH (GROUP pv BY memberId) {
  ordered = ORDER pv BY isoTime;
  GENERATE FLATTEN(Sessionize(ordered)) AS (isoTime, time, memberId, sessionId);
};

pv_sessionized = FOREACH pv_sessionized GENERATE sessionId, time;

-- compute length of each session in minutes
session_times = FOREACH (GROUP pv_sessionized BY sessionId)
                GENERATE group as sessionId,
                         (MAX(pv_sessionized.time)-MIN(pv_sessionized.time))
                            / 1000.0 / 60.0 as session_length;

-- compute stats on session length
session_stats = FOREACH (GROUP session_times ALL) {
  ordered = ORDER session_times BY session_length;
  GENERATE
    AVG(ordered.session_length) as avg_session,
    SQRT(VAR(ordered.session_length)) as std_dev_session,
    Median(ordered.session_length) as median_session,
    Quantile(ordered.session_length) as quantiles_session;
};

DUMP session_stats;
--(15.737532575757575,31.29552045993877,(2.848041666666667),(14.648516666666666,31.88788333333333,86.69525))
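
To check the logic without a Pig setup, here's a plain-Python sketch of the same pipeline on toy data. It makes three assumptions: a new session starts when consecutive clicks by the same member are more than 10 minutes apart (our reading of `Sessionize('10m')`), the standard deviation uses population variance, and quantiles use the exact nearest-rank method, whereas `StreamingQuantile` produces estimates. Don't expect it to reproduce the DataFu output digit for digit.

```python
import math
import statistics
from collections import defaultdict


def sessionize(clicks, timeout_ms=10 * 60 * 1000):
    """Assign a session ID to each (member_id, time_ms) click.

    Assumption: a new session starts whenever the gap between consecutive
    clicks by the same member exceeds timeout_ms. Returns (session_id, time_ms) pairs.
    """
    by_member = defaultdict(list)
    for member_id, time_ms in clicks:
        by_member[member_id].append(time_ms)
    out, next_id = [], 0
    for member_id in sorted(by_member):
        last = None
        for t in sorted(by_member[member_id]):
            if last is None or t - last > timeout_ms:
                next_id += 1  # gap too large (or first click): start a new session
            out.append((next_id, t))
            last = t
    return out


def session_lengths_minutes(sessionized):
    """Session length = last click time minus first click time, in minutes."""
    times = defaultdict(list)
    for session_id, t in sessionized:
        times[session_id].append(t)
    return [(max(ts) - min(ts)) / 1000.0 / 60.0 for ts in times.values()]


def nearest_rank(sorted_vals, q):
    """Exact nearest-rank quantile (a stand-in for StreamingQuantile's estimate)."""
    return sorted_vals[max(0, math.ceil(q * len(sorted_vals)) - 1)]


def session_stats(lengths):
    ordered = sorted(lengths)
    return {
        "avg": statistics.mean(ordered),
        "std_dev": math.sqrt(statistics.pvariance(ordered)),
        "median": statistics.median(ordered),
        "quantiles": tuple(nearest_rank(ordered, q) for q in (0.75, 0.90, 0.95)),
    }


clicks = [(1, 0), (1, 60_000), (1, 120_000), (1, 2_000_000), (2, 0), (2, 300_000)]
lengths = session_lengths_minutes(sessionize(clicks))
print(lengths)  # [2.0, 0.0, 5.0]
print(session_stats(lengths)["quantiles"])  # (5.0, 5.0, 5.0)
```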

This is just a taste. There’s plenty more in the library for you to peruse. Take a look here. DataFu is freely available under the Apache 2 license. We welcome contributions, so please send us your pull requests!

--- Matthew Hayes & Sam Shah

October 31, 2012

LinkedIn has a diverse ecosystem of specialized data storage and serving systems. This creates the need to reliably capture changes from primary data sources and transfer them consistently through the rest of the data architecture. In response to this need, we developed Databus, a source-agnostic distributed change data capture system, which has since become an integral part of LinkedIn's data processing pipeline. The Databus transport layer provides end-to-end latencies in milliseconds and handles throughput of thousands of change events per second per server, while supporting infinite lookback capabilities and rich subscription functionality.

We have previously written about the motivation, design, and architecture of Databus, and we recently published a paper at SoCC 2012 that describes the design decisions and implementation of Databus in further detail. In this post, we will outline our experiences operating Databus as a Service in production.

Change-Data-Capture As a Service

The Databus system comprises several components, as depicted. The Relay fetches changes from the source database and stores the events in a high-performance transient log store. The Bootstrap Service stores a moving snapshot of the source database by periodically applying the change stream from the Relay. Applications use the Databus Client Library to pull the change stream from the Relay or Bootstrap and process the change events in Consumers that implement a callback API defined by the library. The change stream infrastructure, i.e., the Relay and the Bootstrap Service, is owned and operated centrally by us, the Databus Team. The Applications are developed by various teams and run on separate infrastructure. This ownership model lets Application Teams manage the scaling of change stream processing and allows us to focus on producing and managing reliable change streams from the databases.
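
A toy model makes this division of labor concrete: the Relay holds an ordered buffer of change events, and the client library pulls batches from it and invokes a callback on an application-supplied consumer, advancing a checkpoint as it goes. All names here (`ChangeEvent`, `on_data_event`, integer sequence numbers standing in for source commit positions) are hypothetical; this is not the Databus Client Library API.

```python
from dataclasses import dataclass, field
from typing import List, Protocol


@dataclass(frozen=True)
class ChangeEvent:
    seq: int       # hypothetical stand-in for a source commit position
    source: str    # e.g. a table or view name
    payload: dict


class Consumer(Protocol):
    def on_data_event(self, event: ChangeEvent) -> None: ...


@dataclass
class Relay:
    """In-memory stand-in for the Relay's transient, ordered event log."""
    events: List[ChangeEvent] = field(default_factory=list)

    def read_since(self, seq: int, limit: int = 100) -> List[ChangeEvent]:
        return [e for e in self.events if e.seq > seq][:limit]


class Client:
    """Pulls batches from the relay and dispatches each event to the consumer."""

    def __init__(self, relay: Relay, consumer: Consumer, checkpoint: int = 0):
        self.relay, self.consumer, self.checkpoint = relay, consumer, checkpoint

    def poll_once(self) -> int:
        batch = self.relay.read_since(self.checkpoint)
        for event in batch:
            self.consumer.on_data_event(event)
            self.checkpoint = event.seq  # advance only after the callback returns
        return len(batch)


class RecordingConsumer:
    """Example application consumer: just remembers what it was given."""

    def __init__(self):
        self.seen = []

    def on_data_event(self, event: ChangeEvent) -> None:
        self.seen.append((event.source, event.payload))


relay = Relay([ChangeEvent(1, "member", {"id": 7}), ChangeEvent(2, "member", {"id": 8})])
consumer = RecordingConsumer()
client = Client(relay, consumer)
print(client.poll_once(), client.checkpoint)  # 2 2
```

Because the checkpoint only advances after a callback returns, a consumer that raises sees the same event again on the next poll, which is the usual at-least-once trade-off.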

The sources (database tables or views) of interest are identified by Application Teams. We provision a source by running a series of scripts that instrument the database (setting up the necessary triggers), generate Avro schemas that describe the change events (by inspecting the database schemas), and publish those schemas to a Schema Registry. We make appropriate entries in the global configuration to identify the machines on which the change stream will live. The seeding process, which is a one-time effort, consists of generating and loading a consistent snapshot of the source into the Bootstrap's snapshot store. We then enable specialized consumers that continuously apply the change stream from the Relay to the corresponding snapshot store.
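
The schema-generation step can be sketched as a mapping from column definitions to an Avro record schema. The SQL-to-Avro type table, the `com.example.events` namespace, and making every field a nullable union are illustrative assumptions, not the rules Databus's provisioning scripts apply.

```python
import json

# Hypothetical mapping from a few SQL column types to Avro primitive types.
SQL_TO_AVRO = {
    "NUMBER": "long",
    "INTEGER": "int",
    "VARCHAR2": "string",
    "VARCHAR": "string",
    "TIMESTAMP": "long",  # e.g. epoch milliseconds
    "BLOB": "bytes",
}


def avro_schema_for_table(table, columns, namespace="com.example.events"):
    """Build an Avro record schema (as a dict) describing change events for a table.

    columns is a list of (column_name, sql_type) pairs in table order. Every field
    is a ["null", T] union with a null default, so rows with missing values serialize.
    """
    fields = []
    for name, sql_type in columns:
        avro_type = SQL_TO_AVRO.get(sql_type.upper())
        if avro_type is None:
            raise ValueError(f"no Avro mapping for SQL type {sql_type!r}")
        fields.append({"name": name.lower(), "type": ["null", avro_type], "default": None})
    record_name = table.title().replace("_", "")  # member_profile -> MemberProfile
    return {"type": "record", "name": record_name, "namespace": namespace, "fields": fields}


if __name__ == "__main__":
    schema = avro_schema_for_table("member_profile", [("MEMBER_ID", "NUMBER"), ("HEADLINE", "VARCHAR2")])
    print(json.dumps(schema, indent=2))
```

Putting `"null"` first in each union matters: Avro requires a union field's default to match the union's first branch.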

We’ve been running the latest generation of Databus technology for a year. We have provisioned tens of databases that store user and advertiser data and manage their change streams. We have also assisted and supported several applications in consuming these change streams in reliable and scalable ways. We will now describe what we learned running the change capture service and outline the impact on applications.

Applications (Clients)

  • Launched Faster
    We have seen new applications that need to read an entire source database before they begin listening to the Databus change stream. The new bootstrap process has made it unnecessary for applications to find custom methods to feed the source data into their processing systems, significantly improving their time to launch. Applications achieve this by consuming the Databus stream from 'time=0'. In these cases, we have found that the Databus Client Library required tuning to handle the large event windows that occurred while consuming the initial snapshot. To improve processing times, some applications configured the library to use more threads and consume events in parallel within this large window. In general, when the application itself processed change events quickly, the total event processing time was largely limited by available network bandwidth.

  • Scaled with Partitioned Change Stream
    We have observed that a number of applications are themselves partition-aware. For example, our distributed search system has many hundreds of nodes, and each node only indexes a fraction of the complete data set. Often, different consumers want different partitioning functions or axes: our search engine uses range-based partitioning, while our relevance engine uses mod-based partitioning. Earlier, every individual machine would pull the entire stream and filter it client-side by dropping the events it was not interested in. We added server-side filtering to Databus, which allows consumers to specify their filtering function (mod or range) while subscribing to the sources. This has resulted in large network savings, decreasing bandwidth requirements by up to 40 times.

  • Used Cluster-Awareness
    Some applications have used the Databus Client Library to build fault tolerance across their processing nodes. For instance, applications have used the library's built-in cluster-aware functionality to elect a single writer (from amongst many nodes) to process updates received on Databus. In other cases, they have used the library to perform partition assignments as nodes entered and left the designated application cluster. We have leveraged Helix to implement the cluster-aware functionality of Databus.

  • Monitored Health of the Pipeline
    Of the many metrics exposed by the Databus system, none has proved as effective as 'timeSinceLastUpdate' at answering the question: how far behind the source database are we? It is defined as the time elapsed (in milliseconds) since the most recently processed event was updated. It has turned out to be a valid proxy for detecting failures in the pipeline. For example, a continuously increasing value of this metric indicates that, for one reason or another, the application has stopped processing new updates. In other cases, this metric can effectively capture the 'lag' between an update occurring in the database and the application receiving it, such as the delay after which a newly added member (in the member database) becomes searchable.
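
Two mechanisms from this list are small enough to sketch. The filter classes show what mod- and range-based server-side filtering computes, assuming events are partitioned by an integer key such as a member ID; `time_since_last_update_ms` computes the lag metric from the source update time of the last processed event. The class names and filter representation are hypothetical, not Databus's subscription configuration.

```python
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class ModFilter:
    """Accept keys where key % num_buckets is one of this subscriber's buckets."""
    num_buckets: int
    buckets: frozenset

    def accepts(self, key: int) -> bool:
        return key % self.num_buckets in self.buckets


@dataclass(frozen=True)
class RangeFilter:
    """Accept keys in the half-open interval [low, high)."""
    low: int
    high: int

    def accepts(self, key: int) -> bool:
        return self.low <= key < self.high


def serve(events: Iterable[dict], key_field: str, flt) -> List[dict]:
    """Apply the subscriber's filter on the server, so only matching events are sent."""
    return [e for e in events if flt.accepts(e[key_field])]


def time_since_last_update_ms(last_event_update_ms: Optional[int],
                              now_ms: Optional[int] = None) -> Optional[int]:
    """Milliseconds since the source update time of the most recently processed event."""
    if last_event_update_ms is None:
        return None  # nothing processed yet
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - last_event_update_ms


events = [{"member_id": i} for i in range(10)]
print([e["member_id"] for e in serve(events, "member_id", ModFilter(4, frozenset({1})))])  # [1, 5, 9]
print([e["member_id"] for e in serve(events, "member_id", RangeFilter(3, 6))])  # [3, 4, 5]
```

A node that subscribes to one of N mod buckets receives roughly 1/N of the events instead of the whole stream, which is where the bandwidth savings come from.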

Change-Data-Capture Service

Looking back, there are a few things we have clearly done well and some pains we’re still addressing.
  • Achieved Source Database Isolation
    With the advent of the Bootstrap Service, applications that 'fall behind' the Relay no longer have to read changes directly from the database. We have seen in practice that whenever an application attempts to read updates from a point in time older than the Relay's retention period, it seamlessly switches to reading updates from the Bootstrap Service until it has caught up. In some cases, we have seen new applications become consistent with the entire source database by reading the Databus stream from 'time=0', without adding load to the source database.

  • Enabled Seamless Schema Evolution
    Whenever a source database schema changes, a corresponding new Avro schema is generated. The Relays consume data from the modified table or view and generate events conforming to the latest schema. As long as the schemas are backward compatible, only the latest change stream needs to be provisioned, irrespective of the number of versions being actively consumed. Applications are free to keep using older schema versions until they are ready to process the new fields; the Client Library automatically converts events from the newer schema to the older one.

  • Relay Deployment and Scale-Out
    While it's typical for a source's change stream to reside on more than one relay server for fault tolerance, we have also added relay servers to handle the steady increase in the number of consumers for a given source. However, we observed that adding relay servers only scales as long as the source database can handle the incremental load imposed by each relay. We have handled scaling of the 'most popular sources' (high application load plus a high write rate at the database) by deploying relays in 'leader-follower' clusters, where the leader connects directly to the database and the followers are 'chained' to the leader relay, thereby presenting a constant and manageable load on the source database.

  • Improved Hardware Utilization
    • Increased Change Stream Retention Period
      Our database sources are highly heterogeneous, varying in both row size and update rate. With the choice of Avro and our high-performance log store, we can easily configure the system to retain the change stream at the relay for 2 to 7 days for any source. Using a binary serialization format (Avro) has reduced our storage footprint by a significant factor compared to earlier versions of Databus, which used Java serialized objects.

    • Resource Sharing amongst Change Streams
      A single relay or bootstrap server has been able to host the change streams and snapshots of several sources, enabling us to efficiently utilize available memory and network bandwidth.
  • Bootstrap Seeding Pains
    Seeding involves generating and loading a consistent snapshot of the source data into the Bootstrap Service's Snapshot Store, which can be a challenge for large data sets. Since stopping writes to the primary store to seed the Bootstrap store is rarely an option, we either have to procure additional hardware to load a stable backup, or devise an efficient, restartable algorithm that reads the data out of the primary store in small chunks while guaranteeing that no updates are missed and that all transactions that happen during the seeding process are fully applied at the end. We found that sources with complex joins and/or large BLOBs can hurt seeding performance. In some cases with complex joins, we have used dumps of the source tables and computed the joins offline (e.g., in Hadoop). With such an approach, the main challenge was to ensure that the offline join produced exactly the same results as if it were performed by the primary store, because the two table dumps may not be consistent with each other.
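
The Bootstrap behaviors above can be sketched with hypothetical names. `choose_source` captures the switch from the Relay to the Bootstrap Service when a client's checkpoint predates the Relay's retention window. `seed_snapshot` shows one generic way to meet the seeding requirements listed: record the change-stream position, scan the primary store in restartable key-ordered chunks, then replay every change since the recorded position. It assumes each change carries a full row image, and it illustrates the requirements rather than Databus's actual seeding algorithm.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

# (seq, key, row): row is the full row image after the change, or None for a delete.
Change = Tuple[int, int, Optional[dict]]


def choose_source(checkpoint_seq: int, relay_oldest_seq: int) -> str:
    """Pick where the next event (checkpoint_seq + 1) must come from."""
    if checkpoint_seq + 1 >= relay_oldest_seq:
        return "relay"      # the relay still retains the next event
    return "bootstrap"      # fell behind retention: catch up from the snapshot store


def seed_snapshot(
    read_chunk: Callable[[Optional[int], int], List[Tuple[int, dict]]],
    current_seq: Callable[[], int],
    read_changes_since: Callable[[int], Iterable[Change]],
    snapshot: Dict[int, dict],
    progress: Dict[str, Optional[int]],
    chunk_size: int = 1000,
) -> Dict[int, dict]:
    """Seed `snapshot` from a live primary store without stopping writes.

    `progress` is persisted between attempts, so an interrupted run resumes
    from the last completed chunk instead of starting over.
    """
    if "start_seq" not in progress:
        progress["start_seq"] = current_seq()  # change-stream position before the scan
        progress["last_key"] = None
    while True:
        rows = read_chunk(progress["last_key"], chunk_size)  # keys > last_key, ascending
        if not rows:
            break
        for key, row in rows:
            snapshot[key] = row
        progress["last_key"] = rows[-1][0]  # checkpoint after each chunk
    # Replay every change committed since the scan began, in order. Because each change
    # carries a full row image, the final state matches the primary store once the replay
    # finishes, even for rows the scan already saw in a newer state.
    for _seq, key, row in read_changes_since(progress["start_seq"]):
        if row is None:
            snapshot.pop(key, None)
        else:
            snapshot[key] = row
    return snapshot
```

Persisting `progress` alongside the partial snapshot is what makes the scan restartable; the recorded `start_seq` is what guarantees updates made during a long scan are not lost.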

Next Play

While we have made significant progress in launching and managing several change streams within Linkedin, there are miles to go before we sleep. A few tasks on which we are spending time:
  • Improvement of Change Capture from Oracle
    We are working on solutions that will pull changes from Oracle and avoid the overhead of triggers.

  • Automatic Provisioning of Relays and Bootstrap Servers
    In the current setup, the decision to add new relay or bootstrap servers to a cluster, or to define the clusters themselves, is made by humans with access to operating metrics such as utilization (memory, CPU) and incoming load. In the future, we want to automate provisioning of Databus clusters when we observe increased demand.

  • Scaling Bootstrap DB to handle rapidly growing databases
    We anticipate steady growth in our large database sources, which will benefit from compression and partitioning.

In future posts, we will delve into the challenges of change capture from Oracle and how we are continuing to overcome them, take a closer look at the Databus Client Library APIs, and discuss in detail the challenges of providing infinite lookback through the Bootstrap Service. Until then, as always, we are eager to hear your ideas and thoughts.

==============================================================
Sunil Nagaraj
Staff Software Engineer

October 3, 2012

LinkedIn’s DataFu library, a collection of useful UDFs for Pig, is now a standard part of Cloudera’s Hadoop distribution (CDH) starting with version 4.1:

Support for DataFu – the LinkedIn data science team was kind enough to open source their library of Pig UDFs that make it easier to perform common jobs like sessionization or set operations. Big thanks to the LinkedIn team!!!

Read the full announcement here. Pig users around the world rejoice!

September 27, 2012

LinkedIn has many analytical insight products, such as "Who's Viewed My Profile?" and "Who's Viewed This Job?". At their core, these are multidimensional queries. For example, "Who's Viewed My Profile?" takes someone's profile views and breaks them down by industry, geography, company, school, etc., to show the richness of the people who viewed their profile.

Online analytical processing (OLAP) has been the traditional approach to solve these multi-dimensional analytical problems. However, for our use cases, we had to build a solution that can answer these queries in milliseconds across 175+ million members. We called this solution Avatara:

Avatara: OLAP for Web-scale Analytical Products

Avatara is LinkedIn's scalable, low-latency, and highly available OLAP system for "sharded" multidimensional queries within the time constraints of a request/response loop. It has been successfully powering "Who's Viewed My Profile?" and other analytical products for the past two and a half years. We recently presented a paper on Avatara at the 38th International Conference on Very Large Data Bases (VLDB).

Overview

At LinkedIn, we need an OLAP solution that can serve our 175+ million members around the world. It must be able to answer queries in tens of milliseconds, as our members are waiting for the page to load on their browsers. An interesting insight for LinkedIn's use cases is that queries span relatively few – usually tens to at most a hundred – dimensions, so this data can be sharded across a primary dimension. For "Who's Viewed My Profile?", we can shard the cube by the member herself, as the product does not allow analyzing profile views of anyone other than the member currently logged in. We call this the many, small cubes problem.

Here's a brief overview of how it works. As shown in the figure below, Avatara consists of two components:

  1. An offline engine that computes cubes in batch
  2. An online engine that serves queries in real time

The offline engine computes cubes with high throughput by leveraging Hadoop for batch processing. It then writes cubes to Voldemort, LinkedIn's open-source key-value store. The online engine queries the Voldemort store when a member loads a page. Every piece in this architecture runs on commodity hardware and can be easily scaled horizontally.

Offline Engine

The offline batch engine processes data through a pipeline that has three phases:

  1. Preprocessing
  2. Projections and joins
  3. Cubification

Each phase runs one or more Hadoop jobs and produces output that is the input for the subsequent phase. We use Hadoop for its built-in high throughput, fault tolerance, and horizontal scalability. The pipeline preprocesses raw data as needed, projects out dimensions of interest, performs user-defined joins, and finally transforms the data into cubes. The result of the batch engine is a set of sharded small cubes, represented by key-value pairs, where each key is a shard (for example, by member_id for "Who's Viewed My Profile?") and the value is the cube for that shard.
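
A toy version of the three phases, in plain Python rather than Hadoop, might look like this. It assumes hypothetical profile-view events (`viewee`, `viewer`) and a viewer-attribute table to join against, and models each shard's cube as a dictionary from a dimension tuple to a count. Avatara's real cube format and job structure are described in the paper, not here.

```python
from collections import Counter, defaultdict


def preprocess(raw_events):
    """Phase 1: clean raw data (here, drop events missing either member ID)."""
    return [e for e in raw_events if e.get("viewee") is not None and e.get("viewer") is not None]


def project_and_join(events, viewer_attrs):
    """Phase 2: project out the shard key and join in the viewer's dimensions."""
    rows = []
    for e in events:
        attrs = viewer_attrs.get(e["viewer"], {})
        rows.append((e["viewee"], attrs.get("industry", "unknown"), attrs.get("geo", "unknown")))
    return rows


def cubify(rows):
    """Phase 3: one small cube per shard key (the viewed member), counting views per (industry, geo)."""
    cubes = defaultdict(Counter)
    for viewee, industry, geo in rows:
        cubes[viewee][(industry, geo)] += 1
    return {member: dict(cube) for member, cube in cubes.items()}


raw = [
    {"viewee": 1, "viewer": 10},
    {"viewee": 1, "viewer": 11},
    {"viewee": 1, "viewer": 10},
    {"viewee": 2, "viewer": 11},
    {"viewee": None, "viewer": 10},  # malformed, dropped in phase 1
]
viewer_attrs = {10: {"industry": "software", "geo": "us"}, 11: {"industry": "finance", "geo": "eu"}}
cubes = cubify(project_and_join(preprocess(raw), viewer_attrs))
print(cubes)
# {1: {('software', 'us'): 2, ('finance', 'eu'): 1}, 2: {('finance', 'eu'): 1}}
```

Each `(member_id, cube)` pair in the result corresponds to one key-value pair that would be bulk loaded into the store.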

Online Engine

All cubes are bulk loaded into Voldemort. The online query engine retrieves and processes data from Voldemort, returning results to the client. It provides SQL-like operators, such as select, where, and group by, plus some math operations. The widespread adoption of SQL makes it easy for application developers to interact with Avatara. With Avatara, 80% of queries can be satisfied within 10 ms, and 95% of queries can be answered within 25 ms for "Who's Viewed My Profile?" on a high-traffic day.
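
On the serving side, a request touches only one small cube. In this sketch, a plain dict stands in for the Voldemort store (no Voldemort client API is used), and a hypothetical `query` function combines where, group by, and a top-N limit over a cube mapping (industry, geo) tuples to view counts. Avatara's actual query operators and API may differ.

```python
from collections import Counter
from typing import Callable, Dict, List, Optional, Tuple

Cube = Dict[Tuple[str, str], int]  # (industry, geo) -> view count
DIMENSIONS = ("industry", "geo")


def query(store: Dict[int, Cube], member_id: int, group_by: str,
          where: Optional[Callable[[Dict[str, str]], bool]] = None,
          limit: int = 5) -> List[Tuple[str, int]]:
    """Roughly: SELECT group_by, SUM(views) WHERE where GROUP BY group_by
    ORDER BY SUM(views) DESC LIMIT limit, over a single member's cube."""
    cube = store.get(member_id, {})  # a single key-value lookup per request
    idx = DIMENSIONS.index(group_by)
    totals: Counter = Counter()
    for dims, views in cube.items():
        row = dict(zip(DIMENSIONS, dims))
        if where is None or where(row):
            totals[dims[idx]] += views
    return totals.most_common(limit)


store = {1: {("software", "us"): 5, ("software", "eu"): 2, ("finance", "us"): 3}}
print(query(store, 1, "industry"))  # [('software', 7), ('finance', 3)]
print(query(store, 1, "geo", where=lambda r: r["industry"] == "software"))  # [('us', 5), ('eu', 2)]
```

Because each cube is small, the aggregation work per request stays bounded no matter how large the total data set grows.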

Learn more

If you're interested in more detail about how the system works, please see the full paper: Avatara: OLAP for Web-scale Analytical Products.
