Advent of data

2022
5 min read

Using Airflow the wrong way

tl;dr: import airflow as a Python package and schedule it, but not in Airflow
Airflow the wrong way (credits)

Airflow is a rare piece of software in the data engineering stack. It has seen a lot. From the initial release in 2015 to today, Airflow usage has dramatically changed. It all started with Maxime Beauchemin's vision within Airbnb: Airflow as a sequential, idempotent orchestrator of Hive partitions. Today it sits in a fragmented cloud data stack where Airflow is a CRON on steroids with a nice UI.

Yes, CRON on steroids, this is what Ry Walker—CEO of Astronomer—wrote 3 years ago. To be fair, this is a good way to tl;dr what Airflow does for newcomers—even if it assumes they know what CRON is. Still, it is a pretty bad way to explain Airflow, and I think Airflow's inner power resides in something else. Airflow is a framework to write data jobs; its true power lives in the open-source providers the community has developed and in its operator abstraction.

What if we used Airflow as a Python framework rather than an all-in-one application with a scheduler and a webserver? What if we scheduled Airflow in Gitlab?

I know it may seem weird. In recent years, dissonant voices came out to say that Airflow should only be seen as a scheduler tied to a container orchestrator—my post is actually the total opposite of what is stated in this reference article. The argument is that Python code should never run on Airflow workers, because the Airflow scheduler can't handle it or because it scales badly. So the solution was either to go serverless and run your code in elastic container services, or within your own Kubernetes cluster. To make that work you have to build your own container layer, packaging your application behind a CLI in Docker images. What a waste of time.

Airflow as a framework

Let's deep-dive a bit into the Airflow framework. By framework I mean everything that makes Airflow a great solution when designing DAGs, excluding everything related to how you schedule it or orchestrate it with other DAGs: only the things that live within the DAG itself. So mainly the TaskFlow API and the Operator interface.
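
To make this concrete, here is a minimal sketch of what I mean by the framework part: the TaskFlow API on one side, the Operator interface on the other. The operator, task and DAG names here are purely illustrative:

import pendulum

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator


# Operator interface: a unit of work is a class exposing execute()
class HelloOperator(BaseOperator):
    def execute(self, context):
        self.log.info("Hello from an operator")


# TaskFlow API: a plain Python function becomes a task
@task
def hello_task():
    print("Hello from a task")


@dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1, tz="UTC"), catchup=False)
def framework_demo():
    HelloOperator(task_id="hello_operator") >> hello_task()


framework_demo()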

Now that I've conceptually introduced Airflow as a framework, let's implement a data transfer from a Postgres table to a BigQuery table, then run a BigQuery query on top of it to compute metrics, and finally export the data we have in BigQuery to Excel. For that we need to create a DAG.

In order to make it work we have 4 steps in our pipeline, and every step should as much as possible use already existing community providers:

  1. Extract the data from Postgres to GCS
  2. Create a raw dataset in BigQuery to create the table from the extracted data
  3. Load the data from GCS in BigQuery
  4. Compute data on top of BigQuery and export it in Excel

To make this simple DAG work I'll only use the postgres and google Airflow providers. In each of them you'll find operators or hooks that cover what you're looking for 95% of the time—I say 95% of the time because it's open source, you know. For the first 3 steps an operator exists; for the last step I decided to go for custom Python code that still uses existing hooks. You can check the code in the toggle below.

This is cool: we now have Python code that we need to run. In order to run it you have to (see also the pure-Python alternative sketched after this list):

  • pip install apache-airflow and the corresponding providers (here: apache-airflow-providers-google and apache-airflow-providers-postgres)
  • define the connections in environment variables
  • run airflow dags test my_first_pipeline
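
If you prefer to skip the CLI entirely, you can also trigger the run from plain Python. This is only a sketch: it assumes Airflow 2.5+ (where DAG.test() is available) and that the DAG from the toggle below is saved as my_first_pipeline.py; the connection URIs are illustrative placeholders.

import os

# Airflow builds connections from AIRFLOW_CONN_<CONN_ID> environment variables.
# Illustrative values only: replace host, credentials and key path with yours.
os.environ["AIRFLOW_CONN_POSTGRES"] = "postgres://user:password@db-host:5432/mydb"
os.environ["AIRFLOW_CONN_GCP"] = "google-cloud-platform://?key_path=/key.json"

from my_first_pipeline import my_first_pipeline  # the DAG file from the toggle below

# you may still need `airflow db init` once, as in the CI config further down
dag = my_first_pipeline()  # the @dag-decorated function returns the DAG object
dag.test()                 # runs every task in-process, like `airflow dags test`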

See the code — Airflow DAG



from io import BytesIO

import pandas as pd
import pendulum

from airflow.decorators import dag, task
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator


@task
def compute_price_and_extract():
    bq_hook = BigQueryHook(gcp_conn_id="gcp", location="EU")
    gcs_hook = GCSHook(gcp_conn_id="gcp")

    data = bq_hook.get_records("SELECT SUM(price) FROM raw.fruits")
    df = pd.DataFrame(data)

    output = BytesIO()
    df.to_excel(output)
    output.seek(0)

    gcs_hook.upload("analytics", "prices.xlsx", data=output.read())


@dag(
    schedule="0 1 * * *",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
)
def my_first_pipeline():
    """
    ### Prepare and load data
    This is the DAG that loads all the raw data
    """
    BUCKET = "postgres-raw-landing"
    FILENAME = "fruits.csv"

    upload_data = PostgresToGCSOperator(
        task_id="fruits_pg_to_gcs",
        postgres_conn_id="postgres",
        gcp_conn_id="gcp",
        sql="SELECT * FROM fruits",
        bucket=BUCKET,
        filename=FILENAME,
        gzip=False,
        export_format="csv",
    )

    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        dataset_id="raw",
        location="EU",
        gcp_conn_id="gcp",
    )

    upload_erf_to_bq = GCSToBigQueryOperator(
        task_id="upload_raw_fruits",
        gcp_conn_id="gcp",
        bucket=BUCKET,
        source_objects=[FILENAME],
        destination_project_dataset_table="raw.fruits",
        write_disposition="WRITE_TRUNCATE",
        location="EU",
        source_format="CSV",
        autodetect=True,
    )

    upload_data >> create_dataset >> upload_erf_to_bq >> compute_price_and_extract()


my_first_pipeline()


Run Airflow code from Gitlab

Now that we have our DAG defined, let's try to run it from somewhere. This somewhere should not be Airflow. We'll create a Gitlab configuration to do it. In Gitlab we will need to:

  • install the Python requirements.txt—I don't recommend a bare pip setup, prefer poetry or pdm
  • define the environment variables AIRFLOW_CONN_POSTGRES and AIRFLOW_CONN_GCP in Gitlab CI/CD settings
  • authorise the Gitlab runner IP addresses on your infrastructure so they can access the Postgres server

Once everything is set up¹ you'll be able to run a CI pipeline with an airflow command. The next step can be to schedule your pipeline to run every day, and you'll be done: you now have a free scheduler running Airflow code.

The green pipeline in Gitlab CI/CD

If you want to go further, we can even imagine packaging the pip installation in a Docker image, to avoid relying on a cache that can disappear because of the limited lifetime of Gitlab runners.

There are accepted limitations to this setup, and I don't recommend running it in production. The most annoying one is that, in this specific setup on public Gitlab, the data will go through Gitlab's own instances, which are located in the US region.

.gitlab-ci.yml



image: python:3.10

variables:
    PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
    AIRFLOW_HOME: "$CI_PROJECT_DIR"
    AIRFLOW_CONN_POSTGRES: "$AIRFLOW_CONN_POSTGRES"
    AIRFLOW_CONN_GCP: "google-cloud-platform://?key_path=/key.json&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&project=$PROJECT_ID&num_retries=5"

cache:
    key: $CI_COMMIT_REF_SLUG
    paths:
        - .cache/pip

before_script:
    - python3 -m pip install --upgrade pip
    - python3 -m pip install -r requirements.txt
    - airflow db init
    - cat $PRIVATE_KEY > /key.json


stages:
    - deploy

run-fruits:
  stage: deploy
  script:
    - airflow dags test my_first_pipeline

Conclusion

Obviously this article is a bit provocative. I recommend to nobody running Airflow from Gitlab in production, except if you want to save a lot of costs. The main idea is that it is stupid to reduce Airflow to its scheduling and orchestration capacities. Airflow comes with a set of good practices and a framework to define data jobs. And to be honest, this framework is pretty good, even if it was designed years ago.

Aside from this, there are many flaws that Airflow has to address one day. The simple fact that pip install apache-airflow drags in the whole earth when you run it—107 packages on a fresh install—prevents any lightweight integration of Airflow into other tools.

Another issue is that your Gitlab needs access to your data systems, which is more than what you usually need from your CI setup. We could also push the craziness further and use Airflow competitors to run our Airflow DAGs. I'd say this is stupid, but this article is not far from being stupid either.


  1. You can see the Gitlab repo of this experiment here.

PS: I'm sorry, I finished this article a bit in a rush because it was planned as the 2nd entry of the Advent of Data. I hope you like the idea.