Integrating Airflow and Great Expectations
You can now find the Great Expectations Provider on the Astronomer Registry, the discovery and distribution hub for Apache Airflow integrations created to aggregate and curate the best bits of the ecosystem.
Great Expectations is an open source Python-based data validation framework. You can test your data by expressing what you “expect” from it as simple declarative statements in Python, then run validation using those “expectations” against datasets with Checkpoints. The Great Expectations team maintains an Airflow provider that gives users a convenient method for running validation directly from their DAGs.
This guide will walk through how to use the GreatExpectationsOperator in an Airflow DAG and the Astronomer environment.
Great Expectations Concepts
Typically, using Great Expectations is a two-step process:
- Expectation Suite creation
- Validation
First, a user creates test suites, or “Expectation Suites”, using Great Expectations methods. These suites are usually stored in JSON and can be checked into version control, just like regular tests. The suites are then loaded by the Great Expectations framework at test runtime, e.g. when processing a new batch of data in a pipeline.
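To make this concrete, here is a minimal sketch of declaring and saving expectations using the legacy Pandas-backed API; the CSV path, column names, and suite filename are hypothetical rather than taken from the demo repository:

import great_expectations as ge

# Load a sample batch of data as a Great Expectations PandasDataset
batch = ge.read_csv("data/yellow_tripdata_sample.csv")  # hypothetical path

# Declare expectations against the batch
batch.expect_column_values_to_not_be_null("passenger_count")
batch.expect_column_values_to_be_between("fare_amount", min_value=0, max_value=500)

# Persist the declared expectations as a JSON Expectation Suite
batch.save_expectation_suite("great_expectations/expectations/taxi_demo.json")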
For a step-by-step guide on how to configure a simple Great Expectations project, please see the “Getting started” tutorial.
This guide assumes that you have downloaded the code from the demo repository, which contains a sample Great Expectations project.
If you wish to use your own Great Expectations project along with this guide, ensure you have completed the following steps (a sketch of the corresponding CLI commands follows the list):
- Initialized a Great Expectations project
- Configured at least one Datasource
- Created at least one Expectation Suite
- Created a Checkpoint
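If you are starting from scratch, these steps map onto the Great Expectations V3 CLI roughly as follows; the Checkpoint name is a placeholder:

great_expectations init                            # initialize a project
great_expectations datasource new                  # configure a Datasource
great_expectations suite new                       # create an Expectation Suite
great_expectations checkpoint new my_checkpoint    # create a Checkpoint (placeholder name)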
If you set up a project manually, you will see a great_expectations directory which contains several sub-directories, as well as the great_expectations.yml configuration file. If you cloned the demo repository, the great_expectations directory is already included in the project.
The GreatExpectationsOperator requires Airflow >=2.1, and you will need to change the value of enable_xcom_pickling to true in your airflow.cfg. If you are using an Astronomer project structure, add ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True to your Dockerfile. If you are working from the demo repository, this step has already been completed for you.
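In airflow.cfg, the setting lives under the [core] section:

[core]
enable_xcom_pickling = True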
Use Case: Great Expectations Operator
Now that we've set up our system to work with Great Expectations, we can start exploring how to use it in our DAGs. The current Great Expectations provider version uses the Great Expectations V3 API.
To validate your data, the GreatExpectationsOperator runs a Checkpoint against your dataset. Before you start writing your DAG, make sure you have a Data Context and Checkpoint configured. To do this, define dictionaries containing the necessary Data Context and Checkpoint fields and import those dictionaries into your DAG.
A Data Context represents a Great Expectations project. It organizes storage and access for Expectation Suites, data sources, notification settings, and data fixtures.
Checkpoints provide a convenient abstraction for bundling the validation of a Batch (or Batches) of data against an Expectation Suite (or several), as well as the actions that should be taken after the validation.
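For illustration, minimal in-memory versions of these dictionaries might look like the following sketch; the root directory, datasource, data asset, and suite names are placeholders rather than the demo repository's actual values:

from great_expectations.data_context.types.base import (
    CheckpointConfig,
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)

# Point the Data Context at an on-disk Great Expectations project
example_data_context_config = DataContextConfig(
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory="/usr/local/airflow/include/great_expectations",  # placeholder path
    ),
)

# Bundle a Batch of data with an Expectation Suite for validation
example_checkpoint_config = CheckpointConfig(
    name="taxi.pass.chk",
    config_version=1,
    class_name="SimpleCheckpoint",
    validations=[
        {
            "batch_request": {
                "datasource_name": "my_datasource",  # placeholder
                "data_connector_name": "default_inferred_data_connector_name",
                "data_asset_name": "yellow_tripdata_sample",  # placeholder
            },
            "expectation_suite_name": "taxi.demo",  # placeholder
        }
    ],
)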
Our demo repository defines similar configuration dictionaries. Ensure the great_expectations directory is accessible by your DAG, as it is loaded into the Docker image as part of the project.
The Great Expectations provider is installed when you run astro dev start, as it is part of requirements.txt. For Astronomer projects, a provider version >= 0.1.1 is required. If you are not using an Astronomer environment, install the Great Expectations provider in your environment manually:
pip install great_expectations airflow-provider-great-expectations>=0.1.0
It’s recommended to specify a version when installing the package. To make use of the latest Great Expectations V3 API, you need to specify a version >= 0.1.1.
Using the Great Expectations Operator
- Import the operator in your DAG file. You might also need to import the BatchRequest classes, depending on how you're using the operator. To import the Great Expectations provider, configurations, and batch classes in a given DAG, add the following lines to the top of the DAG file:
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context.types.base import (
    DataContextConfig,
    CheckpointConfig,
)
- Create a task using the GreatExpectationsOperator. To use the operator in the DAG, define an instance of the GreatExpectationsOperator class and assign it to a variable. In the following example, we define two different instances of the operator to complete two different steps in a data quality check workflow:
ge_data_context_root_dir_with_checkpoint_name_pass = GreatExpectationsOperator(
    task_id="ge_data_context_root_dir_with_checkpoint_name_pass",
    data_context_root_dir=ge_root_dir,
    checkpoint_name="taxi.pass.chk",
)

ge_data_context_config_with_checkpoint_config_pass = GreatExpectationsOperator(
    task_id="ge_data_context_config_with_checkpoint_config_pass",
    data_context_config=example_data_context_config,
    checkpoint_config=example_checkpoint_config,
)

ge_data_context_root_dir_with_checkpoint_name_pass >> ge_data_context_config_with_checkpoint_config_pass
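These task definitions are assumed to live inside a DAG file; here is a minimal sketch of the surrounding scaffolding, assuming Airflow 2.1+ and a placeholder dag_id:

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="example_great_expectations_dag",  # placeholder name
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # define the GreatExpectationsOperator instances shown above here
    ...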
The operator has several optional parameters, but it always requires either a data_context_root_dir or a data_context_config, and either a checkpoint_name or a checkpoint_config.
The data_context_root_dir should point to the great_expectations project directory generated when you created the project with the CLI. If using an in-memory Data Context, a DataContextConfig must be defined, as in this example.
A checkpoint_name references a Checkpoint in the project CheckpointStore defined in the DataContext (which is often the great_expectations/checkpoints/ path), so that checkpoint_name = "taxi.pass.chk" would reference the file great_expectations/checkpoints/taxi/pass/chk.yml.
With a checkpoint_name, checkpoint_kwargs may be passed to the operator to specify additional, overwriting configurations. A checkpoint_config may be passed to the operator in place of a name, and can be defined like this example.
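As a hedged illustration of checkpoint_kwargs, the following sketch overrides a single field of the stored Checkpoint at runtime; the task id and run_name_template value are illustrative:

ge_checkpoint_with_overrides = GreatExpectationsOperator(
    task_id="ge_checkpoint_with_overrides",  # illustrative
    data_context_root_dir=ge_root_dir,
    checkpoint_name="taxi.pass.chk",
    # fields in checkpoint_kwargs overwrite the corresponding fields of the stored Checkpoint
    checkpoint_kwargs={"run_name_template": "airflow-run-%Y%m%d"},  # illustrative override
)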
For a full list of parameters, see the GreatExpectationsOperator documentation.
By default, a Great Expectations task will run validation and raise an AirflowException if any of the tests fail. To override this behavior and continue running even if tests fail, set the fail_task_on_validation_failure flag to False.
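For example, a sketch of a task that logs validation failures instead of failing; the task id is illustrative:

ge_task_warn_only = GreatExpectationsOperator(
    task_id="ge_task_warn_only",  # illustrative
    data_context_root_dir=ge_root_dir,
    checkpoint_name="taxi.pass.chk",
    # downstream tasks keep running even if validation fails
    fail_task_on_validation_failure=False,
)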
Connections and Backends
The GreatExpectationsOperator can run a Checkpoint on a dataset stored in any backend compatible with Great Expectations, e.g. BigQuery, MSSQL, MySQL, PostgreSQL, Redshift, Snowflake, SQLite, or Athena. All that’s needed to get the operator to point at an external dataset is to set up an Airflow Connection to the datasource, and add the connection to your Great Expectations project, e.g. using the CLI to add a Postgres backend. Then, if using a CheckpointConfig, ensure that the "datasources" field refers to your backend connection name.
In this guide, we presented a brief overview of Great Expectations and explained how to use the provider operator to create Great Expectations Airflow tasks. For more examples of how to use the GreatExpectationsOperator as part of an ELT pipeline, check out the Snowflake, BigQuery, and Redshift examples on the Astronomer Registry.