Configuring Kafka Bridging
Kafka bridging allows you to configure message flow in both directions between a Kafka cluster and a PubSub+ event broker.
To configure Kafka bridging, you need to configure a Kafka sender and a Kafka receiver.
- A Kafka receiver receives events from one or more Kafka topics, converts the events to Solace Message Format (SMF), and publishes them to topics on the PubSub+ event broker.
- A Kafka sender takes SMF messages from one or more queues, converts the messages to Kafka events, and sends them to Kafka topics on a remote Kafka cluster.
For more information about how Kafka bridging works, see Kafka Bridging.
This topic includes the following tasks:
- Creating a Kafka Receiver
- Configuring Topic Bindings for a Kafka Receiver
- Viewing Receiver Data
- Creating a Kafka Sender
- Configuring Queue Bindings for a Kafka Sender
- Viewing Sender Data
Creating a Kafka Receiver
A Kafka receiver enables the accumulation of Kafka events and their conversion to SMF messages. When you create a Kafka receiver, the event broker automatically creates a client for each Kafka receiver that publishes messages received from the Kafka topics to the Solace message bus. The client name is #kafka/rx/<rx-name>, and the client username is #kafka/rx/<rx-name>. The client username uses the #kafka client profile and #acl-profile ACL profile.
If no other Kafka receivers or senders exist, the event broker also creates the #kafka client profile. This profile is required by Kafka senders and receivers. It is created when you create the first Kafka sender or receiver and removed if you delete all of the Kafka senders and receivers.
To create a Kafka receiver, perform these steps:
- Open Broker Manager. For instructions, see Using PubSub+ Broker Manager.
- In the navigation bar, click Bridges.
- Select the Kafka Bridges tab.
- Select the Kafka Receivers tab.
- Click + Kafka Receiver.
- Enter a Name for the receiver and click Create.
- Set the following options for the receiver:
Enabled
Specifies whether the Kafka receiver is enabled.
Bootstrap Address List
Specifies the fully qualified domain name (FQDN) or IP address and optional port of one Kafka broker in a Kafka cluster where the Kafka receiver can fetch the state of the entire cluster. The bootstrap addresses must resolve to an appropriately configured and compatible listener port on the Kafka broker for the selected authentication scheme. If you do not provide a port, the default port is 9092.
You can configure a comma-separated list of addresses for the Kafka receiver to try in the event that an attempt to connect to the first address fails.
Specify IPv4 addresses in the dotted decimal notation form, nnn.nnn.nnn.nnn. You must enclose IPv6 addresses in square brackets. Specify the port as a numeric value from 0 to 65535. For example, a correctly formatted IPv4 address is 192.168.100.1:9092. The same address in IPv6 format is [::ffff:c0a8:6401]:9092. The address corresponds to the bootstrap.servers Kafka consumer API parameter.
Authentication Scheme
Specifies the authentication scheme that the Kafka receiver uses to establish a connection to the remote Kafka cluster. The options are:
- None: no authentication required. This option may be useful for anonymous connections or when a Kafka receiver does not require authentication.
- Basic: log in with a specified username and password. Credentials can be transmitted in plain text or encrypted with SSL.
- Scram: log in with Salted Challenge Response Authentication Mechanism (SCRAM). You must specify a username, password, and the SCRAM hash.
- Client Certificate: log in with a client certificate to validate the receiver's identity. You must specify the PEM-formatted content for the client certificate and the password for the certificate. The specified client certificate can also be used to identify the Kafka receiver with other authentication schemes.
- Kerberos: log in with the Kerberos mechanism. You must specify the service name of the remote Kafka broker, the user principal name of the Kafka receiver, and the keytab file for the Kafka receiver.
- OAuth Client: log in with OAuth 2.0 client credentials. You must specify the OAuth client ID and token endpoint URL. You can also specify the OAuth scope.
Encryption Enabled
Specifies whether TLS encryption is enabled for the Kafka receiver. The bootstrap addresses must resolve to PLAINTEXT or SASL_PLAINTEXT listener ports when encryption is disabled, and SSL or SASL_SSL listener ports when encryption is enabled.
The following table describes how the combination of TLS/SSL encryption and authentication scheme settings corresponds to the security.protocol Kafka consumer API parameter:

| TLS/SSL Encryption | Authentication Scheme | Value for security.protocol Parameter |
|---|---|---|
| no ssl | none or client-certificate | plaintext |
| ssl | none or client-certificate | ssl |
| no ssl | basic, scram, or oauth-client | sasl_plaintext |
| ssl | basic, scram, or oauth-client | sasl_ssl |

- (Optional) Click Show Advanced Settings and set any of the following options for the receiver:
Delay
Specifies the delay, in milliseconds, to wait before accumulating a batch of messages from the Kafka cluster.
This setting corresponds to the fetch.wait.max.ms Kafka consumer API parameter.
Max Size
Specifies the maximum size of a message batch, in bytes.
This setting corresponds to the fetch.min.bytes Kafka consumer API parameter.
Group ID
Specifies the Kafka consumer group ID for the receiver.
Consumer groups allow Kafka consumers to work together and process Kafka events from a topic in parallel. Each consumer in the same group is assigned a different subset of partitions from a Kafka topic or set of topics. Depending on your deployment, you may want to specify certain details of the consumer group a Kafka receiver belongs to.
This setting corresponds to the group.id Kafka consumer API parameter.
Keepalive Interval
Specifies the time, in milliseconds, between sending keepalive messages to the members of the consumer group.
This setting corresponds to the heartbeat.interval.ms Kafka consumer API parameter.
Keepalive Timeout
Specifies the time, in milliseconds, until unresponsive consumer group members are removed, which triggers partition rebalancing across other members of the group.
This setting corresponds to the session.timeout.ms Kafka consumer API parameter.
Membership Type
Specifies the membership type of the Kafka consumer group for the receiver. The options are:
- Dynamic: specifies dynamic group membership. This option corresponds to an empty value for the group.instance.id Kafka consumer API parameter.
- Static: specifies static group membership. Static members can leave and rejoin the group (within the Keepalive Timeout period) without prompting rebalancing. This option corresponds to a string value of <broker-name>/<vpn-name>/<receiver-name> for the group.instance.id Kafka consumer API parameter, where <broker-name> is the event broker name, <vpn-name> is the Message VPN name, and <receiver-name> is the Kafka receiver name.
Partition Scheme List
Specifies an ordered, comma-separated list of schemes for partition assignment of the consumer group for this receiver. Both eager ("range", "roundrobin") and cooperative ("cooperative-sticky") schemes are supported. The elected group leader chooses the first common strategy provided by all members of the group. Do not mix eager and cooperative schemes. For more information on these schemes, see the documentation for your Kafka implementation.
This setting corresponds to the partition.assignment.strategy Kafka consumer API parameter.
Topic Exclude List
Specifies a comma-separated list of Kafka topics to ignore. You can specify topics as literal names or as regular expressions, including POSIX.2 regular expressions. Each regular expression must start with the ^ character; otherwise, it is interpreted as a literal topic name.
Topic Refresh Interval
Specifies the time, in milliseconds, between refreshes of topic metadata from the Kafka cluster. This setting corresponds to the topic.metadata.refresh.interval.ms Kafka consumer API parameter.
- Click Apply.
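As a rough illustration of the receiver options described above, the following Python sketch shows how the bootstrap address list, the encryption and authentication settings, and the membership type might translate into Kafka consumer API parameters. The function names and the sample broker, Message VPN, and receiver names are hypothetical, and the event broker's actual implementation is not shown in this topic. Authentication schemes that the table does not list (for example, Kerberos) are rejected rather than guessed.

```python
SASL_SCHEMES = {"basic", "scram", "oauth-client"}
NON_SASL_SCHEMES = {"none", "client-certificate"}


def normalize_bootstrap(address_list: str, default_port: int = 9092) -> str:
    """Add the default port to each address in a comma-separated list that lacks one."""
    normalized = []
    for raw in address_list.split(","):
        addr = raw.strip()
        if not addr:
            continue
        # A bracketed IPv6 address carries its port after "]:"; an IPv4
        # address or FQDN carries it after its only ":".
        has_port = "]:" in addr if addr.startswith("[") else ":" in addr
        normalized.append(addr if has_port else f"{addr}:{default_port}")
    return ",".join(normalized)


def security_protocol(encryption_enabled: bool, auth_scheme: str) -> str:
    """Look up security.protocol for the combinations listed in the table."""
    if auth_scheme in SASL_SCHEMES:
        return "sasl_ssl" if encryption_enabled else "sasl_plaintext"
    if auth_scheme in NON_SASL_SCHEMES:
        return "ssl" if encryption_enabled else "plaintext"
    raise ValueError(f"scheme not covered by the table: {auth_scheme!r}")


def group_instance_id(static: bool, broker: str, vpn: str, receiver: str) -> str:
    """Static membership uses <broker-name>/<vpn-name>/<receiver-name>; dynamic is empty."""
    return f"{broker}/{vpn}/{receiver}" if static else ""


# Hypothetical receiver settings, used only to exercise the helpers.
params = {
    "bootstrap.servers": normalize_bootstrap("192.168.100.1, [::ffff:c0a8:6401]:9092"),
    "security.protocol": security_protocol(True, "scram"),
    "group.instance.id": group_instance_id(True, "broker1", "default", "rx1"),
}
print(params)
```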
Configuring Topic Bindings for a Kafka Receiver
A topic binding specifies a Kafka topic that the event broker receives messages for and specifies how messages for the Kafka topic are sent to PubSub+ event brokers.
To configure a topic binding, perform these steps:
- Open Broker Manager. For instructions, see Using PubSub+ Broker Manager.
- In the navigation bar, click Bridges.
- Select the Kafka Bridges tab.
- Select the Kafka Receivers tab.
- Select the receiver that you want to set topic bindings for.
- Select the Topic Bindings tab.
- Click + Topic Binding.
- Enter a Name for the topic binding and click Create.
- Set the following options for the topic binding:
Enabled
Specifies whether the topic binding is enabled.
Local Topic Expression
Specifies the substitution expression used to generate the SMF topic for each message received from Kafka. This expression can include data extracted from the metadata of each individual Kafka message as it is received from the Kafka topic. For more information, see Substitution Expressions Overview.
Local Key Expression
Specifies the substitution expression used to generate a partition key for each message received from Kafka to determine which queue partition a message is sent to. This expression can include fields extracted from the metadata of each individual Kafka message as it is received from the Kafka topic. For more information, see Substitution Expressions Overview.
Initial Offset
Specifies the initial offset to consume from the Kafka topic if no member of the group has consumed and committed any offset already, or if the last committed offset has been deleted. Offsets are unique per partition.
This setting corresponds to the auto.offset.reset Kafka consumer API configuration setting. The options are:
- Beginning: start with the earliest offset available
- End: start with new offsets only
- Click Apply.
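The Initial Offset behavior described above can be sketched as a small decision function. This is only an illustration of the stated rule: the function name and arguments are hypothetical, and it assumes that `earliest` is the oldest offset still retained in a partition and `latest` is the offset that the next new event would receive. In the Apache Kafka consumer API, the corresponding auto.offset.reset values are named earliest and latest.

```python
from typing import Optional


def starting_offset(initial_offset: str, committed: Optional[int],
                    earliest: int, latest: int) -> int:
    """Pick the offset where consumption of one partition would start."""
    # A committed offset that still exists in the partition always wins.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # Otherwise (nothing committed, or the committed offset was deleted),
    # fall back to the configured Initial Offset option.
    if initial_offset == "beginning":
        return earliest
    if initial_offset == "end":
        return latest
    raise ValueError(f"unknown initial offset option: {initial_offset!r}")


# Offsets are tracked per partition, so the function is applied to each one.
print(starting_offset("beginning", None, earliest=100, latest=250))
print(starting_offset("end", 40, earliest=100, latest=250))
```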
Viewing Receiver Data
To view information about the Kafka receiver, perform these steps:
- Open Broker Manager. For instructions, see Using PubSub+ Broker Manager.
- In the navigation bar, click Bridges.
- Select the Kafka Bridges tab.
- Select the Kafka Receivers tab.
- Select the receiver that you want to view information for.
- Click the Summary tab to display information about the receiver, including the message rate, uptime, and connection count.
- Click the Stats tab to display additional statistics for the receiver.
- Click the Remote Brokers tab to display information about the connected Kafka brokers.
Creating a Kafka Sender
A Kafka sender converts Solace messages to Kafka events and propagates those events to a remote Kafka cluster. When you create a Kafka sender, the event broker automatically creates a client for each Kafka sender, which binds to the same queues as the Kafka sender. The client name is #kafka/tx/<tx-name>, and the client username is #kafka/tx/<tx-name>. The client username uses the #kafka client profile and #acl-profile ACL profile.
If no other Kafka senders or receivers exist, the event broker also creates the #kafka client profile. This profile is required by Kafka senders and receivers. It is created when you create the first Kafka sender or receiver and removed if you delete all of the Kafka senders and receivers.
To create a Kafka sender, perform these steps:
- Open Broker Manager. For instructions, see Using PubSub+ Broker Manager.
- In the navigation bar, click Bridges.
- Select the Kafka Bridges tab.
- Select the Kafka Senders tab.
- Click + Kafka Sender.
- Enter a Name for the sender and click Create.
- Set the following options for the sender:
Enabled
Specifies whether the Kafka sender is enabled.
Bootstrap Address List
Specifies the fully qualified domain name (FQDN) or IP address and optional port of one Kafka broker in a Kafka cluster where the Kafka sender can fetch the state of the entire cluster. The bootstrap addresses must resolve to an appropriately configured and compatible listener port on the Kafka broker for the selected authentication scheme. If you do not provide a port, the default port is 9092.
You can configure a comma-separated list of addresses for the Kafka sender to try in the event that an attempt to connect to the first address fails.
Specify IPv4 addresses in the dotted decimal notation form, nnn.nnn.nnn.nnn. You must enclose IPv6 addresses in square brackets. Specify the port as a numeric value from 0 to 65535. For example, a correctly formatted IPv4 address is 192.168.100.1:9092. The same address in IPv6 format is [::ffff:c0a8:6401]:9092. The address corresponds to the bootstrap.servers Kafka producer API parameter.
Idempotence Enabled
Specifies whether idempotence is enabled. Idempotence guarantees in-order, at-least-once message delivery to the remote Kafka topic, at the expense of performance. If you enable this option, each queue binding configured for the Kafka sender must have the acknowledgment mode set to all to be operational.
This setting corresponds to the enable.idempotence Kafka producer API parameter.
When you enable idempotence:
- The Kafka sender sends an increasing sequence number with every message.
- The remote Kafka broker acknowledges each message.
- The Kafka broker remembers the largest sequence number it has written for each Kafka sender.
- The Kafka broker discards any message received with a sequence number less than the largest written.
If idempotence is disabled, the Kafka sender is free to resend messages to the Kafka broker because of timeouts, leader changes, and so on. In this case, message duplication, reordering, or both may occur.
Authentication Scheme
Specifies the authentication scheme that the Kafka sender uses to establish a connection to the remote Kafka cluster. The options are:
- None: no authentication required
- Basic: log in with a specified username and password
- Scram: log in with Salted Challenge Response Authentication Mechanism (SCRAM). You must specify a username, password, and the SCRAM hash.
- Client Certificate: log in with a client certificate. You must specify the PEM-formatted content for the client certificate and the password for the certificate. The specified client certificate can also be used to identify the Kafka sender with other authentication schemes.
- Kerberos: log in with the Kerberos mechanism. You must specify the service name of the remote Kafka broker, the user principal name of the Kafka sender, and the keytab file for the Kafka sender.
- OAuth Client: log in with OAuth 2.0 client credentials. You must specify the OAuth client ID, token endpoint URL, and the OAuth scope.
Compression Enabled
Specifies whether compression is enabled for the Kafka sender. If you enable compression, you must specify the compression type and level.
The compression type corresponds to the compression.codec Kafka producer API parameter. The options are:
- Gzip: use Gzip compression
- Snappy: use Snappy compression
- Lz4: use LZ4 compression
- Zstd: use Zstandard compression
The compression level corresponds to the compression.level Kafka producer API parameter. The options are:
- -1: use the codec-dependent default compression level
- 0-9: valid for Gzip compression
- 0: valid for Snappy compression
- 0-12: valid for LZ4 compression
- 0-22: valid for Zstandard compression
Encryption Enabled
Specifies whether TLS encryption is enabled for the Kafka sender. The bootstrap addresses must resolve to PLAINTEXT or SASL_PLAINTEXT listener ports when encryption is disabled, and SSL or SASL_SSL listener ports when encryption is enabled.
- (Optional) Click Show Advanced Settings and set any of the following options for the sender:
Delay
Specifies the delay, in milliseconds, to wait before accumulating a batch of messages to send to the Kafka cluster.
This setting corresponds to the queue.buffering.max.ms Kafka producer API parameter.
Max Message Count
Specifies the maximum number of messages to send to the Kafka cluster in a single batch. This setting corresponds to the batch.num.messages Kafka producer API parameter.
Max Size
Specifies the maximum size of a message batch, in bytes.
This setting corresponds to the batch.size Kafka producer API parameter.
- Click Apply.
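Two of the sender constraints described above lend themselves to a quick pre-flight check: the valid compression level range for each compression type, and the requirement that every queue binding use the all acknowledgment mode when idempotence is enabled. The following Python sketch is one way such a check might look. The function names and the sample queue names are hypothetical, and the event broker performs its own validation independently of anything shown here.

```python
from typing import Dict, List

# Valid compression.level ranges per compression type, as listed above.
COMPRESSION_LEVELS = {"gzip": (0, 9), "snappy": (0, 0), "lz4": (0, 12), "zstd": (0, 22)}


def validate_compression(codec: str, level: int) -> None:
    """Raise ValueError if the level is not valid for the compression type."""
    if codec not in COMPRESSION_LEVELS:
        raise ValueError(f"unknown compression type: {codec!r}")
    if level == -1:
        return  # -1 selects the codec-dependent default level
    low, high = COMPRESSION_LEVELS[codec]
    if not low <= level <= high:
        raise ValueError(f"{codec} level must be -1 or {low}-{high}, got {level}")


def inoperable_bindings(idempotence_enabled: bool, ack_modes: Dict[str, str]) -> List[str]:
    """List queue bindings that would not be operational with idempotence enabled."""
    if not idempotence_enabled:
        return []
    return sorted(queue for queue, mode in ack_modes.items() if mode != "all")


# Hypothetical sender with two queue bindings.
validate_compression("zstd", 22)
print(inoperable_bindings(True, {"orders": "all", "audit": "one"}))
```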
Configuring Queue Bindings for a Kafka Sender
A queue binding specifies a queue on a PubSub+ event broker and specifies how messages from the queue are sent to a Kafka topic.
To configure a queue binding, perform these steps:
- Open Broker Manager. For instructions, see Using PubSub+ Broker Manager.
- In the navigation bar, click Bridges.
- Select the Kafka Bridges tab.
- Select the Kafka Senders tab.
- Select the sender that you want to set queue bindings for.
- Select the Queue Bindings tab.
- Click + Queue Binding.
- Select a queue and click Create.
- Set the following options for the queue binding:
Enabled
Specifies whether the queue binding is enabled.
Remote Topic
Specifies the topic on the Kafka cluster to send each message in the queue to.
Remote Key Expression
Specifies the substitution expression used to generate a partition key for each message sent to Kafka to determine which Kafka partition a message is sent to. This expression can include fields extracted from the metadata of each individual SMF message as it is taken from the queue. For more information, see Substitution Expressions Overview.
ACK Mode
Specifies the number of acknowledgments that this queue binding requires from the remote Kafka cluster.
The acknowledgment mode corresponds to the request.required.acks Kafka producer API parameter. The options are:
- None: specifies that no acknowledgments are required from the remote Kafka cluster. If you select this option, messages are delivered at-most-once.
- One: specifies that one acknowledgment is required from the remote Kafka cluster. If you select this option, messages are delivered at-least-once but may be reordered.
- All: specifies that all replicas on the remote Kafka cluster must acknowledge the message. If you select this option, messages are delivered at-least-once but may be reordered.
Partition Scheme
Specifies the partition selection scheme that the queue binding uses when publishing to the remote Kafka cluster. The options are:
- Consistent: the queue binding selects a Kafka partition based on a hash of the Kafka partition key generated by the Kafka sender
- Explicit: the queue binding selects a Kafka partition based on a specified partition number
- Random: the queue binding selects a random Kafka partition. By default, this partition selection scheme is used as a fallback in cases where a consistent partition selection scheme is used but no partition key is available for the message.
Hash Algorithm
If you select the Consistent partition scheme, specifies the hash algorithm used to select the partition. The options are:
- CRC
- Murmur2
- Fowler-Noll-Vo 1a (Fnv1A)
Random Fallback Enabled
If you select the Consistent partition scheme, specifies whether a random selection scheme is used as a fallback if no key is available for the message. If you disable this option, a single partition is selected for all unkeyed messages.
Number
If you select the Explicit partition scheme, specifies the partition number.
- Click Apply.
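To make the partition schemes described above more concrete, the following Python sketch models how a queue binding might choose a partition. It is a simplified illustration under several assumptions: `zlib.crc32` stands in for the CRC hash option, the hash is reduced with a plain modulo, and partition 0 is used for unkeyed messages when the random fallback is disabled. The text above says only that a single partition is selected in that case, so the broker's actual choice and hashing details may differ. The function name and arguments are hypothetical.

```python
import random
import zlib
from typing import Optional


def select_partition(scheme: str, num_partitions: int,
                     key: Optional[bytes] = None,
                     number: Optional[int] = None,
                     random_fallback: bool = True,
                     rng: Optional[random.Random] = None) -> int:
    """Choose a Kafka partition index in the range [0, num_partitions)."""
    rng = rng or random.Random()
    if scheme == "explicit":
        # The partition number is configured directly on the queue binding.
        if number is None or not 0 <= number < num_partitions:
            raise ValueError("explicit scheme needs a valid partition number")
        return number
    if scheme == "random":
        return rng.randrange(num_partitions)
    if scheme == "consistent":
        if key:
            # Same key, same hash, same partition (CRC32 is an assumption).
            return zlib.crc32(key) % num_partitions
        if random_fallback:
            return rng.randrange(num_partitions)
        return 0  # assumption: one fixed partition for all unkeyed messages
    raise ValueError(f"unknown partition scheme: {scheme!r}")


# Messages with the same key always land on the same partition.
first = select_partition("consistent", 6, key=b"order-1001")
second = select_partition("consistent", 6, key=b"order-1001")
print(first == second)
```

Passing a seeded `random.Random` instance as `rng` makes the random choices repeatable, which can help when testing a sketch like this.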
Viewing Sender Data
To view information about the Kafka sender, perform these steps:
- Open Broker Manager. For instructions, see Using PubSub+ Broker Manager.
- In the navigation bar, click Bridges.
- Select the Kafka Bridges tab.
- Select the Kafka Senders tab.
- Select the sender that you want to view information for.
- Click the Summary tab to display information about the sender, including the message rate, bridge status, and connection count.
- Click the Stats tab to display additional statistics for the sender.
- Click the Remote Brokers tab to display information about the connected Kafka brokers.