Publishing Persistent Messages Using the Python API

When your applications require confirmation handling and at-least-once delivery, we recommend that you use persistent messages instead of direct messages. To publish persistent messages in the PubSub+ Python API, you first set up a message queue on the PubSub+ event broker.

For information about creating and configuring queues on an event broker, see Configuring Queues.

To publish persistent messages using the PubSub+ Messaging API for Python, use the following steps:

  1. Creating a PersistentMessagePublisher.
  2. Configuring and Creating Outbound Messages.
  3. Handling Back-Pressure When Publishing Persistent Messages.
  4. Publishing a Persistent Message.
  5. Acknowledging Messages and Handling Errors.
  6. User Contexts.

In some use cases, it's possible for your application to send messages faster than the messages can be transported. This may cause messages to accumulate in the API internal buffers causing back-pressure. If this scenario is possible, consider changing the back-pressure settings to meet the requirements of your application. For more information, see Handling Back-Pressure When Publishing Persistent Messages.

For examples of applications that publish persistent messages, see guaranteed_publisher.py on the Solace GitHub page.

Creating a PersistentMessagePublisher

After a MessagingService instance has established a connection to an event broker, use a PersistentMessagePublisher to publish persistent messages. To create a PersistentMessagePublisher object, do the following:

  1. Call the create_persistent_message_publisher_builder() function on a MessagingService object. This returns a PersistentMessagePublisherBuilder object.

  2. Use the functions in the PersistentMessagePublisherBuilder interface to configure the PersistentMessagePublisher to use certain features of the API, such as back-pressure strategies.

  3. Call the build() function on your PersistentMessagePublisherBuilder to return a PersistentMessagePublisher object.

  4. To enable your PersistentMessagePublisher to start publishing messages, call the start() function on it.

For more information, see the PubSub+ Messaging API for Python reference.

The following is an example that shows how to use a persistent message publisher to enable your application to publish messages to the event broker:

# Create a PersistentMessagePublisherBuilder which allows you to create a PersistentMessagePublisher and start it
persistent_publisher = messaging_service.create_persistent_message_publisher_builder() \
               .on_back_pressure_reject(1000) \
               .build()

# Starts the configured PersistentMessagePublisher synchronously. Before the start() function is called, the publisher is considered off-duty
persistent_publisher.start()        

Asynchronous Publishers

It is also possible to start a persistent message publisher asynchronously, so that your application is notified after the start operation completes.

In the PubSub+ Python API, you can use the start_async() function to start a PersistentMessagePublisher asynchronously. The publisher object starts in a separate background thread, which means you can start multiple publishers concurrently, or your code can continue to execute without having to wait for the function to return.

The example below shows ten publishers being started asynchronously. The as_completed() function yields each future as its start operation finishes, so the loop can report the outcome for every publisher.

import concurrent.futures
# ...
publishers = [persistent_message_publisher_builder.build() for _ in range(10)]
futures_to_publisher = {publisher.start_async(): publisher for publisher in publishers}

for future in concurrent.futures.as_completed(futures_to_publisher):
    publisher = futures_to_publisher[future]
    try:
        # start_async() has no return value but raises an exception on start failure
        future.result()
        print(f'Publisher {id(publisher)} started successfully')
    except Exception as err:
        print(f'Publisher {id(publisher)} generated an error: {err}') 

Your publisher application is not operational until you call start() or start_async() on it.

Configuring and Creating Outbound Messages

Your client applications explicitly create the outbound messages to publish. In the PubSub+ Python API, when you publish messages you use OutboundMessage instances. To configure and create OutboundMessage instances, follow these steps:

  1. Call message_builder() on a messaging service object to return an OutboundMessageBuilder instance. For better performance, we recommend you use a single OutboundMessageBuilder to create multiple OutboundMessage instances.

    outbound_msg_builder = messaging_service.message_builder()
  2. Configure your message with an OutboundMessageBuilder and then call the build() function to return a message instance. You can configure message properties using either method below.
    • Use the OutboundMessageBuilder interface and its with_property() functions. Both Solace-defined message_properties keys and arbitrary user-defined property keys are accepted. The following example shows how to set the Solace-defined properties for message ID, sender ID, and message type, and a custom key-value property on a message:
      outbound_msg = messaging_service.message_builder() \
                     .with_application_message_id("myID") \
                     .with_sender_id("senderID") \
                     .with_application_message_type("messageType") \
                     .with_property("key", "value") \
                     .build(message_body)
    • Use a message properties dictionary with the message_properties.* keys. The following example shows how to create a message property dictionary with values for message ID, sender ID and message type constants, and then configure a message using the from_properties() function:
      from solace.messaging.config.solace_properties import message_properties
      # ...
      message_props = {
          message_properties.APPLICATION_MESSAGE_ID: "myID",
          message_properties.SENDER_ID: "senderID",
          message_properties.APPLICATION_MESSAGE_TYPE: "messageType"
          # For a complete list of message_properties constants see the PubSub+ Messaging API for Python reference.
          }                            
      # ...                                                
      outbound_msg = outbound_msg_builder \
                     .from_properties(message_props) \
                     .build(message_body)

The following code example shows how to create a message builder, set message properties, and build a message:

# Create a message from a builder that sets both Solace-defined and custom properties.
outbound_msg = messaging_service.message_builder() \
               .with_application_message_id(message_id) \
               .from_properties(message_props) \
               .with_property("key", "value") \
               .build(message_body)

For more information about the functions, see the PubSub+ Messaging API for Python reference.

Setting a Partition Key

You can set a partition key to use partitioned queues. Partitioned Queues is a feature available on the PubSub+ event broker that allows you to easily scale the number of consumer applications bound to a queue. A partition key can be set on each message in the publishing application to ensure that all messages with the same partition key are delivered to the same consumer without additional logic in the consumer application. For more information see Partitioned Queues.

The partition key is a property of a published message, so you can use the with_property(property, value) function to set a property-value pair on a Python API message.

  • property—The constant message_user_property_constants.QUEUE_PARTITION_KEY or the string value JMSXGroupID.

  • value—A string representing the value of your partition key. Client applications set the value at publish time.

The following example shows how to create a message and publish it to a queue partition:

# Set the queue partition key on the outbound message using the with_property() builder method.
def set_queue_partition_key_using_with_property(queue_partition_key_value: str):
    payload = "my_payload"
    outbound_message = messaging_service \
        .message_builder() \
        .with_property(message_user_property_constants.QUEUE_PARTITION_KEY, queue_partition_key_value) \
        .build(payload)

# You can also set the queue partition key on the outbound message using the from_properties() builder method.
def set_queue_partition_key_using_from_properties(queue_partition_key_value: str):
    payload = "my_payload"
    additional_properties = {message_user_property_constants.QUEUE_PARTITION_KEY: queue_partition_key_value}
    outbound_message = messaging_service \
        .message_builder() \
        .from_properties(additional_properties) \
        .build(payload)

Handling Back-Pressure When Publishing Persistent Messages

When you use persistent messaging, messages are sent to the PubSub+ event broker with a topic and may be enqueued on any queue with a matching topic subscription. The event broker then delivers the messages asynchronously to any consumers subscribed to that queue. When the client application publishes a persistent message, the API queues the message in an internal buffer before it is sent to the event broker. In ideal conditions, as soon as the application publishes a message, the API sends that message to the network, and that message is eventually received by the event broker. However, the client application can publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages. This scenario is referred to as back-pressure. It is important to configure your application to handle situations where back-pressure occurs.

In the PubSub+ Python API, the PersistentMessagePublisher has the following mechanisms to handle back-pressure:

Configuring an Unlimited Internal Buffer

The default configuration for the API is to use an unlimited-size internal buffer for messages. When you use an unlimited buffer, the Python API continuously puts messages published by the client application on the internal buffer. This configuration is useful when your publishers need to send a lot of data quickly without blocking, even when the network cannot keep up. This configuration, called on_back_pressure_elastic(), is also useful because you don't have to write code to tell your application how to handle scenarios where the internal buffer reaches capacity.

We recommend this configuration when the data you publish, whether small or large, does not overly strain your system resources. Note that data size plays an important role in determining transmission bandwidth and affects how quickly your application can write data to the socket. A large number of small messages or a small number of large messages can both strain your application's resources. If your application lacks stability when using elastic back-pressure, we recommend that you use alternative back-pressure strategies, such as 'wait' or 'reject', to manage flow control for your application.

This configuration is not suitable for memory-restrictive environments because the buffer is allowed to grow indefinitely, which can cause out-of-memory errors (or potentially undefined errors). This configuration can still be useful when your infrastructure is made up of several short-lived microservices that provide publishing redundancy, in the unlikely event that an internal queue encounters an out-of-memory scenario.

The following example shows an explicit call to the on_back_pressure_elastic() function, which is not required because it is the default behavior:

# Create a PersistentMessagePublisherBuilder which allows you to create a PersistentMessagePublisher and start it
persistent_publisher = messaging_service.create_persistent_message_publisher_builder() \
               .on_back_pressure_elastic() \
               .build()

# Starts the configured PersistentMessagePublisher synchronously. Before the start() function is called, the publisher is considered off-duty
persistent_publisher.start()        

Rejecting Messages When a Specified Limit is Reached

When back-pressure occurs, you can choose to reject messages from the client application when a specified limit is reached in the internal buffer. You can use the on_back_pressure_reject(buffer_capacity: int) function to specify a buffer capacity for a set number of messages to accumulate. After the specified capacity is reached, it is no longer possible to publish new messages and the API raises a PublisherOverflowError until the buffer has capacity again. If buffer_capacity is set to zero, the Python API does not buffer any outbound messages and rejects a message immediately when it cannot be written to the native library.

# Create a PersistentMessagePublisherBuilder which allows you to create a PersistentMessagePublisher and start it
persistent_publisher = messaging_service.create_persistent_message_publisher_builder() \
               .on_back_pressure_reject(1000) \
               .build()

# Starts the configured PersistentMessagePublisher synchronously. Before the start() function is called, the publisher is considered off-duty
persistent_publisher.start()        

Using a Publisher Readiness Listener

We recommend that you use a PublisherReadinessListener when you use the on_back_pressure_reject() function because it lets your application know when there is capacity available in the buffer so that it can resume publishing messages.

The following is an example of registering a PublisherReadinessListener instance as an event handler:

class CanPublishListener(PublisherReadinessListener):
    def ready(self):
        # This method is executed when the publisher is ready to publish again
        # You can use this callback to handle message publishing when backpressure is relieved
        pass
# ...
# ...
persistent_publisher.set_publisher_readiness_listener(CanPublishListener())    
            
# Publishing loop that may encounter back-pressure
some_condition_is_true = True
while some_condition_is_true:
    # prepare/process some data prior to publishing a message...
    try:
        persistent_publisher.publish("my_payload", my_topic)
    except PublisherOverflowError as e:
        # Handle the overflow exception (back-pressure)
        # You can do some work here or signal to slow down message publishing
        pass

Throttling the Publisher When a Specified Limit is Reached

You can choose to throttle the publishing application when a specified limit is reached in the internal buffer. Throttling is useful when applications can block and wait for capacity to become available in the buffer. You can use the on_back_pressure_wait(buffer_capacity: int) function to set the maximum number of messages that can accumulate in the buffer. When this maximum capacity (buffer_capacity) is reached, the publish routine blocks and waits for available capacity in the internal buffer before letting the application publish any more messages. The on_back_pressure_wait() back-pressure strategy does not call a PublisherReadinessListener.

Use this function when it is acceptable for publish calls to take longer after the buffer's capacity has been reached. This mechanism effectively gives the API time to empty the internal buffer.

The following shows an example of how to configure an internal buffer to accommodate up to one thousand messages:

# Create a PersistentMessagePublisherBuilder which allows you to create a PersistentMessagePublisher and start it
persistent_publisher = messaging_service.create_persistent_message_publisher_builder() \
               .on_back_pressure_wait(1000) \
               .build()

# Starts the configured PersistentMessagePublisher synchronously. Before the start() function is called, the publisher is considered off-duty
persistent_publisher.start()        

Publishing a Persistent Message

After you have established a connection to the event broker using a MessagingService instance, you can use a PersistentMessagePublisher to publish persistent messages.

A persistent message has the following components:

  • A topic to publish to (required)
  • A message payload (optional)

Persistent message publishing involves the receipt of acknowledgments or publish receipts. Depending on your requirements, your client application can publish messages:

  • asynchronously, which allows your application to perform other functions while the MessagePublishReceiptListener waits for the acknowledgment
  • synchronously, which waits until an acknowledgment has been received; an acknowledgment indicates that the message has been received and persisted by the broker

Asynchronous Publish Function

The following is an asynchronous publish function:

  • publish(message: bytearray | str | OutboundMessage, destination: Topic, user_context: Any | None = None, additional_message_properties: Dict[str, str | int | float | bool | dict | list | tuple | bytearray | None] | None = None)
    • message—The outbound message that can be an OutboundMessage object, bytearray, or str. If the payload is a bytearray or str, the API creates an OutboundMessage object to send.
    • destination—The Topic to publish to.
    • user_context—The context associated with an action that is performed when the message delivery to the broker is confirmed using MessagePublishReceiptListener.on_publish_receipt(). If no user context is required, omit the user_context parameter. For more information, see User Contexts.
    • additional_message_properties—Additional key-value properties to customize a message. Each key can be a user-defined property key or a key from solace.messaging.config.solace_properties.message_properties.
# Sends a persistent message asynchronously
persistent_publisher.publish(my_message, topic_to_publish_to)             
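
The following is a minimal sketch showing how additional properties can be attached at publish time. It assumes a started persistent_publisher, an existing message (my_message), and a topic (topic_to_publish_to); the custom trace-id key and its value are illustrative:

# Attach additional properties at publish time; trace-id is an illustrative custom key
from solace.messaging.config.solace_properties import message_properties

extra_props = {
    message_properties.APPLICATION_MESSAGE_TYPE: "orderEvent",  # Solace-defined property key
    "trace-id": "abc-123"                                       # arbitrary user-defined key
}
persistent_publisher.publish(my_message, topic_to_publish_to,
                             additional_message_properties=extra_props)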

For more information about these functions, see the PubSub+ Messaging API for Python reference.

Synchronous Publish Function

Synchronous publish functions do not return until the message has been received, written to persistent storage, and acknowledged by the event broker. The PubSub+ Python API provides the following synchronous function for publishing messages:

  • publish_await_acknowledgement(message: bytearray | str | OutboundMessage, destination: Topic, time_out: int | None = None, additional_message_properties: Dict[str, str | int | float | bool | dict | list | tuple | bytearray | None] | None = None)
    • message—The outbound message that can be an OutboundMessage object, bytearray, or str. If the payload is a bytearray or str, the API creates an OutboundMessage object to send.
    • destination—The Topic to publish to.
    • time_out—The maximum time (in milliseconds) to wait for a message acknowledgement.
    • additional_message_properties—Additional key-value properties to customize a message. Each key can be a user-defined property key or a key from solace.messaging.config.solace_properties.message_properties.

The preceding function can be used with a PersistentMessagePublisher to publish an OutboundMessage to the broker using a topic. This function blocks the main routine until either:

  • the publisher API receives an acknowledgment from the broker
  • the timeout period elapses
# Sends a persistent message, blocking until publish acknowledgment is received or timeout occurs.
persistent_publisher.publish_await_acknowledgement(my_message, topic_to_publish_to, 1000, None)             
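
Because the synchronous call blocks until the acknowledgment arrives or the time-out elapses, you may want to wrap it in error handling. The following is a minimal sketch, assuming a started persistent_publisher and that the API's exceptions derive from PubSubPlusClientError in solace.messaging.errors.pubsubplus_client_error:

# A minimal sketch: synchronous publish with basic error handling
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError

try:
    persistent_publisher.publish_await_acknowledgement(my_message, topic_to_publish_to, 1000)
except PubSubPlusClientError as err:
    # The broker did not acknowledge the message within the time-out, or publishing failed
    print(f'Publish failed or timed out: {err}')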

Acknowledging Messages and Handling Errors

A publish receipt is a delivery confirmation that indicates whether or not the event broker successfully processed the message. These publish receipts can indicate success or failure, and are handled by a MessagePublishReceiptListener instance. You can set your MessagePublishReceiptListener with the set_message_publish_receipt_listener() function.

The following example shows how to use the MessagePublishReceiptListener to listen for publish receipts:

# An example implementation of the MessagePublishReceiptListener interface to process broker message publish notifications.
class MessageReceiptListener(MessagePublishReceiptListener):
    def __init__(self):
        self._receipt_count = 0

    @property
    def receipt_count(self):
        return self._receipt_count

    def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
        self._receipt_count += 1
        print(f"\tMessage: {publish_receipt.message}\n"
              f"\tIs persisted: {publish_receipt.is_persisted}\n"
              f"\tTimestamp: {publish_receipt.time_stamp}\n"
              f"\tException: {publish_receipt.exception}\n")

# set a message delivery listener to the publisher
receipt_listener = MessageReceiptListener()
persistent_publisher.set_message_publish_receipt_listener(receipt_listener) 
# ...
persistent_publisher.publish(my_message, topic_to_publish_to) 

Strategies for Handling Publish Receipt Errors

The following are application-specific strategies you can use to handle publish receipt errors. Depending on your application, you may require a combination of these strategies (a sketch that combines them follows the strategy descriptions):

Wait and Retry
Wait a number of seconds before trying to send the message again. For example, if the broker temporarily cannot accept messages, use time.sleep() to pause your message publisher.
Retry a Predefined Number of Times
Try to re-publish the message a predefined number of times before dropping it. Limiting the number of retries prevents unrecoverable transmission issues from causing infinite retry loops.
Discard the Message
Discard messages with failed publish receipts. This can be useful for messages that cannot be delivered to an endpoint for any reason. We don't recommend this strategy if your application cannot tolerate message loss.

To receive a failed publish receipt when there is no matching subscription, this option must be set for the event broker or event broker service. For more information, see Handling Guaranteed Messages with No Matches (for appliances and software event brokers) or the Reject Messages to Sender On No Subscription Match Discard (for PubSub+ Cloud).
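
The following is a minimal sketch that combines these strategies: it retries a synchronous publish a fixed number of times, waits between attempts, and discards the message after the final failure. The helper name, retry limit, delay, and exception handling are illustrative and assume the API's exceptions derive from PubSubPlusClientError:

import time
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError

MAX_ATTEMPTS = 3           # illustrative retry limit
RETRY_DELAY_SECONDS = 5    # illustrative wait between attempts

def publish_with_retry(publisher, message, topic):
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            publisher.publish_await_acknowledgement(message, topic, 1000)
            return True                      # acknowledged by the broker
        except PubSubPlusClientError as err:
            print(f'Attempt {attempt} failed: {err}')
            time.sleep(RETRY_DELAY_SECONDS)  # wait before trying again
    # All attempts failed; discard the message or route it to application-level error handling
    return False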

User Contexts

Optionally, you can use user contexts to correlate information for persistent messages with their publish receipts in your application. This information is user-specific, is meaningful only to your publishing application, and is not sent to the broker. A user context permits you to attach data to the publish call that can later be retrieved from the publish receipt listener.

Using a user context allows your application to handle multiple publishing scenarios and to decide what action to take, or how to process each publish receipt, based on the context attached to the message.

For example, if a non-blocking application has multiple routines to publish persistent messages, each routine can include its identifier as the user context when it publishes a persistent message. The PubSub+ Python API tracks the user context when specified for each message and returns the user context as part of the publish receipt when the message is acknowledged or rejected by the event broker. The publishing application can then send the publish receipt to the correct routine that sent the message based on the user context.

You can set the user context when you publish the message. For example, you use the publish(message: bytearray | str | OutboundMessage, destination: Topic, user_context: Any | None = None, additional_message_properties: Dict[str, str | int | float | bool | dict | list | tuple | bytearray | None] | None = None) function, where the user context can be any data type.
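
For example, a publishing routine can attach an object that identifies it as the user context. The following is a minimal sketch, assuming a started persistent_publisher; the RoutineContext class and its get_custom_message attribute are illustrative and match the listener example that follows:

# RoutineContext and get_custom_message are illustrative, not part of the API
class RoutineContext:
    def __init__(self, routine_id: str):
        self.get_custom_message = f'published by routine {routine_id}'

# Attach the context to the publish call; it is returned with the publish receipt
persistent_publisher.publish(my_message, topic_to_publish_to, user_context=RoutineContext("routine-42"))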

The following example shows how to get the user context from a publish receipt:

# An example of a MessagePublishReceiptListener implementation
class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
    def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
        print(f"\tMessage: {publish_receipt.message}\n"
              f"\tIs persisted: {publish_receipt.is_persisted}\n"
              f"\tTimestamp: {publish_receipt.time_stamp}\n"
              f"\tException: {publish_receipt.exception}\n")
        if publish_receipt.user_context:
            print(f'\tUser context received: {publish_receipt.user_context.get_custom_message}')

If your application is non-blocking, you can also use the publish receipt in your callback to log information. For example, you can use non-blocking message publishing and then send alerts to notify the application of the status of published messages, such as:

  • successful receipt and processing of a message by the event broker
  • access control list (ACL) violations
  • a queue being over quota
  • invalid topics or topics with no subscribers

The following code shows an example of a MessagePublishReceiptListener:

# An example implementation of the MessagePublishReceiptListener interface to process broker message publish notifications.
class MessageReceiptListener(MessagePublishReceiptListener):
    def __init__(self):
        self._receipt_count = 0

    @property
    def receipt_count(self):
        return self._receipt_count

    def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
        self._receipt_count += 1
        print(f"\tMessage: {publish_receipt.message}\n"
              f"\tIs persisted: {publish_receipt.is_persisted}\n"
              f"\tTimestamp: {publish_receipt.time_stamp}\n"
              f"\tException: {publish_receipt.exception}\n")
        if publish_receipt.user_context:
            print(f'\tUser context received: {publish_receipt.user_context.get_custom_message}')

# set a message delivery listener to the publisher
receipt_listener = MessageReceiptListener()
persistent_publisher.set_message_publish_receipt_listener(receipt_listener) 
# ...
persistent_publisher.publish_await_acknowledgement(my_message, topic_to_publish_to, 1000, None)