Using PubSub+ Cache With the Go API

PubSub+ Cache is a scalable, in-memory message cache for direct messaging. It allows client applications to retrieve previously preserved messages for topics of interest when they come online, or when they start subscribing to topics they were not originally subscribed to. Depending on the type of cache request, clients can access both preserved messages and live messages that arrive after the cache request is initiated. For information about setting up a PubSub+ Cache instance, see PubSub+ Cache.

The PubSub+ Go API allows your applications to send cache requests to receive the cached messages that they are interested in. To consume cached messages, your application needs access to the ReceiverCacheRequests interface. The DirectMessageReceiver interface includes the ReceiverCacheRequests interface, so a DirectMessageReceiver instance is required to consume cached messages. See Creating a DirectMessageReceiver for more information.

The following sections describe how to send cache requests and receive cached messages:

  1. Creating a CachedMessageSubscriptionRequest Instance
  2. Sending a Cache Request
  3. Checking Cache Status and Cache Request IDs in Received Messages
  4. Considerations When Receiving Cached Messages

Creating a CachedMessageSubscriptionRequest Instance

To send a cache request, you first call the NewCachedMessageSubscriptionRequest() constructor function. This function takes one of four different constants that represent different strategies for requesting cached messages. Select the one that best suits the needs of your application:

  • As Available (CacheRequestStrategyAsAvailable)

    • Returns an instance of a CachedMessageSubscriptionRequest used to configure a cached data request. The request object adds a live data subscription to the topic passed into the function parameter if your receiver is not already subscribed to that topic. This means your application receives a mix of cached messages from the cache request and live messages from the topic subscription.
  • Live Cancels Cached (CacheRequestStrategyLiveCancelsCached)

    • Returns an instance of a CachedMessageSubscriptionRequest that configures the cache request to receive the latest messages. The request object adds a live data subscription to the topic passed into the function parameter if your receiver is not already subscribed to that topic. When there are no live (non-cached) messages, cached messages that match your chosen topic subscription are considered the latest. Otherwise, live messages that match your chosen topic subscription are considered the latest.
  • Cached First (CacheRequestStrategyCachedFirst)

    • Returns an instance of CachedMessageSubscriptionRequest that configures the cache request to receive cached messages when available, followed by live (non-cached) messages. The request object adds a live data subscription to the topic passed into the function parameter if your receiver is not already subscribed to that topic.
  • Cached Only (CacheRequestStrategyCachedOnly)

    • Returns an instance of a CachedMessageSubscriptionRequest that configures your application to receive cached messages only for the duration of the cache request.
    • Only use CacheRequestStrategyCachedOnly when your receiver has no live subscriptions that match the cache request subscription, because your receiver receives duplicates of any cached messages on overlapping topic subscriptions. When you have pre-existing live data subscriptions that overlap with your cache request topic subscription, use CacheRequestStrategyAsAvailable or CacheRequestStrategyCachedFirst to avoid message duplication.

When you use the CacheRequestStrategyAsAvailable, CacheRequestStrategyLiveCancelsCached, or CacheRequestStrategyCachedFirst constant, the cache request creates a live topic subscription if one does not already exist on your receiver. This subscription persists even after the cache request ends. To stop receiving live data on that subscription, call the RemoveSubscription() function on your receiver object after the cache outcome returns.

The NewCachedMessageSubscriptionRequest() function returns a CachedMessageSubscriptionRequest instance. The function takes the following parameters:

  • cachedMessageSubscriptionStrategy—A constant representing the cache subscription strategy.
  • cacheName—A string representing the name of the PubSub+ Cache to retrieve from.
  • subscription—A topic subscription the cache request should match against.
  • cacheAccessTimeout—An integer representing the cache request timeout (in milliseconds). This value must be between 3,000 and 2,147,483,647. For most applications, a value of 10,000 (10 seconds) is sufficient. See Assign an Appropriate cacheAccessTimeout Value for more information.
  • maxCachedMessages—An integer representing the maximum number of messages expected to be received from a cache. A value of 0 means there is no limit on the number of messages that can be received from a cache.
  • cachedMessageAge—An integer representing the maximum age (in seconds) of the messages to be retrieved from a cache. A value of 0 means there is no limit on the age of messages that can be received from a cache.

The following example shows how to create an instance of CachedMessageSubscriptionRequest with the CacheRequestStrategyAsAvailable strategy:

const (
    myCacheInstanceName  = "myCacheInstanceName"
    cacheAccessTimeout   = 4000
    maxCachedMessages    = 0
    cachedMessageAge     = 30
)

var myTopicSubscription = TopicSubscriptionOf("myTopic")

cachedMessageSubscriptionRequest, err := NewCachedMessageSubscriptionRequest(
    CacheRequestStrategyAsAvailable,
    myCacheInstanceName,
    myTopicSubscription,
    cacheAccessTimeout,
    maxCachedMessages,
    cachedMessageAge,
)
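The limits described in the parameter list can be checked before a request object is built. The following is a minimal sketch that uses only the Go standard library. The helper validateCacheRequestParams is hypothetical and not part of the PubSub+ Go API, and rejecting negative values for maxCachedMessages and cachedMessageAge is an assumption (the parameter list only states that 0 means no limit).

```go
package main

import (
	"fmt"
	"math"
)

// Bounds for cacheAccessTimeout in milliseconds, taken from the parameter list.
const (
	minCacheAccessTimeout = 3000
	maxCacheAccessTimeout = math.MaxInt32 // 2,147,483,647
)

// validateCacheRequestParams is a hypothetical helper (not part of the
// PubSub+ Go API). It rejects timeouts outside the documented range.
// Rejecting negative limits is an assumption; the documentation only
// states that 0 means "no limit".
func validateCacheRequestParams(cacheAccessTimeout, maxCachedMessages, cachedMessageAge int64) error {
	if cacheAccessTimeout < minCacheAccessTimeout || cacheAccessTimeout > maxCacheAccessTimeout {
		return fmt.Errorf("cacheAccessTimeout %d ms is outside [%d, %d]",
			cacheAccessTimeout, minCacheAccessTimeout, maxCacheAccessTimeout)
	}
	if maxCachedMessages < 0 {
		return fmt.Errorf("maxCachedMessages %d must be 0 (no limit) or positive", maxCachedMessages)
	}
	if cachedMessageAge < 0 {
		return fmt.Errorf("cachedMessageAge %d must be 0 (no limit) or positive", cachedMessageAge)
	}
	return nil
}

func main() {
	// Values from the documented example: accepted.
	fmt.Println(validateCacheRequestParams(4000, 0, 30))
	// Timeout below the documented minimum of 3,000 ms: rejected.
	fmt.Println(validateCacheRequestParams(2999, 0, 30))
}
```

Validating up front keeps configuration mistakes separate from failures that occur later while the cache request is in flight.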

For more information, see the PubSub+ Messaging API for Go reference.

Sending a Cache Request

To send a cache request and begin receiving cached messages from a PubSub+ Cache instance, use either the RequestCachedAsync() or the RequestCachedAsyncWithCallback() function from the ReceiverCacheRequests interface. These functions request previously cached messages from an event broker set up to use PubSub+ Cache. If you make multiple cache requests before receiving a cache outcome, use unique cache request IDs to avoid receiving duplicate messages.

Sending a Cache Request with a Channel

The RequestCachedAsync() function provides a channel-based approach for requesting cached messages. It returns a Go channel that receives CacheResponse objects asynchronously. These responses contain information about the cache request results. For more information, see Cache Response. The function takes the following parameters:

RequestCachedAsync(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID) (<- chan CacheResponse, error)

  • cachedMessageSubscriptionRequest—Request for cached messages matching specified subscription.
  • cacheRequestID—Cache request identifier that the application uses to correlate cache requests with cache responses and data messages. This value is received as part of the CacheResponse from the channel returned by the function. The ID must be unique across the lifetime of the MessagingService instance, and the application is responsible for guaranteeing this. Failing to do so results in undefined behavior. Live messages do not have a cache request ID. Do not confuse this value with the correlationID property used for general message correlation in the InboundMessage interface.
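One way to meet the uniqueness requirement is to take every ID from a single atomic counter. The sketch below uses only the standard library. The type requestIDSource is hypothetical, and it assumes that the resulting uint64 can be converted to message.CacheRequestID, as the integer literal in the documented example suggests.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// requestIDSource hands out IDs that are unique within this process.
// It is a hypothetical helper, not part of the PubSub+ Go API.
type requestIDSource struct {
	last uint64 // accessed only through sync/atomic
}

// Next returns the next unused ID. It is safe to call from many goroutines.
func (s *requestIDSource) Next() uint64 {
	return atomic.AddUint64(&s.last, 1)
}

func main() {
	var ids requestIDSource
	var mu sync.Mutex
	seen := make(map[uint64]bool)

	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				id := ids.Next()
				mu.Lock()
				seen[id] = true
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	// 8 goroutines x 1000 IDs each; any collision would shrink the map.
	fmt.Println("unique IDs generated:", len(seen))
}
```

A counter like this is only unique within one process and one run, which matches the stated scope of one MessagingService instance as long as a single counter serves that instance.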

The following example shows how to send a cache request with the RequestCachedAsync() function:

// The ID must have the type message.CacheRequestID and must be unique
// for the lifetime of the MessagingService instance.
cacheRequestID := message.CacheRequestID(12345)

responseChannel, err := directMessageReceiver.RequestCachedAsync(cachedMessageSubscriptionRequest, cacheRequestID)
if err != nil {
    fmt.Printf("Failed to request cache: %s\n", err)
    return
}

// Wait for the response on the channel
cacheResponse := <-responseChannel
if cacheResponse.GetError() != nil {
    fmt.Printf("Cache request failed: %s\n", cacheResponse.GetError())
    return
}

fmt.Printf("Cache request %d completed with outcome: %d\n",
    cacheResponse.GetCacheRequestID(),
    cacheResponse.GetCacheRequestOutcome())
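A plain channel receive blocks until the response arrives. If your application wants its own upper bound on the wait, a select with a timer is one option. The sketch below is an illustration under assumptions: cacheResponse is a simplified stand-in for the API's CacheResponse so that the code compiles without the PubSub+ Go API, and awaitResponse is a hypothetical helper. The application-level deadline shown here is separate from cacheAccessTimeout.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// cacheResponse is a simplified stand-in for the API's CacheResponse, used
// so that this sketch compiles without the PubSub+ Go API.
type cacheResponse struct {
	requestID uint64
	err       error
}

var (
	errDeadline = errors.New("no cache response before the application deadline")
	errClosed   = errors.New("response channel closed without a response")
)

// awaitResponse waits for one response, but no longer than limit.
func awaitResponse(ch <-chan cacheResponse, limit time.Duration) (cacheResponse, error) {
	timer := time.NewTimer(limit)
	defer timer.Stop()

	select {
	case resp, ok := <-ch:
		if !ok {
			return cacheResponse{}, errClosed
		}
		return resp, nil
	case <-timer.C:
		return cacheResponse{}, errDeadline
	}
}

func main() {
	// A response that is already available is returned immediately.
	ready := make(chan cacheResponse, 1)
	ready <- cacheResponse{requestID: 12345}
	resp, err := awaitResponse(ready, time.Second)
	fmt.Println(resp.requestID, err)

	// No response arrives, so the application-level deadline fires.
	silent := make(chan cacheResponse)
	_, err = awaitResponse(silent, 50*time.Millisecond)
	fmt.Println(err)
}
```

Giving up on the wait does not cancel the underlying cache request in this sketch; it only stops the calling goroutine from blocking.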

Sending a Cache Request with a Callback

The RequestCachedAsyncWithCallback() function provides a callback-based approach for requesting cached messages. It processes cache responses through a provided callback function. The function takes the following parameters:

RequestCachedAsyncWithCallback(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID, callback func(CacheResponse)) error

  • cachedMessageSubscriptionRequest—Request for cached messages matching specified subscription.
  • cacheRequestID—Cache request identifier that the application uses to correlate cache requests with cache responses and data messages. This value is received as part of the CacheResponse passed to the callback function provided. The ID must be unique across the lifetime of the MessagingService instance, and the application is responsible for guaranteeing this. Failing to do so results in undefined behavior. Live messages do not have a cache request ID. Do not confuse this value with the correlationID property used for general message correlation in the InboundMessage interface.
  • callback—A function that is called for each cache response received. The callback receives a CacheResponse parameter, which provides information about the response received from the cache. For more information, see Cache Response.

The following example shows how to send a cache request with the RequestCachedAsyncWithCallback() function:

// The ID must have the type message.CacheRequestID and must be unique
// for the lifetime of the MessagingService instance.
cacheRequestID := message.CacheRequestID(12345)

// cacheCallback is called for each cache response received.
cacheCallback := func(cacheResponse CacheResponse) {
    if cacheResponse.GetError() != nil {
        fmt.Printf("Cache request failed: %s\n", cacheResponse.GetError())
        return
    }

    fmt.Printf("Cache request %d completed with outcome: %d\n",
        cacheResponse.GetCacheRequestID(),
        cacheResponse.GetCacheRequestOutcome())
}

err := directMessageReceiver.RequestCachedAsyncWithCallback(cachedMessageSubscriptionRequest, cacheRequestID, cacheCallback)
if err != nil {
    fmt.Printf("Failed to request cache: %s\n", err)
    return
}
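When several callback-based requests are in flight, the application needs somewhere to record which request each response belongs to. The sketch below shows one possible bookkeeping structure using only the standard library. The type requestTracker is hypothetical, and guarding it with a mutex assumes the callback may run on a different goroutine than the one that sent the request. It only detects IDs that are still outstanding, so it does not replace the requirement that IDs be unique for the lifetime of the MessagingService instance.

```go
package main

import (
	"fmt"
	"sync"
)

// requestTracker records which cache requests are still waiting for a
// response. It is a hypothetical helper, not part of the PubSub+ Go API.
type requestTracker struct {
	mu      sync.Mutex
	pending map[uint64]string // cache request ID -> topic requested
}

func newRequestTracker() *requestTracker {
	return &requestTracker{pending: make(map[uint64]string)}
}

// register notes a request before it is sent. It refuses IDs that are
// already outstanding.
func (t *requestTracker) register(id uint64, topic string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, exists := t.pending[id]; exists {
		return fmt.Errorf("cache request ID %d is already outstanding", id)
	}
	t.pending[id] = topic
	return nil
}

// complete is meant to be called from the response callback. It returns
// the topic that was requested and whether the ID was known.
func (t *requestTracker) complete(id uint64) (string, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	topic, ok := t.pending[id]
	delete(t.pending, id)
	return topic, ok
}

// outstanding reports how many requests have not yet completed.
func (t *requestTracker) outstanding() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.pending)
}

func main() {
	tracker := newRequestTracker()
	_ = tracker.register(1, "orders/new")
	_ = tracker.register(2, "orders/cancelled")

	// Simulate the callback firing for request 1.
	if topic, ok := tracker.complete(1); ok {
		fmt.Println("response received for", topic)
	}
	fmt.Println("still outstanding:", tracker.outstanding())
}
```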

Cache Response

The CacheResponse interface provides the following information about a response received from a cache:

  • CacheRequestOutcome—The outcome of the cache request, which can be one of the following constants:
    • CacheRequestOutcomeOk—Cached data was returned in a cache reply, or the cache request was fulfilled by live data.
    • CacheRequestOutcomeNoData—There was no data in the cache reply.
    • CacheRequestOutcomeSuspectData—There was suspect data in the cache reply.
    • CacheRequestOutcomeFailed—The request failed for some reason, such as a timeout. This is the only CacheRequestOutcome that results in an error being returned on the CacheResponse.
  • CacheRequestID—The cache request identifier associated with the submitted cache request. This ID can be used for response callback correlation purposes, to correlate cache requests with their associated messages.
  • error—This value is nil if the cache response is successful. If the CacheRequestOutcome is CacheRequestOutcomeFailed, the CacheResponse returns an error.
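The four outcomes listed above map naturally onto a switch statement. The sketch below is illustrative only: the outcome type and its constants are local stand-ins whose numeric values are not the API's values, and the action chosen for each case is an example policy, not a recommendation from the API.

```go
package main

import (
	"errors"
	"fmt"
)

// outcome mirrors the four documented CacheRequestOutcome constants. These
// are local stand-ins; their numeric values are not the API's values.
type outcome int

const (
	outcomeOk outcome = iota
	outcomeNoData
	outcomeSuspectData
	outcomeFailed
)

// errRequestTimedOut is an example error for the failed case.
var errRequestTimedOut = errors.New("cache request timed out")

// nextStep turns an outcome into a short description of what an
// application might do next. The policy is an example only.
func nextStep(o outcome, err error) string {
	switch o {
	case outcomeOk:
		return "process messages"
	case outcomeNoData:
		return "no cached data returned"
	case outcomeSuspectData:
		return "process messages and flag them as suspect"
	case outcomeFailed:
		// Only the failed outcome carries an error on the response.
		return fmt.Sprintf("report failure: %v", err)
	default:
		return "unknown outcome"
	}
}

func main() {
	fmt.Println(nextStep(outcomeOk, nil))
	fmt.Println(nextStep(outcomeNoData, nil))
	fmt.Println(nextStep(outcomeSuspectData, nil))
	fmt.Println(nextStep(outcomeFailed, errRequestTimedOut))
}
```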

Checking Cache Status and Cache Request IDs in Received Messages

After a cache request is made, your DirectMessageReceiver begins receiving messages that can be cached or live. You can check whether a message is a cached message with the GetCacheStatus() function. This function returns a CacheStatus constant, which can be one of three values:

  • Live—The message was retrieved directly from a PubSub+ event broker and not from a cache instance.

  • Cached—The message was retrieved from a cache instance.

  • Suspect—The message was retrieved from a suspected cache instance.

You can get the cache request identifier from a cached message with the GetCacheRequestID() function. The example below shows a MessageHandler implementation that checks the cache status of inbound messages and retrieves the cache request ID from cached messages:

func messageHandler(inboundMessage message.InboundMessage) {
    if inboundMessage.GetCacheStatus() == message.Cached {
        requestID, ok := inboundMessage.GetCacheRequestID()
        if !ok {
            fmt.Println("failed to retrieve cache request ID from cached message")
            return
        }
        fmt.Printf("Message is cached from cache response %d\n", requestID)
    } else if inboundMessage.GetCacheStatus() == message.Suspect {
        requestID, ok := inboundMessage.GetCacheRequestID()
        if !ok {
            fmt.Println("failed to retrieve cache request ID from suspect message")
            return
        }
        fmt.Printf("Message is suspect from cache response %d, check on cache\n", requestID)
    } else {
        fmt.Println("Message is live, continue")
        requestID, ok := inboundMessage.GetCacheRequestID()
        if ok {
            fmt.Println("unexpected: retrieved cache request ID from live message")
            return
        }
        // If the cache request ID is not available, the returned value is 0:
        fmt.Printf("As expected, no cache request ID found on live message (returned value: %d)\n", requestID)
    }
}

directMessageReceiver.ReceiveAsync(messageHandler)

If an error occurs during a cache request or if the message receiver or MessagingService is terminated, the PubSub+ Go API cancels all outstanding cache requests. The termination process blocks until the PubSub+ Go API has passed all cache responses to the application. For cache requests submitted using the RequestCachedAsync() function, the API pushes the response to the channel, closes the channel, and finishes terminating. For cache requests submitted using RequestCachedAsyncWithCallback(), the application must execute the callback that was provided to the API before the API can finish terminating.

For more information, see the PubSub+ Messaging API for Go reference.

Considerations When Receiving Cached Messages

There are considerations to be aware of when you receive cached messages with the PubSub+ Go API.

Maximum Concurrent Cache Requests

Applications can submit a maximum of 1024 concurrent, outstanding cache requests. Exceeding this number results in an error.
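An application that issues many cache requests can enforce this limit on its own side before calling the API. The sketch below uses a buffered channel as a counting semaphore. The names requestLimiter, tryAcquire, and release are hypothetical and not part of the PubSub+ Go API. The idea is to acquire a slot before sending a request and release it when the cache response arrives.

```go
package main

import (
	"errors"
	"fmt"
)

// maxOutstandingCacheRequests is the documented limit on concurrent,
// outstanding cache requests.
const maxOutstandingCacheRequests = 1024

var errTooManyRequests = errors.New("too many outstanding cache requests")

// requestLimiter is a hypothetical helper that counts outstanding requests
// using a buffered channel as a semaphore.
type requestLimiter struct {
	slots chan struct{}
}

func newRequestLimiter(limit int) *requestLimiter {
	return &requestLimiter{slots: make(chan struct{}, limit)}
}

// tryAcquire reserves a slot without blocking. Call it before sending a
// cache request.
func (l *requestLimiter) tryAcquire() error {
	select {
	case l.slots <- struct{}{}:
		return nil
	default:
		return errTooManyRequests
	}
}

// release frees a slot. Call it once the cache response has arrived.
func (l *requestLimiter) release() {
	select {
	case <-l.slots:
	default: // nothing to release
	}
}

func main() {
	limiter := newRequestLimiter(maxOutstandingCacheRequests)
	for i := 0; i < maxOutstandingCacheRequests; i++ {
		if err := limiter.tryAcquire(); err != nil {
			fmt.Println("unexpected:", err)
			return
		}
	}
	// All slots are taken, so one more request is refused locally.
	fmt.Println(limiter.tryAcquire())
	// After one response arrives, a slot is available again.
	limiter.release()
	fmt.Println(limiter.tryAcquire())
}
```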

Avoid Overlapping Topic Subscriptions When Possible

When you use the PubSub+ Go API to consume cached messages, we recommend that you do not have overlapping topic subscriptions. Overlapping topic subscriptions occur when you use the same or overlapping topic subscriptions across one or more message consumers connected to the same MessagingService instance. If you have overlapping topic subscriptions, it is important to understand how cache requests can affect your applications.

Multiple Message Consumers with Topic Subscription Overlap

The entries below explain what happens when a message consumer makes a cache request and both of the following conditions are true:

  • More than one message consumer is connected to the same MessagingService instance.
  • The topic subscription used in the cache request overlaps with a topic subscription used by one or more of those message consumers.

Strategy used to configure cache request: As Available
Result: The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription.

Strategy used to configure cache request: Live Cancels Cached
Result: The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription.

Strategy used to configure cache request: Cached First
Result: The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription. In addition, all consumers with live data subscriptions stop receiving live data until all cached messages are received for any overlapping topic subscriptions.

Strategy used to configure cache request: Cached Only
Result: The cache response delivers cached messages to all consumers that have a matching topic subscription.

Single Message Consumer with Topic Overlap

If you use the CacheRequestStrategyCachedOnly strategy in your cache request, your message consumer receives duplicates of any cached messages on overlapping topic subscriptions when both of the following conditions are true:

  1. Only one message consumer is connected to the MessagingService instance.

  2. The topic subscription used in the cache request matches any of the message consumer's live topic subscriptions.

To avoid message duplication in this scenario, use the CacheRequestStrategyAsAvailable or CacheRequestStrategyCachedFirst strategy.
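The rule above can be expressed as a small guard that runs before the request object is created. The sketch below is a simplified illustration: the strategy type and its constants are local stand-ins named after the API constants, avoidDuplicates is a hypothetical helper, and the caller is assumed to know already whether the requested topic overlaps a live subscription. Falling back to Cached First rather than As Available is an arbitrary choice between the two documented alternatives.

```go
package main

import "fmt"

// strategy is a local stand-in for the API's strategy constants.
type strategy string

const (
	asAvailable       strategy = "CacheRequestStrategyAsAvailable"
	liveCancelsCached strategy = "CacheRequestStrategyLiveCancelsCached"
	cachedFirst       strategy = "CacheRequestStrategyCachedFirst"
	cachedOnly        strategy = "CacheRequestStrategyCachedOnly"
)

// avoidDuplicates is a hypothetical helper. It swaps Cached Only for
// Cached First when the requested topic overlaps a live subscription,
// because Cached Only would deliver duplicates in that situation.
func avoidDuplicates(requested strategy, overlapsLiveSubscription bool) strategy {
	if requested == cachedOnly && overlapsLiveSubscription {
		return cachedFirst
	}
	return requested
}

func main() {
	fmt.Println(avoidDuplicates(cachedOnly, true))
	fmt.Println(avoidDuplicates(cachedOnly, false))
	fmt.Println(avoidDuplicates(asAvailable, true))
}
```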

For more information, see the PubSub+ Messaging API for Go reference.

Assign an Appropriate cacheAccessTimeout Value

There is no default value for cacheAccessTimeout. In most cases, a value of 10,000 (equal to 10 seconds) is sufficient. This value specifies a timer for the internal requests that occur between the PubSub+ Go API and a PubSub+ Cache instance. A single call to RequestCachedAsync() or RequestCachedAsyncWithCallback() can lead to one or more of these internal requests. As long as each of these internal requests completes before the specified cacheAccessTimeout value, a timeout does not occur.

For example, if you specify a cacheAccessTimeout of 3000 milliseconds and there are 10 internal requests that each take 2000 milliseconds to complete, the time that it takes for a cache request made with RequestCachedAsync() or RequestCachedAsyncWithCallback() to complete is the sum of those requests, which is 20,000 milliseconds. Because none of the individual requests took longer than 3000 milliseconds, no timeout occurs in this scenario.
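The arithmetic in this example can be reproduced with a short simulation. The function simulate below is a hypothetical model, not API behavior: it assumes that internal requests run one after another, that a request exactly at the limit does not time out, and that the cache request is abandoned as soon as one internal request exceeds the timer.

```go
package main

import "fmt"

// simulate models the internal requests behind one cache request. It
// returns the total elapsed time and whether any single internal request
// exceeded cacheAccessTimeoutMs. The timer applies to each internal
// request separately, never to the running total.
func simulate(cacheAccessTimeoutMs int, internalRequestMs []int) (totalMs int, timedOut bool) {
	for _, d := range internalRequestMs {
		if d > cacheAccessTimeoutMs {
			// Assumption: the request is abandoned once the timer expires.
			return totalMs + cacheAccessTimeoutMs, true
		}
		totalMs += d
	}
	return totalMs, false
}

func main() {
	// Ten internal requests of 2000 ms each, with a 3000 ms timeout.
	durations := make([]int, 10)
	for i := range durations {
		durations[i] = 2000
	}
	total, timedOut := simulate(3000, durations)
	fmt.Printf("total=%d ms, timed out=%t\n", total, timedOut)
	// prints: total=20000 ms, timed out=false
}
```

The total of 20,000 ms is well above the 3000 ms timeout, yet no timeout is reported, because each individual request finished within the limit.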

In some scenarios, such as high network latency, you may need to increase the cacheAccessTimeout value.