
Gobblin Architecture

  • Author: Yinan
  • Reviewer: Chavdar

Gobblin Architecture Overview

Gobblin is built around the idea of extensibility, i.e., it should be easy for users to add new adapters or extend existing adapters to work with new sources and start extracting data from the new sources in any deployment settings. The architecture of Gobblin reflects this idea, as shown in the diagram below:

Gobblin Image

A Gobblin job is built on a set of constructs (illustrated by the light green boxes in the diagram above) that work together in a certain way and get the data extraction work done. All those constructs are pluggable through the job configuration and extensible by adding new or extending existing implementations.

A Gobblin job consists of a set of tasks (illustrated by the green boxes in the diagram above), each of which corresponds to a unit of work to be done (formally called a WorkUnit in Gobblin's terms) and is responsible for extracting a portion of the data. The tasks of a Gobblin job are executed by the Gobblin runtime (illustrated by the orange boxes in the diagram above) on the deployment setting of choice (illustrated by the red boxes in the diagram above).

The Gobblin runtime is responsible for running user-defined Gobblin jobs on the deployment setting of choice, and it handles common tasks including job and task scheduling, error handling and task retries, resource negotiation and management, state management, data quality checking, data publishing, etc.

Gobblin currently supports two deployment settings: deployment on a single node and deployment on Hadoop MapReduce. We are working on adding support for deploying and running Gobblin as a native application on Yarn. Details on deployment of Gobblin can be found in Gobblin Deployment.

The running and operation of Gobblin are supported by a few components and utilities (illustrated by the blue boxes in the diagram above) that handle important things such as metadata management, state management, metric collection and reporting, and monitoring. Details about those components are described below.

Gobblin Job Flow

A Gobblin job is responsible for extracting data in a defined scope/range from a data source and writing data to a sink such as HDFS. It manages the entire lifecycle of data ingestion in a certain flow as illustrated by the diagram below.

Gobblin Image

A Gobblin job starts with an optional phase of acquiring a job lock. The purpose of doing this is to prevent the next scheduled run of the same job from starting until the current run finishes. This phase is optional because some job schedulers, such as Azkaban, already do this. The next phase of the job is to create tasks based on the partitioning of WorkUnits implemented by the Source class and execute the tasks. Normally, one task is created per WorkUnit. However, there is a special type of WorkUnit called MultiWorkUnit that wraps multiple WorkUnits, for which multiple tasks may be created, one per wrapped WorkUnit, as sketched below.
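
To make the task-creation step concrete, here is a minimal sketch using simplified stand-in classes, not the actual Gobblin WorkUnit, MultiWorkUnit, and Task classes, of how a job run may expand its WorkUnits into tasks:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Gobblin's WorkUnit/MultiWorkUnit/Task; the real
// classes also carry job and task configuration state.
class WorkUnit {}

class MultiWorkUnit extends WorkUnit {
  private final List<WorkUnit> workUnits = new ArrayList<>();
  List<WorkUnit> getWorkUnits() { return workUnits; }
  void addWorkUnit(WorkUnit workUnit) { workUnits.add(workUnit); }
}

class Task {
  Task(WorkUnit workUnit) { /* the real Task also takes runtime state */ }
}

public class TaskCreationSketch {
  // Create one task per WorkUnit, expanding each MultiWorkUnit into one task
  // per wrapped WorkUnit, as described above.
  static List<Task> createTasks(List<WorkUnit> workUnits) {
    List<Task> tasks = new ArrayList<>();
    for (WorkUnit workUnit : workUnits) {
      if (workUnit instanceof MultiWorkUnit) {
        for (WorkUnit wrapped : ((MultiWorkUnit) workUnit).getWorkUnits()) {
          tasks.add(new Task(wrapped));
        }
      } else {
        tasks.add(new Task(workUnit));
      }
    }
    return tasks;
  }
}
```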

How tasks are executed and where they run depend on the deployment setting. In the standalone mode on a single node, tasks run in a thread pool dedicated to the job, the size of which is configurable on a per-job basis. In the Hadoop MapReduce mode on a Hadoop cluster, tasks run in the mappers, which are used purely as containers to run tasks.
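
The following is a minimal sketch of the standalone execution model, assuming tasks can be represented as plain Runnables (the actual Gobblin task classes and executor are more involved):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleNodeTaskExecutionSketch {
  // Run all tasks of a job in a thread pool dedicated to that job; the pool
  // size would come from the per-job configuration.
  static void runTasks(List<Runnable> tasks, int threadPoolSize) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threadPoolSize);
    CountDownLatch done = new CountDownLatch(tasks.size());
    for (Runnable task : tasks) {
      pool.submit(() -> {
        try {
          task.run();
        } finally {
          done.countDown();   // count the task regardless of success or failure
        }
      });
    }
    done.await();   // wait for all tasks to finish before cleanup/publishing
    pool.shutdown();
  }
}
```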

After all tasks of a Gobblin job finish (either successfully or unsuccessfully), the job performs necessary cleanup on any temporary working data, persists the job/task state, and finally publishes the data if it is OK to do so. Whether extracted data should be published is determined by the task states and the JobCommitPolicy used (configurable). More specifically, extracted data is published if and only if one of the following two conditions holds (see the sketch after the list):

  1. JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS is specified in the job configuration.
  2. JobCommitPolicy.COMMIT_ON_FULL_SUCCESS is specified in the job configuration and all tasks completed successfully.
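
A minimal sketch of this publish decision, using the JobCommitPolicy values named above but a simplified stand-in for task states:

```java
import java.util.List;

enum JobCommitPolicy { COMMIT_ON_PARTIAL_SUCCESS, COMMIT_ON_FULL_SUCCESS }

enum TaskState { SUCCESSFUL, FAILED }   // simplified stand-in for Gobblin task states

public class PublishDecisionSketch {
  // Publish if the policy tolerates partial success, or if the policy requires
  // full success and every task actually succeeded.
  static boolean shouldPublish(JobCommitPolicy policy, List<TaskState> taskStates) {
    if (policy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS) {
      return true;
    }
    return policy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS
        && taskStates.stream().allMatch(state -> state == TaskState.SUCCESSFUL);
  }
}
```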

If a Gobblin job is cancelled before it finishes, the job will not persist any job/task state nor commit and publish any data (as the dotted line shows in the diagram).

An optional final phase of a Gobblin job is to release the job lock if one was acquired at the beginning. This gives the green light to the next scheduled run of the same job to proceed.

Gobblin Constructs and Task Flow

As described above, a Gobblin job creates and runs tasks, each of which is responsible for extracting a portion of the data to be pulled by the job. A Gobblin task is created from a WorkUnit that represents a unit of work and serves as a container of job configuration at runtime. A task composes the Gobblin constructs into a flow that extracts, transforms, checks the quality of, and finally writes each extracted data record to the specified sink. The diagram below gives an overview of the Gobblin constructs that constitute the task flows in a Gobblin job.

Gobblin Image

A Source represents an adapter between a data source and Gobblin and is used by a Gobblin job at the beginning of the job flow. A Source is responsible for partitioning the work and creating the list of WorkUnits, based on which tasks are created and executed. It is also responsible for creating an Extractor for each given WorkUnit. An Extractor, as the name suggests, is responsible for actually extracting the schema and data records from the data source. Each Gobblin task uses its own Extractor instance to extract the portion of data it is assigned. An Extractor reads one data record at a time, although internally it may choose to pull and cache a batch of data records. Gobblin out-of-the-box provides some built-in Source and Extractor implementations that work with various types of data sources, e.g., web services offering REST APIs, databases supporting JDBC, FTP/SFTP servers, etc.
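
These two responsibilities can be summarized by the following simplified stand-in interfaces. They are not the actual Gobblin Source and Extractor signatures (which are generic in schema/record types and take Gobblin state objects), just a sketch of the shape described above:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

class WorkUnit {}   // stand-in; in real Gobblin a WorkUnit carries configuration

// A Source partitions the work of a job run and creates an Extractor per WorkUnit.
interface Source<S, D> {
  List<WorkUnit> getWorkUnits();
  Extractor<S, D> getExtractor(WorkUnit workUnit) throws IOException;
}

// An Extractor pulls the schema and then data records, one record at a time.
interface Extractor<S, D> extends Closeable {
  S getSchema() throws IOException;
  D readRecord() throws IOException;   // returns null when the WorkUnit is exhausted
}
```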

A Converter is responsible for converting both schema and data records and is the core construct for data transformation. Converters are composable and can be chained together as long as each adjacent pair of Converters is compatible in the input and output schema and data record types. This allows building arbitrarily complex data transformations from simple Converters. Note that a Converter converts an input schema to one output schema, but may convert an input data record to zero, one, or many output data records. Each Converter converts every output record of the previous Converter, except for the first one, which converts the original extracted data record. When converting a data record, a Converter also takes in its own converted output schema, except for the first one, which takes in the original input schema. The following diagram explains how a chain of Converters works.

Gobblin Image
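
In code, the chaining behavior can be sketched as follows. This uses a simplified stand-in Converter interface (the real Gobblin Converter is also parameterized by input/output schema and record types and takes runtime state), and raw types just to keep the sketch short:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in: one input schema maps to one output schema, and one
// input record maps to zero or more output records.
interface Converter<SI, SO, DI, DO> {
  SO convertSchema(SI inputSchema);
  Iterable<DO> convertRecord(SO outputSchema, DI inputRecord);
}

public class ConverterChainSketch {
  // Apply a chain of (type-compatible) converters to one extracted record.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static List<Object> convert(List<Converter> chain, Object extractedSchema, Object extractedRecord) {
    Object currentSchema = extractedSchema;
    List<Object> currentRecords = Collections.singletonList(extractedRecord);
    for (Converter converter : chain) {
      Object outputSchema = converter.convertSchema(currentSchema);
      List<Object> outputRecords = new ArrayList<>();
      for (Object inputRecord : currentRecords) {
        // Each converter consumes every output record of the previous converter
        // and is given its own converted output schema.
        for (Object outputRecord : (Iterable<Object>) converter.convertRecord(outputSchema, inputRecord)) {
          outputRecords.add(outputRecord);
        }
      }
      currentSchema = outputSchema;
      currentRecords = outputRecords;
    }
    return currentRecords;
  }
}
```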

A QualityChecker, as the name suggests, is responsible for data quality checking. There are two types of QualityCheckers: one that checks individual data records and decides if each record should proceed to the next phase in the task flow, and one that checks the whole task output and decides if the data can be committed. We call the two types row-level QualityCheckers and task-level QualityCheckers, respectively. A QualityChecker can be configured to be MANDATORY or OPTIONAL and participates in the decision on whether quality checking passes if and only if it is MANDATORY. Similar to Converters, more than one QualityChecker can be specified to check quality from different perspectives. If multiple QualityCheckers are specified, quality checking is considered passed if and only if all MANDATORY QualityCheckers pass.
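
The row-level decision can be sketched as follows, using a simplified stand-in interface rather than the actual Gobblin policy classes:

```java
import java.util.List;

// Simplified stand-in for a row-level quality checker.
interface RowQualityChecker<D> {
  boolean isMandatory();
  boolean check(D record);   // true if the record passes this checker
}

public class RowQualityCheckSketch {
  // A record passes if and only if every MANDATORY checker passes;
  // OPTIONAL checkers run but do not affect the decision.
  static <D> boolean passes(List<RowQualityChecker<D>> checkers, D record) {
    boolean passed = true;
    for (RowQualityChecker<D> checker : checkers) {
      boolean result = checker.check(record);   // OPTIONAL results could be reported here
      if (checker.isMandatory() && !result) {
        passed = false;
      }
    }
    return passed;
  }
}
```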

A ForkOperator is a type of control operator that allows a task flow to branch into multiple streams, each of which goes to a separately configured sink. This is useful in situations where, for example, data records need to be written into multiple different storage systems, or need to be written out to the same storage (say, HDFS) but in different forms for different downstream consumers.
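
The behavior can be sketched with a simplified stand-in interface: for each schema and each record, the ForkOperator reports, per forked branch, whether it should go into that branch (the actual Gobblin interface differs in details):

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in: per schema/record, report one boolean per forked branch.
interface ForkOperator<S, D> {
  int getBranches();
  List<Boolean> forkSchema(S schema);
  List<Boolean> forkDataRecord(D record);
}

// Example: send every record to both of two branches, e.g. to write the same
// data to two differently configured sinks.
class DualSinkForkOperator implements ForkOperator<Object, Object> {
  @Override public int getBranches() { return 2; }
  @Override public List<Boolean> forkSchema(Object schema) { return Arrays.asList(true, true); }
  @Override public List<Boolean> forkDataRecord(Object record) { return Arrays.asList(true, true); }
}
```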

A DataWriter is responsible for writing data records to the sink it is associated with. Gobblin out-of-the-box provides an AvroHdfsDataWriter for writing data in Avro format onto HDFS.
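
The write/commit/cleanup shape of a DataWriter can be sketched with a simplified stand-in interface (not the actual Gobblin signatures):

```java
import java.io.Closeable;
import java.io.IOException;

// Simplified stand-in for a DataWriter attached to one sink (one per forked branch).
interface DataWriter<D> extends Closeable {
  void write(D record) throws IOException;   // stage one record in the working area
  void commit() throws IOException;          // make the staged output final
  void cleanup() throws IOException;         // remove temporary working data
  long recordsWritten();
}
```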

The following diagram zooms in further and shows the details on how different constructs are connected and composed to form a task flow. The same task flow is employed regardless of the deployment setting and where tasks are running.

Gobblin Image

A Gobblin task flow consists of a main branch and a number of forked branches coming out of a ForkOperator. Specifying a ForkOperator in the job configuration is optional; when none is specified, the task flow internally uses an IdentityForkOperator that simply connects the master branch and a single forked branch. The reason behind this is to keep the task flow uniform, so no special logic is needed to handle the case where no ForkOperator is specified in the job configuration.

The master branch of a Gobblin task starts with schema extraction from the source using the Extractor class specified in the job configuration. The extracted schema goes through a schema transformation phase if at least one Converter class is specified in the job configuration. The next phase of the task flow is to repeatedly extract data records one at a time. Each extracted data record also goes through a transformation phase if at least one Converter class is specified in the job configuration. Each extracted (or converted, if applicable) data record is fed into an optional list of QualityCheckers that perform data quality checking on the record. A record passes the quality checking if and only if it gets a pass from every quality checker on the list that is MANDATORY to participate in the final decision. Quality checkers on the list that are OPTIONAL are informational only and do not participate in the decision on whether a record passes the quality checking.
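
Condensed to code, the master branch can be sketched as a record loop, with the constructs reduced to plain Java functional interfaces (this is only an illustration of the flow described above, not the actual Gobblin task classes):

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class MasterBranchSketch {
  // extractor:    yields records until it returns null
  // convert:      optional transformation that may emit 0..n records per input record
  // qualityCheck: combined verdict of the MANDATORY row-level checkers
  // forkOperator: hands passing records over to the forked branches
  static <D> void run(Supplier<D> extractor,
                      Function<D, List<D>> convert,
                      Predicate<D> qualityCheck,
                      Consumer<D> forkOperator) {
    for (D record = extractor.get(); record != null; record = extractor.get()) {
      for (D converted : convert.apply(record)) {
        if (qualityCheck.test(converted)) {
          forkOperator.accept(converted);
        }
        // Records failing a MANDATORY check are dropped (and typically counted/reported).
      }
    }
  }
}
```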

The quality checking phase ends the master branch. Data records that pass the quality checking will go through the ForkOperator and be further processed in the forked branches. The ForkOperator allows users to specify if an input schema or data record should go to a specific forked branch. If the input schema is specified not to go into a particular branch, that branch will be ignored. If an input schema or data record is specified to go into more than one forked branch, Gobblin assumes that the schema or data record class implements the Copyable interface and will attempt to make a copy of the schema or data record before passing it to each forked branch. So it is very important to make sure the input schema or data record to the ForkOperator is an instance of Copyable if it is going into more than one branch.
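
The copy-before-fanout behavior can be sketched as follows; Copyable here is a stand-in named after the Gobblin interface mentioned above, and the branch consumers are illustrative:

```java
import java.util.List;
import java.util.function.Consumer;

// Stand-in for Gobblin's Copyable: a schema/record that can be copied before
// being handed to more than one forked branch.
interface Copyable<T> {
  T copy();
}

public class ForkFanoutSketch {
  // Pass a record to every branch it is routed to, copying it when it goes to
  // more than one branch so the branches do not share one mutable instance.
  @SuppressWarnings("unchecked")
  static <D> void fanOut(D record, List<Boolean> routing, List<Consumer<D>> branches) {
    long targetBranches = routing.stream().filter(goes -> goes).count();
    for (int i = 0; i < routing.size(); i++) {
      if (!routing.get(i)) {
        continue;   // this branch is ignored for this record
      }
      D toPass = record;
      if (targetBranches > 1) {
        if (!(record instanceof Copyable)) {
          throw new IllegalStateException("record must implement Copyable to go into multiple branches");
        }
        toPass = ((Copyable<D>) record).copy();
      }
      branches.get(i).accept(toPass);
    }
  }
}
```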

Similar to the master branch, a forked branch of a Gobblin task flow also processes the input schema and each input data record, one at a time, through an optional transformation phase and a quality-checking phase. Data records that pass the quality checking are written out to the specified sink by a DataWriter. Each forked branch has its own sink configuration and a separate DataWriter that writes specifically to the sink it is attached to.

Upon successful processing of the last record, a forked branch does another round of quality checking that applies to all the records processed by the branch. This quality checking is referred to as task-level quality checking, whereas the one that applies to each individual data record is referred to as record-level quality checking. If this last round of quality checking passes, the branch commits the data and exits.

A task flow completes its execution once the last extracted data record goes through every forked branch. During the execution of a task, a TaskStateTracker keeps track of the task's state and a core set of task metrics, e.g., total records extracted, records extracted per second, total bytes extracted, bytes extracted per second, etc.

Job State Management

Typically a Gobblin job runs periodically on some schedule, and each run of the job extracts data incrementally, i.e., it extracts new data or changes to existing data within a specific range since the last run of the job. To make incremental extraction possible, Gobblin must persist the state of the job upon the completion of each run and load the state of the previous run so the next run knows where to start extracting. Gobblin maintains a state store that is responsible for job state persistence. Each run of a Gobblin job reads the state store for the state of the previous run and writes its own state to the state store upon completion. The state of a run of a Gobblin job consists of the job configuration and any properties set at runtime at the job or task level.

Out-of-the-box, Gobblin uses an implementation of the state store that serializes job and task states into Hadoop SequenceFiles, one per job run. Each job has a separate directory where its job and task state SequenceFiles are stored. The file system on which the SequenceFile-based state store resides is configurable.
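
As an illustration of how the persisted state enables incremental extraction, the sketch below derives this run's extraction range from the previous run's high watermark. The property key and the use of java.util.Properties are illustrative; they are not the actual Gobblin state-store API or configuration keys:

```java
import java.util.Properties;

public class IncrementalRangeSketch {
  // Illustrative key; not an actual Gobblin configuration property.
  static final String PREVIOUS_HIGH_WATERMARK_KEY = "previous.run.high.watermark";

  // Compute the extraction range [low, high) for this run: the previous run's
  // high watermark becomes this run's low watermark.
  static long[] extractionRange(Properties previousRunState, long currentHighWatermark) {
    long low = Long.parseLong(previousRunState.getProperty(PREVIOUS_HIGH_WATERMARK_KEY, "0"));
    return new long[] {low, currentHighWatermark};
  }
}
```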

Handling of Job and Task Failures

Gobblin on a Single Node

Gobblin on Hadoop MapReduce
