Airflow Integration
DataHub supports integration of
- Airflow Pipeline (DAG) metadata
- DAG and Task run information as well as
- Lineage information when present
There are a few ways to enable these integrations from Airflow into DataHub.
Using Datahub's Airflow lineage plugin (new)
::: note
We recommend you use the lineage plugin if you are on Airflow version >= 2.0.2 or on MWAA with an Airflow version >= 2.0.2 :::
You need to install the required dependency in your airflow.
pip install acryl-datahub-airflow-plugin
Disable lazy plugin load in your airflow.cfg. On MWAA you should add this config to your Apache Airflow configuration options.
core.lazy_load_plugins : False
You must configure an Airflow hook for Datahub. We support both a Datahub REST hook and a Kafka-based hook, but you only need one.
# For REST-based:
airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080'
# For Kafka-based (standard Kafka sink config can be passed via extras):
airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'Add your
datahub_conn_id
and/orcluster
to yourairflow.cfg
file if it is not align with the default values. See configuration parameters belowConfiguration options:
Name Default value Description datahub.datahub_conn_id datahub_rest_default The name of the datahub connection you set in step 1. datahub.cluster prod name of the airflow cluster datahub.capture_ownership_info true If true, the owners field of the DAG will be capture as a DataHub corpuser. datahub.capture_tags_info true If true, the tags field of the DAG will be captured as DataHub tags. datahub.graceful_exceptions true If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. Configure
inlets
andoutlets
for your Airflow operators. For reference, look at the sample DAG inlineage_backend_demo.py
, or referencelineage_backend_taskflow_demo.py
if you're using the TaskFlow API.[optional] Learn more about Airflow lineage, including shorthand notation and some automation.
How to validate installation
Go and check in Airflow at Admin -> Plugins menu if you can see the Datahub plugin
Run an Airflow DAG and you should see in the task logs Datahub releated log messages like:
Emitting Datahub ...
Using Datahub's Airflow lineage backend
caution
The Airflow lineage backend is only supported in Airflow 1.10.15+ and 2.0.2+. For managed services like MWAA you should use the Datahub Airflow plugin as the lineage backend is not supported there
note
If you are looking to run Airflow and DataHub using docker locally, follow the guide here. Otherwise proceed to follow the instructions below.
Setting up Airflow to use DataHub as Lineage Backend
- You need to install the required dependency in your airflow. See https://registry.astronomer.io/providers/datahub/modules/datahublineagebackend
pip install acryl-datahub[airflow]
You must configure an Airflow hook for Datahub. We support both a Datahub REST hook and a Kafka-based hook, but you only need one.
# For REST-based:
airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080'
# For Kafka-based (standard Kafka sink config can be passed via extras):
airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'Add the following lines to your
airflow.cfg
file.[lineage]
backend = datahub_provider.lineage.datahub.DatahubLineageBackend
datahub_kwargs = {
"datahub_conn_id": "datahub_rest_default",
"cluster": "prod",
"capture_ownership_info": true,
"capture_tags_info": true,
"graceful_exceptions": true }
# The above indentation is important!Configuration options:
datahub_conn_id
(required): Usuallydatahub_rest_default
ordatahub_kafka_default
, depending on what you named the connection in step 1.cluster
(defaults to "prod"): The "cluster" to associate Airflow DAGs and tasks with.capture_ownership_info
(defaults to true): If true, the owners field of the DAG will be capture as a DataHub corpuser.capture_tags_info
(defaults to true): If true, the tags field of the DAG will be captured as DataHub tags.capture_executions
(defaults to false): If true, it captures task runs as DataHub DataProcessInstances. This feature only works with Datahub GMS version v0.8.33 or greater.graceful_exceptions
(defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.
Configure
inlets
andoutlets
for your Airflow operators. For reference, look at the sample DAG inlineage_backend_demo.py
, or referencelineage_backend_taskflow_demo.py
if you're using the TaskFlow API.[optional] Learn more about Airflow lineage, including shorthand notation and some automation.
Emitting lineage via a separate operator
Take a look at this sample DAG:
lineage_emission_dag.py
- emits lineage using the DatahubEmitterOperator.
In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See step 1 above for details.