Configuring Kafka Bridging

Kafka bridging allows you to configure message flow in both directions between a Kafka cluster and a Solace PubSub+ software event broker. The Kafka bridge is configured within the Solace event broker, and no external Kafka Connect infrastructure is required to pass messages to and from Kafka.

To configure Kafka bridging, you need to configure a Kafka sender and a Kafka receiver.

  • A Kafka receiver receives events from one or more Kafka topics, converts the events to Solace Message Format (SMF), and publishes them to topics on the PubSub+ event broker.

  • A Kafka sender takes SMF messages from one or more queues, converts the messages to Kafka events, and sends them to Kafka topics on a remote Kafka cluster.

For more information about how Kafka bridging works, see Kafka Bridging.

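This topic includes the following tasks:

  • Creating a Kafka Receiver
  • Configuring Topic Bindings for a Kafka Receiver
  • Viewing Receiver Data
  • Creating a Kafka Sender
  • Configuring Queue Bindings for a Kafka Sender
  • Viewing Sender Data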

Creating a Kafka Receiver

A Kafka receiver enables the accumulation of Kafka events and their conversion to SMF messages. When you create a Kafka receiver, the event broker automatically creates a client that publishes the messages received from the Kafka topics to the Solace message bus. Both the client name and the client username are #kafka/rx/<rx-name>. The client username uses the #kafka client profile and the #acl-profile ACL profile.

If no other Kafka receivers or senders exist on the Message VPN, the event broker also creates a single client profile named #kafka. This profile is required by the Kafka senders and receivers. It is created when you create the first Kafka sender or receiver and removed when you delete all of the Kafka senders and receivers.
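
The lifecycle described above can be pictured with a small model. This is only an illustrative sketch, not a Solace API: the class name MessageVpnKafkaModel and its members are hypothetical, and the only facts taken from this topic are the #kafka/rx/<rx-name> and #kafka/tx/<tx-name> client names and the rule that the #kafka client profile exists while at least one sender or receiver exists.

```python
from dataclasses import dataclass, field


@dataclass
class MessageVpnKafkaModel:
    """Toy model of the broker-managed objects on one Message VPN (hypothetical)."""

    receivers: set = field(default_factory=set)
    senders: set = field(default_factory=set)

    @property
    def has_kafka_client_profile(self) -> bool:
        # The #kafka client profile exists only while a sender or receiver exists.
        return bool(self.receivers or self.senders)

    def client_names(self) -> list:
        # One broker-created client per receiver and one per sender.
        return sorted(
            [f"#kafka/rx/{name}" for name in self.receivers]
            + [f"#kafka/tx/{name}" for name in self.senders]
        )
```

Adding the first receiver makes has_kafka_client_profile true, and removing the last sender or receiver makes it false again.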

To create a Kafka receiver, perform these steps:

  1. Open Broker Manager. For instructions, see PubSub+ Broker Manager.
  2. Select a Message VPN.
  3. In the navigation bar, click Bridges.
  4. Select the Kafka Bridges tab.
  5. Select the Kafka Receivers tab.
  6. Click + Kafka Receiver.
  7. Enter a Name for the receiver and click Create.
  8. Set the following options for the receiver:

    Option / Description

    Enabled

    Specifies whether the Kafka receiver is enabled.

    Bootstrap Address List

    Specifies the fully qualified domain name (FQDN) or IP address and optional port of one Kafka broker in a Kafka cluster where the Kafka receiver can fetch the state of the entire cluster. The bootstrap addresses must resolve to an appropriately configured and compatible listener port on the Kafka broker for the selected authentication scheme. If you do not provide a port, the default port is 9092.

    You can configure a comma-separated list of addresses for the Kafka receiver to try in the event that an attempt to connect to the first address fails.

    Specify IPv4 addresses in the dotted decimal notation form, nnn.nnn.nnn.nnn. You must enclose IPv6 addresses in square brackets. Specify the port as a numeric value from 0 to 65535.

    For example, a correctly formatted IPv4 address is: 192.168.100.1:9092. The same address in IPv6 format is [::ffff:c0a8:6401]:9092. The address corresponds to the bootstrap.servers Kafka consumer API parameter.

    Authentication Scheme

    Specifies the authentication scheme that the Kafka receiver uses to establish a connection to the remote Kafka cluster. The options are:

    • None: no authentication required. This option may be useful for anonymous connections or when a Kafka receiver does not require authentication.
    • Basic: log in with a specified username and password. Credentials can be transmitted using plain-text or encrypted with SSL.
    • Scram: log in with Salted Challenge Response Authentication (SCRAM). You must specify a username, password, and the SCRAM hash.
    • Client Certificate: log in with a client certificate to validate the receiver's identity. You must specify the PEM formatted content for the client certificate and the password for the certificate. The specified client certificate can also be used to identify the Kafka receiver with other authentication schemes.
    • Kerberos: log in with the Kerberos mechanism. You must specify the service name of the remote Kafka broker, the user principal name of the Kafka receiver, and the keytab file for the Kafka receiver.
    • OAuth Client: log in with OAuth 2.0 client credentials. You must specify the OAuth client ID and token endpoint URL. You can also specify the OAuth scope.

    Encryption Enabled

    Specifies whether TLS encryption is enabled for the Kafka receiver. The bootstrap addresses must resolve to PLAINTEXT or SASL_PLAINTEXT listener ports when encryption is disabled, and SSL or SASL_SSL listener ports when encryption is enabled.

    The following table describes how each combination of TLS/SSL encryption and authentication scheme settings corresponds to the security.protocol Kafka consumer API parameter:

    TLS/SSL Encryption    Authentication Scheme            Value for security.protocol Parameter
    no ssl                none or client-certificate       plaintext
    ssl                   none or client-certificate       ssl
    no ssl                basic, scram, or oauth-client    sasl_plaintext
    ssl                   basic, scram, or oauth-client    sasl_ssl

  9. (Optional) Click Show Advanced Settings and set any of the following options for the receiver:

    Option / Description

    Delay

    Specifies the delay in milliseconds to wait before accumulating a batch of messages from the Kafka cluster.

    This setting corresponds to the fetch.wait.max.ms Kafka consumer API parameter.

    Max Size

    Specifies the maximum size of a message batch, in bytes.

    This setting corresponds to the fetch.min.bytes Kafka consumer API parameter.

    Group ID

    Specifies the Kafka consumer group ID for the receiver.

    Consumer groups allow Kafka consumers to work together and process Kafka events from a topic in parallel. Each consumer in the same group is assigned a different subset of partitions from a Kafka topic or set of topics. Depending on your deployment, you may want to specify certain details of the consumer group a Kafka receiver belongs to.

    This setting corresponds to the group.id Kafka consumer API parameter.

    Keepalive Interval

    Specifies the time, in milliseconds, between sending keepalive messages to the members of the consumer group.

    This setting corresponds to the heartbeat.interval.ms Kafka consumer API parameter.

    Keepalive Timeout

    Specifies the time, in milliseconds, until unresponsive consumer group members are removed, which triggers partition rebalancing across the other members of the group.

    This setting corresponds to the session.timeout.ms Kafka consumer API parameter.

    Membership Type

    Specifies the membership type of the Kafka consumer group for the receiver. The options are:

    • Dynamic: specifies dynamic group membership. This option corresponds to an empty value for the group.instance.id Kafka consumer API parameter.
    • Static: specifies static group membership. Static members can leave and rejoin the group (within the Keepalive Timeout period) without prompting rebalancing. This option corresponds to a group.instance.id value of <broker-name>/<vpn-name>/<receiver-name>, where <broker-name> is the event broker name, <vpn-name> is the Message VPN name, and <receiver-name> is the Kafka receiver name.

    Partition Scheme List

    Specifies an ordered, comma-separated list of schemes for partition assignment of the consumer group for this receiver. Both eager ("range", "roundrobin") and cooperative ("cooperative-sticky") schemes are supported. The elected group leader chooses the first common strategy provided by all members of the group. Do not mix eager and cooperative schemes. For more information on these schemes, see the documentation for your Kafka implementation.

    This setting corresponds to the partition.assignment.strategy Kafka consumer API parameter.

    Topic Exclude List

    Specifies a comma-separated list of Kafka topics to ignore. Each entry is either a literal topic name or a regular expression (POSIX.2 regular expressions are supported). A regular expression must start with the ^ character; otherwise, the entry is interpreted as a literal topic name.

    Topic Refresh Interval

    Specifies the time, in milliseconds, between refreshes of topic metadata from the Kafka cluster. This setting corresponds to the topic.metadata.refresh.interval.ms Kafka consumer API parameter.

  10. Click Apply.
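
As a rough illustration of how three of the receiver options above relate to Kafka consumer API parameters, the following sketch derives bootstrap.servers, security.protocol, and group.instance.id values. The function names are hypothetical, and the code is not how the event broker is implemented. In particular, writing the default port 9092 into the address string is an assumption, and Kerberos is rejected only because the table above does not list it.

```python
DEFAULT_KAFKA_PORT = 9092

# Scheme groupings taken from the security.protocol table in this topic.
NON_SASL_SCHEMES = {"none", "client-certificate"}
SASL_SCHEMES = {"basic", "scram", "oauth-client"}


def normalize_bootstrap_list(address_list: str) -> str:
    """Return the list with the default port added to entries that omit one."""
    entries = []
    for raw in address_list.split(","):
        address = raw.strip()
        if not address:
            continue
        # A bracketed IPv6 address contains colons itself, so look after "]".
        has_port = "]:" in address if address.startswith("[") else ":" in address
        if has_port:
            port = int(address.rsplit(":", 1)[1])
            if not 0 <= port <= 65535:
                raise ValueError(f"port out of range in {address!r}")
        else:
            address = f"{address}:{DEFAULT_KAFKA_PORT}"
        entries.append(address)
    if not entries:
        raise ValueError("bootstrap address list is empty")
    return ",".join(entries)


def security_protocol(encryption_enabled: bool, auth_scheme: str) -> str:
    """Look up the security.protocol value for one row of the table."""
    if auth_scheme in NON_SASL_SCHEMES:
        return "ssl" if encryption_enabled else "plaintext"
    if auth_scheme in SASL_SCHEMES:
        return "sasl_ssl" if encryption_enabled else "sasl_plaintext"
    # The table does not cover other schemes, so this sketch does not guess.
    raise ValueError(f"scheme not covered by the table: {auth_scheme!r}")


def group_instance_id(membership_type: str, broker_name: str,
                      vpn_name: str, receiver_name: str) -> str:
    """Static membership uses a fixed ID; dynamic membership leaves it empty."""
    if membership_type == "static":
        return f"{broker_name}/{vpn_name}/{receiver_name}"
    return ""
```

For example, security_protocol(True, "scram") returns "sasl_ssl", which matches the last row of the table.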
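
The Topic Exclude List rule (an entry starting with ^ is a regular expression, anything else is a literal topic name) can be sketched as follows. The function name is_excluded is hypothetical. Python's re module stands in for POSIX.2 regular expressions here, and the sketch assumes that a regular expression only needs to match from the start of the topic name, so treat it as an approximation rather than the broker's exact matching behavior.

```python
import re


def is_excluded(topic: str, exclude_list: str) -> bool:
    """Return True if the Kafka topic matches any entry in the exclude list."""
    for entry in (item.strip() for item in exclude_list.split(",")):
        if not entry:
            continue
        if entry.startswith("^"):
            # Regular-expression entry (Python syntax used as an approximation).
            if re.search(entry, topic):
                return True
        elif entry == topic:
            # Literal entry: the whole topic name must be identical.
            return True
    return False
```

With the list "^test\..*,internal", the topics test.orders and internal are excluded, but internal.audit is not, because the literal entry must match the whole name. Because entries are split on commas, this sketch cannot handle a regular expression that itself contains a comma.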

Configuring Topic Bindings for a Kafka Receiver

A topic binding specifies a Kafka topic from which the event broker receives messages and how the messages from that Kafka topic are published to the PubSub+ event broker.

To configure a topic binding, perform these steps:

  1. Open Broker Manager. For instructions, see PubSub+ Broker Manager.
  2. Select a Message VPN.
  3. In the navigation bar, click Bridges.
  4. Select the Kafka Bridges tab.
  5. Select the Kafka Receivers tab.
  6. Select the receiver that you want to set topic bindings for.
  7. Select the Topic Bindings tab.
  8. Click + Topic Binding.
  9. Enter a Name for the topic binding and click Create.
  10. Set the following options for the topic binding:

    Option / Description

    Enabled

    Specifies whether the topic binding is enabled.

    Local Topic Expression

    Specifies the substitution expression used to generate the SMF topic for each message received from Kafka. This expression can include data extracted from the metadata of each individual Kafka message as it is received from the Kafka topic. For more information, see Substitution Expressions Overview.

    Local Key Expression

    Specifies the substitution expression used to generate a partition key for each message received from Kafka to determine which queue partition a message is sent to. This expression can include fields extracted from the metadata of each individual Kafka message as it is received from the Kafka topic. For more information, see Substitution Expressions Overview.

    Initial Offset

    Specifies the initial offset to consume from the Kafka topic if no member of the group has consumed and committed any offset already, or if the last committed offset has been deleted. Offsets are unique per partition.

    This setting corresponds to the auto.offset.reset Kafka consumer API parameter. The options are:

    • Beginning: start with the earliest offset available
    • End: start with new offsets only

  11. Click Apply.
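
The Initial Offset rule above can be made concrete with a short sketch for a single partition. The function starting_offset and its parameter names are hypothetical. It assumes that log_start is the earliest offset still retained and that log_end is the offset the next new event will receive, and it only models the decision described in this topic.

```python
from typing import Optional


def starting_offset(initial_offset: str, committed: Optional[int],
                    log_start: int, log_end: int) -> int:
    """Pick the offset at which consumption of one partition begins."""
    # A committed offset that still exists in the partition always wins.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset (never committed, or since deleted):
    # fall back to the configured initial offset.
    if initial_offset == "beginning":
        return log_start
    if initial_offset == "end":
        return log_end
    raise ValueError(f"unknown initial offset: {initial_offset!r}")
```

For a partition that retains offsets 100 to 250, a group with no committed offset starts at 100 with Beginning and at 250 with End. A committed offset of 40, which has since been deleted, is handled the same way as no committed offset.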

Viewing Receiver Data

To view information about the Kafka receiver, perform these steps:

  1. Open Broker Manager. For instructions, see PubSub+ Broker Manager.
  2. Select a Message VPN.
  3. In the navigation bar, click Bridges.
  4. Select the Kafka Bridges tab.
  5. Select the Kafka Receivers tab.
  6. Select the receiver that you want to view information for.
  7. Click the Summary tab to display information about the receiver, including the message rate, uptime, and connection count.
  8. Click the Stats tab to display additional statistics for the receiver.
  9. Click the Remote Brokers tab to display information about the connected Kafka brokers.

Creating a Kafka Sender

A Kafka sender converts Solace messages to Kafka events and propagates those events to a remote Kafka cluster. When you create a Kafka sender, the event broker automatically creates a client that binds to the same queues as the Kafka sender. Both the client name and the client username are #kafka/tx/<tx-name>. The client username uses the #kafka client profile and the #acl-profile ACL profile.

If no other Kafka senders or receivers exist on the Message VPN, the event broker also creates a single client profile named #kafka. This profile is required by the Kafka senders and receivers. It is created when you create the first Kafka sender or receiver and removed when you delete all of the Kafka senders and receivers.

To create a Kafka sender, perform these steps:

  1. Open Broker Manager. For instructions, see PubSub+ Broker Manager.
  2. Select a Message VPN.
  3. In the navigation bar, click Bridges.
  4. Select the Kafka Bridges tab.
  5. Select the Kafka Senders tab.
  6. Click + Kafka Sender.
  7. Enter a Name for the sender and click Create.
  8. Set the following options for the sender:

    Option / Description

    Enabled

    Specifies whether the Kafka sender is enabled.

    Bootstrap Address List

    Specifies the fully qualified domain name (FQDN) or IP address and optional port of one Kafka broker in a Kafka cluster where the Kafka sender can fetch the state of the entire cluster. The bootstrap addresses must resolve to an appropriately configured and compatible listener port on the Kafka broker for the selected authentication scheme. If you do not provide a port, the default port is 9092.

    You can configure a comma-separated list of addresses for the Kafka sender to try in the event that an attempt to connect to the first address fails.

    Specify IPv4 addresses in the dotted decimal notation form, nnn.nnn.nnn.nnn. You must enclose IPv6 addresses in square brackets. Specify the port as a numeric value from 0 to 65535.

    For example, a correctly formatted IPv4 address is: 192.168.100.1:9092. The same address in IPv6 format is [::ffff:c0a8:6401]:9092. The address corresponds to the bootstrap.servers Kafka producer API parameter.

    Idempotence Enabled

    Specifies whether idempotence is enabled. Idempotence guarantees in-order, at-least-once message delivery to the remote Kafka topic, at the expense of performance. If you enable this option, each queue binding configured for the Kafka sender must have the acknowledgment mode set to All to be operational.

    This setting corresponds to the enable.idempotence Kafka producer API parameter.

    When you enable idempotence:

    • The Kafka sender sends an increasing sequence number with every message.

    • The remote Kafka broker acknowledges each message.

    • The Kafka broker remembers the largest sequence number it has written for each Kafka sender.

    • The Kafka broker discards any message received with a sequence number less than the largest written.

    If idempotence is disabled, the Kafka sender is free to resend messages to the Kafka broker because of timeouts, leader changes, and so on. In this case, message duplication and/or reordering may occur.

    Authentication Scheme

    Specifies the authentication scheme that the Kafka sender uses to establish a connection to the remote Kafka cluster. The options are:

    • None: no authentication required.
    • Basic: log in with a specified username and password.
    • Scram: log in with Salted Challenge Response Authentication (SCRAM). You must specify a username, password, and the SCRAM hash.
    • Client Certificate: log in with a client certificate. You must specify the PEM formatted content for the client certificate and the password for the certificate. The specified client certificate can also be used to identify the Kafka sender with other authentication schemes.
    • Kerberos: log in with the Kerberos mechanism. You must specify the service name of the remote Kafka broker, the user principal name of the Kafka sender, and the keytab file for the Kafka sender.
    • OAuth Client: log in with OAuth 2.0 client credentials. You must specify the OAuth client ID, token endpoint URL, and the OAuth scope.

    Compression Enabled

    Specifies whether compression is enabled for the Kafka sender. If you enable compression, you must specify the compression type and level.

    The compression type corresponds to the compression.codec Kafka producer API parameter. The options are:

    • Gzip: use Gzip compression
    • Snappy: use Snappy compression
    • Lz4: use LZ4 compression
    • Zstd: use Zstandard compression

    The compression level corresponds to the compression.level Kafka producer API parameter. The valid values depend on the compression type:

    • -1: use the codec-dependent default compression level
    • 0 to 9: valid for Gzip compression
    • 0: valid for Snappy compression
    • 0 to 12: valid for LZ4 compression
    • 0 to 22: valid for Zstandard compression

    Encryption Enabled

    Specifies whether TLS encryption is enabled for the Kafka sender. The bootstrap addresses must resolve to PLAINTEXT or SASL_PLAINTEXT listener ports when encryption is disabled, and SSL or SASL_SSL listener ports when encryption is enabled.

  9. (Optional) Click Show Advanced Settings and set any of the following options for the sender:

    Option / Description

    Delay

    Specifies the delay in milliseconds to wait before accumulating a batch of messages to send to the Kafka cluster.

    This setting corresponds to the queue.buffering.max.ms Kafka producer API parameter.

    Max Message Count

    Specifies the maximum number of messages to send to the Kafka cluster in a single batch. This setting corresponds to the batch.num.messages Kafka producer API parameter.

    Max Size

    Specifies the maximum size of a message batch, in bytes.

    This setting corresponds to the batch.size Kafka producer API parameter.

  10. Click Apply.
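
To illustrate why the Idempotence Enabled option prevents duplicates, the following toy model mimics the sequence-number bookkeeping listed above. DedupingPartitionLog is a hypothetical stand-in for one Kafka partition, not Kafka or Solace code. It treats a sequence number equal to the largest one already written as a duplicate, which is an assumption about how a resend of the most recent message is detected.

```python
class DedupingPartitionLog:
    """Toy stand-in for one Kafka partition that tracks sender sequence numbers."""

    def __init__(self):
        self.events = []
        self._largest_seq = {}  # sender ID -> largest sequence number written

    def append(self, sender_id: str, seq: int, payload: str) -> bool:
        """Write the payload unless its sequence number was already written."""
        largest = self._largest_seq.get(sender_id, -1)
        if seq <= largest:
            # A resend after a timeout or leader change: discard it.
            return False
        self._largest_seq[sender_id] = seq
        self.events.append(payload)
        return True
```

If a sender writes sequence numbers 0 and 1, resends 1 after a timeout, and then writes 2, the log contains three events instead of four. A real Kafka broker also handles gaps in the sequence, which this sketch does not model.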
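
The compression level ranges listed under Compression Enabled lend themselves to a simple validity check before you apply a configuration. The helper below is a hypothetical sketch. The ranges are copied from this topic, and accepting -1 for every compression type is an assumption based on its description as the codec-dependent default.

```python
# Inclusive level ranges per compression type, as listed in this topic.
LEVEL_RANGES = {
    "gzip": (0, 9),
    "snappy": (0, 0),
    "lz4": (0, 12),
    "zstd": (0, 22),
}


def is_valid_compression_level(compression_type: str, level: int) -> bool:
    """Return True if the level is usable with the given compression type."""
    if compression_type not in LEVEL_RANGES:
        raise ValueError(f"unknown compression type: {compression_type!r}")
    if level == -1:
        # -1 selects the codec-dependent default level.
        return True
    low, high = LEVEL_RANGES[compression_type]
    return low <= level <= high
```

For example, level 12 is accepted for lz4 but rejected for gzip.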

Configuring Queue Bindings for a Kafka Sender

A queue binding specifies a queue on a PubSub+ event broker and specifies how messages from the queue are sent to a Kafka topic.

To configure a queue binding, perform these steps:

  1. Open Broker Manager. For instructions, see PubSub+ Broker Manager.
  2. Select a Message VPN.
  3. In the navigation bar, click Bridges.
  4. Select the Kafka Bridges tab.
  5. Select the Kafka Senders tab.
  6. Select the sender that you want to set queue bindings for.
  7. Select the Queue Bindings tab.
  8. Click + Queue Binding.
  9. Select a queue and click Create.
  10. Set the following options for the queue binding:

    Option / Description

    Enabled

    Specifies whether the queue binding is enabled.

    Remote Topic

    Specifies the topic on the Kafka cluster to send each message in the queue to.

    Remote Key Expression

    Specifies the substitution expression used to generate a partition key for each message sent to Kafka to determine which Kafka partition a message is sent to. This expression can include fields extracted from the metadata of each individual SMF message as it is taken from the queue. For more information, see Substitution Expressions Overview.

    ACK Mode

    Specifies the number of acknowledgments that this queue binding requires from the remote Kafka cluster.

    The acknowledgment mode corresponds to the request.required.acks Kafka producer API parameter. The options are:

    • None: specifies that no acknowledgments are required from the remote Kafka cluster. If you select this option, messages are delivered at-most-once.
    • One: specifies that one acknowledgment is required from the remote Kafka cluster. If you select this option, messages are delivered at-least-once but may be reordered.
    • All: specifies that all replicas on the remote Kafka cluster must acknowledge the message. If you select this option, messages are delivered at-least-once but may be reordered.

    Partition Scheme

    Specifies the partition selection scheme that the queue binding uses when publishing to the remote Kafka cluster. The options are:

    • Consistent: the queue binding selects a Kafka partition based on a hash of the Kafka partition key generated by the Kafka sender.
    • Explicit: the queue binding selects a Kafka partition based on a specified partition number.
    • Random: the queue binding selects a random Kafka partition. By default, this partition selection scheme is used as a fallback in cases where a consistent partition selection scheme is used but no partition key is available for the message.

    Hash Algorithm

    If you select the Consistent partition scheme, specifies the hash algorithm used to select the partition. The options are:

    • CRC
    • Murmur2
    • Fowler-Noll-Vo 1a (Fnv1A)

    Random Fallback Enabled

    If you select the Consistent partition scheme, specifies whether a random selection scheme is used as a fallback if no key is available for the message. If you disable this option, a single partition is selected for all unkeyed messages.

    Number

    If you select the Explicit partition scheme, specifies the partition number.

  11. Click Apply.
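
The partition scheme options above can be summarized in a short decision sketch. The function select_partition and its parameters are hypothetical. It uses zlib.crc32 as the hash and a simple modulo to map the hash to a partition, which are assumptions for illustration and not necessarily what the event broker does. When Random Fallback Enabled is off, it sends unkeyed messages to partition 0, because this topic does not say which single partition is used.

```python
import random
import zlib
from typing import Optional


def select_partition(scheme: str, num_partitions: int,
                     key: Optional[bytes] = None,
                     explicit_number: Optional[int] = None,
                     random_fallback: bool = True,
                     rng=random) -> int:
    """Choose a Kafka partition index for one outgoing message."""
    if scheme == "explicit":
        # Fixed partition taken from the Number option.
        if explicit_number is None or not 0 <= explicit_number < num_partitions:
            raise ValueError("explicit scheme needs a valid partition number")
        return explicit_number
    if scheme == "random":
        return rng.randrange(num_partitions)
    if scheme == "consistent":
        if key is not None:
            # The same key always hashes to the same partition.
            return zlib.crc32(key) % num_partitions
        if random_fallback:
            return rng.randrange(num_partitions)
        # Assumption: partition 0 stands in for the single unkeyed partition.
        return 0
    raise ValueError(f"unknown partition scheme: {scheme!r}")
```

Two messages with the key b"order-42" always land on the same partition under the consistent scheme, which preserves their relative order within Kafka.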

Viewing Sender Data

To view information about the Kafka sender, perform these steps:

  1. Open Broker Manager. For instructions, see PubSub+ Broker Manager.
  2. Select a Message VPN.
  3. In the navigation bar, click Bridges.
  4. Select the Kafka Bridges tab.
  5. Select the Kafka Senders tab.
  6. Select the sender that you want to view information for.
  7. Click the Summary tab to display information about the sender, including the message rate, bridge status, and connection count.
  8. Click the Stats tab to display additional statistics for the sender.
  9. Click the Remote Brokers tab to display information about the connected Kafka brokers.