Configuring Kafka Bridging
Kafka bridging enables Solace PubSub+ software event brokers to convert Solace messages placed in one or more queues into Kafka events and publish them to a remote Kafka cluster, and to consume Kafka events and publish them to a topic on the Solace event broker. For more information, see Kafka Bridging.
To configure Kafka bridging, you need to create the following:
- A Kafka receiver—receives events from one or more Kafka topics, converts the events to Solace messages, and publishes them in Solace Message Format (SMF) to topics on the PubSub+ event broker.
- A Kafka sender—takes SMF messages from one or more queues, converts the messages to Kafka events, and publishes them to Kafka topics on the remote Kafka cluster.
In addition, you may want to change the maximum number of simultaneous Kafka broker connections the event broker supports for the current Message VPN. For more information, see Configuring the Maximum Number of Kafka Broker Connections.
The following sections describe how to configure a Kafka receiver and a Kafka sender. For a detailed example that walks through all the steps required to set up a working Kafka bridge scenario, see Kafka Bridging Example.
For instructions to configure Kafka bridging in PubSub+ Broker Manager, see Configuring Kafka Bridging.
Configuring a Kafka Receiver
On the event broker, the accumulation and conversion of Kafka events to Solace messages is enabled by a Kafka receiver, which is configured at the Message VPN level. You can give the receiver any name. When you create a Kafka receiver for a Message VPN, the event broker automatically creates the following objects:
- A single client for each Kafka receiver. This client publishes to the Solace message bus the messages received from the Kafka topics of all its topic bindings. The client's name is #kafka/rx/<rx-name>, and it uses the client username #kafka/rx/<rx-name>. This client username uses the #kafka client profile and the #acl-profile ACL profile.
- A single client profile, called #kafka, which is needed by all Kafka senders and receivers in the Message VPN. This profile is created when the first Kafka sender or receiver is created, and removed with the last Kafka sender or receiver.
For more information, see Kafka Receiver.
To create a Kafka receiver, enter the following commands:
solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# create kafka-receiver <name>
To configure an existing Kafka receiver:
solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# kafka-receiver <name>
To enable a Kafka receiver after it has been created:
solace(configure/message-vpn/kafka/kafka-receiver)# no shutdown
Where:
message-vpn <name>
is the name of the Message VPN where you want to create the Kafka receiver.
kafka-receiver <name>
is the name of the Kafka receiver.
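For example, the following sequence creates and enables a receiver. The Message VPN name demo-vpn and the receiver name my-kafka-rx are hypothetical placeholders; substitute your own names:
solace(configure)# message-vpn demo-vpn
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# create kafka-receiver my-kafka-rx
solace(configure/message-vpn/kafka/kafka-receiver)# no shutdown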
The configuration tasks you can perform for a Kafka receiver include:
Configuring Authentication Schemes for Kafka Receivers
To configure an authentication scheme that the given Kafka receiver will use to establish a connection to the remote Kafka cluster, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver)# authentication
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# auth-scheme {none | basic | scram | client-certificate | kerberos | oauth-client}
Where:
none
specifies to log in with no authentication. For more information, see None.
basic
specifies to log in with a username and password. For more information, see Basic Authentication.
scram
specifies to log in with SCRAM (Salted Challenge Response Authentication Mechanism). For more information, see SCRAM Authentication.
client-certificate
specifies to log in with a client TLS certificate. For more information, see Client Certificate Authentication.
kerberos
specifies to log in with the Kerberos mechanism. For more information, see Kerberos Authentication.
oauth-client
specifies to log in with OAuth 2.0 client credentials. For more information, see OAuth Client Authentication.
None
If no authentication scheme is configured for a Kafka receiver, the receiver does not use an authentication scheme. This may be useful for anonymous connections or when the remote Kafka cluster does not require authentication.
Basic Authentication
Using the basic authentication scheme, Kafka receivers can authenticate with a username and password combination. Credentials can be transmitted in plain text or encrypted with TLS/SSL.
To configure settings for a basic authentication scheme, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# basic
solace(configure/message-vpn/kafka/kafka-receiver/authentication/basic)# username <name> [password <password>]
Where:
<name>
is the client username to use for authentication with the remote Kafka cluster. The username may contain up to 255 characters.
<password>
is the password to be used with the specified username. The password may contain up to 255 characters.
The no version of this command, no username, removes any configured username and password.
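For example, the following sketch selects the basic scheme and sets credentials. The username kafka-user and password s3cr3t are hypothetical placeholders:
solace(configure/message-vpn/kafka/kafka-receiver)# authentication
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# auth-scheme basic
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# basic
solace(configure/message-vpn/kafka/kafka-receiver/authentication/basic)# username kafka-user password s3cr3t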
SCRAM Authentication
When using SCRAM authentication (RFC 5802), a Kafka receiver uses a challenge/response mechanism to authenticate a username and password with the remote Kafka cluster.
To configure settings for a SCRAM authentication scheme, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# scram
solace(configure/message-vpn/kafka/kafka-receiver/authentication/scram)# hash {sha-256 | sha-512}
solace(configure/message-vpn/kafka/kafka-receiver/authentication/scram)# username <name> [password <password>]
Where:
sha-256
specifies to use a SHA-2 256-bit hash for SCRAM authentication.
sha-512
specifies to use a SHA-2 512-bit hash for SCRAM authentication. This is the default setting.
<name>
is the client username to use for authentication with the remote Kafka cluster. The username may contain up to 255 characters.
<password>
is the password to be used with the specified username. The password may contain up to 255 characters.
The no form of the hash command, no hash, returns the value to the default.
The no form of the username command, no username, removes any configured username and password.
Client Certificate Authentication
When using client certificate authentication, a Kafka receiver provides a certificate file to validate its identity. Client certificate authentication is only available to connections that use TLS/SSL (see Enabling TLS/SSL Encryption for Kafka Receiver Connections).
The client certificate installed here may also be used if the Kafka cluster requests it with other authentication schemes. If you configure a client certificate authentication scheme for the Kafka receiver, the receiver provides only this client certificate as a means of identifying itself. However, it is possible to provide a client certificate when using a basic, SCRAM, or OAuth authentication scheme as well.
To configure settings for a client certificate authentication scheme, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# client-certificate
solace(configure/message-vpn/kafka/kafka-receiver/authentication/client-certificate)# certificate <filename>
Where:
<filename>
is the filename of the certificate file. The certificate file must be located in the certs folder of the event broker. Once a certificate is configured, a copy of it is saved internally; the file in the certs directory is no longer required.
The no version of this command, no certificate-file, removes the certificate association for the current Kafka receiver.
Kerberos Authentication
When using Kerberos authentication, a Kafka receiver must first authenticate with a Kerberos Authentication Server (AS) which grants the Kafka receiver a Ticket Granting Ticket (TGT). With a valid TGT, a Kafka receiver can attempt to authenticate with the remote Kafka broker using a service ticket obtained from the Ticket Granting Service (TGS). The AS and TGS (components of a Key Distribution Center (KDC)) are hosted on an external server or servers—not on the event broker.
To implement Kerberos authentication for a Kafka receiver, you must configure the following:
- The service name of the remote Kafka broker.
- The user principal name of the Kafka receiver.
- The keytab file for the Kafka receiver.
The Kerberos realm (including KDC address) that this Kafka receiver and the remote Kafka broker are part of must be configured before you can enable Kerberos authentication. For more information, see Managing Kerberos Realms.
To configure these settings, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# kerberos
solace(configure/message-vpn/kafka/kafka-receiver/authentication/kerberos)# service-name <value>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/kerberos)# user-principal-name <value> <keytab-file>
Where:
service-name <value>
is the Kerberos service name of the remote Kafka broker, not including /hostname@<REALM>.
user-principal-name <value>
is the Kerberos user principal name of the Kafka receiver. This must include the @<REALM> suffix.
<keytab-file>
is the filename of the Kerberos keytab file. The keytab file must be located in the keytabs folder of the event broker. These keytabs differ from those used in client authentication. Keytabs used for Kafka bridging authentication contain user principal names rather than service principal names, and apply to specific Message VPNs rather than globally.
OAuth Client Authentication
When using OAuth client authentication, a Kafka receiver obtains access tokens from an authorization server using the Client Credentials Grant flow (RFC 6749 §4.4), also known as two-legged OAuth. In this flow, a simple request with basic authentication is sent to the authorization server to get an access token. The Kafka receiver can then use the returned access token to make authenticated requests to the remote Kafka cluster until the token expires (tokens are typically valid for an hour). When the token nears expiry, the Kafka receiver automatically requests another.
To enable OAuth client authentication, you must configure the client ID, client secret, and token endpoint that the Kafka receiver will use to request access tokens. Optionally, you can also configure the OAuth scope.
To configure these settings, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# oauth-client
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# client-id <client-id>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# client-secret <client-secret>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# token-endpoint <token-endpoint>
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# scope <scope>
Where:
client-id <client-id>
is the OAuth client ID the Kafka receiver uses to log in to the authorization server when requesting access tokens. The OAuth client ID may contain up to 200 characters.
client-secret <client-secret>
is the OAuth client secret the Kafka receiver uses to log in to the authorization server when requesting access tokens. The OAuth client secret may contain up to 512 characters.
token-endpoint <token-endpoint>
is the OAuth token endpoint URL that the Kafka receiver uses to request a token for login to the remote Kafka cluster. The OAuth token endpoint may contain up to 2048 characters. In addition, the token endpoint must use TLS, that is, it must start with https:// (case insensitive).
(Optional) scope <scope>
is the OAuth scope. The OAuth scope may contain up to 200 characters.
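For example, the following sketch configures OAuth client authentication. The client ID, client secret, and token endpoint URL are hypothetical placeholders for values issued by your authorization server:
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# auth-scheme oauth-client
solace(configure/message-vpn/kafka/kafka-receiver/authentication)# oauth-client
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# client-id my-broker-bridge
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# client-secret 9f8e7d6c5b4a
solace(configure/message-vpn/kafka/kafka-receiver/authentication/oauth-client)# token-endpoint https://auth.example.com/oauth2/token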
Configuring Message Batch Handling for Kafka Receivers
To configure the delay that the event broker waits to accumulate a batch of messages from the Kafka cluster, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver)# batch
solace(configure/message-vpn/kafka/kafka-receiver/batch)# delay <ms>
Where:
<ms>
specifies the delay in milliseconds to wait to accumulate a batch of messages from the Kafka cluster. The valid range of values is 0 to 300000. The default is 500. This corresponds to the fetch.wait.max.ms Kafka consumer API parameter.
The no version of this command, no delay, resets the value to the default.
To configure the maximum size of a message batch, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver)# batch
solace(configure/message-vpn/kafka/kafka-receiver/batch)# max-size <bytes>
Where:
<bytes>
specifies the maximum size of a message batch in bytes. The valid range of values is 1 to 100000000. The default is 1. This corresponds to the fetch.min.bytes Kafka consumer API parameter.
The no version of this command, no max-size, resets the value to the default.
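For example, the following sketch lets the receiver wait up to one second to accumulate at least 100000 bytes per batch. Both values are illustrative tuning choices, not recommendations:
solace(configure/message-vpn/kafka/kafka-receiver)# batch
solace(configure/message-vpn/kafka/kafka-receiver/batch)# delay 1000
solace(configure/message-vpn/kafka/kafka-receiver/batch)# max-size 100000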
Configuring the Bootstrap Address List for Kafka Receivers
A bootstrap address is the fully qualified domain name (FQDN) or IP address and optional port of one Kafka broker in a Kafka cluster where the Kafka receiver can fetch the state of the entire cluster. You can configure a list of these addresses for the Kafka receiver to try in the event that an attempt to connect to one address fails.
To configure the bootstrap address list for a Kafka receiver, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver)# bootstrap-addresses <address-list>
Where:
<address-list>
is a comma-separated list of FQDNs or IP addresses (and optional ports) of brokers in the Kafka cluster from which the state of the entire cluster can be learned. IPv4 addresses must be specified in dotted decimal notation, nnn.nnn.nnn.nnn. IPv6 addresses must be enclosed in square brackets. The port is specified as a decimal value from 0 to 65535. For example, a correctly formatted IPv4 address is 192.168.100.1:9092. The same address in IPv6 format is [::ffff:c0a8:6401]:9092. This corresponds to the bootstrap.servers Kafka consumer API parameter.
If a port is not provided with an address, it defaults to 9092.
The no form of this command, no bootstrap-addresses, removes the bootstrap address list from the Kafka receiver.
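For example, the following sketch configures two bootstrap addresses, one with an explicit port and one that defaults to 9092. Both hostnames are hypothetical placeholders:
solace(configure/message-vpn/kafka/kafka-receiver)# bootstrap-addresses kafka1.example.com:9092,kafka2.example.com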
Configuring the Kafka Receiver Consumer Group
Consumer groups allow Kafka consumers to work together and process Kafka events from a topic in parallel. Each consumer in the same group is assigned a different subset of partitions from a Kafka topic or set of topics. Depending on your deployment, you may want to specify certain details of the consumer group a Kafka receiver belongs to.
To configure the consumer group, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver)# group
The configuration tasks you can perform for a Kafka receiver consumer group include:
Configuring the Kafka Consumer Group ID
To configure the Kafka consumer group ID for a Kafka receiver, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver/group)# id <value>
Where:
<value>
specifies the consumer group ID. This corresponds to the group.id Kafka consumer API parameter.
The no form of this command, no id, removes the consumer group ID specification.
Configuring the Kafka Consumer Group Keepalive Settings
To configure the keepalive interval for a Kafka consumer group, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver/group)# interval <ms>
Where:
<ms>
specifies the time (in milliseconds) between sending keepalives to the group. The valid range of values is 1 to 3600000. The default is 3000. This corresponds to the heartbeat.interval.ms Kafka consumer API parameter.
The no form of this command, no interval, resets the value to the default.
To configure the keepalive timeout for a Kafka consumer group, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver/group)# timeout <ms>
Where:
<ms>
specifies the time (in milliseconds) until unresponsive group members are removed, triggering a partition rebalance across other members of the group. The valid range of values is 1 to 3600000. The default is 45000. This corresponds to the session.timeout.ms Kafka consumer API parameter.
The no form of this command, no timeout, resets the value to the default.
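For example, the following sketch assigns the receiver to a consumer group and tunes its keepalives. The group ID solace-bridge and the timing values are illustrative; keep the keepalive interval well below the timeout:
solace(configure/message-vpn/kafka/kafka-receiver)# group
solace(configure/message-vpn/kafka/kafka-receiver/group)# id solace-bridge
solace(configure/message-vpn/kafka/kafka-receiver/group)# interval 3000
solace(configure/message-vpn/kafka/kafka-receiver/group)# timeout 45000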
Configuring the Kafka Consumer Group Membership Type
To configure the membership type for a Kafka receiver consumer group, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver/group)# membership-type {dynamic | static}
Where:
dynamic
specifies dynamic membership. This is the default. If the group membership type is dynamic, the Kafka consumer API parameter group.instance.id is empty.
static
specifies static membership. Static members can leave and rejoin the group (within the configured keepalive timeout value) without prompting a group rebalance. If the group membership type is static, the Kafka consumer API parameter group.instance.id is set to the string <broker-name>/<vpn-name>/<receiver-name>, where <broker-name> is the AD-enabled router name, <vpn-name> is the Message VPN name, and <receiver-name> is the Kafka receiver name.
The no version of this command, no membership-type, resets the value to the default.
Configuring the Kafka Consumer Group Partition Scheme
To configure the partition scheme for a Kafka receiver consumer group, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver/group)# partition-scheme <partition-scheme-list>
Where:
<partition-scheme-list>
is an ordered, comma-separated list of schemes for partition assignment of the consumer group for this receiver. Both Eager ("range", "roundrobin") and Cooperative ("cooperative-sticky") schemes are supported. The elected group leader chooses the first common strategy provided by all members of the group. Eager and Cooperative schemes must not be mixed. For more information on these schemes, see the documentation for your Kafka implementation. The default partition scheme is "range, roundrobin". This corresponds to the partition.assignment.strategy Kafka consumer API parameter.
The no version of this command, no partition-scheme, resets the value to the default.
Configuring Kafka Topic Metadata Handling for Kafka Receivers
To exclude certain topic metadata during refreshes, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver)# metadata
solace(configure/message-vpn/kafka/kafka-receiver/metadata)# topic
solace(configure/message-vpn/kafka/kafka-receiver/metadata/topic)# exclude <regex-list>
Where:
<regex-list>
is a comma-separated list of regular expressions (POSIX.2 regular expressions). Any matching topic names are ignored in the broker metadata. Note that each regular expression must start with the ^ character; otherwise, it is interpreted as a literal topic name.
The no form of this command, no exclude, removes all configured expressions.
To configure the time between refreshes of topic metadata from the Kafka cluster, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver)# metadata
solace(configure/message-vpn/kafka/kafka-receiver/metadata)# topic
solace(configure/message-vpn/kafka/kafka-receiver/metadata/topic)# refresh-interval <ms>
Where:
<ms>
specifies the time (in milliseconds) between refreshes of topic metadata from the Kafka cluster. The valid range of values is 1000 to 3600000. The default is 30000. This corresponds to the topic.metadata.refresh.interval.ms Kafka consumer API parameter.
The no form of this command, no refresh-interval, resets the value to the default.
Configuring Topic Bindings for Kafka Receivers
Each topic binding names the Kafka topic from which messages are drawn, and includes attributes which dictate how messages from that Kafka topic are to be sent to the Solace PubSub+ software event broker.
To create a topic binding to receive messages from a remote Kafka topic, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver)# create topic-binding <kafka-topic-or-regex>
Where:
<kafka-topic-or-regex>
is the name of the Kafka topic you want this receiver to obtain messages from, or a regex pattern (POSIX.2 regular expressions) to receive from multiple Kafka topics. Note that each regular expression must start with the ^ character; otherwise, it is interpreted as a literal topic name.
To enable a topic binding, enter the following command:
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# no shutdown
The configuration tasks you can perform for a Kafka receiver topic binding include:
Configuring the Initial Offset to Consume from the Kafka Topic
You can configure the initial offset that the Kafka receiver consumes from the Kafka topic if no member of the consumer group has consumed and committed any offset already, or if the last committed offset has been deleted. Offsets are unique per Kafka partition.
To configure the initial offset that the Kafka receiver consumes from the Kafka topic, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# initial-offset {beginning | end}
Where:
beginning
specifies to start with the earliest offset available.
end
specifies to start with new offsets only. This is the default value.
The initial offset corresponds to the auto.offset.reset Kafka consumer API parameter.
The no form of this command, no initial-offset, resets the value to the default.
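For example, the following sketch creates a binding to a hypothetical Kafka topic named orders, starts from the earliest available offset, and enables the binding:
solace(configure/message-vpn/kafka/kafka-receiver)# create topic-binding orders
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# initial-offset beginning
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# no shutdown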
Configuring Partition Key Generation
When messages are received from Kafka topics, you can configure the topic binding to generate a partition key for each message. This is useful for determining which queue partition a message is sent to. For more information, see Message Distribution with Partitioned Queues.
To configure how the topic binding generates a partition key for each message received from a Kafka topic, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# local
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding/local)# key <key-expression>
Where:
<key-expression>
specifies the substitution expression used to generate the partition key for each message received from the Kafka topic. For more information, see Substitution Expressions Overview. If no value is configured, no key is included for each message as it is published into the event broker.
Configuring SMF Topic Generation
When messages are received from Kafka topics, they are published to SMF topics based on an expression you configure when you create a topic binding.
To configure how the topic binding generates the SMF topic for each message received from a Kafka topic, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding)# local
solace(configure/message-vpn/kafka/kafka-receiver/topic-binding/local)# topic <topic-expression>
Where:
<topic-expression>
specifies the substitution expression used to generate the SMF topic for each message received from the Kafka topic. For more information, see Substitution Expressions Overview. This expression can include data extracted from the metadata of each individual Kafka message as it is received from the Kafka topic. If no value is configured, the topic binding will not be operational.
Enabling TLS/SSL Encryption for Kafka Receiver Connections
To enable TLS/SSL encryption for Kafka receiver connections, enter the following commands:
solace(configure/message-vpn/kafka/kafka-receiver)# transport
solace(configure/message-vpn/kafka/kafka-receiver/transport)# ssl
The following table describes how the combination of TLS/SSL encryption and authentication scheme settings corresponds to the security.protocol Kafka consumer API parameter:
TLS/SSL Encryption | Authentication Scheme | Value for security.protocol Parameter
---|---|---
no ssl | none or client-certificate | plaintext
ssl | none or client-certificate | ssl
no ssl | basic, scram, or oauth-client | sasl_plaintext
ssl | basic, scram, or oauth-client | sasl_ssl
The no form of this command, no ssl, disables TLS/SSL encryption.
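For example, the following sketch enables TLS/SSL for a receiver; per the table above, combining this with the basic authentication scheme results in a security.protocol value of sasl_ssl:
solace(configure/message-vpn/kafka/kafka-receiver)# transport
solace(configure/message-vpn/kafka/kafka-receiver/transport)# ssl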
Configuring a Kafka Sender
On the event broker, conversion from Solace messages to Kafka events and propagation of those events to a remote Kafka cluster is enabled by a Kafka sender, which is configured at the Message VPN level. You can give the sender any name. When you create a Kafka sender for a Message VPN, the event broker automatically creates the following objects:
- A single client for each Kafka sender. This client binds to the queues needed by the queue bindings of its sender. The client's name is #kafka/tx/<tx-name>, and it uses the client username #kafka/tx/<tx-name>. This client username uses the #kafka client profile and the #acl-profile ACL profile.
- A single client profile, called #kafka, which is needed by all Kafka senders and receivers in the Message VPN. This profile is created when the first Kafka sender or receiver is created, and removed with the last Kafka sender or receiver.
For more information, see Kafka Sender.
To create a Kafka sender, enter the following commands:
solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# create kafka-sender <name>
To configure an existing Kafka sender:
solace(configure)# message-vpn <name>
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# kafka-sender <name>
To enable a Kafka sender after it has been created:
solace(configure/message-vpn/kafka/kafka-sender)# no shutdown
Where:
message-vpn <name>
is the name of the Message VPN where you want to create the Kafka sender.
kafka-sender <name>
is the name of the Kafka sender.
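For example, the following sequence creates and enables a sender. The Message VPN name demo-vpn and the sender name my-kafka-tx are hypothetical placeholders:
solace(configure)# message-vpn demo-vpn
solace(configure/message-vpn)# kafka
solace(configure/message-vpn/kafka)# create kafka-sender my-kafka-tx
solace(configure/message-vpn/kafka/kafka-sender)# no shutdown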
The configuration tasks you can perform for a Kafka sender include:
Configuring Authentication Schemes for Kafka Senders
To configure an authentication scheme that the given Kafka sender will use to establish a connection to the remote Kafka cluster, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# authentication
solace(configure/message-vpn/kafka/kafka-sender/authentication)# auth-scheme {none | basic | scram | client-certificate | kerberos | oauth-client}
Where:
none
specifies to log in with no authentication. For more information, see None.
basic
specifies to log in with a username and password. For more information, see Basic Authentication.
scram
specifies to log in with SCRAM (Salted Challenge Response Authentication Mechanism). For more information, see SCRAM Authentication.
client-certificate
specifies to log in with a client TLS certificate. For more information, see Client Certificate Authentication.
kerberos
specifies to log in with the Kerberos mechanism. For more information, see Kerberos Authentication.
oauth-client
specifies to log in with OAuth 2.0 client credentials. For more information, see OAuth Client Authentication.
None
If no authentication scheme is configured for a Kafka sender, the sender does not use an authentication scheme. This may be useful for anonymous connections or when the remote Kafka cluster does not require authentication.
Basic Authentication
Using the basic authentication scheme, Kafka senders can authenticate with a username and password combination. Credentials can be transmitted in plain text or encrypted with TLS/SSL.
To configure settings for a basic authentication scheme, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/authentication)# basic
solace(configure/message-vpn/kafka/kafka-sender/authentication/basic)# username <name> [password <password>]
Where:
<name>
is the client username to use for authentication with the remote Kafka cluster. The username may contain up to 255 characters.
<password>
is the password to be used with the specified username. The password may contain up to 255 characters.
The no version of this command, no username, removes any configured username and password.
SCRAM Authentication
When using SCRAM authentication (RFC 5802), a Kafka sender uses a challenge/response mechanism to authenticate a username and password with the remote Kafka cluster.
To configure settings for a SCRAM authentication scheme, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/authentication)# scram
solace(configure/message-vpn/kafka/kafka-sender/authentication/scram)# hash {sha-256 | sha-512}
solace(configure/message-vpn/kafka/kafka-sender/authentication/scram)# username <name> [password <password>]
Where:
sha-256
specifies to use a SHA-2 256-bit hash for SCRAM authentication.
sha-512
specifies to use a SHA-2 512-bit hash for SCRAM authentication. This is the default setting.
<name>
is the client username to use for authentication with the remote Kafka cluster. The username may contain up to 255 characters.
<password>
is the password to be used with the specified username. The password may contain up to 255 characters.
The no form of the hash command, no hash, returns the value to the default.
The no form of the username command, no username, removes any configured username and password.
Client Certificate Authentication
When using client certificate authentication, a Kafka sender provides a certificate file to validate its identity. Client certificate authentication is only available to connections that use TLS/SSL (see Enabling TLS/SSL Encryption for Kafka Sender Connections).
The client certificate installed here may also be used if the Kafka cluster requests it with other authentication schemes. If you configure a client certificate authentication scheme for the Kafka sender, the sender provides only this client certificate as a means of identifying itself. However, it is possible to provide a client certificate when using a basic or SCRAM authentication scheme as well.
To configure settings for a client certificate authentication scheme, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/authentication)# client-certificate
solace(configure/message-vpn/kafka/kafka-sender/authentication/client-certificate)# certificate <filename>
Where:
<filename>
is the filename of the certificate file. The certificate file must be located in the certs folder of the event broker. Once a certificate is configured, a copy of it is saved internally; the file in the certs directory is no longer required.
The no version of this command, no certificate-file, removes the certificate association for the current Kafka sender.
Kerberos Authentication
When using Kerberos authentication, a Kafka sender must first authenticate with a Kerberos Authentication Server (AS) which grants the Kafka sender a Ticket Granting Ticket (TGT). With a valid TGT, a Kafka sender can attempt to authenticate with the remote Kafka broker using a service ticket obtained from the Ticket Granting Service (TGS). The AS and TGS (components of a Key Distribution Center (KDC)) are hosted on an external server or servers—not on the event broker.
To implement Kerberos authentication for a Kafka sender, you must configure the following:
- The service name of the remote Kafka broker.
- The user principal name of the Kafka sender.
- The keytab file for the Kafka sender.
The Kerberos realm (including KDC address) that this Kafka sender and the remote Kafka broker are part of must be configured before you can enable Kerberos authentication. For more information, see Managing Kerberos Realms.
To configure these settings, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/authentication)# kerberos
solace(configure/message-vpn/kafka/kafka-sender/authentication/kerberos)# service-name <value>
solace(configure/message-vpn/kafka/kafka-sender/authentication/kerberos)# user-principal-name <value> <keytab-file>
Where:
service-name <value>
is the Kerberos service name of the remote Kafka broker, not including /hostname@<REALM>.
user-principal-name <value>
is the Kerberos user principal name of the Kafka sender. This must include the @<REALM> suffix.
<keytab-file>
is the filename of the Kerberos keytab file. The keytab file must be located in the keytabs folder of the event broker. These keytabs differ from those used in client authentication. Keytabs used for Kafka bridging authentication contain user principal names rather than service principal names, and apply to specific Message VPNs rather than globally.
OAuth Client Authentication
When using OAuth client authentication, a Kafka sender obtains access tokens from an authorization server using the Client Credentials Grant flow (RFC 6749 §4.4), also known as two-legged OAuth. In this flow, a simple request with basic authentication is sent to the authorization server to get an access token. The Kafka sender can then use the returned access token to make authenticated requests to the remote Kafka cluster until the token expires (tokens are typically valid for an hour). When the token nears expiry, the Kafka sender automatically requests another.
To enable OAuth client authentication, you must configure the client ID, client secret, and token endpoint that the Kafka sender will use to request access tokens. Optionally, you can also configure the OAuth scope.
To configure these settings, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/authentication)# oauth-client
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# client-id <client-id>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# client-secret <client-secret>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# token-endpoint <token-endpoint>
solace(configure/message-vpn/kafka/kafka-sender/authentication/oauth-client)# scope <scope>
Where:
client-id <client-id>
is the OAuth client ID the Kafka sender uses to log in to the authorization server when requesting access tokens. The OAuth client ID may contain up to 200 characters.
client-secret <client-secret>
is the OAuth client secret the Kafka sender uses to log in to the authorization server when requesting access tokens. The OAuth client secret may contain up to 512 characters.
token-endpoint <token-endpoint>
is the OAuth token endpoint URL that the Kafka sender uses to request a token for login to the remote Kafka cluster. The OAuth token endpoint may contain up to 2048 characters. In addition, the token endpoint must use TLS, that is, it must start with https:// (case insensitive).
(Optional) scope <scope>
is the OAuth scope. The OAuth scope may contain up to 200 characters.
Configuring Message Batch Handling for Kafka Senders
To configure the delay that the Kafka sender waits to accumulate a batch of messages before sending them to the Kafka cluster, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# delay <ms>
Where:
<ms>
specifies the delay in milliseconds to wait to accumulate a batch of messages to send to the Kafka cluster. The valid range of values is 0 to 90000. The default is 5. This corresponds to the queue.buffering.max.ms Kafka producer API parameter.
The no version of this command, no delay, resets the value to the default.
To configure the maximum number of messages in a batch, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# max-messages <count>
Where:
<count>
specifies the maximum number of messages to send to the Kafka cluster in a single batch. The valid range of values is 1 to 1000000. The default is 10000. This corresponds to the batch.num.messages Kafka producer API parameter.
The no version of this command, no max-messages, resets the value to the default.
To configure the maximum size of a message batch, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# max-size <bytes>
Where:
<bytes>
specifies the maximum size of a message batch in bytes. The valid range of values is 1 to 2147483647. The default is 1000000. This corresponds to the batch.size Kafka producer API parameter.
The no version of this command, no max-size, resets the value to the default.
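For example, the following sketch lets the sender buffer for up to 50 milliseconds or 1000 messages per batch before sending. Both values are illustrative tuning choices:
solace(configure/message-vpn/kafka/kafka-sender)# batch
solace(configure/message-vpn/kafka/kafka-sender/batch)# delay 50
solace(configure/message-vpn/kafka/kafka-sender/batch)# max-messages 1000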
Configuring the Bootstrap Address List for Kafka Senders
A bootstrap address is the fully qualified domain name or IP address and optional port of one Kafka broker in a Kafka cluster where the Kafka sender can fetch the state of the entire cluster. You can configure a list of these addresses for the Kafka sender to try in the event that an attempt to connect to one address fails.
To configure the bootstrap address list for a Kafka sender, enter the following command:
solace(configure/message-vpn/kafka/kafka-sender)# bootstrap-addresses <address-list>
Where:
<address-list>
is a comma-separated list of FQDNs or IP addresses (and optional ports) of brokers in the Kafka cluster from which the state of the entire cluster can be learned. IPv4 addresses must be specified in dotted decimal notation, nnn.nnn.nnn.nnn. IPv6 addresses must be enclosed in square brackets. The port is specified as a decimal value from 0 to 65535. For example, a correctly formatted IPv4 address is 192.168.100.1:9092. The same address in IPv6 format is [::ffff:c0a8:6401]:9092. This corresponds to the bootstrap.servers Kafka producer API parameter.
If a port is not provided with an address, it defaults to 9092.
The no form of this command, no bootstrap-addresses, removes the bootstrap address list from the Kafka sender.
Configuring Compression for Kafka Senders
To configure the compression type the Kafka sender uses when propagating messages, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# compression
solace(configure/message-vpn/kafka/kafka-sender/compression)# type {gzip | snappy | lz4 | zstd}
Where:
gzip
specifies to use Gzip compression. This is the default.
snappy
specifies to use Snappy compression.
lz4
specifies to use LZ4 compression.
zstd
specifies to use Zstandard compression.
The compression type corresponds to the compression.codec Kafka producer API parameter.
The no version of this command, no compression, resets the value to the default.
To configure the compression level the Kafka sender uses when propagating messages, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# compression
solace(configure/message-vpn/kafka/kafka-sender/compression)# level <value>
Where:
<value>
specifies the compression level. The valid range of values depends on the compression type:
- -1 means use the codec-dependent default compression level and is always valid. This is the default.
- 0-9 is valid for the gzip compression codec.
- 0 is valid for the snappy compression codec.
- 0-12 is valid for the lz4 compression codec.
- 0-22 is valid for the zstd compression codec.
The compression level corresponds to the compression.level Kafka producer API parameter.
The no version of this command, no compression, resets the value to the default.
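For example, the following sketch selects Zstandard compression at level 10, an illustrative mid-range value within zstd's 0-22 range. Note that compression must still be enabled on the transport, as described next:
solace(configure/message-vpn/kafka/kafka-sender)# compression
solace(configure/message-vpn/kafka/kafka-sender/compression)# type zstd
solace(configure/message-vpn/kafka/kafka-sender/compression)# level 10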
To enable compression, see Enabling Compression for Kafka Sender Connections.
A Kafka sender is blocked from being operational if you configure it to use both encryption and compression while crime-exploit-protection is enabled (see Configuring CRIME Exploit Protection).
Enabling Idempotence for Kafka Senders
Idempotence guarantees in-order, at-least-once message delivery to the remote Kafka topic, at the expense of performance. It is disabled by default. If you enable idempotence, each queue binding configured for the Kafka sender must have its acknowledgment mode set to all to be operational.
To enable idempotence for a Kafka sender, enter the following command:
solace(configure/message-vpn/kafka/kafka-sender)# idempotence
This corresponds to the enable.idempotence Kafka producer API parameter.
The no version of this command, no idempotence, disables it.
When you enable idempotence:
- The Kafka sender sends an increasing sequence number with every message.
- The remote Kafka broker acknowledges each message.
- The Kafka broker remembers the largest sequence number it has written for each Kafka sender.
- The Kafka broker discards any message received with a sequence number less than the largest written.
If idempotence is disabled, the Kafka sender is free to resend messages to the Kafka broker because of timeouts, leader changes, and so on. In this case, message duplication and/or reordering may occur.
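For example, the following sketch enables idempotence and sets the required acknowledgment mode on a queue binding. The queue name orders-q is a hypothetical placeholder, and ack-mode all is the setting idempotence requires:
solace(configure/message-vpn/kafka/kafka-sender)# idempotence
solace(configure/message-vpn/kafka/kafka-sender)# create queue-binding orders-q
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# ack-mode all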
Configuring Queue Bindings for Kafka Senders
Each binding names the queue from which messages are drawn, and includes attributes which dictate how those messages are to be sent to Kafka.
To create a queue binding for a Kafka sender, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# create queue-binding <queue-name>
To enable a queue binding:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# no shutdown
Where:
<queue-name>
is the name of a queue in the given Message VPN that the Kafka sender binds to.
The configuration tasks you can perform for a Kafka sender queue binding include:
Configuring the Acknowledgment Mode
To configure the number of acknowledgments that this queue binding requires from the remote Kafka cluster, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# ack-mode {none | one | all}
Where:
none
specifies that no acknowledgments from the remote Kafka cluster are required. If configured, messages are delivered at most once.
one
specifies that one acknowledgment from the remote Kafka cluster is required. If configured, messages are delivered at least once but may be reordered.
all
specifies that all replicas on the remote Kafka cluster must acknowledge the message. If configured, messages are delivered at least once but may be reordered. This is the default setting.
The acknowledgment mode corresponds to the request.required.acks Kafka producer API parameter.
The no form of this command, no ack-mode, resets the value to the default.
Configuring the Kafka Partition Selection Scheme
To configure the partition selection scheme that the given queue binding will use when publishing to the remote Kafka cluster, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# scheme {consistent | explicit | random}
Where:
consistent
specifies to use a consistent partition selection scheme. For more information, see Consistent Partition Selection.
explicit
specifies to use an explicit partition selection scheme. For more information, see Explicit Partition Selection.
random
specifies to use a random partition selection scheme. For more information, see Random Partition Selection.
Consistent Partition Selection
When using a consistent partition selection scheme, the queue binding selects a Kafka partition based on a hash of the Kafka partition key generated by the Kafka sender (see Configuring Kafka Partition Key Generation). If no key is available for the message, by default, a random selection scheme is used as a fallback. You can disable this fallback, and if you do a single partition is selected for all unkeyed messages (see Random Partition Selection).
To configure the hash algorithm used to select the partition, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# consistent
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/consistent)# hash {crc | murmur2 | fnv1a}
Where:
crc
specifies to use CRC Hash. This is the default setting.
murmur2
specifies to use Murmur2 Hash.
fnv1a
specifies to use Fowler-Noll-Vo 1a Hash.
The no version of this command, no hash, resets the value to the default.
Explicit Partition Selection
When using an explicit partition selection scheme, the queue binding selects a Kafka partition based explicitly on the number you specify. To configure the partition number, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# explicit
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/explicit)# number <value>
Where:
<value>
is the Kafka partition number you want the queue binding to use when publishing messages.
The no version of this command, no number, resets the value to the default.
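For example, the following sketch pins all messages from this queue binding to partition 0. The partition number is an illustrative choice and must exist on the target Kafka topic:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# scheme explicit
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# explicit
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/explicit)# number 0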
Random Partition Selection
When using a random partition selection scheme, the queue binding selects a random Kafka partition. By default, this partition selection scheme is used as a fallback in cases where a consistent partition selection scheme is being used but no partition key is available for the message. If you disable this fallback, a single partition is selected for all unkeyed messages.
To disable random partition selection as a fallback, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# partition
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition)# random
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/random)# no fallback
To re-enable the fallback if it has been disabled, enter the following command:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/partition/random)# fallback
Configuring Kafka Partition Key Generation
To configure how the queue binding generates Kafka partition keys for each message sent to the remote Kafka cluster, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# remote
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/remote)# key <key-expression>
Where:
<key-expression>
specifies the substitution expression used to generate the Kafka partition key for each message sent to Kafka. For more information about substitution expressions, see Substitution Expressions Overview. This expression can include fields extracted from the metadata of each individual SMF message as it is taken from the queue. If empty, no key is included for each message as it is published into Kafka.
Configuring SMF to Kafka Topic Mapping
To configure the topic on the Kafka cluster that this queue binding sends each message to, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# remote
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/remote)# topic <kafka-topic>
Where:
<kafka-topic>
is the topic on the Kafka cluster to which each message taken from the queue is sent. If no topic is configured, the queue binding will not be operational.
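For example, the following sketch maps messages from the bound queue to a hypothetical Kafka topic named orders:
solace(configure/message-vpn/kafka/kafka-sender/queue-binding)# remote
solace(configure/message-vpn/kafka/kafka-sender/queue-binding/remote)# topic orders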
Enabling TLS/SSL Encryption for Kafka Sender Connections
To enable TLS/SSL encryption for Kafka sender connections, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# transport
solace(configure/message-vpn/kafka/kafka-sender/transport)# ssl
For more information describing how the combination of TLS/SSL encryption and authentication scheme settings corresponds to the security.protocol Kafka producer API parameter, see Enabling TLS/SSL Encryption for Kafka Receiver Connections.
The no form of this command, no ssl, disables TLS/SSL encryption.
Enabling Compression for Kafka Sender Connections
By default, Kafka sender connections do not have compression enabled. You can enable compression according to the type and level configured for the sender (see Configuring Compression for Kafka Senders).
To enable compression for Kafka sender connections, enter the following commands:
solace(configure/message-vpn/kafka/kafka-sender)# transport
solace(configure/message-vpn/kafka/kafka-sender/transport)# compressed
The no form of this command, no compressed, disables compression.
Configuring the Maximum Number of Kafka Broker Connections
By default, the maximum number of Kafka broker connections supported for a Message VPN is the same as the maximum supported for the system. Depending on your deployment, you may want to change this behavior so that one Message VPN does not consume all of the connections that the entire system supports.
To configure the maximum number of simultaneous Kafka broker connections for this Message VPN, enter the following command:
solace(configure/message-vpn/kafka)# max-broker-connections <value>
Where:
<value>
is the integer value specifying the maximum total number of Kafka broker connections permitted for the Message VPN.
The no version of this command, no max-broker-connections, returns the value to the default (which is the maximum value supported by the platform).
- To view the maximum total number of Kafka broker connections that the Solace PubSub+ event broker can support, enter the show kafka User EXEC command.
- This is a global/read-write parameter, and is chosen by the system administrator, not the Message VPN administrator.