Publishing Persistent Messages Using the Go API

When your applications require confirmation handling and exactly once delivery, we recommend that you use persistent messages instead of direct messages. To publish persistent messages in the PubSub+ Go API, you first set up a message queue on the PubSub+ event broker.

For information about creating and configuring queues on an event broker, see Configuring Queues.

To handle persistent messages using the PubSub+ Go API, use the following steps:

  1. Create a PersistentMessagePublisher.
  2. Configure and create an OutboundMessage.
  3. Handle back-pressure when publishing persistent messages.
  4. Publish a persistent message.
  5. Acknowledge messages and handle errors.
  6. Optionally, use user contexts.

In some use cases, it's possible for your application to send messages faster than the messages can be transported. This may cause messages to accumulate in the API internal buffers causing back-pressure. If this scenario is possible, consider changing the back-pressure settings to meet the requirements of your application. For more information, see Handling Back-Pressure When Publishing Persistent Messages.

For examples of applications that publish persistent messages, see guaranteed_publisher.go on the Solace GitHub page.

Creating a PersistentMessagePublisher

After a MessagingService instance has established a connection to an event broker, you use a PersistentMessagePublisher to publish persistent messages. You can use a PersistentMessagePublisherBuilder to configure your PersistentMessagePublisher to use certain features of the API, such as back-pressure strategies. Call the Build() function on your PersistentMessagePublisherBuilder to return a PersistentMessagePublisher instance. To enable your PersistentMessagePublisher to start publishing messages, call Start() on it.

The following example shows how to build and start a PersistentMessagePublisher from a connected MessagingService instance:

/* Creates an instance of PersistentMessagePublisherBuilder, which is used to create PersistentMessagePublishers. */
persistentPublisher, builderErr := messagingService.
       CreatePersistentMessagePublisherBuilder(). // Creates a PersistentMessagePublisherBuilder that can be used to configure persistent message publisher instances.
       Build()                                    // Creates a new PersistentMessagePublisher instance based on the configured properties.
if builderErr != nil {
       panic(builderErr)
}

startErr := persistentPublisher.Start()    // Start starts the publisher synchronously. Before this function is called, the publisher is considered off-duty.
if startErr != nil {
       panic(startErr)
}  

Asynchronous Publishers

It is also possible to start a persistent message publisher using a callback listener to allow for asynchronous notifications after the start operation is complete.

The following example shows how to start a PersistentMessagePublisher asynchronously:

func PublisherCallback(publisher solace.PersistentMessagePublisher, err error) {
       if err != nil {
               panic(err) // Publisher started up with an error.
       } else {
               // Publisher started without issue.
       }
}
// ...
// ...
persistentPublisher.StartAsyncCallback(PublisherCallback) // Starts the publisher asynchronously. Calls the callback when started with an error if one occurred, otherwise nil if successful.

Your publisher application is not operational until you call Start() or StartAsyncCallback() on it.
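
If the calling routine needs to wait for the asynchronous start to finish, one option is to bridge the callback to a channel. The following is a minimal sketch that uses only the Go standard library. The startAsync function is a hypothetical stand-in for an asynchronous start operation such as StartAsyncCallback(), and awaitStart and errStartTimeout are names invented for this illustration; none of them are part of the PubSub+ Go API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errStartTimeout is a hypothetical error used when the start does not complete in time.
var errStartTimeout = errors.New("timed out waiting for start to complete")

// startAsync is a hypothetical stand-in for an asynchronous start operation:
// it runs start in a new goroutine and reports the outcome through callback.
func startAsync(start func() error, callback func(err error)) {
	go func() {
		callback(start())
	}()
}

// awaitStart bridges the callback to a channel and waits for the result,
// giving up after timeout.
func awaitStart(start func() error, timeout time.Duration) error {
	done := make(chan error, 1) // Buffered so the callback never blocks.
	startAsync(start, func(err error) { done <- err })

	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return errStartTimeout
	}
}

func main() {
	err := awaitStart(func() error { return nil }, time.Second)
	fmt.Println("start error:", err)
}
```

The buffered channel means the callback can complete even if the waiting routine has already given up because of the timeout.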

Configuring and Creating Outbound Messages

Your client applications explicitly create the outbound messages to publish. In the PubSub+ Go API, when you publish messages you use OutboundMessage instances. To configure and create OutboundMessage instances, follow these steps:

  1. Call messagingService.MessageBuilder() to return an OutboundMessageBuilder instance. For better performance, we recommend you use a single OutboundMessageBuilder to create multiple OutboundMessage instances.

    messageBuilder := messagingService.MessageBuilder()
  2. Configure your message with an OutboundMessageBuilder and then call the Build() function to return a message instance. You can configure message properties using either method below.
    • Use config.MessagePropertyMap and the config.MessageProperty* functions. The following example shows how to set MessagePropertyMap values for message ID, sender ID and message type constants, and then configure a message using the FromConfigurationProvider() function:
      messageProperties := config.MessagePropertyMap{
          config.MessagePropertyApplicationMessageID: "message ID",
          config.MessagePropertySenderID:             "Sender ID",
          config.MessagePropertyApplicationMessageType: "Message Type",
          // For a complete list of MessageProperty constants see the  Messaging API for Go reference.
      }   
      // ...                        
      // ...                        
      message, err := messageBuilder.FromConfigurationProvider(messageProperties).
          BuildWithStringPayload("my message payload")
      if err != nil {
          panic(err)
      }
      
    • Use the OutboundMessageBuilder interface and the WithProperty(propertyName,propertyValue) functions. Both Solace defined config.MessageProperty keys as well as arbitrary user-defined property keys are accepted. The following example shows how to set a custom key-value property on a message:
      messageBuilder := messagingService.MessageBuilder().
          WithProperty("propertyName", "propertyValue")
      // ...                            
      // ...                            
      message, err := messageBuilder.BuildWithStringPayload("my message payload")
      if err != nil {
          panic(err)
      }
      

The following code example shows how to create a message builder, set message properties and create a message:

/* Builder for creation of similarly configured messages */
messageBuilder := messagingService.MessageBuilder()
message, err := messageBuilder.
       FromConfigurationProvider(messageProperties).     // For Sender ID, Sequence Number etc.  
       WithExpiration(time.Now().Add(10 * time.Second)). // Expire the message in 10 seconds.
       BuildWithStringPayload("my message payload")      // Builds the message.

For more information about the functions, see the PubSub+ Messaging API for Go reference.

Setting a Partition Key

You can set a partition key to use partitioned queues. Partitioned Queues is a feature configured on the PubSub+ Event Broker that allows you to easily scale the number of consumer applications bound to a queue. A partition key can be set on each message in the publishing application to ensure that all messages with the same partition key are delivered to the same consumer without additional logic in the consumer application. For more information see Partitioned Queues.
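
To illustrate why messages that share a partition key reach the same consumer, the following standard-library sketch maps keys to partitions with a hash. It shows only the general idea of key-based partitioning. The partitionFor function, the FNV-1a hash, and the partition count are assumptions made for this example and do not describe how the event broker assigns partitions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a partition key to one of n partitions using FNV-1a.
// This is an illustrative assumption, not the event broker's algorithm.
// n must be greater than zero.
func partitionFor(key string, n uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error.
	return h.Sum32() % n
}

func main() {
	// The first and last keys are identical, so they map to the same partition.
	for _, key := range []string{"order-17", "order-42", "order-17"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionFor(key, 4))
	}
}
```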

Use the WithProperty(property,value) function to set a property-value pair on a Go API message.

  • property—The constant config.QueuePartitionKey or the string value JMSXGroupID.

  • value—A string representing the value of your partition key. Client applications set the value at publish time.

The following example shows how to set the value of the constant config.QueuePartitionKey:

// Set the queue partition key on the outbound message using the WithProperty() builder function.
// Assumes messagingService is a connected solace.MessagingService that is in scope.
func SetQueuePartitionKeyUsingWithProperty(queuePartitionKeyValue string) (message.OutboundMessage, error) {
    payload := "my_payload"

    return messagingService.
        MessageBuilder().
        WithProperty(config.QueuePartitionKey, queuePartitionKeyValue).
        BuildWithStringPayload(payload)
}

// You can also set the queue partition key on the outbound message using the FromConfigurationProvider() builder function.
func SetQueuePartitionKeyUsingFromConfigurationProvider(queuePartitionKeyValue string) (message.OutboundMessage, error) {
    payload := "my_payload"
    messageConfig := config.MessagePropertyMap{
        config.QueuePartitionKey: queuePartitionKeyValue,
    }

    return messagingService.
        MessageBuilder().
        FromConfigurationProvider(messageConfig).
        BuildWithStringPayload(payload)
}

Handling Back-Pressure When Publishing Persistent Messages

When you use persistent messaging, the messages are sent to the PubSub+ event broker with a topic and may be enqueued on any queue with a matching topic subscription. The event broker then delivers the messages asynchronously to any consumers subscribed to that queue. At the time the client application publishes the persistent message, the API queues the message in an internal buffer before it is sent to the event broker. In ideal conditions, as soon as the application publishes a message, the API sends that message to the network, and that message is eventually received by the event broker. It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages. This scenario is referred to as back-pressure. It is important to configure your application to handle situations where back-pressure occurs.

In the PubSub+ Go API, the PersistentMessagePublisherBuilder provides two primary mechanisms for handling back-pressure, which are described in the following sections.

Rejecting Messages When a Specified Limit is Reached

When back-pressure occurs, you can choose to reject the messages from the client application when a specified limit is reached in the internal buffer. You can use the OnBackPressureReject(bufferSize uint) function to specify the buffer capacity as the number of messages that can accumulate. After the specified capacity is reached, it is no longer possible to publish new messages, and calls to publish return errors until the buffer has capacity again. The bufferSize value must be greater than or equal to zero.

/* Creates an instance of PersistentMessagePublisherBuilder, which is used to create PersistentMessagePublishers. */
persistentPublisher, builderErr := messagingService.
       CreatePersistentMessagePublisherBuilder(). // Creates a PersistentMessagePublisherBuilder that can be used to configure persistent message publisher instances.
       OnBackPressureReject(500).                 // Sets the pressure strategy to reject messages once the bufferSize, in number of messages, is reached.
       Build()                                    // Creates a new PersistentMessagePublisher instance based on the configured properties.
if builderErr != nil {
       panic(builderErr)
}

startErr := persistentPublisher.Start()    // Start starts the publisher synchronously. Before this function is called, the publisher is considered off-duty.
if startErr != nil {
       panic(startErr)
}     

Using a Publisher Readiness Listener

We recommend that you use a PublisherReadinessListener when you use the OnBackPressureReject() function because it lets your application know when there is capacity available in the buffer and it can resume publishing messages.

The following is an example of registering an event handler using a PublisherReadinessListener:

func canPublishListener() {
       // What you want your application to do when it can publish messages again.
}
// ...
// ...
persistentPublisher, builderErr := messagingService.
       CreatePersistentMessagePublisherBuilder(). // Creates a PersistentMessagePublisherBuilder that can be used to configure persistent message publisher instances.
       OnBackPressureReject(500).                 // Sets the pressure strategy to reject messages once the bufferSize, in number of messages, is reached.
       Build()                                    // Creates a new PersistentMessagePublisher instance based on the configured properties.
if builderErr != nil {
       panic(builderErr)
}

startErr := persistentPublisher.Start()    // Start starts the publisher synchronously. Before this function is called, the publisher is considered off-duty.
if startErr != nil {
       panic(startErr)
}
persistentPublisher.SetPublisherReadinessListener(canPublishListener)    // Registers a listener to be called when the publisher can send messages.

For the best performance, we recommend you use OnBackPressureReject(0) with a PublisherReadinessListener. A bufferSize of zero means the publisher application will not buffer messages before sending them to the broker, which reduces latency and improves performance. This prevents large numbers of messages accumulating on the publisher's internal buffer and is the most performant method of dealing with back pressure.
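
The interaction between rejection and the readiness notification can be modeled with a small standard-library sketch. The rejectingBuffer type, its publish and sendOne methods, and errWouldBlock are hypothetical names invented for this illustration. They are a simplified, single-goroutine model of the behavior described above and are not the API's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errWouldBlock is a hypothetical error returned when the buffer is full.
var errWouldBlock = errors.New("buffer full: message rejected")

// rejectingBuffer is a simplified, single-goroutine model of a publisher
// buffer that rejects new messages once capacity is reached.
type rejectingBuffer struct {
	capacity int
	pending  []string
	onReady  func() // Called when space frees up after the buffer was full.
}

// publish queues msg, or rejects it when the buffer is at capacity.
func (b *rejectingBuffer) publish(msg string) error {
	if len(b.pending) >= b.capacity {
		return errWouldBlock
	}
	b.pending = append(b.pending, msg)
	return nil
}

// sendOne models the transport sending the oldest message to the broker.
// If the buffer was full beforehand, it notifies the readiness listener.
func (b *rejectingBuffer) sendOne() {
	if len(b.pending) == 0 {
		return
	}
	wasFull := len(b.pending) >= b.capacity
	b.pending = b.pending[1:]
	if wasFull && b.onReady != nil {
		b.onReady()
	}
}

func main() {
	buf := &rejectingBuffer{capacity: 2}
	buf.onReady = func() { fmt.Println("ready to publish again") }

	for i := 1; i <= 3; i++ {
		if err := buf.publish(fmt.Sprintf("message %d", i)); err != nil {
			fmt.Println("publish failed:", err) // The third message is rejected.
		}
	}
	buf.sendOne() // Frees one slot and triggers the readiness listener.
	fmt.Println("retry error:", buf.publish("message 3"))
}
```

In this model the application holds on to the rejected message and publishes it again only after the readiness listener fires, rather than retrying in a tight loop.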

Throttling Publisher When a Specified Limit is Reached

You can choose to throttle the publishing application if a specified limit is reached in the internal buffer. Throttling gives the API time to free capacity in its internal buffer. You can use the OnBackPressureWait(bufferSize uint) function to set the maximum number of messages that can accumulate in the buffer. The default setting for the PubSub+ Go API is for the publisher application to wait with a bufferSize of 50. When this maximum capacity (bufferSize) is reached, the publisher routine pauses and waits for available capacity in the internal buffer before letting the application publish any more messages.

This function should be used when you want the publishing application to wait for space when the buffer's capacity has been reached. Using this mechanism gives time for the API to empty the internal buffer. An additional benefit for persistent messaging with this configuration is that the API does not discard any messages.

The following shows an example of how to configure the internal buffer to accommodate up to one thousand messages:

persistentPublisher, builderErr := messagingService.
       CreatePersistentMessagePublisherBuilder(). // Creates a PersistentMessagePublisherBuilder that can be used to configure persistent message publisher instances.
       OnBackPressureWait(1000).                   // Sets back pressure strategy to wait and block until there is space in the buffer of size bufferSize.
       Build()                                    // Creates a new PersistentMessagePublisher instance based on the configured properties.
if builderErr != nil {
       panic(builderErr)
}

startErr := persistentPublisher.Start()    // Start starts the publisher synchronously. Before this function is called, the publisher is considered off-duty.
if startErr != nil {
       panic(startErr)
}  
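
The waiting behavior resembles sending on a buffered Go channel: the sender blocks while the buffer is full and resumes when space frees up. The following standard-library sketch models this with a slow consumer standing in for the transport. The publishAll function and its parameters are hypothetical names for this illustration and are not part of the PubSub+ Go API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// publishAll sends count messages through a buffer of bufferSize slots while
// a slow consumer (a stand-in for the transport) drains it. It returns the
// messages in the order the consumer received them.
func publishAll(count, bufferSize int, sendDelay time.Duration) []int {
	buffer := make(chan int, bufferSize)
	var received []int
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range buffer {
			time.Sleep(sendDelay) // Simulates a slow network.
			received = append(received, msg)
		}
	}()

	for i := 0; i < count; i++ {
		buffer <- i // Blocks while the buffer is full; nothing is discarded.
	}
	close(buffer)
	wg.Wait() // After this returns, it is safe to read received.
	return received
}

func main() {
	received := publishAll(10, 2, time.Millisecond)
	fmt.Println("delivered", len(received), "of 10 messages") // prints "delivered 10 of 10 messages"
}
```

The trade-off in this model is that the publishing routine slows to the pace of the consumer, which costs throughput but loses no messages.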

Publishing a Persistent Message

After you have established a connection to the event broker using a MessagingService instance, you can use a PersistentMessagePublisher to publish persistent messages.

A persistent message has the following components:

  • A topic to publish to (required)
  • A message payload (optional)

Persistent message publishing involves the receipt of acknowledgments or publish receipts. Depending on your requirements, your client application can publish as:

  • non-blocking: the publish call returns immediately, so your application can perform other functions while the MessagePublishReceiptListener waits for the acknowledgment
  • blocking: the publish call waits until an acknowledgment has been received; an acknowledgment indicates that the message has been received and persisted by the broker

Non-Blocking Functions

The following are non-blocking functions:

  • PublishBytes(message []byte, destination *resource.Topic)
  • PublishString(message string, destination *resource.Topic)
  • Publish(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider, context interface{})

For more information about these functions, see the PubSub+ Messaging API for Go reference.

Blocking Functions

If you want your publisher to wait for an acknowledgment from the broker before the publish call returns, you can use the following blocking function:

  • PublishAwaitAcknowledgement(message message.OutboundMessage, destination *resource.Topic, timeout time.Duration, properties config.MessagePropertiesConfigurationProvider)

The preceding function can be used with a PersistentMessagePublisher to publish an OutboundMessage to the broker using a topic. This function blocks the main routine until either:

  • the publisher API receives an acknowledgment from the broker
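  • the timeout period elapses

The following example shows how to publish a message and block until the broker acknowledges it or the timeout elapses:
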
messageBody := "Hello from Go Persistent Publisher Sample"
messageBuilder := messagingService.MessageBuilder().     // Creates an OutboundMessageBuilder that can be used to build messages.
    WithProperty("application", "samples").               // Sets an individual message property on the resulting message.
    WithProperty("language", "go")                         // Both Solace defined config.MessageProperty keys as well as user-defined property keys are accepted.

go func() {
    for persistentPublisher.IsReady() {                   // Checks if the publisher can publish messages.
        message, err := messageBuilder.BuildWithStringPayload(messageBody)
        if err != nil {
            panic(err)
        }

        topic := resource.TopicOf("go/persistent/publisher/")

        // Sends the message and blocks until the broker acknowledges it or the 2-second timeout elapses.
        // Optionally, instead of nil, you can provide properties in the form of a
        // config.MessagePropertiesConfigurationProvider to override any properties set on the OutboundMessage.
        publishErr := persistentPublisher.PublishAwaitAcknowledgement(message, topic, 2*time.Second, nil)
        if publishErr != nil {
            panic(publishErr)
        }
    }
}()        
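
The two conditions that end a blocking publish (an acknowledgment arrives, or the timeout elapses) correspond to a select over two channels in Go. The following is a minimal standard-library sketch of that pattern. The publishAwaitAck and ackAfter functions and the errAckTimeout error are hypothetical names for this illustration, with a simulated broker, and do not show how the API implements PublishAwaitAcknowledgement().

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errAckTimeout is a hypothetical error for an acknowledgment that did not arrive in time.
var errAckTimeout = errors.New("timed out waiting for acknowledgment")

// publishAwaitAck calls send, which must deliver the broker's verdict on the
// returned channel (nil for success), and blocks until the verdict arrives
// or timeout elapses.
func publishAwaitAck(send func() <-chan error, timeout time.Duration) error {
	select {
	case err := <-send():
		return err
	case <-time.After(timeout):
		return errAckTimeout
	}
}

// ackAfter simulates a broker that acknowledges successfully after delay.
func ackAfter(delay time.Duration) func() <-chan error {
	return func() <-chan error {
		ack := make(chan error, 1) // Buffered so a late acknowledgment never blocks.
		go func() {
			time.Sleep(delay)
			ack <- nil
		}()
		return ack
	}
}

func main() {
	fmt.Println("fast broker:", publishAwaitAck(ackAfter(time.Millisecond), time.Second))
	fmt.Println("slow broker:", publishAwaitAck(ackAfter(time.Second), 10*time.Millisecond))
}
```

A timeout in this model means only that no acknowledgment arrived in time. The message may still have been persisted, so an application that republishes after a timeout should be prepared for duplicates.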

Acknowledging Messages and Handling Errors

A publish receipt is a delivery confirmation that indicates whether or not the event broker successfully processed the message. These publish receipts can indicate success or failure, and are handled by a MessagePublishReceiptListener instance. You can set your MessagePublishReceiptListener with the SetMessagePublishReceiptListener() function.

The following example shows how to use the MessagePublishReceiptListener to listen for publish receipts:

/* listener that processes all delivery confirmations/timeouts for all messages */
func PublishReceiptListener(receipt solace.PublishReceipt) {
    fmt.Println("Received a Publish Receipt from the broker")
    fmt.Println("IsPersisted: ", receipt.IsPersisted())
    fmt.Println("Message : ", receipt.GetMessage())
    if receipt.GetError() != nil {
        fmt.Println("Guaranteed Message is NOT persisted on the broker! Received NAK")
        fmt.Println("Error is: ", receipt.GetError())
        // probably want to do something here.  some error handling possibilities:
        //  - send the message again
        //  - send it somewhere else (error handling queue?)
        //  - log and continue
        //  - pause and retry (backoff) - maybe set a flag to slow down the publisher
    }
}

persistentPublisher.SetMessagePublishReceiptListener(PublishReceiptListener)           // Listen to all delivery confirmations for all messages being sent.
publishErr := persistentPublisher.PublishString("Hello world!", topicDestination)      // Publish a message with a String payload.    
if publishErr != nil {                                          
    panic(publishErr)                                               
}     

Strategies for Handling Publish Receipt Errors

The following are application-specific strategies you can use to handle receipt errors when publishing.

Wait and Retry
Wait a number of seconds before trying to send the message again. For example, use time.Sleep(1 * time.Second) to wait for 1 second before attempting to publish again.
Retry a Predefined Number of Times
Try to re-publish the message a predefined number of times before dropping it.
Discard the Message
Simply discard messages with failed publish receipts. We don't recommend this strategy if your application cannot tolerate message loss.
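
The first two strategies are often combined. The following standard-library sketch retries a publish operation a bounded number of times with a fixed wait between attempts. The publishWithRetry function, its parameters, and errRejected are hypothetical names for this illustration; the publish argument stands in for whatever call your application uses to send the message again.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRejected is a hypothetical error that models a failed publish receipt.
var errRejected = errors.New("negative acknowledgment")

// publishWithRetry calls publish up to maxAttempts times, sleeping for wait
// between attempts. It returns nil on the first success; otherwise it wraps
// the last error so the caller can decide whether to discard the message.
func publishWithRetry(publish func() error, maxAttempts int, wait time.Duration) error {
	if maxAttempts < 1 {
		maxAttempts = 1 // Always try at least once.
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = publish(); err == nil {
			return nil
		}
		if attempt < maxAttempts {
			time.Sleep(wait) // Wait before the next attempt.
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	flaky := func() error { // Fails twice, then succeeds.
		calls++
		if calls < 3 {
			return errRejected
		}
		return nil
	}
	err := publishWithRetry(flaky, 5, 10*time.Millisecond)
	fmt.Println("attempts:", calls, "error:", err) // prints "attempts: 3 error: <nil>"
}
```

Republishing a message can result in duplicates or out-of-order delivery, so whether this strategy is acceptable depends on your application.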

To receive a failed publish receipt when there is no matching subscription, you must enable this behavior on the event broker or event broker service. For more information, see Handling Guaranteed Messages with No Matches (for appliances and software event brokers) or the Reject Messages to Sender On No Subscription Match Discard setting (for PubSub+ Cloud).

User Contexts

Optionally, you can use user contexts to correlate information for persistent messages to publish receipts in your application. This information is user-specific and is meaningful only to your publishing application and is not sent to the broker. A user context permits you to attach data to the publish call that can later be retrieved from the publish receipt listener.

A user context lets your application decide what action to take, or how to process a publish receipt, based on the data that you attached when you published the message.

For example, if a non-blocking application has multiple routines to publish persistent messages, each routine can include its identifier as the user context when it publishes a persistent message. The PubSub+ Go API tracks the user context when specified for each message and returns the user context as part of the publish receipt when the message is acknowledged or rejected by the event broker. The publishing application can then send the publish receipt to the correct routine that sent the message based on the user context.

You can set the user context when you publish the message. For example, you use the Publish(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider, context interface{}) function, where the user context is specified as type interface{}.

The following example shows how to get the user context from a publish receipt:

/* A simple example of a persistent message publishReceipt */
var acknowledgedMessage message.OutboundMessage = publishReceipt.GetMessage() // Retrieves message associated with a Receipt.    
userContext := publishReceipt.GetUserContext()        // Corresponding context can be retrieved this way from a publish receipt when provided during message publishing.
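
The routing scenario described earlier, where each routine supplies its identifier as the user context, can be sketched with the standard library alone. The receipt type and the routeReceipt function are hypothetical stand-ins for this illustration. They model only the user context and the error, and are not the API's PublishReceipt type.

```go
package main

import "fmt"

// receipt is a hypothetical stand-in for a publish receipt; only the fields
// needed for routing are modeled.
type receipt struct {
	userContext interface{}
	err         error
}

// routeReceipt delivers r to the channel registered for the routine ID that
// was supplied as the user context. It reports whether a route was found.
func routeReceipt(routes map[string]chan receipt, r receipt) bool {
	id, ok := r.userContext.(string) // The context is opaque, so assert its type.
	if !ok {
		return false
	}
	ch, found := routes[id]
	if !found {
		return false
	}
	ch <- r // Blocks if the routine's channel is full.
	return true
}

func main() {
	routes := map[string]chan receipt{
		"routine-A": make(chan receipt, 1),
		"routine-B": make(chan receipt, 1),
	}
	routeReceipt(routes, receipt{userContext: "routine-B"})
	got := <-routes["routine-B"]
	fmt.Println("routine-B received receipt with context:", got.userContext)
}
```

Because the user context is typed as interface{}, the receiving code has to assert the concrete type it expects and should handle the case where the assertion fails.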

If your application is non-blocking, you can also use a persistent message publisher with publish receipt in your callback to log information. For example, you can use non-blocking message publishing and then send alerts to notify the application of the status of published messages, such as:

  • the event broker successfully receives and processes a message
  • access control violations (ACL)
  • a queue being over quota
  • invalid topics / topics with no subscribers

The following code shows an example of a PublishReceiptListener:

/* listener that processes all delivery confirmations/timeouts for all messages */
func PublishReceiptListener(receipt solace.PublishReceipt) {
    fmt.Println("Received a Publish Receipt from the broker")
    fmt.Println("IsPersisted: ", receipt.IsPersisted())
    fmt.Println("Message : ", receipt.GetMessage())
    if receipt.GetError() != nil {
        fmt.Println("Guaranteed Message is NOT persisted on the broker! Received NAK")
        fmt.Println("Error is: ", receipt.GetError())
        // probably want to do something here.  some error handling possibilities:
        //  - send the message again
        //  - send it somewhere else (error handling queue?)
        //  - log and continue
        //  - pause and retry (backoff) - maybe set a flag to slow down the publisher
    }
}

persistentPublisher.SetMessagePublishReceiptListener(PublishReceiptListener)     // Listen to all delivery confirmations for all messages being sent.
publishErr := persistentPublisher.PublishString("Hello world!", topicDestination)      // Publish a message with a String payload.    
if publishErr != nil {                                          
    panic(publishErr)                                               
}