Kafka Connect

Module kafka-connect

Certified

Important Capabilities

| Capability | Status | Notes |
| --- | --- | --- |
| Platform Instance | Enabled by default | |

This plugin extracts the following:

  • Each Kafka Connect connector as an individual DataFlowSnapshotClass entity
  • An individual DataJobSnapshotClass entity per job, named {connector_name}:{source_dataset}
  • Lineage information between the source database and the Kafka topic

Current limitations:

  • Works only with:
    • JDBC and Debezium source connectors
    • the BigQuery sink connector

Install the Plugin

pip install 'acryl-datahub[kafka-connect]'

Quickstart Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "kafka-connect"
  config:
    # Coordinates
    connect_uri: "http://localhost:8083"
    cluster_name: "connect-cluster"
    provided_configs:
      - provider: env
        path_key: MYSQL_CONNECTION_URL
        value: jdbc:mysql://test_mysql:3306/librarydb
    # Optional mapping of platform types to instance ids
    platform_instance_map: # optional
      mysql: test_mysql # optional
    connect_to_platform_map: # optional
      postgres-connector-finance-db: # optional - Connector name
        postgres: core_finance_instance # optional - Platform to instance map
    # Credentials
    username: admin
    password: password

sink:
  # sink configs

Config Details

Note that a . is used to denote nested fields in the YAML recipe.
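For example, a dotted field such as connector_patterns.deny in the table below corresponds to a nested YAML structure like the following sketch (the denied connector name is purely illustrative):

```yaml
source:
  type: "kafka-connect"
  config:
    connect_uri: "http://localhost:8083"
    connector_patterns:
      deny:
        - "internal-heartbeat-connector"  # illustrative connector name
```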

| Field | Required | Type | Description | Default |
| --- | --- | --- | --- | --- |
| env | | string | The environment that all assets produced by this connector belong to | PROD |
| platform_instance_map | | Dict[str,string] | Platform instance mapping to use when constructing URNs. e.g. `platform_instance_map: { "hive": "warehouse" }` | |
| connect_uri | | string | URI to connect to. | http://localhost:8083/ |
| username | | string | Kafka Connect username. | None |
| password | | string | Kafka Connect password. | None |
| cluster_name | | string | Cluster to ingest from. | connect-cluster |
| construct_lineage_workunits | | boolean | Whether to create the input and output Dataset entities | True |
| provided_configs | | Array of object | Provided Configurations | None |
| connect_to_platform_map | | Dict | Platform instance mapping to use when multiple instances of a platform are available. An entry for a platform should appear in either platform_instance_map or connect_to_platform_map, not both. e.g. `connect_to_platform_map: { "postgres-connector-finance-db": { "postgres": "core_finance_instance" } }` | |
| connector_patterns | | AllowDenyPattern (see below for fields) | Regex patterns for connectors to filter for ingestion. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True, 'alphabet': '[A-Za-z0-9 _.-]'} |
| connector_patterns.allow | | Array of string | List of regex patterns for connectors to include in ingestion. | ['.*'] |
| connector_patterns.deny | | Array of string | List of regex patterns for connectors to exclude from ingestion. | [] |
| connector_patterns.ignoreCase | | boolean | Whether to ignore case sensitivity during pattern matching. | True |
| connector_patterns.alphabet | | string | Allowed alphabets pattern. | [A-Za-z0-9 _.-] |
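Putting the connector_patterns fields together, a recipe that ingests only connectors matching a naming prefix might look like this sketch (the allow prefix is illustrative; deny and ignoreCase are shown at their defaults):

```yaml
source:
  type: "kafka-connect"
  config:
    connect_uri: "http://localhost:8083"
    connector_patterns:
      allow:
        - "mysql-source-.*"  # illustrative prefix pattern
      deny: []
      ignoreCase: true
```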

Code Coordinates

  • Class Name: datahub.ingestion.source.kafka_connect.KafkaConnectSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Kafka Connect, feel free to ping us on our Slack.