Consuming Persistent Messages Using the Java API

Receiver applications that cannot tolerate message loss can use persistent messaging (referred to as guaranteed messages in other parts of this documentation) instead of direct messaging. When persistent messaging is used, messages are stored on a queue on the event broker. Messages are not deleted from the event broker until the message has been consumed and acknowledged by the subscribing application (referred to as a message receiver). The PubSub+ Java API can only consume persistent messages from queues and not from topic endpoints.

To consume persistent messages, you must first set up a message queue on the event broker. For information about creating and configuring durable queues on an event broker, see Configuring Queues. Alternatively, a non-durable queue can be created when a persistent message receiver (PersistentMessageReceiver) is created. 

To use a persistent message receiver to consume persistent messages, use the following steps:

  1. Create a PersistentMessageReceiver object.
  2. Receive a Persistent Message Synchronously.
  3. Receive a Persistent Message Asynchronously.
  4. Extract Properties from an Inbound Message.
  5. Message Acknowledgments.
  6. Negative Acknowledgments (NACKs)
  7. Create a Queue with the Java API.
  8. Browse a Queue with the Java API.

Back-pressure can occur if your consumer application experiences a situation where it is unable to process messages as fast as it receives them from the event broker. Messages continue to be buffered internally until a high watermark is reached, at which point the API tells the event broker to stop sending messages to prevent message loss.

To see a sample application that subscribes to persistent messages from a queue, see the Solace Developer Hub.

Creating a PersistentMessageReceiver Object

After you have established a connection to the event broker using a MessagingService object, you use a PersistentMessageReceiver object to consume persistent messages from a queue on the event broker. To enable your PersistentMessageReceiver to start receiving messages, call start() on it.

Ensure that the queue properties you specify with the Java API correspond to the those configured on the event broker. For more information, see:

The following are objects and methods you use to configure how persistent messages are consumed from an event broker:

  • PersistentMessageReceiverBuilder
    • createPersistentMessageReceiverBuilder()
    • withMissingResourcesCreationStrategy(MissingResourcesCreationConfiguration.MissingResourcesCreationStrategy strategy)
    • build(Queue endpointToConsumeFrom)
  • PersistentMessageReceiver
    • start()
  • Queue
    • durableExclusiveQueue(String queueName)
    • durableNonExclusiveQueue(String queueName)
    • nonDurableExclusiveQueue()
    • nonDurableExclusiveQueue(String queueName)

For more information about the preceding methods and interfaces, see the PubSub+ Messaging API for Java reference.

The following is an example how to use a PersistentMessageReceiver object to bind to a queue:

/* Creates an instance of PersistentMessageReceiverBuilder, which is used to create PersistentMessageReceiver objects. */        
final PersistentMessageReceiver receiver = service.createPersistentMessageReceiverBuilder()
    .build(Queue.durableExclusiveQueue(QUEUE_NAME))      // Creates a PersistentMessageReceiver object.
    .start();                                     // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.

You can use the Java API to create durable and non-durable queues on the event broker. For more information see Creating Queues with the Java API.

Asynchronous Receivers

It is also possible to start a persistent message receiver using a callback listener to allow for asynchronous notifications after the start operation is complete.

The following example shows you how to invoke CompletionListener, which is a callback method for listening for the results of a future computation. This handler also notifies the application when the start operation is complete.

final CompletionListener<PersistentMessageReceiver> receiverStartupListener = (receiver, throwable) -> {
    if (throwable == null) {
    // Started successfully, can receive messages.
    } else {
    // Deal with an exception during start.
    }
};
receiverToBeStarted.startAsync(receiverStartupListener);    

Your receiver application is not operational until you call start() or startAsync() on it

Consuming a Persistent Message Synchronously

You can consume persistent messages synchronously. To do this, you create a PersistentMessageReceiver object and bind it to a queue. After successfully binding to the queue, your object can begin to consume persistent messages using the receiveMessage() method. This method blocks the thread until the next message has been received.

When an application processes an InboundMessage, it can then send an acknowledgment to the event broker with PersistentMessageReceiver.ack(). The event broker will then remove the InboundMessage from the queue. Until a message is acknowledged it remains on the broker queue and may be redelivered when the application reconnects to the queue.

For more information about the preceding methods, see the PubSub+ Messaging API for Java reference.

The following example shows you how to consume persistent messages synchronously:

/* Creates an instance of PersistentMessageReceiverBuilder, which is used to create PersistentMessageReceiver objects. */
final PersistentMessageReceiver receiver = service.createPersistentMessageReceiverBuilder()
    .build(queueToConsumeFrom)     // Creates a PersistentMessageReceiver object.
    .start();                      // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.

final InboundMessage message = receiver.receiveMessage();    // Blocking request to receive the next message.
                                                 // Can ack any time later as long as terminate() has not been called on receiver.
receiver.ack(message);                           // Acknowledges the inbound message.

If you do not call the receiveMessage() method, messages can accumulate on the API's internal buffer and you risk running into a back-pressure scenarios. If this occurs, the Java API automatically informs the event broker to stop sending messages.

Consuming a Persistent Message Asynchronously

You can consume persistent messages in an asynchronous manner. To do so, you create a PersistentMessageReceiver object and start the connection to the event broker as normal, but you use a MessageHandler object to act as a callback method to notify your application when a message has been received.

The following example shows you how to consume persistent messages asynchronously:

/* Creates an instance of PersistentMessageReceiverBuilder, which is used to create PersistentMessageReceiver objects. */
final PersistentMessageReceiver receiver = service.createPersistentMessageReceiverBuilder()
    .build(queueToConsumeFrom)     // Creates an instance of PersistentMessageReceiver.
    .start();               // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.

final MessageHandler messageHandler = (message) -> {           // Interface for the listener of message handlers for inbound messages.
if (message != null && message.getPayloadAsBytes() != null) {  // Do something with my message, for example check if it is not null or invalid.
    receiver.ack(message);                                     // Acknowledges the inbound message.
    }
};
receiver.receiveAsync(messageHandler);                   // Request to register an asynchronous message handler.

Pausing and Resuming Message Consumption from Internal Buffers

When your application consumes messages asynchronously using the receiveAsync() method, you can call the pause() and resume() methods to control the flow of messages to your application's callback

The pause() and resume() methods have no effect if you use receiveMessage().

You can use the pause() and resume() methods to control the flow of messages between the API's internal buffer and your application. This internal buffer is where messages are received from the event broker. This flow control is useful if your application must momentarily stop processing messages to handle other operations. The pause() and resume() methods do not control the flow of messages between the event broker and the internal buffer of the API. When you call the pause()method, messages continue to be sent from the event broker. The pause() and resume() methods control the message delivery only to the application. Messages received from the event broker continue to accumulate in the internal buffer.

Since the event broker continues to send messages, a back-pressure scenario may occur–that is, messages continue to accumulate until an internal high watermark is reached. At this point, the PersistentMessageReceiver object notifies the event broker to stop sending messages until the number of accumulated messages falls below the internal high watermark. This internal API mechanism handles back-pressure scenarios for you and ensures that no messages are lost between the event broker and your application.

The following object and methods are used to pause and resume processing of messages from the API's internal buffer:

  • ReceiverFlowControl
    • pause()
    • resume()

For more information about the preceding methods, see the PubSub+ Messaging API for Java reference.

The following example shows how to pause and resume processing of messages from the internal queue in the API using the scheduler:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(() -> receiver.pause(), 10L, TimeUnit.SECONDS);      // Pause message delivery in 10 seconds.
scheduler.schedule(() -> receiver.resume(), 20L, TimeUnit.SECONDS);    // Resume message delivery in 20 seconds.

Extracting Properties from an Inbound Message

After you establish a connection to the event broker, you can subscribe to messages using the PubSub+ Java API. As part of the API, it implicitly creates the inbound messages on matching delivery. After your application receives an InboundMessage object, you can extract a number of properties from that message, such as the message payload (as bytes or a string), sender ID and class of service. The following example shows how to extract properties from a message:

/* Using a messageHandler */
final MessageHandler messageHandler = (inboundMessage) -> {           
    byte[] bytes = inboundMessage.getPayloadAsBytes();      // Gets the raw payload of the message as a byte array.		
    String senderID = indboundMessage.getSenderId();        // Returns the Sender's ID.
    int serviceClass = inboundMessage.getClassOfService();  // Retrieves the Class of Service level of a given message.
    receiver.ack(inboundMessage);       
};
receiver.receiveAsync(messageHandler);                  
			
/* using the receiveMessage() method*/
final InboundMessage inboundMessage= receiver.receiveMessage();    
String msgPayload = inboundMessage.getPayloadAsString();  // Gets the payload as a UTF-8 decoded as a String.		
String senderID = indboundMessage.getSenderId();          // Returns the Sender's ID.
int serviceClass = inboundMessage.getClassOfService();    // Retrieves the Class of Service level of a given message.	
receiver.ack(inboundMessage);		     		

For a complete list of methods that you can use to extract properties from an InboundMessage, see the PubSub+ Messaging API for Java reference.

Message Acknowledgments

One of the two following application acknowledgment modes can be used for acknowledging a message:

  • Auto-acknowledgment

  • Client acknowledgment (default)

Auto-Acknowledgment Mode

When the auto-acknowledgment mode is used, the API automatically generates application-level acknowledgments. To configure your PersistentMessageReceiver to use auto-acknowledgments, you use the withMessageAutoAcknowledgement() method:

final PersistentMessageReceiver receiver = messagingService
        .createPersistentMessageReceiverBuilder()
        .withMessageAutoAcknowledgement()
        .build(Queue.durableExclusiveQueue(QUEUE_NAME));  

For the PubSub+ Java API, acknowledgments are sent at different times depending on whether the message is received asynchronously or synchronously:

  • when received asynchronously, the acknowledgment is sent after the message callback completes with no exceptions.

  • when received synchronously, the acknowledgment is sent after the message is removed from the API's internal queue during the receiveMessage() method. It's important to realize that the acknowledgment has been sent before control is returned to the application (that is after the receiveMessage() method completes).

Client-Acknowledgment Mode

Client acknowledgment mode is the default behavior of the PubSub+ Java API and means the client must explicitly send an acknowledgment for the message ID of each message received. Acknowledgments are asynchronous. Any thread can acknowledge messages at any time as long as the receiver has not been terminated. It is important to remember that after a PersistentMessageReceiver object has acknowledged a message from the event broker, it deletes that message from the queue on the event broker. For this reason it's important to perform any processing and storage of the message before you acknowledge it.

The following example shows how to acknowledge a persistent message synchronously and asynchronously:

/* Blocking request to receive a message.  */
final InboundMessage message = receiver.receiveMessage();
// auto-acked by now.

/* Request to register an async message handler. */            
receiver.receiveAsync(message -> {  // Asynchronous receiver message callback.
// Processing/storing of message happens here.
}); 
// auto-ack happens when the message processing callback method finishes without an error.           

Negative Acknowledgments for Specific Messages

You can use negative acknowledgments (Nacks) if you have configured your applications for client acknowledgments. When you use Nacks, you can send a settlement outcome to let the event broker know the result from processing a guaranteed message that was received. Based on the settlement outcome, the event broker knows how to handle the message on its queue. You can use the following are the settlement outcomes:

  • ACCEPTED—This Ack notifies the event broker that your client application successfully processed the guaranteed message. When the event broker receives this outcome it removes the message from its queue.
    • When you call the settle() method with an outcome of ACCEPTED, it is the same as using persistentReceiver.ack(message).
  • FAILED—This Nack notifies the event broker that your client application did not process the message. When the event broker receives this Nack it attempts to redeliver the message while adhering to delivery count limits.
  • REJECTED—This Nack notifies the event broker that your client application could process the message but it was not accepted (for example, failed validation). When the event broker receives this Nack it removes the message from its queue and then moves the message to the Dead Message Queue (DMQ) if it is configured.

Before you can use Nacks, you must use the withRequiredMessageClientOutcomeOperationSupport() method to add the Outcome.FAILED, Outcome.REJECTED, or both outcomes as Nack types when you create your PersistentMessageReceiver to prepare it to work with negative acknowledgments. You do not need to add the Outcome.ACCEPTED outcome because it is always available. If you try to use an outcome that has not been added, you get an error of Required Settlement Outcome Not Supported. The following code shows how to configure a PersistentMessageReceiver to use NACKs:

import com.solace.messaging.config.MessageAcknowledgementConfiguration.Outcome;
// ...
final PersistentMessageReceiver persistentReceiver = messagingService
        .createPersistentMessageReceiverBuilder()
        .withRequiredMessageClientOutcomeOperationSupport(
                new Outcome[]{Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED})
        .build(Queue.durableExclusiveQueue(QUEUE_NAME)); 
  • Nacks can be lost during transit (for example due to unexpected networking issues). Consider this fact as part of the logic for handling messages when you develop your application.
  • Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an InvalidOperationException occurs during the Flow bind request when an outcome is specified.

The following example shows how to settle an accepted message, which is the same as using persistentReceiver.ack(message) :

persistentReceiver.receiveAsync(message -> {  // Asynchronous receiver message callback.
// Processing/storing of message happens here.
persistentReceiver.settle(message, Outcome.ACCEPTED); 
});            

The following example shows how to settle a rejected message, for example your application cannot currently process the message:

persistentReceiver.receiveAsync(message -> {  // Asynchronous receiver message callback.
// Message processed but not accepted, send a NACK to the broker
persistentReceiver.settle(message, Outcome.REJECTED); 
});            

The following example show how to settle a failed message when there is a problem with the message content:

persistentReceiver.receiveAsync(message -> {  // Asynchronous receiver message callback.
// Failed to process message, send a NACK to the broker 
persistentReceiver.settle(message, Outcome.FAILED); 
});