Consuming Direct Messages Using the Java API

Direct messaging is useful in scenarios where high throughput and low latency are required. With direct messages, some message loss may occur due to external factors, such as network congestion or occasional client disconnections. Direct messages are suitable for applications that need the latest information but not necessarily every single message, such as weather applications, price checkers, and GPS tracking.

No additional event broker configuration is required for direct messaging. If your application cannot tolerate message loss, we recommend that you use persistent messaging.

To consume direct messages using our Java API, use the following steps:

  1. Create a DirectMessageReceiver Object.
  2. Receive a Direct Message Synchronously.
  3. Receive a Direct Message Asynchronously.
  4. Extract Properties from an Inbound Message.

In some use cases, the API receives messages from the event broker faster than your application can process them. Messages can then fill the API's internal buffers, causing back-pressure. If this scenario is possible, consider changing the default back-pressure settings to meet your requirements. For more information, see Handling Back-Pressure When Subscribing to Direct Messages.

To see a sample application that consumes direct messages, see the Solace Developer Hub.

Creating a DirectMessageReceiver Object

After a MessagingService object has established a connection to an event broker, use a DirectMessageReceiver object to consume direct messages from the event broker.

As with the MessagingService object, the DirectMessageReceiver object can be configured to use certain features of the API. You can use the following methods to create and configure a DirectMessageReceiverBuilder object. The build() method returns a DirectMessageReceiver object. To enable your DirectMessageReceiver to start receiving messages, call start() on it.

  • DirectMessageReceiverBuilder
    • createDirectMessageReceiverBuilder()
    • withSubscriptions(TopicSubscription... subscriptions)
    • build()
  • DirectMessageReceiver
    • start()

For more information about the preceding methods, see the PubSub+ Messaging API for Java reference.

The following example shows how to add a topic subscription to a DirectMessageReceiver object and start the receiver:

/* Creates an instance of DirectMessageReceiverBuilder, which is used to create DirectMessageReceiver objects. */
final DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder()
    .withSubscriptions(TopicSubscription.of("Solace/Example/direct/>"))   // Adds an SMF topic subscription to the message receiver.
                                                                          // <-- Add more subscriptions here if you want.
    .build()                                                              // Creates a DirectMessageReceiver object based on the provided configuration.
    .start();                                                             // Starts the receiver. Before this method is called, the receiver is considered off-duty.

Asynchronous Receivers

It is also possible to start a direct message receiver using a callback listener to allow for asynchronous notifications after the start operation is complete.

The following example shows how to use a CompletionListener, which is a callback for listening for the result of a future computation. This handler notifies the application when the start operation is complete.

final CompletionListener<DirectMessageReceiver> receiverStartupListener = (receiver, throwable) -> {
    if (throwable == null) {
        // Started successfully, can receive messages.
    } else {
        // Deal with an exception during start.
    }
};
receiverToBeStarted.startAsync(receiverStartupListener);

Your receiver application is not operational until you call start() or startAsync() on it.
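
The callback shape used for an asynchronous start, a result plus a throwable where a null throwable means success, can be tried without an event broker. The following is a minimal sketch that uses only the JDK. StartListener, FakeReceiver, and AsyncStartDemo are hypothetical stand-ins written for illustration; they are not PubSub+ types and do not reflect how the API is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a completion callback: error is null on success.
interface StartListener<T> {
    void onCompletion(T result, Throwable error);
}

// Hypothetical receiver whose start either succeeds or fails (illustration only).
class FakeReceiver {
    private final boolean failOnStart;
    private volatile boolean running;

    FakeReceiver(boolean failOnStart) {
        this.failOnStart = failOnStart;
    }

    boolean isRunning() {
        return running;
    }

    // Starts on another thread and reports the outcome through the listener.
    CompletableFuture<Void> startAsync(StartListener<FakeReceiver> listener) {
        return CompletableFuture.runAsync(() -> {
            if (failOnStart) {
                listener.onCompletion(null, new IllegalStateException("start failed"));
            } else {
                running = true;
                listener.onCompletion(this, null);
            }
        });
    }
}

class AsyncStartDemo {
    // Returns "started" on success, or "failed: <message>" on failure.
    static String startAndDescribe(boolean failOnStart) {
        AtomicReference<String> outcome = new AtomicReference<>();
        FakeReceiver receiver = new FakeReceiver(failOnStart);
        receiver.startAsync((started, error) ->
                outcome.set(error == null ? "started" : "failed: " + error.getMessage()))
            .join();   // Waits only so that the demo can read the outcome.
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(startAndDescribe(false));
        System.out.println(startAndDescribe(true));
    }
}
```

In this sketch the caller is free to do other work between calling startAsync() and the callback firing; the join() call exists only to make the demo deterministic.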

Handling Back-Pressure When Subscribing to Direct Messages

When subscribing to direct messages, the API uses an internal buffer to store the messages that were received from the event broker. In ideal conditions, the application processes each message as soon as it is received. Depending on the processing speed and other factors, it's possible to receive messages faster than the subscribing application can process them (for example, during a high-volume burst of messages). If messages are not processed and are allowed to accumulate, the internal buffer can reach its capacity, which is referred to as back-pressure.

In our Java API, the DirectMessageReceiver has the following mechanisms to handle back-pressure:

Dropping the Latest Message

You can configure the API to drop the latest message that arrives when a specified capacity has been reached in the API's internal buffer. When this capacity is reached, newly arriving messages are not placed in the internal buffer because it is full; they are dropped (lost) instead.

To configure this mechanism, call the onBackPressureDropLatest(int bufferCapacity) method on the DirectMessageReceiverBuilder object and set the maximum number of messages that can accumulate (bufferCapacity) before messages are dropped.

The following example shows how to configure the application to drop messages if there are a thousand messages queued in the API's internal buffer:

/* Creates an instance of DirectMessageReceiverBuilder, which is used to create DirectMessageReceiver objects. */
final DirectMessageReceiver receiver = service.createDirectMessageReceiverBuilder()
    .fromProperties(receiverConfiguration)         // Uses authentication properties passed in through command line or hard-coded into program.
    .onBackPressureDropLatest(1000)                // Sets a back-pressure capacity of 1000 messages; once the buffer is full, newly arriving messages are dropped.
    .build()                                       // Builds a DirectMessageReceiver object based on the provided configuration.    
    .start();                                      // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.    
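
To make the drop-latest policy concrete, the following is a minimal sketch of a bounded buffer that rejects new arrivals once it is full. It uses only the JDK and models the policy described in this section; DropLatestBuffer and its methods are hypothetical names, and this is not the API's internal implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified model of a bounded buffer that drops new arrivals when it is full.
class DropLatestBuffer<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final int capacity;
    private long dropped;

    DropLatestBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Returns true if the message was buffered, false if it was dropped.
    synchronized boolean offer(T message) {
        if (queue.size() >= capacity) {
            dropped++;          // Buffer is full: the newest message is lost.
            return false;
        }
        queue.addLast(message);
        return true;
    }

    // Removes and returns the oldest buffered message, or null if empty.
    synchronized T poll() {
        return queue.pollFirst();
    }

    synchronized long droppedCount() {
        return dropped;
    }

    // Returns a copy of the buffered messages, oldest first.
    synchronized List<T> snapshot() {
        return new ArrayList<>(queue);
    }
}

class DropLatestDemo {
    public static void main(String[] args) {
        DropLatestBuffer<Integer> buffer = new DropLatestBuffer<>(3);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i);    // Messages 4 and 5 arrive while the buffer is full.
        }
        System.out.println(buffer.snapshot() + " dropped=" + buffer.droppedCount());
        // prints "[1, 2, 3] dropped=2"
    }
}
```

With this policy the consumer eventually sees the messages that arrived first and misses those that arrived during the burst.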

Dropping the Oldest Message

You can configure the API to drop the oldest message when a specified capacity is reached in the API's internal buffer. When this capacity is reached, the oldest messages are removed from the internal buffer to make room for more recent messages to be queued.

To configure this mechanism, call the onBackPressureDropOldest(int bufferCapacity) method on the DirectMessageReceiverBuilder object and set the maximum number of messages that can accumulate (bufferCapacity) before the oldest messages are removed from the internal buffer.

The following example shows how to configure the application to drop the oldest message in the API's internal buffer if there are a thousand messages queued:

/* Creates an instance of DirectMessageReceiverBuilder, which is used to create DirectMessageReceiver objects. */
final DirectMessageReceiver receiver = service.createDirectMessageReceiverBuilder()
        .fromProperties(receiverConfiguration)         // Uses authentication properties passed in through command line or hard-coded into program.
        .onBackPressureDropOldest(1000)                // Sets back-pressure capacity of 1000 messages, at which point the oldest messages in the buffer are dropped.
        .build()                                       // Builds a DirectMessageReceiver object based on the provided configuration.    
        .start();                                      // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.    
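
The drop-oldest policy can be modeled in the same way. The following is a minimal sketch of a bounded buffer that evicts its oldest entry to admit a new one. It uses only the JDK; DropOldestBuffer and its methods are hypothetical names, and this is not the API's internal implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified model of a bounded buffer that evicts the oldest entry when it is full.
class DropOldestBuffer<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final int capacity;
    private long dropped;

    DropOldestBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Always buffers the message; returns the evicted message, or null if nothing was evicted.
    synchronized T offer(T message) {
        T evicted = null;
        if (queue.size() >= capacity) {
            evicted = queue.pollFirst();   // Buffer is full: the oldest message is lost.
            dropped++;
        }
        queue.addLast(message);
        return evicted;
    }

    // Removes and returns the oldest buffered message, or null if empty.
    synchronized T poll() {
        return queue.pollFirst();
    }

    synchronized long droppedCount() {
        return dropped;
    }

    // Returns a copy of the buffered messages, oldest first.
    synchronized List<T> snapshot() {
        return new ArrayList<>(queue);
    }
}

class DropOldestDemo {
    public static void main(String[] args) {
        DropOldestBuffer<Integer> buffer = new DropOldestBuffer<>(3);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i);    // Messages 1 and 2 are evicted to admit 4 and 5.
        }
        System.out.println(buffer.snapshot() + " dropped=" + buffer.droppedCount());
        // prints "[3, 4, 5] dropped=2"
    }
}
```

This policy suits consumers that care most about the freshest data, because the buffer always holds the most recently received messages.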

Configuring an Unlimited Internal Buffer

The default configuration for this API is to use an internal buffer of unlimited size for messages. This configuration is not suitable for memory-constrained environments because the buffer is allowed to grow indefinitely, which can cause out-of-memory errors (or potentially undefined errors). This configuration is useful when your infrastructure is made up of several short-lived microservices. It is also convenient because you don't have to write code to handle back-pressure scenarios.

When you use an unlimited buffer, the Java API continuously puts messages it receives from the event broker onto its internal buffer. The following example shows an explicit call to the onBackPressureElastic() method, which is not required because it is the default behavior:

/* Creates an instance of DirectMessageReceiverBuilder, which is used to create DirectMessageReceiver objects. */
final DirectMessageReceiver receiver = service.createDirectMessageReceiverBuilder()
        .fromProperties(receiverConfiguration)  // Uses authentication properties passed in through command line or hard-coded into program.
        .onBackPressureElastic()                // Allows for an unlimited buffer capacity for the receiver object.
        .build()                                // Builds a DirectMessageReceiver object based on the provided configuration.    
        .start();                               // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.    

Receiving a Direct Message Synchronously

After you have established a connection to the event broker using a MessagingService object, you can use a DirectMessageReceiver object to subscribe to messages. The DirectMessageReceiver object must be subscribed to at least one topic for it to begin receiving messages.

The following example shows how an InboundMessage object is received by the DirectMessageReceiver object, and how the receiveMessage() method blocks the thread until the next message has been received:

final DirectMessageReceiver receiver = messagingService
    .createDirectMessageReceiverBuilder()
    .withSubscriptions(TopicSubscription.of("setSubscriptionExpressionHere"))
    .build()
    .start();

final InboundMessage message = receiver.receiveMessage();   // Blocks until the next message arrives, then returns it as an InboundMessage object.

For more information about the methods, see the PubSub+ Messaging API for Java reference.
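
The blocking behavior of a synchronous receive can be modeled with a JDK BlockingQueue. The following is a minimal sketch under that assumption; BlockingReceiveDemo, receiveBlocking, and receiveWithTimeout are hypothetical names used for illustration, and the timed variant is included only to show one way of bounding how long a thread waits.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingReceiveDemo {
    // Blocks the calling thread until a message is available, then returns it.
    static String receiveBlocking(BlockingQueue<String> inbound) {
        try {
            return inbound.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a message", e);
        }
    }

    // Waits up to timeoutMillis; returns null if no message arrived in time.
    static String receiveWithTimeout(BlockingQueue<String> inbound, long timeoutMillis) {
        try {
            return inbound.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a message", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> inbound = new LinkedBlockingQueue<>();

        // Simulates a message arriving from elsewhere after a short delay.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                inbound.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        System.out.println(receiveBlocking(inbound));          // Waits for the producer, then prints "hello".
        System.out.println(receiveWithTimeout(inbound, 50));   // Nothing else arrives, so prints "null".
        producer.join();
    }
}
```

Because the calling thread does nothing else while it waits, a blocking receive is usually run on a dedicated thread or in a loop that owns the consumer's lifecycle.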

Receiving a Direct Message Asynchronously

After you have established a connection to the event broker using a MessagingService object, you can consume direct messages and handle them asynchronously using a DirectMessageReceiver object. To handle direct messages asynchronously, use a MessageHandler object as a callback that notifies the application when a message has been received.

The following example shows how to receive messages asynchronously:

/* Create a receiver object and add subscription(s). */
final DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder()
    .withSubscriptions(TopicSubscription.of("setSubscriptionExpressionHere"))    // Allows an SMF topic to be added to the message receiver.
    .build()                                                   // Builds a DirectMessageReceiver object based on the provided configuration.    
    .start();                                                  // Causes the service to resume regular duties. Before this method is called, the service is considered off-duty.

final MessageHandler messageHandler = (message) -> {       // Interface for the listener of message handlers for inbound messages.
    byte[] bytes = message.getPayloadAsBytes();            // Do something with the message, for example, access the raw payload.
};
receiver.receiveAsync(messageHandler);                     // This method represents a push-based non-blocking interface. 
                                                           // Callback method of a messageHandler is executed on an internal API thread.
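
The push-based pattern, in which a registered handler is invoked on a thread other than the caller's, can be modeled with a single-threaded executor. The following is a minimal sketch that uses only the JDK; PushDispatcher, deliver, and PushDispatchDemo are hypothetical names, and the sketch does not reflect the API's actual threading internals.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified model of push delivery: callbacks run on one internal dispatch thread.
class PushDispatcher {
    private final ExecutorService dispatchThread = Executors.newSingleThreadExecutor();
    private volatile Consumer<String> handler = message -> { };

    // Registers the callback; returns immediately without blocking the caller.
    void receiveAsync(Consumer<String> messageHandler) {
        this.handler = messageHandler;
    }

    // Simulates a message arriving; the handler runs later on the dispatch thread.
    void deliver(String message) {
        dispatchThread.execute(() -> handler.accept(message));
    }

    // Stops accepting messages and waits for pending callbacks to finish.
    boolean shutdown(long timeoutMillis) {
        dispatchThread.shutdown();
        try {
            return dispatchThread.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class PushDispatchDemo {
    // Delivers the given messages and returns what the handler saw, in order.
    static List<String> run(List<String> messages) {
        List<String> seen = new CopyOnWriteArrayList<>();
        PushDispatcher dispatcher = new PushDispatcher();
        dispatcher.receiveAsync(seen::add);
        messages.forEach(dispatcher::deliver);
        dispatcher.shutdown(1000);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));   // prints "[a, b, c]"
    }
}
```

In this model a slow handler delays every message queued behind it, which is one reason to keep callback work short or hand it off to another thread.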

Extracting Properties from an Inbound Message

After you establish a connection to the event broker, you can subscribe to messages using the PubSub+ Java API. The API implicitly creates an InboundMessage object for each message delivered on a matching subscription. After your application receives an InboundMessage object, you can extract a number of properties from that message, such as the message payload (as bytes or a string), sender ID, and class of service. The following example shows how to extract properties from a message:

/* Using a messageHandler */
final MessageHandler messageHandler = (inboundMessage) -> {           
    byte[] bytes = inboundMessage.getPayloadAsBytes();      // Gets the raw payload of the message as a byte array.
    String senderID = inboundMessage.getSenderId();         // Returns the sender's ID.
    int serviceClass = inboundMessage.getClassOfService();  // Retrieves the Class of Service level of a given message.
};
receiver.receiveAsync(messageHandler);                  

/* Using the receiveMessage() method */
final InboundMessage inboundMessage = receiver.receiveMessage();
String msgPayload = inboundMessage.getPayloadAsString();  // Gets the payload decoded as a UTF-8 String.
String senderID = inboundMessage.getSenderId();           // Returns the sender's ID.
int serviceClass = inboundMessage.getClassOfService();    // Retrieves the Class of Service level of a given message.

For a complete list of methods that you can use to extract properties from an InboundMessage, see the PubSub+ Messaging API for Java reference.