Skip to main content
Version: Next

DataHub

Migrate data from one DataHub instance to another.

Requires direct access to the database, kafka broker, and kafka schema registry of the source DataHub instance. Testing

Important Capabilities

CapabilityStatusNotes
Detect Deleted EntitiesOptionally enabled via stateful_ingestion.remove_stale_metadata

Overview

This source pulls data from two locations:

  • The DataHub database, containing a single table holding all versioned aspects
  • The DataHub Kafka cluster, reading from the MCL Log topic for timeseries aspects.

All data is first read from the database, before timeseries data is ingested from kafka. To prevent this source from potentially running forever, it will not ingest data produced after the datahub_source ingestion job is started. This stop_time is reflected in the report.

Data from the database and kafka are read in chronological order, specifically by the createdon timestamp in the database and by kafka offset per partition. In order to properly read from the database, please ensure that the createdon column is indexed. Newly created databases should have this index, named timeIndex, by default, but older ones you may have to create yourself, with the statement:

CREATE INDEX timeIndex ON metadata_aspect_v2 (createdon);

If you do not have this index, the source may run incredibly slowly and produce significant database load.

Stateful Ingestion

On first run, the source will read from the earliest data in the database and the earliest kafka offsets. Every commit_state_interval (default 1000) records, the source will store a checkpoint to remember its place, i.e. the last createdon timestamp and kafka offsets. This allows you to stop and restart the source without losing much progress, but note that you will re-ingest some data at the start of the new run.

If any errors are encountered in the ingestion process, e.g. we are unable to emit an aspect due to network errors, the source will keep running, but will stop committing checkpoints, unless commit_with_parse_errors (default false) is set. Thus, if you re-run the ingestion, you can re-ingest the data that was missed, but note it will all re-ingest all subsequent data.

If you want to re-ingest all data, you can set a different pipeline_name in your recipe, or set stateful_ingestion.ignore_old_state:

source:
config:
# ... connection config, etc.
stateful_ingestion:
enabled: true
ignore_old_state: true
urn_pattern: # URN pattern to ignore/include in the ingestion
deny:
# Ignores all datahub metadata where the urn matches the regex
- ^denied.urn.*
allow:
# Ingests all datahub metadata where the urn matches the regex.
- ^allowed.urn.*

Limitations

  • Can only pull timeseries aspects retained by Kafka, which by default lasts 90 days.
  • Does not detect hard timeseries deletions, e.g. if via a datahub delete command using the CLI. Therefore, if you deleted data in this way, it will still exist in the destination instance.
  • If you have a significant amount of aspects with the exact same createdon timestamp, stateful ingestion will not be able to save checkpoints partially through that timestamp. On a subsequent run, all aspects for that timestamp will be ingested.

Performance

On your destination DataHub instance, we suggest the following settings:

  • Enable async ingestion
  • Use standalone consumers (mae-consumer and mce-consumer)
    • If you are migrating large amounts of data, consider scaling consumer replicas.
  • Increase the number of gms pods to add redundancy and increase resilience to node evictions
    • If you are migrating large amounts of data, consider increasing elasticsearch's thread count via the ELASTICSEARCH_THREAD_COUNT environment variable.

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[datahub]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

pipeline_name: datahub_source_1
datahub_api:
server: "http://localhost:8080" # Migrate data from DataHub instance on localhost:8080
token: "<token>"
source:
type: datahub
config:
include_all_versions: false
database_connection:
scheme: "mysql+pymysql" # or "postgresql+psycopg2" for Postgres
host_port: "<database_host>:<database_port>"
username: "<username>"
password: "<password>"
database: "<database>"
kafka_connection:
bootstrap: "<boostrap_url>:9092"
schema_registry_url: "<schema_registry_url>:8081"
stateful_ingestion:
enabled: true
ignore_old_state: false
urn_pattern:
deny:
# Ignores all datahub metadata where the urn matches the regex
- ^denied.urn.*
allow:
# Ingests all datahub metadata where the urn matches the regex.
- ^allowed.urn.*
extractor_config:
set_system_metadata: false # Replicate system metadata

# Here, we write to a DataHub instance
# You can also use a different sink, e.g. to write the data to a file instead
sink:
type: datahub-rest
config:
server: "<destination_gms_url>"
token: "<token>"

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

FieldDescription
commit_state_interval
integer
Number of records to process before committing state
Default: 1000
commit_with_parse_errors
boolean
Whether to update createdon timestamp and kafka offset despite parse errors. Enable if you want to ignore the errors.
Default: False
database_query_batch_size
integer
Number of records to fetch from the database at a time
Default: 10000
database_table_name
string
Name of database table containing all versioned aspects
Default: metadata_aspect_v2
include_all_versions
boolean
If enabled, include all versions of each aspect. Otherwise, only include the latest version of each aspect.
Default: False
kafka_topic_name
string
Name of kafka topic containing timeseries MCLs
Default: MetadataChangeLog_Timeseries_v1
max_workers
integer
Number of worker threads to use for datahub api ingestion.
Default: 20
pull_from_datahub_api
boolean
Use the DataHub API to fetch versioned aspects.
Default: False
database_connection
SQLAlchemyConnectionConfig
Database connection config
database_connection.host_port 
string
host URL
database_connection.scheme 
string
scheme
database_connection.database
string
database (catalog)
database_connection.options
object
Any options specified here will be passed to SQLAlchemy.create_engine as kwargs. To set connection arguments in the URL, specify them under connect_args.
database_connection.password
string(password)
password
database_connection.sqlalchemy_uri
string
URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.
database_connection.username
string
username
kafka_connection
KafkaConsumerConnectionConfig
Kafka connection config
kafka_connection.bootstrap
string
Default: localhost:9092
kafka_connection.client_timeout_seconds
integer
The request timeout used when interacting with the Kafka APIs.
Default: 60
kafka_connection.consumer_config
object
Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .
kafka_connection.schema_registry_config
object
Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient
kafka_connection.schema_registry_url
string
Default: http://localhost:8080/schema-registry/api/
urn_pattern
AllowDenyPattern
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
urn_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
urn_pattern.allow
array
List of regex patterns to include in ingestion
Default: ['.*']
urn_pattern.allow.string
string
urn_pattern.deny
array
List of regex patterns to exclude from ingestion.
Default: []
urn_pattern.deny.string
string
stateful_ingestion
StatefulIngestionConfig
Stateful Ingestion Config
Default: {'enabled': True, 'max_checkpoint_state_size': 167...
stateful_ingestion.enabled
boolean
Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False
Default: False

Code Coordinates

  • Class Name: datahub.ingestion.source.datahub.datahub_source.DataHubSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for DataHub, feel free to ping us on our Slack.