
Kafka Connect

Module kafka-connect


Important Capabilities

  • Platform Instance: Enabled by default

This plugin extracts the following:

  • Each Kafka Connect connector as an individual DataFlowSnapshotClass entity
  • Individual DataJobSnapshotClass entities, named using the {connector_name}:{source_dataset} scheme
  • Lineage information between the source database and the Kafka topic
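The {connector_name}:{source_dataset} job naming can be sketched as follows. This is a simplified illustration of how a dataJob URN embeds its parent dataFlow URN plus a job id; the helper function itself is hypothetical, not DataHub's actual implementation:

```python
def data_job_urn(connector_name: str, source_dataset: str, env: str = "PROD") -> str:
    """Compose a dataJob URN for one connector/source-dataset pair.

    DataHub dataJob URNs embed the parent dataFlow URN plus a job id;
    here the job id follows the {connector_name}:{source_dataset} scheme.
    (Illustrative sketch only -- not DataHub's actual code.)
    """
    flow = f"urn:li:dataFlow:(kafka-connect,{connector_name},{env})"
    return f"urn:li:dataJob:({flow},{connector_name}:{source_dataset})"

print(data_job_urn("mysql-source", "librarydb.books"))
# urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql-source,PROD),mysql-source:librarydb.books)
```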

Current limitations:

  • Works only for:
    • JDBC and Debezium source connectors
    • the BigQuery sink connector
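Connector metadata is read from the Kafka Connect REST API at the configured connect_uri. A minimal sketch of the request that would enumerate connectors (the /connectors path is the standard Connect REST endpoint; the helper function is illustrative, not DataHub's actual code):

```python
from urllib.request import Request

def list_connectors_request(connect_uri: str) -> Request:
    """Build a GET request for Kafka Connect's connector-listing endpoint.

    GET /connectors returns the names of all deployed connectors; the
    plugin then inspects each connector's config for lineage extraction.
    (Illustrative helper -- not DataHub's actual implementation.)
    """
    return Request(connect_uri.rstrip("/") + "/connectors")

req = list_connectors_request("http://localhost:8083")
print(req.full_url)  # http://localhost:8083/connectors
```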

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[kafka-connect]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "kafka-connect"
  config:
    # Coordinates
    connect_uri: "http://localhost:8083"
    cluster_name: "connect-cluster"
    provided_configs:
      - provider: env
        path_key: MYSQL_CONNECTION_URL
        value: jdbc:mysql://test_mysql:3306/librarydb
    # Optional mapping of platform types to instance ids
    platform_instance_map: # optional
      mysql: test_mysql # optional
    connect_to_platform_map: # optional
      postgres-connector-finance-db: # optional - Connector name
        postgres: core_finance_instance # optional - Platform to instance map
    # Credentials
    username: admin
    password: password

# sink configs
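The # sink configs placeholder expects a DataHub sink; a common minimal choice is the datahub-rest sink pointed at your DataHub GMS (the server URL below assumes a local quickstart deployment):

```yaml
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
```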

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

  • env (string): The environment that all assets produced by this connector belong to. Default: PROD
  • platform_instance_map (Dict[str, string]): Platform instance mapping to use when constructing URNs, e.g. platform_instance_map: { "hive": "warehouse" }. Default: None
  • connect_uri (string): URI to connect to. Default: http://localhost:8083/
  • username (string): Kafka Connect username. Default: None
  • password (string): Kafka Connect password. Default: None
  • cluster_name (string): Cluster to ingest from. Default: connect-cluster
  • construct_lineage_workunits (boolean): Whether to create the input and output Dataset entities. Default: True
  • provided_configs (Array of object): Provided configurations. Default: None
  • connect_to_platform_map (Dict): Platform instance mapping to use when multiple instances of a platform are available. An entry for a platform should appear in either platform_instance_map or connect_to_platform_map, not both. e.g. connect_to_platform_map: { "postgres-connector-finance-db": { "postgres": "core_finance_instance" } }. Default: None
  • connector_patterns (AllowDenyPattern, see below for fields): Regex patterns for connectors to filter for ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
  • connector_patterns.allow (Array of string): List of regex patterns to include in ingestion. Default: ['.*']
  • connector_patterns.deny (Array of string): List of regex patterns to exclude from ingestion. Default: []
  • connector_patterns.ignoreCase (boolean): Whether to ignore case during pattern matching. Default: True
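For example, connector_patterns can restrict ingestion to a subset of connectors by name (the connector names and patterns below are illustrative):

```yaml
source:
  type: "kafka-connect"
  config:
    connect_uri: "http://localhost:8083"
    connector_patterns:
      allow:
        - "mysql-.*"      # ingest only connectors whose names start with mysql-
      deny:
        - ".*-staging"    # but skip any staging connectors
      ignoreCase: true
```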

Code Coordinates

  • Class Name: datahub.ingestion.source.kafka_connect.KafkaConnectSource
  • Browse on GitHub


If you have any questions about configuring ingestion for Kafka Connect, feel free to ping us on our Slack.