Consuming Persistent Messages Using the Python API

Subscribing applications that cannot tolerate message loss can use persistent messaging (referred to as guaranteed messages in other parts of this documentation) instead of direct messaging. When persistent messaging is used, messages are stored on a queue on the event broker. Messages are not deleted from the event broker until the message has been consumed and acknowledged by the subscribing application (referred to as a message receiver). The PubSub+ Python API can only consume persistent messages from queues and not from topic endpoints.

To consume persistent messages, you must first set up a message queue on the event broker. For information about creating and configuring durable queues on an event broker, see Configuring Queues. Alternatively, a non-durable queue can be created when a persistent message receiver (PersistentMessageReceiver) is created. 

To consume persistent messages using the PubSub+ Messaging API for Python, use the following steps:

  1. Create a PersistentMessageReceiver.
  2. Receive a Persistent Message Synchronously.
  3. Receive a Persistent Message Asynchronously.
  4. Extract Properties from an Inbound Message.
  5. Message Acknowledgments.
  6. Create a Queue with the PubSub+ Python API.

Internal back-pressure can occur if your consumer (or subscribing) application cannot process messages as fast as it receives them from the event broker. Messages continue to be buffered internally until a high watermark is reached, at which point the API tells the event broker to stop sending messages to prevent message loss.

For examples of applications that consume persistent messages, see guaranteed_subscriber.py on the Solace GitHub page.

Creating a PersistentMessageReceiver

After you have established a connection to the event broker using a MessagingService instance, you use a PersistentMessageReceiver to consume persistent messages from a queue on the event broker. To create a PersistentMessageReceiver object, do the following:

  1. Call the create_persistent_message_receiver_builder() function on a MessagingService object. This returns a PersistentMessageReceiverBuilder object.

  2. The PersistentMessageReceiverBuilder object gives you access to a number of functions that let you customize a PersistentMessageReceiver object. These include the following:

    • with_message_replay(replay_strategy: ReplayStrategy): Adds a message replay strategy to a persistent receiver.
    • with_message_selector(selector_query_expression: str): Enables message selection based on message header parameters and message property values.

  3. Call the build() function on your PersistentMessageReceiverBuilder to return a PersistentMessageReceiver object:

    • build(endpoint_to_consume_from: Queue): Returns a PersistentMessageReceiver object. Takes the queue to consume from as a parameter.

  4. To enable your PersistentMessageReceiver to start receiving messages, call the start() function on it.


Ensure that the queue properties you specify with the Python API correspond to those configured on the event broker.

For more information about the functions used in the Python API, see the PubSub+ Messaging API for Python reference.

The following is an example of how to use a PersistentMessageReceiver to bind to a queue:

from solace.messaging.resources.queue import Queue
from solace.messaging.resources.topic_subscription import TopicSubscription

# Define topic subscriptions
topics_sub = [TopicSubscription.of("solace/sample/1")]

# Queue name. This assumes that a durable queue already exists on the broker with the right topic subscription
durable_exclusive_queue = Queue.durable_exclusive_queue("sample-queue")

# Use a PersistentMessageReceiverBuilder to create a PersistentMessageReceiver bound to the queue
persistent_receiver = messaging_service.create_persistent_message_receiver_builder() \
    .build(durable_exclusive_queue)

# start() starts the configured PersistentMessageReceiver synchronously. Before this call, the receiver is considered off-duty
persistent_receiver.start()

# Add any additional subscriptions to your receiver, one TopicSubscription per call
for topic_sub in topics_sub:
    persistent_receiver.add_subscription(topic_sub)

You can use the Python API to create durable and non-durable queues on the event broker. For more information, see Creating Queues with the Python API.

Asynchronous Receivers

It is also possible to start a persistent message receiver using a callback listener to allow for asynchronous notifications after the start operation is complete.

In the PubSub+ Python API, you can use the start_async() function to start a PersistentMessageReceiver asynchronously. This allows the receiver object to start in a separate background thread, which means your code can continue to execute and you do not have to wait for the start operation to complete.

import concurrent.futures
# ...
# build() requires the queue to consume from; durable_exclusive_queue is the queue defined earlier
receivers = [persistent_message_receiver_builder.build(durable_exclusive_queue) for _ in range(10)]
futures_to_receiver = {receiver.start_async(): receiver for receiver in receivers}

for future in concurrent.futures.as_completed(futures_to_receiver):
    receiver = futures_to_receiver[future]
    try:
        # start_async() has no return value, but result() raises an error if the start failed
        future.result()
        print(f'Receiver {id(receiver)} started successfully')
    except Exception as err:
        print(f'Receiver {id(receiver)} generated an error: {err}')

Your receiver application is not operational until you call start() or start_async() on it.

Consuming a Persistent Message Synchronously

You can consume persistent messages synchronously. To do this, you create a PersistentMessageReceiver and bind it to a queue. After successfully binding to the queue, your receiver application can begin to consume persistent messages.

The receive_message(timeout: int) function blocks the calling thread until one of the following occurs:

  • your receiver receives the next message
  • the timeout elapses, based on the provided timeout parameter (in milliseconds)
  • there is a service interruption, such as a receiver termination, MessagingService disconnect, or unrecoverable service interruption

When an application processes an InboundMessage, it can then send an acknowledgment to the event broker with PersistentMessageReceiver.ack(). The event broker will then remove the InboundMessage from the queue. Until a message is acknowledged it remains on the broker queue and may be redelivered when the application reconnects to the queue.

For more information about the preceding functions, see the PubSub+ Messaging API for Python reference.

The following example shows you how to consume persistent messages synchronously:

persistent_receiver = messaging_service.create_persistent_message_receiver_builder() \
    .build(durable_exclusive_queue)
persistent_receiver.start()

# Blocking request to receive the next message. Usually used in a loop; the example below makes 1000 receive attempts.
for _ in range(1000):
    # Blocks for up to 1000 milliseconds while waiting for a message to arrive.
    received_message: 'InboundMessage' = persistent_receiver.receive_message(1000)
    if received_message is None:
        continue  # Nothing arrived before the timeout, so there is nothing to acknowledge.
    persistent_receiver.ack(received_message)

If you do not call the receive_message() function, messages can accumulate on the API's internal buffer and you risk running into a back-pressure scenario. If this occurs, the Python API automatically informs the event broker to stop sending messages.
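A receive loop often needs to tell a delivered message apart from a timeout and to decide when to stop polling. The following is a minimal sketch of one way to do that, not the API's prescribed pattern. It assumes that receive_message() returns None when the timeout elapses (confirm this in the API reference). FakeReceiver and consume_until_idle are hypothetical names, and FakeReceiver is an in-memory stand-in so that the sketch runs without an event broker.

```python
from collections import deque
from typing import Callable, Optional


class FakeReceiver:
    """Hypothetical in-memory stand-in for a started PersistentMessageReceiver."""

    def __init__(self, messages):
        self._pending = deque(messages)
        self.acked = []

    def receive_message(self, timeout: Optional[int] = None):
        # Assumption: like the real receiver, return None when nothing arrives in time.
        return self._pending.popleft() if self._pending else None

    def ack(self, message):
        self.acked.append(message)


def consume_until_idle(receiver, process: Callable, timeout_ms: int = 1000,
                       max_idle_polls: int = 3) -> int:
    """Receive, process, and ack until several consecutive polls time out."""
    handled = 0
    idle_polls = 0
    while idle_polls < max_idle_polls:
        message = receiver.receive_message(timeout_ms)
        if message is None:
            idle_polls += 1  # Timeout: nothing to process or acknowledge.
            continue
        idle_polls = 0
        process(message)       # Finish processing first...
        receiver.ack(message)  # ...then acknowledge so the broker can delete it.
        handled += 1
    return handled


fake_receiver = FakeReceiver(["m1", "m2", "m3"])
processed = []
handled_count = consume_until_idle(fake_receiver, processed.append, timeout_ms=10)
```

With a real receiver, you would pass the started PersistentMessageReceiver instead of FakeReceiver. Calling receive_message() regularly in this way also keeps the API's internal buffer drained.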

Consuming a Persistent Message Asynchronously

You can consume persistent messages in an asynchronous manner. To do so, you create a PersistentMessageReceiver and start the connection to the event broker as normal, but you use a MessageHandler to act as a callback function to notify your application when a message has been received.

The receive_async(message_handler: MessageHandler) function does not block your receiving application, which allows it to continue executing other code concurrently. This is useful when your receiver needs to process many messages at once or handle a stream of continuous messages.

receive_async() is not an asynchronous co-routine or generator, and is not asyncio compatible. receive_async() returns immediately, and works with native threads under the hood. This function invokes the callback on a new python thread for every message.
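Because the callback runs on a thread rather than inside an asyncio event loop, an asyncio application needs a thread-safe hand-off before it can await messages. The following sketch shows one possible approach that uses only the standard library. The names make_thread_safe_handler and collect are hypothetical, and a plain thread stands in for the API's callback thread. In an application, the body of the handler would go in MessageHandler.on_message().

```python
import asyncio
import threading


def make_thread_safe_handler(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue):
    """Return a callback that is safe to invoke from a non-asyncio thread."""

    def on_message(message):
        # asyncio.Queue is not thread-safe, so schedule the put on the event loop thread.
        loop.call_soon_threadsafe(queue.put_nowait, message)

    return on_message


async def collect(count: int):
    """Await `count` messages that arrive through a thread-based callback."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    handler = make_thread_safe_handler(loop, queue)

    # Stand-in for the API's callback thread delivering messages to the handler.
    def deliver():
        for i in range(count):
            handler(f"msg-{i}")

    callback_thread = threading.Thread(target=deliver)
    callback_thread.start()

    received = [await queue.get() for _ in range(count)]
    callback_thread.join()
    return received


bridged = asyncio.run(collect(3))
```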

When an application processes an InboundMessage, it can then send an acknowledgment to the event broker with PersistentMessageReceiver.ack(). The event broker will then remove the InboundMessage from the queue. Until a message is acknowledged it remains on the broker queue and may be redelivered when the application reconnects to the queue.

For more information about the preceding functions, see the PubSub+ Messaging API for Python reference.

The following example shows you how to consume and acknowledge persistent messages asynchronously:

class MessageHandlerExample(MessageHandler):
    def __init__(self, persistent_receiver: PersistentMessageReceiver):
        self.receiver: PersistentMessageReceiver = persistent_receiver		
			
    def on_message(self, message: InboundMessage):
        # Prefer the string payload; if there is none, fall back to the bytes payload and decode it
        payload = message.get_payload_as_string()
        if payload is None:
            payload = message.get_payload_as_bytes()
        if isinstance(payload, bytearray):
            print(f"Received a message of type: {type(payload)}. Decoding to string")
            payload = payload.decode()
        self.receiver.ack(message)
						
# Register an asynchronous message receiver on the PersistentMessageReceiver instance.			
persistent_receiver.receive_async(MessageHandlerExample(persistent_receiver))		

Pausing and Resuming Message Consumption from Internal Buffers

When your application consumes messages asynchronously using the receive_async() function, you may call the pause() and resume() functions to control the flow of messages to your application's callback.

When you call the pause() function, any timeout values used in the synchronous receive_message(timeout: int) function are still valid. This means timeouts will likely occur if you call the pause() function.

You can use the pause() and resume() functions to control the flow of messages between the API's internal buffer and your application. This internal buffer is where messages received from the event broker are held. This flow control is useful if your application must momentarily stop processing messages to handle other operations. The pause() and resume() functions do not control the flow of messages between the event broker and the internal buffer of the API: when you call the pause() function, the event broker continues to send messages, and those messages accumulate in the internal buffer.

Since the event broker continues to send messages, a back-pressure scenario may occur: messages continue to accumulate until an internal high watermark is reached. At this point, the PersistentMessageReceiver notifies the event broker to stop sending messages until the number of accumulated messages falls below the internal low watermark. This internal API mechanism handles back-pressure scenarios for you and ensures that no messages are lost between the event broker and your application.
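To make the watermark behavior concrete, the following toy model shows how a high and a low watermark can open and close the flow from the event broker. It is an illustration only and not the API's implementation: WatermarkBuffer, its method names, and the threshold values are hypothetical, and the real thresholds are internal to the API.

```python
from collections import deque


class WatermarkBuffer:
    """Toy model of an internal receive buffer with high and low watermarks."""

    def __init__(self, low_watermark: int, high_watermark: int):
        if not 0 <= low_watermark < high_watermark:
            raise ValueError("expected 0 <= low_watermark < high_watermark")
        self.low = low_watermark
        self.high = high_watermark
        self.messages = deque()
        self.broker_flow_open = True  # True while the broker may keep sending.

    def on_broker_delivery(self, message) -> None:
        """Called when the broker delivers a message into the buffer."""
        self.messages.append(message)
        if len(self.messages) >= self.high:
            self.broker_flow_open = False  # High watermark reached: stop the broker.

    def deliver_to_application(self):
        """Hand the oldest buffered message to the application."""
        message = self.messages.popleft()
        if not self.broker_flow_open and len(self.messages) < self.low:
            self.broker_flow_open = True  # Below the low watermark: resume the broker.
        return message


receive_buffer = WatermarkBuffer(low_watermark=2, high_watermark=5)
flow_states = []

for i in range(5):
    receive_buffer.on_broker_delivery(i)
flow_states.append(receive_buffer.broker_flow_open)  # Buffer holds 5 messages.

for _ in range(3):
    receive_buffer.deliver_to_application()
flow_states.append(receive_buffer.broker_flow_open)  # Buffer holds 2 messages.

receive_buffer.deliver_to_application()
flow_states.append(receive_buffer.broker_flow_open)  # Buffer holds 1 message.
```

The gap between the two watermarks prevents the flow from toggling on every message: the flow closes at the high watermark and reopens only after the application has drained the buffer below the low watermark.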

The following functions are used to pause and resume processing of messages from the API's internal buffer:

    • pause()
    • resume()

For more information about the preceding functions, see the PubSub+ Messaging API for Python reference.

The following example shows how to pause and resume processing of messages from the API's internal buffer:

persistent_receiver.pause() # Pauses the receiver's message delivery to asynchronous message handlers.
# Perform any action here, for example wait 60 seconds: time.sleep(60)
persistent_receiver.resume() # Resumes the receiver's message delivery to asynchronous message handlers.
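If the work performed while paused can raise an exception, the receiver could stay paused indefinitely. One way to guard against that is a small context manager that always calls resume(). This is a sketch under stated assumptions: delivery_paused is a hypothetical helper, not part of the API, and RecordingReceiver is a stand-in that only records the calls so that the sketch runs without an event broker.

```python
from contextlib import contextmanager


@contextmanager
def delivery_paused(receiver):
    """Pause callback delivery for the duration of a with-block."""
    receiver.pause()
    try:
        yield receiver
    finally:
        receiver.resume()  # Always resume, even if the block raises.


class RecordingReceiver:
    """Hypothetical stand-in that records pause() and resume() calls."""

    def __init__(self):
        self.calls = []

    def pause(self):
        self.calls.append("pause")

    def resume(self):
        self.calls.append("resume")


recording_receiver = RecordingReceiver()
try:
    with delivery_paused(recording_receiver):
        raise RuntimeError("maintenance task failed")
except RuntimeError:
    pass  # The receiver has already been resumed at this point.
```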

Extracting Properties from an Inbound Message

After you establish a connection to the event broker, your receiver application can subscribe to topics. Whenever your application receives a message from the broker with a matching topic, an InboundMessage instance is returned to the application. You can extract a number of properties from an InboundMessage, such as the sender ID. The following examples show how to extract properties from a message.

  • Use a MessageHandler callback when you receive a message asynchronously:

    class MessageHandlerExample(MessageHandler):
        def __init__(self, persistent_receiver: PersistentMessageReceiver):
            self.receiver: PersistentMessageReceiver = persistent_receiver		
    			
        def on_message(self, message: InboundMessage):
            topic = message.get_destination_name()
            payload_as_bytes = message.get_payload_as_bytes()
            payload_as_string = message.get_payload_as_string()
            sender_id = message.get_sender_id()
            custom_property = message.get_property("custom_prop_name")
            self.receiver.ack(message)
    						
    # Register an asynchronous message receiver on the PersistentMessageReceiver instance.			
    persistent_receiver.receive_async(MessageHandlerExample(persistent_receiver))		
  • Use the receive_message() function when you receive a message synchronously:

    # Blocking request to receive the next message. Usually used in a loop; the example below makes 1000 receive attempts.
    for _ in range(1000):
        # Blocks for up to 1000 milliseconds while waiting for a message to arrive.
        received_message: 'InboundMessage' = persistent_receiver.receive_message(1000)
        if received_message is None:
            continue  # Nothing arrived before the timeout.
        payload_as_bytes = received_message.get_payload_as_bytes()
        payload_as_string = received_message.get_payload_as_string()
        sender_id = received_message.get_sender_id()
        custom_property = received_message.get_property("custom_prop_name")
        persistent_receiver.ack(received_message)

For a complete list of functions that you can use to extract properties from an InboundMessage, see the PubSub+ Messaging API for Python reference.
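When several parts of an application need the same fields, it can be convenient to collect them once into a plain dictionary. The following sketch uses only the getters shown in the examples above. The describe_message helper is hypothetical, and FakeInboundMessage is a simplified stand-in for InboundMessage so that the sketch runs without an event broker.

```python
from typing import Any, Dict, Iterable


def describe_message(message, property_names: Iterable[str] = ()) -> Dict[str, Any]:
    """Collect commonly used fields of an inbound message into a plain dict."""
    payload = message.get_payload_as_string()
    if payload is None:
        # Fall back to the binary payload when there is no string payload.
        payload = message.get_payload_as_bytes()
    return {
        "destination": message.get_destination_name(),
        "sender_id": message.get_sender_id(),
        "payload": payload,
        "properties": {name: message.get_property(name) for name in property_names},
    }


class FakeInboundMessage:
    """Hypothetical, simplified stand-in exposing the getters used above."""

    def __init__(self, destination, sender_id, payload, properties):
        self._destination = destination
        self._sender_id = sender_id
        self._payload = payload
        self._properties = properties

    def get_destination_name(self):
        return self._destination

    def get_sender_id(self):
        return self._sender_id

    def get_payload_as_string(self):
        return self._payload if isinstance(self._payload, str) else None

    def get_payload_as_bytes(self):
        return None if isinstance(self._payload, str) else bytearray(self._payload)

    def get_property(self, name):
        return self._properties.get(name)


text_summary = describe_message(
    FakeInboundMessage("solace/sample/1", "sender-1", "hello", {"custom_prop_name": 42}),
    ["custom_prop_name"],
)
binary_summary = describe_message(
    FakeInboundMessage("solace/sample/1", "sender-1", b"\x00\x01", {}),
)
```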

Message Acknowledgments

One of the two following application acknowledgment modes can be used for acknowledging a message:

  • Client acknowledgment (default)

  • Auto-acknowledgment

Client-Acknowledgment Mode

Client acknowledgment mode is the default behavior of the PubSub+ Python API and means the client must explicitly send an acknowledgment for the message ID of each message received. ACKs are asynchronous. Any thread can acknowledge messages at any time as long as the receiver has not been terminated. Remember that after a PersistentMessageReceiver acknowledges a message, the event broker deletes that message from the queue. For this reason, it's important to complete any processing and storage of the message before you acknowledge it.

The following examples show how to acknowledge a persistent message when you receive it asynchronously or synchronously:

  • Use a MessageHandler callback when you receive a message asynchronously:

    class MessageHandlerExample(MessageHandler):
        def __init__(self, persistent_receiver: PersistentMessageReceiver):
            self.receiver: PersistentMessageReceiver = persistent_receiver		
    			
        def on_message(self, message: InboundMessage):
            # Process the message.
            self.receiver.ack(message)
    						
    # Register an asynchronous message receiver on the PersistentMessageReceiver instance.			
    persistent_receiver.receive_async(MessageHandlerExample(persistent_receiver))		
  • Use the receive_message() function when you receive a message synchronously:

    # Blocking request to receive the next message. Usually used in a loop; the example below makes 1000 receive attempts.
    for _ in range(1000):
        # Blocks for up to 1000 milliseconds while waiting for a message to arrive.
        received_message: 'InboundMessage' = persistent_receiver.receive_message(1000)
        if received_message is None:
            continue  # Nothing arrived before the timeout, so there is nothing to acknowledge.
        # Process the message.
        persistent_receiver.ack(received_message)
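Because an acknowledged message is deleted from the queue, the order of operations matters when processing can fail. The following sketch acknowledges a message only after a storage step succeeds and leaves it unacknowledged otherwise. StoreThenAckHandler, AckRecorder, and flaky_store are hypothetical names. In an application, the handler would extend MessageHandler and receive a real PersistentMessageReceiver.

```python
class StoreThenAckHandler:
    """Acknowledge a message only after it has been stored successfully."""

    def __init__(self, receiver, store):
        self.receiver = receiver
        self.store = store  # Callable that persists a message and raises on failure.

    def on_message(self, message):
        try:
            self.store(message)
        except Exception as err:
            # No ack: the message stays on the broker queue and may be redelivered.
            print(f"Storing failed, message left unacknowledged: {err}")
            return
        self.receiver.ack(message)


class AckRecorder:
    """Hypothetical stand-in for a receiver that records acknowledged messages."""

    def __init__(self):
        self.acked = []

    def ack(self, message):
        self.acked.append(message)


def flaky_store(message):
    """Hypothetical storage step that fails for one particular message."""
    if message == "bad":
        raise IOError("disk full")


ack_recorder = AckRecorder()
handler = StoreThenAckHandler(ack_recorder, flaky_store)
for incoming in ["good", "bad"]:
    handler.on_message(incoming)
```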

Auto-Acknowledgment Mode

When the auto-acknowledgment mode is used, the API automatically generates application-level acknowledgments. To configure your PersistentMessageReceiver to use auto-acknowledgments, you use the with_message_auto_acknowledgement() method:

persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder() \
                                                                        .with_message_auto_acknowledgement() \
                                                                        .build(durable_exclusive_queue) 

Negative Acknowledgments for Specific Messages

You can use negative acknowledgments (NACKs) if your PersistentMessageReceiver is not configured to automatically ACK received messages. When you use NACKs, you can send a settlement outcome to let the event broker know the result from processing a guaranteed message that was received. Based on the settlement outcome, the event broker knows how to handle the message on its queue. You can use the following settlement outcomes:

  • ACCEPTED—This ACK notifies the event broker that your client application successfully processed the guaranteed message. When the event broker receives this outcome it removes the message from its queue.
    • When you call the settle() function with an outcome of ACCEPTED, it is the same as using persistent_receiver.ack(received_message).
  • FAILED—This NACK notifies the event broker that your client application did not process the message. When the event broker receives this NACK it attempts to redeliver the message while adhering to delivery count limits. The message remains on the broker during redelivery while the broker waits for it to be acknowledged with a status other than FAILED.
  • REJECTED—This NACK notifies the event broker that your client application could not process the message. When the event broker receives this NACK it removes the message from its queue and then moves the message to the Dead Message Queue (DMQ) if it is configured.

Before you can use NACKs, you must use the with_required_message_outcome_support() function to add the Outcome.FAILED, Outcome.REJECTED, or both outcomes as NACK types when you create your PersistentMessageReceiver to prepare it to work with negative acknowledgments. You do not need to add the Outcome.ACCEPTED outcome because it is always available. If you try to call settle() on an outcome that has not been added, you get an error of Required Settlement Outcome Not Supported. The following code shows how to configure a PersistentMessageReceiver to use NACKs:

from solace.messaging.config.message_acknowledgement_configuration import Outcome
# ...
nacking_receiver = messaging_service.create_persistent_message_receiver_builder() \
          .with_required_message_outcome_support(Outcome.FAILED, Outcome.REJECTED) \
          .build(Queue.durable_exclusive_queue("queue_name"))

Alternatively, you can use a property map instead of a setter:

from solace.messaging.config.solace_properties.receiver_properties import PERSISTENT_REQUIRED_MESSAGE_OUTCOME_SUPPORT
from solace.messaging.config.solace_constants.receiver_constants import PERSISTENT_RECEIVER_OUTCOME_FAILED, PERSISTENT_RECEIVER_OUTCOME_REJECTED
# ...
nacking_receiver = messaging_service.create_persistent_message_receiver_builder() \
          .from_properties({PERSISTENT_REQUIRED_MESSAGE_OUTCOME_SUPPORT: f"{PERSISTENT_RECEIVER_OUTCOME_FAILED},{PERSISTENT_RECEIVER_OUTCOME_REJECTED}"}) \
          .build(Queue.durable_exclusive_queue("queue_name"))
  • NACKs can be lost during transit (for example due to unexpected networking issues). Consider this fact as part of the logic for handling messages when you develop your application.
  • NACKs are supported on event brokers 10.2.1 and later. If an event broker does not support NACKs, an InvalidOperationException occurs when you call start() on a PersistentMessageReceiver instance configured to use message outcomes.

The following examples show how to settle a message, with placeholder, user-defined functions to determine if the message outcome should be ACCEPTED, REJECTED, or FAILED. The decision regarding which NACK outcome to use when you receive a message should be made based on your application's requirements.

  • Receive a message synchronously:
    nacking_receiver.start()
    nacking_receiver.add_subscription(TopicSubscription.of(topic_name))
    msg: InboundMessage = nacking_receiver.receive_message()
    if isMsgOK(msg):
        # Message is good, process it here and then ACK it
        nacking_receiver.settle(msg, Outcome.ACCEPTED)
    elif isMsgPossiblySalvageableLater(msg):
        # For example, your application cannot currently process the message, redeliver later
        nacking_receiver.settle(msg, Outcome.FAILED)
    else:
        # For example, there is a problem with the message content, remove from endpoint
        nacking_receiver.settle(msg, Outcome.REJECTED)
  • Receive a message asynchronously:
    class MessageHandlerExample(MessageHandler):
        def __init__(self, nacking_receiver: PersistentMessageReceiver):
            super().__init__()
            self.receiver: PersistentMessageReceiver = nacking_receiver
            
        def on_message(self, msg: InboundMessage):
            try:
                # Process the message.       
                if isMsgOK(msg):   
                    # Message is good, process it here and then ACK it
                    self.receiver.settle(msg, Outcome.ACCEPTED)
                elif isMsgPossiblySalvageableLater(msg):
                    # For example, your application cannot currently process the message
                    self.receiver.settle(msg, Outcome.FAILED)
                else:      
                    # For example, there is a problem with the message content
                    self.receiver.settle(msg, Outcome.REJECTED)
            except Exception as e:
                # Log or handle exceptions appropriately
                print(f"Error processing message: {e}")
                # Consider how you want to settle the message if an error occurs
                self.receiver.settle(msg, Outcome.FAILED)
    
    nacking_receiver.start()
    nacking_receiver.add_subscription(TopicSubscription.of(topic_name))								
    nacking_receiver.receive_async(MessageHandlerExample(nacking_receiver))
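The placeholder checks in the examples above can also be expressed as a mapping from processing errors to settlement outcomes. The following sketch shows one such policy. It is an assumption about how an application might decide, not a rule defined by the API. The Outcome enum below is a local stand-in so that the sketch runs on its own (in an application, import Outcome from the API instead), and TransientError, choose_outcome, and parse_order are hypothetical names.

```python
import json
from enum import Enum


class Outcome(Enum):
    """Local stand-in for the API's Outcome values; import the real one in an application."""
    ACCEPTED = "ACCEPTED"
    FAILED = "FAILED"
    REJECTED = "REJECTED"


class TransientError(Exception):
    """Hypothetical error for a temporary problem, such as an unavailable backend."""


def choose_outcome(message, process) -> Outcome:
    """Run `process` and map the result to a settlement outcome."""
    try:
        process(message)
    except TransientError:
        return Outcome.FAILED    # Worth retrying: ask the broker to redeliver.
    except Exception:
        return Outcome.REJECTED  # Retrying will not help: remove from the queue.
    return Outcome.ACCEPTED


def parse_order(message: str) -> None:
    """Hypothetical processing step used to exercise each outcome."""
    order = json.loads(message)  # Malformed content raises a ValueError subclass.
    if order.get("warehouse") == "offline":
        raise TransientError("warehouse system unavailable")


sample_messages = ['{"warehouse": "north"}', '{"warehouse": "offline"}', "not json"]
outcomes = [choose_outcome(m, parse_order) for m in sample_messages]
```

With a receiver configured for both NACK types, the returned value would then be passed to settle() along with the message.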