Consuming Direct Messages Using the Go API

Direct messaging is useful in scenarios where high-throughput and low-latency is required. It is possible with the use of direct messages that some message loss may occur due to external factors, such as network congestion or occasional client disconnections. Direct messages are suitable for applications that need the latest information but not necessarily every single message. Examples of these applications may be weather applications, price checkers, GPS tracking, and so on.

No additional event broker configuration is required for direct messaging. If your application cannot tolerate message loss, we recommend that you use persistent messaging.

To consume direct messages using the PubSub+ Go API, use the following steps:

  1. Create a DirectMessageReceiver.
  2. Handling Back-Pressure When Subscribing to Direct Messages.
  3. Receive a Direct Message Synchronously.
  4. Receive a Direct Message Asynchronously.
  5. Extract Properties from an Inbound Message.

In some use cases, the API receives messages from the event broker faster than your application can process them. Messages can fill in the API's internal buffers causing back-pressure. If this scenario is possible, you may want to consider changing the default back-pressure settings to meet your requirements. For more information, see Handling Back-Pressure When Subscribing to Direct Messages.

For examples of applications that consume direct messages, see direct_subscriber.go on the Solace GitHub page.

Creating a DirectMessageReceiver

After a MessagingService instance has established a connection to an event broker, use a DirectMessageReceiver to consume direct messages from the event broker.

You can use a DirectMessageReceiverBuilder to configure your DirectMessageReceiver to use certain features of the API, such as topic subscriptions and back-pressure strategies. Calling the Build() function on your DirectMessageReceiverBuilder returns a DirectMessageReceiver instance. To enable your DirectMessageReceiver to start receiving messages, call Start() on it.

For more information, see the PubSub+ Messaging API for Go reference.

The following is an example that shows how to add subscriptions to topics to a DirectMessageReceiver and connect to the event broker:

directReceiver, err := messagingService.
	CreateDirectMessageReceiverBuilder().  // Creates a DirectMessageReceiverBuilder that can be used to configure direct message receiver instances.
	WithSubscriptions(topicsSub...).       // Sets a list of TopicSubscriptions to subscribe to when starting the receiver.
	Build()                                // Creates a DirectMessageReceiver with the specified properties.

if err != nil {
	panic(err)
}

if err := directReceiver.Start(); err != nil { // Starts the receiver synchronously. Before this function is called, the receiver is considered off-duty.
	panic(err)
}		

Asynchronous Receivers

Alternatively, it's possible to start a direct message receiver using a callback listener to allow for asynchronous notifications when the start operation is complete.

The following example shows how to start the DirectMessageReceiver asynchronously:

func ReceiverCallback(receiver solace.DirectMessageReceiver, err error) {
	if err != nil {
		panic(err) // receiver started up with an error
	} else {
		// receiver started without issue
	}
}
// ...	
// ...
directReceiver.StartAsyncCallback(ReceiverCallback) // Starts the receiver asynchronously. Calls the callback when started with an error if one occurred, otherwise nil if successful.

Your receiver application is not operational until you call Start() or StartAsyncCallback() on it.

Handling Back-Pressure When Subscribing to Direct Messages

When subscribing to direct messages, the API uses an internal buffer to store the messages that were received from the event broker. In ideal conditions, as soon as a message is received, the application processes the message. Depending on the processing speed and other factors, it's possible to receive messages faster than the subscribing application can process them (for example a high-volume burst of messages). If the messages are not processed and allowed to accumulate, the internal buffer can reach its capacity, which is referred to as back-pressure.

In the PubSub+ Go API, the DirectMessageReceiver has the following mechanisms to handle back-pressure:

Dropping the Latest Message

The default behavior of the Go API is to drop the latest message when 50 messages accumulate in the API's internal buffer. When this capacity is reached, the most recent messages are not placed on the internal buffer because it is full and are instead dropped (lost).

To configure a different buffer size, call the OnBackPressureDropLatest(bufferSize uint) function on the DirectMessageReceiverBuilder and then set the maximum number of messages that can accumulate ( bufferSize) before messages are dropped.

The following example shows how to configure the application to drop messages if there are a thousand messages queued in the API's internal buffer:

/* Creates an instance of DirectMessageReceiverBuilder, which is used to create DirectMessageReceivers. */
directReceiver, builderError := messagingService. 
	CreateDirectMessageReceiverBuilder(). // Creates a DirectMessageReceiverBuilder that can be used to configure direct message receiver instances.
	OnBackPressureDropLatest(1000).        // If the buffer is full and a message arrives, the incoming message is discarded.
	Build()                               // Creates a new DirectMessageReceiver instance based on the configured properties.
if builderErr != nil {
	panic(builderErr)
}

startErr := directReceiver.Start()    // Start starts the receiver synchronously. Before this function is called, the receiver is considered off-duty.
if startErr != nil {
	panic(startErr)
} 

Dropping the Oldest Message

You can configure the API to drop the oldest message when a specified capacity is reached in the API's internal buffer. When this capacity is reached, the oldest item is removed from the internal queue to make room to receive the newer message. When the specified capacity is reached, the oldest items received by the receiver are removed to allow for more recent messages to be queued.

To configure this mechanism, call the OnBackPressureDropOldest(bufferSize uint) function on the DirectMessageReceiverBuilder and then set the maximum number of messages that can accumulate ( bufferSize) before the oldest messages are removed from the internal buffer.

The following example shows how to configure the application to drop the oldest message in the API's internal buffer if there are a thousand messages queued:

/* Creates an instance of DirectMessageReceiverBuilder, which is used to create DirectMessageReceivers. */
directReceiver, builderError := messagingService. 
	CreateDirectMessageReceiverBuilder(). // Creates a DirectMessageReceiverBuilder that can be used to configure direct message receiver instances.
	OnBackPressureDropOldest(1000).        // If the buffer is full and a message arrives, the oldest message in the buffer is discarded.
	Build()                               // Creates a new DirectMessageReceiver instance based on the configured properties.
if builderErr != nil {
	panic(builderErr)
}

startErr := directReceiver.Start()    // Start starts the receiver synchronously. Before this function is called, the receiver is considered off-duty.
if startErr != nil {
	panic(startErr)
}  

Receiving a Direct Message Synchronously

After you have established a connection to the event broker using a MessagingService instance, you can use a DirectMessageReceiver to subscribe to messages. The DirectMessageReceiver must be subscribed to at least one topic for it to begin receiving messages.

The following example shows how an InboundMessage is received by the DirectMessageReceiver, and how the ReceiveMessage() function blocks the routine until the next message has been received:

// ReceiveMessage waits until the specified timeout to receive a message, or will wait forever if the timeout specified is a negative value
var receivedMessage message.InboundMessage
var regErr solace.Error
if receivedMessage, regErr = directReceiver.ReceiveMessage(1 * time.Second); regErr != nil {
	panic(regErr)
} else {
	// Process the received message
}

Receiving a Direct Message Asynchronously

After you have established a connection to the event broker using a MessagingService instance, you can consume direct messages and handle them asynchronously using a DirectMessageReceiver. To handle direct messages asynchronously, you use a MessageHandler to act as a callback function to let the application know when a message has been received.

The following example shows how to receive messages asynchronously:

// Register Message callback handler to the Message Receiver
if regErr := directReceiver.ReceiveAsync(MessageHandler); regErr != nil {
panic(regErr)
}
			
// Message Handler
func MessageHandler(message message.InboundMessage) {
	// Process the received message
}

Extracting Properties from an Inbound Message

After you establish a connection to the event broker, your receiver application can subscribe to topics. Whenever your application receives a message from the broker with a matching topic, an InboundMessage instance is returned to the application. You can extract a number of properties from an InboundMessage, such as the message payload (as bytes or a string), and sender ID. The following examples show how to extract properties from a message.

  • Use a MessageHandler callback when you receive a message asynchronously:

    func MessageHandler(message message.InboundMessage) {
    	var messagePayload string
    	var senderID string
    	if payload, ok := receivedMessage.GetPayloadAsString(); ok { // Extract the payload from a received message.
    		messagePayload = payload
    	}
    	if senderID, ok := receivedMessage.GetSenderID(); ok {       // Extract the sender ID from a received message.
    		senderID = senderID
    	}
    }				
  • Use the ReceiveMessage() function when you receive a message synchronously:

    var receivedMessage message.InboundMessage
    var regErr solace.Error
    if receivedMessage, regErr = directReceiver.ReceiveMessage(1 * time.Second); regErr != nil {
    	panic(regErr)
    } else {
    	var messagePayload string
    	var senderID string
    	if payload, ok := receivedMessage.GetPayloadAsString(); ok {   // Extract the payload from a received message.
    		messagePayload = payload
    	}
    	if senderID, ok := receivedMessage.GetSenderID(); ok {         // Extract the sender ID from a received message.
    		senderID = senderID
    	}
    }		

For a complete list of functions that you can use to extract properties from an InboundMessage, see the PubSub+ Messaging API for Go reference.