
DataHub Metadata Ingestion


This module hosts an extensible Python-based metadata ingestion system for DataHub. This supports sending data to DataHub using Kafka or through the REST API. It can be used through our CLI tool, with an orchestrator like Airflow, or as a library.

Getting Started#

Prerequisites#

Before running any metadata ingestion job, you should make sure that DataHub backend services are all running. If you are trying this out locally, the easiest way to do that is through quickstart Docker images.

Install from PyPI#

The folks over at Acryl Data maintain a PyPI package for DataHub metadata ingestion.

```shell
# Requires Python 3.6+
python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub
datahub version
# If you see "command not found", try running this instead: python3 -m datahub version
```

If you run into an error, try checking the common setup issues.

Installing Plugins#

We use a plugin architecture so that you can install only the dependencies you actually need.

| Plugin Name | Install Command | Provides |
| --- | --- | --- |
| file | included by default | File source and sink |
| console | included by default | Console sink |
| athena | pip install 'acryl-datahub[athena]' | AWS Athena source |
| bigquery | pip install 'acryl-datahub[bigquery]' | BigQuery source |
| bigquery-usage | pip install 'acryl-datahub[bigquery-usage]' | BigQuery usage statistics source |
| feast | pip install 'acryl-datahub[feast]' | Feast source |
| glue | pip install 'acryl-datahub[glue]' | AWS Glue source |
| hive | pip install 'acryl-datahub[hive]' | Hive source |
| mssql | pip install 'acryl-datahub[mssql]' | SQL Server source |
| mysql | pip install 'acryl-datahub[mysql]' | MySQL source |
| oracle | pip install 'acryl-datahub[oracle]' | Oracle source |
| postgres | pip install 'acryl-datahub[postgres]' | Postgres source |
| redshift | pip install 'acryl-datahub[redshift]' | Redshift source |
| sagemaker | pip install 'acryl-datahub[sagemaker]' | AWS SageMaker source |
| sqlalchemy | pip install 'acryl-datahub[sqlalchemy]' | Generic SQLAlchemy source |
| snowflake | pip install 'acryl-datahub[snowflake]' | Snowflake source |
| snowflake-usage | pip install 'acryl-datahub[snowflake-usage]' | Snowflake usage statistics source |
| superset | pip install 'acryl-datahub[superset]' | Superset source |
| mongodb | pip install 'acryl-datahub[mongodb]' | MongoDB source |
| ldap | pip install 'acryl-datahub[ldap]' (extra requirements) | LDAP source |
| looker | pip install 'acryl-datahub[looker]' | Looker source |
| lookml | pip install 'acryl-datahub[lookml]' | LookML source, requires Python 3.7+ |
| kafka | pip install 'acryl-datahub[kafka]' | Kafka source |
| druid | pip install 'acryl-datahub[druid]' | Druid source |
| dbt | no additional dependencies | dbt source |
| datahub-rest | pip install 'acryl-datahub[datahub-rest]' | DataHub sink over REST API |
| datahub-kafka | pip install 'acryl-datahub[datahub-kafka]' | DataHub sink over Kafka |

These plugins can be mixed and matched as desired. For example:

pip install 'acryl-datahub[bigquery,datahub-rest]'

You can check the active plugins:

datahub check plugins

Basic Usage#

```shell
pip install 'acryl-datahub[datahub-rest]'  # install the required plugin
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml
```

Install using Docker#


If you don't want to install locally, you can alternatively run metadata ingestion within a Docker container. We have prebuilt images available on Docker hub. All plugins will be installed and enabled automatically.

Limitation: the datahub_docker.sh convenience script assumes that the recipe and any input/output files are accessible in the current working directory or its subdirectories. Files outside the current working directory will not be found, and you'll need to invoke the Docker image directly.

```shell
# Assumes the DataHub repo is cloned locally.
./metadata-ingestion/scripts/datahub_docker.sh ingest -c ./examples/recipes/example_to_datahub_rest.yml
```

Install from source#

If you'd like to install from source, see the developer guide.

Recipes#

A recipe is a configuration file that tells our ingestion scripts where to pull data from (source) and where to put it (sink). Here's a simple example that pulls metadata from MSSQL and puts it into DataHub.

```yml
# A sample recipe that pulls metadata from MSSQL and puts it into DataHub
# using the Rest API.
source:
  type: mssql
  config:
    username: sa
    password: ${MSSQL_PASSWORD}
    database: DemoData

transformers:
  - type: "fully-qualified-class-name-of-transformer"
    config:
      some_property: "some.value"

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
```

We automatically expand environment variables in the config, similar to variable substitution in GNU bash or in docker-compose files. For details, see https://docs.docker.com/compose/compose-file/compose-file-v2/#variable-substitution.

Running a recipe is quite easy.

datahub ingest -c ./examples/recipes/mssql_to_datahub.yml

A number of recipes are included in the examples/recipes directory.

Sources#

Kafka Metadata kafka#

Extracts:

  • List of topics - from the Kafka broker
  • Schemas associated with each topic - from the schema registry
```yml
source:
  type: "kafka"
  config:
    connection:
      bootstrap: "broker:9092"
      consumer_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.DeserializingConsumer
      schema_registry_url: http://localhost:8081
      schema_registry_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.schema_registry.SchemaRegistryClient
```

The options in the consumer config and schema registry config are passed to the Kafka DeserializingConsumer and SchemaRegistryClient respectively.

For a full example with a number of security options, see this example recipe.

MySQL Metadata mysql#

Extracts:

  • List of databases and tables
  • Column types and schema associated with each table
```yml
source:
  type: mysql
  config:
    username: root
    password: example
    database: dbname
    host_port: localhost:3306
    table_pattern:
      deny:
        # Note that the deny patterns take precedence over the allow patterns.
        - "performance_schema"
      allow:
        - "schema1.table2"
      # Although the 'table_pattern' enables you to skip everything from certain schemas,
      # having another option to allow/deny on schema level is an optimization for the case when there is a large number
      # of schemas that one wants to skip and you want to avoid the time to needlessly fetch those tables only to filter
      # them out afterwards via the table_pattern.
    schema_pattern:
      deny:
        - "garbage_schema"
      allow:
        - "schema1"
```

Microsoft SQL Server Metadata mssql#

We have two options for the underlying library used to connect to SQL Server: (1) python-tds and (2) pyodbc. The TDS library is pure Python and hence easier to install, but only PyODBC supports encrypted connections.

Extracts:

  • List of databases, schema, tables and views
  • Column types associated with each table/view
```yml
source:
  type: mssql
  config:
    username: user
    password: pass
    host_port: localhost:1433
    database: DemoDatabase
    include_views: True # whether to include views, defaults to True
    table_pattern:
      deny:
        - "^.*\\.sys_.*" # deny all tables that start with sys_
      allow:
        - "schema1.table1"
        - "schema1.table2"
    options:
      # Any options specified here will be passed to SQLAlchemy's create_engine as kwargs.
      # See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details.
      # Many of these options are specific to the underlying database driver, so that library's
      # documentation will be a good reference for what is supported. To find which dialect is likely
      # in use, consult this table: https://docs.sqlalchemy.org/en/14/dialects/index.html.
      charset: "utf8"
    # If set to true, we'll use the pyodbc library. This requires you to have
    # already installed the Microsoft ODBC Driver for SQL Server.
    # See https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development?view=sql-server-ver15
    use_odbc: False
    uri_args: {}
```
Example: using ingestion with ODBC and encryption

This requires you to have already installed the Microsoft ODBC Driver for SQL Server. See https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development?view=sql-server-ver15

```yml
source:
  type: mssql
  config:
    # See https://docs.sqlalchemy.org/en/14/dialects/mssql.html#module-sqlalchemy.dialects.mssql.pyodbc
    use_odbc: True
    username: user
    password: pass
    host_port: localhost:1433
    database: DemoDatabase
    include_views: True # whether to include views, defaults to True
    uri_args:
      # See https://docs.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver15
      driver: "ODBC Driver 17 for SQL Server"
      Encrypt: "yes"
      TrustServerCertificate: "Yes"
      ssl: "True"
      # Trusted_Connection: "yes"
```

Hive hive#

Extracts:

  • List of databases, schema, and tables
  • Column types associated with each table
  • Detailed table and storage information
```yml
source:
  type: hive
  config:
    # For more details on authentication, see the PyHive docs:
    # https://github.com/dropbox/PyHive#passing-session-configuration.
    # LDAP, Kerberos, etc. are supported using connect_args, which can be
    # added under the `options` config parameter.
    #scheme: 'hive+http' # set this if Thrift should use the HTTP transport
    #scheme: 'hive+https' # set this if Thrift should use the HTTP with SSL transport
    username: user # optional
    password: pass # optional
    host_port: localhost:10000
    database: DemoDatabase # optional, if not specified, ingests from all databases
    # table_pattern/schema_pattern is same as above
    # options is same as above
```
Example: using ingestion with Azure HDInsight
```yml
# Connecting to Microsoft Azure HDInsight using TLS.
source:
  type: hive
  config:
    scheme: "hive+https"
    host_port: <cluster_name>.azurehdinsight.net:443
    username: admin
    password: "<password>"
    options:
      connect_args:
        http_path: "/hive2"
        auth: BASIC
    # table_pattern/schema_pattern is same as above
```

PostgreSQL postgres#

Extracts:

  • List of databases, schema, and tables
  • Column types associated with each table
  • Also supports PostGIS extensions
  • database_alias (optional) can be used to change the name of database to be ingested
```yml
source:
  type: postgres
  config:
    username: user
    password: pass
    host_port: localhost:5432
    database: DemoDatabase
    database_alias: DatabaseNameToBeIngested
    include_views: True # whether to include views, defaults to True
    # table_pattern/schema_pattern is same as above
    # options is same as above
```

Redshift redshift#

Extracts:

  • List of databases, schema, and tables
  • Column types associated with each table
  • Also supports PostGIS extensions
```yml
source:
  type: redshift
  config:
    username: user
    password: pass
    host_port: example.something.us-west-2.redshift.amazonaws.com:5439
    database: DemoDatabase
    include_views: True # whether to include views, defaults to True
    # table_pattern/schema_pattern is same as above
    # options is same as above
```
Extra options when running Redshift behind a proxy


```yml
source:
  type: redshift
  config:
    # username, password, database, etc are all the same as above
    host_port: my-proxy-hostname:5439
    options:
      connect_args:
        sslmode: "prefer" # or "require" or "verify-ca"
        sslrootcert: ~ # needed to unpin the AWS Redshift certificate
```

AWS SageMaker sagemaker#

Extracts:

  • Feature groups
  • Models, jobs, and lineage between the two (e.g. when jobs output a model or a model is used by a job)
```yml
source:
  type: sagemaker
  config:
    aws_region: # aws_region_name, i.e. "eu-west-1"
    env: # environment for the DatasetSnapshot URN, one of "DEV", "EI", "PROD" or "CORP". Defaults to "PROD".

    # Credentials. If not specified here, these are picked up according to boto3 rules.
    # (see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html)
    aws_access_key_id: # Optional.
    aws_secret_access_key: # Optional.
    aws_session_token: # Optional.
    aws_role: # Optional (Role chaining supported by using a sorted list).

    extract_feature_groups: True # if feature groups should be ingested, default True
    extract_models: True # if models should be ingested, default True
    extract_jobs: # if jobs should be ingested, default True for all
      auto_ml: True
      compilation: True
      edge_packaging: True
      hyper_parameter_tuning: True
      labeling: True
      processing: True
      training: True
      transform: True
```

Snowflake snowflake#

Extracts:

  • List of databases, schema, and tables
  • Column types associated with each table
```yml
source:
  type: snowflake
  config:
    username: user
    password: pass
    host_port: account_name
    database_pattern:
      # The escaping of the \$ symbol helps us skip the environment variable substitution.
      allow:
        - ^MY_DEMO_DATA.*
        - ^ANOTHER_DB_REGEX
      deny:
        - ^SNOWFLAKE\$
        - ^SNOWFLAKE_SAMPLE_DATA\$
    warehouse: "COMPUTE_WH" # optional
    role: "sysadmin" # optional
    include_views: True # whether to include views, defaults to True
    # table_pattern/schema_pattern is same as above
    # options is same as above
```
tip

You can also get fine-grained usage statistics for Snowflake using the snowflake-usage source.

Superset superset#

Extracts:

  • List of charts and dashboards
```yml
source:
  type: superset
  config:
    username: user
    password: pass
    provider: db | ldap
    connect_uri: http://localhost:8088
    env: "PROD" # Optional, default is "PROD"
```

See the documentation for Superset's /security/login endpoint at https://superset.apache.org/docs/rest-api for more details on Superset's login API.

Oracle oracle#

Extracts:

  • List of databases, schema, and tables
  • Column types associated with each table

Using the Oracle source requires that you've also installed the correct drivers; see the cx_Oracle docs. The easiest one is the Oracle Instant Client.

```yml
source:
  type: oracle
  config:
    # For more details on authentication, see the documentation:
    # https://docs.sqlalchemy.org/en/14/dialects/oracle.html#dialect-oracle-cx_oracle-connect and
    # https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#connection-strings.
    username: user
    password: pass
    host_port: localhost:5432
    database: dbname
    service_name: svc # omit database if using this option
    include_views: True # whether to include views, defaults to True
    # table_pattern/schema_pattern is same as above
    # options is same as above
```

Feast feast#

Note: Feast ingestion requires Docker to be installed.

Extracts:

Note: this uses a separate Docker container to extract Feast's metadata into a JSON file, which is then parsed to DataHub's native objects. This was done because of a dependency conflict in the feast module.

```yml
source:
  type: feast
  config:
    core_url: localhost:6565 # default
    env: "PROD" # Optional, default is "PROD"
    use_local_build: False # Whether to build Feast ingestion image locally, default is False
```

Google BigQuery bigquery#

Extracts:

  • List of databases, schema, and tables
  • Column types associated with each table
```yml
source:
  type: bigquery
  config:
    project_id: project # optional - can autodetect from environment
    options: # options is same as above
      # See https://github.com/mxmzdlv/pybigquery#authentication for details.
      credentials_path: "/path/to/keyfile.json" # optional
    include_views: True # whether to include views, defaults to True
    # table_pattern/schema_pattern is same as above
```
tip

You can also get fine-grained usage statistics for BigQuery using the bigquery-usage source.

AWS Athena athena#

Extracts:

  • List of databases and tables
  • Column types associated with each table
```yml
source:
  type: athena
  config:
    username: aws_access_key_id # Optional. If not specified, credentials are picked up according to boto3 rules.
    # See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
    password: aws_secret_access_key # Optional.
    database: database # Optional, defaults to "default"
    aws_region: aws_region_name # i.e. "eu-west-1"
    s3_staging_dir: s3_location # "s3://<bucket-name>/prefix/"
    # The s3_staging_dir parameter is needed because Athena always writes query results to S3.
    # See https://docs.aws.amazon.com/athena/latest/ug/querying.html
    # However, the athena driver will transparently fetch these results as you would expect from any other sql client.
    work_group: athena_workgroup # "primary"
    # table_pattern/schema_pattern is same as above
```

AWS Glue glue#

Note: if you also have files in S3 that you'd like to ingest, we recommend you use Glue's built-in data catalog. See here for a quick guide on how to set up a crawler on Glue and ingest the outputs with DataHub.

Extracts:

  • List of tables
  • Column types associated with each table
  • Table metadata, such as owner, description and parameters
  • Jobs and their component transformations, data sources, and data sinks
```yml
source:
  type: glue
  config:
    aws_region: # aws_region_name, i.e. "eu-west-1"
    extract_transforms: True # whether to ingest Glue jobs, defaults to True
    env: # environment for the DatasetSnapshot URN, one of "DEV", "EI", "PROD" or "CORP". Defaults to "PROD".

    # Filtering patterns for databases and tables to scan
    database_pattern: # Optional, to filter databases scanned, same as schema_pattern above.
    table_pattern: # Optional, to filter tables scanned, same as table_pattern above.

    # Credentials. If not specified here, these are picked up according to boto3 rules.
    # (see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html)
    aws_access_key_id: # Optional.
    aws_secret_access_key: # Optional.
    aws_session_token: # Optional.
    aws_role: # Optional (Role chaining supported by using a sorted list).
```

Druid druid#

Extracts:

  • List of databases, schema, and tables
  • Column types associated with each table

Note: if you add a schema pattern, it is important to explicitly define a deny pattern for the internal Druid databases (lookup and sys); otherwise the crawler may crash before processing the relevant databases. This deny pattern is defined by default but is overridden by user-submitted configurations.

```yml
source:
  type: druid
  config:
    # Point to broker address
    host_port: localhost:8082
    schema_pattern:
      deny:
        - "^(lookup|sys).*"
    # options is same as above
```

Other databases using SQLAlchemy sqlalchemy#

The sqlalchemy source is useful if we don't have a pre-built source for your chosen database system but an SQLAlchemy dialect for it is defined elsewhere. In order to use this, you must pip install the required dialect packages yourself.

Extracts:

  • List of schemas and tables
  • Column types associated with each table
```yml
source:
  type: sqlalchemy
  config:
    # See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
    connect_uri: "dialect+driver://username:password@host:port/database"
    options: {} # same as above
    schema_pattern: {} # same as above
    table_pattern: {} # same as above
    include_views: True # whether to include views, defaults to True
```

MongoDB mongodb#

Extracts:

  • List of databases
  • List of collections in each database and infers schemas for each collection

By default, schema inference samples 1,000 documents from each collection. Setting schemaSamplingSize: null will scan the entire collection. Moreover, setting useRandomSampling: False will sample the first documents found without random selection, which may be faster for large collections.

Note that schemaSamplingSize has no effect if enableSchemaInference: False is set.

```yml
source:
  type: "mongodb"
  config:
    # For advanced configurations, see the MongoDB docs.
    # https://pymongo.readthedocs.io/en/stable/examples/authentication.html
    connect_uri: "mongodb://localhost"
    username: admin
    password: password
    env: "PROD" # Optional, default is "PROD"
    authMechanism: "DEFAULT"
    options: {}
    database_pattern: {}
    collection_pattern: {}
    enableSchemaInference: True
    schemaSamplingSize: 1000
    useRandomSampling: True # whether to randomly sample docs for schema or just use the first ones, True by default
    # database_pattern/collection_pattern are similar to schema_pattern/table_pattern from above
```

LDAP ldap#

Extracts:

  • List of people
  • Names, emails, titles, and manager information for each person
  • List of groups
```yml
source:
  type: "ldap"
  config:
    ldap_server: ldap://localhost
    ldap_user: "cn=admin,dc=example,dc=org"
    ldap_password: "admin"
    base_dn: "dc=example,dc=org"
    filter: "(objectClass=*)" # optional field
    drop_missing_first_last_name: False # optional
```

Set drop_missing_first_last_name to true if you have many "headless" LDAP accounts for devices or services that should be excluded because they do not contain a first and last name. This only affects the ingestion of LDAP users; LDAP groups are unaffected by this config option.

LookML lookml#

Note! This plugin uses a package that requires Python 3.7+!

Extracts:

  • LookML views from model files
  • Name, upstream table names, dimensions, measures, and dimension groups
```yml
source:
  type: "lookml"
  config:
    base_folder: /path/to/model/files # where the *.model.lkml and *.view.lkml files are stored
    connection_to_platform_map: # mappings between connection names in the model files to platform names
      connection_name: platform_name (or platform_name.database_name) # for ex. my_snowflake_conn: snowflake.my_database
    model_pattern: {}
    view_pattern: {}
    env: "PROD" # optional, default is "PROD"
    parse_table_names_from_sql: False # see note below
    platform_name: "looker" # optional, default is "looker"
```

Note! The integration can use sql-metadata to try to parse the tables the views depend on. As these SQL queries can be complicated, and the package doesn't officially support all of the SQL dialects that Looker supports, the result might not be correct. This parsing is disabled by default, but can be enabled by setting parse_table_names_from_sql: True.

Looker dashboards looker#

Extracts:

  • Looker dashboards and dashboard elements (charts)
  • Names, descriptions, URLs, chart types, input view for the charts

See the Looker authentication docs for the steps to create a client ID and secret.

```yml
source:
  type: "looker"
  config:
    client_id: # Your Looker API3 client ID
    client_secret: # Your Looker API3 client secret
    base_url: # The url to your Looker instance: https://company.looker.com:19999 or https://looker.company.com, or similar.
    dashboard_pattern: # supports allow/deny regexes
    chart_pattern: # supports allow/deny regexes
    actor: urn:li:corpuser:etl # Optional, defaults to urn:li:corpuser:etl
    env: "PROD" # Optional, default is "PROD"
    platform_name: "looker" # Optional, default is "looker"
```

File file#

Pulls metadata from a previously generated file. Note that the file sink can produce such files, and a number of samples are included in the examples/mce_files directory.

```yml
source:
  type: file
  config:
    filename: ./path/to/mce/file.json
```

dbt dbt#

Pull metadata from dbt artifact files:

  • dbt manifest file
    • This file contains model, source and lineage data.
  • dbt catalog file
    • This file contains schema data.
    • dbt does not record schema data for ephemeral models; as such, DataHub will show ephemeral models in the lineage, but there will be no associated schema for them.
  • dbt sources file
    • This file contains metadata for sources with freshness checks.
    • We transfer dbt's freshness checks to DataHub's last-modified fields.
    • Note that this file is optional; if not specified, we'll use the time of ingestion instead as a proxy for time last-modified.
  • target_platform:
  • load_schemas:
    • Load schemas from dbt catalog file, not necessary when the underlying data platform already has this data.
  • node_type_pattern:
    • Use this filter to include or exclude node types using allow/deny patterns.
```yml
source:
  type: "dbt"
  config:
    manifest_path: "./path/dbt/manifest_file.json"
    catalog_path: "./path/dbt/catalog_file.json"
    sources_path: "./path/dbt/sources_file.json" # (optional, used for freshness checks)
    target_platform: "postgres" # optional, eg "postgres", "snowflake", etc.
    load_schemas: True or False
    node_type_pattern: # optional
      deny:
        - ^test.*
      allow:
        - ^.*
```

Note: when load_schemas is False, models that use identifiers to reference their source tables are ingested using the model identifier as the model name to preserve the lineage.

Google BigQuery Usage Stats bigquery-usage#

  • Fetch a list of queries issued
  • Fetch a list of tables and columns accessed
  • Aggregate these statistics into buckets, by day or hour granularity

Note: the client must have one of the following OAuth scopes, and should be authorized on all projects you'd like to ingest usage stats from.

```yml
source:
  type: bigquery-usage
  config:
    projects: # optional - can autodetect a single project from the environment
      - project_id_1
      - project_id_2
    options:
      # See https://googleapis.dev/python/logging/latest/client.html for details.
      credentials: ~ # optional - see docs
    env: PROD

    bucket_duration: "DAY"
    start_time: ~ # defaults to the last full day in UTC (or hour)
    end_time: ~ # defaults to the last full day in UTC (or hour)

    top_n_queries: 10 # number of queries to save for each table
```
note

This source only does usage statistics. To get the tables, views, and schemas in your BigQuery project, use the bigquery source.

Snowflake Usage Stats snowflake-usage#

  • Fetch a list of queries issued
  • Fetch a list of tables and columns accessed (excludes views)
  • Aggregate these statistics into buckets, by day or hour granularity

Note: the user/role must have access to the account usage table. The "accountadmin" role has this by default, and other roles can be granted this permission.

Note: the underlying access history views that we use are only available in Snowflake's enterprise edition or higher.

```yml
source:
  type: snowflake-usage
  config:
    username: user
    password: pass
    host_port: account_name
    role: ACCOUNTADMIN
    env: PROD

    bucket_duration: "DAY"
    start_time: ~ # defaults to the last full day in UTC (or hour)
    end_time: ~ # defaults to the last full day in UTC (or hour)

    top_n_queries: 10 # number of queries to save for each table
```
note

This source only does usage statistics. To get the tables, views, and schemas in your Snowflake warehouse, ingest using the snowflake source.

Kafka Connect kafka-connect#

Extracts:

  • Kafka Connect connectors as individual DataFlowSnapshotClass entities
  • Individual DataJobSnapshotClass entities, named using the {connector_name}:{source_dataset} convention
  • Lineage information between the source database and the Kafka topic
```yml
source:
  type: "kafka-connect"
  config:
    connect_uri: "http://localhost:8083"
    cluster_name: "connect-cluster"
    connector_patterns:
      deny:
        - ^denied-connector.*
      allow:
        - ^allowed-connector.*
```

Current limitations:

  • Currently works only for Debezium source connectors.

Sinks#

DataHub Rest datahub-rest#

Pushes metadata to DataHub using the GMA REST API. The advantage of the REST-based interface is that any errors can immediately be reported.

```yml
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
```

DataHub Kafka datahub-kafka#

Pushes metadata to DataHub by publishing messages to Kafka. The advantage of the Kafka-based interface is that it's asynchronous and can handle higher throughput. This requires the Datahub mce-consumer container to be running.

```yml
sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: "localhost:9092"
      producer_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.SerializingProducer
      schema_registry_url: "http://localhost:8081"
      schema_registry_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.schema_registry.SchemaRegistryClient
```

The options in the producer config and schema registry config are passed to the Kafka SerializingProducer and SchemaRegistryClient respectively.

For a full example with a number of security options, see this example recipe.

Console console#

Simply prints each metadata event to stdout. Useful for experimentation and debugging purposes.

```yml
sink:
  type: "console"
```

File file#

Outputs metadata to a file. This can be used to decouple metadata sourcing from the process of pushing it into DataHub, and is particularly useful for debugging purposes. Note that the file source can read files generated by this sink.

```yml
sink:
  type: file
  config:
    filename: ./path/to/mce/file.json
```

Transformations#

If you'd like to modify data before it reaches the ingestion sinks, for instance by adding additional owners or tags, you can use a transformer to write your own module and integrate it with DataHub.

Check out the transformers guide for more info!

Using as a library#

In some cases, you might want to construct the MetadataChangeEvents yourself but still use this framework to emit that metadata to DataHub. In this case, take a look at the emitter interfaces, which can easily be imported and called from your own code.
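For instance, a minimal sketch along the following lines constructs a MetadataChangeEvent and sends it over the REST emitter. It assumes acryl-datahub[datahub-rest] is installed and DataHub is reachable at http://localhost:8080; the dataset URN and description are made up, and exact module paths may vary between versions.

```python
# A minimal sketch of using the ingestion framework as a library (REST emitter).
# The dataset URN and description below are illustrative only.
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    DatasetSnapshotClass,
    MetadataChangeEventClass,
)

# Build a MetadataChangeEvent for a hypothetical Postgres table.
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,DemoDatabase.public.my_table,PROD)"
mce = MetadataChangeEventClass(
    proposedSnapshot=DatasetSnapshotClass(
        urn=dataset_urn,
        aspects=[DatasetPropertiesClass(description="A table described from my own code.")],
    )
)

# Emit the event to DataHub over the REST API.
emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit_mce(mce)
```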

Lineage with Airflow#

There are a couple of ways to get lineage information from Airflow into DataHub.

note

If you're simply looking to run ingestion on a schedule, take a look at these sample DAGs:

Using Datahub's Airflow lineage backend (recommended)#

caution

The Airflow lineage backend is only supported in Airflow 1.10.15+ and 2.0.2+.

  1. First, you must configure an Airflow hook for Datahub. We support both a Datahub REST hook and a Kafka-based hook, but you only need one.

    ```shell
    # For REST-based:
    airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080'

    # For Kafka-based (standard Kafka sink config can be passed via extras):
    airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'
    ```
  2. Add the following lines to your airflow.cfg file.

    ```ini
    [lineage]
    backend = datahub_provider.lineage.datahub.DatahubLineageBackend
    datahub_kwargs = {
        "datahub_conn_id": "datahub_rest_default",
        "capture_ownership_info": true,
        "capture_tags_info": true,
        "graceful_exceptions": true }
    # The above indentation is important!
    ```

    Configuration options:

    • datahub_conn_id (required): Usually datahub_rest_default or datahub_kafka_default, depending on what you named the connection in step 1.
    • capture_ownership_info (defaults to true): If true, the owners field of the DAG will be captured as a DataHub corpuser.
    • capture_tags_info (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags.
    • graceful_exceptions (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.
  3. Configure inlets and outlets for your Airflow operators. For reference, look at the sample DAG in lineage_backend_demo.py, or reference lineage_backend_taskflow_demo.py if you're using the TaskFlow API. A minimal sketch also follows this list.

  4. [optional] Learn more about Airflow lineage, including shorthand notation and some automation.
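For step 3, the operator-level configuration looks roughly like the sketch below. It uses the Dataset entity helper shipped with datahub_provider; the DAG id, task, and table names are hypothetical, and the Airflow 2.x import paths are assumed.

```python
# A minimal sketch of declaring lineage on an Airflow 2.x operator.
# The lineage backend configured in airflow.cfg picks up inlets/outlets when the task runs.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_provider.entities import Dataset  # entity helper from the DataHub provider

with DAG(
    dag_id="datahub_lineage_sketch",  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    transform = BashOperator(
        task_id="transform_table",
        bash_command="echo 'run my transformation here'",
        # Upstream and downstream datasets, expressed as (platform, name) pairs.
        inlets=[Dataset("snowflake", "mydb.schema.source_table")],
        outlets=[Dataset("snowflake", "mydb.schema.target_table")],
    )
```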

Emitting lineage via a separate operator#

Take a look at this sample DAG:

In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See step 1 above for details.
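As a rough illustration only, emitting lineage from a dedicated task might look like the sketch below. It assumes the DatahubEmitterOperator and the mce_builder helpers shipped with datahub_provider / acryl-datahub, reuses the datahub_rest_default connection from step 1, and uses made-up DAG and table names.

```python
# A minimal sketch of emitting lineage explicitly from a dedicated Airflow task via the REST hook.
from datetime import datetime

from airflow import DAG

import datahub.emitter.mce_builder as builder
from datahub_provider.operators.datahub import DatahubEmitterOperator

with DAG(
    dag_id="datahub_lineage_emission_sketch",  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    emit_lineage = DatahubEmitterOperator(
        task_id="emit_lineage",
        datahub_conn_id="datahub_rest_default",  # the connection created in step 1
        mces=[
            # make_lineage_mce takes the upstream URNs first, then the downstream URN.
            builder.make_lineage_mce(
                [builder.make_dataset_urn("snowflake", "mydb.schema.source_table")],
                builder.make_dataset_urn("snowflake", "mydb.schema.target_table"),
            )
        ],
    )
```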

Developing#

See the guides on developing, adding a source and using transformers.