Browsing Queues with the Solace Python API

Queue browsing lets a client application view guaranteed messages in order, from oldest to newest, without consuming them from the event broker. Queue browsing functions independently of consumers so existing message consumers are not affected. You may optionally remove browsed messages from a queue or leave them as needed. For more information see Browsing Guaranteed Messages.

This page covers the following topics:

Basic Queue Browsing

The following sections explain the essential operations for queue browsing using the Solace Python API.

  1. Creating a Queue Browser
  2. Receiving Messages
  3. Removing Messages

Creating a Queue Browser

First, create a queue browser using a MessagingService object. The browser allows you to examine messages on a queue without consuming them. You'll need to choose the appropriate queue type based on your requirements and call the start() method to connect your browser to the event broker:

# You can use either durable or non-durable queues depending on your requirements

# For persistent queues that survive broker restarts:
durable_non_exclusive_queue = Queue.durable_non_exclusive_queue(queue_name)

# For temporary queues that exist only during the session:
non_durable_exclusive_queue = Queue.non_durable_exclusive_queue(queue_name)

# Create and start the browser
try:
    browser = messaging_service.create_message_queue_browser_builder() \
        .build(durable_non_exclusive_queue)
    browser.start()
    
    # You can also start the browser asynchronously
    # future = browser.start_async()
    # Do other work while browser is starting
    # future.result()  # Wait for completion if needed
finally:
    # Always terminate the browser when done (shown in later examples)
    pass

Receiving Messages

Once your browser is connected, you can browse messages that are spooled on the queue by calling the receive_message() method. This method returns the oldest message on the queue without removing it. The timeout parameter specifies how long to wait for a message before returning. Always check if a message was received before attempting to access its properties:

try:
    # DEFAULT_TIMEOUT is the time in milliseconds that the receive_message() method blocks before exiting
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    
    # Check if a message was received before accessing its properties
    if message_received:
        print(f"Received message: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()

Removing Messages

While browsing primarily allows you to examine messages without removing them, you can also selectively remove messages from the queue using the remove() method. This is useful for scenarios where you need to inspect a message before deciding whether to process it or discard it:

try:
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    if message_received:
        browser.remove(message_received)
        print(f"Removed message from queue: {queue_name}")
finally:
    # Always terminate the browser when done
    browser.terminate()

Using Message Selectors

For more targeted browsing, you can use message selectors to filter messages based on their properties. The with_message_selector() method accepts SQL-92 style expressions that can match against standard JMS properties or your own custom message properties. This is particularly useful when dealing with large queues where you only need to examine specific messages:

try:
    # Example of a message selector expression with JMS properties
    jms_selector = "JMSCorrelationID = '1' AND JMSPriority = 1"
    
    # Example with custom properties
    custom_selector = "myProperty = 'myValue'"
    
    # Example using LIKE operator with wildcards
    wildcard_selector = "myProperty LIKE 'prefix%'"
    
    # Create browser with message selector
    browser = messaging_service.create_message_queue_browser_builder() \
        .with_message_selector(custom_selector) \
        .build(durable_non_exclusive_queue)
    browser.start()
    
    # Only messages with matching message properties from a selector expression will be received
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    if message_received:
        print(f"Received message with selector: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()

Optimizing Performance with Window Size

To optimize performance when browsing large queues, you can configure the window size to control how many messages are fetched from the broker at once. The window size (ranging from 1 to 255) determines the number of messages that will be prefetched into the client's local buffer. A larger window size can improve throughput but uses more memory, while a smaller window size uses less memory but may require more network round-trips:

try:
    # Configure window size (controls how many messages are fetched at once)
    browser = messaging_service.create_message_queue_browser_builder() \
        .with_queue_browser_window_size(10) \
        .build(durable_non_exclusive_queue)
    browser.start()
    
    # Process messages
    while True:
        message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
        if message_received is None:
            break
        print(f"Received message: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()

Configuring Reconnection Behavior

For production environments with potential network instability, you can configure reconnection behavior and other advanced options using properties. This allows you to fine-tune how the browser handles disconnections, how many reconnection attempts it should make, and how long to wait between attempts. You can combine multiple configuration options in a single properties object:

try:
    # Import the necessary properties
    from solace.messaging.config.solace_properties.queue_browser_properties import QUEUE_BROWSER_MESSAGE_SELECTOR_QUERY, \
        QUEUE_BROWSER_WINDOW_SIZE, QUEUE_BROWSER_RECONNECTION_ATTEMPTS, QUEUE_BROWSER_RECONNECTION_ATTEMPTS_WAIT_INTERVAL
    
    # Configure browser with multiple properties
    browser = messaging_service.create_message_queue_browser_builder() \
        .from_properties({
            QUEUE_BROWSER_RECONNECTION_ATTEMPTS: 3,
            QUEUE_BROWSER_RECONNECTION_ATTEMPTS_WAIT_INTERVAL: 5000,  # milliseconds
            QUEUE_BROWSER_WINDOW_SIZE: 10,
            QUEUE_BROWSER_MESSAGE_SELECTOR_QUERY: "myProperty = 'myValue'"
        }) \
        .build(durable_non_exclusive_queue)
    browser.start()
    
    # Process messages
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    if message_received:
        print(f"Received message: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()

Implementing Error Handling

To implement robust error handling and resource management, you can monitor browser lifecycle events using termination notification listeners. These listeners allow your application to react to browser state changes, such as unexpected disconnections or normal termination. You can check the browser's state at any time using methods like is_running() and is_terminated(). This is particularly important in long-running applications where you need to ensure proper cleanup of resources:

# Define a termination listener
class TermListener:
    def on_termination(self, event):
        print(f"Browser terminated: {event.message}")
        # event.cause contains the reason for termination

try:
    browser = messaging_service.create_message_queue_browser_builder() \
        .build(durable_non_exclusive_queue)
    
    # Set the termination notification listener
    browser.set_termination_notification_listener(TermListener())
    
    browser.start()
    
    # Check browser state
    if browser.is_running():
        print("Browser is running")
    
    # Process messages
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    if message_received:
        print(f"Received message: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()
    
    # After termination
    if browser.is_terminated():
        print("Browser is terminated")