Using Local Transactions in the Python API
Local transactions in the PubSub+ Python API allow your application to group multiple guaranteed message send and/or receive operations into a single, atomic unit known as a transaction. All the operations within a transaction are either committed together or rolled back together. Local transactions help maintain data integrity and consistency, and can be useful in situations such as order processing and financial transactions. For more information, see Using Local Transactions. The following sections show how to use local transactions in the PubSub+ Python API:
- Creating a TransactionalMessagingService Object
- Publishing Messages in a Local Transaction
- Receiving Messages in a Local Transaction
Creating a TransactionalMessagingService Object
A TransactionalMessagingService object allows the API to establish a connection to the event broker and create transactions. To create a TransactionalMessagingService object, do the following:
- Call the create_transactional_service_builder() function of the MessagingService class to return a TransactionalMessagingServiceBuilder object.
- The TransactionalMessagingServiceBuilder object gives you access to a number of functions that let you customize a TransactionalMessagingService object. These include the following:
  - from_properties(configuration: dict): Pass the necessary broker properties dictionary to the TransactionalMessagingServiceBuilder.
  - set_transaction_request_timeout(timeout: int = 10000): Timeout (in milliseconds) to wait for a response. Default is 10000.
- Call the build() function on the TransactionalMessagingServiceBuilder object to return a TransactionalMessagingService object.
- Call the connect() function on your TransactionalMessagingService object to connect to the event broker.
For more information, see the PubSub+ Messaging API for Python reference.
The following sample code shows how to create a simple TransactionalMessagingService instance and connect it to an event broker:
transactional_service = messaging_service.create_transactional_service_builder().build().connect()
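The steps above can also be sketched with an explicit properties dictionary and request timeout. This is a minimal sketch, assuming the solace-pubsubplus package is installed and a broker is reachable when connect_transactional_service() is called. The helper names build_broker_properties and connect_transactional_service are hypothetical (they are not part of the API), and the host, VPN name, and credentials are placeholders.

```python
def build_broker_properties(host, vpn, username, password):
    """Return a broker properties dictionary (all values are placeholders)."""
    return {
        "solace.messaging.transport.host": host,
        "solace.messaging.service.vpn-name": vpn,
        "solace.messaging.authentication.scheme.basic.username": username,
        "solace.messaging.authentication.scheme.basic.password": password,
    }


def connect_transactional_service(properties, request_timeout_ms=10000):
    """Connect a MessagingService, then build and connect a transactional service."""
    # Imported lazily so the properties helper can be used without the library.
    from solace.messaging.messaging_service import MessagingService

    messaging_service = MessagingService.builder().from_properties(properties).build()
    messaging_service.connect()

    transactional_service = (
        messaging_service.create_transactional_service_builder()
        .set_transaction_request_timeout(request_timeout_ms)
        .build()
    )
    transactional_service.connect()
    return messaging_service, transactional_service


# Placeholder connection details for a local broker.
properties = build_broker_properties("tcp://localhost:55555", "default", "user", "password")
```

Calling connect_transactional_service(properties) would then return both services, so the application can later disconnect each of them.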
Publishing Messages in a Local Transaction
After a TransactionalMessagingService instance has established a connection to an event broker, use a TransactionalMessagePublisher to publish guaranteed messages. To create a TransactionalMessagePublisher object, do the following:
- Call the create_transactional_message_publisher_builder() function on a TransactionalMessagingService object. This returns a TransactionalMessagePublisherBuilder object.
- Call the build() function on your TransactionalMessagePublisherBuilder to return a TransactionalMessagePublisher object.
- To enable your TransactionalMessagePublisher to start publishing messages, call the start() function on it.
For more information, see the PubSub+ Messaging API for Python reference.
The following example shows how to create and start a transactional message publisher so that your application can publish messages to the event broker:
transactional_publisher = transactional_service.create_transactional_message_publisher_builder().build().start()
Local Transaction publish() Example
After you create a TransactionalMessagingService and a TransactionalMessagePublisher, you can use the publish() function to publish messages in a local transaction. The example below attempts to publish ten messages to each topic in the topics list. In each run through the for-loop, the application attempts to publish ten messages to a topic as a single transaction with the commit() function. If something goes wrong during the code execution, the transaction is rolled back and none of the messages will be published for that iteration of the loop.
topics = [Topic.of("my/topic/1"), Topic.of("my/topic/2"), Topic.of("my/topic/3")]

for topic in topics:
    try:
        for i in range(10):
            message = messaging_service.message_builder().build(f"Message #{i} for topic {topic.get_name()}")
            transactional_publisher.publish(message, topic)
        # Publish all messages to a topic, or none at all:
        transactional_service.commit()
        # Can't be sure messages were actually published until the commit succeeds.
    except TransactionRollbackError:
        # Commit failed.
        print("Commit failed.")
    except UnknownTransactionStateError:
        # Commit outcome unknown. This should be very rare.
        print("Commit outcome unknown.")

transactional_service.disconnect()
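The publish-then-commit pattern can be wrapped in a small helper that rolls back explicitly when publishing fails partway through a batch. The following is a minimal sketch, not part of the API: publish_batch_in_transaction is a hypothetical name, and _FakeService and _FakePublisher are in-memory stand-ins used only so the sketch runs without a broker. With the real API you would pass your TransactionalMessagingService and TransactionalMessagePublisher instead.

```python
def publish_batch_in_transaction(transactional_service, publisher, messages, destination):
    """Publish every message, then commit. Roll back if any publish call fails."""
    messages = list(messages)
    try:
        for message in messages:
            publisher.publish(message, destination)
    except Exception:
        # Discard the staged messages so they cannot leak into the next transaction.
        transactional_service.rollback()
        raise
    # Commit errors are left to propagate to the caller.
    transactional_service.commit()
    return len(messages)


class _FakeService:
    """In-memory stand-in for a transactional service (demo only)."""

    def __init__(self):
        self.staged, self.committed = [], []

    def commit(self):
        self.committed.extend(self.staged)
        self.staged.clear()

    def rollback(self):
        self.staged.clear()


class _FakePublisher:
    """Stand-in publisher that stages messages and can simulate a failure."""

    def __init__(self, service, fail_on=None):
        self.service, self.fail_on = service, fail_on

    def publish(self, message, destination):
        if message == self.fail_on:
            raise RuntimeError("simulated publish failure")
        self.service.staged.append((destination, message))


service = _FakeService()
ok = publish_batch_in_transaction(service, _FakePublisher(service), ["a", "b"], "my/topic/1")
try:
    publish_batch_in_transaction(service, _FakePublisher(service, fail_on="y"), ["x", "y"], "my/topic/2")
except RuntimeError:
    pass  # The second batch was rolled back; nothing from it was committed.
```

The helper rolls back only when a publish call fails. Commit errors are passed to the caller, which can handle TransactionRollbackError and UnknownTransactionStateError as in the example above.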
Receiving Messages in a Local Transaction
After a TransactionalMessagingService instance has established a connection to an event broker, use a TransactionalMessageReceiver to receive guaranteed messages. To create a TransactionalMessageReceiver object, do the following:
- Call the create_transactional_message_receiver_builder() function on a TransactionalMessagingService object. This returns a TransactionalMessageReceiverBuilder object.
- You can now use the functions in the TransactionalMessageReceiverBuilder interface to configure a TransactionalMessageReceiver to use certain features of the API:
  - with_message_replay(replay_strategy: ReplayStrategy): Add a message replay strategy to the receiver.
  - with_message_selector(selector_query_expression: str): Enable message selection based on message header parameters and message property values.
  - with_missing_resources_creation_strategy(strategy: MissingResourcesCreationStrategy): Add the missing queue creation strategy that defines what action the API may take.
- Call the build(endpoint_to_consume_from: Queue) function on your TransactionalMessageReceiverBuilder to return a TransactionalMessageReceiver object. The build() function takes an explicit queue to consume from as a parameter.
- To enable your TransactionalMessageReceiver to start receiving messages, call the start() function on it.
For more information, see the PubSub+ Messaging API for Python reference.
The following example shows how to create and start a transactional message receiver so that your application can receive messages from the event broker:
transactional_receiver = transactional_service.create_transactional_message_receiver_builder().build(my_queue).start()
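The optional builder functions listed above can be applied conditionally before calling build(). The following is a minimal sketch: build_transactional_receiver is a hypothetical helper, the selector expression and its region property are made up for illustration, and _Recorder is a demo stand-in that records calls so the sketch runs without a broker.

```python
def build_transactional_receiver(transactional_service, queue, selector=None, creation_strategy=None):
    """Build and start a transactional receiver, applying only the options given."""
    builder = transactional_service.create_transactional_message_receiver_builder()
    if selector is not None:
        builder.with_message_selector(selector)
    if creation_strategy is not None:
        builder.with_missing_resources_creation_strategy(creation_strategy)
    receiver = builder.build(queue)
    receiver.start()
    return receiver


class _Recorder:
    """Demo stand-in that records method calls instead of contacting a broker."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Any unknown method is recorded and returns the recorder itself,
        # so it can play the role of service, builder, and receiver.
        def method(*args):
            self.calls.append((name, args))
            return self
        return method


demo = _Recorder()
build_transactional_receiver(demo, "my-queue", selector="region = 'EMEA'")
```

With the real API, you would pass your TransactionalMessagingService and a Queue object. A value such as MissingResourcesCreationStrategy.CREATE_ON_START could be passed as creation_strategy, assuming that enum member is available in your version of the API.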
Receiving Messages Synchronously in a Local Transaction
After you create a TransactionalMessagingService and a TransactionalMessageReceiver, you can receive messages in a transaction. The example below starts a receiver for each queue in the queues list, then uses the blocking receive_message() function to take one message from every queue and commits them together with the commit() function. It repeats this count (ten) times, and each pass is a separate transaction: a message is consumed from every queue, or from none. If a queue returns no message partway through a pass, the partial transaction is rolled back, so none of the messages received in that pass are removed from the event broker.
# Receive a message from each queue in a transaction, 'count' times.
# Each count is a separate transaction: A message is taken from every queue or none.
count = 10
queues = [queue1, queue2, queue3]
messages = []
receivers = []

# Start a receiver for every queue
for queue in queues:
    transactional_receiver = transactional_service.create_transactional_message_receiver_builder().build(queue).start()
    receivers.append(transactional_receiver)

for _ in range(count):
    # Grab a message from each receiver/queue
    messages_in_this_transaction = []
    for receiver in receivers:
        message = receiver.receive_message()
        if message is None:
            if messages_in_this_transaction:
                # One of the queues ran empty, rolling back the partial transaction:
                transactional_service.rollback()
            break
        messages_in_this_transaction.append(message)
    else:  # no break
        try:
            # Consume a message from all queues, or from none:
            transactional_service.commit()
            messages.extend(messages_in_this_transaction)
        except TransactionRollbackError:
            # Commit failed.
            print("Commit error")
        except UnknownTransactionStateError:
            # Commit outcome unknown. This should be very rare.
            print("Unknown commit error")

transactional_service.disconnect()
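A related pattern is to consume a fixed-size batch from a single queue as one transaction. The following is a minimal sketch: receive_batch_in_transaction is a hypothetical helper, and it assumes receive_message() accepts a timeout in milliseconds and returns None when no message arrives in time. The _FakeReceiver and _FakeService classes are stand-ins so the sketch runs without a broker. Unlike a real broker, the stand-in does not redeliver rolled-back messages.

```python
def receive_batch_in_transaction(transactional_service, receiver, batch_size, timeout_ms=1000):
    """Receive batch_size messages and commit them together.

    Returns the messages on success, or an empty list (after a rollback)
    if the queue ran dry before the batch was complete.
    """
    batch = []
    for _ in range(batch_size):
        # Assumption: the timeout is in milliseconds and None means "no message".
        message = receiver.receive_message(timeout_ms)
        if message is None:
            if batch:
                # Give up the partially received batch.
                transactional_service.rollback()
            return []
        batch.append(message)
    transactional_service.commit()
    return batch


class _FakeReceiver:
    """Demo stand-in: hands out queued items, then None."""

    def __init__(self, items):
        self.items = list(items)

    def receive_message(self, timeout=None):
        return self.items.pop(0) if self.items else None


class _FakeService:
    """Demo stand-in that logs commit and rollback calls."""

    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")


service = _FakeService()
receiver = _FakeReceiver(["m1", "m2", "m3"])
first = receive_batch_in_transaction(service, receiver, batch_size=2)   # full batch, committed
second = receive_batch_in_transaction(service, receiver, batch_size=2)  # only one left, rolled back
```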
Receiving Messages with a Message Handler in a Local Transaction
After you create a TransactionalMessagingService and a TransactionalMessageReceiver, you can use the non-blocking receive_async() function to receive messages in a transaction.
Although the non-blocking receive function is named receive_async(), it is not an asynchronous coroutine or generator, and it is not asyncio compatible. receive_async() returns immediately and works with native threads under the hood. It invokes the callback on a new Python thread for every message.
The receive_async() function takes a message handler as a parameter. The PubSub+ Python API has a TransactionalMessageHandler designed for local transactions. This interface contains the abstract on_message() function that you implement to handle the processing of inbound messages, such as print, modify, re-publish or commit operations. The example implementation of the on_message() function below is blocking, which is a way to control the flow of incoming messages.
Do not perform operations on the same transactional service from outside the message handler or from other threads because it is not thread safe.
The example below uses the non-blocking receive_async() function to receive messages from the queue passed to the build() function. The handler calls the commit() function after processing each message, so each message is consumed in its own transaction. If a commit fails, the transaction is rolled back and that message is not consumed. The main thread waits until count messages have been processed, or until ten seconds have elapsed, and then disconnects the transactional service.
import threading

from solace.messaging.receiver.transactional_message_receiver import TransactionalMessageHandler
from solace.messaging.receiver.inbound_message import InboundMessage
from solace.messaging.resources.queue import Queue

transactional_service = messaging_service.create_transactional_service_builder().build().connect()
queue = Queue.durable_exclusive_queue("myQueue")
count = 11
messages_processed = 0
finished = threading.Event()

# The message dispatch function is wrapped in a class
class MsgHandler(TransactionalMessageHandler):
    def on_message(self, message: InboundMessage):
        # Process message, for example: print, modify, re-publish, commit.
        # Blocking in this method is a way to control the flow of incoming messages.
        # Do not perform operations on the same transactional service from anywhere else.
        global transactional_service, messages_processed, finished
        print(f'Transactional message callback processing message: {message.get_payload_as_string()}')
        transactional_service.commit()
        messages_processed += 1
        if messages_processed >= count:
            # Remember it's not safe to perform operations on the transactional service
            # from other threads, including the main thread.
            finished.set()

msgHandler = MsgHandler()
receiver_builder = transactional_service.create_transactional_message_receiver_builder()
receiver = receiver_builder.build(queue)
# The receiver's mode of operation (blocking vs non-blocking) must be decided before the receiver is started.
receiver.receive_async(msgHandler)
receiver.start()
finished.wait(10)
transactional_service.disconnect()
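The handler can also keep its state in instance attributes instead of globals, and commit once per group of messages. The following is a minimal sketch under stated assumptions: BatchCommitHandler and _CountingService are hypothetical names, and the ImportError fallback exists only so the sketch runs when the library is not installed. The sketch also assumes the callbacks are not invoked concurrently, as in the example above.

```python
import threading

try:
    from solace.messaging.receiver.transactional_message_receiver import TransactionalMessageHandler
except ImportError:
    # Fallback so the sketch can run without the library installed.
    TransactionalMessageHandler = object


class BatchCommitHandler(TransactionalMessageHandler):
    """Commit once per batch_size messages, from the callback thread only."""

    def __init__(self, transactional_service, batch_size, total):
        self._service = transactional_service
        self._batch_size = batch_size
        self._total = total
        self._seen = 0
        self.finished = threading.Event()

    def on_message(self, message):
        self._seen += 1
        if self._seen % self._batch_size == 0 or self._seen >= self._total:
            # All transaction operations stay inside the handler.
            self._service.commit()
        if self._seen >= self._total:
            self.finished.set()


class _CountingService:
    """Demo stand-in that counts commit() calls."""

    def __init__(self):
        self.commits = 0

    def commit(self):
        self.commits += 1


service = _CountingService()
handler = BatchCommitHandler(service, batch_size=5, total=12)
for n in range(12):
    handler.on_message(f"message {n}")  # simulate twelve deliveries
```

With a real receiver, you would pass the handler to receive_async() before calling start(), then wait on handler.finished from the main thread.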