How we use Airflow for SQL alerts and Slack notifications


At Sirena, we use Apache Airflow to run all of our data orchestration. Airflow is an open-source tool, originally created at Airbnb, for authoring and scheduling data workflows. In essence, Airflow is an orchestrator that runs tasks at given frequencies while also handling backfilling, task dependencies and so much more. Workflows are defined as Directed Acyclic Graphs (DAGs) that express the dependencies among tasks.

At Sirena, we found another use for Airflow besides helping us move data from one place to another: we also use it to trigger alerts on certain data quality measures. The idea is quite simple: we define a SQL query and a condition on its result. The query should output a single number, and if that number meets the condition, we send a Slack alert. The beauty of it is how simple these alerts are to create. We have parametrized them so that we can write configuration files in YAML that get materialized as DAGs in Airflow (more about this in a future post about Airflow factories…). These alerts are especially good for making sure nothing catastrophic has happened to the data pipelines.

In this post, I will guide you through the process of setting up a similar workflow for your Airflow implementation.

Configuration Files

Before we dig into Python code, let’s stop for a second to think about how we want our configuration file to look. Besides all of the “required” fields needed to create DAGs in Airflow, such as a name, an interval, and an owner, we are going to need a few other things.

Arguably the most important fields are the ones that actually define the alert. For that, we need a field for our SQL query and one for our condition, such as “greater than 5”. We’ll break the condition into two parts: the criteria (greater than, less than, etc.) and the target value.
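
For example, the condition “greater than 5” maps to two fields in the configuration file:

criteria: greater than
value: 5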

Besides that, we also need to define how we want to notify, whom we want to notify, and what the actual message will be. As I mentioned before, at Sirena we use Slack to send notifications, so our notification method is Slack. Our recipients come from a list of Slack channels or users, though we have a dedicated channel for all of the Data Team’s notifications. Lastly, we’ll use the alert’s description to send a descriptive message that lets us know what happened.

# Give the alert a name
# This will be sent in the notification
name: Example Alert

# This will be the name of the generated DAG
# Should be a short and machine-friendly name
# Use underscores (`_`) instead of spaces and use lowercase
# We will add an `alert_` prefix and a `_dag` suffix, so no need to include that
nickname: example

# Add a description to describe the alert
# This will be sent in the notification
description: >
  An example alert to show how to configure alerts in Airflow. 
  This message will be part of the notification sent to people.

# Easy way to enable or disable an alert
# If true, the DAG for this alert is created and the check runs on schedule
# If false, the factory skips this alert entirely and nothing is sent
enabled: false

# Determines how often the alert should be checked (usually `@daily`)
interval: "@daily"

# The owner of the DAG (usually `airflow`)
owner: airflow

# Defines the condition to check against the returned value
# The available criteria are:
#   greater than: Notifies if the returned value is greater than the condition value
#   equal to: Notifies if the returned value is exactly equal to the condition value
#   less than: Notifies if the returned value is less than the condition value
criteria: greater than

# The value to compare against
value: 10

# The query to execute to be compared with the condition
# This query should return one and only one column and one and only one row
# Note: If you are using Snowflake, Airflow will only have access to the RAW database
query: >
  select count(*)
  from raw.public.clients

# The method of notification
# slack: Sends a Slack message to the recipients (users or channels).
# email: Not implemented. Sends an email to the recipients.
notifier: slack

# If notifier is email, this list has to be of emails addresses
# If notifier is slack, this list has to be of Slack usernames and/or channels
# Note: Use double quotes ("") for each recipient
recipients:
  - "#data-alerts"

Notice that we define a notifier. This allows us to configure what service we’ll use to send out the notifications to our list of recipients.

We can create one of these configuration files for every alert we want to set up and store them all in /dags/alerts/. Our DAG factory will pick up every configuration file in that directory and create the necessary DAGs, which is why the enabled property comes in handy: we can enable and disable alerts without having to delete the entire file.
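
For instance, with a few alerts defined (the file names here are hypothetical), the directory would look something like this:

dags/
└── alerts/
    ├── example.yaml
    ├── clients_loaded.yaml
    └── orders_loaded.yaml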

Alert Factory

Now we can jump to our alert factory. I suggest you follow my blog for an upcoming post all about DAG factories in Airflow. For now, it’s important to know that a factory is essentially a Python script that iterates through all the configuration files in /dags/alerts/ and creates a DAG for each of them. The code is pretty straightforward, but let’s go through it together.

The first thing we’ll want to do is import all of the configuration files from /dags/alerts/ and iterate over them, creating a DAG for each one. To do this, we implemented a function create_dag_alert that takes the configuration file (as a Python dictionary) and returns an Airflow DAG with a single task that uses the PythonOperator.

# Imports for the full factory module
import glob
import logging
from datetime import datetime, timedelta

import yaml
from airflow import DAG
# On Airflow 2+, PythonOperator lives in airflow.operators.python
from airflow.operators.python_operator import PythonOperator

log = logging.getLogger(__name__)

alert_files = glob.glob("dags/alerts/*.yaml")

for alert_path in alert_files:

    # Read YAML file
    with open(alert_path, "r") as stream:
        config = yaml.safe_load(stream)

    # Check if the alert is enabled
    if config["enabled"]:

        # Add to global scope
        globals()[config["nickname"]] = create_dag_alert(config)

Let’s break this function into parts and go over them one by one:

# Function that takes the config and creates a DAG
def create_dag_alert(config):

    # Part 1: Define the DAG
    ...

    # Part 2: Task function
    ...

    # Part 3: Setup the DAG and task
    ...

Part 1: Define the DAG

We start by defining some default arguments for our DAG based on the configuration file. You can see that for some of these values we actually use the config file (e.g. owner and schedule_interval), but for others we just hard-code the values we want (e.g. retries and catchup). This is up to you; we didn’t see a lot of value in adding those to the configuration file at the moment, but we might do it at some point. The point is, don’t feel bound to this exact split: do whatever works best for you.

# Define default arguments for DAG
default_args = {
    "owner": config["owner"],
    "start_date": datetime(2021, 2, 21),
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

# Initialize the DAG
dag = DAG(
    dag_id="alert_" + config["nickname"] + "_dag",
    default_args=default_args,
    schedule_interval=config["interval"],
    catchup=False,
)

Part 2: Task function

This is actually the bulk of the factory. What we want to do here is create the function that will be executed by the PythonOperator. Essentially, this is the function that determines whether the alert condition is met and, if it is, sends out the notification.

We start by getting the query from the configuration file and running it against our data warehouse (in our case Snowflake, but this should work with any data warehouse). We then compare the result against the target value configured in the YAML file using the criteria we specified. After that, we check whether we need to notify and, if we do, call the appropriate function for the notifier we selected.
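
One note before the code: the sf object used below is our in-house Snowflake client, whose code is not shown in this post. A minimal stand-in, assuming Airflow 2 with the Snowflake provider package installed and a snowflake_default connection configured (on Airflow 1.10 the hook lives in airflow.contrib.hooks.snowflake_hook instead), could look something like this:

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

class SnowflakeClient:
    """Hypothetical stand-in for the in-house sf client."""

    def __init__(self, conn_id="snowflake_default"):
        self.conn_id = conn_id

    def query(self, sql):
        # get_records returns a list of row tuples, so the alert task
        # can read its single value with result[0][0]
        hook = SnowflakeHook(snowflake_conn_id=self.conn_id)
        return hook.get_records(sql)

sf = SnowflakeClient()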

def _run_alert(**context):

    # Get data using query
    log.info("Get data using query")
    query = config["query"]
    log.debug(f"Query: {query}")

    # Run the query against Snowflake (sf is our Snowflake client)
    # and grab the single value from the first row and column
    result = sf.query(query)
    result_value = result[0][0]

    # Default action is to not notify
    notify = False

    # Compare return value to condition
    criteria = config["criteria"].strip().lower().replace(" ", "_")

    if criteria == "greater_than":
        if result_value > config["value"]:
            notify = True

    elif criteria == "equal_to":
        if result_value == config["value"]:
            notify = True

    elif criteria == "less_than":
        if result_value < config["value"]:
            notify = True

    else:
        log.error(f"Unknown condition criteria: {criteria}")
        log.error(f"Original condition criteria: {config['criteria']}")
        log.error(
            "Check the value of the 'criteria' parameter in the configuration file of this alert"
        )
        raise RuntimeError(f"Unknown condition criteria: {criteria}")

    # Notify only if condition was met
    if notify:

        # Check which notifier to use
        notifier = config["notifier"].strip().lower().replace(" ", "_")

        # Check Slack notifier
        if notifier == "slack":

            # Call the Slack notifier
            notify_slack(
                config["name"],
                config["description"],
                config["recipients"],
                config["criteria"].strip().lower(),
                config["value"],
                result_value,
            )

        # Check the email notifier
        elif notifier == "email":

            # Call the email notifier
            notify_email(
                config["name"],
                config["description"],
                config["recipients"],
                config["criteria"].strip().lower(),
                config["value"],
                result_value,
            )

        # Unknown notifier
        else:
            log.error(f"Unknown notifier: {notifier}")
            log.error(f"Original notifier: {config['notifier']}")
            log.error(
                "Check the value of the 'notifier' parameter in the configuration file of this alert"
            )
            raise RuntimeError(f"Unknown notifier: {notifier}")

    return "OK"
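
We have not shown the notify_slack helper (and notify_email is, as the config notes, not implemented). A minimal sketch of it using the slack_sdk package, assuming a bot token in a SLACK_API_TOKEN environment variable and matching the call signature above, could look like this:

import os

from slack_sdk import WebClient

def notify_slack(name, description, recipients, criteria, value, result_value):
    # Hypothetical sketch of the Slack notifier called by _run_alert
    client = WebClient(token=os.environ["SLACK_API_TOKEN"])
    text = (
        f":rotating_light: *{name}*\n"
        f"{description}\n"
        f"The returned value ({result_value}) is {criteria} {value}."
    )
    # Send the message to every configured user or channel
    for recipient in recipients:
        client.chat_postMessage(channel=recipient, text=text)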

Part 3: Setup the DAG and task

Lastly, we just need to add a task to the DAG to execute the _run_alert function using the PythonOperator.

with dag:
    t1 = PythonOperator(
        task_id="run_alert", python_callable=_run_alert, provide_context=True
    )

return dag

Coming full circle, notice how we are returning the DAG (dag) we created so we can add it to the global scope. This last step is important: Airflow only picks up DAG objects it finds in a module’s global namespace, which is exactly why the factory assigns each generated DAG to globals().

# Add to global scope
globals()[config["nickname"]] = create_dag_alert(config)

Conclusion

Now, there’s a lot to be said about this implementation. I think the most important point is the DAG factory approach and how simple it makes writing new alerts, but more about that in a future post. As for the alerts themselves, it’s good to bear in mind that they are not going to replace a data quality tool like great_expectations, or even dbt tests. In the same way that great_expectations and dbt tests can live together in harmony, I see a place for these SQL alerts to be combined with those tools. The way I think about it, you can use these SQL alerts for data loading, dbt tests for testing your transformations, and great_expectations for your final products, to make sure your data stakeholders are using reliable data.

Looking back at the implementation itself, as I was writing this post I noticed that this code is in need of a refactor, and some things could be implemented in a cleaner way. But hey, I leave it as an exercise for the reader to find ways to improve this script and maybe even extend it to more and better use cases. In any case, I hope this is useful as a base and sparks your creativity to improve your notifications around data loading into your data warehouse.