Prefect Integration with DataHub

Overview

DataHub supports integration with Prefect, allowing you to ingest:

  • Prefect flow and task metadata
  • Flow run and task run information
  • Lineage information (when available)

This integration enables you to track and monitor your Prefect workflows within DataHub, providing a comprehensive view of your data pipeline activities.

Prefect DataHub Block

What is a Prefect DataHub Block?

Blocks in Prefect are primitives that enable the storage of configuration and provide an interface for interacting with external systems. The prefect-datahub block uses the DataHub REST emitter to send metadata events while running Prefect flows.
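
For intuition, here is a minimal sketch of the generic block pattern in Prefect 2.x; the class, field, and block names below are illustrative, not part of prefect-datahub:

from prefect.blocks.core import Block

class MyServiceConfig(Block):
    # Hypothetical block: stores connection details for some external service.
    api_url: str
    token: str = ""

# Persist the configuration once...
MyServiceConfig(api_url="http://localhost:9999").save("my-service-config")

# ...and load it from any flow later.
config = MyServiceConfig.load("my-service-config")

The prefect-datahub block in section 2 below follows this same save/load pattern.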

Prerequisites

  1. Use either Prefect Cloud (recommended) or a self-hosted Prefect server.

  2. For Prefect Cloud setup, refer to the Cloud Quickstart guide.

  3. For self-hosted Prefect server setup, refer to the Host Prefect Server guide.

  4. Ensure the Prefect API URL is set correctly (a Python alternative is sketched after this list). Verify using:

    prefect profile inspect
  5. API URL format:

    • Prefect Cloud: https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>
    • Self-hosted: http://<host>:<port>/api
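
If you prefer to check from Python (say, inside a deployment script), this minimal sketch reads the same setting via Prefect 2.x's settings module:

from prefect.settings import PREFECT_API_URL

# Prints the API URL the client will use; it should match one of the formats above.
print(PREFECT_API_URL.value())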

Setup Instructions

1. Installation

Install prefect-datahub using pip:

pip install 'prefect-datahub'

Note: Requires Python 3.7+
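
As a quick sanity check that the package is importable (standard library only; requires Python 3.8+ for importlib.metadata):

from importlib.metadata import version

# Raises PackageNotFoundError if prefect-datahub is not installed.
print(version("prefect-datahub"))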

2. Saving Configurations to a Block

Save your configuration to the Prefect block document store:

from prefect_datahub.datahub_emitter import DatahubEmitter

DatahubEmitter(
    datahub_rest_url="http://localhost:8080",
    env="PROD",
    platform_instance="local_prefect",
).save("MY-DATAHUB-BLOCK")

Configuration options:

Config              Type   Default                  Description
datahub_rest_url    str    http://localhost:8080    DataHub GMS REST URL
env                 str    PROD                     Environment for assets (see FabricType)
platform_instance   str    None                     Platform instance for assets (see Platform Instances)

3. Using the Block in Prefect Workflows

Load and use the saved block in your Prefect workflows:

from prefect import flow, task
from prefect_datahub.dataset import Dataset
from prefect_datahub.datahub_emitter import DatahubEmitter

datahub_emitter = DatahubEmitter.load("MY-DATAHUB-BLOCK")

@task(name="Transform", description="Transform the data")
def transform(data):
    data = data.split(" ")
    # Record dataset-level lineage for this task.
    datahub_emitter.add_task(
        inputs=[Dataset("snowflake", "mydb.schema.tableA")],
        outputs=[Dataset("snowflake", "mydb.schema.tableC")],
    )
    return data

@flow(name="ETL flow", description="Extract transform load flow")
def etl():
    data = transform("This is data")
    # Send the flow, its tasks, and the collected lineage to DataHub.
    datahub_emitter.emit_flow()

Note: To emit tasks, you must call emit_flow(). Otherwise, no metadata will be emitted.
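
Nothing in the snippet above actually runs the flow. As with any Prefect flow, calling the decorated function executes it, and metadata is sent once emit_flow() runs:

if __name__ == "__main__":
    etl()  # Executes the flow locally and emits flow/task metadata to DataHub.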

Concept Mapping

Prefect Concept    DataHub Concept
Flow               DataFlow
Flow Run           DataProcessInstance
Task               DataJob
Task Run           DataProcessInstance
Task Tag           Tag
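
To make the mapping concrete, the DataHub Python SDK's URN builders show roughly what the resulting entities look like; the orchestrator name "prefect" and the flow/task ids below are illustrative assumptions:

from datahub.emitter.mce_builder import make_data_flow_urn, make_data_job_urn

# A Prefect flow maps to a DataFlow...
flow_urn = make_data_flow_urn(orchestrator="prefect", flow_id="etl", cluster="PROD")
# ...and each task within it maps to a DataJob under that flow.
task_urn = make_data_job_urn(orchestrator="prefect", flow_id="etl", job_id="Transform", cluster="PROD")

print(flow_urn)  # urn:li:dataFlow:(prefect,etl,PROD)
print(task_urn)  # urn:li:dataJob:(urn:li:dataFlow:(prefect,etl,PROD),Transform)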

Validation and Troubleshooting

Validating the Setup

  1. Check the Prefect UI's Blocks menu for the DataHub emitter.

  2. Run a Prefect workflow and look for DataHub-related log messages:

    Emitting flow to datahub...
    Emitting tasks to datahub...

Debugging Common Issues

Incorrect Prefect API URL

If the Prefect API URL is incorrect, set it manually:

prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
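
You can confirm the change took effect with the prefect config view command, which prints the active settings.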

DataHub Connection Error

If you encounter a ConnectionError: HTTPConnectionPool(host='localhost', port=8080), ensure that your DataHub GMS service is running.
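
As a quick programmatic check that GMS is reachable, the DataHub SDK's REST emitter exposes a connection test; the URL below assumes the default local deployment:

from datahub.emitter.rest_emitter import DatahubRestEmitter

# Raises an exception if the GMS endpoint is unreachable or unhealthy.
DatahubRestEmitter("http://localhost:8080").test_connection()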

Additional Resources

For more information or support, please refer to the official Prefect and DataHub documentation or reach out to their respective communities.