Publishing Direct Messages Using the Go API

Direct messages are useful when high-throughput and low-latency is required. We recommend that you publish events using direct messages if some message loss can be tolerated without negatively impacting client applications. Message loss can occur due to external factors, such as network congestion or client disconnection. If your applications require guaranteed delivery and message acknowledgment, then we recommend using persistent messages instead.

To handle direct messages using the PubSub+ Go API, use the following steps:

  1. Create a DirectMessagePublisher.
  2. Configure and Create an OutboundMessage.
  3. Handling Back-Pressure When Publishing Direct Messages.
  4. Publish a Direct Message.
  5. Handle Errors.

In some use cases, it's possible for your application to send messages faster than the messages can be transported. Messages can fill the API's internal buffers causing back-pressure. If this scenario is possible, we recommend that you consider changing the back-pressure settings to meet the requirements of your application. For more information, see Handling Back-Pressure When Publishing Direct Messages.

For examples of applications that publish direct messages, see direct_publisher.go on the Solace GitHub page.

Creating a DirectMessagePublisher

After a MessagingService instance has established a connection to an event broker, use a DirectMessagePublisher to publish direct messages. You can use a DirectMessagePublisherBuilder to configure your DirectMessagePublisher to use certain features of the API, such as back-pressure strategies. Call the Build() function on your DirectMessagePublisherBuilder to return a DirectMessagePublisher instance. To enable your DirectMessagePublisher to start publishing messages, call Start() on it.

For more information, see the PubSub+ Messaging API for Go reference.

The following is an example that shows how to use a direct message publisher to enable your application to publish messages to the event broker:

/* Creates an instance of DirectMessagePublisherBuilder, which is used to create DirectMessagePublishers. */
directPublisher, builderError := messagingService. 
	CreateDirectMessagePublisherBuilder(). // Creates a DirectMessagePublisherBuilder that can be used to configure direct message publisher instances.
	OnBackPressureReject().
	Build()                                // Creates a new DirectMessagePublisher instance based on the configured properties.
if builderErr != nil {
	panic(builderErr)
}

startErr := directPublisher.Start()    // Start starts the publisher synchronously. Before this function is called, the publisher is considered off-duty.
if startErr != nil {
	panic(startErr)
}

Asynchronous Publishers

It is also possible to start a direct message publisher using a callback listener to allow for asynchronous notifications after the start operation is complete.

The following example shows how to start a DirectMessagePublisher asynchronously:

func PublisherCallback(publisher solace.DirectMessagePublisher, err error) {
	if err != nil {
		panic(err) // Publisher started up with an error.
	} else {
		// Publisher started without issue.
	}
}
// ...	
// ...
directPublisher.StartAsyncCallback(PublisherCallback) // Starts the publisher asynchronously. Calls the callback when started with an error if one occurred, otherwise nil if successful.

Your publisher application is not operational until you call Start() or StartAsyncCallback() on it.

Configuring and Creating Outbound Messages

Your client applications explicitly create the outbound messages to publish. In the PubSub+ Go API, when you publish messages you use OutboundMessage instances. To configure and create OutboundMessage instances, follow these steps:

  1. Call messagingService.MessageBuilder() to return an OutboundMessageBuilder instance. For better performance, we recommend you use a single OutboundMessageBuilder to create multiple OutboundMessage instances.

    messageBuilder := messagingService.MessageBuilder()
  2. Configure your message with an OutboundMessageBuilder and then call the Build() function to return a message instance. You can configure message properties using either method below.
    • Use config.MessagePropertyMap and the config.MessageProperty* functions. The following example shows how to set MessagePropertyMap values for message ID, sender ID and message type constants, and then configure a message using the FromConfigurationProvider() function:
      messageProperties := config.MessagePropertyMap{
      	config.MessagePropertyApplicationMessageID: "message ID",
      	config.MessagePropertySenderID:      "Sender ID",
      	config.MessagePropertyApplicationMessageType: "Message Type",
      	// For a complete list of MessageProperty constants see the PubSub+ Messaging API for Go reference.
      }	
      // ...						
      // ...						
      message, err := messageBuilder.FromConfigurationProvider(messageProperties)
      	.BuildWithStringPayload("my message payload")
      if err != nil {
      	panic(err)
      }
    • Use the OutboundMessageBuilder interface and the WithProperty(propertyName,propertyValue) functions. Both Solace defined config.MessageProperty keys as well as arbitrary user-defined property keys are accepted. The following example shows how to set a custom key-value property on a message:
      messageBuilder := messagingService.MessageBuilder().
      	WithProperty("propertyName", "propertyValue")
      // ...							
      // ...							
      message, err := messageBuilder.BuildWithStringPayload("my message payload")
      if err != nil {
      	panic(err)
      }

The following code example shows how to create a message builder, set message properties and create a message:

/* Builder for creation of similarly configured messages */
messageBuilder := messagingService.MessageBuilder()
message, err := messageBuilder.
	FromConfigurationProvider(messageProperties).     // For Sender ID, Sequence Number etc.  
	WithExpiration(time.Now().Add(10 * time.Second)). // Expire the message in 10 seconds.
	BuildWithStringPayload("my message payload")      // Builds the message.

For more information about the functions, see the PubSub+ Messaging API for Go reference.

Handling Back-Pressure When Publishing Direct Messages

When you use direct messaging, the messages are sent to a PubSub+ event broker using a topic. The event broker relays the messages to any consumers subscribed to that topic.

At the time the client application publishes a direct message, the API queues the message in an internal buffer before it is sent to event broker. In ideal conditions, as soon the application publishes a message, the API sends that message on the network, and that message is eventually received by event broker. It is possible for a client application to publish messages faster than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages. This scenario is referred to as back-pressure. It is important to configure your application to handle situations where back-pressure occurs.

In the PubSub+ Go API, the DirectMessagePublisher has the following mechanisms to handle back-pressure:

Rejecting Messages When A Specified Limit is Reached

When back-pressure occurs, you can choose to reject the messages from the client application when a specified limit is reached in the internal buffer. You can use the OnBackPressureReject(bufferSize uint) function to specify a defined buffer capacity for a set number of messages to accumulate. After the specified capacity is reached, it is no longer possible to publish new messages and the API returns errors until the buffer has capacity again. On calls to publish, the value for bufferSize must be greater than or equal to zero.

/* Creates an instance of DirectMessagePublisherBuilder, which is used to create DirectMessagePublishers. */
directPublisher, builderError := messagingService. 
	CreateDirectMessagePublisherBuilder(). // Creates a DirectMessagePublisherBuilder that can be used to configure direct message publisher instances.
	OnBackPressureReject(500).             // Sets the pressure strategy to reject messages once the bufferSize, in number of messages, is reached.
	Build()                                // Creates a new DirectMessagePublisher instance based on the configured properties.
if builderErr != nil {
	panic(builderErr)
}

startErr := directPublisher.Start()    // Start starts the publisher synchronously. Before this function is called, the publisher is considered off-duty.
if startErr != nil {
	panic(startErr)
}

Using a Publisher Readiness Listener

We recommend that you use a PublisherReadinessListener when you use the OnBackPressureReject() function because it lets your application known when there is capacity available in the buffer and it can resume publishing messages. The back pressure strategy of OnBackPressureWait() will not call a PublisherReadinessListener.

The following is an example of registering an event handler PublisherReadinessListener instance:

func PublisherReadinessListener(){
// What you want your application to do when it can publish messages again.
}
// ...
// ...
directPublisher, builderError := messagingService. 
	CreateDirectMessagePublisherBuilder(). // Creates a DirectMessagePublisherBuilder that can be used to configure direct message publisher instances.
	OnBackPressureReject(500).	       // Sets the pressure strategy to reject messages once the bufferSize, in number of messages, is reached.
	Build()                                // Creates a new DirectMessagePublisher instance based on the configured properties.
if builderErr != nil {
	panic(builderErr)
}

startErr := directPublisher.Start()    // Start starts the publisher synchronously. Before this function is called, the publisher is considered off-duty.
if startErr != nil {
	panic(startErr)
}
directPublisher.SetPublisherReadinessListener(PublisherReadinessListener);    // Registers a listener to be called when the publisher can send messages.       

For the best performance, we recommend you use OnBackPressureReject(0) with a PublisherReadinessListener. A bufferSize of zero means the publisher application will not buffer messages before sending them to the broker, which reduces latency and improves performance. This prevents large numbers of messages accumulating on the publisher's internal buffer and is the most performant method of dealing with back pressure.

Throttling the Publisher When a Specified Limit is Reached

You can choose to throttle the publishing application if a specified limit is reached in the internal buffer. The use of throttling is useful to free capacity to its internal buffers. You can use the OnBackPressureWait(bufferSize uint) function to set the maximum number of messages that can accumulate in the buffer. The default setting for the Go API is for the publisher application to wait with a bufferSize of 50. When this maximum capacity (bufferSize) is reached, the publisher routine pauses and waits for available capacity in the internal buffer before letting the application publish any more messages.

This function should be used when you want application requests to take longer after the buffer's capacity has been reached. Using this mechanism effectively gives time for the API to empty the internal buffer. An additional benefit is that when you use persistent publishing, the API does not discard any messages.

The following shows an example of how to configure an internal buffer to accommodate up to one thousand messages:

/* Creates an instance of DirectMessagePublisherBuilder, which is used to create DirectMessagePublishers. */
directPublisher, builderError := messagingService. 
	CreateDirectMessagePublisherBuilder(). // Creates a DirectMessagePublisherBuilder that can be used to configure direct message publisher instances.
	OnBackPressureWait(1000).               // Sets back pressure strategy to wait and block until there is space in the buffer of size bufferSize.
	Build()                                // Creates a new DirectMessagePublisher instance based on the configured properties.
if builderErr != nil {
	panic(builderErr)
}

startErr := directPublisher.Start()    // Start starts the publisher synchronously. Before this function is called, the publisher is considered off-duty.
if startErr != nil {
	panic(startErr)
}

Publishing a Direct Message

After you have created the DirectMessagePublisher, you can start sending messages. When you send a message, there are two primary components:

The topic must be an instantiation of the resource.Topic class that follows the Solace hierarchical format, for example: solace/messaging/direct/pub. The publish function currently supports simple string messages, byte arrays, as well as OutboundMessage instances, which can be obtained through an OutboundMessageBuilder via MessagingService.MessageBuilder(). The following are functions to publish a direct message:

  • Publish(message message.OutboundMessage, destination *resource.Topic)
  • PublishWithProperties(message message.OutboundMessage, destination *resource.Topic, properties config.MessagePropertiesConfigurationProvider)
  • PublishBytes(message []byte, destination *resource.Topic)
  • PublishString(message string, destination *resource.Topic)

See the PubSub+ Messaging API for Go reference for information about the publish functions that are available when you use a DirectMessagePublisher.

Here's an example that shows how to publish a direct message:

messageBody := "Hello from Go Direct Publisher Sample"
messageBuilder := messagingService.MessageBuilder().     // Creates an OutboundMessageBuilder that can be used to build messages.
WithProperty("application", "samples").                  // Sets an individual message property on the resulting message.
WithProperty("language", "go")				  // Both Solace defined config.MessageProperty keys as well as user-defined property keys are accepted.

go func() {
	for directPublisher.IsReady() {                   // Checks if the publisher can publish messages.
	message, err := messageBuilder.BuildWithStringPayload(messageBody)
		if err != nil {
			panic(err)
		}

		topic := resource.TopicOf("go/persistent/publisher/")

		publishErr := directPublisher.Publish(message, topic)    // Publishes the message of type OutboundMessage to the specified destination
		if publishErr != nil {
			panic(publishErr)
		}
	}
}()

Handling Errors

The Go API provides the SetPublishFailureListener() function that notifies the client if the API is unable to publish messages. A failed publish event could be due to an issue such as an invalid topic or a termination of the service. See the example below:

func PublishFailureListener(){
// Handle a failed publish event
}
// ...
// ...
directPublisher, builderError := messagingService. 
	CreateDirectMessagePublisherBuilder(). // Creates a DirectMessagePublisherBuilder that can be used to configure direct message publisher instances.
	Build()                                // Creates a new DirectMessagePublisher instance based on the configured properties.
if builderErr != nil {
	panic(builderErr)
}
directPublisher.SetPublishFailureListener(PublishFailureListener) // Sets the listener to call if the publishing of a direct message fails.