Using Local Transactions in the Python API

Local transactions in the PubSub+ Python API allow your application to group multiple guaranteed message send and/or receive operations into a single, atomic unit known as a transaction. All the operations within a transaction are either committed together or rolled back together. Local transactions help maintain data integrity and consistency, and can be useful in situations such as order processing and financial transactions. For more information, see Using Local Transactions. The following sections show how to use local transactions in the PubSub+ Python API:

  1. Creating a TransactionalMessagingService Object.
  2. Publishing Messages in a Local Transaction.
  3. Receiving Messages in a Local Transaction.

Creating a TransactionalMessagingService Object

A TransactionalMessagingService object allows the API to establish a connection to the event broker and create transactions. To create a TransactionalMessagingService object, do the following:

  1. Call the MessagingService class' create_transactional_service_builder() function to return a TransactionalMessagingServiceBuilder object.

  2. The TransactionalMessagingServiceBuilder object gives you access to a number of functions that let you customize a TransactionalMessagingService object. These include the following:

    • from_properties(configuration: dict): Pass the necessary broker properties dictionary to the TransactionalMessagingServiceBuilder.
    • set_transaction_request_timeout(timeout: int = 10000): Set the timeout, in milliseconds, to wait for a response. The default is 10000.

  3. Call the build() function on the TransactionalMessagingServiceBuilder object to return a TransactionalMessagingService object.

  4. Call the connect() function on your TransactionalMessagingService object to connect to the event broker.

For more information, see the PubSub+ Messaging API for Python reference.

The following sample code shows how to create a simple TransactionalMessagingService instance and connect it to an event broker:

transactional_service = messaging_service.create_transactional_service_builder().build().connect()
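
The four steps above can also be written one call at a time, which leaves room for the optional timeout setting. The following is a minimal sketch rather than a definitive implementation: the helper name connect_transactional_service and its timeout_ms parameter are hypothetical, and messaging_service is assumed to be an already-connected MessagingService instance.

```python
def connect_transactional_service(messaging_service, timeout_ms=10000):
    """Build and connect a transactional service (hypothetical helper).

    Assumes messaging_service is an already-connected MessagingService.
    """
    # Step 1: get the builder from the messaging service.
    builder = messaging_service.create_transactional_service_builder()
    # Step 2: optional customization; the timeout is in milliseconds.
    builder = builder.set_transaction_request_timeout(timeout_ms)
    # Step 3: build the transactional service.
    transactional_service = builder.build()
    # Step 4: connect to the event broker.
    transactional_service.connect()
    return transactional_service
```

Writing the steps separately makes it easier to add or remove builder options later without rewriting a long call chain.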

Publishing Messages in a Local Transaction

After a TransactionalMessagingService instance has established a connection to an event broker, use a TransactionalMessagePublisher to publish guaranteed messages. To create a TransactionalMessagePublisher object, do the following:

  1. Call the create_transactional_message_publisher_builder() function on a TransactionalMessagingService object. This returns a TransactionalMessagePublisherBuilder object.

  2. Call the build() function on your TransactionalMessagePublisherBuilder to return a TransactionalMessagePublisher object.

  3. To enable your TransactionalMessagePublisher to start publishing messages, call the start() function on it.

For more information, see the PubSub+ Messaging API for Python reference.

The following example shows how to create and start a transactional message publisher so that your application can publish messages to the event broker:

transactional_publisher = transactional_service.create_transactional_message_publisher_builder().build().start()

Local Transaction publish() Example

After you create a TransactionalMessagingService and a TransactionalMessagePublisher, you can use the publish() function to publish messages in a local transaction. The example below attempts to publish ten messages to each topic in the topics list. In each pass through the outer for-loop, the application publishes ten messages to one topic and then calls the commit() function, so that the ten messages form a single transaction. If the commit fails, the transaction is rolled back and none of the messages are published for that iteration of the loop.

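# Import paths are assumed from the package layout; confirm them against the API reference.
from solace.messaging.errors.pubsubplus_client_error import TransactionRollbackError, UnknownTransactionStateError
from solace.messaging.resources.topic import Topic

topics = [Topic.of("my/topic/1"), Topic.of("my/topic/2"), Topic.of("my/topic/3")]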

for topic in topics:
    try:
        for i in range(10):
            message = messaging_service.message_builder().build(f"Message #{i} for topic {topic.get_name()}")
            transactional_publisher.publish(message, topic)
        # Publish all messages to a topic, or none at all:
        transactional_service.commit()
        # Can't be sure messages were actually published until the commit succeeds.
    except TransactionRollbackError:
        # Commit failed.
        print("Commit failed.")
    except UnknownTransactionStateError:
        # Commit outcome unknown. This should be very rare.
        print("Commit outcome unknown.")
transactional_service.disconnect()
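
The all-or-nothing pattern in the loop above can be factored into a reusable function. The following is a minimal sketch under stated assumptions: the helper name publish_in_transaction is hypothetical, the service and publisher are assumed to be connected and started as shown earlier, and the sketch adds an explicit rollback() when publishing fails part-way, which the example above does not do.

```python
def publish_in_transaction(transactional_service, transactional_publisher, messages, topic):
    """Publish every message to topic in one transaction (hypothetical helper).

    Returns the number of messages committed. Errors are re-raised to the caller.
    """
    published = 0
    try:
        for message in messages:
            transactional_publisher.publish(message, topic)
            published += 1
    except Exception:
        # Publishing failed part-way: discard whatever was staged so far.
        transactional_service.rollback()
        raise
    # Nothing is delivered until the commit succeeds. Commit errors
    # (for example TransactionRollbackError) propagate to the caller.
    transactional_service.commit()
    return published
```

The commit is kept outside the try block on purpose, so that a failed commit is not followed by a second, redundant rollback.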

Receiving Messages in a Local Transaction

After a TransactionalMessagingService instance has established a connection to an event broker, use a TransactionalMessageReceiver to receive guaranteed messages. To create a TransactionalMessageReceiver object, do the following:

  1. Call the create_transactional_message_receiver_builder() function on a TransactionalMessagingService object. This returns a TransactionalMessageReceiverBuilder object.

  2. You can now use the functions in the TransactionalMessageReceiverBuilder interface to configure a TransactionalMessageReceiver to use certain features of the API:

    • with_message_replay(replay_strategy: ReplayStrategy): Add a message replay strategy to the receiver.
    • with_message_selector(selector_query_expression: str): Enable message selection based on message header parameters and message property values.
    • with_missing_resources_creation_strategy(strategy: MissingResourcesCreationStrategy): Set the strategy that defines what action the API may take when the queue is missing.

  3. Call the build(endpoint_to_consume_from: Queue) function on your TransactionalMessageReceiverBuilder to return a TransactionalMessageReceiver object. The build() function takes an explicit queue to consume from as a parameter.

  4. To enable your TransactionalMessageReceiver to start receiving messages, call the start() function on it.

For more information, see the PubSub+ Messaging API for Python reference.

The following example shows how to create and start a transactional message receiver so that your application can receive messages from the event broker:

transactional_receiver = transactional_service.create_transactional_message_receiver_builder().build(my_queue).start()
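
The optional builder functions from step 2 fit between creating the builder and calling build(). The following is a minimal sketch, assuming a connected transactional service and an existing queue object; the helper name start_filtered_receiver and its selector parameter are hypothetical.

```python
def start_filtered_receiver(transactional_service, queue, selector=None):
    """Build and start a transactional receiver (hypothetical helper).

    If selector is given, it is passed to with_message_selector().
    """
    builder = transactional_service.create_transactional_message_receiver_builder()
    if selector is not None:
        # Optional: only deliver messages that match the selector expression.
        builder = builder.with_message_selector(selector)
    # build() needs the explicit queue to consume from.
    receiver = builder.build(queue)
    # The receiver does not deliver messages until it is started.
    receiver.start()
    return receiver
```

A call might look like start_filtered_receiver(transactional_service, my_queue, selector="region = 'EU'"), where region is an illustrative user property name and not something the API defines.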

Receiving Messages Synchronously in a Local Transaction

After you create a TransactionalMessagingService and a TransactionalMessageReceiver, you can receive messages in a transaction. The example below starts a receiver for each queue in the queues list and then runs ten transactions. In each transaction, the application uses the blocking receive_message() function to take one message from every queue and then calls the commit() function. If one of the queues returns no message, or if the commit fails, the transaction is rolled back and none of the messages received in that iteration are consumed and removed from the event broker.

# Receive a message from each queue in a transaction, 'count' times. Each count is a separate transaction: A message is taken from every queue or none.
count = 10
queues = [queue1, queue2, queue3]
messages = []
receivers = []
# Start a receiver for every queue
for queue in queues:
    transactional_receiver = transactional_service.create_transactional_message_receiver_builder().build(queue).start()
    receivers.append(transactional_receiver)

for _ in range(count):
    # Grab a message from each receiver/queue
    messages_in_this_transaction = []
    for receiver in receivers:
        message = receiver.receive_message()
        if message is None:
            if messages_in_this_transaction:
                # One of the queues ran empty, rolling back the partial transaction:
                transactional_service.rollback()
            break
        messages_in_this_transaction.append(message)
    else:  # no break
        try:
            # Consume a message from all queues, or from none:
            transactional_service.commit()
            messages.extend(messages_in_this_transaction)
        except TransactionRollbackError:
            # Commit failed.
            print("Commit error")
        except UnknownTransactionStateError:
            # Commit outcome unknown. This should be very rare.
            print("Unknown commit error")
transactional_service.disconnect()
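
A related pattern is to consume a fixed-size batch from a single queue as one transaction. The following is a minimal sketch, not the documented approach: the helper name receive_batch is hypothetical, and it assumes that receive_message() accepts a timeout in milliseconds and returns None when the timeout expires. Check the API reference before relying on that behavior.

```python
def receive_batch(transactional_service, receiver, batch_size, timeout_ms=1000):
    """Consume batch_size messages in one transaction, or none (hypothetical helper).

    Returns the committed messages, or an empty list if the batch was incomplete.
    """
    batch = []
    for _ in range(batch_size):
        # Assumption: the argument is a timeout in milliseconds and
        # None is returned when no message arrives in time.
        message = receiver.receive_message(timeout_ms)
        if message is None:
            if batch:
                # Incomplete batch: return the received messages to the queue.
                transactional_service.rollback()
            return []
        batch.append(message)
    # Commit errors propagate to the caller.
    transactional_service.commit()
    return batch
```

Returning an empty list for an incomplete batch lets the caller decide whether to retry later or stop.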

Receiving Messages with a Message Handler in a Local Transaction

After you create a TransactionalMessagingService and a TransactionalMessageReceiver, you can use the non-blocking receive_async() function to receive messages in a transaction.

Although the non-blocking receiver function is called receive_async(), it is not an asynchronous coroutine or generator, and it is not asyncio compatible. receive_async() returns immediately and works with native threads under the hood. The API invokes the callback on a new Python thread for every message.

The receive_async() function takes a message handler as a parameter. The PubSub+ Python API has a TransactionalMessageHandler designed for local transactions. This interface contains the abstract on_message() function that you implement to handle the processing of inbound messages, such as print, modify, re-publish or commit operations. The example implementation of the on_message() function below is blocking, which is a way to control the flow of incoming messages.

Do not perform operations on the same transactional service from outside the message handler or from other threads because it is not thread safe.

The example below uses the non-blocking receive_async() function to receive messages from the queue passed to the build() function. The message handler calls the commit() function after it processes each message, so each message is consumed in its own transaction. If a commit fails, that transaction is rolled back and its message is not consumed from the queue. The main thread waits up to ten seconds for the handler to process count messages and then disconnects.

import threading
from solace.messaging.receiver.transactional_message_receiver import TransactionalMessageHandler
from solace.messaging.receiver.inbound_message import InboundMessage
from solace.messaging.resources.queue import Queue

transactional_service = messaging_service.create_transactional_service_builder().build().connect()
queue = Queue.durable_exclusive_queue("myQueue")
count = 10
messages_processed = 0
finished = threading.Event()

# The message dispatch function is wrapped in a class
class MsgHandler(TransactionalMessageHandler):
    def on_message(self, message: InboundMessage):
        # Process message, for example: print, modify, re-publish, commit.
        # Blocking in this method is a way to control the flow of incoming messages.
        # Do not perform operations on the same transactional service from anywhere else.
        global messages_processed  # only this name is rebound in the handler
        print(f'Transactional message callback processing message: {message.get_payload_as_string()}')

        transactional_service.commit()
        messages_processed += 1
        if messages_processed >= count:
            # Remember it's not safe to perform operations on the transactional service
            # from other threads, including the main thread.
            finished.set()

msg_handler = MsgHandler()
receiver_builder = transactional_service.create_transactional_message_receiver_builder()
receiver = receiver_builder.build(queue)

# The receiver's mode of operation (blocking vs non-blocking) must be decided before the receiver is started.
receiver.receive_async(msg_handler)
receiver.start()
finished.wait(10)
transactional_service.disconnect()
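
The handler above keeps its state in module-level globals. One alternative is to hold the transactional service and the counters on the handler object itself. The following is a minimal sketch: the class name CommittingHandler and its attributes are hypothetical, the broad except clause stands in for the specific transaction errors, and the import fallback exists only so that the sketch can run without the library installed.

```python
import threading

try:
    from solace.messaging.receiver.transactional_message_receiver import TransactionalMessageHandler
except ImportError:
    # Fallback so the sketch can run without the library installed.
    TransactionalMessageHandler = object


class CommittingHandler(TransactionalMessageHandler):
    """Commits each message in its own transaction (hypothetical class)."""

    def __init__(self, transactional_service, expected_count):
        self._service = transactional_service
        self._expected = expected_count
        self.processed = 0
        self.failed = 0
        self.finished = threading.Event()

    def on_message(self, message):
        try:
            # Only this callback touches the transactional service.
            self._service.commit()
            self.processed += 1
        except Exception:
            # Commit failed, so this message was not consumed.
            # Real code would catch the specific transaction errors.
            self.failed += 1
        if self.processed >= self._expected:
            # Signal the waiting thread without touching the service from it.
            self.finished.set()
```

The main thread would pass an instance to receive_async() and then wait on handler.finished, in the same way that the example above waits on its finished event.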