Configuring Kafka Bridging

Kafka bridging enables Solace PubSub+ software event brokers to convert Solace messages that are placed in one or more queues into Kafka events and publish them to a remote Kafka cluster, and to consume Kafka events and publish them to a topic on the Solace event broker. For more information, see Kafka Bridging.

To configure Kafka bridging, you need to create the following:

  1. A Kafka receiver—receives events from one or more Kafka topics, converts the events to Solace messages, and publishes them in Solace Message Format (SMF) to topics on the PubSub+ event broker.

  2. A Kafka sender—takes SMF messages from one or more queues, converts the messages to Kafka events, and publishes them to Kafka topics on the remote Kafka cluster.

In addition, you may want to change the maximum number of simultaneous Kafka broker connections the event broker supports for the current Message VPN. For more information, see Configuring the Maximum Number of Kafka Broker Connections.

The following sections describe how to configure a Kafka receiver and a Kafka sender. For a detailed example that walks through all the steps required to set up a working Kafka bridge scenario, see Kafka Bridging Example.

For instructions to configure Kafka bridging in PubSub+ Broker Manager, see Configuring Kafka Bridging.

Any Kafka bridging configuration made in BETA versions (earlier than 10.6.1) used in standalone deployments will be discarded upon upgrade to version 10.6.1 and later. In addition, you must remove any BETA Kafka bridging configuration used in a redundant (HA) deployment before attempting to upgrade to 10.6.1 or later.

Configuring a Kafka Receiver

On the event broker, the accumulation and conversion of Kafka events to Solace messages is enabled by a Kafka receiver, which is configured at the Message VPN level. You can give the receiver any name. When you create a Kafka receiver for a Message VPN, the event broker automatically creates the following objects:

  • A single client for each Kafka receiver. This client publishes the messages received from the Kafka topics of all its topic bindings to the Solace message bus. The client's name is #kafka/rx/<rx-name>, and it uses the client username #kafka/rx/<rx-name>. This client username uses the #kafka client profile and the #acl-profile ACL profile.

  • A single client-profile, called #kafka, which is needed for all Kafka senders and receivers in the Message VPN. This profile is created when the first Kafka sender or receiver is created, and removed with the last Kafka sender or receiver.

For more information, see Kafka Receiver.

To create a Kafka receiver, enter the following commands:

solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# create kafka-receiver <name> 

To configure an existing Kafka receiver:

solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# kafka-receiver <name> 

To enable a Kafka receiver after it has been created:

solace(configure/message-vpn/kafka/kafka-receiver)# no shutdown

Where:

message-vpn <name> is the name of the Message VPN where you want to create the Kafka receiver.

kafka-receiver <name> is the name of the Kafka receiver.

The configuration tasks you can perform for a Kafka receiver include:

Configuring Authentication Schemes for Kafka Receivers

To configure an authentication scheme that the given Kafka receiver will use to establish a connection to the remote Kafka cluster, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver)# authentication
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# auth-scheme {none | basic | scram | client-certificate | kerberos | oauth-client}

Where:

none specifies to log in with no authentication. For more information, see None.

basic specifies to log in with a username and password. For more information, see Basic Authentication.

scram specifies to log in with SCRAM (Salted Challenge Response Authentication Mechanism). For more information, see SCRAM Authentication.

client-certificate specifies to log in with a client TLS certificate. For more information, see Client Certificate Authentication.

kerberos specifies to log in with the Kerberos mechanism. For more information, see Kerberos Authentication.

oauth-client specifies to log in with OAuth 2.0 client credentials. For more information, see OAuth Client Authentication.

None

If no authentication scheme for a Kafka receiver is configured, the receiver will not use an authentication scheme. This may be useful for anonymous connections or when a Kafka receiver does not require authentication.

Basic Authentication

Using the basic authentication scheme, Kafka receivers can authenticate with a username and password combination. Credentials can be transmitted either in plain text or encrypted with SSL.

To configure settings for a basic authentication scheme, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# basic
solace(configure/message-vpn/kafka/kafka-receiver/authentication/basic)# username <name> [password <password>]

Where:

<name> is the client username to use for authentication with the remote Kafka cluster. The username may contain up to 255 characters.

<password> is the password to be used with the specified username. The password may contain up to 255 characters.

The no version of this command, no username, removes any configured username and password.

SCRAM Authentication

When using SCRAM authentication (RFC 5802), a Kafka receiver uses a challenge/response mechanism to authenticate a username and password with the remote Kafka cluster.

To configure settings for a SCRAM authentication scheme, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# scram
solace(configure/message-vpn/kafka/kafka-receiver/authentication/scram)# hash {sha-256 | sha-512}
solace(configure/message-vpn/kafka/kafka-receiver/authentication/scram)# username <name> [password <password>]

Where:

sha-256 specifies to use a SHA-2 256-bit hash for SCRAM authentication.

sha-512 specifies to use a SHA-2 512-bit hash for SCRAM authentication. This is the default setting.

<name> is the client username to use for authentication with the remote Kafka cluster. The username may contain up to 255 characters.

<password> is the password to be used with the specified username. The password may contain up to 255 characters.

The no form of the hash command, no hash, returns the value to the default.

The no form of the username command, no username, removes any configured username and password.
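
As a rough illustration of what the hash setting selects, the following sketch derives the SCRAM keys defined in RFC 5802 using only the Python standard library. The function name, sample password, salt, and iteration count are made up for illustration. Real clients also normalize the password (SASLprep) and receive the salt and iteration count from the server during the exchange, which this sketch skips. It is not the event broker's implementation.

```python
import hashlib
import hmac

# Maps the CLI hash keywords to Python hashlib algorithm names.
HASHES = {"sha-256": "sha256", "sha-512": "sha512"}


def scram_keys(password, salt, iterations, hash_name="sha-512"):
    """Derive SaltedPassword, ClientKey and StoredKey as defined in RFC 5802."""
    algo = HASHES[hash_name]
    # SaltedPassword := Hi(password, salt, i), which is PBKDF2 with HMAC.
    salted = hashlib.pbkdf2_hmac(algo, password.encode("utf-8"), salt, iterations)
    # ClientKey := HMAC(SaltedPassword, "Client Key")
    client_key = hmac.new(salted, b"Client Key", algo).digest()
    # StoredKey := H(ClientKey); the server stores this, not the password.
    stored_key = hashlib.new(algo, client_key).digest()
    return salted, client_key, stored_key
```

With sha-256 each derived key is 32 bytes long, and with sha-512 each is 64 bytes long. The password itself is never sent to the Kafka cluster in this scheme.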

Client Certificate Authentication

When using client certificate authentication, a Kafka receiver provides a certificate file to validate its identity. Client certificate authentication is only available to connections that use TLS/SSL (see Enabling TLS/SSL Encryption for Kafka Receiver Connections).

The client certificate installed here may also be used if the Kafka cluster requests it with other authentication schemes. If you configure a client certificate authentication scheme for the Kafka receiver, the receiver provides only this client certificate as a means of identifying itself. However, it is possible to provide a client certificate when using a basic, SCRAM, or OAuth authentication scheme as well.

To configure settings for a client certificate authentication scheme, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# client-certificate
solace(configure/message-vpn/kafka/kafka-receiver/authentication/client-certificate)# certificate <filename>

Where:

<filename> is the filename of the certificate file. The certificate file must be located in the certs folder of the event broker. Once a certificate is configured, a copy of it is saved internally. The file in the certs directory is no longer required.

The no version of this command, no certificate, removes the certificate association for the current Kafka receiver.

Kerberos Authentication

When using Kerberos authentication, a Kafka receiver must first authenticate with a Kerberos Authentication Server (AS) which grants the Kafka receiver a Ticket Granting Ticket (TGT). With a valid TGT, a Kafka receiver can attempt to authenticate with the remote Kafka broker using a service ticket obtained from the Ticket Granting Service (TGS). The AS and TGS (components of a Key Distribution Center (KDC)) are hosted on an external server or servers—not on the event broker.

To implement Kerberos authentication for a Kafka receiver, you must configure the following:

  1. The service name of the remote Kafka broker.

  2. The user principal name of the Kafka receiver.

  3. The keytab file for the Kafka receiver.

The Kerberos realm (including KDC address) that this Kafka receiver and the remote Kafka broker are part of must be configured before you can enable Kerberos authentication. For more information, see Managing Kerberos Realms.

To configure these settings, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# kerberos
solace(configure/message-vpn/kafka/kafka-receiver/authentication/kerberos)# service-name <value>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/kerberos)# user-principal-name <value> <keytab-file>

Where:

service-name <value> is the Kerberos service name of the remote Kafka broker, not including /hostname@<REALM>.

user-principal-name <value> is the Kerberos user principal name of the Kafka receiver. This must include the @<REALM> suffix.

<keytab-file> is the filename of the Kerberos keytab file. The keytab file must be located in the keytabs folder of the event broker. These keytabs differ from those used in client authentication. Keytabs used for Kafka bridging authentication contain user principal names rather than service principal names, and apply to specific Message VPNs rather than globally.
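
The two name formats above are easy to confuse, so here is a minimal sketch of how they relate, assuming the standard Kerberos principal layout of service/hostname@REALM. The function names and sample values are hypothetical and are not part of the event broker.

```python
def service_principal(service_name, hostname, realm):
    """Build the full Kerberos service principal from its parts."""
    # service-name is configured WITHOUT the /hostname@<REALM> part.
    if "/" in service_name or "@" in service_name:
        raise ValueError("service-name must not include /hostname@<REALM>")
    return f"{service_name}/{hostname}@{realm}"


def split_user_principal(user_principal_name):
    """Split a user principal name into (user, realm); the realm is required."""
    user, sep, realm = user_principal_name.rpartition("@")
    if not sep or not user or not realm:
        raise ValueError("user-principal-name must include the @<REALM> suffix")
    return user, realm
```

For example, a service name of kafka on host broker1.example.com in realm EXAMPLE.COM yields kafka/broker1.example.com@EXAMPLE.COM, while a user principal name such as bridge@EXAMPLE.COM is configured in full.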

OAuth Client Authentication

When using OAuth client authentication, a Kafka receiver obtains access tokens from an authorization server using the Client Credentials Grant flow (RFC 6749 §4.4), also known as two-legged OAuth. In this flow, a simple request with basic authentication is sent to the authorization server to get an access token. The Kafka receiver can then use the returned access token to make authenticated requests to the remote Kafka cluster until it expires (tokens are usually valid for an hour). When the token is near expiry, the Kafka receiver will automatically request another.

To enable OAuth client authentication, you must configure the client ID, client secret, and token endpoint that the Kafka receiver will use to request access tokens. Optionally, you can also configure the OAuth scope.
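
To make the flow concrete, the sketch below builds (but does not send) the kind of token request described by RFC 6749 §4.4: a form-encoded POST body with grant_type=client_credentials, authenticated with an HTTP Basic header built from the client ID and client secret. The function name and sample values are hypothetical, the sketch skips the form-encoding of credentials that the RFC requires for special characters, and it does not represent how the event broker builds its requests.

```python
import base64
from urllib.parse import urlencode


def build_token_request(client_id, client_secret, scope=None):
    """Return (headers, body) for a client-credentials token request."""
    # HTTP Basic authentication: base64("client_id:client_secret").
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    form = {"grant_type": "client_credentials"}
    if scope:
        # The scope is optional, as in the configuration.
        form["scope"] = scope
    return headers, urlencode(form)
```

The headers and body would be sent as an HTTPS POST to the configured token endpoint, and the access token in the response is what the receiver presents to the Kafka cluster.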

To configure these settings, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver/authentication)# oauth-client
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# client-id <client-id>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# client-secret <client-secret>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# token-endpoint <token-endpoint>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# scope <scope>

Where:

client-id <client-id> is the OAuth client ID the Kafka receiver uses to log in to the authorization server when requesting access tokens. The OAuth client ID may contain up to 200 characters.

client-secret <client-secret> is the OAuth client secret the Kafka receiver uses to log in to the authorization server when requesting access tokens. The OAuth client secret may contain up to 512 characters.

token-endpoint <token-endpoint> is the OAuth token endpoint URL that the Kafka receiver uses to request a token for logging in to the remote Kafka cluster. The OAuth token endpoint may contain up to 2048 characters. In addition, the token endpoint parameter must use TLS, that is, it must start with https:// (case insensitive).

(Optional) scope <scope> is the OAuth scope. The OAuth scope may contain up to 200 characters.
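
The limits above can be checked before you enter the commands. The following is a minimal sketch of such a pre-check. The function and dictionary names are hypothetical, and the sketch counts Python characters, which may not match exactly how the event broker measures length.

```python
# Maximum lengths taken from the parameter descriptions above.
LIMITS = {"client-id": 200, "client-secret": 512, "token-endpoint": 2048, "scope": 200}


def check_oauth_settings(client_id, client_secret, token_endpoint, scope=None):
    """Return a list of problems found; an empty list means no problem was found."""
    values = {
        "client-id": client_id,
        "client-secret": client_secret,
        "token-endpoint": token_endpoint,
    }
    if scope is not None:
        values["scope"] = scope
    problems = []
    for name, value in values.items():
        if len(value) > LIMITS[name]:
            problems.append(f"{name} exceeds {LIMITS[name]} characters")
    # The endpoint must use TLS; the scheme comparison is case insensitive.
    if not token_endpoint.lower().startswith("https://"):
        problems.append("token-endpoint must start with https://")
    return problems
```

For instance, an endpoint written as HTTPS://idp.example.com/token passes the scheme check, while http://idp.example.com/token is reported as a problem.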

Configuring Message Batch Handling for Kafka Receivers

To configure the delay that the event broker waits before accumulating a batch of messages from the Kafka cluster, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver)# batch
solace(configure/message-vpn/kafka/kafka-receiver/batch)# delay <ms>

Where:

<ms> specifies the delay in milliseconds to wait before accumulating a batch of messages from the Kafka cluster. The valid range of values is 0 to 300000. The default is 500. This corresponds to the fetch.wait.max.ms Kafka consumer API parameter.

The no version of this command, no delay, resets the value to the default.

To configure the maximum size of a message batch, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver)# batch
solace(configure/message-vpn/kafka/kafka-receiver/batch)# max-size <bytes>

Where:

<bytes> specifies the maximum size of a message batch in bytes. The valid range of values is 1 to 100000000. The default is 1. This corresponds to the fetch.min.bytes Kafka consumer API parameter.

The no version of this command, no max-size, resets the value to the default.
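
The relationship between the two batch commands and the Kafka consumer API parameters named above can be summarized in a short sketch. The function name is hypothetical; the ranges, defaults, and parameter names are the ones stated in this section.

```python
def batch_settings(delay_ms=500, max_size_bytes=1):
    """Map the batch delay and max-size values to Kafka consumer parameters."""
    if not 0 <= delay_ms <= 300000:
        raise ValueError("delay must be in the range 0 to 300000 ms")
    if not 1 <= max_size_bytes <= 100000000:
        raise ValueError("max-size must be in the range 1 to 100000000 bytes")
    return {
        "fetch.wait.max.ms": delay_ms,      # batch delay
        "fetch.min.bytes": max_size_bytes,  # batch max-size
    }
```

Calling batch_settings() with no arguments returns the defaults, which is the state the no delay and no max-size commands restore.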

Configuring the Bootstrap Address List for Kafka Receivers

A bootstrap address is the fully qualified domain name (FQDN) or IP address and optional port of one Kafka broker in a Kafka cluster where the Kafka receiver can fetch the state of the entire cluster. You can configure a list of these addresses for the Kafka receiver to try in the event that an attempt to connect to one address fails.

To configure the bootstrap address list for a Kafka receiver, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver)# bootstrap-addresses <address-list>

Where:

<address-list> is a comma-separated list of FQDNs or addresses (and optional ports) of brokers in the Kafka cluster from which the state of the entire cluster can be learned. IPv4 addresses must be specified in the dotted decimal notation form, nnn.nnn.nnn.nnn. IPv6 addresses must be enclosed in square brackets. The port is specified as a decimal value from 0 to 65535. For example, a correctly formatted IPv4 address is: 192.168.100.1:9092. The same address in IPv6 format is [::ffff:c0a8:6401]:9092. This corresponds to the bootstrap.servers Kafka consumer API parameter.

If a port is not provided with an address, it defaults to 9092.

The no form of this command, no bootstrap-addresses, removes the bootstrap address list from the Kafka receiver.
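
The address list format described above, including bracketed IPv6 addresses and the default port, can be sketched as a small parser. The function name and the sample host name are hypothetical, and the parser is an illustration of the format rather than the event broker's own validation.

```python
DEFAULT_PORT = 9092  # used when an entry has no port


def parse_bootstrap_addresses(address_list):
    """Parse a comma-separated bootstrap list into (host, port) tuples."""
    result = []
    for entry in address_list.split(","):
        entry = entry.strip()
        if not entry:
            continue
        if entry.startswith("["):
            # IPv6 addresses are enclosed in square brackets: [addr]:port
            host, closing, rest = entry[1:].partition("]")
            if not closing or (rest and not rest.startswith(":")):
                raise ValueError(f"malformed IPv6 entry: {entry!r}")
            port_text = rest[1:]
        else:
            # FQDN or dotted-decimal IPv4, optionally followed by :port
            host, _, port_text = entry.partition(":")
        port = int(port_text) if port_text else DEFAULT_PORT
        if not 0 <= port <= 65535:
            raise ValueError(f"port out of range in {entry!r}")
        result.append((host, port))
    return result
```

For example, the list 192.168.100.1:9092, [::ffff:c0a8:6401], kafka1.example.com parses into three entries that all use port 9092, the last two by default.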

Configuring the Kafka Receiver Consumer Group

Consumer groups allow Kafka consumers to work together and process Kafka events from a topic in parallel. Each consumer in the same group is assigned a different subset of partitions from a Kafka topic or set of topics. Depending on your deployment, you may want to specify certain details of the consumer group a Kafka receiver belongs to.

To configure the consumer group, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver)# group

The configuration tasks you can perform for a Kafka receiver consumer group include:

Configuring the Kafka Consumer Group ID

To configure the Kafka consumer group ID for a Kafka receiver, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver/group)# id <value>

Where:

<value> specifies the ID. This corresponds to the group.id Kafka consumer API parameter.

The no form of this command, no id, removes the consumer group ID specification.

Configuring the Kafka Consumer Group Keepalive Settings

To configure the keepalive interval for a Kafka consumer group, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver/group)# interval <ms>

Where:

<ms> specifies the time (in milliseconds) between sending keepalives to the group. The valid range of values is 1 to 3600000. The default is 3000. This corresponds to the heartbeat.interval.ms Kafka consumer API parameter.

The no form of this command, no interval, resets the value to the default.

To configure the keepalive timeout for a Kafka consumer group, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver/group)# timeout <ms>

Where:

<ms> specifies the time (in milliseconds) until unresponsive group members are removed, triggering a partition rebalance across other members of the group. The valid range of values is 1 to 3600000. The default is 45000. This corresponds to the session.timeout.ms Kafka consumer API parameter.

The no form of this command, no timeout, resets the value to the default.
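
The interval and timeout work as a pair: the timeout decides how long a silent member is tolerated, and the interval decides how often it reports in. The sketch below, with hypothetical function names, validates both values against the ranges above and computes how many keepalive intervals fit inside one timeout window.

```python
def keepalive_settings(interval_ms=3000, timeout_ms=45000):
    """Map the keepalive interval and timeout to Kafka consumer parameters."""
    for name, value in (("interval", interval_ms), ("timeout", timeout_ms)):
        if not 1 <= value <= 3600000:
            raise ValueError(f"{name} must be in the range 1 to 3600000 ms")
    return {
        "heartbeat.interval.ms": interval_ms,
        "session.timeout.ms": timeout_ms,
    }


def intervals_per_timeout(interval_ms, timeout_ms):
    """Number of whole keepalive intervals that fit in one timeout window."""
    return timeout_ms // interval_ms
```

With the defaults, 45000 // 3000 gives 15 intervals per timeout window, so a member is removed only after a long run of missed keepalives.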

Configuring the Kafka Consumer Group Membership Type 

To configure the membership type for a Kafka receiver consumer group, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver/group)# membership-type {dynamic | static}

Where:

dynamic specifies dynamic membership. This is the default. If the group membership type is dynamic, the Kafka consumer API parameter group.instance.id is empty.

static specifies static membership. Static members can leave and rejoin the group (within the configured keepalive timeout value) without prompting a group rebalance. If the group membership type is static, the Kafka consumer API parameter group.instance.id is set to string <broker-name>/<vpn-name>/<receiver-name>, where <broker-name> is the AD-enabled router name, <vpn-name> is the Message VPN name, and <receiver-name> is the Kafka receiver name.

The no version of this command, no membership-type, resets the value to the default.
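
The effect of the membership type on group.instance.id, as described above, can be written as a short sketch. The function name and sample names are hypothetical.

```python
def group_instance_id(membership_type, broker_name, vpn_name, receiver_name):
    """Return the group.instance.id value for the given membership type."""
    if membership_type == "dynamic":
        # Dynamic members have an empty group.instance.id.
        return ""
    if membership_type == "static":
        # Static members use <broker-name>/<vpn-name>/<receiver-name>.
        return f"{broker_name}/{vpn_name}/{receiver_name}"
    raise ValueError("membership type must be 'dynamic' or 'static'")
```

For a broker named broker1, a Message VPN named default, and a receiver named rx1, static membership gives broker1/default/rx1. Because this value is stable across restarts, the Kafka cluster can recognize the returning member.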

Configuring the Kafka Consumer Group Partition Scheme

To configure the partition scheme for a Kafka receiver consumer group, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver/group)# partition-scheme <partition-scheme-list>

Where:

<partition-scheme-list> is an ordered, comma-separated list of schemes for partition assignment of the consumer group for this receiver. Both Eager ("range", "roundrobin") and Cooperative ("cooperative-sticky") schemes are supported. The elected group leader will choose the first common strategy provided by all members of the group. Eager and Cooperative schemes must not be mixed. For more information on these schemes, see the documentation for your Kafka implementation. The default partition scheme is "range,roundrobin". This corresponds to the partition.assignment.strategy Kafka consumer API parameter.

The no version of this command, no partition-scheme, resets the value to the default.
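
The rule that Eager and Cooperative schemes must not be mixed can be checked with a short sketch like the following. The function and constant names are hypothetical, and the check only covers the three scheme names listed above.

```python
EAGER = {"range", "roundrobin"}
COOPERATIVE = {"cooperative-sticky"}


def parse_partition_schemes(scheme_list="range,roundrobin"):
    """Parse an ordered scheme list and reject mixed Eager/Cooperative lists."""
    schemes = [s.strip() for s in scheme_list.split(",") if s.strip()]
    if not schemes:
        raise ValueError("at least one scheme is required")
    unknown = [s for s in schemes if s not in (EAGER | COOPERATIVE)]
    if unknown:
        raise ValueError(f"unknown scheme(s): {unknown}")
    has_eager = any(s in EAGER for s in schemes)
    has_cooperative = any(s in COOPERATIVE for s in schemes)
    if has_eager and has_cooperative:
        raise ValueError("Eager and Cooperative schemes must not be mixed")
    return schemes  # order is preserved because the list is ordered
```

A list such as roundrobin,range is accepted and keeps its order, while range,cooperative-sticky is rejected.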

Configuring Kafka Topic Metadata Handling for Kafka Receivers

To exclude certain topic metadata during refreshes, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver)# metadata
solace(configure/message-vpn/kafka/kafka-receiver/metadata)# topic
solace(configure/message-vpn/kafka/kafka-receiver/metadata/topic)# exclude <regex-list>

Where:

<regex-list> is a comma-separated list of regular expressions (including POSIX.2 regular expressions). Any matching topic names will be ignored in the broker metadata. Note that each regular expression needs to start with the ^ character; otherwise, it is interpreted as a literal topic name.

The no form of this command, no exclude, removes all configured expressions.
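
The distinction between a regular expression and a literal topic name is easy to get wrong, so here is a minimal sketch of the rule. It uses Python's re module, whose syntax is close to but not identical to POSIX regular expressions, and it assumes that no expression contains a comma. The function name and sample topic names are hypothetical.

```python
import re


def is_excluded(topic, exclude_list):
    """Return True if the topic matches any entry in the exclude list."""
    for entry in exclude_list.split(","):
        entry = entry.strip()
        if not entry:
            continue
        if entry.startswith("^"):
            # Entries starting with ^ are treated as regular expressions.
            if re.search(entry, topic):
                return True
        elif entry == topic:
            # Everything else is compared as a literal topic name.
            return True
    return False
```

With the list ^__.*,audit the topics __consumer_offsets and audit are excluded, but audit.log is not. An entry written as audit.* (without the leading ^) only matches a topic literally named audit.* in this sketch.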

To configure the time between refreshes of topic metadata from the Kafka cluster, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver)# metadata
solace(configure/message-vpn/kafka/kafka-receiver/metadata)# topic
solace(configure/message-vpn/kafka/kafka-receiver/metadata/topic)# refresh-interval <ms>

Where:

<ms> specifies the time (in milliseconds) between refreshes of topic metadata from the Kafka cluster. The valid range of values is 1000 to 3600000. The default is 30000. This corresponds to the topic.metadata.refresh.interval.ms Kafka consumer API parameter.

The no form of this command, no refresh-interval, resets the value to the default.

Configuring Topic Bindings for Kafka Receivers

Each topic binding names the Kafka topic from which messages are drawn, and includes attributes which dictate how messages from that Kafka topic are to be sent to the Solace PubSub+ software event broker.

To create a topic binding to receive messages from a remote Kafka topic, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver)# create topic-binding <kafka-topic-or-regex> 

Where:

<kafka-topic-or-regex> is the name of the Kafka topic you want this receiver to obtain messages from, or a regex pattern (including POSIX.2 regular expressions) to receive from multiple Kafka topics. Note that each regular expression needs to start with the ^ character; otherwise, it is interpreted as a literal topic name.

To enable a topic binding, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# no shutdown

The configuration tasks you can perform for a Kafka receiver topic binding include:

Configuring the Initial Offset to Consume from the Kafka Topic

You can configure the initial offset that the Kafka receiver consumes from the Kafka topic if no member of the consumer group has consumed and committed any offset already, or if the last committed offset has been deleted. Offsets are unique per Kafka partition.

To configure the initial offset that the Kafka receiver consumes from the Kafka topic, enter the following command:

solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# initial-offset {beginning | end}

Where:

beginning specifies to start with the earliest offset available.

end specifies to start with new offsets only. This is the default value.

The initial offset corresponds to the auto.offset.reset Kafka consumer API parameter.

The no form of this command, no initial-offset, resets the value to the default.
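
The conditions under which the initial offset applies can be modeled for a single partition as follows. This is a simplified sketch of the semantics described above, not the event broker's or Kafka's implementation. The function name is hypothetical, and latest here means the offset of the next event to be written.

```python
def starting_offset(committed, earliest, latest, initial_offset="end"):
    """Pick the offset a consumer starts from on one partition.

    committed is None when no group member has committed an offset.
    """
    # A committed offset that still exists on the partition always wins.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # Otherwise (nothing committed, or the committed offset was deleted)
    # the configured initial offset decides.
    if initial_offset == "beginning":
        return earliest
    if initial_offset == "end":
        return latest
    raise ValueError("initial offset must be 'beginning' or 'end'")
```

For a partition that currently holds offsets 100 to 249, a new consumer group starts at 250 with end and at 100 with beginning, while a group that committed offset 180 resumes at 180 regardless of the setting.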

Configuring Partition Key Generation

When messages are received from Kafka topics, you can configure the topic binding to generate a partition key for each message. This is useful for determining which queue partition a message is sent to. For more information, see Message Distribution with Partitioned Queues.

To configure how the topic binding generates a partition key for each message received from a Kafka topic, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# local
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding/local)# key <key-expression>

Where:

<key-expression> specifies the substitution expression used to generate the partition key for each message received from the Kafka topic. For more information, see Substitution Expressions Overview. If no value is configured, no key is included for each message as it is published into the event broker.

Configuring SMF Topic Generation

When messages are received from Kafka topics they are published to SMF topics based on an expression you configure when you create a topic binding.

To configure how the topic binding generates the SMF topic for each message received from a Kafka topic, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# local
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding/local)# topic <topic-expression>

Where:

<topic-expression> specifies the substitution expression used to generate the SMF topic for each message received from the Kafka topic. For more information, see Substitution Expressions Overview. This expression can include data extracted from the metadata of each individual Kafka message as it is received from the Kafka topic. If no value is configured, the topic binding will not be operational.

Enabling TLS/SSL Encryption for Kafka Receiver Connections

To enable TLS/SSL encryption for Kafka receiver connections, enter the following commands:

solace(configure/message-vpn/kafka/kafka-receiver)# transport
solace(configure/message-vpn/kafka/kafka-receiver/transport)# ssl

The following table describes how the combination of TLS/SSL encryption and authentication scheme settings correspond to the security.protocol Kafka consumer API parameter:

TLS/SSL Encryption    Authentication Scheme            Value for security.protocol Parameter
no ssl                none or client-certificate       plaintext
ssl                   none or client-certificate       ssl
no ssl                basic, scram, or oauth-client    sasl_plaintext
ssl                   basic, scram, or oauth-client    sasl_ssl

The no form of this command, no ssl, disables TLS/SSL encryption.
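
The table above can also be expressed as a lookup, which may help when checking a planned configuration. The function and constant names are hypothetical. The kerberos scheme is not listed in the table, so this sketch deliberately rejects it rather than guessing a value.

```python
SASL_SCHEMES = {"basic", "scram", "oauth-client"}
NON_SASL_SCHEMES = {"none", "client-certificate"}


def security_protocol(ssl_enabled, auth_scheme):
    """Return the security.protocol value for a TLS/SSL and scheme combination."""
    if auth_scheme in SASL_SCHEMES:
        return "sasl_ssl" if ssl_enabled else "sasl_plaintext"
    if auth_scheme in NON_SASL_SCHEMES:
        return "ssl" if ssl_enabled else "plaintext"
    # Schemes not covered by the table (for example kerberos) are not mapped.
    raise ValueError(f"no table entry for scheme {auth_scheme!r}")
```

For example, security_protocol(True, "scram") returns sasl_ssl, and security_protocol(False, "none") returns plaintext.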

Configuring a Kafka Sender

On the event broker, conversion from Solace messages to Kafka events and propagation of those events to a remote Kafka cluster is enabled by a Kafka sender, which is configured at the Message VPN level. You can give the sender any name. When you create a Kafka sender for a Message VPN, the event broker automatically creates the following objects:

  • A single client for each Kafka sender. This client binds to the queues needed by the queue bindings of its sender. The client's name is #kafka/tx/<tx-name>, and it uses the client username #kafka/tx/<tx-name>. This client username uses the #kafka client profile and the #acl-profile ACL profile.

  • A single client-profile, called #kafka, which is needed for all Kafka senders and receivers in the Message VPN. This profile is created when the first Kafka sender or receiver is created, and removed with the last Kafka sender or receiver.

For more information, see Kafka Sender.
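
The naming pattern for the automatically created clients, #kafka/rx/<rx-name> for receivers and #kafka/tx/<tx-name> for senders, can be sketched as a one-line helper. This is useful, for example, when searching client lists or logs for the client that belongs to a given sender. The function name and sample names are hypothetical.

```python
def kafka_client_name(kind, name):
    """Return the auto-created client name (also its client username)."""
    prefixes = {"receiver": "rx", "sender": "tx"}
    if kind not in prefixes:
        raise ValueError("kind must be 'receiver' or 'sender'")
    return f"#kafka/{prefixes[kind]}/{name}"
```

For a sender named orders-out this gives #kafka/tx/orders-out, and for a receiver named orders-in it gives #kafka/rx/orders-in.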

To create a Kafka sender, enter the following commands:

solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# create kafka-sender <name> 

To configure an existing Kafka sender:

solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# kafka-sender <name> 

To enable a Kafka sender after it has been created:

solace(configure/message-vpn/kafka/kafka-sender)# no shutdown

Where:

message-vpn <name> is the name of the Message VPN where you want to create the Kafka sender.

kafka-sender <name> is the name of the Kafka sender.

The configuration tasks you can perform for a Kafka sender include:

Configuring Authentication Schemes for Kafka Senders

To configure an authentication scheme that the given Kafka sender will use to establish a connection to the remote Kafka cluster, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender)# authentication
solace(configure/message-vpn/kafka/kafka-sender/authentication)# auth-scheme {none | basic | scram | client-certificate | kerberos | oauth-client}

Where:

none specifies to log in with no authentication. For more information, see None.

basic specifies to log in with a username and password. For more information, see Basic Authentication.

scram specifies to log in with SCRAM (Salted Challenge Response Authentication Mechanism). For more information, see SCRAM Authentication.

client-certificate specifies to log in with a client TLS certificate. For more information, see Client Certificate Authentication.

kerberos specifies to log in with the Kerberos mechanism. For more information, see Kerberos Authentication.

oauth-client specifies to log in with OAuth 2.0 client credentials. For more information, see OAuth Client Authentication.

None

If no authentication scheme for a Kafka sender is configured, the sender will not use an authentication scheme. This may be useful for anonymous connections or when a Kafka sender does not require authentication.

Basic Authentication

Using the basic authentication scheme, Kafka senders can authenticate with a username and password combination. Credentials can be transmitted either in plain text or encrypted with SSL.

To configure settings for a basic authentication scheme, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# basic
solace(configure/message-vpn/kafka/kafka-sender/authentication/basic)# username <name> [password <password>]

Where:

<name> is the client username to use for authentication with the remote Kafka cluster. The username may contain up to 255 characters.

<password> is the password to be used with the specified username. The password may contain up to 255 characters.

The no version of this command, no username, removes any configured username and password.

SCRAM Authentication

When using SCRAM authentication (RFC 5802), a Kafka sender uses a challenge/response mechanism to authenticate a username and password with the remote Kafka cluster.

To configure settings for a SCRAM authentication scheme, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# scram
solace(configure/message-vpn/kafka/kafka-sender/authentication/scram)# hash {sha-256 | sha-512}
solace(configure/message-vpn/kafka/kafka-sender/authentication/scram)# username <name> [password <password>]

Where:

sha-256 specifies to use a SHA-2 256-bit hash for SCRAM authentication.

sha-512 specifies to use a SHA-2 512-bit hash for SCRAM authentication. This is the default setting.

<name> is the client username to use for authentication with the remote Kafka cluster. The username may contain up to 255 characters.

<password> is the password to be used with the specified username. The password may contain up to 255 characters.

The no form of the hash command, no hash, returns the value to the default.

The no form of the username command, no username, removes any configured username and password.

Client Certificate Authentication

When using client certificate authentication, a Kafka sender provides a certificate file to validate its identity. Client certificate authentication is only available to connections that use TLS/SSL (see Enabling TLS/SSL Encryption for Kafka Sender Connections).

The client certificate installed here may also be used if the Kafka cluster requests it with other authentication schemes. If you configure a client certificate authentication scheme for the Kafka sender, the sender provides only this client certificate as a means of identifying itself. However, it is possible to provide a client certificate when using a basic or SCRAM authentication scheme as well.

To configure settings for a client certificate authentication scheme, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# client-certificate
solace(configure/message-vpn/kafka/kafka-sender/authentication/client-certificate)# certificate <filename>

Where:

<filename> is the filename of the certificate file. The certificate file must be located in the certs folder of the event broker. Once a certificate is configured, a copy of it is saved internally. The file in the certs directory is no longer required.

The no version of this command, no certificate, removes the certificate association for the current Kafka sender.

Kerberos Authentication

When using Kerberos authentication, a Kafka sender must first authenticate with a Kerberos Authentication Server (AS) which grants the Kafka sender a Ticket Granting Ticket (TGT). With a valid TGT, a Kafka sender can attempt to authenticate with the remote Kafka broker using a service ticket obtained from the Ticket Granting Service (TGS). The AS and TGS (components of a Key Distribution Center (KDC)) are hosted on an external server or servers—not on the event broker.

To implement Kerberos authentication for a Kafka sender, you must configure the following:

  1. The service name of the remote Kafka broker.

  2. The user principal name of the Kafka sender.

  3. The keytab file for the Kafka sender.

The Kerberos realm (including KDC address) that this Kafka sender and the remote Kafka broker are part of must be configured before you can enable Kerberos authentication. For more information, see Managing Kerberos Realms.

To configure these settings, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# kerberos
solace(configure/message-vpn/kafka/kafka-sender/authentication/kerberos)# service-name <value>
solace(configure/message-vpn/kafka/kafka-sender/authentication/kerberos)# user-principal-name <value> <keytab-file>

Where:

service-name <value> is the Kerberos service name of the remote Kafka broker, not including /hostname@<REALM>.

user-principal-name <value> is the Kerberos user principal name of the Kafka sender. This must include the @<REALM> suffix.

<keytab-file> is the filename of the Kerberos keytab file. The keytab file must be located in the keytabs folder of the event broker. These keytabs differ from those used in client authentication. Keytabs used for Kafka bridging authentication contain user principal names rather than service principal names, and apply to specific Message VPNs rather than globally.

OAuth Client Authentication

When using OAuth client authentication, a Kafka sender obtains access tokens from an authorization server using the Client Credentials Grant flow (RFC 6749 §4.4), also known as two-legged OAuth. In this flow, a simple request with basic authentication is sent to the authorization server to get an access token. The Kafka sender can then use the returned access token to make authenticated requests to the remote Kafka cluster until the token expires (tokens are usually valid for an hour). When the token is near expiry, the Kafka sender automatically requests another.

To enable OAuth client authentication, you must configure the client ID, client secret, and token endpoint that the Kafka sender will use to request access tokens. Optionally, you can also configure the OAuth scope.

To configure these settings, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/authentication)# oauth-client
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# client-id <client-id>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# client-secret <client-secret>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# token-endpoint <token-endpoint>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# scope <scope>

Where:

client-id <client-id> is the OAuth client ID the Kafka sender uses to log in to the authorization server when requesting access tokens. The OAuth client ID may contain up to 200 characters.

client-secret <client-secret> is the OAuth client secret the Kafka sender uses to log in to the authorization server when requesting access tokens. The OAuth client secret may contain up to 512 characters.

token-endpoint <token-endpoint> is the OAuth token endpoint URL that the Kafka sender uses to request a token for login to the remote Kafka cluster. The OAuth token endpoint may contain up to 2048 characters. In addition, the token endpoint parameter must use TLS, that is, it must start with https:// (case insensitive).

(Optional) scope <scope> is the OAuth scope. The OAuth scope may contain up to 200 characters.
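The token request described above can be sketched in Python. This is only an illustration of the Client Credentials Grant request shape, not the event broker's implementation: the function name build_token_request and the example values are hypothetical, and the sketch builds the request without sending it. The length limits and the https:// check mirror the parameter descriptions above.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(client_id, client_secret, token_endpoint, scope=None):
    """Build (but do not send) a Client Credentials Grant token request."""
    # Limits taken from the parameter descriptions in this section.
    if len(client_id) > 200:
        raise ValueError("client-id may contain up to 200 characters")
    if len(client_secret) > 512:
        raise ValueError("client-secret may contain up to 512 characters")
    if len(token_endpoint) > 2048:
        raise ValueError("token-endpoint may contain up to 2048 characters")
    if not token_endpoint.lower().startswith("https://"):
        raise ValueError("token-endpoint must start with https://")
    if scope is not None and len(scope) > 200:
        raise ValueError("scope may contain up to 200 characters")

    # RFC 6749 section 4.4.2: grant_type is required, scope is optional.
    form = {"grant_type": "client_credentials"}
    if scope:
        form["scope"] = scope
    body = urllib.parse.urlencode(form).encode("ascii")

    # Basic authentication with the client ID and secret (simplified: the
    # credentials are not form-encoded first, which is fine for plain ASCII).
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    return urllib.request.Request(
        token_endpoint, data=body, headers=headers, method="POST"
    )
```

For example, build_token_request("my-client", "my-secret", "https://auth.example.com/token", scope="kafka") returns a POST request whose body carries grant_type=client_credentials and the scope.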

Configuring Message Batch Handling for Kafka Senders

To configure how long the Kafka sender waits for messages to accumulate before it sends a batch to the Kafka cluster, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# delay <ms>

Where:

<ms> specifies the delay, in milliseconds, that the Kafka sender waits for messages to accumulate before it sends a batch to the Kafka cluster. The valid range of values is 0 to 90000. The default is 5. This corresponds to the queue.buffering.max.ms Kafka producer API parameter.

The no version of this command, no delay, resets the value to the default.

To configure the maximum number of messages in a batch, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# max-messages <count>

Where:

<count> specifies the maximum number of messages to send to the Kafka cluster in a single batch. The valid range of values is 1 to 1000000. The default is 10000. This corresponds to the batch.num.messages Kafka producer API parameter.

The no version of this command, no max-messages, resets the value to the default.

To configure the maximum size of a message batch, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# max-size <bytes>

Where:

<bytes> specifies the maximum size of a message batch in bytes. The valid range of values is 1 to 2147483647. The default is 1000000. This corresponds to the batch.size Kafka producer API parameter.

The no version of this command, no max-size, resets the value to the default.
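To illustrate how the max-messages and max-size limits interact, the following Python sketch groups messages into batches that respect both limits. It is a simplified model under stated assumptions, not the event broker's or the Kafka producer library's implementation: the function name split_into_batches is hypothetical, only payload bytes are counted toward the size, and the delay setting is not modeled.

```python
def split_into_batches(messages, max_messages=10000, max_size=1000000):
    """Group byte-string messages into batches that respect both limits."""
    batches = []
    current = []
    current_size = 0
    for msg in messages:
        too_many = len(current) >= max_messages
        too_big = bool(current) and current_size + len(msg) > max_size
        if too_many or too_big:
            # Close the current batch before it would exceed a limit.
            batches.append(current)
            current = []
            current_size = 0
        # A single message larger than max_size still gets its own batch here.
        current.append(msg)
        current_size += len(msg)
    if current:
        batches.append(current)
    return batches
```

With five 4-byte messages, max_messages=10, and max_size=10, the size limit closes each batch after two messages, so the sketch produces batches of two, two, and one message.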

Configuring the Bootstrap Address List for Kafka Senders

A bootstrap address is the fully qualified domain name or IP address and optional port of one Kafka broker in a Kafka cluster where the Kafka sender can fetch the state of the entire cluster. You can configure a list of these addresses for the Kafka sender to try in the event that an attempt to connect to one address fails.

To configure the bootstrap address list for a Kafka sender, enter the following command:

solace(configure/message-vpn/kafka/kafka-sender)# bootstrap-addresses <address-list>

Where:

<address-list> is a comma-separated list of FQDNs or addresses (and optional ports) of brokers in the Kafka cluster from which the state of the entire cluster can be learned. IPv4 addresses must be specified in the dotted decimal notation form, nnn.nnn.nnn.nnn. IPv6 addresses must be enclosed in square brackets. The port is specified as a decimal value from 0 to 65535. For example, a correctly formatted IPv4 address is: 192.168.100.1:9092. The same address in IPv6 format is [::ffff:c0a8:6401]:9092. This corresponds to the bootstrap.servers Kafka producer API parameter.

If a port is not provided with an address, it defaults to 9092.

The no form of this command, no bootstrap-addresses, removes the bootstrap address list from the Kafka sender.
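The address list format can be sketched as a small Python parser. This is an illustration of the format described above (comma-separated entries, bracketed IPv6 addresses, optional port defaulting to 9092), not the event broker's own parsing; the function name parse_bootstrap_addresses is hypothetical.

```python
DEFAULT_KAFKA_PORT = 9092


def parse_bootstrap_addresses(address_list):
    """Return a list of (host, port) tuples from a comma-separated list."""
    result = []
    for entry in address_list.split(","):
        entry = entry.strip()
        if not entry:
            continue
        if entry.startswith("["):
            # Bracketed IPv6 address, optionally followed by :port.
            host, bracket, rest = entry[1:].partition("]")
            if not bracket:
                raise ValueError(f"missing ']' in {entry!r}")
            if rest and not rest.startswith(":"):
                raise ValueError(f"unexpected text after ']' in {entry!r}")
            port_text = rest[1:]
        else:
            # FQDN or IPv4 address, optionally followed by :port.
            host, _, port_text = entry.partition(":")
        port = int(port_text) if port_text else DEFAULT_KAFKA_PORT
        if not 0 <= port <= 65535:
            raise ValueError(f"port out of range in {entry!r}")
        result.append((host, port))
    return result
```

For example, the list "192.168.100.1:9092,[::ffff:c0a8:6401]:9092,kafka.example.com" yields three entries, with the last one using the default port.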

Configuring Compression for Kafka Senders

To configure the compression type the Kafka sender uses when propagating messages, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender)# compression
solace(configure/message-vpn/kafka/kafka-sender/compression)# type {gzip | snappy | lz4 | zstd}

Where:

gzip specifies to use Gzip compression. This is the default.

snappy specifies to use Snappy compression.

lz4 specifies to use LZ4 compression.

zstd specifies to use Zstandard compression.

The compression type corresponds to the compression.codec Kafka producer API parameter.

The no version of this command, no type, resets the value to the default.

To configure the compression level the Kafka sender uses when propagating messages, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender)# compression
solace(configure/message-vpn/kafka/kafka-sender/compression)# level <value>

Where:

<value> specifies the compression level. The valid range of values depends on the compression type:

  • -1 means use the codec-dependent default compression level and is always valid. This is the default.

  • 0-9 is valid for the gzip compression codec.

  • 0 is valid for the snappy compression codec.

  • 0-12 is valid for the lz4 compression codec.

  • 0-22 is valid for the zstd compression codec.

The compression level corresponds to the compression.level Kafka producer API parameter.

The no version of this command, no level, resets the value to the default.
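The relationship between compression type and valid level can be expressed as a simple lookup. The following Python sketch encodes the ranges listed in this section; the names LEVEL_RANGES and is_valid_level are hypothetical and are used only for illustration.

```python
# Valid explicit compression levels per codec, as listed in this section.
LEVEL_RANGES = {
    "gzip": (0, 9),
    "snappy": (0, 0),
    "lz4": (0, 12),
    "zstd": (0, 22),
}


def is_valid_level(codec, level):
    """Return True if the level is valid for the given compression type."""
    if codec not in LEVEL_RANGES:
        raise ValueError(f"unknown compression type: {codec!r}")
    if level == -1:
        # -1 selects the codec-dependent default and is always valid.
        return True
    low, high = LEVEL_RANGES[codec]
    return low <= level <= high
```

For example, level 10 is rejected for gzip but accepted for lz4 and zstd.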

To enable compression, see Enabling Compression for Kafka Sender Connections.

A Kafka sender cannot become operational if you configure it to use both encryption and compression while crime-exploit-protection is enabled (see Configuring CRIME Exploit Protection).

Enabling Idempotence for Kafka Senders

Idempotence guarantees in-order, at-least-once message delivery to the remote Kafka topic, at the expense of performance. It is disabled by default. If you enable idempotence, each queue binding configured for the Kafka sender must have the acknowledgment mode set to all to be operational.

To enable idempotence for a Kafka sender, enter the following command:

solace(configure/message-vpn/kafka/kafka-sender)# idempotence

This corresponds to the enable.idempotence Kafka producer API parameter.

The no version of this command, no idempotence, disables idempotence.

When you enable idempotence:

  • The Kafka sender sends an increasing sequence number with every message.

  • The remote Kafka broker acknowledges each message.

  • The Kafka broker remembers the largest sequence number it has written for each Kafka sender.

  • The Kafka broker discards any message received with a sequence number less than the largest written.

If idempotence is disabled, the Kafka sender is free to resend messages to the Kafka broker because of timeouts, leader changes, and so on. In this case, message duplication and/or reordering may occur.
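The sequence number mechanism listed above can be modeled with a short Python sketch. This is a toy model of the receiving side, not the actual Kafka broker logic: the class name BrokerLog and its methods are hypothetical, and the sketch treats any sequence number that is not greater than the largest one written as a resend, which is an assumption about the boundary case.

```python
class BrokerLog:
    """Toy model of a broker that discards resent messages by sequence number."""

    def __init__(self):
        self.largest_seq = {}  # sender id -> largest sequence number written
        self.log = []          # payloads written, in order

    def receive(self, sender_id, seq, payload):
        """Write the message unless it looks like a resend."""
        if seq <= self.largest_seq.get(sender_id, -1):
            # Already written earlier; discard so the log has no duplicate.
            return "discarded"
        self.largest_seq[sender_id] = seq
        self.log.append(payload)
        return "written"
```

If a sender transmits messages 0 and 1, resends message 1 after a timeout, and then sends message 2, the log in this model ends up with each payload exactly once and in order.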

Configuring Queue Bindings for Kafka Senders

Each binding names the queue from which messages are drawn, and includes attributes which dictate how those messages are to be sent to Kafka.

To create a queue binding for a Kafka sender, enter the following command:

solace(configure/message-vpn/kafka/kafka-sender)# create queue-binding <queue-name>

Where:

<queue-name> is the name of a queue in the given Message VPN that the Kafka sender binds to.

To enable a queue binding, enter the following command:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# no shutdown

The configuration tasks you can perform for a Kafka sender queue binding include:

Configuring the Acknowledgment Mode

To configure the number of acknowledgments that this queue binding requires from the remote Kafka cluster, enter the following command:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# ack-mode {none | one | all}

Where:

none specifies that no acknowledgments from the remote Kafka cluster are required. If configured, messages are delivered at-most-once.

one specifies that one acknowledgment from the remote Kafka cluster is required. If configured, messages are delivered at-least-once but may be reordered.

all specifies that all replicas on the remote Kafka cluster need to acknowledge the message. If configured, messages are delivered at-least-once but may be reordered. This is the default setting.

The acknowledgment mode corresponds to the request.required.acks Kafka producer API parameter.

The no form of this command, no ack-mode, resets the value to the default.
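As a rough illustration, the following Python sketch relates the three acknowledgment modes to the values commonly used for the request.required.acks producer parameter (0 for no acknowledgment, 1 for one, -1 for all in-sync replicas) and checks the rule that idempotence requires the all mode. The mapping of mode names to numeric values is an assumption, and the names ACK_MODE_TO_ACKS and ack_mode_allowed are hypothetical.

```python
# Assumed mapping from ack-mode keywords to request.required.acks values.
ACK_MODE_TO_ACKS = {"none": 0, "one": 1, "all": -1}


def ack_mode_allowed(ack_mode, idempotence_enabled):
    """Return True if the ack mode is compatible with the idempotence setting."""
    if ack_mode not in ACK_MODE_TO_ACKS:
        raise ValueError(f"unknown ack-mode: {ack_mode!r}")
    # With idempotence enabled, only the "all" mode lets a binding operate.
    return ack_mode == "all" or not idempotence_enabled
```

For example, a queue binding with ack-mode one passes this check only while idempotence is disabled on the Kafka sender.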

Configuring the Kafka Partition Selection Scheme

To configure the partition selection scheme that the given queue binding will use when publishing to the remote Kafka cluster, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# scheme {consistent | explicit | random}

Where:

consistent specifies to use a consistent partition selection scheme. For more information, see Consistent Partition Selection.

explicit specifies to use an explicit partition selection scheme. For more information, see Explicit Partition Selection.

random specifies to use a random partition selection scheme. For more information, see Random Partition Selection.

Consistent Partition Selection

When using a consistent partition selection scheme, the queue binding selects a Kafka partition based on a hash of the Kafka partition key generated by the Kafka sender (see Configuring Kafka Partition Key Generation). If no key is available for the message, by default, a random selection scheme is used as a fallback. If you disable this fallback, a single partition is selected for all unkeyed messages (see Random Partition Selection).

To configure the hash algorithm used to select the partition, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# consistent 
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/consistent)# hash {crc | murmur2 | fnv1a}

Where:

crc specifies to use CRC Hash. This is the default setting.

murmur2 specifies to use Murmur2 Hash.

fnv1a specifies to use Fowler-Noll-Vo 1a Hash.

The no version of this command, no hash, resets the value to the default.
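The idea behind consistent partition selection can be sketched in Python using the CRC-32 function from the standard library. This is a minimal sketch, not the event broker's algorithm: the function name select_partition is hypothetical, the partition numbers it produces are not guaranteed to match those chosen by the broker, and sending all unkeyed messages to the partition given by the hash of an empty key is an assumption about which single partition is used when the fallback is disabled.

```python
import random
import zlib


def select_partition(key, num_partitions, fallback=True, rng=random):
    """Pick a partition from a hash of the key, with an optional random fallback."""
    if key:
        # The same key always hashes to the same partition.
        return zlib.crc32(key) % num_partitions
    if fallback:
        # No key available: fall back to random selection.
        return rng.randrange(num_partitions)
    # Fallback disabled: every unkeyed message goes to one partition
    # (here, the partition for the hash of an empty key; an assumption).
    return zlib.crc32(b"") % num_partitions
```

Because the partition depends only on the key bytes and the partition count, all messages that share a key land on the same partition for as long as the number of partitions does not change.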

Explicit Partition Selection

When using an explicit partition selection scheme, the queue binding publishes to the Kafka partition number you specify.

To configure the partition number, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# explicit 
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/explicit)# number <value>

Where:

<value> is the Kafka partition number you want the queue binding to use when publishing messages.

The no version of this command, no number, resets the value to the default.

Random Partition Selection

When using a random partition selection scheme, the queue binding selects a random Kafka partition. By default, this partition selection scheme is used as a fallback in cases where a consistent partition selection scheme is being used but no partition key is available for the message. If you disable this fallback, a single partition is selected for all unkeyed messages.

To disable random partition selection as a fallback, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# random 
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/random)# no fallback

To re-enable the fallback after it has been disabled, enter the following command:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/random)# fallback

Configuring Kafka Partition Key Generation

To configure how the queue binding generates Kafka partition keys for each message sent to the remote Kafka cluster, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# remote
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/remote)# key <key-expression>

Where:

<key-expression> specifies the substitution expression used to generate the Kafka partition key for each message sent to Kafka. For more information about substitution expressions, see Substitution Expressions Overview. This expression can include fields extracted from the metadata of each individual SMF message as it is taken from the queue. If empty, no key is included for each message as it is published into Kafka.

Configuring SMF to Kafka Topic Mapping

To configure the topic on the Kafka cluster that this queue binding sends each message to, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# remote 
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/remote)# topic <kafka-topic>

Where:

<kafka-topic> is the Kafka topic on the Kafka cluster to which each message taken from the queue is sent. If no topic is configured, the queue binding will not be operational.

Enabling TLS/SSL Encryption for Kafka Sender Connections

To enable TLS/SSL encryption for Kafka sender connections, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender)# transport
solace(configure/message-vpn/kafka/kafka-sender/transport)# ssl

For more information describing how the combination of TLS/SSL encryption and authentication scheme settings correspond to the security.protocol Kafka producer API parameter, see Enabling TLS/SSL Encryption for Kafka Receiver Connections.

The no form of this command, no ssl, disables TLS/SSL encryption.

Enabling Compression for Kafka Sender Connections

By default, Kafka sender connections do not have compression enabled. You can enable compression according to the type and level configured for the sender (see Configuring Compression for Kafka Senders).

To enable compression for Kafka sender connections, enter the following commands:

solace(configure/message-vpn/kafka/kafka-sender)# transport
solace(configure/message-vpn/kafka/kafka-sender/transport)# compressed

The no form of this command, no compressed, disables compression.

Configuring the Maximum Number of Kafka Broker Connections

By default, the maximum number of Kafka broker connections supported for a Message VPN is the same as the maximum supported for the system. Depending on your deployment, you may want to change this behavior so that one Message VPN does not consume all of the connections that the entire system supports.

To configure the maximum number of simultaneous Kafka broker connections for this Message VPN, enter the following command:

solace(configure/message-vpn/kafka)# max-broker-connections <value>

Where:

<value> is the integer value specifying the maximum total number of Kafka broker connections permitted for the Message VPN.

The no version of this command, no max-broker-connections, returns the value to the default (which is the maximum value supported by the platform).

  • To view the maximum total number of Kafka broker connections that the Solace PubSub+ event broker can support, enter the show kafka User EXEC command.
  • This is a global/read-write parameter, and is chosen by the system administrator, not the Message VPN administrator.