Request-Reply Messaging in the Java API

Request-reply messaging is a method of data transmission where applications simulate separate point-to-point channels: one for requests, and another for replies. In request-reply messaging, each request sent from a message requestor requires a reply from a message replier. When a message replier consumes a request message, it sends a reply back to the requestor. This messaging pattern is useful when each message sent between components in your applications requires a reply, for example when performing authentication or financial transactions.

The PubSub+ Messaging APIs publish request messages with a unique, automatically generated ReplyTo destination topic in the message header field. This ReplyTo topic serves as the return address that the reply should be sent to. Because the PubSub+ Messaging APIs manage the ReplyTo topic, you can perform request-reply operations without registering topic subscriptions to receive replies.
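The return-address idea can be illustrated without a broker. The following is a minimal sketch that uses only the JDK, not the PubSub+ API: the class ReplyToSketch, its methods, and the "reply/" topic prefix are hypothetical names chosen for illustration. It shows a requestor generating a unique reply topic, and the reply being routed back to whoever is waiting on that topic.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical in-memory stand-in for a broker; not part of the PubSub+ API. */
class ReplyToSketch {
    // Maps each generated reply topic to the future its requestor is waiting on.
    private final Map<String, CompletableFuture<String>> pendingReplies = new ConcurrentHashMap<>();

    /** Requestor side: generate a unique reply topic, register it, and hand the request to the replier. */
    CompletableFuture<String> request(String payload, Function<String, String> replier) {
        String replyTo = "reply/" + UUID.randomUUID(); // unique return address (assumed format)
        CompletableFuture<String> reply = new CompletableFuture<>();
        pendingReplies.put(replyTo, reply);
        // Replier side: process the request and send the result to the return address.
        deliverReply(replyTo, replier.apply(payload));
        return reply;
    }

    /** Routes a reply to the requestor that registered the given reply topic, if any. */
    void deliverReply(String replyTo, String replyPayload) {
        CompletableFuture<String> waiting = pendingReplies.remove(replyTo);
        if (waiting != null) {
            waiting.complete(replyPayload);
        }
    }

    /** Sends one request and returns the reply payload. */
    static String demo() {
        ReplyToSketch broker = new ReplyToSketch();
        return broker.request("ping", request -> "reply to " + request).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "reply to ping"
    }
}
```

In the real API this bookkeeping is done for you, which is why the requestor never subscribes to a reply topic explicitly.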

To use the request-reply messaging pattern with the PubSub+ Java API, follow these steps:

  1. Create a RequestReplyMessagePublisher
  2. Sending a Request
  3. Create a RequestReplyMessageReceiver
  4. Receiving Requests and Sending Replies

Create a RequestReplyMessagePublisher

To send message requests, create a MessagingService object (see Messaging Service for instructions). After you create a MessagingService object and connect it to the event broker, use the requestReply() method to build a RequestReplyMessagePublisher object:

final RequestReplyMessagePublisher requestReplyMessagePublisher =
      messagingService.requestReply()
                      .createRequestReplyMessagePublisherBuilder()
                      .build()
                      .start();

Next, create an OutboundMessage instance. This is the request that your publisher sends to the receiver instance. For information on creating an OutboundMessage object, see Configuring and Creating Outbound Messages.

Sending a Request

When you send a request, it can be either synchronous or asynchronous. A synchronous request blocks your application until a reply is received. An asynchronous request allows your application to send multiple requests before any replies are received.

Sending a Synchronous Request

The PubSub+ Java API provides synchronous request-reply messaging, which blocks each request until a reply is received. This is useful for synchronous, point-to-point communication where the order of events is important, for example when processing financial transactions. To send a synchronous request, use your RequestReplyMessagePublisher to call the publishAwaitResponse() method. The publishAwaitResponse() method takes the following parameters:

  • requestMessage—the OutboundMessage request to send
  • additionalMessageProperties—(Optional) A Property object that contains additional message properties (see Configuring and Creating Outbound Messages). Omit this parameter if you do not have additional message properties to set.
  • requestDestination—the Topic destination for request messages
  • replyTimeout—a Long value representing the maximum time to wait for a response message (in milliseconds)

For more information, see the PubSub+ Messaging API for Java reference.

The following example shows how to send a request message synchronously and assign the reply to an InboundMessage object:

final InboundMessage inboundMessage = requestReplyMessagePublisher.publishAwaitResponse(outboundMessage, Topic.of("my/sample/topic"), 1000);

For a complete example, see DirectRequestorBlocking.java on the Solace Developer Hub.
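To make the blocking behavior concrete, here is a small JDK-only sketch of "wait for a reply, but no longer than replyTimeout". It does not call the PubSub+ API: BlockingRequestSketch, awaitReply(), and the returned marker strings are hypothetical, and a CompletableFuture stands in for the pending reply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration of a blocking request with a reply timeout; not the PubSub+ API. */
class BlockingRequestSketch {

    /** Blocks the calling thread until the reply arrives or the timeout elapses. */
    static String awaitReply(CompletableFuture<String> pendingReply, long replyTimeoutMillis) {
        try {
            return pendingReply.get(replyTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No reply within the timeout: a natural place for retry or fallback logic.
            return "TIMED_OUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            return "FAILED";
        }
    }

    public static void main(String[] args) {
        // A reply that is already available is returned immediately.
        System.out.println(awaitReply(CompletableFuture.completedFuture("pong"), 1000));
        // A reply that never arrives causes the call to give up after 50 ms.
        System.out.println(awaitReply(new CompletableFuture<>(), 50));
    }
}
```

Because the caller is blocked for up to the full timeout, a short replyTimeout keeps the application responsive, while a long one tolerates slow repliers.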

Sending an Asynchronous Request

The PubSub+ Java API provides asynchronous request-reply messaging, which allows your application to send multiple requests before a reply is received. This is useful for asynchronous communication where the order of events is not important. To send an asynchronous request, use your RequestReplyMessagePublisher to call the publish() method. The publish() method takes the following parameters:

  • requestMessage—the OutboundMessage request to send
  • additionalMessageProperties—(Optional) A Property object that contains additional message properties (see Configuring and Creating Outbound Messages). Omit this parameter if you do not have additional message properties to set.
  • replyMessageHandler—an instance of RequestReplyMessagePublisher.ReplyMessageHandler, a callback handler to process the reply message when it arrives or when a timeout occurs.
  • userContext—(Optional) An Object containing user context to make available during response message processing (see User Contexts). Omit this parameter if you do not have context to set.
  • requestDestination—the Topic destination for request messages
  • replyTimeout—a Long value representing the maximum time to wait for a response message (in milliseconds)

For more information, see the PubSub+ Messaging API for Java reference.

The following example sends a request message asynchronously and passes a ReplyMessageHandler implementation that logs the reply InboundMessage when it arrives, or handles the error if the request fails or times out:

requestReplyMessagePublisher.publish(outboundMessage,
        (inboundMessage, userContext, pubSubPlusClientException) -> {
            if (pubSubPlusClientException == null) { // Message received
                System.out.println("The reply inboundMessage payload being logged is : " + inboundMessage.getPayloadAsString());
            } else { // Error occurred 
                if (userContext != null) {
                    System.out.println(String.format("Error for Message %s - %s", userContext, pubSubPlusClientException));
                }
                if (pubSubPlusClientException instanceof PubSubPlusClientException.TimeoutException) {
                    // requestor application did not receive a reply for the published message within specified timeout
                    // good location for implementing resiliency or retry mechanisms
                    System.out.printf("Publishing action timed out without any reply. Error: %s%n", pubSubPlusClientException);
                    System.out.println("Publish timed-out for message with payload :" + outboundMessage.getPayloadAsString());
                } else {
                    throw new RuntimeException(pubSubPlusClientException);
                }
            }
        },
        Topic.of("my/sample/topic"), 1000);

For a complete example, see DirectRequestorNonBlocking.java on the Solace Developer Hub.
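The callback style can also be sketched with JDK classes alone. The example below is an analogy, not the PubSub+ API: AsyncRequestSketch, ReplyHandler, and publish() are hypothetical names, and it assumes Java 9 or later for CompletableFuture.orTimeout(). It keeps two requests in flight at once and invokes a three-argument handler (reply, user context, error) for each, with a timeout reported through the error argument.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration of non-blocking requests with a reply callback; not the PubSub+ API. */
class AsyncRequestSketch {

    /** Callback shaped like the three-argument handler in the example above. */
    interface ReplyHandler {
        void onReply(String reply, Object userContext, Throwable error);
    }

    /** Returns immediately; the handler runs later with either a reply or an error. */
    static CompletableFuture<Void> publish(CompletableFuture<String> pendingReply, ReplyHandler handler,
                                           Object userContext, long replyTimeoutMillis) {
        return pendingReply
                .orTimeout(replyTimeoutMillis, TimeUnit.MILLISECONDS) // fails with TimeoutException if no reply
                .<Void>handle((reply, error) -> {
                    handler.onReply(reply, userContext, error);
                    return null;
                });
    }

    /** Publishes two requests before any reply arrives; one is answered, one times out. */
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        ReplyHandler handler = (reply, userContext, error) ->
                log.add(error == null
                        ? userContext + " -> " + reply
                        : userContext + " -> " + (error instanceof TimeoutException ? "timeout" : "error"));

        CompletableFuture<String> answered = new CompletableFuture<>();
        CompletableFuture<String> unanswered = new CompletableFuture<>();
        CompletableFuture<Void> first = publish(answered, handler, "req-1", 1000);
        CompletableFuture<Void> second = publish(unanswered, handler, "req-2", 50);

        answered.complete("pong");                     // the reply for req-1 arrives
        CompletableFuture.allOf(first, second).join(); // wait until both handlers have run
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The user context ("req-1", "req-2") is what lets the handler tell the outcomes apart, which is the role the userContext parameter plays when several requests share one handler.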

Create a RequestReplyMessageReceiver

To send message replies, create a MessagingService object (see Messaging Service for instructions). After you create a MessagingService object and connect it to the event broker, use the requestReply() method to build a RequestReplyMessageReceiver object:

final RequestReplyMessageReceiver requestReplyMessageReceiver =
      messagingService.requestReply()
                      .createRequestReplyMessageReceiverBuilder()
                      .build(topicSubscription)
                      .start();

Next, create an OutboundMessage instance. This is the reply that your receiver sends to the requestor instance. For information on creating an OutboundMessage object, see Configuring and Creating Outbound Messages.

Receiving Requests and Sending Replies

Your RequestReplyMessageReceiver can receive a request synchronously or asynchronously as an InboundMessage object.

Receiving a Request Synchronously and Sending a Reply

The PubSub+ Java API provides synchronous request-reply messaging, which blocks your application until the receiveMessage() method returns. This is useful for synchronous, point-to-point communication where the order of events is important, for example when processing financial transactions. To receive a synchronous request, use your RequestReplyMessageReceiver object to call the receiveMessage() method. The receiveMessage() method takes the following parameters:

  • requestMessageHandler—an instance of RequestReplyMessageReceiver.RequestMessageHandler, a callback handler to process incoming request messages and the replier objects. This callback allows the receiveMessage() method to receive both an inboundMessage (the request) and an instance of RequestReplyMessageReceiver.Replier. The replier object allows your RequestReplyMessageReceiver to send a reply back to the requestor.
  • timeOut—(Optional) a Long value representing the time to wait before exiting the synchronous method (in milliseconds). The value must be greater than 0. Omit this parameter if you do not want to set a timeout value.

For more information, see the PubSub+ Messaging API for Java reference.

The following shows an example implementation of a RequestMessageHandler, which receives a request message synchronously, assigns the reply to an OutboundMessage object and sends it with the reply() method:

final RequestReplyMessageReceiver.RequestMessageHandler messageHandler = (inboundMessage, replier) -> {
    final String stringPayload = inboundMessage.getPayloadAsString();
    // Process the inboundMessage request (for example, save or print it).
    // Create the outbound message:
    final String outboundMessageStringPayload = "This is my reply!";
    final OutboundMessage outboundMessage = outboundMessageBuilder.build(outboundMessageStringPayload);
    // Use the replier object to send the outbound message back to the requestor: 
    replier.reply(outboundMessage);
};
							
requestReplyMessageReceiver.receiveMessage(messageHandler, 1000);

For a complete example, see DirectReplierBlocking.java on the Solace Developer Hub.
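The division of labor between the handler and the replier can be sketched with plain JDK types. This is an illustrative analogy, not the PubSub+ API: BlockingReceiveSketch and its nested interfaces are hypothetical, a BlockingQueue stands in for incoming requests, and the assumption that a timeout returns without invoking the handler belongs to this sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of a blocking receive that hands a replier to the handler; not the PubSub+ API. */
class BlockingReceiveSketch {

    interface Replier { void reply(String replyPayload); }

    interface RequestHandler { void onMessage(String request, Replier replier); }

    /** Waits up to timeoutMillis for one request; returns false if none arrived in time. */
    static boolean receiveMessage(BlockingQueue<String> requests, Replier replier,
                                  RequestHandler handler, long timeoutMillis) {
        try {
            String request = requests.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (request == null) {
                return false; // timed out without a request
            }
            handler.onMessage(request, replier); // the handler decides what to reply
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    /** Handles one queued request, then shows a second receive timing out. */
    static List<String> demo() {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        List<String> sentReplies = new ArrayList<>();
        RequestHandler handler = (request, replier) -> replier.reply("reply to " + request);

        requests.add("ping");
        boolean handled = receiveMessage(requests, sentReplies::add, handler, 1000);
        boolean timedOut = !receiveMessage(requests, sentReplies::add, handler, 50);
        sentReplies.add("handled=" + handled + ", timedOut=" + timedOut);
        return sentReplies;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Passing the replier into the handler keeps the return address out of application code: the handler only builds the reply payload.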

Receiving a Request Asynchronously and Sending a Reply

The PubSub+ Java API provides asynchronous request-reply messaging, which allows your application to receive multiple message requests asynchronously with the receiveAsync() method. This is useful for point-to-point communication where the order of events is not important. To receive asynchronous requests, use a RequestReplyMessageReceiver object to call the receiveAsync() method. The receiveAsync() method takes the following parameters:

  • requestMessageHandler—an instance of RequestReplyMessageReceiver.RequestMessageHandler, a callback handler to process incoming request messages and the replier objects. This callback allows the receiveAsync() method to receive both an inboundMessage (the request) and an instance of RequestReplyMessageReceiver.Replier. The replier object allows your RequestReplyMessageReceiver to send a reply back to the requestor.
  • executorService—(Optional) a user-provided instance of ExecutorService for message scheduling. Shutting down the executor service and any other maintenance work is the responsibility of the developer. When the order of the messages needs to be preserved, a single thread-based executor is required. Omit this parameter if you do not want to use an executor service.

For more information, see the PubSub+ Messaging API for Java reference.

The following shows an example implementation of a RequestMessageHandler, which receives a request message asynchronously, assigns the reply to an OutboundMessage object and sends it with the reply() method:

final RequestReplyMessageReceiver.RequestMessageHandler messageHandler = (inboundMessage, replier) -> {
    // Process the inboundMessage request:
    final String stringPayload = inboundMessage.getPayloadAsString();
    // Create the outbound message with a String payload:
    final String outboundMessageStringPayload = "Hello World!";
    final OutboundMessage outboundMessage = outboundMessageBuilder.build(outboundMessageStringPayload);
    // Use the replier object to send the outbound message back to the requestor: 
    replier.reply(outboundMessage);
};
			
// Create an executor service to process incoming requests (your application is responsible for shutting it down):
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
requestReplyMessageReceiver.receiveAsync(messageHandler, executorService);

For a complete example, see DirectReplierNonBlocking.java on the Solace Developer Hub.
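The ordering note on the executorService parameter can be demonstrated with the JDK alone. The following sketch does not use the PubSub+ API, and OrderedExecutorSketch and processInOrder() are hypothetical names. It submits numbered tasks to a single-threaded executor, which runs them one at a time in submission order, and then shuts the executor down, as the application is expected to do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of order-preserving processing with a single-threaded executor. */
class OrderedExecutorSketch {

    /** Processes messageCount simulated messages and returns the order in which they were handled. */
    static List<Integer> processInOrder(int messageCount) {
        // One worker thread: tasks run sequentially, in the order they were submitted.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < messageCount; i++) {
            final int sequence = i;
            executor.execute(() -> processed.add(sequence));
        }
        // The application owns the executor, so it is responsible for shutting it down.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(processInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a pool of two or more threads, as in the receiveAsync() example above, tasks may complete in a different order than they were submitted, so use a multi-threaded pool only when ordering does not matter.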