Consuming Persistent Messages Using the Go API

Subscribing applications that cannot tolerate message loss can use persistent messaging (referred to as guaranteed messages in other parts of this documentation) instead of direct messaging. When persistent messaging is used, messages are stored on a queue on the event broker. Messages are not deleted from the event broker until the message has been consumed and acknowledged by the subscribing application (referred to as a message receiver). The PubSub+ Go API can only consume persistent messages from queues and not from topic endpoints.

To consume persistent messages, you must first set up a message queue on the event broker. For information about creating and configuring durable queues on an event broker, see Configuring Queues. Alternatively, a non-durable queue can be created when a persistent message receiver (PersistentMessageReceiver) is created. 

To use a persistent message receiver to consume persistent messages, use the following steps:

  1. Create a PersistentMessageReceiver.
  2. Receive a Persistent Message Synchronously.
  3. Receive a Persistent Message Asynchronously.
  4. Extract Properties from an Inbound Message.
  5. Message Acknowledgments.
  6. Negative Acknowledgments for Specific Messages.
  7. Create a Queue with the PubSub+ Go API.

Back-pressure can occur if your consumer (or subscribing) application experiences a situation where it is unable to process messages as fast as it receives them from the event broker. Messages continue to be buffered internally until a high watermark is reached at which point the API tells the event broker to stop sending messages to prevent message loss.

For examples of applications that consume persistent messages, see guaranteed_subscriber.go on the Solace GitHub page.

Creating a PersistentMessageReceiver

After you have established a connection to the event broker using a MessagingService instance, you use a PersistentMessageReceiver to consume persistent messages from a queue on the event broker.

You can use a PersistentMessageReceiverBuilder to configure your PersistentMessageReceiver to use certain features of the API, such as topic subscriptions. You then call Build() on the PersistentMessageReceiverBuilder, which returns a PersistentMessageReceiver instance. To enable your PersistentMessageReceiver to start receiving messages, call Start() on it.

Ensure that the queue properties you specify with the Go API correspond to the those configured on the event broker. For more information, see:

For more information about the functions used in the Go API, see the PubSub+ Messaging API for Go reference.

The following is an example how to use a PersistentMessageReceiver to bind to a queue:

nonDurableExclusiveQueue := resource.QueueNonDurableExclusive(queueName)     // Creates a reference to an exclusive, non-durable queue with the specified name
topic := resource.TopicSubscriptionOf("go/persistentReceiver/sample/topic")  // Creates a TopicSubscription of the specified topic string
			
/* Creates an instance of PersistentMessageReceiverBuilder, which is used to create PersistentMessageReceivers. */
persistentReceiver, builderError := messagingService. 
	CreatePersistentMessageReceiverBuilder().  // Creates a PersistentMessageReceiverBuilder that can be used to configure persistent message receiver instances.
	WithSubscriptions(topic).		   // Sets a list of TopicSubscriptions to subscribe to when starting the receiver.
	Build(nonDurableExclusiveQueue)            // Creates a new PersistentMessageReceiver instance based on the configured properties. Returns *IllegalArgumentError if the queue is nil.
if builderErr != nil {
	panic(builderErr)
}

startErr := persistentReceiver.Start()    // Start starts the receiver synchronously. Before this function is called, the receiver is considered off-duty.
if startErr != nil {
	panic(startErr)
}  

You can use the Go API to create durable and non-durable queues on the event broker. For more information see Provisioning and Deprovisioning Queues with the PubSub+ Go API.

Asynchronous Receivers

It is also possible to start a persistent message receiver using a callback listener to allow for asynchronous notifications after the start operation is complete.

The following example shows how to start a PersistentMessageReceiver asynchronously:

func ReceiverCallback(receiver solace.PersistentMessageReceiver, err error) {
	if err != nil {
		panic(err) // receiver started up with an error
	} else {
		// receiver started without issue
	}
}
// ...	
// ...
persistentReceiver.StartAsyncCallback(ReceiverCallback) // Starts the receiver asynchronously. Calls the callback when started with an error if one occurred, otherwise nil if successful.

Your receiver application is not operational until you call Start() or StartAsyncCallback() on it.

Consuming a Persistent Message Synchronously

You can consume persistent messages synchronously. To do this, you create a PersistentMessageReceiver and bind it to a queue. After successfully binding to the queue, your receiver application can begin to consume persistent messages.

When you use the ReceiveMessage(message.InboundMessage) function, it blocks the routine until the next message has been received.

When an application processes an InboundMessage, it can then send an acknowledgment to the event broker with PersistentMessageReceiver.Ack(). The event broker will then remove the InboundMessage from the queue. Until a message is acknowledged it remains on the broker queue and may be redelivered when the application reconnects to the queue.

For more information about the preceding functions, see the PubSub+ Messaging API for Go reference.

The following example shows you how to consume persistent messages synchronously:

persistentReceiver, err := messagingService.
	CreatePersistentMessageReceiverBuilder().  // Creates a PersistentMessageReceiverBuilder that can be used to configure persistent message receiver instances.
	WithSubscriptions(topicsSub...).       // Sets a list of TopicSubscriptions to subscribe to when starting the receiver.
	Build()                                // Creates a PersistentMessageReceiver with the specified properties.

if err != nil {
	panic(err)
}

if err := persistentReceiver.Start(); err != nil { // Starts the receiver synchronously. Before this function is called, the receiver is considered off-duty.
	panic(err)
}	
        
var receivedMessage message.InboundMessage
var regErr solace.Error
if receivedMessage, regErr := persistentReceiver.ReceiveMessage(1 * time.Second); regErr != nil {  // Waits until the specified timeout to receive a message
	panic(regErr)
} else {
	// Process the received message
	persistentReceiver.Ack(message) // Acknowledges that a message was received.
}

If you do not call the ReceiveMessage() function, messages can accumulate on the API's internal buffer and you risk running into a back-pressure scenario. If this occurs, the Go API automatically informs the event broker to stop sending messages.

Consuming a Persistent Message Asynchronously

You can consume persistent messages in an asynchronous manner. To do so, you create a PersistentMessageReceiver and start the connection to the event broker as normal, but you use a MessageHandler to act as a callback function to notify your application when a message has been received.

The following example shows you how to consume persistent messages asynchronously:

// Message Handler
messageHandler := func(message message.InboundMessage) {
	var messageBody string
	if payload, ok := message.GetPayloadAsString(); ok {
		messageBody = payload
		persistentReceiver.Ack(message)  // Acknowledges the received message
	} else if payload, ok := message.GetPayloadAsBytes(); ok {
		messageBody = string(payload)
		persistentReceiver.Ack(message)  // Acknowledges the received message
	}
	fmt.Printf("Received Message Body %s \n", messageBody)
}

// ...
// ...        
// Register Message callback handler to the Message Receiver
if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {
panic(regErr)
}

Pausing and Resuming Message Consumption from Internal Buffers

When your application consumes messages asynchronously using the ReceiveAsync() function, you may call the Pause() and Resume()functions to control the flow of messages to your application's callback.

The Pause() and Resume() functions have no effect if you use ReceiveMessage().

You can use the Pause() and Resume() functions to control the flow of messages between the API's internal buffer and your application. This internal buffer is where messages are received from the event broker. This flow control is useful if your application must momentarily stop processing messages to handle other operations. The Pause() and Resume() functions do not control the flow of messages between the event broker and the internal buffer of the API. When you call the Pause()function, messages continue to be sent from the event broker. The Pause() and Resume() functions control the message delivery only to the application. Messages received from the event broker continue to accumulate in the internal buffer.

Since the event broker continues to send messages, a back-pressure scenario may occur–that is, messages continue to accumulate until an internal high watermark is reached. At this point, the PersistentMessageReceiver notifies the event broker to stop sending messages until the number of accumulated messages falls below the internal low watermark. This internal API mechanism handles back-pressure scenarios for you and ensures that no messages are lost between the event broker and your application.

The following functions are used to pause and resume processing of messages from the API's internal buffer:

    • Pause()
    • Resume()

For more information about the preceding functions, see the PubSub+ Messaging API for Go reference.

The following example shows how to pause and resume processing of messages from the internal queue in the API using the scheduler:

persistentReceiver.Pause() //  Pauses the receiver's message delivery to asynchronous message handlers.
// Perform any action here, for example wait 60 seconds: time.Sleep(60 * time.Second)
persistentReceiver.Resume() // Resumes the receiver's message delivery to asynchronous message handlers.

Extracting Properties from an Inbound Message

After you establish a connection to the event broker, your receiver application can subscribe to topics. Whenever your application receives a message from the broker with a matching topic, an InboundMessage instance is returned to the application. You can extract a number of properties from an InboundMessage, such as the message payload (as bytes or a string), and sender ID. The following examples show how to extract properties from a message.

  • Use a MessageHandler callback when you receive a message asynchronously:

    func MessageHandler(message message.InboundMessage) {
    	var messagePayload string
    	var senderID string
    	if payload, ok := receivedMessage.GetPayloadAsString(); ok { // Extract the payload from a received message.
    		messagePayload = payload
    	}
    	if senderID, ok := receivedMessage.GetSenderID(); ok {       // Extract the sender ID from a received message.
    		senderID = senderID
    	}
    	persistentReceiver.Ack(message) // Acknowledges that a message was received.
    }				
  • Use the ReceiveMessage() function when you receive a message synchronously:

    var receivedMessage message.InboundMessage
    var regErr solace.Error
    if receivedMessage, regErr = persistentReceiver.ReceiveMessage(1 * time.Second); regErr != nil {
    	panic(regErr)
    } else {
    	var messagePayload string
    	var senderID string
    	if payload, ok := receivedMessage.GetPayloadAsString(); ok {   // Extract the payload from a received message.
    		messagePayload = payload
    	}
    	if senderID, ok := receivedMessage.GetSenderID(); ok {         // Extract the sender ID from a received message.
    		senderID = senderID
    	}
    	persistentReceiver.Ack(message) // Acknowledges that a message was received.
    }		

For a complete list of functions that you can use to extract properties from an InboundMessage, see the PubSub+ Messaging API for Go reference.

Message Acknowledgments

ACKs are asynchronous. Any routine can acknowledge messages at any time as long as the receiver has not been terminated. It is important to remember that after a PersistentMessageReceiver has acknowledged a message from the event broker, it deletes that message from the queue on the event broker. For this reason it's important to perform any processing and storage of the message before you acknowledge it.

The following example shows how to acknowledge a persistent message:

// Message Handler
messageHandler := func(message message.InboundMessage) {
	var messageBody string
	if payload, ok := message.GetPayloadAsString(); ok {
		messageBody = payload
		persistentReceiver.Ack(message)  // Acknowledges the received message
	} else if payload, ok := message.GetPayloadAsBytes(); ok {
		messageBody = string(payload)
		persistentReceiver.Ack(message)  // Acknowledges the received message
	}
	fmt.Printf("Received Message Body %s \n", messageBody)
}
// ...
// ...       
if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {  // Register Message callback handler to the Message Receiver
panic(regErr)
}		

Negative Acknowledgments for Specific Messages

You can use negative acknowledgments (NACKs) if your PersistentMessageReceiver is not configured to automatically ACK received messages. When you use NACKs, you can send a settlement outcome to let the event broker know the result from processing a guaranteed message that was received. Based on the settlement outcome, the event broker knows how to handle the message on its queue. You can use the following settlement outcomes:

  • ACCEPTED—This ACK notifies the event broker that your client application successfully processed the guaranteed message. When the event broker receives this outcome it removes the message from its queue.
    • When you call the Settle() function with an outcome of ACCEPTED, it is the same as using persistentReceiver.Ack(receivedMessage).
  • FAILED—This NACK notifies the event broker that your client application did not process the message. When the event broker receives this NACK it attempts to redeliver the message while adhering to delivery count limits. The message remains on the broker during redelivery while the broker waits for it to be acknowledged with a status other than FAILED.
  • REJECTED—This NACK notifies the event broker that your client application could not process the message. When the event broker receives this NACK it removes the message from its queue and then moves the message to the Dead Message Queue (DMQ) if it is configured.

Before you can use NACKs, you must use the WithRequiredMessageOutcomeSupport() function to add the config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome, or both outcomes as NACK types when you create your PersistentMessageReceiver to prepare it to work with negative acknowledgments. You do not need to add the config.PersistentReceiverAcceptedOutcome outcome because it is always available. If you try to call Settle() on an outcome that has not been added, you get an error of Required Settlement Outcome Not Supported. The following code shows how to configure a PersistentMessageReceiver to use NACKs:

messagingService.
       CreatePersistentMessageReceiverBuilder().
       WithMessageClientAcknowledgement().
       WithRequiredMessageOutcomeSupport(
               config.PersistentReceiverFailedOutcome,
               config.PersistentReceiverRejectedOutcome,
       ).
       Build(durableExclusiveQueue)

Alternatively, you can use a property map instead of a setter:

messagingService.
    CreatePersistentMessageReceiverBuilder().
    WithMessageClientAcknowledgement().
    FromConfigurationProvider(config.ReceiverPropertyMap{
        config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf(
            "%s,%s",
            config.PersistentReceiverFailedOutcome,
            config.PersistentReceiverRejectedOutcome,
        ),
    }).
    Build(durableExclusiveQueue)
  • NACKs can be lost during transit (for example due to unexpected networking issues). Consider this fact as part of the logic for handling messages when you develop your application.
  • NACKs are supported on event brokers 10.2.1 and later. If an event broker does not support NACKs, an InvalidOperationException occurs when you call start() on a PersistentMessageReceiver instance configured to use message outcomes.

The following examples shows how to settle a message, with placeholder, user-defined functions to determine if the message outcome should be ACCEPTED, REJECTED, or FAILED. The decision regarding which NACK outcome to use when you receive a message should be made based on your application's requirements.

  • Receive a message synchronously:
    message, err := persistentReceiver.ReceiveMessage()
    if err != nil {
        return // Exit on error
    }
    
    var messageBody string
    
    // Decode the payload as a string or bytes
    if payload, ok := message.GetPayloadAsString(); ok {
        messageBody = payload
    } else if payload, ok := message.GetPayloadAsBytes(); ok {
        messageBody = string(payload)
    }
    fmt.Printf("Received Message Body: %s\n", messageBody)
    
    
    // Determine the appropriate settlement outcome
    if isMsgOK(message) {
        // Message is good, process it here and then ACK it
        persistentReceiver.Settle(message, config.PersistentReceiverAcceptedOutcome)
        persistentReceiver.Ack(message)
    } else if isMsgPossiblySalvageableLater(message) {
        // Application cannot currently process the message, redeliver later
        persistentReceiver.Settle(message, config.PersistentReceiverFailedOutcome)
    } else {
        // Problem with the message content, remove from the endpoint
        persistentReceiver.Settle(message, config.PersistentReceiverRejectedOutcome)
    }
    
  • Receive a message asynchronously:
    messageHandler := func(message message.InboundMessage) {
        var messageBody string
    
        if payload, ok := message.GetPayloadAsString(); ok {
            messageBody = payload
        } else if payload, ok := message.GetPayloadAsBytes(); ok {
            messageBody = string(payload)
        }
        fmt.Printf("Received Message Body: %s\n", messageBody)
    
        // Determine the appropriate settlement outcome
        var messageSettlementError error
        if isMsgOK(message) {
            // Message is good, process it here and then ACK it
            messageSettlementError = persistentReceiver.Settle(message, config.PersistentReceiverAcceptedOutcome)
            persistentReceiver.Ack(message)
        } else if isMsgPossiblySalvageableLater(message) {
            // Application cannot currently process the message, redeliver later
            messageSettlementError = persistentReceiver.Settle(message, config.PersistentReceiverFailedOutcome)
        } else {
            // Problem with the message content, remove from the endpoint
            messageSettlementError = persistentReceiver.Settle(message, config.PersistentReceiverRejectedOutcome)
        }
    }
    
    if regErr := persistentReceiver.ReceiveAsync(messageHandler); regErr != nil {
        panic(regErr)
    }