Using PubSub+ Cache With the Python API
PubSub+ Cache is a scalable, in-memory message cache for direct messaging. It allows client applications to request the most current messages for a topic of interest when they come online, or when they start subscribing to topics that they were not originally subscribed to. For information about setting up a PubSub+ Cache instance, see PubSub+ Cache. The PubSub+ Python API allows your applications to send cache requests and receive the cached messages they are interested in. The following sections describe how to send cache requests and receive cached messages.
To consume cached messages, your application needs access to the ReceiverCacheRequests interface. The DirectMessageReceiver interface includes the ReceiverCacheRequests interface, so a DirectMessageReceiver instance is required to consume cached messages. See Creating a DirectMessageReceiver for more information.
Creating a CachedMessageSubscriptionRequest Object
To send a cache request, you first create an object of the CachedMessageSubscriptionRequest class. This class provides four functions for requesting cached messages, each of which returns an instance of CachedMessageSubscriptionRequest. Select the function that best suits the needs of your application:
- as_available(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)
  Returns an instance of CachedMessageSubscriptionRequest used to configure a cached data request. The request object adds a live data subscription to the topic passed into the function if your receiver is not already subscribed to that topic. This means your application receives a mix of cached messages from the cache request and live messages from the topic subscription.
- live_cancels_cached(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)
  Returns an instance of CachedMessageSubscriptionRequest that configures the cache request so that your application receives the latest messages. The request object adds a live data subscription to the topic passed into the function if your receiver is not already subscribed to that topic. When there are no live (non-cached) messages, cached messages that match your chosen topic subscription are considered the latest. Otherwise, live messages that match your chosen topic subscription are considered the latest.
- cached_first(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)
  Returns an instance of CachedMessageSubscriptionRequest that configures the cache request to receive cached messages when available, followed by live (non-cached) messages. The request object adds a live data subscription to the topic passed into the function if your receiver is not already subscribed to that topic.
- cached_only(cache_name: str, subscription: TopicSubscription, cache_access_timeout: int, max_cached_messages: int=0, cached_message_age: int=0)
  Returns an instance of CachedMessageSubscriptionRequest that configures your application to receive cached messages only, for the duration of the cache request. Only use cached_only() when your receiver has no live subscriptions that match the cache request subscription; otherwise your receiver receives duplicates of any cached messages on overlapping topic subscriptions. When you have pre-existing live data subscriptions that overlap with your cache request topic subscription, use as_available() or cached_first() to avoid message duplication.
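The guidance above can be summarized as a small decision function. This is a minimal sketch, not part of the Python API: choose_cache_request_function() and the CacheRequestStrategy enum are hypothetical names, and the boolean inputs are assumptions about what an application might know before it builds its request.

```python
from enum import Enum


class CacheRequestStrategy(Enum):
    """Hypothetical enum naming the four CachedMessageSubscriptionRequest functions."""
    AS_AVAILABLE = "as_available"
    LIVE_CANCELS_CACHED = "live_cancels_cached"
    CACHED_FIRST = "cached_first"
    CACHED_ONLY = "cached_only"


def choose_cache_request_function(
    wants_live_data: bool,
    wants_only_latest: bool,
    needs_cached_before_live: bool,
    has_overlapping_live_subscription: bool,
) -> CacheRequestStrategy:
    """Hypothetical helper that encodes the function descriptions above."""
    # cached_only() is safe only when no live subscription overlaps the request;
    # otherwise cached messages would be delivered twice.
    if not wants_live_data and not has_overlapping_live_subscription:
        return CacheRequestStrategy.CACHED_ONLY
    # Treat live messages as the latest data once they exist.
    if wants_only_latest:
        return CacheRequestStrategy.LIVE_CANCELS_CACHED
    # Cached messages first, then live messages.
    if needs_cached_before_live:
        return CacheRequestStrategy.CACHED_FIRST
    # Otherwise, a mix of cached and live messages as they arrive.
    return CacheRequestStrategy.AS_AVAILABLE
```

The returned value names one of the four functions, so an application could, for illustration, build the request with getattr(CachedMessageSubscriptionRequest, strategy.value)(...); calling the chosen function directly is usually clearer.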
When you use the as_available(), live_cancels_cached(), and cached_first() functions, the cache request creates a live topic subscription if one does not already exist on your receiver. This subscription persists even after the cache request ends. You can remove the subscription by calling the remove_subscription(topic_sub) function on your receiver object after the cache outcome returns, which cancels the receipt of any live data to your client application on that subscription.
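Because only three of the four functions can add a live subscription, an application that wants cached data only as a one-time catch-up may want to record which subscriptions existed before the request. The following sketch uses a hypothetical helper, subscription_to_remove_after_outcome(), and assumes the receiver's existing subscriptions are tracked by the application as plain topic strings compared by exact match.

```python
from typing import AbstractSet, Optional

# Per the text above, these functions add a live subscription when the receiver
# is not already subscribed to the topic; cached_only() does not.
FUNCTIONS_THAT_ADD_LIVE_SUBSCRIPTION = frozenset(
    {"as_available", "live_cancels_cached", "cached_first"}
)


def subscription_to_remove_after_outcome(
    function_name: str,
    request_topic: str,
    subscriptions_before_request: AbstractSet[str],
) -> Optional[str]:
    """Hypothetical helper: return the topic the cache request added, or None."""
    if (
        function_name in FUNCTIONS_THAT_ADD_LIVE_SUBSCRIPTION
        and request_topic not in subscriptions_before_request
    ):
        return request_topic
    # Either the function adds no subscription, or it already existed.
    return None
```

In an on_completion() callback, a non-None result could then be passed to the receiver as remove_subscription(TopicSubscription.of(topic)) if the application does not want live data on that topic.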
The parameters of all four functions are the same:
- cache_name: name of the Solace cache to retrieve messages from.
- subscription: matching topic subscription.
- cache_access_timeout: cache request timeout, in milliseconds. This value must be between 3,000 and 2,147,483,647; for most applications, a value of 10,000 (10 seconds) is sufficient. See Assign an Appropriate cache_access_timeout Value for more information.
- max_cached_messages: maximum number of messages expected to be received from a cache. The default value is 0, which means there is no limit on the number of messages that can be received from a cache.
- cached_message_age: maximum age, in seconds, of the messages to retrieve from a cache. The default value is 0, which means there is no limit on the age of messages that can be received from a cache.
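To catch out-of-range values before a request is built, an application could check them up front. This is a hypothetical helper, not part of the Python API. It encodes the 3,000 to 2,147,483,647 millisecond range stated above, and it assumes that negative values for max_cached_messages and cached_message_age are invalid, since the text only defines 0 (no limit) and positive limits.

```python
MIN_CACHE_ACCESS_TIMEOUT_MS = 3_000
MAX_CACHE_ACCESS_TIMEOUT_MS = 2_147_483_647  # 2**31 - 1


def validate_cache_request_params(
    cache_access_timeout: int,
    max_cached_messages: int = 0,
    cached_message_age: int = 0,
) -> None:
    """Hypothetical pre-check that raises ValueError for out-of-range values."""
    if not MIN_CACHE_ACCESS_TIMEOUT_MS <= cache_access_timeout <= MAX_CACHE_ACCESS_TIMEOUT_MS:
        raise ValueError(
            f"cache_access_timeout must be between {MIN_CACHE_ACCESS_TIMEOUT_MS} and "
            f"{MAX_CACHE_ACCESS_TIMEOUT_MS} ms, got {cache_access_timeout}"
        )
    # 0 means "no limit" for both values; negative values are assumed invalid.
    if max_cached_messages < 0:
        raise ValueError(f"max_cached_messages must be >= 0, got {max_cached_messages}")
    if cached_message_age < 0:
        raise ValueError(f"cached_message_age must be >= 0 seconds, got {cached_message_age}")
```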
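The following example shows how to create an instance of CachedMessageSubscriptionRequest using the as_available() function:

```python
from solace.messaging.resources.topic_subscription import TopicSubscription
from solace.messaging.resources.cached_message_subscription_request import CachedMessageSubscriptionRequest

# Cache "my_cache_name", topic "my/cache/example", 10-second cache_access_timeout.
cached_message_subscription_request = CachedMessageSubscriptionRequest.as_available(
    "my_cache_name", TopicSubscription.of("my/cache/example"), 10000)
```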
For more information, see the PubSub+ Messaging API for Python reference.
Sending a Cache Request
To create a cache request and begin receiving cached messages from a PubSub+ Cache instance, use the request_cached() function of the ReceiverCacheRequests interface. Note that request_cached() is non-blocking: it returns immediately, but the actual request may be deferred. If you make multiple cache requests before receiving a cache outcome, you need to use a unique cache request ID for each request to avoid receiving duplicate messages. This function requests messages from a broker that were previously cached using PubSub+ Cache:
request_cached(cached_message_subscription_request: CachedMessageSubscriptionRequest, cache_request_id: int, completion_listener: CacheRequestOutcomeListener)
- cached_message_subscription_request: request for cached messages matching the specified subscription and other fulfillment criteria.
- cache_request_id: request identifier that can be used for response callback correlation purposes. This ID must be unique for the duration of each cache request that occurs on a MessagingService instance. This value is returned in the on_completion() callback of the CacheRequestOutcomeListener. This identifier is how the application correlates cache requests with cache responses and data messages. Live messages do not have a cache request ID. It is the responsibility of the application to ensure the identifier is unique so that no collisions occur in your correlations. Do not confuse this value with the CORRELATION_ID property used for general message correlation in the InboundMessage class.
- completion_listener: an instance of CacheRequestOutcomeListener, a callback that notifies the application when a cache request is completed.
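Because the application is responsible for keeping cache_request_id unique on a MessagingService instance, one straightforward approach is a monotonically increasing counter shared by everything that issues cache requests on that instance. The sketch below is an assumption about how you might do this, not an API feature; CacheRequestIdGenerator is a hypothetical name, and the lock covers the case where several threads send cache requests.

```python
import itertools
import threading


class CacheRequestIdGenerator:
    """Hypothetical helper: hands out increasing integer IDs for cache requests."""

    def __init__(self, start: int = 1) -> None:
        self._counter = itertools.count(start)
        self._lock = threading.Lock()

    def next_id(self) -> int:
        # The lock keeps IDs unique when several threads issue cache requests.
        with self._lock:
            return next(self._counter)
```

You would keep one generator per MessagingService instance and pass id_generator.next_id() as the cache_request_id argument of request_cached().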
The following example shows how to send a cache request:

```python
cache_request_id = 12345
cache_request_outcome_listener = MyCacheRequestOutcomeListener()
# Non-blocking: the outcome is delivered later to the listener's on_completion().
direct_message_receiver.request_cached(cached_message_subscription_request, cache_request_id, cache_request_outcome_listener)
```
After a cache request is made, your MessageHandler begins receiving messages, which can be cached, live, or both, depending on the function used to configure your cache request. To check whether a message is a cached message, call its get_cache_status() function. This function returns a CacheStatus enum, which can be one of three values:
- LIVE: the message was retrieved directly from a PubSub+ event broker, not from a cache instance.
- CACHED: the message was retrieved from a cache instance.
- SUSPECT: the message was retrieved from a suspected cache instance.
You can get the cache request identifier from a cached message with the get_cache_request_id() function. The example below shows a MessageHandler implementation that checks the cache status of inbound messages and retrieves the cache request ID from cached messages:

```python
from solace.messaging.receiver.direct_message_receiver import DirectMessageReceiver
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage
# Also import CacheStatus from the module listed for it in the API reference.


class MessageHandlerExample(MessageHandler):
    def __init__(self, direct_receiver: DirectMessageReceiver):
        self.receiver: DirectMessageReceiver = direct_receiver

    def on_message(self, message: InboundMessage):
        cache_status = message.get_cache_status()
        if cache_status == CacheStatus.LIVE:
            print("Message is not cached.")
        elif cache_status == CacheStatus.CACHED:
            print("Message is cached.")
            # Correlates the message with the request_cached() call that produced it.
            cache_request_id = message.get_cache_request_id()
        else:
            print("Message is from a suspected cache instance. Check your cache instance.")


# Register an asynchronous message handler on the DirectMessageReceiver instance.
direct_receiver.receive_async(MessageHandlerExample(direct_receiver))
```
If an error occurs during a cache request, or if the message receiver or MessagingService is terminated, the API cancels all outstanding cache requests. This termination process blocks your application until the API has passed notifications to the application for all canceled cache requests.
For more information, see the PubSub+ Messaging API for Python reference.
Creating an Event Listener Using the CacheRequestOutcomeListener Class
This class is a callback for listening for the results of a cache request, with cache request identifier support. It notifies your application when a cache request is complete. Your implementation must provide one function, on_completion():
on_completion(result: CacheRequestOutcome, cache_request_id: int, exception: Union[Exception, None])
- result: the outcome of the cache request, which can be one of the following CacheRequestOutcome values:
  - OK: cached data was returned in a cache reply, or the cache request was fulfilled by live data.
  - NO_DATA: there was no data in the cache reply.
  - SUSPECT_DATA: there was suspect data in the cache reply.
  - FAILED: the request failed for some reason, such as a timeout. This is the only CacheRequestOutcome that results in an exception being set on the on_completion() callback.
- cache_request_id: the identifier of the cache request that completed. Use this ID to correlate cache requests with their associated messages and callbacks.
- exception: an Exception if the cache request fails. The exception is None if the result is anything other than FAILED.
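The following example shows a simple implementation of a CacheRequestOutcomeListener:

```python
from typing import Union  # CacheRequestOutcomeListener and CacheRequestOutcome come from the Python API


class MyCacheRequestOutcomeListener(CacheRequestOutcomeListener):
    def on_completion(self, result: CacheRequestOutcome, cache_request_id: int, exception: Union[Exception, None]):
        # Called once the cache request completes; exception is set only for FAILED.
        print(f"Cache request {cache_request_id} completed: {result}")
```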
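Because request_cached() returns immediately, an application that needs to wait for a particular request to finish can have its listener record each outcome by cache_request_id and signal a threading.Event. The sketch below is a hypothetical OutcomeTracker with the same on_completion() parameters as CacheRequestOutcomeListener; to keep it runnable without the Solace package it does not subclass that class, and result is stored as-is rather than typed as CacheRequestOutcome.

```python
import threading
from typing import Any, Dict, Optional, Tuple

Outcome = Tuple[Any, Optional[Exception]]


class OutcomeTracker:
    """Hypothetical listener sketch; in an application it would subclass
    CacheRequestOutcomeListener."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: Dict[int, threading.Event] = {}
        self._outcomes: Dict[int, Outcome] = {}

    def _event_for(self, cache_request_id: int) -> threading.Event:
        with self._lock:
            return self._events.setdefault(cache_request_id, threading.Event())

    def register(self, cache_request_id: int) -> None:
        """Call before request_cached() so the ID is known before completion."""
        self._event_for(cache_request_id)

    def on_completion(self, result: Any, cache_request_id: int,
                      exception: Optional[Exception]) -> None:
        # Same parameters as CacheRequestOutcomeListener.on_completion().
        with self._lock:
            self._outcomes[cache_request_id] = (result, exception)
        self._event_for(cache_request_id).set()

    def wait(self, cache_request_id: int, timeout: Optional[float] = None) -> Optional[Outcome]:
        """Block until the outcome arrives; return None if the wait times out."""
        if not self._event_for(cache_request_id).wait(timeout):
            return None
        with self._lock:
            return self._outcomes[cache_request_id]
```

Call register() before request_cached(), and call wait() from your own application thread rather than from inside a message handler, since blocking an API callback could delay the notification you are waiting for.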
For more information, see the PubSub+ Messaging API for Python reference.
Considerations When Receiving Cached Messages
There are considerations to be aware of when you receive cached messages with the PubSub+ Python API. The following two sections explain what can happen when you have overlapping topic subscriptions and provide additional information about how cache_access_timeout works.
Avoid Overlapping Topic Subscriptions When Possible
When you use the PubSub+ Python API to consume cached messages, we recommend that you avoid overlapping topic subscriptions. Overlapping topic subscriptions occur when the same topic subscription, or subscriptions that can match the same topics (for example, through wildcards), are used by one or more message consumers connected to the same MessagingService instance. If you have overlapping topic subscriptions, it is important to understand how cache requests can affect your applications.
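When deciding whether two subscriptions overlap, it helps to check whether any single topic could match both. The sketch below is a hypothetical helper that approximates Solace topic wildcard rules as commonly documented: a `*` at the end of a level matches zero or more characters within that level (on its own it matches any level, and `us*` matches levels starting with `us`), and `>` as the last level matches one or more remaining levels. It assumes plain topic subscriptions and does not handle shared subscriptions or other special prefixes.

```python
def _level_overlaps(a: str, b: str) -> bool:
    """True if at least one concrete topic level matches both subscription levels."""
    # A level ending in '*' acts as a prefix wildcard; other levels are literals.
    a_prefix = a[:-1] if a.endswith("*") else None
    b_prefix = b[:-1] if b.endswith("*") else None
    if a_prefix is None and b_prefix is None:
        return a == b
    if a_prefix is not None and b_prefix is not None:
        return a_prefix.startswith(b_prefix) or b_prefix.startswith(a_prefix)
    if a_prefix is not None:
        return b.startswith(a_prefix)
    return a.startswith(b_prefix)


def subscriptions_overlap(sub_a: str, sub_b: str) -> bool:
    """True if at least one topic would match both subscriptions."""
    a_levels = sub_a.split("/")
    b_levels = sub_b.split("/")
    for i in range(max(len(a_levels), len(b_levels))):
        if i >= len(a_levels) or i >= len(b_levels):
            # One subscription ran out of levels without a trailing '>'.
            return False
        a, b = a_levels[i], b_levels[i]
        # '>' as the last level matches one or more remaining levels.
        if (a == ">" and i == len(a_levels) - 1) or (b == ">" and i == len(b_levels) - 1):
            return True
        if not _level_overlaps(a, b):
            return False
    return True
```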
Multiple Message Consumers with Topic Subscription Overlap
The table below explains what happens when a cache request is made by a message consumer when:
- more than one message consumer is connected to the same MessagingService instance, AND
- the topic subscription used in the cache request overlaps with a topic subscription used by one or more of those message consumers.
| Function used to configure cache request | Result |
|---|---|
| as_available() | The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription. |
| live_cancels_cached() | The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription. |
| cached_first() | The cache response delivers cached messages to all message consumers connected to the MessagingService instance that have a matching topic subscription, AND all consumers with live data subscriptions stop receiving live data until all cached messages are received for any overlapping topic subscriptions. |
| cached_only() | The cache response delivers cached messages to all consumers that have a matching topic subscription. |
Single Message Consumer with Topic Overlap
If you use the cached_only() function in your cache request, and your application meets the following two conditions:
- Only one message consumer is connected to the MessagingService instance.
- The topic subscription used in the cache request matches one or more of the message consumer's live topic subscriptions.

then your message consumer receives duplicates of any cached messages on those overlapping topic subscriptions. To avoid message duplication in this scenario, use the as_available() or cached_first() function.
For more information, see the PubSub+ Messaging API for Python reference.
Assign an Appropriate cache_access_timeout Value
There is no default value for cache_access_timeout. In most cases, a value of 10,000 (10 seconds) is sufficient. This value specifies a timer for the internal requests that occur between the Python API and a PubSub+ Cache instance. A single call to request_cached() can lead to one or more of these internal requests. As long as each of these internal requests completes before the specified cache_access_timeout elapses, a timeout does not occur.
For example, if you specify a cache_access_timeout of 3000 milliseconds and there are 10 internal requests that each take 2000 milliseconds to complete, the cache request takes the sum of those requests, 20,000 milliseconds, to complete and notify your CacheRequestOutcomeListener. Because none of the individual requests took longer than 3000 milliseconds, no timeout occurs in this scenario.
In some scenarios, such as high network latency or certain back-pressure strategies, you may need to increase the cache_access_timeout value. For example, if you use the default back-pressure strategy, on_back_pressure_elastic(), which uses an unlimited internal message buffer, back-pressure can cause delays in the API that affect how long internal requests take to complete. You may need to increase your cache_access_timeout value to account for these delays.