
Pulsar

Integration Details

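The DataHub Pulsar source plugin extracts topic and schema metadata from an Apache Pulsar instance and ingests the information into DataHub. The plugin uses the Pulsar admin REST API to interact with the Pulsar instance. The APIs are used to:

  • Get the list of existing tenants
  • Get the list of namespaces associated with each tenant
  • Get the list of topics associated with each namespace
  • Get the schema associated with each topic, if available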

Data is extracted per tenant and namespace. Topics, together with their schema (if available), are ingested into DataHub as Datasets. Additional values such as the schema description, schema_version, schema_type, and partitioned are included as DatasetProperties.
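The tenant, namespace, and topic walk described above can be sketched against the Pulsar admin REST API. This is a minimal illustration, not the plugin's implementation. It makes three assumptions: the /admin/v2 endpoint paths follow the public Pulsar admin REST API, only persistent topics are listed, and the helper names (admin_url, topic_schema_url, get_json, crawl) are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict, List, Optional


def admin_url(base: str, *parts: str) -> str:
    """Join the web service URL with an /admin/v2 path (assumed REST v2 layout)."""
    return "/".join([base.rstrip("/"), "admin", "v2", *parts])


def topic_schema_url(base: str, topic: str) -> str:
    """Schema URL for a fully qualified topic, e.g. 'persistent://tenant/ns/topic'."""
    _, _, path = topic.partition("://")
    tenant, namespace, local_name = path.split("/", 2)
    return admin_url(base, "schemas", tenant, namespace, local_name, "schema")


def get_json(url: str, token: Optional[str] = None, timeout: int = 5) -> Any:
    """GET a URL and decode its JSON body; this performs a network call."""
    request = urllib.request.Request(url)
    if token:
        request.add_header("Authorization", f"Bearer {token}")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


def crawl(base: str, tenants: List[str], token: Optional[str] = None) -> Dict[str, List[str]]:
    """Map each namespace ('tenant/ns') of the given tenants to its persistent topics."""
    if not tenants:
        # Listing every tenant needs the superUser role.
        tenants = get_json(admin_url(base, "tenants"), token)
    result: Dict[str, List[str]] = {}
    for tenant in tenants:
        # Namespaces come back fully qualified, e.g. 'public/default'.
        for namespace in get_json(admin_url(base, "namespaces", tenant), token):
            result[namespace] = get_json(admin_url(base, "persistent", namespace), token)
    return result
```

To fetch a topic's schema, call get_json(topic_schema_url(base, topic), token) for each topic returned by crawl(). A topic without a schema may answer with an error status, so a real crawler would need to handle that case.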

Concept Mapping

This ingestion source maps the following Source System Concepts to DataHub Concepts:

| Source Concept | DataHub Concept | Notes |
|---|---|---|
| pulsar | Data Platform | |
| Pulsar Topic | Dataset | subType: topic |
| Pulsar Schema | SchemaField | Maps to the fields defined within the Avro or JSON schema definition. |
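The sketch below makes the concept mapping concrete by covering both sides of it. It builds a dataset URN for a topic and lists the top-level fields of an Avro schema, which correspond to a Dataset and its SchemaFields. It rests on four assumptions:

- URNs use the standard DataHub layout urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>).
- The topic name is used verbatim as the dataset name.
- A platform instance is prefixed as <instance>.<name>.
- The helper names are hypothetical.

Real code should prefer DataHub's own URN helpers in datahub.emitter.mce_builder. Nested Avro records are not flattened here.

```python
import json
from typing import List, Optional, Tuple


def pulsar_dataset_urn(topic: str, env: str = "PROD",
                       platform_instance: Optional[str] = None) -> str:
    """Dataset URN for a Pulsar topic; the name format is an assumption."""
    name = f"{platform_instance}.{topic}" if platform_instance else topic
    return f"urn:li:dataset:(urn:li:dataPlatform:pulsar,{name},{env})"


def avro_fields(schema_json: str) -> List[Tuple[str, str]]:
    """(name, type) pairs for the top-level fields of an Avro record schema."""
    fields = []
    for field in json.loads(schema_json).get("fields", []):
        ftype = field["type"]
        if isinstance(ftype, list):
            # A union such as ["null", "string"] is shown as "null|string".
            type_name = "|".join(
                t if isinstance(t, str) else t.get("type", "complex") for t in ftype
            )
        elif isinstance(ftype, dict):
            # Complex types (record, array, map, ...) keep only their kind.
            type_name = ftype.get("type", "complex")
        else:
            type_name = ftype
        fields.append((field["name"], type_name))
    return fields
```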

Metadata Ingestion Quickstart

For context on getting started with ingestion, check out our metadata ingestion guide.

Module pulsar

Incubating

Important Capabilities

| Capability | Status | Notes |
|---|---|---|
| Domains | Supported | Supported via the domain config field |
| Platform Instance | Supported | Enabled by default |

PulsarSource(config: datahub.ingestion.source_config.pulsar.PulsarSourceConfig, ctx: datahub.ingestion.api.common.PipelineContext)

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[pulsar]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "pulsar"
  config:
    env: "TEST"
    platform_instance: "local"
    ## Pulsar client connection config ##
    web_service_url: "https://localhost:8443"
    verify_ssl: "/opt/certs/ca.cert.pem"
    # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
    issuer_url: <issuer_url>
    client_id: ${CLIENT_ID}
    client_secret: ${CLIENT_SECRET}
    # Tenant list to scrape
    tenants:
      - tenant_1
      - tenant_2
    # Topic filter pattern
    topic_patterns:
      allow:
        - ".*sales.*"

sink:
  # sink configs
Config Details

Note that a dot (.) denotes nested fields in the YAML recipe.

| Field | Required | Type | Description | Default |
|---|---|---|---|---|
| env | | string | The environment that all assets produced by this connector belong to. | PROD |
| platform | | string | The platform that this source connects to. | None |
| platform_instance | | string | The instance of the platform that all assets produced by this recipe belong to. | None |
| web_service_url | | string | The web URL for the cluster. | http://localhost:8080 |
| timeout | | integer | Timeout setting: how long to wait for the Pulsar REST API to send data before giving up. | 5 |
| issuer_url | | string | The complete URL for a Custom Authorization Server. Mandatory for OAuth-based authentication. | None |
| client_id | | string | The application's client ID. | None |
| client_secret | | string | The application's client secret. | None |
| token | | string | The access token for the application. Mandatory for token-based authentication. | None |
| verify_ssl | | boolean or string | Either a boolean, which controls whether the server's TLS certificate is verified, or a string, which must be a path to a CA bundle to use. | True |
| exclude_individual_partitions | | boolean | Whether to exclude the individual partitions of a partitioned topic. When turned off, a topic with 100 partitions results in 100 Datasets. | True |
| tenants | | Array of string | Listing all tenants requires the superUser role. Alternatively, set a list of tenants to scrape using the tenant admin role. | [] |
| oid_config | | Dict | Placeholder for the OpenID discovery document. | |
| stateful_ingestion | | PulsarSourceStatefulIngestionConfig (see below for fields) | See Stateful Ingestion. | |
| stateful_ingestion.enabled | | boolean | Whether to enable stateful ingestion. | False |
| stateful_ingestion.max_checkpoint_state_size | | integer | The maximum size of the checkpoint state in bytes. | 16777216 (16 MB) |
| stateful_ingestion.state_provider | | DynamicTypedStateProviderConfig (see below for fields) | The ingestion state provider configuration. | |
| stateful_ingestion.state_provider.type | Required if stateful_ingestion.state_provider is set | string | The type of the state provider to use. For DataHub, use datahub. | None |
| stateful_ingestion.state_provider.config | | Generic dict | The configuration required for initializing the state provider. Default: the datahub_api config if set at pipeline level; otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19). | None |
| stateful_ingestion.ignore_old_state | | boolean | If set to True, ignores the previous checkpoint state. | False |
| stateful_ingestion.ignore_new_state | | boolean | If set to True, ignores the current checkpoint state. | False |
| stateful_ingestion.remove_stale_metadata | | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True |
| stateful_ingestion.fail_safe_threshold | | number | Prevents a large number of soft deletes, and prevents the state from being committed, when the relative change in entities (in percent) compared to the previous state exceeds fail_safe_threshold. This guards against accidental changes to the source configuration. | 95.0 |
| tenant_patterns | | AllowDenyPattern (see below for fields) | List of regex patterns for tenants to include/exclude from ingestion. By default, all tenants except pulsar are allowed. | {'allow': ['.*'], 'deny': ['pulsar'], 'ignoreCase': True} |
| tenant_patterns.allow | | Array of string | List of regex patterns to include in ingestion. | ['.*'] |
| tenant_patterns.deny | | Array of string | List of regex patterns to exclude from ingestion. | [] |
| tenant_patterns.ignoreCase | | boolean | Whether to ignore case sensitivity during pattern matching. | True |
| namespace_patterns | | AllowDenyPattern (see below for fields) | List of regex patterns for namespaces to include/exclude from ingestion. By default, the functions namespace is denied. | {'allow': ['.*'], 'deny': ['public/functions'], 'ignoreCase': True} |
| namespace_patterns.allow | | Array of string | List of regex patterns to include in ingestion. | ['.*'] |
| namespace_patterns.deny | | Array of string | List of regex patterns to exclude from ingestion. | [] |
| namespace_patterns.ignoreCase | | boolean | Whether to ignore case sensitivity during pattern matching. | True |
| topic_patterns | | AllowDenyPattern (see below for fields) | List of regex patterns for topics to include/exclude from ingestion. By default, Pulsar system topics are denied. | {'allow': ['.*'], 'deny': ['/__.*$'], 'ignoreCase': True} |
| topic_patterns.allow | | Array of string | List of regex patterns to include in ingestion. | ['.*'] |
| topic_patterns.deny | | Array of string | List of regex patterns to exclude from ingestion. | [] |
| topic_patterns.ignoreCase | | boolean | Whether to ignore case sensitivity during pattern matching. | True |
| domain | | Dict[str, AllowDenyPattern] | Domain patterns. | |
| domain.key.allow | | Array of string | List of regex patterns to include in ingestion. | ['.*'] |
| domain.key.deny | | Array of string | List of regex patterns to exclude from ingestion. | [] |
| domain.key.ignoreCase | | boolean | Whether to ignore case sensitivity during pattern matching. | True |
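The tenant_patterns, namespace_patterns, and topic_patterns options filter what gets ingested. The sketch below approximates that filtering with a hypothetical allowed() helper, in which a deny match always wins over an allow match. It uses re.search, so a pattern may match anywhere in the string. DataHub's AllowDenyPattern may anchor patterns differently, so this is an illustration of allow/deny semantics rather than a copy of that class.

```python
import re
from typing import List


def allowed(value: str, allow: List[str], deny: List[str], ignore_case: bool = True) -> bool:
    """Return True if value matches some allow pattern and no deny pattern.

    Simplified stand-in for allow/deny filtering: deny is checked first.
    re.search lets an unanchored pattern match anywhere in the value.
    """
    flags = re.IGNORECASE if ignore_case else 0
    if any(re.search(pattern, value, flags) for pattern in deny):
        return False
    return any(re.search(pattern, value, flags) for pattern in allow)
```

With the topic default deny pattern /__.*$, a system topic such as persistent://public/default/__change_events is rejected while ordinary topics pass. Because matching is unanchored here, a deny pattern such as pulsar would also reject any tenant whose name merely contains "pulsar".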

NOTE: Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).
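The note above recommends variable substitution for secrets such as ${CLIENT_ID}. The sketch below illustrates the idea; it is not DataHub's own recipe loader, and the function name expand_recipe_vars is hypothetical. It expands ${VAR} references with os.path.expandvars. As a design choice, it refuses to continue if any ${VAR} reference is left unresolved, so a missing secret fails fast instead of being sent as a literal string.

```python
import os
import re


def expand_recipe_vars(recipe_text: str) -> str:
    """Expand $VAR / ${VAR} from the environment; fail on unresolved ${VAR}."""
    expanded = os.path.expandvars(recipe_text)
    # os.path.expandvars leaves unknown variables untouched, so look for leftovers.
    missing = re.findall(r"\$\{(\w+)\}", expanded)
    if missing:
        raise KeyError(f"unset environment variables: {', '.join(missing)}")
    return expanded
```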

Prerequisites

In order to ingest metadata from Apache Pulsar, you will need:

  • Access to a Pulsar instance and, if authentication is enabled, a valid access token.
  • Pulsar version >= 2.7.0

NOTE: A superUser role is required for listing all existing tenants within a Pulsar instance.
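When authentication is enabled, the configuration offers two routes: a static token, or OAuth via issuer_url, client_id, and client_secret. The sketch below shows the standard building blocks, assuming the plugin follows the usual OpenID Connect pattern. The oid_config field suggests this, but it is not confirmed here. Under that assumption, the flow has three steps:

1. The discovery document is read from <issuer_url>/.well-known/openid-configuration.
2. Its token endpoint receives an OAuth 2.0 client-credentials request.
3. The resulting token is sent as a Bearer header.

The function names are hypothetical, and no network calls are made.

```python
from typing import Dict, Optional
from urllib.parse import urlencode


def discovery_url(issuer_url: str) -> str:
    """Location of the OpenID Connect discovery document for an issuer."""
    return issuer_url.rstrip("/") + "/.well-known/openid-configuration"


def client_credentials_body(client_id: str, client_secret: str) -> str:
    """Form-encoded body for an OAuth 2.0 client-credentials token request."""
    return urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    })


def auth_headers(token: Optional[str]) -> Dict[str, str]:
    """Bearer header for admin REST calls, or no header when auth is disabled."""
    return {"Authorization": f"Bearer {token}"} if token else {}
```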

Code Coordinates

  • Class Name: datahub.ingestion.source.pulsar.PulsarSource

Questions

If you've got any questions about configuring ingestion for Pulsar, feel free to ping us on our Slack.