Consuming Direct Messages Using the Python API

Direct messaging is useful in scenarios where high throughput and low latency are required. With direct messages, some message loss may occur due to external factors, such as network congestion or occasional client disconnections. Direct messages are suitable for applications that need the latest information but not necessarily every single message, such as weather applications, price checkers, and GPS tracking.

No additional event broker configuration is required for direct messaging. If your application cannot tolerate message loss, we recommend that you use persistent messaging.

To consume direct messages using the PubSub+ Messaging API for Python, use the following steps:

  1. Create a DirectMessageReceiver.
  2. Handle Back-Pressure When Subscribing to Direct Messages.
  3. Receive a Direct Message Synchronously.
  4. Receive a Direct Message Asynchronously.
  5. Extract Properties from an Inbound Message.
  6. Use PubSub+ Cache With the Python API.

In some use cases, the API receives messages from the event broker faster than your application can process them. Messages can then fill the API's internal buffers, causing back-pressure. If this scenario is possible, consider changing the default back-pressure settings to meet your requirements. For more information, see Handling Back-Pressure When Subscribing to Direct Messages.

For examples of applications that consume direct messages, see direct_subscriber.py on the Solace GitHub page.

Creating a DirectMessageReceiver

After a MessagingService instance has established a connection to an event broker, use a DirectMessageReceiver to consume direct messages from the event broker. To create a DirectMessageReceiver object, do the following:

  1. Call the create_direct_message_receiver_builder() function on a MessagingService object. This returns a DirectMessageReceiverBuilder object.

  2. You can now use the functions in the DirectMessageReceiverBuilder interface to configure a DirectMessageReceiver to use certain features of the API, such as topic subscriptions and back-pressure strategies.

  3. Call the build() function on your DirectMessageReceiverBuilder to return a DirectMessageReceiver object.

  4. To enable your DirectMessageReceiver to start receiving messages, call the start() function on it.

For more information, see the PubSub+ Messaging API for Python reference.

The following example shows how to add a list of topic subscriptions to a DirectMessageReceiver and start it:

from solace.messaging.resources.topic_subscription import TopicSubscription

# Assumes messaging_service is a MessagingService instance that is already connected to the event broker.

# Define the topic subscriptions
topics_sub = [TopicSubscription.of("solace/sample/1")]

# Use a DirectMessageReceiverBuilder to configure and create a DirectMessageReceiver
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
               .with_subscriptions(topics_sub) \
               .build()

# start() starts the configured DirectMessageReceiver synchronously. Before this function is called, the receiver is considered off-duty
direct_receiver.start()
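
For context, the steps above can be placed in a small end-to-end sketch that also connects the MessagingService and cleans up afterwards. This is only an illustration under stated assumptions: the environment variable names (SOLACE_HOST and so on) and their default values are hypothetical, and the Solace imports are deferred into run_receiver() so that the property helper can be exercised on a machine without the API installed or a broker available.

```python
import os
from typing import Mapping


def build_broker_props(env: Mapping[str, str] = os.environ) -> dict:
    """Build connection properties. The env var names and defaults are assumptions."""
    return {
        "solace.messaging.transport.host": env.get("SOLACE_HOST", "tcp://localhost:55555"),
        "solace.messaging.service.vpn-name": env.get("SOLACE_VPN", "default"),
        "solace.messaging.authentication.scheme.basic.username": env.get("SOLACE_USERNAME", "default"),
        "solace.messaging.authentication.scheme.basic.password": env.get("SOLACE_PASSWORD", ""),
    }


def run_receiver(topic: str = "solace/sample/1") -> None:
    """Connect, start a DirectMessageReceiver, and always clean up."""
    # Imported here so this module still loads where the API is not installed.
    from solace.messaging.messaging_service import MessagingService
    from solace.messaging.resources.topic_subscription import TopicSubscription

    messaging_service = MessagingService.builder() \
        .from_properties(build_broker_props()) \
        .build()
    messaging_service.connect()

    direct_receiver = messaging_service.create_direct_message_receiver_builder() \
        .with_subscriptions([TopicSubscription.of(topic)]) \
        .build()
    direct_receiver.start()
    try:
        pass  # Receive messages here, synchronously or with a MessageHandler.
    finally:
        # Stop the receiver before disconnecting the service it belongs to.
        direct_receiver.terminate()
        messaging_service.disconnect()
```

Calling run_receiver() requires a reachable event broker; build_broker_props() can be checked on its own.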

Asynchronous Receivers

Alternatively, you can start a direct message receiver asynchronously and be notified when the start operation is complete.

In the PubSub+ Python API, you can use the start_async() function to start a DirectMessageReceiver asynchronously. This allows the receiver object to start in a separate background thread, which means your code can continue to execute and you do not have to wait for the function to return.

import concurrent.futures
# ...
# Assumes direct_message_receiver_builder is a configured DirectMessageReceiverBuilder.
receivers = [direct_message_receiver_builder.build() for _ in range(10)]
futures_to_receiver = {receiver.start_async(): receiver for receiver in receivers}

for future in concurrent.futures.as_completed(futures_to_receiver):
    receiver = futures_to_receiver[future]
    try:
        # start_async() produces no return value, but result() raises an exception if the start failed
        future.result()
        print(f'Receiver {id(receiver)} started successfully')
    except Exception as err:
        print(f'Receiver {id(receiver)} generated an error: {err}')

Your receiver application is not operational until you call start() or start_async() on it.
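
The future-based pattern above can be tried without a broker by substituting a stand-in object. In the following sketch, FakeReceiver and start_all() are hypothetical names that are not part of the PubSub+ API; the sketch only illustrates how a failure during an asynchronous start surfaces when result() is called on the future.

```python
import concurrent.futures


class FakeReceiver:
    """Hypothetical stand-in for a receiver; not part of the PubSub+ API."""

    _executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail
        self.started = False

    def _start(self) -> None:
        if self.fail:
            raise RuntimeError(f"{self.name} could not start")
        self.started = True

    def start_async(self) -> concurrent.futures.Future:
        # Run the blocking start on a background thread and return a Future.
        return self._executor.submit(self._start)


def start_all(receivers):
    """Start every receiver concurrently; return sorted (started, failed) names."""
    futures_to_receiver = {r.start_async(): r for r in receivers}
    started, failed = [], []
    for future in concurrent.futures.as_completed(futures_to_receiver):
        receiver = futures_to_receiver[future]
        try:
            future.result()  # Re-raises any exception from the start attempt.
            started.append(receiver.name)
        except Exception:
            failed.append(receiver.name)
    return sorted(started), sorted(failed)


started, failed = start_all(
    [FakeReceiver("a"), FakeReceiver("b", fail=True), FakeReceiver("c")]
)
print("started:", started, "failed:", failed)
```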

Handling Back-Pressure When Subscribing to Direct Messages

When subscribing to direct messages, the API uses an internal buffer to store the messages that were received from the event broker. In ideal conditions, the application processes each message as soon as it is received. Depending on the processing speed and other factors, it's possible to receive messages faster than the subscribing application can process them (for example, during a high-volume burst of messages). If unprocessed messages are allowed to accumulate, the internal buffer can reach its capacity, a condition referred to as back-pressure.

In the PubSub+ Python API, the DirectMessageReceiver has the following mechanisms to handle back-pressure:

Configuring an Unlimited Internal Buffer

The default configuration for this API is to use an internal buffer of unlimited size for messages. This configuration is not suitable for memory-constrained environments because the buffer is allowed to grow indefinitely, which can cause out-of-memory errors (or potentially undefined errors). This configuration is useful when your infrastructure is made up of several short-lived microservices. It is also convenient because you don't have to write code to handle back-pressure scenarios.

When you use an unlimited buffer, the Python API continuously puts messages it receives from the event broker onto its internal buffer. The following example shows an explicit call to the on_back_pressure_elastic() function, which is not required because it is the default behavior:

# Use a DirectMessageReceiverBuilder to configure and create a DirectMessageReceiver
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
                .with_subscriptions(topics_sub) \
                .on_back_pressure_elastic() \
                .build()

# start() starts the configured DirectMessageReceiver synchronously. Before this function is called, the receiver is considered off-duty
direct_receiver.start()
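
When deciding whether an unlimited buffer is acceptable, it can help to estimate how quickly a backlog would grow. The following back-of-the-envelope sketch is not part of the API, and all rates and sizes in it are hypothetical; it only illustrates the arithmetic of a buffer that fills faster than it drains.

```python
def backlog_after(seconds: float, arrival_rate: float, processing_rate: float) -> float:
    """Messages waiting in an unbounded buffer after `seconds` of sustained load."""
    return max(0.0, (arrival_rate - processing_rate) * seconds)


def backlog_bytes(messages: float, avg_message_bytes: int) -> float:
    """Rough payload memory held by the backlog (ignores per-message overhead)."""
    return messages * avg_message_bytes


# Hypothetical numbers: 5,000 msg/s arriving, 4,000 msg/s processed, 1 KiB payloads.
pending = backlog_after(60, arrival_rate=5000, processing_rate=4000)
mib = backlog_bytes(pending, 1024) / 2**20
print(f"{pending:.0f} messages, about {mib:.1f} MiB of payload after one minute")
```

With these assumed numbers, the backlog grows by 1,000 messages every second, so a sustained burst eventually exhausts memory unless the consumer catches up or a bounded strategy is used.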

Dropping the Latest Message

You can configure the API to drop the latest message when a specified capacity is reached in the API's internal buffer. While the buffer is full, newly arriving messages are not placed on it and are instead dropped (lost).

To configure this mechanism, call the on_back_pressure_drop_latest(buffer_capacity: int) function on the DirectMessageReceiverBuilder and pass the maximum number of messages that can accumulate (buffer_capacity) before messages are dropped.

The following example shows how to configure the application to drop messages if there are a thousand messages queued in the API's internal buffer:

# Use a DirectMessageReceiverBuilder to configure and create a DirectMessageReceiver
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
                .with_subscriptions(topics_sub) \
                .on_back_pressure_drop_latest(1000) \
                .build()

# start() starts the configured DirectMessageReceiver synchronously. Before this function is called, the receiver is considered off-duty
direct_receiver.start()
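
To make the drop-latest behavior concrete, the following sketch models it with a small bounded buffer. DropLatestBuffer is a hypothetical class written for illustration only; it does not show how the API implements its internal buffer.

```python
from collections import deque


class DropLatestBuffer:
    """Illustrative bounded buffer: when full, the newly arrived message is discarded."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.items = deque()
        self.dropped = 0

    def offer(self, message) -> bool:
        """Try to enqueue a message; return False if it was dropped."""
        if len(self.items) >= self.capacity:
            self.dropped += 1  # Buffer full: the incoming message is lost.
            return False
        self.items.append(message)
        return True

    def poll(self):
        """Remove and return the oldest buffered message, or None if empty."""
        return self.items.popleft() if self.items else None


latest_buf = DropLatestBuffer(capacity=3)
for n in range(5):
    latest_buf.offer(n)

# Messages 0, 1, and 2 are kept; messages 3 and 4 arrived while the buffer was full.
print(list(latest_buf.items), "dropped:", latest_buf.dropped)
```

This strategy preserves the messages that arrived first, so it tends to suit consumers for which older data is still worth processing.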

Dropping the Oldest Message

You can configure the API to drop the oldest message when a specified capacity is reached in the API's internal buffer. When this capacity is reached, the oldest messages in the receiver's buffer are removed to make room for more recent messages.

To configure this mechanism, call the on_back_pressure_drop_oldest(buffer_capacity: int) function on the DirectMessageReceiverBuilder and pass the maximum number of messages that can accumulate (buffer_capacity) before the oldest messages are removed from the internal buffer.

The following example shows how to configure the application to drop the oldest message in the API's internal buffer if there are a thousand messages queued:

# Use a DirectMessageReceiverBuilder to configure and create a DirectMessageReceiver
direct_receiver = messaging_service.create_direct_message_receiver_builder() \
                .with_subscriptions(topics_sub) \
                .on_back_pressure_drop_oldest(1000) \
                .build()

# start() starts the configured DirectMessageReceiver synchronously. Before this function is called, the receiver is considered off-duty
direct_receiver.start()
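
The drop-oldest behavior can be modeled in a few lines with a fixed-length deque. As with any such model, DropOldestBuffer is a hypothetical illustration and says nothing about how the API implements its internal buffer.

```python
from collections import deque


class DropOldestBuffer:
    """Illustrative bounded buffer: when full, the oldest message is evicted."""

    def __init__(self, capacity: int):
        # A deque with maxlen discards items from the opposite end on append.
        self.items = deque(maxlen=capacity)
        self.dropped = 0

    def offer(self, message) -> None:
        """Enqueue a message, evicting the oldest one if the buffer is full."""
        if len(self.items) == self.items.maxlen:
            self.dropped += 1  # The leftmost (oldest) item is about to be evicted.
        self.items.append(message)

    def poll(self):
        """Remove and return the oldest buffered message, or None if empty."""
        return self.items.popleft() if self.items else None


oldest_buf = DropOldestBuffer(capacity=3)
for n in range(5):
    oldest_buf.offer(n)

# Messages 0 and 1 were evicted to make room; 2, 3, and 4 remain.
print(list(oldest_buf.items), "dropped:", oldest_buf.dropped)
```

This strategy favors fresh data, which matches the use cases for direct messaging where only the latest information matters.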

Receiving a Direct Message Synchronously

After you have established a connection to the event broker using a MessagingService instance, you can use a DirectMessageReceiver to subscribe to messages. The DirectMessageReceiver must be subscribed to at least one topic for it to begin receiving messages.

The following example shows how an InboundMessage is received by the DirectMessageReceiver. Use the receive_message(timeout: int) function, which blocks until one of the following occurs:

  • Your receiver receives the next message.
  • A timeout occurs, based on the provided timeout parameter.
  • There is a service interruption, such as a receiver termination, MessagingService disconnect, or unrecoverable service interruption.

direct_receiver = messaging_service.create_direct_message_receiver_builder() \
                .with_subscriptions(topics_sub) \
                .build()
direct_receiver.start()

# Blocking request to receive the next message. Usually used in a loop; this example makes 1000 receive calls.
for _ in range(1000):
    inbound_message: 'InboundMessage' = direct_receiver.receive_message(1000)  # Blocks for up to 1000 milliseconds waiting for a message to arrive.
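
Because a blocking receive can end without a message, a receive loop usually needs a rule for what to do when a poll comes back empty. The following sketch models one such rule with a stand-in receiver built on the standard library. FakeSyncReceiver and drain() are hypothetical names, and the sketch assumes that a timeout is signaled by returning None; check the API reference for the exact behavior of receive_message().

```python
import queue
from typing import List, Optional


class FakeSyncReceiver:
    """Hypothetical stand-in that mimics a blocking receive with a millisecond timeout."""

    def __init__(self, messages):
        self._queue = queue.Queue()
        for message in messages:
            self._queue.put(message)

    def receive_message(self, timeout: int) -> Optional[str]:
        try:
            # queue.Queue expects seconds, so convert from milliseconds.
            return self._queue.get(timeout=timeout / 1000)
        except queue.Empty:
            return None  # Assumption: a timeout is reported as None.


def drain(receiver, max_idle_polls: int = 2, timeout_ms: int = 50) -> List[str]:
    """Receive until `max_idle_polls` consecutive polls time out."""
    received, idle = [], 0
    while idle < max_idle_polls:
        message = receiver.receive_message(timeout_ms)
        if message is None:
            idle += 1  # Nothing arrived within the timeout.
            continue
        idle = 0  # Reset the idle counter after every successful receive.
        received.append(message)
    return received


print(drain(FakeSyncReceiver(["a", "b", "c"])))
```

Stopping after a number of consecutive idle polls is only one possible policy; a long-running consumer might instead loop until it is asked to shut down.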

Receiving a Direct Message Asynchronously

After you have established a connection to the event broker using a MessagingService instance, you can consume direct messages and handle them asynchronously using a DirectMessageReceiver. To handle direct messages asynchronously, you use a MessageHandler to act as a callback function to let the application know when a message has been received. The abstract MessageHandler class contains the on_message() function you use to process instances of InboundMessage.

The following example shows an implementation of a MessageHandler and the on_message() function you use to receive messages asynchronously:

class MessageHandlerExample(MessageHandler):
    def on_message(self, message: InboundMessage):
        # Use the string payload if there is one; otherwise fall back to the bytes payload and decode it
        payload = message.get_payload_as_string()
        if payload is None:
            payload = message.get_payload_as_bytes()
        if isinstance(payload, bytearray):
            print(f"Received a message of type: {type(payload)}. Decoding to string")
            payload = payload.decode()

# Register an asynchronous message receiver on the DirectMessageReceiver instance.
direct_receiver.receive_async(MessageHandlerExample())
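
The payload-selection logic in a handler like the one above can be factored into a plain function, which makes it testable without a broker. In this sketch, decode_payload() is a hypothetical helper, and the choice of UTF-8 with replacement of invalid bytes is an assumption about the publisher's encoding.

```python
from typing import Optional, Union


def decode_payload(
    as_string: Optional[str],
    as_bytes: Optional[Union[bytes, bytearray]],
) -> Optional[str]:
    """Prefer the string form of a payload; otherwise decode the bytes form."""
    if as_string is not None:
        return as_string
    if as_bytes is not None:
        # Assumption: binary payloads carry UTF-8 text. Invalid bytes are replaced
        # rather than raising, so one bad message does not break the callback.
        return bytes(as_bytes).decode("utf-8", errors="replace")
    return None


print(decode_payload("hello", None))
print(decode_payload(None, bytearray(b"hello")))
```

Inside on_message(), the helper would be called as decode_payload(message.get_payload_as_string(), message.get_payload_as_bytes()).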

Extracting Properties from an Inbound Message

After you establish a connection to the event broker, your receiver application can subscribe to topics. Whenever your application receives a message from the broker with a matching topic, an InboundMessage instance is returned to the application. You can extract a number of properties from an InboundMessage, such as the sender ID. The following examples show how to extract properties from a message.

  • Use a MessageHandler callback when you receive a message asynchronously:

    class MessageHandlerExample(MessageHandler):
        def __init__(self, direct_receiver: DirectMessageReceiver):
            self.receiver: DirectMessageReceiver = direct_receiver

        def on_message(self, message: InboundMessage):
            topic = message.get_destination_name()
            payload_as_bytes = message.get_payload_as_bytes()
            payload_as_string = message.get_payload_as_string()
            sender_id = message.get_sender_id()
            custom_property = message.get_property("custom_prop_name")

    # Register an asynchronous message receiver on the DirectMessageReceiver instance.
    # The handler's constructor requires the receiver, so pass it in.
    direct_receiver.receive_async(MessageHandlerExample(direct_receiver))

  • Use the receive_message() function when you receive a message synchronously:

    # Blocking request to receive the next message. Usually used in a loop; this example makes 1000 receive calls.
    for _ in range(1000):
        inbound_message: 'InboundMessage' = direct_receiver.receive_message(1000)  # Blocks for up to 1000 milliseconds waiting for a message to arrive.
        if inbound_message is None:
            continue  # receive_message() can return None, for example when the timeout expires.
        payload_as_bytes = inbound_message.get_payload_as_bytes()
        payload_as_string = inbound_message.get_payload_as_string()
        sender_id = inbound_message.get_sender_id()
        custom_property = inbound_message.get_property("custom_prop_name")
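
If several parts of an application need the same properties, it can be convenient to collect them into one value object. The following sketch uses only the getters shown in the examples above. MessageSummary, summarize(), and StubMessage are hypothetical names introduced for illustration, and StubMessage is a test double rather than a real InboundMessage.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class MessageSummary:
    topic: Optional[str]
    sender_id: Optional[str]
    payload: Optional[str]
    custom_property: Any


def summarize(message, property_name: str = "custom_prop_name") -> MessageSummary:
    """Collect commonly used properties from a message-like object."""
    text = message.get_payload_as_string()
    if text is None:
        raw = message.get_payload_as_bytes()
        # Assumption: binary payloads carry UTF-8 text.
        text = bytes(raw).decode("utf-8", errors="replace") if raw is not None else None
    return MessageSummary(
        topic=message.get_destination_name(),
        sender_id=message.get_sender_id(),
        payload=text,
        custom_property=message.get_property(property_name),
    )


class StubMessage:
    """Hypothetical test double exposing the getters used by summarize()."""

    def get_destination_name(self):
        return "solace/sample/1"

    def get_payload_as_string(self):
        return None

    def get_payload_as_bytes(self):
        return bytearray(b"21.5C")

    def get_sender_id(self):
        return "sensor-7"

    def get_property(self, name):
        return {"custom_prop_name": "celsius"}.get(name)


summary = summarize(StubMessage())
print(summary)
```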

For a complete list of functions that you can use to extract properties from an InboundMessage, see the PubSub+ Messaging API for Python reference.