Using Apache Spark for Distributed Adwords Reports Management
GetYourGuide is the leading marketplace for tours and activities. Our visitors look for amazing things to do while in a destination: from Paris Catacombs tours to entrance tickets for museums and attractions. A key component of our mission is to match our customers’ interests with the amazing products in our inventory. That means that we have to be where our customers are, and that is often on search engines such as Google or Bing.
At GetYourGuide, we use Adwords (Google’s paid search product) to match visitor search queries with the relevant products in a destination. Adwords is a very sophisticated product and also a very data-driven one. We recently explored ways to scale up how we download and process Adwords reports, a key dataset Adwords provides to monitor metrics such as impressions and clicks. These reports contain a lot of information, and our goal was to download and process them as quickly and reliably as possible. Fortunately, Google Adwords provides a rich API to do so. However, three factors made this problem require a creative and scalable solution: the large number of destinations we serve, the number of languages we support (GetYourGuide is available in 14 languages), and the constraints the Adwords API imposes.
Before diving into it, let’s review the setup we started with:
Our original solution was pretty straightforward. A cron job starts a two-step process on a server, where each step is a Python script. The first step connects to the Adwords API, downloads the reports one at a time and writes them out as raw CSV files. The second step takes each CSV file and applies transformations, such as adding columns or filtering rows. The result is a processed CSV file, which could, for instance, be inserted into a database and/or archived.
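For illustration, here is a minimal sketch of what the second step could look like. It assumes a raw report with hypothetical `date`, `campaign`, `clicks` and `impressions` columns. It adds a derived click-through-rate column and drops rows without impressions. The column names and the filter are assumptions, not the actual GetYourGuide scripts.

```python
import csv
import io


def process_raw_report(raw_csv: str) -> str:
    """Step 2 of the original pipeline (sketch): read a raw CSV report,
    add a derived column, filter rows and return the processed CSV."""
    reader = csv.DictReader(io.StringIO(raw_csv))
    out = io.StringIO()
    # Hypothetical output schema: the raw columns plus a derived "ctr" column.
    fieldnames = list(reader.fieldnames or []) + ["ctr"]
    writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    for row in reader:
        impressions = int(row["impressions"])
        if impressions == 0:
            continue  # filter: drop rows without impressions
        row["ctr"] = f"{int(row['clicks']) / impressions:.4f}"
        writer.writerow(row)
    return out.getvalue()


raw = ("date,campaign,clicks,impressions\n"
       "2016-01-01,paris,5,100\n"
       "2016-01-01,rome,0,0\n")
print(process_raw_report(raw))
```

Each step parses and re-serializes every value as text. At scale, this overhead adds up.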
As GetYourGuide's inventory grew, this original solution was taking too long and we looked for ways to speed it up. Over time, we also identified other important limitations with this setup:
The whole process ran on a single server, so all connections to Adwords came from a single IP address. The Adwords API limits the number of connections and returns errors when these limits are exceeded.
As with most APIs, calls to the Adwords API sometimes failed, so we quickly needed the ability to retry tasks.
When something went wrong, the whole process had to be rerun from scratch.
Writing and parsing CSV files was inefficient and error-prone.
Writing simple filters and transformations required a fair amount of custom Python code.
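Transient API failures are usually handled with retries. The sketch below retries a callable with exponential backoff. `TransientApiError`, the attempt count and the delays are hypothetical placeholders, not part of the Adwords client library. The `sleep` parameter is injectable so the backoff schedule can be observed without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientApiError(Exception):
    """Hypothetical error type standing in for a retryable API failure."""


def with_retries(fn: Callable[[], T], max_attempts: int = 4,
                 base_delay: float = 1.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(), retrying on TransientApiError with exponential backoff.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts
    and re-raises the last error once max_attempts calls have failed."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except TransientApiError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


# Usage: a call that fails twice, then succeeds. Delays are recorded
# instead of actually sleeping.
calls = {"n": 0}


def flaky_download() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientApiError("temporary failure")
    return "report"


delays = []
print(with_retries(flaky_download, sleep=delays.append))  # report
print(delays)  # [1.0, 2.0]
```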
Parallelism and Fault Tolerance
Based on these constraints, it was pretty clear the solution needed to be parallel and somehow resistant to transient failures. We looked to Apache Spark to do this job and found it to be an excellent fit:
By default, Spark jobs are distributed across multiple workers, and failed tasks are automatically retried, up to a configurable number of attempts.
The Spark API has powerful and convenient transformation and filtering functions, which make the processing step far easier to implement.
A Spark cluster runs on several machines with different IP addresses. Requests to the Adwords API therefore no longer all come from a single IP, which makes the API's connection limits much less of a bottleneck.
Manipulating the data in memory through the Spark API is faster and simpler, since it avoids writing and parsing intermediate CSV files.
The solution we developed splits the many report requests into Spark partitions. Each partition can (and will) be executed on a different Spark worker. Within each partition, the work is split further into tasks (e.g. downloading a specific report for a specific date). Dozens of these tasks run in parallel on each worker, which is itself one of several workers in the cluster doing the same thing.
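The sketch below shows one way to enumerate report requests and slice them into partitions. Each request is a hypothetical `(report_type, date)` pair, and the report names and partition count are assumptions. The contiguous slicing is similar in spirit to how Spark slices a local collection passed to `parallelize`, but it is not Spark's exact algorithm.

```python
from datetime import date, timedelta
from itertools import product
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")

# Hypothetical report types; real Adwords report names may differ.
REPORT_TYPES = ["CAMPAIGN", "AD_GROUP", "KEYWORDS"]


def build_requests(start: date, days: int) -> List[Tuple[str, date]]:
    """One request per (report type, day): the unit of work in this sketch."""
    dates = [start + timedelta(days=d) for d in range(days)]
    return list(product(REPORT_TYPES, dates))


def slice_into_partitions(items: Sequence[T], num_partitions: int) -> List[List[T]]:
    """Split items into num_partitions contiguous, near-equal slices."""
    n = len(items)
    return [list(items[i * n // num_partitions:(i + 1) * n // num_partitions])
            for i in range(num_partitions)]


requests = build_requests(date(2016, 1, 1), days=7)  # 3 types x 7 days = 21
partitions = slice_into_partitions(requests, 4)
print([len(p) for p in partitions])  # [5, 5, 5, 6]
```

With PySpark, the equivalent would be to pass `requests` to `sc.parallelize(requests, 4)` and let Spark choose the slices. The slices hold equal numbers of requests, not equal amounts of work. Because requests are ordered by report type, one slice can end up holding mostly large reports.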
We had to deal with two additional constraints in this system, which nicely illustrate two other important aspects of Spark.
The first constraint is the need to authenticate to the Adwords API. This initialization step is essential but, in principle, only needs to be done once. However, the Spark driver and its workers run as separate processes, usually on separate machines. The initialization therefore has to be repeated in each worker or in each partition. Doing it for every task would be too costly and inefficient. Fortunately, Spark's mapPartitions() function (available on RDDs and Datasets) applies a function to a whole partition at once, which lets us run the initialization once per partition.
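The per-partition initialization can be sketched as a plain Python function of the kind passed to PySpark's `RDD.mapPartitions()`. The function receives an iterator over one partition's requests, authenticates once, and then reuses the client for every request. `FakeAdwordsClient` and its `download` method are hypothetical stand-ins for a real authenticated client. To keep the sketch runnable without a cluster, the last lines call the function directly on local lists instead of going through Spark.

```python
from typing import Iterable, Iterator, Tuple


class FakeAdwordsClient:
    """Hypothetical stand-in for an authenticated Adwords API client."""
    instances = 0

    def __init__(self) -> None:
        # In a real job, this is where the costly authentication would happen.
        FakeAdwordsClient.instances += 1

    def download(self, report_type: str, day: str) -> str:
        return f"{report_type}:{day}"


def download_partition(requests: Iterable[Tuple[str, str]]) -> Iterator[str]:
    """Runs once per partition: authenticate once, then serve every request."""
    client = FakeAdwordsClient()
    for report_type, day in requests:
        yield client.download(report_type, day)


# With PySpark: sc.parallelize(reqs, 2).mapPartitions(download_partition)
partitions = [[("CAMPAIGN", "2016-01-01"), ("CAMPAIGN", "2016-01-02")],
              [("KEYWORDS", "2016-01-01")]]
results = [r for part in partitions for r in download_partition(part)]
print(results)
print(FakeAdwordsClient.instances)  # 2: one client per partition, not per request
```

With a per-element `map()`, the client would be built three times, once per request. With the per-partition function, it is built twice, once per partition.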
Second, because some reports are much larger than others, the workload is skewed. With heavy skew, most partitions finish quickly but one or two take a long time, delaying the entire job. Dealing with skew usually means balancing the work each partition does. That can be done with or without knowledge of how long each task will take, the latter being easier to implement. In our case, we first tried slicing the requests differently so that reports of different types could be put on the same partition. However, that made the code more complex, because each partition's results must share a consistent schema. In the end, we compared various scenarios but couldn't completely eliminate the problem. The Spark execution timeline example below illustrates it: the download task shown in red is still running after most of the other processing tasks have finished.
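When task durations can be estimated, a classic balancing heuristic is greedy "longest processing time first" assignment. It is shown here as one possible technique, not the approach described above. The report names and estimated durations are hypothetical, for example averages from previous runs.

```python
import heapq
from typing import Dict, List, Tuple


def balance_partitions(costs: Dict[str, float], num_partitions: int) -> List[List[str]]:
    """Greedy 'longest processing time first' assignment.

    Sorts tasks by estimated cost (largest first) and always gives the next
    task to the partition with the smallest total cost so far."""
    heap: List[Tuple[float, int]] = [(0.0, i) for i in range(num_partitions)]
    partitions: List[List[str]] = [[] for _ in range(num_partitions)]
    for task, cost in sorted(costs.items(), key=lambda kv: kv[1], reverse=True):
        load, idx = heapq.heappop(heap)  # least-loaded partition
        partitions[idx].append(task)
        heapq.heappush(heap, (load + cost, idx))
    return partitions


# Hypothetical estimated download times per report (e.g. in minutes).
estimates = {"keywords": 9.0, "search_terms": 7.0, "campaigns": 2.0,
             "ad_groups": 3.0, "ads": 4.0, "geo": 1.0}
parts = balance_partitions(estimates, 3)
loads = [sum(estimates[t] for t in p) for p in parts]
print(parts)
print(loads)  # [9.0, 9.0, 8.0]
```

No assignment can finish faster than the single largest task (here 9.0). Breaking large reports into smaller pieces is therefore the other lever against skew. This heuristic also mixes report types on the same partition, which runs into the schema-consistency issue mentioned above.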
Besides the fault-tolerance benefits, we also wanted to measure how well this job parallelizes and how its total execution time changes. For this experiment, we took a sample of the report data we need on a daily basis and benchmarked it with different numbers of Spark nodes.
In this graph you can see that the execution time is reduced by ~66% when using 4 or 5 Spark nodes instead of 1. Looking at the details, we learnt that the total time was bound by the longest download task, which wasn’t too surprising. Nevertheless, compared to our initial setup, this was a substantial gain.
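The effect of the longest download can be made concrete with a simple lower bound. With N nodes, each running s tasks concurrently, a job cannot finish faster than its longest single task. Nor can it finish faster than its total work divided by the N·s available slots. The sketch below computes this bound and converts a relative time reduction into a speedup factor. The task durations and the slots-per-node value are hypothetical, not our benchmark data.

```python
from typing import Sequence


def makespan_lower_bound(durations: Sequence[float], nodes: int,
                         slots_per_node: int) -> float:
    """No schedule beats the longest task or a perfect spread of total work."""
    return max(max(durations), sum(durations) / (nodes * slots_per_node))


def speedup_from_reduction(reduction: float) -> float:
    """A 66% reduction means the job takes 34% of the time, i.e. about 2.9x."""
    return 1.0 / (1.0 - reduction)


tasks = [30.0] + [5.0] * 60  # hypothetical: one long download, many short ones
for n in (1, 2, 4, 5):
    print(n, makespan_lower_bound(tasks, n, slots_per_node=4))
print(round(speedup_from_reduction(0.66), 2))  # 2.94
```

In this made-up example, the bound drops from 82.5 (1 node) to 41.25 (2 nodes). It then flattens at 30.0 for both 4 and 5 nodes, because from that point on, the single long download dominates.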
Digging deeper into the Adwords download task, our goal was to make the most of the download bandwidth available to each partition. Although Spark runs multiple tasks per worker, we found that this wasn't enough to meet that goal, so we experimented with the number of download threads within each task.
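Within a single Spark task, downloads can be overlapped with a thread pool, since they spend most of their time waiting on the network. The sketch uses Python's `concurrent.futures.ThreadPoolExecutor` with a configurable number of threads. `fetch_report` is a hypothetical placeholder that sleeps to simulate network latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence


def fetch_report(request: str, latency: float = 0.05) -> str:
    """Hypothetical download: sleeps to simulate waiting on the network."""
    time.sleep(latency)
    return f"data for {request}"


def download_all(requests: Sequence[str], threads: int) -> List[str]:
    """Download every request of one task using `threads` concurrent workers.

    executor.map returns results in input order, regardless of which
    download finishes first."""
    with ThreadPoolExecutor(max_workers=threads) as executor:
        return list(executor.map(fetch_report, requests))


reqs = [f"report-{i}" for i in range(8)]
for threads in (1, 4):
    start = time.perf_counter()
    download_all(reqs, threads)
    print(threads, "threads:", round(time.perf_counter() - start, 2), "s")
```

The threads mostly wait on I/O, so Python's global interpreter lock is not the bottleneck here. The practical limit is the API's rate limiting.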
This graph shows that increasing download parallelism within each task reduces the execution time per task by 75%. From 4 threads onward, the execution time seems to reach its minimum. Moreover, too many threads risk triggering the Adwords rate limits, which ends up delaying the entire task due to retries.
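One way to raise thread counts without tripping rate limits is a client-side limiter shared by all threads of a task. This is not something described above. The sketch below spaces request start times at least `1 / rate` seconds apart. The `RateLimiter` class and the rate value are hypothetical and do not reflect actual Adwords quotas.

```python
import threading
import time


class RateLimiter:
    """Allow at most `rate` calls per second across all threads by spacing
    call start times at least 1 / rate seconds apart (a pacing limiter)."""

    def __init__(self, rate: float) -> None:
        self.interval = 1.0 / rate
        self.lock = threading.Lock()
        self.next_slot = time.monotonic()

    def acquire(self) -> None:
        with self.lock:
            # Reserve the next free start time for this caller.
            slot = max(time.monotonic(), self.next_slot)
            self.next_slot = slot + self.interval
        # Sleep outside the lock so other threads can reserve later slots.
        time.sleep(max(0.0, slot - time.monotonic()))


limiter = RateLimiter(rate=100)  # hypothetical budget: 100 requests per second
calls = []


def worker() -> None:
    for _ in range(5):
        limiter.acquire()
        calls.append(time.monotonic())  # a real request would start here


start = time.monotonic()
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls), round(time.monotonic() - start, 2))
```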
This work has helped scale a critical data pipeline at GetYourGuide using only existing open-source technologies. Apache Spark is a very powerful tool, and recent developments such as the DataFrame API make it even easier to use.
The next step in this area will be to keep addressing the skew problem by breaking down the reports in more sophisticated ways. We also haven't yet fully profiled the Spark workers, which could reveal other bottlenecks (network, CPU, etc.).