Consuming Persistent Messages Using the Go API

Subscribing applications that cannot tolerate message loss can use persistent messaging (referred to as guaranteed messages in other parts of this documentation) instead of direct messaging. When persistent messaging is used, messages are stored on a queue on the event broker. Messages are not deleted from the event broker until the message has been consumed and acknowledged by the subscribing application (referred to as a message receiver). The PubSub+ Go API can only consume persistent messages from queues and not from topic endpoints.

To consume persistent messages, you must first set up a message queue on the event broker. For information about creating and configuring durable queues on an event broker, see Configuring Queues. Alternatively, a non-durable queue can be created when a persistent message receiver (PersistentMessageReceiver) is created. 

To use a persistent message receiver to consume persistent messages, use the following steps:

  1. Create a PersistentMessageReceiver.
  2. Receive a Persistent Message Synchronously.
  3. Receive a Persistent Message Asynchronously.
  4. Extract Properties from an Inbound Message.
  5. Message Acknowledgments.
  6. Create a Queue with the PubSub+ Go API.

Back-pressure can occur if your consumer (or subscribing) application experiences a situation where it is unable to process messages as fast as it receives them from the event broker. Messages continue to be buffered internally until a high watermark is reached at which point the API tells the event broker to stop sending messages to prevent message loss.

For examples of applications that consume persistent messages, see guaranteed_subscriber.go on the Solace GitHub page.

Creating a PersistentMessageReceiver

After you have established a connection to the event broker using a MessagingService instance, you use a PersistentMessageReceiver to consume persistent messages from a queue on the event broker.

You can use a PersistentMessageReceiverBuilder to configure your PersistentMessageReceiver to use certain features of the API, such as topic subscriptions. You then call Build() on the PersistentMessageReceiverBuilder, which returns a PersistentMessageReceiver instance. To enable your PersistentMessageReceiver to start receiving messages, call Start() on it.

Ensure that the queue properties you specify with the Go API correspond to the those configured on the event broker. For more information, see:

For more information about the functions used in the Go API, see the PubSub+ Messaging API for Go reference.

The following is an example how to use a PersistentMessageReceiver to bind to a queue:

nonDurableExclusiveQueue := resource.QueueNonDurableExclusive(queueName)     // Creates a reference to an exclusive, non-durable queue with the specified name
topic := resource.TopicSubscriptionOf("go/persistentReceiver/sample/topic")  // Creates a TopicSubscription of the specified topic string
			
/* Creates an instance of PersistentMessageReceiverBuilder, which is used to create PersistentMessageReceivers. */
persistentReceiver, builderError := messagingService. 
	CreatePersistentMessageReceiverBuilder().  // Creates a PersistentMessageReceiverBuilder that can be used to configure persistent message receiver instances.
	WithSubscriptions(topic).		   // Sets a list of TopicSubscriptions to subscribe to when starting the receiver.
	Build(nonDurableExclusiveQueue)            // Creates a new PersistentMessageReceiver instance based on the configured properties. Returns *IllegalArgumentError if the queue is nil.
if builderErr != nil {
	panic(builderErr)
}

startErr := persistentReceiver.Start()    // Start starts the receiver synchronously. Before this function is called, the receiver is considered off-duty.
if startErr != nil {
	panic(startErr)
}  

You can use the Go API to create durable and non-durable queues on the event broker. For more information see Creating Queues with the PubSub+ Go API.

Asynchronous Receivers

It is also possible to start a persistent message receiver using a callback listener to allow for asynchronous notifications after the start operation is complete.

The following example shows how to start a PersistentMessageReceiver asynchronously:

func ReceiverCallback(receiver solace.PersistentMessageReceiver, err error) {
	if err != nil {
		panic(err) // receiver started up with an error
	} else {
		// receiver started without issue
	}
}
// ...	
// ...
persistentReceiver.StartAsyncCallback(ReceiverCallback) // Starts the receiver asynchronously. Calls the callback when started with an error if one occurred, otherwise nil if successful.

Your receiver application is not operational until you call Start() or StartAsyncCallback() on it.

Consuming a Persistent Message Synchronously

You can consume persistent messages synchronously. To do this, you create a PersistentMessageReceiver and bind it to a queue. After successfully binding to the queue, your receiver application can begin to consume persistent messages.

When you use the ReceiveMessage(message.InboundMessage) function, it blocks the routine until the next message has been received.

When an application processes an InboundMessage, it can then send an acknowledgment to the event broker with PersistentMessageReceiver.Ack(). The event broker will then remove the InboundMessage from the queue. Until a message is acknowledged it remains on the broker queue and may be redelivered when the application reconnects to the queue.

For more information about the preceding functions, see the PubSub+ Messaging API for Go reference.

The following example shows you how to consume persistent messages synchronously:

persistentReceiver, err := messagingService.
	CreatePersistentMessageReceiverBuilder().  // Creates a PersistentMessageReceiverBuilder that can be used to configure persistent message receiver instances.
	WithSubscriptions(topicsSub...).       // Sets a list of TopicSubscriptions to subscribe to when starting the receiver.
	Build()                                // Creates a PersistentMessageReceiver with the specified properties.

if err != nil {
	panic(err)
}

if err := persistentReceiver.Start(); err != nil { // Starts the receiver synchronously. Before this function is called, the receiver is considered off-duty.
	panic(err)
}	
        
var receivedMessage message.InboundMessage
var regErr solace.Error
if receivedMessage, regErr := persistentReceiver.ReceiveMessage(1 * time.Second); regErr != nil {  // Waits until the specified timeout to receive a message
	panic(regErr)
} else {
	// Process the received message
	persistentReceiver.Ack(message) // Acknowledges that a message was received.
}

If you do not call the ReceiveMessage() function, messages can accumulate on the API's internal buffer and you risk running into a back-pressure scenario. If this occurs, the Go API automatically informs the event broker to stop sending messages.

Consuming a Persistent Message Asynchronously

You can consume persistent messages in an asynchronous manner. To do so, you create a PersistentMessageReceiver and start the connection to the event broker as normal, but you use a MessageHandler to act as a callback function to notify your application when a message has been received.

The following example shows you how to consume persistent messages asynchronously:

// Message Handler
messageHandler := func(message message.InboundMessage) {
	var messageBody string
	if payload, ok := message.GetPayloadAsString(); ok {
		messageBody = payload
		persistentReceiver.Ack(message)  // Acknowledges the received message
	} else if payload, ok := message.GetPayloadAsBytes(); ok {
		messageBody = string(payload)
		persistentReceiver.Ack(message)  // Acknowledges the received message
	}
	fmt.Printf("Received Message Body %s \n", messageBody)
}

// ...
// ...        
// Register Message callback handler to the Message Receiver
if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {
panic(regErr)
}

Pausing and Resuming Message Consumption from Internal Buffers

When your application consumes messages asynchronously using the ReceiveAsync() function, you may call the Pause() and Resume()functions to control the flow of messages to your application's callback.

The Pause() and Resume() functions have no effect if you use ReceiveMessage().

You can use the Pause() and Resume() functions to control the flow of messages between the API's internal buffer and your application. This internal buffer is where messages are received from the event broker. This flow control is useful if your application must momentarily stop processing messages to handle other operations. The Pause() and Resume() functions do not control the flow of messages between the event broker and the internal buffer of the API. When you call the Pause()function, messages continue to be sent from the event broker. The Pause() and Resume() functions control the message delivery only to the application. Messages received from the event broker continue to accumulate in the internal buffer.

Since the event broker continues to send messages, a back-pressure scenario may occur–that is, messages continue to accumulate until an internal high watermark is reached. At this point, the PersistentMessageReceiver notifies the event broker to stop sending messages until the number of accumulated messages falls below the internal low watermark. This internal API mechanism handles back-pressure scenarios for you and ensures that no messages are lost between the event broker and your application.

The following functions are used to pause and resume processing of messages from the API's internal buffer:

    • Pause()
    • Resume()

For more information about the preceding functions, see the PubSub+ Messaging API for Go reference.

The following example shows how to pause and resume processing of messages from the internal queue in the API using the scheduler:

persistentReceiver.Pause() //  Pauses the receiver's message delivery to asynchronous message handlers.
// Perform any action here, for example wait 60 seconds: time.Sleep(60 * time.Second)
persistentReceiver.Resume() // Resumes the receiver's message delivery to asynchronous message handlers.

Extracting Properties from an Inbound Message

After you establish a connection to the event broker, your receiver application can subscribe to topics. Whenever your application receives a message from the broker with a matching topic, an InboundMessage instance is returned to the application. You can extract a number of properties from an InboundMessage, such as the message payload (as bytes or a string), and sender ID. The following examples show how to extract properties from a message.

  • Use a MessageHandler callback when you receive a message asynchronously:

    func MessageHandler(message message.InboundMessage) {
    	var messagePayload string
    	var senderID string
    	if payload, ok := receivedMessage.GetPayloadAsString(); ok { // Extract the payload from a received message.
    		messagePayload = payload
    	}
    	if senderID, ok := receivedMessage.GetSenderID(); ok {       // Extract the sender ID from a received message.
    		senderID = senderID
    	}
    	persistentReceiver.Ack(message) // Acknowledges that a message was received.
    }				
  • Use the ReceiveMessage() function when you receive a message synchronously:

    var receivedMessage message.InboundMessage
    var regErr solace.Error
    if receivedMessage, regErr = persistentReceiver.ReceiveMessage(1 * time.Second); regErr != nil {
    	panic(regErr)
    } else {
    	var messagePayload string
    	var senderID string
    	if payload, ok := receivedMessage.GetPayloadAsString(); ok {   // Extract the payload from a received message.
    		messagePayload = payload
    	}
    	if senderID, ok := receivedMessage.GetSenderID(); ok {         // Extract the sender ID from a received message.
    		senderID = senderID
    	}
    	persistentReceiver.Ack(message) // Acknowledges that a message was received.
    }		

For a complete list of functions that you can use to extract properties from an InboundMessage, see the PubSub+ Messaging API for Go reference.

Message Acknowledgments

ACKs are asynchronous. Any routine can acknowledge messages at any time as long as the receiver has not been terminated. It is important to remember that after a PersistentMessageReceiver has acknowledged a message from the event broker, it deletes that message from the queue on the event broker. For this reason it's important to perform any processing and storage of the message before you acknowledge it.

The following example shows how to acknowledge a persistent message:

// Message Handler
messageHandler := func(message message.InboundMessage) {
	var messageBody string
	if payload, ok := message.GetPayloadAsString(); ok {
		messageBody = payload
		persistentReceiver.Ack(message)  // Acknowledges the received message
	} else if payload, ok := message.GetPayloadAsBytes(); ok {
		messageBody = string(payload)
		persistentReceiver.Ack(message)  // Acknowledges the received message
	}
	fmt.Printf("Received Message Body %s \n", messageBody)
}
// ...
// ...       
if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {  // Register Message callback handler to the Message Receiver
panic(regErr)
}