Publishing Direct Messages Using the Java API

Direct messages are useful when high throughput and low latency are required. We recommend that you publish events using direct messages if some message loss can be tolerated without negatively impacting client applications. Message loss can occur due to external factors, such as network congestion or client disconnection. If your applications require guaranteed delivery and message acknowledgment, we recommend using persistent messages instead.

To handle direct messages using the PubSub+ Java API, use the following steps:

  1. Create a DirectMessagePublisher Object.
  2. Configure and Create an OutboundMessage.
  3. Publish a Direct Message.
  4. Handle Errors.

In some use cases, your application can send messages faster than they can be transported. Messages can then fill the API's internal buffers, causing back-pressure. If this scenario is possible, we recommend that you consider changing the back-pressure settings to meet the requirements of your application. For more information, see Handling Back-Pressure When Publishing Direct Messages.

For examples of applications that publish direct messages, see the Solace Developer Hub.

Creating a DirectMessagePublisher Object

After a MessagingService object has established a connection to an event broker, use a DirectMessagePublisher object to publish direct messages. As with the MessagingService object, the DirectMessagePublisher object allows you to configure what features to use in the API. You can use the following DirectMessagePublisherBuilder methods to set how the publisher handles back-pressure. You then call the build() method, which returns a DirectMessagePublisher object, and call the start() method on that DirectMessagePublisher object so that it can publish to the event broker.

  • DirectMessagePublisherBuilder
    • createDirectMessagePublisherBuilder()
    • onBackPressureWait(int bufferCapacity)
    • onBackPressureReject(int bufferCapacity)
    • onBackPressureElastic()
    • build()
  • DirectMessagePublisher
    • start()

For more information, see the PubSub+ Messaging API for Java reference.

The following example shows how to use a direct message publisher to enable your application to publish messages to the event broker:

/* Creates an instance of DirectMessagePublisherBuilder */
/* that is used to create DirectMessagePublisher objects. */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
    .onBackPressureWait(1000)    // Causes the publisher thread to pause if any attempts are made to publish beyond the specified bufferCapacity (1000 in this example).
    .build()                     // Builds a DirectMessagePublisher object based on the provided configuration.
    .start();                    // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.

Starting a Publisher Asynchronously

It is also possible to start a direct message publisher using a callback listener to allow for asynchronous notifications after the start operation is complete.

The following example shows you how to use CompletionListener, which is a callback interface for listening for the results of a future computation. This handler notifies the application when the start operation is complete.

final CompletionListener<DirectMessagePublisher> publisherStartupListener = (startedPublisher, throwable) -> {
    if (throwable == null) {
        // Started successfully, can publish messages.
    } else {
        // Deal with an exception during start.
    }
};
publisherToBeStarted.startAsync(publisherStartupListener);

Your publisher application is not operational until you call start() or startAsync() on it.
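The start-before-publish rule and the (result, throwable) callback shape can be pictured with a small stand-alone model. The sketch below uses only the JDK; ToyStartablePublisher and its methods are hypothetical stand-ins and are not part of the PubSub+ API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a publisher; not part of the PubSub+ API. */
final class ToyStartablePublisher {
    private volatile boolean started = false;

    /** Starts in the background and reports the outcome to the callback. */
    CompletableFuture<ToyStartablePublisher> startAsync(
            BiConsumer<ToyStartablePublisher, Throwable> listener) {
        return CompletableFuture
                .supplyAsync(() -> {
                    started = true;   // a real start would do more work here
                    return this;
                })
                .whenComplete(listener);
    }

    /** Rejects publish attempts made before the start has completed. */
    String publish(String message) {
        if (!started) {
            throw new IllegalStateException("publisher not started");
        }
        return "sent:" + message;
    }

    public static void main(String[] args) {
        ToyStartablePublisher publisher = new ToyStartablePublisher();
        publisher.startAsync((started, throwable) -> {
            if (throwable == null) {
                System.out.println(started.publish("hello"));
            } else {
                System.out.println("start failed: " + throwable);
            }
        }).join();   // wait so the demo does not exit before the callback runs
    }
}
```

In this model, publishing from inside the callback is safe because the callback only runs after the start has finished.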

Configuring and Creating Outbound Messages

Your client applications explicitly create the outbound messages to publish. In the PubSub+ Java API, when you publish messages you use OutboundMessage objects. To configure and create OutboundMessage objects, follow these steps:

  1. Call messagingService.messageBuilder() to return an OutboundMessageBuilder object. For better performance, we recommend you use a single OutboundMessageBuilder to create multiple OutboundMessage objects.

    final OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
  2. Configure your message with an OutboundMessageBuilder and then call the build() method to return a message object. You can configure message properties using either of the following methods:
    • Use the Properties interface and the setProperty(name,value) method. The following example shows how to set a custom name-value property on a Properties object, and then configure a message using the fromProperties() method:
      final Properties messageProperties = new Properties();
      messageProperties.setProperty("PropertyName", "PropertyValue");
      final OutboundMessage message = messageBuilder
          .fromProperties(messageProperties)
          .build(messagePayload);
    • Use the OutboundMessageBuilder interface and the with*() methods. The following example shows how to set a Sender ID and a custom key-value property on a message:
      final OutboundMessage myCustomMessage = messageBuilder
          .withSenderId("mySenderId")   
          // For a complete list of with*() methods see the PubSub+ Messaging API for Java reference
          .withProperty("PropertyKey", "PropertyValue") 
          .build(messagePayload);

The following code example shows how to create a message builder, set message properties and create a message:

/* Builder for creation of similarly configured messages */
final OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
final OutboundMessage message = messageBuilder
    .fromProperties(messageProperties)                        // For example TTL, Sender ID, Sequence Number etc.
    .withExpiration(Instant.now().toEpochMilli() + 10000L)    // Expires the message 10 seconds after the current system time.
    .build("My_Message");                                     // Builds the message.

For more information about the methods, see the PubSub+ Messaging API for Java reference.
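The expiration value in the example above is an absolute time in epoch milliseconds, computed as "now plus a time-to-live". As a minimal sketch of that arithmetic using only java.time (the ExpirationSketch class and expiresAt helper are hypothetical names, not part of the API):

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; shows only the timestamp arithmetic. */
final class ExpirationSketch {
    /** Returns the absolute expiration time in epoch milliseconds. */
    static long expiresAt(Instant now, Duration timeToLive) {
        return now.plus(timeToLive).toEpochMilli();
    }

    public static void main(String[] args) {
        long expiration = expiresAt(Instant.now(), Duration.ofSeconds(10));
        System.out.println("Message expires at epoch millisecond " + expiration);
    }
}
```

Passing the clock reading in as a parameter keeps the calculation easy to test, and using Duration avoids hand-written millisecond constants such as 10000L.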

Handling Back-Pressure When Publishing Direct Messages

When you use direct messaging, the messages are sent to a PubSub+ event broker using a topic. The event broker relays the messages to any consumers subscribed to that topic.

At the time the client application publishes a direct message, the API queues the message in an internal buffer before it is sent to the event broker. In ideal conditions, as soon as the application publishes a message, the API sends that message on the network, and that message is eventually received by the event broker. It is possible for a client application to publish messages faster than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages. This scenario is referred to as back-pressure. It is important to configure your application to handle situations where back-pressure occurs.

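In the PubSub+ Java API, the DirectMessagePublisher has the following mechanisms to handle back-pressure:

  • Rejecting Messages When a Specified Limit is Reached
  • Throttling the Publisher When a Specified Limit is Reached
  • Configuring an Unlimited Internal Buffer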

Rejecting Messages When a Specified Limit is Reached

When back-pressure occurs, you can choose to reject messages from the client application once a specified limit is reached in the internal buffer. Use the onBackPressureReject(int bufferCapacity) method to set the number of messages that can accumulate in the buffer. After the specified capacity is reached, it is no longer possible to publish new messages, and calls to publish throw exceptions until the buffer has capacity again. The value for bufferCapacity must be greater than zero.

/* Creates an instance of DirectMessagePublisherBuilder, which is used to create DirectMessagePublisher objects. */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
    .onBackPressureReject(1000)    // Creates a message buffer with space to accommodate 1000 messages, at which point messages are rejected until there is room.
    .build()                       // Builds a DirectMessagePublisher object based on the provided configuration.    
    .start();                      // Causes the service to start regular duties. Before this method is called, the service is considered off-duty.
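The reject behavior can be illustrated with a toy model built on a bounded JDK queue. This is only a sketch of the policy, not the API's implementation; RejectingBuffer, publish, and sendOne are hypothetical names, and the exception type is chosen for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy model of a reject-on-full buffer; not the PubSub+ implementation. */
final class RejectingBuffer {
    private final BlockingQueue<String> buffer;

    RejectingBuffer(int bufferCapacity) {
        // ArrayBlockingQueue throws IllegalArgumentException for capacities below 1.
        this.buffer = new ArrayBlockingQueue<>(bufferCapacity);
    }

    /** Queues the message, or throws if the buffer is full. */
    void publish(String message) {
        if (!buffer.offer(message)) {
            throw new IllegalStateException("buffer full, message rejected");
        }
    }

    /** Simulates the API sending one queued message; returns null if empty. */
    String sendOne() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        RejectingBuffer buffer = new RejectingBuffer(2);
        buffer.publish("a");
        buffer.publish("b");
        try {
            buffer.publish("c");   // third message exceeds the capacity of 2
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        buffer.sendOne();          // frees one slot
        buffer.publish("c");       // now accepted
    }
}
```

With this policy the publishing thread never blocks, so the application must decide what to do with a rejected message (retry later, drop it, or log it).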

Using a Publisher Readiness Listener

We recommend that you use a PublisherReadinessListener interface when you use the onBackPressureReject() method because it lets your application know when there is capacity available in the buffer so that it can resume publishing messages.

The following is an example of registering a PublisherReadinessListener event handler:

/* Creates an instance of DirectMessagePublisherBuilder, which is used to create DirectMessagePublisher objects. */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
    .onBackPressureReject(1000)    // Creates a message buffer with space to accommodate 1000 messages, at which point messages are rejected until there is room.
    .build()                       // Builds a DirectMessagePublisher object based on the provided configuration.    
    .start();                      // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.

final PublisherReadinessListener canPublishListener = () -> {
    // This code is executed when our publisher is ready to publish again.
};
publisher.setPublisherReadinessListener(canPublishListener);    // Register listener, usually set once.
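To show why a readiness callback pairs well with the reject policy, here is a toy model using only the JDK. It assumes the notification fires when a full buffer regains room; the exact trigger in the real API is not stated here, and ReadinessBuffer with its methods is a hypothetical stand-in.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy bounded buffer that signals when a full buffer regains room. */
final class ReadinessBuffer {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final int capacity;
    private Runnable readinessListener = () -> { };

    ReadinessBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void setReadinessListener(Runnable listener) {
        this.readinessListener = listener;
    }

    /** Returns false (instead of throwing) when the buffer is full. */
    synchronized boolean tryPublish(String message) {
        if (buffer.size() >= capacity) {
            return false;
        }
        buffer.add(message);
        return true;
    }

    /** Simulates sending one message; notifies only if the buffer was full. */
    synchronized void sendOne() {
        boolean wasFull = buffer.size() >= capacity;
        if (buffer.poll() != null && wasFull) {
            readinessListener.run();
        }
    }

    public static void main(String[] args) {
        ReadinessBuffer buffer = new ReadinessBuffer(1);
        buffer.setReadinessListener(() -> System.out.println("ready to publish again"));
        buffer.tryPublish("a");                       // fills the buffer
        System.out.println(buffer.tryPublish("b"));   // prints false
        buffer.sendOne();                             // frees room and fires the listener
    }
}
```

The application can pause its own publishing after a rejection and resume from the callback, rather than polling the buffer in a loop.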

Throttling the Publisher When a Specified Limit is Reached

You can choose to throttle the publishing application if a specified limit is reached in the internal buffer. Throttling gives the API time to free capacity in its internal buffer. You can use the onBackPressureWait() method to set the maximum number of messages that can accumulate in the buffer. When this maximum capacity (bufferCapacity) is reached, the publisher thread pauses and waits for available capacity in the internal buffer before letting the application publish any more messages.

This method should be used when you want the application's publishing requests to wait for space when the buffer's capacity has been reached. Using this mechanism effectively gives time for the API to empty the internal buffer. An additional benefit is that when you use persistent publishing, the API does not discard any messages.

The following shows an example of how to configure the internal buffer to accommodate up to one thousand messages:

/* Creates an instance of DirectMessagePublisherBuilder, which is used to create DirectMessagePublisher objects. */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
    .onBackPressureWait(1000)    // Creates a message buffer with space to accommodate 1000 messages, at which point the thread pauses until there is room.
    .build()                     // Builds a DirectMessagePublisher object based on the provided configuration.    
    .start();                    // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.
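The wait policy can be pictured as a blocking put on a bounded queue. The following sketch uses only java.util.concurrent and is a model of the behavior, not the API's implementation; WaitingBufferSketch and publishAll are hypothetical names, and the sender thread stands in for the API draining its buffer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a wait-on-full buffer; not the PubSub+ implementation. */
final class WaitingBufferSketch {
    /** Publishes messageCount messages through a bounded buffer; returns how many were sent. */
    static int publishAll(int bufferCapacity, int messageCount) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(bufferCapacity);
        AtomicInteger sent = new AtomicInteger();

        // Stands in for the API sending buffered messages to the broker.
        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    buffer.take();
                    sent.incrementAndGet();
                    Thread.sleep(1);   // a deliberately slow "network"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();

        try {
            for (int i = 0; i < messageCount; i++) {
                buffer.put("message-" + i);   // blocks while the buffer is full
            }
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println("sent " + publishAll(2, 10) + " messages");
    }
}
```

The publishing loop needs no overflow handling here because the blocking call slows it to the rate of the sender, at the cost of stalling the publishing thread.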

Configuring an Unlimited Internal Buffer

The default configuration for the API is to use an internal buffer of unlimited size for messages. This configuration is not suitable for memory-restricted environments because the buffer is allowed to grow indefinitely, which can cause out-of-memory errors (or potentially undefined errors). This configuration is useful when your infrastructure is made up of several short-lived microservices that can provide publishing redundancy in the unlikely event that an internal queue encounters an out-of-memory scenario. This configuration is also useful because you don't have to write code to handle back-pressure scenarios.

When you use an unlimited buffer, the Java API continuously puts messages published by the client application on the internal buffer. The following example shows an explicit call to the onBackPressureElastic() method, which is not required because it is the default behavior:

/* Creates an instance of DirectMessagePublisherBuilder, which is used to create DirectMessagePublisher objects. */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
    .onBackPressureElastic()    // Creates an unlimited capacity message buffer. 
    .build()                    // Builds DirectMessagePublisher object based on the provided configuration.    
    .start();                   // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.
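The memory trade-off of an elastic buffer can be seen in a toy model: with no sender draining the queue, every published message simply adds to the backlog. The sketch below uses a JDK LinkedBlockingQueue as a stand-in; ElasticBufferSketch and backlogAfterBurst are hypothetical names and do not reflect how the API stores messages.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of an elastic buffer; not the PubSub+ implementation. */
final class ElasticBufferSketch {
    /** Queues a burst with nothing draining the buffer; returns the backlog size. */
    static int backlogAfterBurst(int burstSize) {
        // Without a capacity argument the queue is effectively unbounded
        // (its capacity is Integer.MAX_VALUE).
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        for (int i = 0; i < burstSize; i++) {
            buffer.offer("message-" + i);   // never rejected, never blocks
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("backlog: " + backlogAfterBurst(100_000));
    }
}
```

Nothing pushes back on the publisher in this model, so the only limit on the backlog is the heap available to the process.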

Publishing a Direct Message

After you have created the DirectMessagePublisher object, you can start sending messages. When you send a message, there are two primary components: the topic to publish to and the message payload.

The topic must be an object of the Topic class that follows the Solace hierarchical format, for example: solace/messaging/direct/pub. The publish method currently supports simple string messages, byte arrays, and objects of the OutboundMessage class, which can be obtained through an OutboundMessageBuilder via MessagingService.messageBuilder(). The following are methods to publish a direct message:

  • publish(byte[] message, Topic destination)
  • publish(OutboundMessage message, Topic destination)
  • publish(OutboundMessage message, Topic destination, Properties additionalMessageProperties)
  • publish(String message, Topic destination)

See the PubSub+ Messaging API for Java reference for information about the publish methods that are available in the DirectMessagePublisher interface.

Here's an example that shows how to publish a direct message:

while (System.in.available() == 0 && !isShutdown) {
    try {
        String message = "This is a message from Solace";    // The contents or payload of the message.
        String topicString = "solace/samples/direct/pub";    // The topic the message is published to on the event broker.
        publisher.publish(message, Topic.of(topicString));   // Publishes the message to the topic; repeats until the while loop's conditions stop it.
    } catch (RuntimeException e) {
        System.out.printf("### Caught while trying to publisher.publish(): %s%n", e);
        isShutdown = true;
    } finally {
        try {
            Thread.sleep(1000 / APPROX_MSG_RATE_PER_SEC);    // Paces the loop to roughly the target message rate.
        } catch (InterruptedException e) {
            isShutdown = true;
        }
    }
}
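The sample loop paces itself with Thread.sleep(1000 / APPROX_MSG_RATE_PER_SEC). Assuming that constant is an int, the expression is integer division in whole milliseconds, so any rate above 1000 messages per second yields a sleep of 0. A minimal sketch of a finer-grained alternative follows; PublishPacing and its methods are hypothetical helper names, not part of the API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical pacing helper that works in nanoseconds. */
final class PublishPacing {
    /** Interval between messages in nanoseconds for a positive target rate. */
    static long intervalNanos(int messagesPerSecond) {
        if (messagesPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return TimeUnit.SECONDS.toNanos(1) / messagesPerSecond;
    }

    /** Pauses the calling thread for about one interval; may return early. */
    static void pause(int messagesPerSecond) {
        LockSupport.parkNanos(intervalNanos(messagesPerSecond));
    }

    public static void main(String[] args) {
        System.out.println(1000 / 2000);            // integer division: prints 0
        System.out.println(intervalNanos(2000));    // prints 500000
    }
}
```

Sleep-based pacing is approximate in either form, which matches the "approximate rate" intent of the sample.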

Handling Errors

The PubSub+ Java API provides the setPublishFailureListener() method that notifies the client if the API is unable to publish messages. A failed publish event could be due to an issue such as an invalid topic or a termination of the service. See the example below:

/* Creates an instance of DirectMessagePublisherBuilder, which is used to create DirectMessagePublisher objects. */
final DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder()
    .onBackPressureWait(1000)    // Creates a message buffer with space to accommodate 1000 messages, at which point the thread pauses until there is room.
    .build()                     // Builds a DirectMessagePublisher object based on the provided configuration.    
    .start();                    // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.

publisher.setPublishFailureListener(e -> {             // Can be called for access control list violations.
    System.out.println("### FAILED PUBLISH " + e);     // In this example, an error message is printed.
});
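Printing each failure is fine for a sample, but an application often wants to count failures and keep the most recent ones for inspection. The following is a minimal sketch of such a handler using only the JDK. FailureTally is a hypothetical helper, and the sketch assumes the listener may be invoked from a thread other than the publishing thread, which is why it uses thread-safe collections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical helper that records failure events handed to a listener. */
final class FailureTally<E> implements Consumer<E> {
    private final AtomicLong failures = new AtomicLong();
    private final ConcurrentLinkedQueue<E> recent = new ConcurrentLinkedQueue<>();
    private final int keep;

    FailureTally(int keep) {
        this.keep = keep;
    }

    /** Counts the event and retains roughly the last 'keep' events. */
    @Override
    public void accept(E event) {
        failures.incrementAndGet();
        recent.add(event);
        while (recent.size() > keep) {
            recent.poll();   // drop the oldest retained event
        }
    }

    long count() {
        return failures.get();
    }

    List<E> recentEvents() {
        return new ArrayList<>(recent);
    }

    public static void main(String[] args) {
        FailureTally<String> tally = new FailureTally<>(2);
        tally.accept("failure-1");
        tally.accept("failure-2");
        tally.accept("failure-3");
        System.out.println(tally.count() + " failures, recent: " + tally.recentEvents());
    }
}
```

Since the listener in the example above is written as a lambda, a tally like this could be wired in with a lambda such as e -> tally.accept(e), with E bound to whatever event type the listener receives.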