Kafka
Prerequisites
To better understand how Nussknacker works with Kafka, it's recommended to read the following first:
If you want to use the Flink engine, this is also recommended:
Concepts
Sources and sinks
Kafka topics are native streaming data input to Nussknacker and the native output where results of Nussknacker scenarios processing are placed. In Nussknacker terminology input topics are handled by source components, output topics are handled by sink components. This section provides important details of Nussknacker's integration with Kafka and Schema Registry.
Schemas
Schema defines the format of data. Nussknacker expects that messages in topics are described by schemas. Nussknacker uses the information contained in schemas for code completion and validation of messages. The schema of a message can be described in the Avro Schema format or the JSON Schema format (the latter with Confluent Schema Registry only).
Schemas are managed by Schema Registry - Confluent Schema Registry and Azure Schema Registry are supported.
To preview schemas or add a new version, you can use tools available on your cloud platform or tools like AKHQ.
Association between schema and topic
To properly present information about topics and versions, and to recognize which schema is assigned to which version, Nussknacker follows these conventions:
- For the Confluent-based implementation it uses TopicNameStrategy for subject names. It means that it looks for schemas available at the <topic-name>-(key or value) subject. For example, for the topic transactions, it looks for schemas at the transactions-key subject for the key and at the transactions-value subject for the value.
- In the Azure Schema Registry, the subject concept doesn't exist - schemas are grouped by schema name. Because of that, Nussknacker introduces its own convention for the association between schema and topic: the schema name should be in the format CamelCasedTopicNameKey for keys and CamelCasedTopicNameValue for values. For example, for the input-events topic, the schema should be named InputEventsKey for the key or InputEventsValue for the value. Be aware that this may require changing the schema name not only in the Azure portal but also inside the schema content - those names should be the same to make serialization work correctly.
Connection and Authentication Configuration
Under the hood Nussknacker uses kafkaProperties to configure the standard Kafka client. It means that all standard Kafka client properties will be respected.
For detailed instructions on where they should be placed inside Nussknacker's configuration, take a look at the Configuration details section.
Kafka - Connection
To configure the connection to Kafka, you need to set at least the bootstrap.servers property. It should contain a comma-separated list of URLs to Kafka brokers.
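A minimal connection configuration might look like the sketch below; the broker host names and ports are placeholders for your own cluster:
kafkaProperties {
  # comma-separated list of broker URLs (placeholders)
  "bootstrap.servers": "broker1:9092,broker2:9092"
}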
Kafka - Authentication
A Kafka cluster can be configured with multiple authentication options. Take a look at the Kafka security documentation to see detailed examples of how those options translate into properties. For example, for a typical SASL_SSL configuration with credentials in the JAAS format, you should provide a configuration similar to this one:
kafkaProperties {
"schema.registry.url": "http://schemaregistry:8081"
"bootstrap.servers": "broker1:9092,broker2:9092"
"security.protocol": "SASL_SSL"
"sasl.mechanism": "PLAIN"
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"some_user\" password=\"some_user\";"
}
If you use Azure Event Hubs (which uses this mode), the username will be $ConnectionString and the password will be the connection string starting with Endpoint=sb://.
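For illustration, an Event Hubs setup could look roughly like the sketch below; the namespace name is a placeholder, port 9093 is the usual Kafka endpoint port for Event Hubs, and the full connection string is left elided:
kafkaProperties {
  # Event Hubs Kafka endpoint (placeholder namespace)
  "bootstrap.servers": "<event-hubs-namespace>.servicebus.windows.net:9093"
  "security.protocol": "SASL_SSL"
  "sasl.mechanism": "PLAIN"
  # username is the literal $ConnectionString, password is the full connection string
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://...\";"
}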
If you use your own CA and client and server certificate authentication, you should additionally provide: ssl.keystore.location, ssl.keystore.password, ssl.key.password, ssl.truststore.location, ssl.truststore.password.
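As a sketch, those properties sit next to the other connection entries in kafkaProperties; all paths and passwords below are placeholders for your own key material:
kafkaProperties {
  # client certificate and key (placeholders)
  "ssl.keystore.location": "/path/to/kafka.keystore.jks"
  "ssl.keystore.password": "<keystore-password>"
  "ssl.key.password": "<key-password>"
  # truststore with your own CA (placeholders)
  "ssl.truststore.location": "/path/to/kafka.truststore.jks"
  "ssl.truststore.password": "<truststore-password>"
}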
To make sure your configuration is correct, you can test it with standard Kafka CLI commands like kafka-console-consumer, kafka-console-producer or kcat.
Some tutorials on how to do that:
Once you have a properly working set of properties, you just need to copy them to Nussknacker's configuration.
Schema Registry - Connection
Currently, Nussknacker supports two implementations of Schema Registries: based on Confluent Schema Registry and based on Azure Schema Registry.
To configure the connection to the Schema Registry, you need to set at least the schema.registry.url property. It should contain a comma-separated list of Schema Registry URLs. For a single-node installation, it will be just one URL. Be aware that, contrary to Kafka brokers, Schema Registry URLs should start with https:// or http://.
Nussknacker determines which registry implementation (Confluent or Azure) is used from the schema.registry.url property. If the URL ends with .servicebus.windows.net, Nussknacker assumes that the Azure Schema Registry is used; otherwise the Confluent Schema Registry is assumed.
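For example, the two URLs below (host names are placeholders) would be recognized as Confluent and Azure respectively:
# Confluent Schema Registry is assumed
"schema.registry.url": "http://schemaregistry:8081"
# Azure Schema Registry is assumed (URL ends with .servicebus.windows.net)
"schema.registry.url": "https://<event-hubs-namespace>.servicebus.windows.net"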
Confluent-based Schema Registry - Connection and Authentication
For the Confluent-based implementation you should provide at least schema.registry.url. If your schema registry is secured by a user name and password, you should additionally provide the "basic.auth.credentials.source": USER_INFO and "basic.auth.user.info": "some_user:some_password" entries.
To read more, see the Schema Registry documentation.
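Putting it together, a Confluent registry secured with basic auth might be configured like this sketch; the URL and the credentials are placeholders:
kafkaProperties {
  # Schema Registry endpoint (placeholder)
  "schema.registry.url": "http://schemaregistry:8081"
  # basic auth credentials (placeholders)
  "basic.auth.credentials.source": "USER_INFO"
  "basic.auth.user.info": "some_user:some_password"
}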
To make sure your configuration is correct, you can test it with the kafka-avro-console-consumer and kafka-avro-console-producer tools available in the Confluent Schema Registry distribution. Once you have a properly working set of properties, you just need to copy them to Nussknacker's configuration.
Azure-based Schema Registry - Connection and Authentication
For the Azure-based implementation, you should first provide the schema.registry.url and schema.group properties. The first one should be the https://<event-hubs-namespace>.servicebus.windows.net URL, the second one should be the name of the schema group where all schemas used by Nussknacker will be located.
Regarding authentication, a couple of options can be used - you can provide credentials via the azure.tenant.id, azure.client.id and azure.client.secret properties, or you can use one of the other methods handled by Azure's DefaultAzureCredential, for example the Azure CLI or Azure PowerShell.
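A minimal Azure-based configuration using a service principal could look like the sketch below; the namespace, group name and credential values are placeholders:
kafkaProperties {
  # Event Hubs namespace URL and schema group (placeholders)
  "schema.registry.url": "https://<event-hubs-namespace>.servicebus.windows.net"
  "schema.group": "<schema-group-name>"
  # service principal credentials (placeholders); omit these to fall back to DefaultAzureCredential
  "azure.tenant.id": "<tenant-id>"
  "azure.client.id": "<client-id>"
  "azure.client.secret": "<client-secret>"
}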
Messaging
You can use standard Kafka CLI commands like kafka-console-consumer, kafka-console-producer and kcat, Confluent's kafka-avro-console-consumer and kafka-avro-console-producer commands for Confluent-based Avro-encoded messages, or graphical tools like AKHQ, to interact with the Kafka source and sink topics used in Nu scenarios.
Be aware that Azure-based Avro-encoded messages have a slightly different format than Confluent ones - the schema ID is passed in message headers instead of the payload. This format may be less well supported by some of the available tools. See the Schema Registry comparison section for details.