Give Meaning to 100 billion Events a Day

http://highscalability.com/blog/2018/4/9/give-meaning-to-100-billion-events-a-day-the-analytics-pipel.html

  • 10bn events per day is ~115k events/second
  • High-level pipeline is roughly Kafka (AWS) → Dataflow (GCP) → BigQuery (GCP) → Redshift (AWS) 🤔
  • Especially curious about the Redshift vs BigQuery tradeoff, and whether BigQuery is really worth these multi-cloud gymnastics.

In digital advertising, day-to-day operations generate a lot of events we need to track in order to transparently report campaign performance. These events come from:

  • Users’ interactions with the ads, sent by the browser. These events are called tracking events and can be standard (start, complete, pause, resume, etc.) or custom events coming from interactive creatives built with Teads Studio. We receive about 10 billion tracking events a day.
  • Events coming from our back-ends, regarding ad auctions’ details for the most part (real-time bidding processes). We generate more than 60 billion of these events daily, before sampling, and expect this number to double in 2018.

However, our technical context wasn’t ideal for BigQuery. We wanted to use it to store and transform all our events coming from multiple Kafka topics. We couldn’t move our Kafka clusters away from AWS or use Pub/Sub, the managed equivalent of Kafka on GCP, since these clusters are used by some of our ad delivery components also hosted on AWS. As a result, we had to deal with the challenges of operating a multi-cloud infrastructure.

When dealing with tracking events, the first problem you face is that you have to process them unordered, with unknown delays. The difference between the time the event actually occurred (event time) and the time the event is observed by the system (processing time) ranges from a few milliseconds up to several hours. These large delays are not so rare and can happen when a user loses their connection or activates flight mode between browsing sessions.
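To make the event-time vs. processing-time distinction concrete, here is a minimal Scala sketch (the field and table names are made up, not Teads’ schema): it computes an event’s delay and routes it to an hourly destination derived from its event time, so late arrivals still land in the “right” hour.

```scala
import java.time.{Duration, Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical tracking event; the field names are illustrative only.
final case class TrackingEvent(id: String, eventTime: Instant)

object EventTimeRouting {
  private val hourFmt =
    DateTimeFormatter.ofPattern("yyyyMMdd_HH").withZone(ZoneOffset.UTC)

  // Delay between when the event happened and when the system observes it.
  def delay(e: TrackingEvent, processingTime: Instant): Duration =
    Duration.between(e.eventTime, processingTime)

  // Route the event to an hourly destination based on event time,
  // regardless of how late it arrives.
  def destinationTable(e: TrackingEvent): String =
    s"tracking_events_${hourFmt.format(e.eventTime)}"
}
```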

Our resulting architecture is a chain of 30 min Dataflow batch jobs, sequentially scheduled to read a Kafka topic and write to BigQuery using load jobs.

To handle most situations, we sized Dataflow to read 3 times faster than the actual event pace. A 20 min read with a single n1-highcpu-16 instance can unstack 60 minutes of messages. In our use case, we end up with a sawtooth latency that oscillates between 3 min (the minimum duration of the Write BQ phase) and 30 min (the total duration of a job).
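The sizing above is just arithmetic; here is a small sketch restating it. All figures (the 3x factor, 20 min read, 3 min load, 30 min job) come straight from the paragraph, nothing else is assumed.

```scala
object PipelineSizing {
  val readSpeedup    = 3.0   // Dataflow sized to read 3x faster than events arrive
  val readMinutes    = 20.0  // read phase of a single batch job
  val writeBqMinutes = 3.0   // minimum duration of the "Write BQ" (load job) phase
  val jobMinutes     = 30.0  // total duration of one scheduled job

  // A 20 min read at 3x drains up to 60 minutes of queued messages,
  // which is how the pipeline catches up after a delay or an incident.
  val backlogDrainedPerJob: Double = readMinutes * readSpeedup

  // Latency is a sawtooth: an event arriving right before a load shows up
  // after ~3 min, one arriving right after waits for the whole next job.
  def main(args: Array[String]): Unit =
    println(s"drains $backlogDrainedPerJob min of backlog; " +
            s"latency between $writeBqMinutes and $jobMinutes min")
}
```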

Unlike traditional ETL processes where data is Transformed before it is Loaded, we chose to store it first (ELT), in a raw format. This has two main benefits:

  • It lets us have access to each and every raw event for fine-grained analysis and debugging purposes,
  • It simplifies the entire chain by letting BigQuery do the transformations with a simple but powerful SQL dialect (a minimal sketch of such a transformation follows this list).
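A minimal sketch of the kind of in-BigQuery transformation this enables, using the BigQuery Java client from Scala. The dataset, table and column names (analytics.raw_events_20180409_12, campaign_id, event_type) are illustrative assumptions, not the real schema.

```scala
import com.google.cloud.bigquery.{BigQueryOptions, QueryJobConfiguration}

// ELT: the raw events are already loaded, so the "transform" step is
// just SQL executed inside BigQuery.
object RollupJob {
  private val sql =
    """
      |SELECT campaign_id,
      |       event_type,
      |       COUNT(*) AS events
      |FROM `analytics.raw_events_20180409_12`
      |GROUP BY campaign_id, event_type
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val bigquery = BigQueryOptions.getDefaultInstance.getService
    val config   = QueryJobConfiguration.newBuilder(sql).build()
    // In a real job the result would be written to a rollup table,
    // e.g. via the builder's setDestinationTable(...).
    bigquery.query(config).iterateAll().forEach(row => println(row))
  }
}
```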

Querying the raw events table directly is nice for debugging and deep analysis purposes, but it’s impossible to achieve acceptable performance querying a table of this scale, not to mention the cost of such an operation.

BigQuery, as powerful as it can be, has its limits:

  • BigQuery doesn’t allow queries to multiple tables that have different schemas (even if the query is not using the fields that differ). We have a script to bulk update hundreds of tables when we need to add a field.
  • BigQuery doesn’t support column drop. Not a big deal, but it doesn’t help pay down technical debt.
  • Querying multiple hours: BigQuery supports wildcards in table names, but performance is so bad that we have to generate queries that explicitly reference each table and combine them with UNION ALL (see the sketch after this list).
  • We always need to join these events with data hosted in other databases (e.g. to hydrate events with more information about an advertising campaign), but BigQuery does not support this (yet). We currently have to regularly copy entire tables into BigQuery to be able to join the data within a single query.
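A sketch of the kind of query generation described above: instead of relying on a wildcard table, explicitly UNION ALL each hourly table over the requested range. The table naming scheme (events_yyyyMMdd_HH) and the campaign_id filter are assumptions for the example.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object HourlyUnionQuery {
  private val hourFmt = DateTimeFormatter.ofPattern("yyyyMMdd_HH")

  // Builds one SELECT per hourly table and glues them with UNION ALL.
  def build(from: LocalDateTime, to: LocalDateTime, campaignId: Long): String = {
    val hours = Iterator
      .iterate(from)(_.plusHours(1))
      .takeWhile(!_.isAfter(to))
      .toList

    hours
      .map { h =>
        s"SELECT * FROM `analytics.events_${h.format(hourFmt)}` " +
          s"WHERE campaign_id = $campaignId"
      }
      .mkString("\nUNION ALL\n")
  }
}
```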

With Teads’ ad delivery infrastructure in AWS and a Kafka cluster shared with many other components, we have no choice but to move quite a lot of data between the AWS and GCP clouds, which is neither easy nor cheap. We located our Dataflow instances (and thus the main GCP entry point) as close as possible to our AWS infrastructure, and fortunately the existing links between AWS and GCP are good enough that we can simply use managed VPNs. Although we encountered some instability running these VPNs, we managed to sort it out with a simple script that turns them off and on again. We have never faced problems big enough to justify the cost of a dedicated link. Once again, cost is something you have to watch closely and, where egress is concerned, it is difficult to assess before you see the bill. Carefully choosing how you compress data is one of the best levers for reducing these costs.
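Egress pricing is hard to reason about without numbers, so here is a hedged back-of-the-envelope in Scala. The daily volume and per-GB price are pure placeholders, not Teads’ figures; the only point is that the compression ratio divides the bill directly.

```scala
object EgressCost {
  val dailyUncompressedGb = 5000.0 // placeholder volume, not a real figure
  val egressPricePerGb    = 0.05   // placeholder USD price, not a real figure

  // Monthly inter-cloud egress cost for a given compression ratio.
  def monthlyCost(compressionRatio: Double): Double =
    dailyUncompressedGb / compressionRatio * egressPricePerGb * 30

  def main(args: Array[String]): Unit =
    // e.g. moving from 2x to 4x compression halves the egress bill.
    println(f"2x: $$${monthlyCost(2.0)}%.0f/month, 4x: $$${monthlyCost(4.0)}%.0f/month")
}
```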

Due to concurrency limits and an incompressible query latency of 3 to 5 seconds (acceptable and inherent to its design), BigQuery has to be complemented with other tools to serve apps (dashboards, web UIs, etc.). This task is performed by our Analytics service, a Scala component that taps into BigQuery to generate on-demand reports (spreadsheets) and tailored data marts (updated daily or hourly). This service is needed to handle the business logic; it would otherwise be too hard to maintain as SQL or to generate the data marts through the pipeline’s transformations. We chose AWS Redshift to store and serve our data marts. Although it might not seem an obvious choice for serving user-facing apps, Redshift works for us because we have a limited number of concurrent users.
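As an illustration only (not Teads’ actual code), a Scala service of this shape could run a BigQuery report query and push the result into a Redshift data mart over JDBC. Every identifier, query and connection string below is a placeholder.

```scala
import java.sql.DriverManager
import com.google.cloud.bigquery.{BigQueryOptions, QueryJobConfiguration}

object DataMartRefresh {
  def main(args: Array[String]): Unit = {
    // 1. Run the aggregation in BigQuery (placeholder table and columns).
    val bigquery = BigQueryOptions.getDefaultInstance.getService
    val rows = bigquery
      .query(QueryJobConfiguration.newBuilder(
        "SELECT campaign_id, SUM(events) AS events " +
          "FROM `analytics.hourly_rollup` GROUP BY campaign_id").build())
      .iterateAll()

    // 2. Push the aggregated rows into a Redshift data mart (placeholder
    //    connection details and table).
    val redshift = DriverManager.getConnection(
      "jdbc:redshift://example.cluster:5439/analytics", "user", "password")
    val insert = redshift.prepareStatement(
      "INSERT INTO campaign_mart (campaign_id, events) VALUES (?, ?)")

    rows.forEach { row =>
      insert.setLong(1, row.get("campaign_id").getLongValue)
      insert.setLong(2, row.get("events").getLongValue)
      insert.addBatch()
    }
    insert.executeBatch()
    redshift.close()
  }
}
```

In practice a bulk path (e.g. Redshift’s COPY from S3) would be preferable to row-by-row inserts; the JDBC batch here just keeps the sketch self-contained.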

Look Up

  • Druid
  • Google Dataflow