Publishing Direct Messages Using the Python API

Direct messages are useful when high throughput and low latency are required. We recommend that you publish events using direct messages if some message loss can be tolerated without negatively impacting client applications. Message loss can occur due to external factors, such as network congestion or client disconnection. If your applications require guaranteed delivery and message acknowledgment, we recommend using persistent messages instead.

To publish direct messages using the PubSub+ Messaging API for Python, use the following steps:

  1. Create a DirectMessagePublisher.
  2. Configure and create an OutboundMessage.
  3. Handle back-pressure when publishing direct messages.
  4. Publish a direct message.
  5. Handle errors.

In some use cases, your application can send messages faster than they can be transported. The messages then fill the API's internal buffers, which causes back-pressure. If this scenario is possible, we recommend that you consider changing the back-pressure settings to meet the requirements of your application. For more information, see Handling Back-Pressure When Publishing Direct Messages.

For examples of applications that publish direct messages, see direct_publisher.py on the Solace GitHub page.

Creating a DirectMessagePublisher

After a MessagingService instance has established a connection to an event broker, use a DirectMessagePublisher to publish direct messages. To create a DirectMessagePublisher object, do the following:

  1. Call the create_direct_message_publisher_builder() function on a MessagingService object. This returns a DirectMessagePublisherBuilder object.

  2. You can now use the functions in the DirectMessagePublisherBuilder interface to configure a DirectMessagePublisher to use certain features of the API, such as back-pressure strategies.

  3. Call the build() function on your DirectMessagePublisherBuilder to return a DirectMessagePublisher object.

  4. To enable your DirectMessagePublisher to start publishing messages, call the start() function on it.

For more information, see the PubSub+ Messaging API for Python reference.

The following is an example that shows how to use a direct message publisher to enable your application to publish messages to the event broker:

# Create a DirectMessagePublisherBuilder which allows you to create a DirectMessagePublisher and start it
direct_publisher = messaging_service.create_direct_message_publisher_builder() \
               .on_back_pressure_reject(1000) \
               .build()

# Starts the configured DirectMessagePublisher synchronously. Before the start() function is called, the publisher is considered off-duty
direct_publisher.start()		

Asynchronous Publishers

It is also possible to start a direct message publisher using a callback listener to allow for asynchronous notifications after the start operation is complete.

In the PubSub+ Python API, you can use the start_async() function to start a DirectMessagePublisher asynchronously. The publisher starts in a separate background thread, so you can start multiple publishers concurrently, or your code can continue to execute without waiting for the function to return.

The example below shows 10 publishers being started asynchronously. The as_completed() function lets your application use each publisher as soon as it becomes available, instead of waiting for all publishers to become operational.

import concurrent.futures
# ...
publishers = [direct_message_publisher_builder.build() for _ in range(10)]
futures_to_publisher = { publisher.start_async() : publisher for publisher in publishers}

for future in concurrent.futures.as_completed(futures_to_publisher):
    publisher = futures_to_publisher[future]
    try:
        # start_async() has no return value, but result() raises an error if the start failed
        future.result()
        print(f'Publisher {id(publisher)} started successfully')
    except Exception as err:
        print(f'Publisher {id(publisher)} generated an error: {err}') 

Your publisher is not operational until you call start() or start_async() on it.

Configuring and Creating Outbound Messages

Your client applications explicitly create the outbound messages to publish. In the PubSub+ Python API, when you publish messages you use OutboundMessage instances. To configure and create OutboundMessage instances, follow these steps:

  1. Call message_builder() on a messaging service object to return an OutboundMessageBuilder instance. For better performance, we recommend you use a single OutboundMessageBuilder to create multiple OutboundMessage instances.

    outbound_msg_builder = messaging_service.message_builder()
  2. Configure your message with an OutboundMessageBuilder and then call the build() function to return a message instance. You can configure message properties using either method below.
    • Use the OutboundMessageBuilder interface and the with_property(propertyName, propertyValue) functions. Both Solace-defined message_properties keys and arbitrary user-defined property keys are accepted. The following example shows how to set Solace-defined properties for message ID, sender ID, and message type, and a custom key-value property on a message:
      outbound_msg = messaging_service.message_builder()   \
                     .with_application_message_id("myID") \
                     .with_sender_id("senderID") \
                     .with_application_message_type("messageType") \
                     .with_property("key","value") \
                     .build(message_body)
    • Use a message properties dictionary with the message_properties.* keys. The following example shows how to create a message property dictionary with values for message ID, sender ID and message type constants, and then configure a message using the from_properties() function:
      from solace.messaging.config.solace_properties import message_properties
      # ...
      message_props = {
          message_properties.APPLICATION_MESSAGE_ID: "myID",
          message_properties.SENDER_ID: "senderID",
          message_properties.APPLICATION_MESSAGE_TYPE: "messageType"
          # For a complete list of message_properties constants see the PubSub+ Messaging API for Python reference.
          }							
      # ...												
      outbound_msg = outbound_msg_builder \
                     .from_properties(message_props) \
                     .build(message_body)

The following code example shows how to create a message builder, set message properties and create a message:

# Configure a builder and build a message from it. build() returns an OutboundMessage, not a builder.
outbound_msg = messaging_service.message_builder()   \
               .with_application_message_id(message_id) \
               .from_properties(message_props) \
               .with_property("key","value") \
               .build(message_body)

For more information about the functions, see the PubSub+ Messaging API for Python reference.
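
The recommendation to reuse a single OutboundMessageBuilder can be sketched as a small helper. This is only an illustration: build_messages() is a hypothetical helper name, and StandInBuilder is a hypothetical stand-in that mimics just the from_properties() and build() calls shown above so the sketch runs without a broker. In an application you would pass the builder returned by messaging_service.message_builder() instead.

```python
class StandInBuilder:
    """Hypothetical stand-in that mimics only the two builder calls used below."""

    def __init__(self):
        self.props = {}

    def from_properties(self, props):
        # Like the chained calls in the examples above, return the builder itself.
        self.props.update(props)
        return self

    def build(self, body):
        # The real builder returns an OutboundMessage; a dict is used here instead.
        return {"body": body, "properties": dict(self.props)}


def build_messages(outbound_msg_builder, bodies, common_props):
    """Configure one builder once, then build one message per body."""
    configured = outbound_msg_builder.from_properties(common_props)
    return [configured.build(body) for body in bodies]


messages = build_messages(StandInBuilder(), ["first", "second"], {"key": "value"})
```

Configuring the shared properties once and calling build() per payload avoids creating a new builder for every message.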

Handling Back-Pressure When Publishing Direct Messages

When you use direct messaging, the messages are sent to a PubSub+ event broker using a topic. The event broker relays the messages to any consumers subscribed to that topic.

When the client application publishes a direct message, the API queues the message in an internal buffer before it is sent to the event broker. In ideal conditions, as soon as the application publishes a message, the API sends that message on the network, and the message is eventually received by the event broker. However, a client application can publish messages faster than the API can send them to the broker, for example because of network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, which prevents the API from storing any more messages. This scenario is referred to as back-pressure. It is important to configure your application to handle situations where back-pressure occurs.
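
To make the scenario concrete, the following is a toy model of a bounded buffer, not the API's actual implementation. The function name ticks_until_full() and the rates and capacity used are hypothetical. Each tick, the application enqueues a fixed number of messages and the network drains a fixed number.

```python
def ticks_until_full(publish_rate, send_rate, capacity, max_ticks=10_000):
    """Return the first tick at which the buffer is full, or None if it never fills.

    Toy model: every tick the application publishes `publish_rate` messages
    and the API then sends up to `send_rate` messages to the broker.
    """
    buffered = 0
    for tick in range(1, max_ticks + 1):
        buffered = min(capacity, buffered + publish_rate)  # application publishes
        if buffered >= capacity:
            return tick  # back-pressure: no room for further messages
        buffered = max(0, buffered - send_rate)  # API sends to the broker
    return None


# Publishing 120 messages per tick while only 100 can be sent fills a
# 1000-message buffer after 45 ticks.
first_full_tick = ticks_until_full(publish_rate=120, send_rate=100, capacity=1000)
```

In this model even a small, sustained gap between the publish rate and the send rate eventually fills any finite buffer, which is why a back-pressure strategy is needed.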

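In the PubSub+ Python API, the DirectMessagePublisher has the following mechanisms to handle back-pressure:

  • Configuring an Unlimited Internal Buffer
  • Rejecting Messages When A Specified Limit is Reached
  • Throttling the Publisher When a Specified Limit is Reached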

Configuring an Unlimited Internal Buffer

The default configuration for the API is to use an internal buffer of unlimited size for messages. When you use an unlimited buffer, the Python API continuously puts messages published by the client application on the internal buffer. This configuration is useful when your publishers need to send a lot of data quickly without blocking, even when the network cannot keep up. This configuration, set with on_back_pressure_elastic(), is also useful because you don't have to write code to handle scenarios where the internal buffer reaches capacity.

We recommend this configuration when dealing with data sizes that do not overly strain your system resources, whether they are small or large. It's important to note that data size plays an important role in determining transmission bandwidth, affecting how quickly your application can write data to the socket. A large number of small messages, or a small number of large messages can both strain your application's resources. If your application lacks stability when using elastic back-pressure, we recommend you use alternative back-pressure strategies like 'wait' or 'reject' to manage flow control for your application.

This configuration is not suitable for memory-constrained environments because the buffer is allowed to grow indefinitely, which can cause out-of-memory errors (or potentially undefined errors). This configuration is useful when your infrastructure is made up of several short-lived microservices that can provide publishing redundancy in the unlikely event that an internal queue encounters an out-of-memory scenario.

The following example shows an explicit call to the on_back_pressure_elastic() function, which is not required because it is the default behavior:

# Create a DirectMessagePublisherBuilder which allows you to create a DirectMessagePublisher and start it
direct_publisher = messaging_service.create_direct_message_publisher_builder() \
               .on_back_pressure_elastic() \
               .build()

# Starts the configured DirectMessagePublisher synchronously. Before the start() function is called, the publisher is considered off-duty
direct_publisher.start()		

Rejecting Messages When A Specified Limit is Reached

When back-pressure occurs, you can choose to reject the messages from the client application when a specified limit is reached in the internal buffer. You can use the on_back_pressure_reject(buffer_capacity: int) function to specify a defined buffer capacity for a set number of messages to accumulate. After the specified capacity is reached, it is no longer possible to publish new messages and the API returns errors until the buffer has capacity again. If buffer_capacity is set to zero, the Python API does not buffer any outbound messages and rejects immediately when unable to write a message to the native library.

# Create a DirectMessagePublisherBuilder which allows you to create a DirectMessagePublisher and start it
direct_publisher = messaging_service.create_direct_message_publisher_builder() \
               .on_back_pressure_reject(1000) \
               .build()

# Starts the configured DirectMessagePublisher synchronously. Before the start() function is called, the publisher is considered off-duty
direct_publisher.start()		
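
The reject behavior can be illustrated with a standard-library sketch. This is not how the API is implemented. RejectingBuffer, BufferFullError, and publish_burst() are hypothetical names, and BufferFullError stands in for the overflow error that the real API raises. Unlike the API, this sketch requires a capacity of at least 1, because queue.Queue treats a maximum size of zero as unlimited.

```python
import queue


class BufferFullError(Exception):
    """Hypothetical stand-in for the overflow error raised by the real API."""


class RejectingBuffer:
    """Bounded buffer that rejects new messages once it is full."""

    def __init__(self, capacity):
        if capacity < 1:
            # queue.Queue(maxsize=0) would be unbounded, so this sketch disallows it.
            raise ValueError("capacity must be at least 1 in this sketch")
        self._queue = queue.Queue(maxsize=capacity)

    def publish(self, message):
        try:
            self._queue.put_nowait(message)  # never blocks
        except queue.Full:
            raise BufferFullError("buffer is at capacity") from None

    def drain_one(self):
        """Simulate the API sending the oldest buffered message."""
        return self._queue.get_nowait()


def publish_burst(buffer, count):
    """Try to publish `count` messages; return (accepted, rejected) counts."""
    accepted = rejected = 0
    for i in range(count):
        try:
            buffer.publish(f"message {i}")
            accepted += 1
        except BufferFullError:
            rejected += 1
    return accepted, rejected


burst_result = publish_burst(RejectingBuffer(capacity=3), 5)
```

With a capacity of 3, a burst of 5 messages is split into 3 accepted and 2 rejected. The application, not the API, decides what to do with the rejected messages.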

Using a Publisher Readiness Listener

We recommend that you use a PublisherReadinessListener when you use the on_back_pressure_reject() function because it lets your application know when there is capacity available in the buffer and it can resume publishing messages.

The following is an example of registering a PublisherReadinessListener instance as an event handler:

class CanPublishListener(PublisherReadinessListener):
    def ready(self):
        # This method is executed when the publisher is ready to publish again
        # You can use this callback to handle message publishing when backpressure is relieved
        pass
# ...
direct_publisher.set_publisher_readiness_listener(CanPublishListener())

# Initiator for the callback
some_condition_is_true = True
while some_condition_is_true:
    # prepare/process some data prior to publishing a message...
    try:
        direct_publisher.publish("can't send, buffer over capacity", my_topic)
    except PublisherOverflowError as e:
        # Handle the overflow exception (backpressure)
        # You can do some work here or signal to slow down message publishing
        pass
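
One possible way to connect the overflow exception and the ready() callback is to pause the publishing loop on a threading.Event that ready() sets. The sketch below uses only the standard library and makes several assumptions: ReadinessGate, OverflowSignal, and publish_with_retry() are hypothetical names, OverflowSignal stands in for PublisherOverflowError, and in an application ReadinessGate would subclass PublisherReadinessListener and the publish argument would wrap direct_publisher.publish().

```python
import threading


class OverflowSignal(Exception):
    """Hypothetical stand-in for PublisherOverflowError."""


class ReadinessGate:
    """Listener-style object; ready() is the method the API would call."""

    def __init__(self):
        self._event = threading.Event()

    def ready(self):
        self._event.set()  # buffer has capacity again

    def reset(self):
        self._event.clear()

    def wait(self, timeout):
        return self._event.wait(timeout)


def publish_with_retry(publish, message, gate, timeout=1.0, max_attempts=5):
    """Retry a rejected publish after the gate reports readiness.

    Returns True if the message was accepted, False if every attempt failed.
    """
    for _ in range(max_attempts):
        gate.reset()  # clear before publishing so a later ready() is not lost
        try:
            publish(message)
            return True
        except OverflowSignal:
            gate.wait(timeout)  # the timeout guards against a missed callback
    return False
```

Clearing the event before each attempt, rather than after the exception, avoids losing a ready() call that arrives between the failed publish and the wait.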

Throttling the Publisher When a Specified Limit is Reached

You can choose to throttle the publishing application if a specified limit is reached in the internal buffer. Throttling is useful when applications can block and wait for capacity to become available in the buffer. You can use the on_back_pressure_wait(buffer_capacity: int) function to set the maximum number of messages that can accumulate in the buffer. When this maximum capacity (buffer_capacity) is reached, the publisher routine pauses and waits for available capacity in the internal buffer before letting the application publish any more messages. The on_back_pressure_wait() strategy does not call a PublisherReadinessListener.

This function should be used when you don't want your application to throw exceptions after the buffer's capacity has been reached. Using this mechanism effectively gives time for the API to empty the internal buffer.

The following shows an example of how to configure an internal buffer to accommodate up to one thousand messages:

# Create a DirectMessagePublisherBuilder which allows you to create a DirectMessagePublisher and start it
direct_publisher = messaging_service.create_direct_message_publisher_builder() \
               .on_back_pressure_wait(1000) \
               .build()

# Starts the configured DirectMessagePublisher synchronously. Before the start() function is called, the publisher is considered off-duty
direct_publisher.start()		
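
The blocking behavior of the wait strategy can be illustrated with a standard-library sketch. It is not the API's implementation. The function run_blocking_publisher() and its parameters are hypothetical, and a background thread stands in for the API sending messages to the broker.

```python
import queue
import threading
import time


def run_blocking_publisher(capacity, message_count, drain_delay=0.001):
    """Publish through a bounded buffer whose put() blocks while it is full.

    Returns the delivered messages and the largest buffer depth observed.
    """
    buffer = queue.Queue(maxsize=capacity)
    delivered = []

    def drain():
        # Stand-in for the API sending buffered messages to the broker.
        for _ in range(message_count):
            delivered.append(buffer.get())
            time.sleep(drain_delay)

    consumer = threading.Thread(target=drain)
    consumer.start()

    max_depth = 0
    for i in range(message_count):
        buffer.put(i)  # blocks here whenever the buffer is at capacity
        max_depth = max(max_depth, buffer.qsize())

    consumer.join()
    return delivered, max_depth


delivered, max_depth = run_blocking_publisher(capacity=5, message_count=20)
```

No message is rejected and no exception is raised. The publishing loop simply slows down to the rate at which the buffer drains, and the buffer never holds more than its configured capacity.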

Publishing a Direct Message

After you have created the DirectMessagePublisher, you can start sending messages. When you send a message, there are two primary components: the topic to publish to and the message to send.

The topic must be an object of the Topic class that follows the Solace hierarchical format, for example: solace/messaging/direct/pub. The message can be a simple string, a byte array, or an OutboundMessage instance, which you can obtain from an OutboundMessageBuilder by calling messaging_service.message_builder(). The following function publishes a direct message:

  • publish(message: bytearray | str | OutboundMessage, destination: Topic, additional_message_properties: Dict[str, str | int | float | bool | dict | list | tuple | bytearray | None] | None = None)
    • message: The outbound message, which can be an OutboundMessage object, bytearray, or str. If the payload is a bytearray or str, the API creates an OutboundMessage object to send.
    • destination: The Topic to publish to.
    • additional_message_properties: Additional key-value properties to customize a message. Each key can be customer provided, or it can be a key from solace.messaging.config.solace_properties.message_properties.

See the PubSub+ Messaging API for Python reference for information about the publish functions that are available when you use a DirectMessagePublisher.

Here's an example that shows how to publish a direct message:

topic = Topic.of("topic/to/publish/to")
outbound_message = outbound_msg_builder \
                .from_properties(message_props) \
                .build("my message body")
direct_publisher.publish(outbound_message, topic)
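
Because topics are hierarchical strings with levels separated by a forward slash, it can help to assemble them from their levels before passing the result to Topic.of(). The helper below is a hypothetical convenience, not part of the API. Rejecting empty levels and levels that contain a slash is a convention chosen for this sketch, not a broker rule.

```python
def build_topic_string(*levels):
    """Join topic levels with '/' after basic validation.

    The checks are illustrative conventions for this sketch only.
    """
    if not levels:
        raise ValueError("at least one topic level is required")
    for level in levels:
        if not level:
            raise ValueError("topic levels must not be empty")
        if "/" in level:
            raise ValueError(f"topic level {level!r} must not contain '/'")
    return "/".join(levels)


topic_string = build_topic_string("solace", "messaging", "direct", "pub")
# topic_string is "solace/messaging/direct/pub"
```

In an application, the resulting string would be passed to Topic.of() to create the destination for publish().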

Handling Errors

The Python API provides the set_publish_failure_listener() function that notifies the client if the API is unable to publish messages. A failed publish event could be due to an issue such as an invalid topic or a termination of the service. See the example below:

# An example implementation of a PublishFailureListener
class PublishFailureListenerImpl(PublishFailureListener):
    def on_failed_publish(self, failed_publish_event: 'FailedPublishEvent'):
        print(f"fail_destination name:{failed_publish_event.get_destination()}\n"
              f"fail_message:{failed_publish_event.get_message()}\n"
              f"fail_timestamp:{failed_publish_event.get_timestamp()}\n"
              f"fail_exception:{failed_publish_event.get_exception()}\n")

direct_publisher.set_publish_failure_listener(PublishFailureListenerImpl())
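
Instead of printing inside the callback, an application might hand failed events to its own code for later processing. The following standard-library sketch shows one way to do that. FailureCollector is a hypothetical class. In an application it would subclass PublishFailureListener, and the events would be FailedPublishEvent objects rather than the plain values used here.

```python
import queue


class FailureCollector:
    """Collects failed publish events so they can be processed outside the callback.

    Assumption: in a real application this class subclasses PublishFailureListener.
    """

    def __init__(self, max_pending=1000):
        self._pending = queue.Queue(maxsize=max_pending)
        self.dropped = 0  # events discarded because the collector was full

    def on_failed_publish(self, failed_publish_event):
        # Keep the callback short: store the event and return immediately.
        try:
            self._pending.put_nowait(failed_publish_event)
        except queue.Full:
            self.dropped += 1

    def drain(self):
        """Return all collected events and leave the collector empty."""
        events = []
        while True:
            try:
                events.append(self._pending.get_nowait())
            except queue.Empty:
                return events


collector = FailureCollector(max_pending=2)
for event in ("event-1", "event-2", "event-3"):
    collector.on_failed_publish(event)
collected = collector.drain()
```

Bounding the collector keeps a burst of failures from growing memory without limit, at the cost of dropping events once it is full.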