In the past I used to handle some data workflows with cron, especially ETL jobs. At first it was easy and natural, but it didn’t scale well and eventually got me into trouble.

Fortunately, I’ve found a new approach based on Airflow. Open sourced by AirBnB and currently being incubated by Apache, it looked very interesting, so I decided to try it.

Cron issues

Using cron to manage my data pipelines became a headache, basically because:

  • It cannot handle dependencies between tasks, so it often forces you to set fixed execution times with ad-hoc guard intervals between them.
  • It’s very difficult to add new jobs to an already complex crontab. When should a new heavy task be scheduled? Some independent tasks share a common resource (e.g. a database), so it’s best not to overlap them.
  • It’s hard to debug and maintain: the crontab is just a text file.
  • Rich logging has to be handled externally.
  • There are no execution stats.
cron example

Airflow’s workflows – aka DAGs

An Airflow DAG – a directed acyclic graph – defines a workflow: which tasks have to be executed, when, and how. It does not do any actual data processing; it is just a definition of what has to be done.

ETL workflow example.

A DAG is defined as a Python script in the $AIRFLOW_HOME/dags folder. The scheduler evaluates the script and triggers the DAG runs and task executions when necessary. It will also pick up any changes made to the script.

Creating a simple ETL DAG

Simple ETL DAG.

Creating a DAG is fairly easy after reviewing the documentation and the examples. It usually involves the following steps:

1. Create the DAG object itself, overriding the default arguments as needed:
from airflow import DAG
from airflow.operators import BashOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'danidelvalle',
    'depends_on_past': True,
    'start_date': datetime(2016, 8, 30, 10, 15),
    'email': ['foo@bar.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 4,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG
dag = DAG(
    'etl_dag',
    default_args=default_args,
    schedule_interval='15 * * * *')
2. Create the tasks, either using the built-in operators or writing custom ones:
# Extract task
extract_template = (BASE_TASK_PATH +
    'extract.sh {{ts_nodash}} {{params.user_host}} ' +
    '{{params.remote_dir}} {{params.local_dir}}')
extract_task = BashOperator(
    task_id='extract',
    bash_command=extract_template,
    params={
        'user_host': REMOTE_USER_HOST,
        'remote_dir': REMOTE_PATH,
        'local_dir': DATASET_PATH_ORIG},
    dag=dag)

# Transform task
summarize_template = (BASE_TASK_PATH +
    'summarize_spark.py {{ts_nodash}} ' +
    '{{params.src_dir}} {{params.dst_dir}}')
summarize_task = BashOperator(
    task_id='summarize_spark',
    bash_command=summarize_template,
    params={
        'src_dir': DATASET_PATH_ORIG,
        'dst_dir': DATASET_HOURLY},
    dag=dag)

# Load task
load_template = (BASE_TASK_PATH +
    'load_hdfs.py {{ts_nodash}} ' +
    '{{params.src_dir}} {{params.dst_dir}}')
load_task = BashOperator(
    task_id='load_to_hdfs',
    bash_command=load_template,
    params={
        'src_dir': DATASET_HOURLY,
        'dst_dir': HDFS_DESTINATION_FOLDER},
    dag=dag)

# Slack notification
slack_notify = SlackAPIPostOperator(
    task_id='slack_notify',
    token=SLACK_TOKEN,
    channel=SLACK_CHANNEL,
    username=SLACK_USERNAME,
    text='ETL {{ ts_nodash }} DONE!',
    dag=dag)
3. Specify the task dependencies:
summarize_task.set_upstream(extract_task)
load_task.set_upstream(summarize_task)
slack_notify.set_upstream(load_task)

DAG activation (unpause)

By default, DAGs are paused after creation. Before unpausing them, whether from the web UI or the CLI, I would recommend testing them with the command line interface to make sure there are no errors.
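As a rough sketch, this is the kind of CLI session I run before unpausing a new DAG (assuming the etl_dag example above and the Airflow 1.x command syntax; the execution date is just a sample value):

# Check that the DAG file parses and is registered
airflow list_dags
airflow list_tasks etl_dag --tree

# Run individual tasks for a given execution date, outside the scheduler
airflow test etl_dag extract 2016-08-30T10:15:00
airflow test etl_dag summarize_spark 2016-08-30T10:15:00

# When everything looks good, activate the DAG
airflow unpause etl_dag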

Once the DAG is active, there are several ways to monitor that everything is working as expected:

  • By tailing the scheduler log.
  • By checking the logs in $AIRFLOW_HOME/logs/dag_id/task_id (see the example below).
  • Through the web UI, which allows monitoring DAG/task executions, displaying stats, viewing the logs, etc.
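For instance, to follow the output of one run of the extract task (assuming the default log layout of $AIRFLOW_HOME/logs/<dag_id>/<task_id>/<execution_date>, which may vary between versions):

tail -f $AIRFLOW_HOME/logs/etl_dag/extract/2016-08-30T10:15:00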

Pros

Out of the box, with a pretty straightforward configuration, Airflow offers:

  • Multiple operators: bash, HTTP, Slack, etc.
  • Specifying task dependencies is straightforward. It also provides flexible and extensible operators to implement conditions, branches, and task-skipping.
  • Automatic retries of failed jobs.
  • Email notifications of task retries or failures.
  • A terrific web interface to monitor the DAGs – a cool DAG visualization, Gantt diagrams and task duration charts – and perform some maintenance.
  • A powerful CLI, useful to test new tasks or DAGs.
  • Logging! The output of each task execution is saved in a file.
  • Scaling! Integration with Apache Mesos and Celery (a quick sketch follows this list).
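As a quick sketch of what that scaled-out setup looks like operationally (assuming airflow.cfg has been switched to the CeleryExecutor and a message broker is configured, which I have not covered here), the same CLI starts each component:

airflow webserver    # serves the UI
airflow scheduler    # triggers DAG runs and queues task instances
airflow worker       # one or more Celery workers execute the queued tasks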

Cons

Airflow was open-sourced only a short while ago and is still in development; I’m an early adopter. I know I’ll have to deal with some small bugs or missing features, but on the other hand, it has a committed team and a growing community. So far, in my opinion, it is definitely worth it.

 

 

3 thoughts on “I’m sorry Cron, I’ve met AirBnB’s Airflow”

  1. I’m new to Airflow. Can you help me with building this? I have installed Airflow on my local machine successfully. I was just playing with example_bash_operator.py, which is one of the built-in example DAGs. I scheduled it according to my local system, and it was running in the background but not showing any status in the UI. Can you help me solve this problem?

    1. Sometimes I have to restart the webserver to pick up some changes; you may try that. After modifying a DAG I usually have to “refresh” it – there is a button on the index page to the right of each DAG – to detect the changes.
      In addition, take a look at the “Browse” section to list the task instances and DAG runs.
