Publishing Persistent Messages Using the Java API

When your applications require confirmation handling and exactly-once delivery, we recommend that you use persistent messages instead of direct messages. To publish persistent messages in the PubSub+ Java API, you first set up a message queue on the PubSub+ event broker.

For information about creating and configuring queues on an event broker, see Configuring Queues.

To handle persistent messages using the PubSub+ Java API, use the following steps:

  1. Create a PersistentMessagePublisher Object.
  2. Configure and Create an OutboundMessage.
  3. Publish a Persistent Message.
  4. Acknowledge Messages and Handle Errors.

In some use cases, your application may send messages faster than they can be transported. Messages can then accumulate in the API's internal buffers, causing back-pressure. If this scenario is possible, consider changing the back-pressure settings to meet the requirements of your application. For more information, see Handling Back-Pressure When Publishing Persistent Messages.

For examples of applications that publish persistent messages, see the Solace Developer Hub.

Creating a PersistentMessagePublisher Object

After a MessagingService object has established a connection to an event broker, you use a PersistentMessagePublisher object to publish persistent messages. As with the MessagingService object, the PersistentMessagePublisher object allows you to configure what features to use in the API. The following methods create and configure a PersistentMessagePublisherBuilder object, including how it handles back-pressure, and the build() method returns a PersistentMessagePublisher object. To enable your PersistentMessagePublisher to start publishing messages, call start() on it.

  • PersistentMessagePublisherBuilder
    • createPersistentMessagePublisherBuilder()
    • onBackPressureReject()
    • onBackPressureWait()
    • onBackPressureElastic()
    • build()
  • PersistentMessagePublisher
    • start()

The following example shows how to create and start a PersistentMessagePublisher object using a MessagingService object that is connected to the event broker:

/* Creates an instance of PersistentMessagePublisherBuilder, which is used to create PersistentMessagePublisher objects. */
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
    .build()             // Builds a PersistentMessagePublisher object based on the provided configuration.
    .start();            // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.            

Starting a Publisher Asynchronously

It is also possible to start a persistent message publisher using a callback listener to allow for asynchronous notifications after the start operation is complete.

The following example shows how to use a CompletionListener, which is a callback interface for listening for the result of a future computation. The listener notifies the application when the start operation is complete.

final CompletionListener<PersistentMessagePublisher> publisherStartupListener = (publisher, throwable) -> {
    if (throwable == null) {
        // Started successfully, can publish messages.
    } else {
        // Deal with an exception during start.
    }
};
publisherToBeStarted.startAsync(publisherStartupListener);

Your publisher application is not operational until you call start() or startAsync() on it.

Configuring and Creating Outbound Messages

Your client applications explicitly create the outbound messages to publish. In the PubSub+ Java API, when you publish messages you use OutboundMessage objects. To configure and create OutboundMessage objects, follow these steps:

  1. Call messagingService.messageBuilder() to return an OutboundMessageBuilder object. For better performance, we recommend you use a single OutboundMessageBuilder to create multiple OutboundMessage objects.

    final OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
  2. Configure your message with an OutboundMessageBuilder and then call the build() method to return a message object. You can configure message properties using either method below.
    • Use the Properties interface and the setProperty(name,value) method. The following example shows how to set a custom name-value property on a Properties object, and then configure a message using the fromProperties() method:
      final Properties messageProperties = new Properties();
      messageProperties.setProperty("PropertyName", "PropertyValue");
      final OutboundMessage message = messageBuilder
          .fromProperties(messageProperties)
          .build(messagePayload);
    • Use the OutboundMessageBuilder interface and the with*() methods. The following example shows how to set Time to Live, Sender ID, and a custom key-value property on a message:
      final OutboundMessage myCustomMessage = messageBuilder
          .withTimeToLive(1000)
          .withSenderId("mySenderId")
          // For a complete list of with*() methods see the PubSub+ Messaging API Java reference
          .withProperty("PropertyKey", "PropertyValue")
          .build(messagePayload);

The following code example shows how to create a message builder, set message properties and create a message:

/* Builder for creation of similarly configured messages */
final OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
final OutboundMessage message = messageBuilder
    .fromProperties(messageProperties)                      // For example TTL, Sender ID, Sequence Number etc.
    .withExpiration(Instant.now().toEpochMilli() + 10000L)  // Expire the message 10 seconds from the current system time.
    .build("My_Message");                                   // Builds the message.

For more information about the methods, see the PubSub+ Messaging API Java reference.

Setting a Partition Key

You can set a partition key to use partitioned queues. Partitioned Queues is a feature configured on the PubSub+ event broker that allows you to easily scale the number of consumer applications bound to a queue. A partition key can be set on each message in the publishing application to ensure that all messages with the same partition key are delivered to the same consumer without additional logic in the consumer application. For more information, see Partitioned Queues.

You can use setProperty(name,value) or withProperty(key,value) to set a key-value pair on a message using an OutboundMessageBuilder.

  • name/key—The constant MessageUserPropertyConstants.QUEUE_PARTITION_KEY or the string value "JMSXGroupID".

  • value—A string representing the value of your partition key. Client applications set the value at publish time.

The following example sets the partition key value to "Group-0":

// Using fromProperties() method
final Properties additionalProperties = new Properties();
additionalProperties.setProperty(MessageUserPropertyConstants.QUEUE_PARTITION_KEY, "Group-0");
final OutboundMessage message = messageBuilder
    .fromProperties(additionalProperties)
    .build(payload);

// Using withProperty() method
final OutboundMessage myCustomMessage = messagingService.messageBuilder()
    .withProperty(MessageUserPropertyConstants.QUEUE_PARTITION_KEY, "Group-0")
    .build(payload);
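To make the effect of a partition key concrete, the following is a toy model in plain Java. It is not the event broker's actual algorithm, which this page does not describe. The class PartitionSketch and the method partitionFor() are hypothetical names, and the sketch assumes a simple hash-based mapping, which is enough to show why messages that share a key always land on the same partition.

```java
/** Toy model only: a hash-based key-to-partition mapping. All names here are hypothetical. */
final class PartitionSketch {
    private PartitionSketch() { }

    /** Maps a partition key to a partition index in the range [0, partitionCount). */
    static int partitionFor(String partitionKey, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be greater than zero");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // The same key always yields the same partition index.
        int first = partitionFor("Group-0", 4);
        int second = partitionFor("Group-0", 4);
        System.out.println("same partition: " + (first == second));
    }
}
```

Because the mapping depends only on the key and the partition count, the publisher needs no knowledge of which consumer is bound to which partition.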

Handling Back-Pressure When Publishing Persistent Messages

When you use persistent messaging, the messages are sent to the PubSub+ event broker with a topic and may be enqueued on any queue with a matching topic subscription. The event broker then delivers the messages asynchronously to any consumers subscribed to that queue. At the time the client application publishes the persistent message, the API queues the message in an internal buffer before it is sent to the event broker. In ideal conditions, as soon as the application publishes a message, the API sends that message to the network, and that message is eventually received by the event broker.

It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages. This scenario is referred to as back-pressure. It is important to configure your application to handle situations where back-pressure occurs.

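In the PubSub+ Java API, the PersistentMessagePublisherBuilder provides three primary mechanisms to handle back-pressure:

  • Rejecting Messages When a Specified Limit is Reached
  • Throttling Publisher When a Specified Limit is Reached
  • Configuring an Unlimited Internal Buffer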

Rejecting Messages When a Specified Limit is Reached

When back-pressure occurs, you can choose to reject the messages from the client application when a specified limit is reached in the internal buffer. You can use the onBackPressureReject(int bufferCapacity) method to specify the number of messages that can accumulate in the buffer. After the specified capacity is reached, it is no longer possible to publish new messages, and calls to publish throw exceptions until the buffer has capacity again. The value of bufferCapacity must be greater than zero.

/* Creates an instance of PersistentMessagePublisherBuilder, which is used to create PersistentMessagePublisher objects. */
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
    .onBackPressureReject(1000)     // Creates a message buffer with space to accommodate 1000 messages, at which point messages are rejected until there is room.
    .build()             // Builds a PersistentMessagePublisher object based on the provided configuration.    
    .start();            // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.    

Using a Publisher Readiness Listener

We recommend that you use the PublisherReadinessListener interface when you use the onBackPressureReject() method because it lets your application know when there is capacity available in the buffer and it can resume publishing messages.

The following is an example of registering an event handler using the PublisherReadinessListener interface:

/* Object that is used with synchronization to ensure that 'ready' event is issued after exception handling once 'publish' is processed */
final Object[] lock = new Object[0];
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
    .onBackPressureReject(1000)     // Creates a message buffer with space to accommodate 1000 messages, at which point messages are rejected until there is room.
    .build()             // Builds a PersistentMessagePublisher object based on the provided configuration.    
    .start();            // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.

final PublisherReadinessListener canPublishListener = () -> {
    // This code is executed when the publisher is ready to publish again.
};
publisher.setPublisherReadinessListener(canPublishListener);    // Register listener, usually set once.
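The interplay between rejection and the readiness notification can be easier to see in isolation. The following is a minimal, single-threaded toy model in plain Java; it is not the PubSub+ implementation and does not use the PubSub+ API. RejectingBuffer, sendOne(), and the other names are hypothetical, and the sketch assumes the listener fires whenever a full buffer regains space.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model (not the PubSub+ implementation) of a reject-on-full publish buffer. */
final class RejectingBuffer {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final int capacity;
    private Runnable readinessListener = () -> { };

    RejectingBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be greater than zero");
        }
        this.capacity = capacity;
    }

    void setReadinessListener(Runnable listener) { this.readinessListener = listener; }

    int size() { return buffer.size(); }

    /** Accepts the message, or throws when the buffer is full. */
    void publish(String message) {
        if (buffer.size() >= capacity) {
            throw new IllegalStateException("buffer full, message rejected");
        }
        buffer.add(message);
    }

    /** Simulates sending one buffered message; notifies the listener if a full buffer gained space. */
    String sendOne() {
        boolean wasFull = buffer.size() >= capacity;
        String sent = buffer.poll();
        if (wasFull && sent != null) {
            readinessListener.run();
        }
        return sent;
    }
}

final class RejectingBufferDemo {
    public static void main(String[] args) {
        RejectingBuffer buffer = new RejectingBuffer(2);
        String[] pending = {null};
        // When capacity returns, publish the message that was rejected earlier.
        buffer.setReadinessListener(() -> {
            if (pending[0] != null) {
                buffer.publish(pending[0]);
                pending[0] = null;
            }
        });
        for (String m : new String[] {"a", "b", "c"}) {
            try {
                buffer.publish(m);
            } catch (IllegalStateException e) {
                pending[0] = m;   // Hold the rejected message until the listener fires.
            }
        }
        buffer.sendOne();
        System.out.println("buffered after resend: " + buffer.size());
    }
}
```

The design point is that the application keeps the rejected message and republishes it from the readiness callback instead of polling the buffer in a loop.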

Throttling Publisher When a Specified Limit is Reached

You can choose to throttle the publishing application if a specified limit is reached in the internal buffer. Throttling gives the API time to free capacity in its internal buffer. You can use the onBackPressureWait(int bufferCapacity) method to set the maximum number of messages that can accumulate in the buffer. When this maximum capacity (bufferCapacity) is reached, the publisher thread pauses and waits for available capacity in the internal buffer before letting the application publish any more messages.

Use this method when it is acceptable for the application's publishing requests to take longer after the buffer's capacity has been reached. This mechanism gives the API time to empty the internal buffer. An additional benefit for persistent messaging with this configuration is that the API does not discard any messages.

The following shows an example of how to configure the internal buffer to accommodate up to one thousand messages:

/* Creates an instance of PersistentMessagePublisherBuilder, which is used to create PersistentMessagePublisher objects. */
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
    .onBackPressureWait(1000)  // Creates a message buffer with space to accommodate 1000 messages, at which point the thread pauses until there is room.
    .build()             // Builds a PersistentMessagePublisher object based on the provided configuration.    
    .start();            // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.    
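The waiting behavior resembles a bounded blocking queue. The following sketch uses only java.util.concurrent and is a model of the idea, not of the API's internals; WaitingBufferSketch and run() are hypothetical names. The publishing thread blocks in put() whenever the buffer is full, and a sender thread drains it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy model of wait-on-full back-pressure using a bounded blocking queue. */
final class WaitingBufferSketch {
    private WaitingBufferSketch() { }

    /** Publishes count messages through a buffer of the given capacity; returns how many were sent. */
    static int run(int capacity, int count) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int[] sent = {0};
        // Stand-in for the API thread that sends buffered messages to the broker.
        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.take();   // Blocks until a message is available.
                    sent[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        try {
            for (int i = 0; i < count; i++) {
                buffer.put(i);       // Blocks while the buffer is full; nothing is discarded.
            }
            sender.join();           // join() makes the sender's updates visible here.
        } catch (InterruptedException e) {
            sender.interrupt();
            Thread.currentThread().interrupt();
        }
        return sent[0];
    }

    public static void main(String[] args) {
        System.out.println("sent: " + run(2, 10));
    }
}
```

Even with a capacity far smaller than the message count, every message is eventually sent; the cost is that the publishing thread spends time blocked.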

Configuring an Unlimited Internal Buffer

The default configuration for the API is to use an internal buffer of unlimited size for messages. This configuration is not suitable for memory-constrained environments because the buffer is allowed to grow indefinitely, which can cause out-of-memory errors (or potentially undefined errors). This configuration is useful when your infrastructure is made up of several short-lived microservices that can provide publishing redundancy in the unlikely event that an internal queue encounters an out-of-memory scenario. This configuration is also convenient because you don't have to write code to handle back-pressure scenarios.

When you use an unlimited buffer, the Java API continuously puts the messages published by the client application into the internal buffer. The following example shows an explicit call to the onBackPressureElastic() method, which is not required because it is the default behavior:

/* Creates an instance of PersistentMessagePublisherBuilder, which is used to create PersistentMessagePublisher objects. */
final PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
    .onBackPressureElastic()  // Creates an unlimited capacity message buffer. 
    .build()             // Builds a PersistentMessagePublisher object based on the provided configuration.    
    .start();            // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.                    

Publishing a Persistent Message

After you have established a connection to the event broker using a MessagingService object, you can use a PersistentMessagePublisher object to publish persistent messages.

A persistent message has the following components:

  • A topic to publish to (required)
  • A message payload (optional)

Persistent message publishing involves the receipt of acknowledgments or publish receipts. Depending on your requirements, your client application can publish as:

  • non-blocking, which allows your application to perform other functions while the MessagePublishReceiptListener waits for the acknowledgment
  • blocking, which waits until an acknowledgment has been received; an acknowledgment indicates that the message has been received and persisted by the broker

Non-Blocking Methods

The following are non-blocking methods:

  • publish(byte[] message, Topic destination)
  • publish(byte[] message, Topic destination, Object userContext)
  • publish(OutboundMessage message, Topic destination)
  • publish(OutboundMessage message, Topic destination, Object userContext)
  • publish(OutboundMessage message, Topic destination, Object userContext, Properties additionalMessageProperties)
  • publish(String message, Topic destination)
  • publish(String message, Topic destination, Object userContext)

For more information about the methods, see the PubSub+ Messaging API Java reference.

Blocking Methods

If you want your publisher to wait for an acknowledgment from the broker before returning from the publish call, you can use the following blocking methods:

  • publishAwaitAcknowledgement(OutboundMessage message, Topic destination, long timeout)
  • publishAwaitAcknowledgement(OutboundMessage message, Topic destination, long timeout, Properties additionalMessageProperties)

You can use either of the preceding methods with a PersistentMessagePublisher object to publish an OutboundMessage to the broker using a topic. Each method blocks the calling thread until either:

  • the publisher API receives an acknowledgment from the broker
  • the timeout period elapses

/* A simple example of a blocking publish event for a persistent message. */
final long deliveryConfirmationTimeOutInMilliseconds = 20000L;  // Wait at most 20 seconds before considering that the message is not delivered to the broker.
final OutboundMessage message = messageBuilder.build("Hello World!");

try {
    // Publish to Topic "toDestination" with a timeout value; this call blocks.
    publisher.publishAwaitAcknowledgement(message, toDestination, deliveryConfirmationTimeOutInMilliseconds);
} catch (PubSubPlusClientException.TimeoutException e) {
    logger.warn(String.format("Timeout for Message %s - %s", message, e)); // Time-out has occurred.
} catch (PubSubPlusClientException e) {
    logger.warn(String.format("NACK for Message %s - %s", message, e));    // Message was not spooled on broker queue, a NACK is sent.
} catch (InterruptedException e) {
    logger.info("Got interrupted, probably shutting down", e);             // Interrupted while waiting for publish confirmation.
}
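The three outcomes of a blocking publish (acknowledged, rejected, timed out) can be modeled without a broker. The following sketch uses a standard CompletableFuture as a stand-in for the pending acknowledgment; AwaitAckSketch and await() are hypothetical names, and the String results are illustrative only, since the API itself signals these outcomes by returning normally or throwing exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model of waiting for an acknowledgment with a timeout. */
final class AwaitAckSketch {
    private AwaitAckSketch() { }

    /** Blocks the calling thread until the acknowledgment completes or the timeout elapses. */
    static String await(CompletableFuture<Void> pendingAck, long timeoutMillis) {
        try {
            pendingAck.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "ACK";          // Completed normally: the message was accepted.
        } catch (TimeoutException e) {
            return "TIMEOUT";      // No answer within the timeout period.
        } catch (ExecutionException e) {
            return "NACK";         // Completed exceptionally: the message was rejected.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";  // The waiting thread was interrupted.
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("queue over quota"));
        System.out.println(await(CompletableFuture.<Void>completedFuture(null), 100L));
        System.out.println(await(rejected, 100L));
        System.out.println(await(new CompletableFuture<Void>(), 100L));
    }
}
```

A timeout does not prove that the message was lost; it only means no acknowledgment arrived in time, which is why the timeout and rejection cases are usually handled separately.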

User Contexts

Optionally, you can use user contexts to correlate information for persistent messages to publish receipts in your application. This information is user-specific and is meaningful only to your publishing application and is not sent to the broker. A user context permits you to attach data to the publish() call that can later be retrieved from the publish receipt listener.

A user context lets your application decide what action to take, or how to process the publish receipt, based on the context that was attached when the message was published.

For example, if a non-blocking application has multiple threads to publish persistent messages, each thread can include its identifier as the user context when it publishes a persistent message. The Java API tracks the user context when specified for each message and returns the user context as part of the publish receipt when the message is acknowledged or rejected by the event broker. The publishing application can then send the publish receipt to the correct thread that sent the message based on the user context.

You can set the user context when you publish the message. For example, you use the publish(OutboundMessage message, Topic destination, Object userContext) method, where the user context is specified as type Object.

The following example shows how to get the user context from a publish receipt:

/* A simple example of a persistent message publishReceipt */
final OutboundMessage acknowledgedMessage = publishReceipt.getMessage(); // Retrieves message associated with a Receipt.    
final Object processingContext = publishReceipt.getUserContext();        // Corresponding context can be retrieved this way from a publish receipt when provided during message publishing.
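As a sketch of the multi-threaded scenario described above, the following plain-Java model routes each receipt to the worker named in its user context. ReceiptStub is a hypothetical stand-in for a publish receipt and ReceiptRouter is a hypothetical helper; neither is part of the PubSub+ API, and the sketch is single-threaded for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a publish receipt; holds only what this sketch needs. */
final class ReceiptStub {
    final String payload;
    final Object userContext;
    final boolean acknowledged;

    ReceiptStub(String payload, Object userContext, boolean acknowledged) {
        this.payload = payload;
        this.userContext = userContext;
        this.acknowledged = acknowledged;
    }
}

/** Groups receipt outcomes by the user context supplied at publish time. */
final class ReceiptRouter {
    private final Map<Object, List<String>> resultsByContext = new HashMap<>();

    /** Would be called from a receipt listener; records the outcome under its context. */
    void onReceipt(ReceiptStub receipt) {
        String status = (receipt.acknowledged ? "ACK " : "NACK ") + receipt.payload;
        resultsByContext
            .computeIfAbsent(receipt.userContext, key -> new ArrayList<>())
            .add(status);
    }

    /** Returns the outcomes recorded for one context, or an empty list if there are none. */
    List<String> resultsFor(Object context) {
        return resultsByContext.getOrDefault(context, Collections.<String>emptyList());
    }
}

final class ReceiptRouterDemo {
    public static void main(String[] args) {
        ReceiptRouter router = new ReceiptRouter();
        router.onReceipt(new ReceiptStub("m1", "worker-1", true));
        router.onReceipt(new ReceiptStub("m2", "worker-2", false));
        router.onReceipt(new ReceiptStub("m3", "worker-1", true));
        System.out.println(router.resultsFor("worker-1"));
    }
}
```

In a real multi-threaded publisher the map would need to be thread-safe (for example a ConcurrentHashMap), because receipts arrive on a different thread from the ones that publish.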

If your application is non-blocking, you can also use a persistent message publisher with publish receipt in your callback to log information. For example, you can use non-blocking message publishing and then send alerts to notify the application of the status of published messages, such as:

  • the event broker successfully receives and processes a message
  • access control violations (ACL)
  • a queue being over quota
  • invalid topics / topics with no subscribers

The following example shows how the publish receipts are logged using Log4j2 with methods such as logger.warn() and logger.debug().

/* Listener that processes all delivery confirmations/timeouts for all messages */
final MessagePublishReceiptListener deliveryConfirmationListener = (publishReceipt) -> {
    final PubSubPlusClientException e = publishReceipt.getException();
    if (e == null) {                                                                // No exception, broker has confirmed a successful publish.
        OutboundMessage outboundMessage = publishReceipt.getMessage();
        logger.debug(String.format("ACK for Message %s", outboundMessage));         // The broker has the message; log an "ACK".
    } else {                                                                        // Negative acknowledgment, broker has not received the message.
        OutboundMessage outboundMessage = publishReceipt.getMessage();              // Which message was not received.
        logger.warn(String.format("NACK for Message %s - %s", outboundMessage, e)); // Logs a "NACK" or negative acknowledgment.
    }
};
publisher.setMessagePublishReceiptListener(deliveryConfirmationListener);     // Listen to all delivery confirmations for all messages being sent.
publisher.publish("Hello world!", topicDestination);                          // Publish a message with a String payload.

Acknowledging Messages and Handling Errors

A publish receipt is a delivery confirmation that indicates whether or not the event broker has successfully spooled a message on a queue. These publish receipts can indicate success or failure, and are handled by the MessagePublishReceiptListener object. You register a MessagePublishReceiptListener object with the publisher by calling the setMessagePublishReceiptListener() method.

The following example shows how to use the MessagePublishReceiptListener to listen for publish receipts:

/* Listener that processes delivery confirmations / timeouts for all messages */
final MessagePublishReceiptListener deliveryConfirmationListener = (publishReceipt) -> {
    final PubSubPlusClientException e = publishReceipt.getException();
    if (e == null) {  // No exception, broker has confirmed receipt.
        // Code to log, handle a successful publishReceipt.
    } else {          // Negative acknowledgment, broker has not received the message.
        // Code to log, handle a failed publishReceipt.
    }
};
publisher.setMessagePublishReceiptListener(deliveryConfirmationListener);          // Listen to delivery confirmations for all messages being sent.
publisher.publish("Hello world!", topicDestination);                               // Publish a message with a String payload.

Strategies for Handling Publish Receipt Errors

The following are application-specific strategies you can use to handle receipt errors when publishing.

Wait and Retry
Wait a number of seconds before trying to send the message again. For example, use Thread.sleep(1000) to wait for 1 second before attempting to publish again.
Retry a Predefined Number of Times
Try to re-publish the message a predefined number of times before dropping it.
Discard the Message
Simply discard messages with failed publish receipts. We don't recommend this strategy if your application cannot tolerate message loss.
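The first two strategies are often combined. The following is a minimal sketch in plain Java of a bounded retry with a fixed delay; BoundedRetry and retry() are hypothetical names that are not part of the PubSub+ API, and the BooleanSupplier stands in for whatever publish attempt your application makes (returning true on a successful receipt).

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retries an action a bounded number of times with a fixed delay. */
final class BoundedRetry {
    private BoundedRetry() { }

    /**
     * Runs attempt up to maxAttempts times, sleeping delayMillis between failed attempts.
     * Returns the number of attempts used on success, or -1 if all attempts failed
     * or the thread was interrupted while waiting.
     */
    static int retry(BooleanSupplier attempt, int maxAttempts, long delayMillis) {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);   // Wait before the next attempt.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;   // Give up; the caller decides whether to discard or escalate.
    }
}

final class BoundedRetryDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for a publish attempt that fails twice and then succeeds.
        int used = BoundedRetry.retry(() -> ++calls[0] >= 3, 5, 10L);
        System.out.println("attempts used: " + used);
    }
}
```

Avoid sleeping inside a publish receipt callback; a sketch like this belongs on an application thread so that the callback thread is not blocked.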

To receive a failed publish receipt when there is no matching subscription, the corresponding option must be enabled on the event broker or event broker service. For more information, see Handling Guaranteed Messages with No Matches (for appliances and software event brokers) or Reject Messages to Sender On No Subscription Match Discard (for PubSub+ Cloud).