Kafka Connect

Module kafka-connect


Important Capabilities

Platform Instance: Enabled by default

This plugin extracts the following:

  • Each Kafka Connect connector as an individual DataFlowSnapshotClass entity
  • Individual DataJobSnapshotClass entities, named using the {connector_name}:{source_dataset} convention
  • Lineage information from the source database to the Kafka topic
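
As a rough illustration of the {connector_name}:{source_dataset} naming above, the sketch below joins a connector name and a source dataset name into a job name. The function name and the sample connector and table names are hypothetical and not part of the plugin.

```python
def make_job_name(connector_name: str, source_dataset: str) -> str:
    """Join the two names as {connector_name}:{source_dataset}."""
    return f"{connector_name}:{source_dataset}"


# Hypothetical connector and table names, for illustration only.
print(make_job_name("mysql_source", "librarydb.book"))
```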

Current limitations:

  • Works only for:
    • JDBC and Debezium source connectors
    • BigQuery sink connector

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[kafka-connect]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "kafka-connect"
  config:
    # Coordinates
    connect_uri: "http://localhost:8083"
    cluster_name: "connect-cluster"
    provided_configs:
      - provider: env
        value: jdbc:mysql://test_mysql:3306/librarydb
    # Optional mapping of platform types to instance ids
    platform_instance_map: # optional
      mysql: test_mysql # optional
    connect_to_platform_map: # optional
      postgres-connector-finance-db: # optional - Connector name
        postgres: core_finance_instance # optional - Platform to instance map
    # Credentials
    username: admin
    password: password

sink:
  # sink configs
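
The two optional maps in the recipe can both name an instance for the same platform, so it may help to see one plausible way of resolving them. The sketch below is not the plugin's code: the function name is hypothetical, and it assumes that a connector-specific entry in connect_to_platform_map takes precedence over the platform-wide entry in platform_instance_map.

```python
from typing import Dict, Optional


def resolve_platform_instance(
    connector_name: str,
    platform: str,
    platform_instance_map: Optional[Dict[str, str]] = None,
    connect_to_platform_map: Optional[Dict[str, Dict[str, str]]] = None,
) -> Optional[str]:
    """Return the instance id for a platform, or None if it is unmapped.

    Assumption: the per-connector map is checked before the platform-wide map.
    """
    per_connector = (connect_to_platform_map or {}).get(connector_name, {})
    if platform in per_connector:
        return per_connector[platform]
    return (platform_instance_map or {}).get(platform)


# Values taken from the starter recipe.
platform_instance_map = {"mysql": "test_mysql"}
connect_to_platform_map = {
    "postgres-connector-finance-db": {"postgres": "core_finance_instance"}
}

print(
    resolve_platform_instance(
        "postgres-connector-finance-db",
        "postgres",
        platform_instance_map,
        connect_to_platform_map,
    )
)
```

A connector with no entry of its own falls back to the platform-wide map, and a platform that appears in neither map resolves to None.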

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

View All Configuration Options
Field | Type | Description | Default
env | string | The environment that all assets produced by this connector belong to. | PROD
platform_instance_map | Dict[str,string] | Platform instance mapping to use when constructing URNs, e.g. platform_instance_map: { "hive": "warehouse" } |
connect_uri | string | URI to connect to. | http://localhost:8083/
username | string | Kafka Connect username. | None
password | string | Kafka Connect password. | None
cluster_name | string | Cluster to ingest from. | connect-cluster
construct_lineage_workunits | boolean | Whether to create the input and output Dataset entities. | True
provided_configs | Array of object | Provided configurations. | None
connect_to_platform_map | Dict | Platform instance mapping to use when multiple instances of a platform are available. The entry for a platform should be in either platform_instance_map or connect_to_platform_map, e.g. connect_to_platform_map: { "postgres-connector-finance-db": { "postgres": "core_finance_instance" } } |
connector_patterns | AllowDenyPattern (see below for fields) | Regex patterns used to filter connectors for ingestion. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
connector_patterns.allow | Array of string | List of regex patterns to include in ingestion. | ['.*']
connector_patterns.deny | Array of string | List of regex patterns to exclude from ingestion. | []
connector_patterns.ignoreCase | boolean | Whether to ignore case sensitivity during pattern matching. | True
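
To make the connector_patterns fields more concrete, here is a minimal sketch of allow/deny filtering over connector names. It is an illustration rather than the plugin's implementation: the function name and connector names are hypothetical, and it assumes that deny patterns win over allow patterns and that patterns are matched from the start of the name, as with Python's re.match.

```python
import re
from typing import List


def is_allowed(
    name: str, allow: List[str], deny: List[str], ignore_case: bool = True
) -> bool:
    """Return True if name matches an allow pattern and no deny pattern."""
    flags = re.IGNORECASE if ignore_case else 0
    # Assumption: a single deny match excludes the connector.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)


# Hypothetical connector names, filtered with a deny pattern for test connectors.
connectors = ["mysql_source", "Debezium-test", "bigquery-sink"]
selected = [c for c in connectors if is_allowed(c, [".*"], ["debezium-.*"])]
print(selected)
```

With ignoreCase left at its default of True, the lowercase deny pattern also excludes "Debezium-test"; passing ignore_case=False would let it through.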

Code Coordinates

  • Class Name: datahub.ingestion.source.kafka_connect.KafkaConnectSource


If you've got any questions about configuring ingestion for Kafka Connect, feel free to ping us on our Slack.