October 31, 2012

LinkedIn has a diverse ecosystem of specialized data storage and serving systems. This leads to the need to reliably capture changes from primary data sources and transfer them consistently through the rest of the data architecture. In response to this need, we developed Databus, a source-agnostic distributed change data capture system, which has since become an integral part of LinkedIn's data processing pipeline. The Databus transport layer provides end-to-end latencies in milliseconds and handles throughput of thousands of change events per second per server, while supporting infinite lookback capabilities and rich subscription functionality.

In the past, we have written about the motivation, design, and architecture of Databus, and we recently published a paper at SOCC 2012 that describes the design decisions and implementation of Databus in further detail. In this post, we will outline our experiences operating Databus as a Service in production.

Change-Data-Capture As a Service

The Databus system comprises several components. The Relay fetches changes from the source database and stores the events in a high-performance transient log store. The Bootstrap Service stores a moving snapshot of the source database by periodically applying the change stream from the Relay. Applications use the Databus Client Library to pull the change stream from the Relay or the Bootstrap Service and process the change events in Consumers that implement a callback API defined by the library. The change-stream infrastructure, i.e. the Relay and the Bootstrap Service, is owned and operated centrally by us, the Databus team. The applications are developed by various teams and run on separate infrastructure. This ownership model lets application teams manage the scaling of their change-stream processing and allows us to focus on producing and managing reliable change streams from the databases.
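To make the Consumer side concrete, here is a minimal sketch of what an application's consumer can look like. The class, interface, and method names are illustrative stand-ins for the Client Library's actual callback API, not its exact signatures; the point is the shape of the callbacks: one per change event, one per consistent window.

```java
// A minimal sketch (illustrative names, not the actual Client Library API) of how an
// application consumes the Databus change stream: the library invokes a callback for
// each change event and another at the end of each consistent window.
public class ProfileChangeConsumer {

    // Stand-in for the decoded change event the library hands to the consumer.
    public interface ChangeEvent {
        Object getKey();       // primary key of the changed row
        long getSequence();    // system change number (SCN) of the transaction
    }

    private long lastCheckpointScn = -1L;

    // Invoked once per change event, in commit order.
    public void onDataEvent(ChangeEvent event) {
        // Apply the change to the application's own serving system,
        // e.g. update a search index shard or refresh a cache entry.
        System.out.println("processed key=" + event.getKey() + " scn=" + event.getSequence());
    }

    // Invoked at the end of a consistent window; persisting progress here lets the
    // consumer resume from the Relay (or Bootstrap Service) after a restart.
    public void onEndOfWindow(long windowScn) {
        lastCheckpointScn = windowScn;
    }
}
```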

The sources of interest (database tables or views) are identified by application teams. We provision a source by running a series of scripts that instrument the database (set up the necessary triggers), generate Avro schemas that describe the change events by inspecting the database schemas, and publish those schemas to a Schema Registry. We make the appropriate entries in the global configuration to identify the machines on which the change stream will live. The seeding process, which is a one-time effort, consists of generating and loading a consistent snapshot of the source into the Bootstrap Service's snapshot store. We then enable specialized consumers that continuously apply the change stream from the Relay to the corresponding snapshot store.
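For a flavor of what gets published to the Schema Registry, the snippet below parses a hand-written Avro record schema in the spirit of the ones our scripts generate. The record name, namespace, and fields are hypothetical, not an actual generated schema; only the use of the stock Avro parser is real.

```java
import org.apache.avro.Schema;

// Illustrative only: a hypothetical Avro record schema for a change event, parsed with
// the standard Avro library. Field names and namespace are made up for this example.
public class ChangeEventSchemaExample {
    private static final String MEMBER_PROFILE_V1 =
        "{\"type\":\"record\",\"name\":\"MemberProfile_V1\","
      + " \"namespace\":\"com.example.events\","
      + " \"fields\":["
      + "   {\"name\":\"memberId\",\"type\":\"long\"},"
      + "   {\"name\":\"firstName\",\"type\":[\"null\",\"string\"],\"default\":null},"
      + "   {\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null},"
      + "   {\"name\":\"modifiedDate\",\"type\":\"long\"}"
      + " ]}";

    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(MEMBER_PROFILE_V1);
        System.out.println("Registered schema: " + schema.getFullName()
            + " with " + schema.getFields().size() + " fields");
    }
}
```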

We have been running the latest generation of Databus technology for a year. We have provisioned tens of databases that store user and advertiser data and managed their change streams. We have also assisted and supported several applications in consuming these change streams in reliable and scalable ways. We will now describe what we learned running the change capture service and outline the impact on applications.

Applications (Clients)

  • Launched Faster
  • We have seen new applications that need to read the entire source database before they begin listening to the Databus change stream. The new bootstrap process has made it unnecessary for applications to find custom methods to feed the source data into their processing systems, significantly improving their time to launch. Applications achieve this by consuming the Databus stream from 'time=0'. In these cases, we have found that the Databus Client Library required tuning to adjust to the large event windows that occur during consumption of the initial snapshot. To improve processing times, some applications have used library configurations that increased the number of available threads and consumed events in parallel within this large window, as sketched below. In general, total event processing time was often limited largely by available network bandwidth when the application itself was relatively quick to process the change events.
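The sketch below illustrates the parallel-consumption pattern in simplified form: buffer the events of one large bootstrap window, apply them with a pool of worker threads, and checkpoint only once the whole window has been applied. The callback names and event type are hypothetical stand-ins for the Client Library API; the pattern, not the names, is the point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified illustration of parallel consumption of a very large event window.
// Assumes events within a window can be applied independently by the application.
public class ParallelWindowConsumer {
    private final ExecutorService workers = Executors.newFixedThreadPool(8);
    private final List<Object> windowEvents = new ArrayList<Object>();

    public void onDataEvent(Object decodedEvent) {
        windowEvents.add(decodedEvent);          // collect the window's events
    }

    public void onEndOfWindow(long windowScn) throws InterruptedException {
        List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
        for (final Object event : windowEvents) {
            tasks.add(new Callable<Void>() {
                public Void call() {
                    applyToLocalStore(event);    // application-specific processing
                    return null;
                }
            });
        }
        workers.invokeAll(tasks);                // blocks until every event is applied
        windowEvents.clear();
        saveCheckpoint(windowScn);               // safe to checkpoint: window fully applied
    }

    private void applyToLocalStore(Object event) { /* e.g. write to a local index */ }
    private void saveCheckpoint(long scn)        { /* persist the last applied SCN */ }
}
```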

  • Scaled with Partitioned Change Stream
  • We have observed that a number of applications are themselves partition-aware. For example, our distributed search system has many hundreds of nodes, and each node indexes only a fraction of the complete data set. Different consumers often want different partitioning functions or axes: our search engine uses range-based partitioning, while our relevance engine uses mod-based partitioning. Earlier, each individual machine would pull the entire stream and filter it client-side, dropping the events it was not interested in. We added server-side filtering to Databus, which allows consumers to specify their filtering function (mod or range) when subscribing to the sources, as illustrated below. This has resulted in huge network savings, decreasing bandwidth requirements by up to 40 times.
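Conceptually, the server-side filter evaluates a predicate like one of the two below against each event's key before the event leaves the Relay. This is not the actual filter API, just the predicate each filter kind implements.

```java
// Simplified illustration of the two partitioning functions consumers ask the Relay to
// apply server-side; only events matching the predicate are sent over the wire.
public final class PartitionFilters {

    // Mod-based filter: a node owning bucket `partitionId` out of `numPartitions`
    // only receives events whose key maps to its bucket.
    public static boolean matchesModPartition(long key, int numPartitions, int partitionId) {
        return Math.abs(key % numPartitions) == partitionId;
    }

    // Range-based filter: a search node indexing keys in [rangeStart, rangeEnd)
    // only receives events whose key falls inside its range.
    public static boolean matchesRangePartition(long key, long rangeStart, long rangeEnd) {
        return key >= rangeStart && key < rangeEnd;
    }

    public static void main(String[] args) {
        // A node responsible for one bucket of 100 sees roughly 1/100th of the stream,
        // which is where the large bandwidth savings come from.
        System.out.println(matchesModPartition(123456789L, 100, 3));                      // false
        System.out.println(matchesRangePartition(123456789L, 100000000L, 200000000L));    // true
    }
}
```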

  • Used Cluster-Awareness
  • Some applications have used the Databus Client Library to build fault tolerance among their application's processing nodes. Applications have utilized the library's built-in cluster-aware functionality, for instance, to elect a single writer (from among many nodes) to process updates received over Databus. In other cases, they have used the library to perform partition assignments as nodes enter and leave the designated application cluster. We leverage Apache Helix to implement the cluster-aware functionality of Databus.
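The single-writer pattern boils down to something like the conceptual sketch below: every node runs the consumer, but only the node currently elected leader (with Helix doing the election underneath) applies the updates. The callback and flag shown here are illustrative, not the library's actual API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Conceptual sketch of the single-writer pattern: all nodes receive leadership
// callbacks from the cluster manager, but only the current leader applies the stream.
public class SingleWriterConsumer {
    private final AtomicBoolean isLeader = new AtomicBoolean(false);

    // Invoked when this node gains or loses leadership of the writer role.
    public void onLeadershipChange(boolean leaderNow) {
        isLeader.set(leaderNow);
    }

    // Invoked for every change event; followers ignore the event and rely on the shared
    // checkpoint so a newly elected leader resumes where the previous one stopped.
    public void onDataEvent(Object decodedEvent) {
        if (!isLeader.get()) {
            return;
        }
        applyUpdate(decodedEvent);
    }

    private void applyUpdate(Object event) { /* single-writer processing */ }
}
```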

  • Monitored Health of the Pipeline
  • While the Databus system exposes many metrics, none has proved to be as effective as 'timeSinceLastUpdate' at answering the question 'How far behind are we from the source database?'. It is defined as the time elapsed (in milliseconds) since the most recently processed event was updated at the source. It has turned out to be a valid proxy for detecting failures in the pipeline: a continuously increasing value indicates that, for one reason or another, the application has stopped processing new updates. In other cases, the metric effectively captures the 'lag' between an update occurring in the database and the application receiving it, such as the delay after which a newly added member (in the member database) becomes searchable.
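In code, the metric is simply the gap between "now" and the source-side update timestamp of the last event the consumer processed, roughly as sketched below (the class and field names are illustrative).

```java
// Rough illustration of how a 'timeSinceLastUpdate'-style metric is derived and used.
public class PipelineLagMonitor {
    private volatile long lastUpdateTimestampMs = System.currentTimeMillis();

    // Called whenever the consumer finishes processing an event; the argument is the
    // time at which that row was updated in the source database.
    public void onEventProcessed(long sourceUpdateTimestampMs) {
        lastUpdateTimestampMs = sourceUpdateTimestampMs;
    }

    // Time elapsed since the most recently processed event was updated at the source.
    public long timeSinceLastUpdateMs() {
        return System.currentTimeMillis() - lastUpdateTimestampMs;
    }

    // A value that keeps growing means the application has stopped seeing new updates;
    // a steady, bounded value is simply the end-to-end lag of the pipeline.
    public boolean isLagging(long thresholdMs) {
        return timeSinceLastUpdateMs() > thresholdMs;
    }
}
```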

Change-Data-Capture Service

Looking back, there are a few things we have clearly done well and some pains we’re still addressing.
  • Achieved Source Database Isolation
  • With the advent of the Bootstrap Service, applications that 'fall behind' the Relay no longer have to read changes directly from the database. In practice, whenever an application attempts to read updates from a point in time older than the Relay's retention period, it seamlessly switches to reading updates from the Bootstrap Service until it has caught up. In some cases, we have seen new applications become consistent with the entire source database by reading the Databus stream from 'time=0', without adding any load on the source database.
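The switch-over decision the client makes can be summarized by the sketch below (simplified, with hypothetical names, not the actual client code): if the consumer's checkpoint has aged out of the Relay's retention window, catch up from the Bootstrap Service first and only then return to the Relay.

```java
// Simplified illustration of how a consumer that has fallen behind is served without
// touching the source database: replay from the Bootstrap Service until the checkpoint
// is back inside the Relay's retention window, then switch to the Relay.
public class StreamSourceSelector {

    enum StreamSource { RELAY, BOOTSTRAP }

    // checkpointScn: last SCN the application fully processed (0 means "from time=0").
    // oldestScnInRelay: oldest SCN still retained in the Relay's transient log store.
    public static StreamSource choose(long checkpointScn, long oldestScnInRelay) {
        if (checkpointScn < oldestScnInRelay) {
            // Too far behind: read the consistent snapshot plus stored change stream
            // from the Bootstrap Service instead of hitting the database.
            return StreamSource.BOOTSTRAP;
        }
        return StreamSource.RELAY;
    }

    public static void main(String[] args) {
        System.out.println(choose(0L, 4200L));      // BOOTSTRAP: consuming from time=0
        System.out.println(choose(5000L, 4200L));   // RELAY: checkpoint is still retained
    }
}
```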

  • Enabled Seamless Schema Evolution
  • Whenever the source database schemas change, corresponding new Avro schemas are generated. The Relays consume data from the modified table/view and generate events conforming to the latest schema. As long as the schemas are backward compatible, only the latest change stream needs to be provisioned, irrespective of the number of schema versions that are being actively consumed. Applications are free to continue using older versions of the schema until they are ready to process the new fields; the conversion of events from the newer to the older schema is done automatically by the Client Library.
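The conversion relies on standard Avro schema resolution: an event is decoded with the writer's (newer) schema but projected onto the reader's (older) schema, roughly as below using the stock Avro library. The two schemas themselves are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Illustration of the Avro schema resolution performed on behalf of consumers: an event
// written with a newer, backward-compatible schema is read with the older schema the
// application still uses. Both schemas below are made up for this example.
public class SchemaEvolutionExample {
    static final String V1 = "{\"type\":\"record\",\"name\":\"Member\",\"fields\":["
        + "{\"name\":\"memberId\",\"type\":\"long\"}]}";
    static final String V2 = "{\"type\":\"record\",\"name\":\"Member\",\"fields\":["
        + "{\"name\":\"memberId\",\"type\":\"long\"},"
        + "{\"name\":\"headline\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

    public static void main(String[] args) throws Exception {
        Schema writerSchema = new Schema.Parser().parse(V2);  // newest schema, used by the Relay
        Schema readerSchema = new Schema.Parser().parse(V1);  // older schema, used by the application

        // Encode an event with the new schema...
        GenericRecord newEvent = new GenericData.Record(writerSchema);
        newEvent.put("memberId", 42L);
        newEvent.put("headline", "Staff Software Engineer");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(newEvent, encoder);
        encoder.flush();

        // ...and decode it with the old schema: the extra field is simply dropped.
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord oldView =
            new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
        System.out.println(oldView);  // {"memberId": 42}
    }
}
```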

  • Relay Deployment and Scale-Out
  • While it is typical for the change stream of a source to reside on more than one relay server for fault tolerance, we have also added relay servers to handle the steady increase in the number of consumers for a given source. However, adding relay servers scales only as long as the source database can handle the incremental load imposed by each relay. We have handled scaling of the 'most popular' sources (high application load plus a high write rate at the database) by deploying relays in 'leader-follower' clusters, where the leader connects directly to the database and the followers are 'chained' to the leader relay, presenting a constant and manageable load on the source database.
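The arithmetic behind chaining is simple, as the back-of-the-envelope sketch below illustrates with made-up numbers: the database's change-capture load grows with the number of relays pulling from it directly, so fixing that number at the size of the leader set makes consumer-driven relay growth invisible to the database.

```java
// Back-of-the-envelope illustration of why relay chaining helps; all numbers are made up.
public class RelayChainingLoad {
    public static void main(String[] args) {
        int writesPerSecond = 2000;   // hypothetical change rate at the source
        int totalRelays = 10;         // relays needed to serve all consumers
        int leaderRelays = 2;         // relays allowed to pull directly from the database

        // Flat deployment: every relay pulls the full change stream from the database.
        long flatDbLoad = (long) writesPerSecond * totalRelays;

        // Leader-follower deployment: followers chain off the leaders, so database load
        // stays constant no matter how many follower relays are added for consumers.
        long chainedDbLoad = (long) writesPerSecond * leaderRelays;

        System.out.println("rows/sec pulled from DB, flat:    " + flatDbLoad);    // 20000
        System.out.println("rows/sec pulled from DB, chained: " + chainedDbLoad); // 4000
    }
}
```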

  • Improved Hardware Utilization
    • Increased Change Stream Retention Period
    • Our database sources are highly heterogeneous, varying both in row size and in update rate. With the choice of Avro and our high-performance log store, we can easily configure the system to retain the change stream at the relay for 2-7 days for any source. Using a binary serialization format such as Avro has improved our storage footprint by a significant factor compared to earlier versions of Databus that used Java serialized objects.

    • Resource Sharing amongst Change Streams
    • A single relay or bootstrap server has been able to host the change streams and snapshots of several sources, enabling us to efficiently utilize available memory and network bandwidth.
  • Bootstrap Seeding Pains
  • Seeding involves generating and loading a consistent snapshot of the source data into the Bootstrap Service's snapshot store. This can be a challenge for large data sets. Since stopping writes to the primary store in order to seed the bootstrap store is rarely an option, we either have to procure additional hardware to load a stable backup or devise an efficient, restartable algorithm that reads the data out of the primary store in small chunks while guaranteeing that no updates are missed and that all transactions that happen during the seeding process are fully applied at the end (sketched below). We found that sources with complex joins and/or large BLOBs can negatively affect seeding performance. In some cases with complex joins, we have used dumps of the source tables and computed the joins offline (e.g., in Hadoop). With such an approach, the main challenge was to ensure that the offline join produces exactly the same results as if it were performed by the primary store, because the individual table dumps may not be consistent with each other.
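In outline, the restartable seeding algorithm looks like the sketch below. It is simplified and the table, column, and helper names are hypothetical; what matters is that the source is read in small key-range chunks with a checkpoint after each chunk, and that the change stream is afterwards replayed from an SCN captured before seeding began, so updates made during seeding are not lost.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Simplified sketch of a restartable, chunked seeding pass over the primary store.
// Correctness comes from two properties: (1) a checkpoint after every chunk lets the
// seeder resume where it left off, and (2) the change stream is replayed from an SCN
// captured *before* the first chunk was read, so rows updated while seeding was in
// flight are re-applied afterwards.
public class BootstrapSeeder {

    public void seed(Connection source, long resumeFromKey, long chunkSize) throws SQLException {
        long startScn = currentScn(source);    // capture before reading any rows
        long maxKey = maxKey(source);          // upper bound of the key space to seed
        long lastKey = resumeFromKey;

        while (lastKey < maxKey) {
            long upper = lastKey + chunkSize;  // seed one small key range at a time
            PreparedStatement stmt = source.prepareStatement(
                "SELECT member_id, profile_blob FROM member_profile "
              + "WHERE member_id > ? AND member_id <= ? ORDER BY member_id");
            stmt.setLong(1, lastKey);
            stmt.setLong(2, upper);
            ResultSet rows = stmt.executeQuery();
            while (rows.next()) {
                writeToSnapshotStore(rows.getLong(1), rows.getBytes(2));
            }
            rows.close();
            stmt.close();

            lastKey = upper;
            saveSeedingCheckpoint(lastKey);    // restart point if the seeder dies
        }

        // Hand off to the normal apply path: replay the change stream from startScn so
        // that transactions committed during seeding end up in the snapshot store too.
        applyChangeStreamFrom(startScn);
    }

    private long currentScn(Connection c) throws SQLException { return 0L; /* e.g. query the DB's current SCN */ }
    private long maxKey(Connection c)     throws SQLException { return 0L; /* e.g. SELECT MAX(member_id) */ }
    private void writeToSnapshotStore(long key, byte[] row)   { /* upsert into the snapshot store */ }
    private void saveSeedingCheckpoint(long lastKey)          { /* persist the last seeded key */ }
    private void applyChangeStreamFrom(long scn)              { /* switch to consuming the Databus stream */ }
}
```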

Next Play

While we have made significant progress in launching and managing several change streams within LinkedIn, there are miles to go before we sleep. A few tasks on which we are spending time:
  • Improvement of Change Capture from Oracle
  • We are working on solutions that will pull changes from Oracle and avoid the overhead of triggers.

  • Automatic Provisioning of Relays and Bootstrap Servers
  • In the current setup, the decision to add new relay or bootstrap servers to a cluster, or to define the clusters themselves, is made by humans with access to operating metrics such as utilization (memory, CPU) and incoming load. In the future, we want to automate the provisioning of Databus clusters when we observe increased demand.

  • Scaling Bootstrap DB to handle rapidly growing databases
  • As we anticipate steady growth in our largest database sources, the Bootstrap DB will benefit from compression and partitioning.
In future posts, we will delve into the details behind the challenges of change capture from Oracle and how we are continuing to overcome them, take a closer look at the Databus Client Library APIs, and talk in detail about the challenges of providing the infinite lookback capability in the Bootstrap Service. Until then, as always, we are eager to hear your ideas and thoughts.

==============================================================
Sunil Nagaraj
Staff Software Engineer
