Request-Reply Messaging in the Python API

Request-reply messaging is a method of data transmission where applications use separate point-to-point channels: one for requests, and another for replies. In request-reply messaging, each request sent from a message requestor requires a reply from a message replier. When a message replier consumes a request message, it sends a reply back to the requestor. This messaging pattern is useful when each message sent between components in your applications requires a reply, for example when performing authentication or financial transactions.

The PubSub+ Messaging APIs publish request messages with a unique, automatically generated ReplyTo destination topic in the message header field. This ReplyTo topic serves as the return address that the reply should be sent to. Because the ReplyTo topic destination is handled by the PubSub+ Messaging APIs, it allows users to perform request-reply operations without worrying about registering appropriate topic subscriptions to receive replies.
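To make the role of the ReplyTo topic concrete, here is a minimal conceptual sketch that uses only the Python standard library. It is not the PubSub+ API: the in-memory `topics` dictionary, the `topic_queue()` helper, and the `reply_to` field are all hypothetical stand-ins for what the event broker and the API do for you.

```python
import queue
import threading
import uuid

# Hypothetical in-memory "broker": maps topic names to message queues.
topics = {}
topics_lock = threading.Lock()


def topic_queue(name):
    """Return the queue for a topic, creating it on first use."""
    with topics_lock:
        return topics.setdefault(name, queue.Queue())


def replier(request_topic):
    """Consume one request and send the reply to the request's reply_to topic."""
    request = topic_queue(request_topic).get(timeout=2)
    reply = {"payload": request["payload"].upper()}
    topic_queue(request["reply_to"]).put(reply)


def send_request(request_topic, payload, timeout=2.0):
    """Publish a request with a generated reply_to topic, then wait for the reply."""
    reply_to = f"reply/{uuid.uuid4()}"  # unique return address for this request
    topic_queue(request_topic).put({"payload": payload, "reply_to": reply_to})
    return topic_queue(reply_to).get(timeout=timeout)


worker = threading.Thread(target=replier, args=("my/sample/topic",))
worker.start()
reply = send_request("my/sample/topic", "ping")
worker.join()
print(reply["payload"])  # PING
```

The requestor never subscribes to anything by hand; it only generates a return address and waits on it, which is the bookkeeping the PubSub+ Messaging APIs take over.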

Request-reply messaging can only be used with direct messages in the PubSub+ Python API.

To use the request-reply messaging pattern with the Python API, follow these steps:

  1. Create a RequestReplyMessagePublisher
  2. Sending a Request
  3. Create a RequestReplyMessageReceiver
  4. Receiving Requests and Sending Replies

Create a RequestReplyMessagePublisher

To send message requests, create a MessagingService object (see Messaging Service for instructions). After you create a MessagingService object and connect it to the event broker, use the request_reply() function to build a RequestReplyMessagePublisher object:

direct_requestor: RequestReplyMessagePublisher = messaging_service.request_reply() \
                                                  .create_request_reply_message_publisher_builder() \
                                                  .build()
direct_requestor.start()

Next, create an OutboundMessage instance. This is the request that your publisher sends to the receiver instance. For information on creating an OutboundMessage object, see Configuring and Creating Outbound Messages.

Sending a Request

When you send a request, it can be either blocking or non-blocking. A blocking request blocks your application until a reply is received. A non-blocking request allows your application to send multiple requests before any replies are received.

Sending a Blocking Request

The PubSub+ Python API provides synchronous request-reply messaging, which blocks your application on each request until a reply is received or the timeout expires. This is useful for synchronous, point-to-point communication where the order of events is important, for example when processing financial transactions. To send a blocking request, use your RequestReplyMessagePublisher to call the publish_await_response() function. The publish_await_response() function takes the following parameters:

  • request_message—the OutboundMessage request to send
  • request_destination—the Topic destination for request messages
  • reply_timeout—an int value representing the maximum time to wait for a response message (in milliseconds)
  • additional_message_properties—(Optional) A Dict that contains additional message properties (see Configuring and Creating Outbound Messages). Omit this parameter if you do not have additional message properties to set.

For more information, see the PubSub+ Messaging API for Python reference.

The following example shows how to send a blocking message request and assign the reply to an InboundMessage object:

inbound_message_response = direct_requestor.publish_await_response(
    request_message=outbound_msg,
    request_destination=Topic.of('my/sample/topic'),
    reply_timeout=3000)

For a complete example, see direct_requestor_blocking.py on the Solace Developer Hub.
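Because a blocking request can end without a reply, applications often wrap the call in a small retry helper. The sketch below is one way to do that, under stated assumptions: `request_with_retry()` and `FlakyRequestor` are hypothetical names, the stand-in class exists only so the example runs without a broker, and the exception types to retry on are passed in by the caller (check the API reference for the exceptions that publish_await_response() actually raises).

```python
def request_with_retry(requestor, message, destination, reply_timeout_ms,
                       retryable_errors, attempts=3):
    """Send a blocking request, retrying when a retryable error is raised."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for _ in range(attempts):
        try:
            return requestor.publish_await_response(
                request_message=message,
                request_destination=destination,
                reply_timeout=reply_timeout_ms,
            )
        except retryable_errors as error:
            last_error = error  # remember the failure and try again
    raise last_error


class FlakyRequestor:
    """Hypothetical stand-in for a started RequestReplyMessagePublisher."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def publish_await_response(self, request_message, request_destination,
                               reply_timeout):
        self.calls += 1
        if self.calls <= self.failures:
            raise TimeoutError("no reply within timeout")
        return f"reply to {request_message}"


requestor = FlakyRequestor(failures=2)
reply = request_with_retry(requestor, "ping", "my/sample/topic", 3000,
                           retryable_errors=(TimeoutError,))
print(reply, requestor.calls)  # reply to ping 3
```

Retrying is only safe when the request is idempotent. For operations such as financial transactions, a missing reply does not prove that the replier never processed the request.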

Sending a Non-Blocking Request

The PubSub+ Python API provides non-blocking request-reply messaging, which allows your application to send multiple requests before a reply is received. This is useful for asynchronous communication where the order of events is not important. To send a non-blocking request, use your RequestReplyMessagePublisher to call the publish() function. The publish() function takes the following parameters:

  • request_message—the OutboundMessage request to send
  • request_destination—the Topic destination for request messages
  • reply_timeout—an int value representing the maximum time to wait for a response message (in milliseconds)
  • additional_message_properties—(Optional) A Dict that contains additional message properties (see Configuring and Creating Outbound Messages). Omit this parameter if you do not have additional message properties to set.

For more information, see the PubSub+ Messaging API for Python reference.

The following code snippet shows how to send a non-blocking message request. The publish() function returns a future object (named publish_async here), which you use to check for an exception and to retrieve the reply once it arrives:

publish_async = direct_requestor.publish(request_message=outbound_msg,
                                         request_destination=Topic.of('my/sample/topic'),
                                         reply_timeout=3000)

# Wait for the request to complete, then check whether it failed
error = publish_async.exception()
if error is not None:
    logger.warning(f"Got exception from request reply publish: {error}")
else:
    # Only read the result on success; result() re-raises a stored exception
    response = publish_async.result()

For a complete example, see direct_requestor.py on the Solace Developer Hub.
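When several non-blocking requests are in flight, the replies can be gathered as they complete. The following sketch assumes that the object returned by publish() behaves like a standard `concurrent.futures.Future`; the `collect_replies()` helper and the request IDs are hypothetical, and the futures are completed by hand here so that the example runs without a broker.

```python
from concurrent.futures import Future, as_completed


def collect_replies(futures_by_id, timeout_s):
    """Wait for all futures and split them into replies and errors by request ID."""
    ids = {future: request_id for request_id, future in futures_by_id.items()}
    replies, errors = {}, {}
    # as_completed yields each future as soon as it finishes, in completion order
    for future in as_completed(ids, timeout=timeout_s):
        error = future.exception()
        if error is not None:
            errors[ids[future]] = error
        else:
            replies[ids[future]] = future.result()
    return replies, errors


# Stand-ins for the futures that publish() would return
ok, failed = Future(), Future()
ok.set_result("reply-1")
failed.set_exception(TimeoutError("no reply"))

replies, errors = collect_replies({"req-1": ok, "req-2": failed}, timeout_s=1)
print(replies)         # {'req-1': 'reply-1'}
print(sorted(errors))  # ['req-2']
```

If some futures are still pending when `timeout_s` elapses, `as_completed` raises a timeout error, so choose a value that is at least as long as the reply_timeout of the requests.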

Create a RequestReplyMessageReceiver

To send message replies, create a MessagingService object (see Messaging Service for instructions). After you create a MessagingService object and connect it to the event broker, use the request_reply() function to build a RequestReplyMessageReceiver object:

direct_replier: RequestReplyMessageReceiver = messaging_service.request_reply() \
                                               .create_request_reply_message_receiver_builder() \
                                               .build(TopicSubscription.of('my/sample/topic'))
direct_replier.start()

Next, create an OutboundMessage instance. This is the reply that your receiver sends to the requestor instance. For information on creating an OutboundMessage object, see Configuring and Creating Outbound Messages.

Receiving Requests and Sending Replies

Your RequestReplyMessageReceiver can receive a request synchronously or asynchronously as an InboundMessage object.

Receiving a Request Synchronously and Sending a Reply

The PubSub+ Python API provides synchronous request-reply messaging, which blocks your application until the receive_message() function returns. This is useful for synchronous, point-to-point communication where the order of events is important, for example when processing financial transactions. To receive a synchronous request, use your RequestReplyMessageReceiver object to call the receive_message() function. The receive_message() function returns the received message and a replier object. The replier object allows your RequestReplyMessageReceiver to send a reply back to the requestor. The receive_message() function takes the following parameter:

  • timeout—(Optional) an int value representing the time to wait before exiting the blocking function (in milliseconds). Value should be greater than 0. Omit this parameter if you do not want to set a timeout value.

For more information, see the PubSub+ Messaging API for Python reference.

The code below shows how to receive a blocking message request, assign the reply to an OutboundMessage object, and send a reply with the reply() function:

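# Block for up to 5000 ms while waiting for a request
msg, replier = direct_replier.receive_message(5000)

# Reply only if a replier object was returned with the request
if replier is not None:
    outbound_msg = messaging_service.message_builder().build("my reply")
    replier.reply(outbound_msg)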

For a complete example, see the receive_request_and_send_response_message() function in how_to_use_request_reply_pattern.py on the Solace Developer Hub.
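A replier usually handles more than one request, so the receive-and-reply step tends to sit inside a loop. The sketch below shows one possible shape for that loop. `serve_requests()`, `FakeReceiver`, and `FakeReplier` are hypothetical names; the stand-ins exist only so the example runs without a broker, and the loop assumes that receive_message() returns `(None, None)` when the timeout expires, which you should confirm in the API reference.

```python
def serve_requests(receiver, make_reply, timeout_ms=5000, max_requests=None):
    """Receive requests one at a time and reply to each; stop when none arrive."""
    handled = 0
    while max_requests is None or handled < max_requests:
        message, replier = receiver.receive_message(timeout_ms)
        if message is None:
            break  # assumed: nothing arrived before the timeout
        if replier is not None:
            replier.reply(make_reply(message))
        handled += 1
    return handled


class FakeReplier:
    """Hypothetical stand-in that records the replies it is asked to send."""

    def __init__(self):
        self.sent = []

    def reply(self, message):
        self.sent.append(message)


class FakeReceiver:
    """Hypothetical stand-in for a started RequestReplyMessageReceiver."""

    def __init__(self, requests):
        self.pending = list(requests)
        self.replier = FakeReplier()

    def receive_message(self, timeout=None):
        if not self.pending:
            return None, None
        return self.pending.pop(0), self.replier


receiver = FakeReceiver(["a", "b"])
count = serve_requests(receiver, make_reply=lambda request: f"reply to {request}")
print(count, receiver.replier.sent)  # 2 ['reply to a', 'reply to b']
```

With a real receiver, `make_reply` would build an OutboundMessage from the message builder, as in the snippet above.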

Receiving a Request Asynchronously and Sending a Reply

The PubSub+ Python API provides asynchronous request-reply messaging, which allows your application to receive multiple message requests asynchronously with the receive_async() function. This is useful for point-to-point communication where the order of events is not important. To receive asynchronous requests, use a RequestReplyMessageReceiver object to call the receive_async() function. The receive_async() function takes the following parameter:

  • message_handler—an instance of RequestMessageHandler, a callback handler to process incoming request messages and the replier objects. This callback allows the receive_async() function to receive both an InboundMessage (the request) and an instance of RequestReplyMessageReceiver.Replier. The replier object allows your RequestReplyMessageReceiver to send a reply back to the requestor.

For more information, see the PubSub+ Messaging API for Python reference.

The following shows an example implementation of a RequestMessageHandler, which receives a non-blocking message request, assigns the reply to an OutboundMessage object and sends it with the reply() function:

class RequestMessageHandlerImpl(RequestMessageHandler):
    def __init__(self, message_builder):
        self.message_builder = message_builder

    def on_message(self, request: InboundMessage, replier: Replier):
        # The payload is either a string or bytes; decode it if it is the latter
        payload = request.get_payload_as_string()
        if payload is None:
            payload = request.get_payload_as_bytes()
        if isinstance(payload, bytearray):
            print(f"Received a message of type: {type(payload)}. Decoding to string")
            payload = payload.decode()
        # Handle the message payload, e.g. save it or print it
        # Prepare the response payload
        response = "This is my reply message"

        if replier is not None:
            outbound_msg = self.message_builder.build(response)
            replier.reply(outbound_msg)
        else:
            print("Invalid request, reply_to not set")

# ...
# Prepare the outbound message builder
outbound_msg_builder = messaging_service.message_builder()
direct_replier.receive_async(RequestMessageHandlerImpl(outbound_msg_builder))

For a complete example, see direct_replier.py on the Solace Developer Hub.
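The payload handling inside on_message() can be factored into a small helper that is easy to test without a broker. This is a minimal sketch, not part of the PubSub+ API: `payload_as_text()` is a hypothetical name, and the choice of UTF-8 with replacement of undecodable bytes is an assumption you may want to change.

```python
def payload_as_text(string_payload, bytes_payload, encoding="utf-8"):
    """Return the payload as a str, decoding the bytes form when needed."""
    if string_payload is not None:
        return string_payload
    if bytes_payload is None:
        return None  # the message carried no payload in either form
    # bytes() accepts both bytes and bytearray; undecodable bytes are replaced
    return bytes(bytes_payload).decode(encoding, errors="replace")


print(payload_as_text("hello", None))              # hello
print(payload_as_text(None, bytearray(b"hello")))  # hello
print(payload_as_text(None, None))                 # None
```

Inside a handler, it could be called as `payload_as_text(request.get_payload_as_string(), request.get_payload_as_bytes())`.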