solace.messaging.receiver package

solace.messaging.receiver.acknowledgement_support module

This module contains the abstract class and methods for the AcknowledgementSupport.

class solace.messaging.receiver.acknowledgement_support.AcknowledgementSupport

Bases: ABC

A class that defines the interface for manual message acknowledgement (receiver acknowledges message to the broker).

Client acknowledgement signals to the event broker the message has been received and processed. When all receivers have acknowledged that a message has been delivered, the message is removed from the permanent storage on the event broker.

Acknowledgement, or withholding acknowledgement, has no bearing on flow-control or back-pressure.

abstract ack(message: InboundMessage)

Generates and sends an acknowledgement for an inbound message (InboundMessage).

Parameters:

message – The inbound message.

Raises:

PubSubPlusClientError – If it was not possible to acknowledge the message.

solace.messaging.receiver.async_receiver_subscriptions module

This module contains the abstract base class that defines the interface for asynchronous subscription operations.

class solace.messaging.receiver.async_receiver_subscriptions.AsyncReceiverSubscriptions

Bases: ABC

An abstract class for asynchronous receiver subscriptions.

All solace.messaging.receiver.message_receiver.MessageReceiver classes support both synchronous (blocking) and asynchronous (non-blocking) subscription operations. This class defines the interface for asynchronous subscription operations.

abstract add_subscription_async(topic_subscription: TopicSubscription) Future

Makes a request to subscribe asynchronously to a given topic subscription.

This method initiates the subscription process on the solace.messaging.receiver.message_receiver.MessageReceiver. The subscription request proceeds asynchronously, with the success or failure status available in the returned concurrent.futures.Future object.

Parameters:

topic_subscription (TopicSubscription) – The subscription expression to subscribe to. Messages with a topic that matches the subscription are directed to this client.

Returns:

An object used to determine when the connection attempt has completed.

Return type:

Future

Raises:

PubSubPlusClientError – If an operation could not be performed for some internal reason.

abstract remove_subscription_async(topic_subscription: TopicSubscription) Future

Makes a request to unsubscribe asynchronously from a given topic subscription.

This method initiates the subscription removal process on the solace.messaging.receiver.message_receiver.MessageReceiver instance.

The unsubscribe request proceeds asynchronously, with the success or failure status available in the returned concurrent.futures.Future. Once the process is complete, no more messages whose topic match the given subscription will be received in the solace.messaging.receiver.message_receiver.MessageReceiver instance.

Parameters:

topic_subscription (TopicSubscription) – The subscription expression to remove from the MessageReceiver.

Returns:

An object used to determine when the connection attempt has completed.

Return type:

concurrent.futures.Future

Raises:

solace.messaging.receiver.direct_message_receiver module

This module contains the abstract base class used to receive direct messages.

A DirectMessageReceiver can be instantiated to receive direct messages from a PubSub+ event broker.

class solace.messaging.receiver.direct_message_receiver.DirectMessageReceiver

Bases: MessageReceiver, ReceiverSubscriptions, AsyncReceiverSubscriptions, ReceiverCacheRequests, ABC

An abstract class that defines the interface to a PubSub+ direct message consumer/receiver.

Note

A caller of any of blocking message receiving methods , those without the async suffix such as the solace.messaging.receiver.message_receiver.MessageReceiver.receive_message() function. will receive a new message for each call.

Warning

When you use this class, these are some considerations to aware of:

  • Concurrent use of asynchronous and synchronous message receiving methods on a single instance of receiver can have some unintended side effects and should be avoided.

  • Asynchronous methods should NOT be called multiple times or in combination with blocking message receiving function on the same solace.messaging.receiver.message_receiver.MessageReceiver object to avoid any unintended side effects.

abstract add_subscription(another_subscription: TopicSubscription)

Makes a request to subscribe synchronously to a given topic subscription.

This definition performs the subscription operation on the solace.messaging.receiver.message_receiver.MessageReceiver. The subscription request proceeds are sent and the function blocks waiting for a response from the PubSub+ event broker.

Parameters:

another_subscription (TopicSubscription) – The additional subscription to attract messages where topics match the subscriptions.

Returns:

When the function successfully completes, it returns otherwise, it raises an exception.

Raises:
abstract add_subscription_async(topic_subscription: TopicSubscription) Future

Makes a request to subscribe asynchronously to a given topic subscription.

This method initiates the subscription process on the solace.messaging.receiver.message_receiver.MessageReceiver. The subscription request proceeds asynchronously, with the success or failure status available in the returned concurrent.futures.Future object.

Parameters:

topic_subscription (TopicSubscription) – The subscription expression to subscribe to. Messages with a topic that matches the subscription are directed to this client.

Returns:

An object used to determine when the connection attempt has completed.

Return type:

Future

Raises:

PubSubPlusClientError – If an operation could not be performed for some internal reason.

abstract is_running() bool

Checks if the process was successfully started and not stopped yet.

Returns:

False if process was not started or already stopped, True otherwise.

Return type:

bool

abstract is_terminated() bool

Checks if message delivery process is terminated.

Returns:

True if message delivery process is terminated, False otherwise.

Return type:

bool

abstract is_terminating() bool

Checks if message delivery process termination is on-going.

Returns:

True if message delivery process being terminated, but termination is not finished, False otherwise.

Return type:

bool

abstract receive_async(message_handler: MessageHandler)

Register an asynchronous message receiver on the solace.messaging.receiver.persistent_message_receiver.PersistentMessageReceiver instance.

Parameters:

message_handler (MessageHandler) – The object that receives all inbound messages (InboundMessage) in its onMessage() callback. If the provided value is None, then asynchronous receiver is removed & receive_message() (solace.messaging.receiver.persistent_message_receiver.PersistentMessageReceiver) is used.

abstract receive_message(timeout: int | None = None) InboundMessage | None

Blocking request to receive the next message. You acknowledge the message using the AcknowledgementSupport.ack() function for PersistentMessageReceiver.

This method is usually used in loop an its use is mutually exclusive when used asynchronously.

Parameters:

timeout (int) – The time, in milliseconds, to wait for a message to arrive.

Returns:

An object that represents an inbound message. Returns None on timeout, or upon

service or receiver shutdown.

Return type:

InboundMessage

Raises:

PubSubPlusClientError – If error occurred while receiving or processing the message.

abstract remove_subscription(subscription: TopicSubscription)

Makes a request to unsubscribe synchronously from the specified topic subscription.

This method performs the subscription removal operation on the solace.messaging.receiver.message_receiver.MessageReceiver.

Unsubscribe from a previously subscribed message source on the PubSub+ broker. Once the process is complete, no more messages where topics match the given subscription are received by the solace.messaging.receiver.message_receiver.MessageReceiver object.

Parameters:

subscription (TopicSubscription) – The subscription expression to remove from the MessageReceiver instance.

Returns:

When the function successfully completes, it returns otherwise, it raises an exception.

Raises:
abstract remove_subscription_async(topic_subscription: TopicSubscription) Future

Makes a request to unsubscribe asynchronously from a given topic subscription.

This method initiates the subscription removal process on the solace.messaging.receiver.message_receiver.MessageReceiver instance.

The unsubscribe request proceeds asynchronously, with the success or failure status available in the returned concurrent.futures.Future. Once the process is complete, no more messages whose topic match the given subscription will be received in the solace.messaging.receiver.message_receiver.MessageReceiver instance.

Parameters:

topic_subscription (TopicSubscription) – The subscription expression to remove from the MessageReceiver.

Returns:

An object used to determine when the connection attempt has completed.

Return type:

concurrent.futures.Future

Raises:
abstract request_cached(cached_message_subscription_request: CachedMessageSubscriptionRequest, cache_request_id: int, completion_listener: CacheRequestOutcomeListener)

Requests messages from a broker which were previously cached using PubSub+ Cache. Responses to this request are processed by the given CacheRequestOutcomeListener. The cache_request_id parameter is used for correlating requests with responses. It is the application’s responsibility to guarantee that only unique integers are provided to this field, so as to avoid collisions.

Parameters:
  • cached_message_subscription_request (CachedMessageSubscriptionRequest) – Request for cached messages matching specified subscription and other fulfillment criteria.

  • cache_request_id (int) – request identifier which can be used for response callback correlation purposes, this value needs to be unique for the time of the application execution. A valid cache_request_id is within the range of 0 to Unsigned 64 int max. This value will be returned on a on_completion() callback of the CacheRequestOutcomeListener. The same value will be returned.

  • completion_listener (CacheRequestOutcomeListener) – Request completion listener to be notified when cache request is completed.

Raises:
  • PubSubPlusClientException – If the operation could not be performed.

  • IllegalStateException – If the service is not connected or the receiver is not running.

abstract set_termination_notification_listener(listener: TerminationNotificationListener)

Adds a listener to listen for non-recoverable lifecycle object interruption events.

Parameters:

listener – TerminationNotificationListener

abstract start() LifecycleControl

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Raises:
abstract start_async() Future

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally, this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Returns:

An object that the application can use to determine when the service start has completed.

Return type:

concurrent.futures.Future

Raises:

IllegalStateError – If method has been invoked at an illegal or inappropriate time for some another reason.

abstract terminate(grace_period: int = 600000)

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Raises:
abstract terminate_async(grace_period: int = 600000) Future

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Returns:

An future object which the application can use to determine terminate completion.

Return type:

concurrent.futures.Future

Raises:

IllegalArgumentError – If the grace_period is invalid.

solace.messaging.receiver.inbound_message module

This module defines the interface to an inbound message used to receive data from the PubSub+ broker.

class solace.messaging.receiver.inbound_message.CacheStatus(value)

Bases: Enum

An enum with cache status options for the given message.

CACHED = 1

The message was retrieved from a SolCache instance.

LIVE = 0

The message was retrieved directly from a PubSubPlus broker and not from a SolCache instance.

SUSPECT = 2

The message was retrieved from a suspect solCache instance.

class solace.messaging.receiver.inbound_message.InboundMessage

Bases: Message

An abstract class that defines the interfaces for an inbound message.

class MessageDiscardNotification

Bases: ABC

An interface to Discard Notification Information.

abstract has_broker_discard_indication() bool

Retrieves the broker discard indication. A receiving client can use a message discard indication method or function to query whether the event broker has for any reason discarded any Direct messages previous to the current received message.

When the PubSub+ event broker discards messages before sending them, the next message successfully sent to the receiver will have discard indication set.

Returns:

True if PubSub+ event broker has discarded one or more messages prior to the current message.

Return type:

bool

abstract has_internal_discard_indication() bool

Retrieves the internal discard indication. A receiving client can use a message discard indication method or function to query whether Python API has for any reason discarded any messages previous to the current received message.

Returns:

True if the Python API has discarded one or more messages prior to the current message.

Return type:

bool

abstract get_and_convert_payload(converter: BytesToObject[T], output_type: type) T

Retrieve the payload and converts it to the target object using given converter.

Parameters:
  • converter (BytesToObject) – An application provided converter to deserialize the payload to a Python object.

  • output_type (type) – The Python Class returned by the BytesToObject type.

Returns:

The user-defined type for returned value.

Return type:

T

Raises:

PubSubPlusClientError – When the converter returns a non-matching object type.

abstract get_application_message_id()

Gets an optional application message identifier when sender application sets one.

Returns:

Sender application identifier if set by message publisher, or None/empty if not set.

Return type:

str

abstract get_application_message_type()

Gets the application message type. This value is used by applications only, and is passed through the API unmodified.

Returns:

Application message type or None if not set.

Return type:

str

abstract get_cache_request_id() int | None

Retrieves the request ID that was set in the cache request from the message.

Returns:

The request ID, if the message is a cached message. None: If the message was not a cached message, and so didn’t have a request ID to retrieve.

Return type:

int

abstract get_cache_status() CacheStatus

Retrieves the indicator of whether or not this message was part of a cache reply.

Returns:

The indicator.

Return type:

CacheStatus

abstract get_class_of_service() int | None

Retrieves the class of service level of a given message. This feature is only relevant. for direct messaging. If no class of service is set, the message is given a default class of service of 0.

Returns:

An integer between 0 and 2, inclusive, representing the class of service of the message.

Return type:

(int)

Raises:

PubSubPlusClientError – If an error was encountered while trying to retrieve the class of service of the message.

abstract get_correlation_id() str | None

Retrieves the correlation ID from the message. The correlation ID is user-defined, carried end-to-end, and can also be matched in a selector, but otherwise is not relevant to the event broker. The correlation ID may be used for peer-to-peer message synchronization. In JMS applications this field is carried as the JMSCorrelationID Message Header Field.

Returns:

A unique identifier for the message set by producer or None.

Return type:

str

abstract get_destination_name() str | None

Retrieves the destination which the message was received, which can be a topic or a queue.

Returns:

The destination name.

Return type:

str

abstract get_expiration() int | None

Retrieves the expiration time.

The expiration time is the UTC time (in ms, from midnight, January 1, 1970 UTC) when the message is considered expired. A value of 0 means the message never expires. The default value is 0.

Returns:

The UTC time when the message is discarded or moved to a Dead Message Queue by the PubSub+ broker or None if it was not set.

Return type:

int

abstract get_message_discard_notification() MessageDiscardNotification

Retrieves the message discard notification about previously discarded messages. This is for non-durable consumers that use Direct Transport.

Returns:

A value not expected to be None.

Return type:

MessageDiscardNotification

abstract get_payload_as_bytes() bytearray | None

Retrieves the payload of the message.

Returns:

the byte array with the message payload or None if there is no payload.

Return type:

bytearray

abstract get_payload_as_dictionary() Dict | None

Retrieves the dictionary format of payload of message.

PubSub+ messages can be published with a SDTMap payload. This is a platform-agnostic dictionary format that allows data types to be sent and received in messages that is independent of the publisher or consumer applications.

Returns:

dictionary found in the payload or None if there is no payload, or the payload is not a

dictionary

Return type:

dict

Raises:
abstract get_payload_as_list() List | None

Retrieves the list format of payload of message.

PubSub+ messages can be published with a SDTStream payload. This is a platform-agnostic list format that allows data types to be sent and received in messages that is independent of the publisher or consumer applications.

Returns:

list found in the payload or None if there is no payload, or the payload is not a List

Return type:

list

Raises:
abstract get_payload_as_string() str | None

Retrieves the string-encoded payload of message.

PubSub+ messages can be published with a string-encoded payload. This is a platform-agnostic string format that allows strings to be sent and received in messages that is independent of the publisher or consumer applications. For example, in this way a non-Python publishing application can send a Unicode string that can still be consumed by a Python-based application.

If message payload is not specifically encoded as a string, it cannot be retrieved as a string. For instance, a publisher if the publisher sends a UTF-8 string as a bytearray, this method cannot be used to extract the string. Even though the payload is a string (str), it is not encoded to identify it as such.

Returns:

String found in the payload or None if there is no payload, or the payload is not a String.

Return type:

str

abstract get_priority() int | None

Retrieves the priority value. Valid values range from 0 to 255.

Returns:

A priority value from 0 to 255, or None if the priority is not set.

Return type:

int

Raises:

PubSubPlusClientError – Any error if the priority of the message could not be retrieved.

abstract get_properties() Dict[str, str | int | float | bool | dict | list | bytearray | None]

Retrieves the non-solace properties attached to the message.

Any property defined in Message Properties is not returned in this dictionary. Solace defined property keys all begin with “solace.messaging”, however any key even those beginning with “solace.” may be a a non solace property if it is not defined in this API.

Message Properties are carried in Message meta data in addition to the Message payload. Properties are stored in a dictionary of key-value pairs where the key is controlled by the application.

Returns:

The non-solace properties attached to the message.

Return type:

dict

abstract get_property(name: str) str | int | float | bool | dict | list | bytearray | None

Retrieves The value of a specific non-solace property.

Any property defined in Message Properties is not available and this method will return None. Solace defined property keys all begin with “solace.messaging”, however any key even those beginning with “solace.” may be a a non solace property if it is not defined in this API.

Parameters:

name (str) – The name of the property.

Returns:

The value of the named property if found in the

message otherwise, returns None.

Return type:

str, int, float, bool, dict, list, bytearray, None

get_replication_group_message_id() ReplicationGroupMessageId | None

Retrieves the Replication Group Message Id

Returns:

can be None for direct message or unsupported broker versions

Return type:

ReplicationGroupMessageId

abstract get_rest_interoperability_support() RestInteroperabilitySupport

Retrieves access to the optional metadata used for interoperability with REST messaging clients.

Returns:

The metadata collection or None if not set.

Return type:

RestInteroperabilitySupport

abstract get_sender_id() str | None

Returns the sender’s ID. This field can be set automatically during message publishing, but existing values are not overwritten if non-None, as when a message is sent multiple times.

Returns:

The sender’s ID or None if not set.

Return type:

str

abstract get_sender_timestamp() int | None

Retrieves the sender’s timestamp (Unix epoch time). This field can be set during message publishing. The time is in milliseconds.

Returns:

The timestamp (Unix Epoch time) or None if not set. The time is in milliseconds.

Return type:

int

abstract get_sequence_number() int | None

Gets the sequence number of the message.

Sequence numbers may be set by publisher applications or automatically generated by publisher APIs. The sequence number is carried in the Message meta data in addition to the payload and may be retrieved by consumer applications.

Returns:

The positive sequence number or None if it was not set.

Return type:

int

abstract get_time_stamp() int | None

Retrieves the timestamp (Unix epoch time) of the message when it arrived at the Client API. The time is in milliseconds.

Returns:

The timestamp (Unix Epoch time) or None if not set. The time is in milliseconds.

Return type:

int

abstract has_property(name: str) bool

Checks if the message has a specific non-solace property attached.

Any property defined in Message Properties is not available and this method will return false. Solace defined property keys all begin with “solace.messaging”, however any key even those beginning with “solace.” may be a a non solace property if it is not defined in this API.

Parameters:

name (str) – the name of the property.

Returns:

True if the property is present. False otherwise.

abstract is_redelivered() bool

Retrieves the message redelivery status.

Returns:

True if the message redelivery occurred in the past, otherwise False.

Return type:

bool

class solace.messaging.receiver.inbound_message.ReplicationGroupMessageId

Bases: ABC

An abstract class that defines the interfaces for a Replication Group Message ID.

abstract compare(replication_group_message_id: ReplicationGroupMessageId) int

Compare the Replication Group Message Id to another. Not all valid Replication Group Message Id can be compared. If the messages identifed by the Replication Message Id were not published to the same broker or HA pair, then they are not comparable and this method throws an IllegalArgumentError exception.

Parameters:

replication_group_message_id (ReplicationGroupMessageId) – to compare current instance with

Returns:

negative integer, zero, or a positive integer if this object is less than, equal to, or greater than the specified one.

Raises:

IllegalArgumentError if the both Replication Group Message Ids can't be compared, – i.e when corresponding messages were not published to the same broker or HA pair.

static of(replication_group_message_id_string: str)

A factory method to create an instance of a ReplicationGroupMessageId from a specified string. This method can be used to create a ReplicationGroupMessageId for message replay configuration. The string may be retrieved from str() or it can be retrieved from any of the broker admin interfaces.

Parameters:

replication_group_message_id_string (str) – the string identifier associated with Replication Group Message ID previously returned from str() method.

Returns:

object representing the Replication Group Message Id

Return type:

ReplicationGroupMessageId

Raises:
  • IllegalArgumentError – if string argument is empty or is not in the proper format as

  • returned from str() previously

solace.messaging.receiver.message_receiver module

This module abstracts message receiving behavior; it is a base class for all receivers.

class solace.messaging.receiver.message_receiver.MessageHandler

Bases: ABC

An abstract base class that defines the interface for a message handler for inbound messages.

abstract on_message(message: InboundMessage)

Definition for a message processing function.

Parameters:

message (InboundMessage) – The inbound message.

class solace.messaging.receiver.message_receiver.MessageReceiver

Bases: LifecycleControl, AsyncLifecycleControl

An abstract class that provides the message receiver implementation.

abstract is_running() bool

Checks if the process was successfully started and not stopped yet.

Returns:

False if process was not started or already stopped, True otherwise.

Return type:

bool

abstract is_terminated() bool

Checks if message delivery process is terminated.

Returns:

True if message delivery process is terminated, False otherwise.

Return type:

bool

abstract is_terminating() bool

Checks if message delivery process termination is on-going.

Returns:

True if message delivery process being terminated, but termination is not finished, False otherwise.

Return type:

bool

abstract set_termination_notification_listener(listener: TerminationNotificationListener)

Adds a listener to listen for non-recoverable lifecycle object interruption events.

Parameters:

listener – TerminationNotificationListener

abstract start() LifecycleControl

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Raises:
abstract start_async() Future

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally, this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Returns:

An object that the application can use to determine when the service start has completed.

Return type:

concurrent.futures.Future

Raises:

IllegalStateError – If method has been invoked at an illegal or inappropriate time for some another reason.

abstract terminate(grace_period: int = 600000)

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Raises:
abstract terminate_async(grace_period: int = 600000) Future

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Returns:

An future object which the application can use to determine terminate completion.

Return type:

concurrent.futures.Future

Raises:

IllegalArgumentError – If the grace_period is invalid.

solace.messaging.receiver.persistent_message_receiver module

This module contains the abstract base class for a persistent message receiver.

A PersistentMessageReceiver can be instantiated to receive Persistent Messages from a PubSub+ broker.

class solace.messaging.receiver.persistent_message_receiver.PersistentMessageReceiver

Bases: MessageReceiver, ReceiverFlowControl, AcknowledgementSupport, ReceiverSubscriptions, AsyncReceiverSubscriptions, ManageableReceiver, ABC

An abstract class that defines the interface to a persistent message receiver.

Note

A caller of any of blocking message receiving methods , those without the async suffix such as the PersistentMessageReceiver.receive_message(), method will receive a new message for each call.

Warning

When you use this class, these are some considerations to aware of:

  • Concurrent use of asynchronous and synchronous message receiving methods on a single instance of receiver can have some unintended side effects and should be avoided.

  • Asynchronous methods should NOT be called multiple times or in combination with blocking message receiving function on the same solace.messaging.receiver.message_receiver.MessageReceiver instance to avoid any unintended side effects.

  • After a broker initiated termination has occurred, PersistentMessageReceiver.ack would raise and exception. This behavior can occur before the TerminateEvent is pushed to the application via the handler. Termination Notification Event is dispatched on termination triggered by flow down or flow session down event.

abstract ack(message: InboundMessage)

Generates and sends an acknowledgement for an inbound message (InboundMessage).

Parameters:

message – The inbound message.

Raises:

PubSubPlusClientError – If it was not possible to acknowledge the message.

abstract add_subscription(another_subscription: TopicSubscription)

Makes a request to subscribe synchronously to a given topic subscription.

This definition performs the subscription operation on the solace.messaging.receiver.message_receiver.MessageReceiver. The subscription request proceeds are sent and the function blocks waiting for a response from the PubSub+ event broker.

Parameters:

another_subscription (TopicSubscription) – The additional subscription to attract messages where topics match the subscriptions.

Returns:

When the function successfully completes, it returns otherwise, it raises an exception.

Raises:
abstract add_subscription_async(topic_subscription: TopicSubscription) Future

Makes a request to subscribe asynchronously to a given topic subscription.

This method initiates the subscription process on the solace.messaging.receiver.message_receiver.MessageReceiver. The subscription request proceeds asynchronously, with the success or failure status available in the returned concurrent.futures.Future object.

Parameters:

topic_subscription (TopicSubscription) – The subscription expression to subscribe to. Messages with a topic that matches the subscription are directed to this client.

Returns:

An object used to determine when the connection attempt has completed.

Return type:

Future

Raises:

PubSubPlusClientError – If an operation could not be performed for some internal reason.

abstract is_running() bool

Checks if the process was successfully started and not stopped yet.

Returns:

False if process was not started or already stopped, True otherwise.

Return type:

bool

abstract is_terminated() bool

Checks if message delivery process is terminated.

Returns:

True if message delivery process is terminated, False otherwise.

Return type:

bool

abstract is_terminating() bool

Checks if message delivery process termination is on-going.

Returns:

True if message delivery process being terminated, but termination is not finished, False otherwise.

Return type:

bool

abstract pause()

Pauses message delivery for an asynchronous message handler or stream. Message delivery can be resumed by executing ReceiverFlowControl.resume() on a solace.messaging.receiver.persistent_message_receiver.PersistentMessageReceiver instance.

Raises:

PubSubPlusClientError – If an error occurred while pausing message delivery.

abstract receive_async(message_handler: MessageHandler)

Register an asynchronous message receiver on the solace.messaging.receiver.persistent_message_receiver.PersistentMessageReceiver instance.

Parameters:

message_handler (MessageHandler) – The object that receives all inbound messages (InboundMessage) in its onMessage() callback. If the provided value is None, then asynchronous receiver is removed & receive_message() (solace.messaging.receiver.persistent_message_receiver.PersistentMessageReceiver) is used.

abstract receive_message(timeout: int | None = None) InboundMessage | None

Blocking request to receive the next message. You acknowledge the message using the solace.messaging.receiver.acknowledgement_support.AcknowledgementSupport.ack() function for (solace.messaging.receiver.persistent_message_receiver.PersistentMessageReceiver).

This method is usually used in loop an its use is mutually exclusive when used asynchronously.

Parameters:

timeout (int) – The time, in milliseconds, to wait for a message to arrive.

Returns:

An object that represents an inbound message. Returns None on timeout, or upon

service or receiver shutdown.

Return type:

InboundMessage

Raises:

PubSubPlusClientError – If error occurred while receiving or processing the message.

abstract receiver_info() PersistentReceiverInfo

Provides access to the Persistent receiver information

Returns:

an object that represents message receiver manageability.

Return type:

PersistentReceiverInfo

abstract remove_subscription(subscription: TopicSubscription)

Makes a request to unsubscribe synchronously from the specified topic subscription.

This method performs the subscription removal operation on the solace.messaging.receiver.message_receiver.MessageReceiver.

Unsubscribe from a previously subscribed message source on the PubSub+ broker. Once the process is complete, no more messages where topics match the given subscription are received by the solace.messaging.receiver.message_receiver.MessageReceiver object.

Parameters:

subscription (TopicSubscription) – The subscription expression to remove from the MessageReceiver instance.

Returns:

When the function successfully completes, it returns otherwise, it raises an exception.

Raises:
abstract remove_subscription_async(topic_subscription: TopicSubscription) Future

Makes a request to unsubscribe asynchronously from a given topic subscription.

This method initiates the subscription removal process on the solace.messaging.receiver.message_receiver.MessageReceiver instance.

The unsubscribe request proceeds asynchronously, with the success or failure status available in the returned concurrent.futures.Future. Once the process is complete, no more messages whose topic match the given subscription will be received in the solace.messaging.receiver.message_receiver.MessageReceiver instance.

Parameters:

topic_subscription (TopicSubscription) – The subscription expression to remove from the MessageReceiver.

Returns:

An object used to determine when the connection attempt has completed.

Return type:

concurrent.futures.Future

Raises:
abstract resume()

Resumes a previously paused message delivery.

Raises:

PubSubPlusClientError – If an error occurred while trying to resume a paused messaged delivery.

abstract set_termination_notification_listener(listener: TerminationNotificationListener)

Adds a listener to listen for non-recoverable lifecycle object interruption events.

Parameters:

listener – TerminationNotificationListener

abstract start() LifecycleControl

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Raises:
abstract start_async() Future

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally, this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Returns:

An object that the application can use to determine when the service start has completed.

Return type:

concurrent.futures.Future

Raises:

IllegalStateError – If method has been invoked at an illegal or inappropriate time for some another reason.

abstract terminate(grace_period: int = 600000)

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Raises:
abstract terminate_async(grace_period: int = 600000) Future

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Returns:

An future object which the application can use to determine terminate completion.

Return type:

concurrent.futures.Future

Raises:

IllegalArgumentError – If the grace_period is invalid.

solace.messaging.receiver.receiver_cache_requests module

This module contains an abstract class that is inherited by the DirectMessageReceiver so that this type of receiver may send cache requests.

class solace.messaging.receiver.receiver_cache_requests.ReceiverCacheRequests

Bases: ABC

This abstract class provides the interface for requesting cached messages from a PubSub+ Cache instance.

abstract request_cached(cached_message_subscription_request: CachedMessageSubscriptionRequest, cache_request_id: int, completion_listener: CacheRequestOutcomeListener)

Requests messages from a broker which were previously cached using PubSub+ Cache. Responses to this request are processed by the given CacheRequestOutcomeListener. The cache_request_id parameter is used for correlating requests with responses. It is the application’s responsibility to guarantee that only unique integers are provided to this field, so as to avoid collisions.

Parameters:
  • cached_message_subscription_request (CachedMessageSubscriptionRequest) – Request for cached messages matching specified subscription and other fulfillment criteria.

  • cache_request_id (int) – request identifier which can be used for response callback correlation purposes, this value needs to be unique for the time of the application execution. A valid cache_request_id is within the range of 0 to Unsigned 64 int max. This value will be returned on a on_completion() callback of the CacheRequestOutcomeListener. The same value will be returned.

  • completion_listener (CacheRequestOutcomeListener) – Request completion listener to be notified when cache request is completed.

Raises:
  • PubSubPlusClientException – If the operation could not be performed.

  • IllegalStateException – If the service is not connected or the receiver is not running.

solace.messaging.receiver.receiver_flow_control module

This module contains an abstract base class for flow control interfaces on a persistent message receiver. Only persistent messaging prevents the broker from sending messages through the use of flow control methods.

class solace.messaging.receiver.receiver_flow_control.ReceiverFlowControl

Bases: ABC

An abstract class that defines the interface that may stop the broker from delivering messages to a a solace.messaging.receiver.message_receiver.MessageReceiver.

abstract pause()

Pauses message delivery for an asynchronous message handler or stream. Message delivery can be resumed by executing ReceiverFlowControl.resume() on a solace.messaging.receiver.persistent_message_receiver.PersistentMessageReceiver instance.

Raises:

PubSubPlusClientError – If an error occurred while pausing message delivery.

abstract resume()

Resumes a previously paused message delivery.

Raises:

PubSubPlusClientError – If an error occurred while trying to resume a paused messaged delivery.

solace.messaging.receiver.receiver_subscriptions module

This module contains the abstract base classes that defines the interface for synchronous subscription operations.

class solace.messaging.receiver.receiver_subscriptions.ReceiverSubscriptions

Bases: ABC

This class defines an abstract class for the interface for synchronous subscription operations.

The solace.messaging.receiver.message_receiver.MessageReceiver class supports both synchronous (blocking) and asynchronous (non-blocking) subscription operations.

abstract add_subscription(another_subscription: TopicSubscription)

Makes a request to subscribe synchronously to a given topic subscription.

This definition performs the subscription operation on the solace.messaging.receiver.message_receiver.MessageReceiver. The subscription request proceeds are sent and the function blocks waiting for a response from the PubSub+ event broker.

Parameters:

another_subscription (TopicSubscription) – The additional subscription to attract messages where topics match the subscriptions.

Returns:

When the function successfully completes, it returns otherwise, it raises an exception.

Raises:
abstract remove_subscription(subscription: TopicSubscription)

Makes a request to unsubscribe synchronously from the specified topic subscription.

This method performs the subscription removal operation on the solace.messaging.receiver.message_receiver.MessageReceiver.

Unsubscribe from a previously subscribed message source on the PubSub+ broker. Once the process is complete, no more messages where topics match the given subscription are received by the solace.messaging.receiver.message_receiver.MessageReceiver object.

Parameters:

subscription (TopicSubscription) – The subscription expression to remove from the MessageReceiver instance.

Returns:

When the function successfully completes, it returns otherwise, it raises an exception.

Raises:

solace.messaging.receiver.request_reply_message_receiver module

This module contains the abstract base class used to receive the reply messages using the direct messaging paradigm.

A RequestReplyMessageReceiver can be instantiated to receive reply messages from a PubSub+ event broker.

class solace.messaging.receiver.request_reply_message_receiver.Replier

Bases: ABC

The replier is used to send a reply for a request message that has been received. The API is responsible for handling any fields which correlate the request message and the reply message.

abstract reply(response_message: OutboundMessage)

Publish a response message as a reply to a received message that has been sent using RequestReply.

Parameters:

response_message – The response message from the replier which correlates to the original request message.

Raises:

PubSubPlusClientError – When a reply can’t be sent.

class solace.messaging.receiver.request_reply_message_receiver.RequestMessageHandler

Bases: ABC

An abstract class that defines the interface for a user defined message handler that can be used by RequestReplyMessageReceiver.receive_async.

abstract on_message(message: InboundMessage, replier: Replier)

Message processing callback method that allow user to implement custom message processing business logic combined with an ability to send a response right away.

Parameters:
  • message – The request message

  • replier – The message publishing utility for sending responses. If the API finds the reply destination in the inbound message, this will be a solace.messaging.receiver.request_reply_message_receiver.Replier object. If the API does not find the reply destination in the inbound message this will be a None type object.

class solace.messaging.receiver.request_reply_message_receiver.RequestReplyMessageReceiver

Bases: MessageReceiver

An interface that abstracts message reply feature for request-reply messaging using direct messaging paradigm

abstract is_running() bool

Checks if the process was successfully started and not stopped yet.

Returns:

False if process was not started or already stopped, True otherwise.

Return type:

bool

abstract is_terminated() bool

Checks if message delivery process is terminated.

Returns:

True if message delivery process is terminated, False otherwise.

Return type:

bool

abstract is_terminating() bool

Checks if message delivery process termination is on-going.

Returns:

True if message delivery process being terminated, but termination is not finished, False otherwise.

Return type:

bool

abstract receive_async(message_handler: RequestMessageHandler)

Register an asynchronous message receiver on the solace.messaging.receiver.request_reply_message_receiver.RequestReplyMessageReceiver instance. This message receiver will use the passed RequestMessageHandler to process the message and send the reply associated with the request.

Parameters:

message_handler (RequestMessageHandler) – User defined request/reply message handler. See solace.messaging.receiver.request_reply_message_receiver.RequestMessageHandler for more information on request message handlers.

abstract receive_message(timeout: int | None = None) Tuple[InboundMessage | None, Replier | None]

This method returns the received message and the solace.messaging.receiver.request_reply_message_receiver.Replier object that can be used to send a reply as a part of the Request/Reply paradigm. This method blocks while waiting for the received message.

Parameters:

timeout (int) – timeout in milliseconds

Returns:

tuple containing:

  • if the message is successfully found in the receiver buffer, and if the reply destination is successfully found in the message:

    • message (InboundMessage): Received message

    • replier (Replier): replier instance to reply back to request

  • if the message is successfully found in the receiver buffer, but the reply destination is not found in the message:

    • message (InboundMessage): Received message

    • replier (None): None type is returned

  • if the receiver buffer is empty or if the timeout passed to this method expires while trying to retrieve a message from the receiver buffer, then there is no message or reply destination to be found:

    • message (None): None type is returned

    • replier (None): None type is returned

Return type:

(tuple)

abstract set_termination_notification_listener(listener: TerminationNotificationListener)

Adds a listener to listen for non-recoverable lifecycle object interruption events.

Parameters:

listener – TerminationNotificationListener

abstract start() LifecycleControl

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Raises:
abstract start_async() Future

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally, this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Returns:

An object that the application can use to determine when the service start has completed.

Return type:

concurrent.futures.Future

Raises:

IllegalStateError – If method has been invoked at an illegal or inappropriate time for some another reason.

abstract terminate(grace_period: int = 600000)

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Raises:
abstract terminate_async(grace_period: int = 600000) Future

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Returns:

An future object which the application can use to determine terminate completion.

Return type:

concurrent.futures.Future

Raises:

IllegalArgumentError – If the grace_period is invalid.

solace.messaging.receiver.transactional_message_receiver module

This module contains the abstract base class used to receive transactional messages.

A TransactionalMessageReceiver can be instantiated to receive messages from a PubSub+ event broker as part of a transacted session.

class solace.messaging.receiver.transactional_message_receiver.TransactionalMessageHandler

Bases: ABC

An interface for the message handler within a transaction.

abstract on_message(message: InboundMessage)

Definition for a message processing function within a transaction.

Parameters:

message (InboundMessage) – The inbound message.

class solace.messaging.receiver.transactional_message_receiver.TransactionalMessageReceiver

Bases: MessageReceiver, ReceiverFlowControl, ReceiverSubscriptions, AsyncReceiverSubscriptions, ManageableReceiver, ABC

An interface for receiving transactional messages.

Warning

Use of an asynchronous (non-blocking) method (has the ‘Async’ suffix) is mutually-exclusive to any another method. An asynchronous method cannot be called multiple times or in combination with any another message receiving method on a same instance of a MessageReceiver.

For LifecycleControl terminate, once terminate completes no further messages will be dispatched, either from receive_async() or receive_message(). The grace_period for terminate is ignored as any pending data is flushed and not included in the transaction.

abstract add_subscription(another_subscription: TopicSubscription)

Makes a request to subscribe synchronously to a given topic subscription.

This definition performs the subscription operation on the solace.messaging.receiver.message_receiver.MessageReceiver. The subscription request proceeds are sent and the function blocks waiting for a response from the PubSub+ event broker.

Parameters:

another_subscription (TopicSubscription) – The additional subscription to attract messages where topics match the subscriptions.

Returns:

When the function successfully completes, it returns otherwise, it raises an exception.

Raises:
abstract add_subscription_async(topic_subscription: TopicSubscription) Future

Makes a request to subscribe asynchronously to a given topic subscription.

This method initiates the subscription process on the solace.messaging.receiver.message_receiver.MessageReceiver. The subscription request proceeds asynchronously, with the success or failure status available in the returned concurrent.futures.Future object.

Parameters:

topic_subscription (TopicSubscription) – The subscription expression to subscribe to. Messages with a topic that matches the subscription are directed to this client.

Returns:

An object used to determine when the connection attempt has completed.

Return type:

Future

Raises:

PubSubPlusClientError – If an operation could not be performed for some internal reason.

abstract is_running() bool

Checks if the process was successfully started and not stopped yet.

Returns:

False if process was not started or already stopped, True otherwise.

Return type:

bool

abstract is_terminated() bool

Checks if message delivery process is terminated.

Returns:

True if message delivery process is terminated, False otherwise.

Return type:

bool

abstract is_terminating() bool

Checks if message delivery process termination is on-going.

Returns:

True if message delivery process being terminated, but termination is not finished, False otherwise.

Return type:

bool

abstract pause()

Pauses message delivery for an asynchronous message handler or stream. Message delivery can be resumed by executing ReceiverFlowControl.resume() on a solace.messaging.receiver.transactional_message_receiver.TransactionalMessageReceiver instance.

Raises:

PubSubPlusClientError – If an error occurred while pausing message delivery.

receive_async(message_handler: TransactionalMessageHandler)

Register an asynchronous message handler on the receiver. Once set, the receiver starts in “push” mode, and the receive_message() method can not be used. “Push” and “pull” receivers do not mix well on a single transactional service. In fact it is strongly advised to either completely avoid this method, or constrain all transactional operations (publish, commit, rollback) to the message handler callback.

Parameters:

message_handler (TransactionalMessageHandler) – The object that receives all inbound messages (InboundMessage) through the on_message() handler method. If the provided value is None, then asynchronous receiver is removed.

receive_message(timeout: int | None = None) InboundMessage | None

Receives a message within a given transaction in a pull fashion. The methods behaviour is undefined when a TransactionalMessageHandler is set using receive_async().

Parameters:

timeout (int) – The time, in milliseconds, to wait for a message to arrive.

Returns:

An object that represents an inbound message. Returns None on timeout, or upon service or receiver shutdown.

Return type:

InboundMessage

Raises:

PubSubPlusClientError – When the receiver can not receive.

abstract receiver_info() TransactionalReceiverInfo

Provides access to the receiver information

Returns:

an object that represents message receiver manageability.

Return type:

TransactionalReceiverInfo

abstract remove_subscription(subscription: TopicSubscription)

Makes a request to unsubscribe synchronously from the specified topic subscription.

This method performs the subscription removal operation on the solace.messaging.receiver.message_receiver.MessageReceiver.

Unsubscribe from a previously subscribed message source on the PubSub+ broker. Once the process is complete, no more messages where topics match the given subscription are received by the solace.messaging.receiver.message_receiver.MessageReceiver object.

Parameters:

subscription (TopicSubscription) – The subscription expression to remove from the MessageReceiver instance.

Returns:

When the function successfully completes, it returns otherwise, it raises an exception.

Raises:
abstract remove_subscription_async(topic_subscription: TopicSubscription) Future

Makes a request to unsubscribe asynchronously from a given topic subscription.

This method initiates the subscription removal process on the solace.messaging.receiver.message_receiver.MessageReceiver instance.

The unsubscribe request proceeds asynchronously, with the success or failure status available in the returned concurrent.futures.Future. Once the process is complete, no more messages whose topic match the given subscription will be received in the solace.messaging.receiver.message_receiver.MessageReceiver instance.

Parameters:

topic_subscription (TopicSubscription) – The subscription expression to remove from the MessageReceiver.

Returns:

An object used to determine when the connection attempt has completed.

Return type:

concurrent.futures.Future

Raises:
abstract resume()

Resumes a previously paused message delivery.

Raises:

PubSubPlusClientError – If an error occurred while trying to resume a paused messaged delivery.

abstract set_termination_notification_listener(listener: TerminationNotificationListener)

Adds a listener to listen for non-recoverable lifecycle object interruption events.

Parameters:

listener – TerminationNotificationListener

abstract start() LifecycleControl

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Raises:
abstract start_async() Future

Enables service regular duties. Before this method is called, service is considered off duty. In order to operate normally, this method needs to be called on a service instance. If the service is already started, or starting, this operation has no effect.

Returns:

An object that the application can use to determine when the service start has completed.

Return type:

concurrent.futures.Future

Raises:

IllegalStateError – If method has been invoked at an illegal or inappropriate time for some another reason.

abstract terminate(grace_period: int = 600000)

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Raises:
abstract terminate_async(grace_period: int = 600000) Future

Disables regular duties of a service. If this service is already terminated or terminating, this operation has no effect. All attempts to use a service after termination is requested will be refused with an exception.

Parameters:

grace_period (int) – The positive integer grace period to use. The default is 600000ms.

Returns:

An future object which the application can use to determine terminate completion.

Return type:

concurrent.futures.Future

Raises:

IllegalArgumentError – If the grace_period is invalid.