Engineering
Dec 12, 2019

How We Built our New Modern ETL Pipeline, Part 1

GetYourGuide
Careers Team

UPDATE: On July 8 at 3 pm BST/4 pm CEST, our data platform experts and Snowflake are teaming up for a webinar. Learn how modern data architecture can transform ordinary trips into extraordinary ones with valuable business insights. The speakers are Augusto Elesbão, senior data engineer at GetYourGuide; Robert Bemmann, data engineer; and Torsten Grabs, director of product management at Snowflake. Learn more and register.

Thiago Rigo, senior data engineer, and David Mariassy, data engineer, built a modern ETL pipeline from scratch using Debezium, Kafka, Spark and Airflow.

In the first of this two-part series, Thiago walks us through our new and legacy ETL pipelines and the overall architecture, and gives us an overview of our extraction layer. This article is an adaptation of a presentation given at the Spark + AI Summit, organized by Databricks.

Part 2 focuses on the transformation layer of our new ETL solution.

{{Divider}}

Our legacy ETL pipeline

The GetYourGuide stack was previously built around two main parts: Pentaho and PostgreSQL. Both tools served us well for over six years, but as data volume grew and business requirements changed, we decided it was time to move to a more modern architecture, since our ETL pipeline was starting to show some problems.

A couple of issues we were trying to solve:

  1. Breaking schema changes on upstream databases, e.g. dropped columns or inconsistent type changes, meant that our ETL jobs could break in the middle of the night.
  2. Pentaho, as a graphical tool, was very rich in features, but also required in-depth knowledge to navigate its graphical interface.
  3. Whenever a transformation crashed, it could take us up to five hours to fully execute the data pipeline again, because it ran sequentially.
  4. As a graphical tool, it was also very hard to test transformations.

With these issues in mind, we set out to work on a new architecture that would:

  1. handle schema changes automatically
  2. use familiar tooling (Scala, SQL) to write transformations
  3. minimize execution time by parallelizing transformations execution
  4. be designed for automated testing

High-level components overview

At a high level, we have two main steps, which are executed as part of an Airflow DAG:

  1. The extraction layer is responsible for extracting data from source databases, and loading it into our data lake.
  2. The transformation layer picks up the data, and executes custom SQL queries, which are called transformations, in order to generate custom views of the data.

Here's an example: during the extraction phase, a table called booking is loaded from a source database, resulting in a copy of this table in our data lake. A transformation called fact_booking then uses that table, along with many others, to create a complete picture of what happens to bookings on our platform.
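
To make that concrete, here is a minimal sketch of what such a transformation could look like as a Spark SQL job written in Scala. The column names, the db_mirror.customer table and the dwh target schema are illustrative assumptions rather than our actual schema (db_mirror is the schema the extraction layer writes to, as described below):

```scala
import org.apache.spark.sql.SparkSession

object FactBooking {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fact_booking")
      .enableHiveSupport()
      .getOrCreate()

    // Join the mirrored booking table with other mirrored tables to build a
    // denormalized view of everything that happens to a booking.
    val factBooking = spark.sql(
      """
        |SELECT b.booking_id,
        |       b.tour_id,
        |       b.created_at,
        |       c.country_code
        |FROM db_mirror.booking b
        |LEFT JOIN db_mirror.customer c
        |  ON b.customer_id = c.customer_id
        |""".stripMargin)

    // Persist the result as a Hive table for downstream consumers.
    factBooking.write.mode("overwrite").saveAsTable("dwh.fact_booking")
  }
}
```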

Extraction layer

Now that you have a basic understanding of the overall architecture, let’s dig a bit deeper into the extraction layer. Here’s what it looks like:

Extraction layer architecture

Let’s first go through what each component does individually, and then tie everything together at the end.

JDBC Loader

  • Spark application
  • It reads data from source databases using Spark’s JDBC connection
  • The data read is then written to S3 as Parquet files

Debezium

  • Open source distributed platform for change data capture (CDC), maintained by Red Hat
  • It runs as a Kafka Connect connector
  • Streams the database's event log (e.g. the binlog, in the case of MySQL) to Kafka

Avro Converter

  • Scala application
  • Reads in raw Avro files in S3
  • Communicates with Schema Service to handle schema changes gracefully
  • Writes out Parquet files (see the sketch below)
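
A minimal sketch of that conversion, assuming the spark-avro package is on the classpath and using made-up S3 paths; the call to Schema Service that the real application makes is only hinted at in a comment:

```scala
import org.apache.spark.sql.SparkSession

object AvroConverterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-converter-booking").getOrCreate()

    // Read the raw Avro change logs written to S3 by the Kafka Connect sink.
    val changeLogs = spark.read.format("avro").load("s3://data-lake/raw/booking/")

    // In the real application, Schema Service is consulted here to drop fields
    // with incompatible type changes before anything is written out.

    // Write the change logs out as Parquet for the Upsert step to pick up.
    changeLogs.write.mode("append").parquet("s3://data-lake/changelogs/booking/")
  }
}
```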

Schema Service

  • Scala library
  • Keeps track of all schema changes applied to source tables
  • Holds tables’ PKs, timestamp and partition columns
  • Prevents breaking changes, e.g. column type changes, from being introduced (see the sketch below)
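
Schema Service is an internal library, so the snippet below is only an illustration of the kind of check it enables, not its actual API: compare the incoming schema against the last known schema and drop every column whose type has changed, so that a breaking change never reaches the final table.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Illustration only: drop columns whose type differs from the last known schema.
object SchemaGuard {
  def dropIncompatibleColumns(df: DataFrame, knownSchema: StructType): DataFrame = {
    val knownTypes = knownSchema.fields.map(f => f.name -> f.dataType).toMap
    val incompatible = df.schema.fields.collect {
      case f if knownTypes.get(f.name).exists(_ != f.dataType) => f.name
    }
    df.drop(incompatible: _*)
  }
}
```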

Upsert

  • Spark application
  • Reads in the Parquet change logs generated by the CDC pipeline, i.e. by JDBC Loader or Avro Converter
  • Compacts the change logs based on the table's PK
  • Creates a Hive table which contains an exact replica of the production table

Now that we’ve explored all the components separately, let’s see how they work together.

Everything starts with the CDC pipeline. Currently we have two ways of ingesting data from source databases: Debezium or JDBC. Although we're moving away from JDBC in favor of Debezium, we'll highlight both scenarios in this article.

What's important to notice is that, at the end of the CDC pipeline, we have several Parquet files which represent the change logs from the source databases. These files may contain several records for the same PK, and that's why we run upsert afterwards to compact these records.

CDC with JDBC Loader

The JDBC Loader task is triggered via Airflow for a specific table. It connects to the source database, reads the data that has been updated (or all data, for small tables), and then writes out Parquet files.
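
As a rough illustration of such an incremental load with Spark's JDBC source, the connection details, the updated_at watermark and the S3 path below are placeholders, not our actual configuration:

```scala
import org.apache.spark.sql.SparkSession

object JdbcLoaderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-loader-booking").getOrCreate()

    // Pull only the rows updated since the last run; small tables are simply read in full.
    val updatedRows = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://source-db:3306/production")
      .option("dbtable", "(SELECT * FROM booking WHERE updated_at >= '2019-12-01 00:00:00') AS t")
      .option("user", sys.env("DB_USER"))
      .option("password", sys.env("DB_PASSWORD"))
      .load()

    // Write the extracted rows to the data lake as Parquet change logs for the Upsert step.
    updatedRows.write.mode("append").parquet("s3://data-lake/changelogs/booking/")
  }
}
```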

CDC with Debezium

As mentioned before, Debezium is constantly reading the databases' event logs and publishing them to Kafka. At the other end of these topics, we have a Kafka Connect sink connector which writes out Avro files to S3. Avro Converter then picks up these files and converts them to Parquet.

Notice: if there's an inconsistent type change on any field, Avro Converter or JDBC Loader will communicate with Schema Service and drop that specific field, so that we don't end up with schema problems on the final table.

After the CDC pipeline has completed, Upsert is the next component to be executed. The goal of this step is to generate, for each source table, a corresponding table in the data lake under the db_mirror schema. So for the source table booking, you'll have another table called db_mirror.booking.

To do that, the Upsert component does the following (a rough sketch in Scala follows the list):

  1. Reads in the Parquet change logs
  2. Compacts records based on the table's PKs
  3. Reads in current data from db_mirror table
  4. Performs a FULL OUTER JOIN between compacted logs and current db_mirror data based on the table’s PKs
  5. Selects latest record based on timestamp
  6. Deletes old records that were marked as deleted
  7. Generates a final Hive table
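
Here is a rough Scala/Spark sketch of those steps for the booking example. The column names (booking_id, updated_at, is_deleted, tour_id, status) and the staging table are assumptions made for illustration; the real component is generic over any table, with its keys and timestamp columns coming from Schema Service:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object UpsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("upsert-booking")
      .enableHiveSupport()
      .getOrCreate()

    // 1. Read the Parquet change logs produced by the CDC pipeline.
    val changeLogs = spark.read.parquet("s3://data-lake/changelogs/booking/")

    // 2. Compact the change logs: keep only the latest record per primary key.
    val latestPerKey = Window.partitionBy("booking_id").orderBy(col("updated_at").desc)
    val compacted = changeLogs
      .withColumn("rn", row_number().over(latestPerKey))
      .filter(col("rn") === 1)
      .drop("rn")

    // 3. Read the current state of the mirror table.
    val current = spark.table("db_mirror.booking")

    // 4.-6. FULL OUTER JOIN on the PK, pick the newer record per column, and
    //       drop rows whose latest change-log entry marks them as deleted.
    val dataCols = Seq("tour_id", "status", "updated_at", "is_deleted")
    val merged = current.alias("cur")
      .join(compacted.alias("new"), Seq("booking_id"), "full_outer")
      .withColumn("use_new",
        col("new.updated_at").isNotNull &&
          (col("cur.updated_at").isNull || col("new.updated_at") >= col("cur.updated_at")))
      .select((col("booking_id") +: dataCols.map(c =>
        when(col("use_new"), col(s"new.$c")).otherwise(col(s"cur.$c")).as(c))): _*)
      .filter(!coalesce(col("is_deleted"), lit(false)))

    // 7. Write the merged snapshot; in practice it goes to a new location that
    //    is then swapped in to become db_mirror.booking.
    merged.write.mode("overwrite").saveAsTable("db_mirror.booking_staged")
  }
}
```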

For big tables, the Upsert component also optimizes the FULL OUTER JOIN operation by selecting only the partitions that have been affected by the incoming change logs.
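
Continuing the sketch above, and assuming a hypothetical string-typed created_date partition column, that optimization amounts to something like:

```scala
import org.apache.spark.sql.functions.col

// Only read the mirror partitions that actually appear in the incoming change
// logs, instead of scanning (and rewriting) the whole table.
val affectedDates = compacted
  .select("created_date")
  .distinct()
  .collect()
  .map(_.getString(0))

val currentAffected = spark.table("db_mirror.booking")
  .filter(col("created_date").isin(affectedDates: _*))
```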

At this point, we have finished extracting data from the source databases using either Debezium or JDBC Loader, and are ready to dive into Part 2 for the transformations.

Are you interested in joining our engineering team? Check out our open positions.



