Using PubSub+ Cache With the Python API

PubSub+ Cache is a scalable, in-memory message cache for direct messaging. It allows client applications to request the most current messages for a topic of interest when they come online, or when they start subscribing to topics that they were not originally subscribed to. For information about setting up a PubSub+ Cache instance, see PubSub+ Cache. The PubSub+ Python API allows your applications to send cache requests to receive cached messages that your receiving application is interested in. The following sections describe how to send cache requests and receive cached messages:

To consume cached messages, your application needs access to the ReceiverCacheRequests interface. The DirectMessageReceiver interface includes the ReceiverCacheRequests interface, so a DirectMessageReceiver instance is required to consume cached messages. See Creating a DirectMessageReceiver for more information.

  1. Creating a CachedMessageSubscriptionRequest Object

  2. Sending a Cache Request

  3. Creating an Event Listener Using the CacheRequestOutcomeListener Class

  4. Considerations When Receiving Cached Messages

Creating a CachedMessageSubscriptionRequest Object

To send a cache request, you first create an object of the CachedMessageSubscriptionRequest class. This class contains four different functions for requesting cached messages that each return an instance of CachedMessageSubscriptionRequest. Select the function that best suits the needs of your application:

  • as_available(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)

    • Returns an instance of a CachedMessageSubscriptionRequest used to configure a cached data request. The request object adds a live data subscription to the topic passed into the function parameter if your receiver is not already subscribed to that topic. This means your application receives a mix of cached messages from the cache request and live messages from the topic subscription.
  • live_cancels_cached(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)

    • Returns an instance of a CachedMessageSubscriptionRequest that configures the cache request application to receive the latest messages. The request object adds a live data subscription to the topic passed into the function parameter if your receiver is not already subscribed to that topic. When there are no live (non-cached) messages, cached messages that match your chosen topic subscription are considered the latest. Otherwise, live messages that match your chosen topic subscription are considered the latest.
  • cached_first(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)

    • Returns an instance of CachedMessageSubscriptionRequest that configures the cache request to receive cached messages when available, followed by live (non-cached) messages. The request object adds a live data subscription to the topic passed into the function parameter if your receiver is not already subscribed to that topic.
  • cached_only(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)

    • Returns an instance of a CachedMessageSubscriptionRequest that configures your application to receive cached messages only for the duration of the cache request.
    • Only use cached_only() when your receiver has no live subscriptions that match the cache request subscription, because your receiver will receive duplicates of any cached messages on overlapping topic subscriptions. When you have pre-existing live data subscriptions that overlap with your cache request topic subscription, use as_available() or cached_first() to avoid message duplication.

When you use the as_available(), live_cancels_cached(), and cached_first() functions, the cache request creates a live topic subscription if one does not already exist on your receiver. This subscription persists even after the cache request ends. You can remove the subscription by calling the remove_subscription(topic_sub) function on your receiver object after the cache outcome returns, which cancels the receipt of any live data to your client application on that subscription.

The parameters of all four functions are the same:

  • cache_name—name of the Solace cache to retrieve from.
  • subscription—matching topic subscription.
  • cache_access_timeout—cache request timeout (in milliseconds). This value must be between 3,000 and 2,147,483,647, for most applications a value of 10,000 (10 seconds) is sufficient. See Assign an Appropriate cache_access_timeout Value for more information.
  • max_cached_messages—max number of messages expected to be received from a cache. Default value is 0, which means there is no limit on the number of messages that can be received from a cache.
  • cached_message_age—maximum age (in seconds) of the messages to be retrieved from a cache. Default value is 0, which means there is no limit on the age of messages that can be received from a cache.

The following example shows how to create an instance of CachedMessageSubscriptionRequest using the as_available() function:

cached_message_subscription_request = CachedMessageSubscriptionRequest.as_available("my_cache_name",
                                                                                   TopicSubscription.of("my/cache/example"),
                                                                                   10000)

For more information, see the PubSub+ Messaging API for Python reference.

Sending a Cache Request

To create a cache request and begin receiving cached messages from a PubSub+ cache instance, you use the request_cached() function from the ReceiverCacheRequests class. It is important to note that when you use the Python API to make a cache request, it uses a non-blocking function that returns immediately, but the actual request may be deferred. If you make multiple cache requests prior to receiving a cache outcome, you need to use unique cache request ids to avoid receiving duplicate messages. This function requests messages from a broker which were previously cached using PubSub+ cache:

request_cached(cached_message_subscription_request: CachedMessageSubscriptionRequest, cache_request_id: int, completion_listener: CacheRequestOutcomeListener)

  • cached_message_subscription_request—Request for cached messages matching specified subscription and other fulfillment criteria.
  • cache_request_id—request identifier which can be used for response callback correlation purposes. This ID needs to be unique for the duration of each cache request that occurs on a MessagingService instance. This value is returned on a callback of the CacheRequestOutcomeListener's on_completion( ) function. This identifier is how the application correlates cache requests with cache responses and data messages. Live messages do not have a cache request ID. It is the responsibility of the application to ensure the identifier is unique so that no collisions in your correlations occur. This value should not be confused with the CORRELATION_ID property used for general message correlation in the InboundMessage class.
  • completion_listener—an instance of CacheRequestOutcomeListener, a callback that notifies the application when a cache request is completed.

The following example shows how to send a cache request:

cache_request_id = 12345
        
cache_request_outcome_listener = MyCacheRequestOutcomeListener()
        
direct_message_receiver.requestCached(cached_message_subscription_request, cache_request_id, cache_request_outcome_listener)

After a cache request is made, your MessageHandler begins receiving messages, that can be cached, live or both depending on the cache request function used to configure your cache request. You can check whether a message is a cached message with the function get_cache_status(). This function returns a CacheStatusenum, which can be one of three values:

  • LIVE—The message was retrieved directly from a PubSub+ event broker and not from a cache instance.

  • CACHED—The message was retrieved from a cache instance.

  • SUSPECT—The message was retrieved from a suspected cache instance.

You can get the cache request identifier from a cached message with the get_cache_request_id() function. The example below shows a MessageHandler implementation that checks the cache status of inbound messages and retrieves the cache request ID from cached messages:

class MessageHandlerExample(MessageHandler):
    def __init__(self, direct_receiver: DirectMessageReceiver):
        self.receiver: DirectMessageReceiver = direct_receiver

    def on_message(self, message: InboundMessage):
        message_type = message.get_cache_status()
        if message_type == CacheStatus.LIVE:
            print("Message is not cached.")
        elif message_type == CacheStatus.CACHED:
            print("Message is cached.")
            cache_request_id = message.get_cache_request_id()
        else:
            print("Message is from a suspected cache instance. Check your cache instance.")

# Register an asynchronous message receiver on the DirectMessageReceiver instance.
direct_receiver.receive_async(MessageHandlerExample())

If an error occurs during a cache request or if the message receiver or MessagingService is terminated, the API cancels all outstanding cache requests. This termination process blocks your application until the API has passed notifications to the application for all canceled cache requests.

For more information, see the PubSub+ Messaging API for Python reference.

Creating an Event Listener Using the CacheRequestOutcomeListener Class

This class is a callback for listening for the results of a cache request, with cache request identifier support. It notifies your application when a cache request is complete. It must contain one function, on_completion():

on_completion (result: CacheRequestOutcome, cache_request_id: int, exception: Union[Exception,None])

  • result—the outcome of the cache request, can be one of the following enums:
    • OK—cached data was returned in a cache reply, or the cache request was fulfilled by live data.
    • NO_DATA—there was no data in the cache reply.
    • SUSPECT_DATA—there was suspect data in the cache reply.
    • FAILED—The request failed for some reason, such as a timeout. This is the only CacheRequestOutcome that results in an exception being set on the on_completion() callback.
  • cache_request_id—the cache request identifier associated with the given completed computation unit. This ID can be used for response callback correlation purposes, to correlate cache requests with their associated messages.
  • exception—an Exception if the cache request fails. The exception is None if you get a result other than FAILED.

The following example shows a simple example implementation of a CacheRequestOutcomeListener:

class MyCacheRequestOutcomeListener(CacheRequestOutcomeListener):
    def on_completion(result: CacheRequestOutcome, cache_request_id: int, exception: Exception):
        print("Completed!")

For more information, see the PubSub+ Messaging API for Python reference.

Considerations When Receiving Cached Messages

There are considerations to be aware of when you receive cached messages with the PubSub+ Python API. The following two sections explain what can happen you have overlapping topic subscriptions and provide additional information about how cache_access_timeout works.

Avoid Overlapping topic subscriptions when possible

When you use the PubSub+ Python API to consume cached messages, we recommend that you do not have overlapping topic subscriptions. Overlapping topic subscriptions occur when you use the same or overlapping topic subscriptions across one or more message consumers connected to the same MessagingService instance. If you have overlapping topic subscriptions, it is important to understand how cache requests can affect your applications.

Multiple Message Consumers with Topic Subscription Overlap

The table below explains what happens when a cache request is made by a message consumer when

  • more than one message consumer is connected to the same MessagingService instance
    AND

  • the topic subscription used in the cache request overlaps with a topic subscription used by one or more of those message consumers.

Function used to configure cache request Result
as_available() The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription.
live_cancels_cached() The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription.
cached_first()

The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription.

AND

All consumers with live data subscriptions will stop receiving live data until all cached messages are received for any overlapping topic subscriptions.

cached_only() The cache response delivers cached messages to all consumers that have a matching topic subscription.

Single Message Consumer with Topic Overlap

If you use the cached_only() function in your cache message request, and your application meets the following two conditions:

  1. Only one message consumer is connected to the MessagingService instance.

  2. The topic subscription used in the cache request matches any of the message consumer's live topic subscriptions.

Your message consumer receives duplicates of any cached messages on those overlapping topic subscriptions. To avoid message duplication in this scenario, use the as_available() or cached_first() functions.

For more information, see the PubSub+ Messaging API for Python reference.

Assign an Appropriate cache_access_timeout Value

There is no default value for cache_access_timeout. In most cases, a value of 10,000 (equal to 10 seconds) is sufficient. This value specifies a timer for the internal requests that occur between the Python API and a PubSub+ cache instance. A single call to request_cached() can lead to one or more of these internal requests. As long as each of these internal requests complete before the specified cache_access_timeout value, a timeout does not occur.

For example, if you specify a cache_access_timeout of 3000 milliseconds and there are 10 internal requests that each take 2000 milliseconds to complete, the time that it takes the request_cached() function to return is the sum of the those requests, which is 20,000 milliseconds. Because none of the individual requests took longer than 3000 milliseconds no timeout occurs in this scenario.

In some scenarios, such as high-network latency and which back-pressure strategy your application uses, increases to the cache_access_timeout value may be required. For example, if you use the default back-pressure strategy of on_back_pressure_elastic(), where you have an unlimited internal message buffer, this back-pressure can cause delays in the API that affect how long it takes internal requests to complete. You may need to increase your cache_access_timeout value to account for these delays.