Browsing Queues with the Solace Python API
Queue browsing lets a client application view guaranteed messages in order, from oldest to newest, without consuming them from the event broker. Because queue browsing works independently of consumers, existing message consumers are not affected. You can optionally remove browsed messages from the queue or leave them in place. For more information, see Browsing Guaranteed Messages.
This page covers the following topics:
- Basic Queue Browsing
- Using Message Selectors
- Optimizing Performance with Window Size
- Configuring Reconnection Behavior
- Implementing Error Handling
Basic Queue Browsing
The following sections explain the essential operations for queue browsing using the Solace Python API.
Creating a Queue Browser
First, create a queue browser using a MessagingService object. The browser allows you to examine messages on a queue without consuming them. You'll need to choose the appropriate queue type based on your requirements and call the start() method to connect your browser to the event broker:
# You can use either durable or non-durable queues depending on your requirements
# For persistent queues that survive broker restarts:
durable_non_exclusive_queue = Queue.durable_non_exclusive_queue(queue_name)
# For temporary queues that exist only during the session:
non_durable_exclusive_queue = Queue.non_durable_exclusive_queue(queue_name)
# Create and start the browser
try:
    browser = messaging_service.create_message_queue_browser_builder() \
        .build(durable_non_exclusive_queue)
    browser.start()
    # You can also start the browser asynchronously
    # future = browser.start_async()
    # Do other work while browser is starting
    # future.result()  # Wait for completion if needed
finally:
    # Always terminate the browser when done (shown in later examples)
    pass
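The start-then-always-terminate pattern above can be wrapped in a context manager so that cleanup is not forgotten. The following is a minimal sketch, not part of the Solace API: the helper name started_browser is hypothetical, and it assumes only that the object passed in exposes the start() and terminate() methods used on this page.

```python
from contextlib import contextmanager


@contextmanager
def started_browser(browser):
    """Start a queue browser on entry and terminate it on exit.

    Hypothetical helper: relies only on the start() and terminate()
    methods shown in the examples on this page.
    """
    # If start() raises, the browser never started, so terminate() is skipped.
    browser.start()
    try:
        yield browser
    finally:
        # Runs on normal exit and when the body raises.
        browser.terminate()
```

With a browser built as shown above, the body of a `with started_browser(browser) as b:` block can call b.receive_message(...) and the browser is terminated when the block exits.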
Receiving Messages
Once your browser is connected, you can browse messages that are spooled on the queue by calling the receive_message() method. Each call returns the next message in order, starting with the oldest message on the queue, without removing it. The timeout parameter specifies how long, in milliseconds, to wait for a message before returning. If no message arrives within that time, the method returns None, so always check that a message was received before accessing its properties:
try:
    # DEFAULT_TIMEOUT is the time in milliseconds that the receive_message() method blocks before exiting
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    # Check if a message was received before accessing its properties
    if message_received:
        print(f"Received message: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()
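To walk through more than one message, the receive call can be repeated until it times out. The sketch below wraps that loop in a generator. The name browse_messages and its limit parameter are hypothetical, and the only API call it assumes is receive_message(timeout=...) as shown above. A None result is treated as "nothing arrived within the timeout", which does not necessarily mean the queue is empty.

```python
from typing import Any, Iterator, Optional


def browse_messages(browser: Any, timeout_ms: int = 1000,
                    limit: Optional[int] = None) -> Iterator[Any]:
    """Yield browsed messages in order until a receive call times out.

    Hypothetical helper: stops at the first None result or after
    `limit` messages, whichever comes first.
    """
    seen = 0
    while limit is None or seen < limit:
        message = browser.receive_message(timeout=timeout_ms)
        if message is None:
            # No message arrived within timeout_ms; stop browsing.
            return
        seen += 1
        yield message
```

For example, `for message in browse_messages(browser, limit=100):` would inspect at most 100 messages and leave all of them on the queue.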
Removing Messages
While browsing primarily allows you to examine messages without removing them, you can also selectively remove messages from the queue using the remove() method. This is useful for scenarios where you need to inspect a message before deciding whether to process it or discard it:
try:
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    if message_received:
        browser.remove(message_received)
        print(f"Removed message from queue: {queue_name}")
finally:
    # Always terminate the browser when done
    browser.terminate()
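The inspect-then-decide scenario described above can be sketched as a loop that removes only the messages a caller-supplied predicate selects. The function name remove_matching and the should_remove callback are hypothetical. The sketch assumes only the receive_message() and remove() calls shown on this page.

```python
from typing import Any, Callable


def remove_matching(browser: Any, should_remove: Callable[[Any], bool],
                    timeout_ms: int = 1000) -> int:
    """Browse the queue and remove messages selected by should_remove.

    Hypothetical helper: returns the number of messages removed.
    Messages that are not selected stay on the queue.
    """
    removed = 0
    while True:
        message = browser.receive_message(timeout=timeout_ms)
        if message is None:
            # Timed out with nothing more to browse.
            break
        if should_remove(message):
            browser.remove(message)
            removed += 1
    return removed
```

A predicate such as `lambda m: "expired" in m.get_payload_as_string()` would discard only the messages whose payload contains that marker.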
Using Message Selectors
For more targeted browsing, you can use message selectors to filter messages based on their properties. The with_message_selector() method accepts SQL-92 style expressions that can match against standard JMS properties or your own custom message properties. This is particularly useful when dealing with large queues where you only need to examine specific messages:
try:
    # Example of a message selector expression with JMS properties
    jms_selector = "JMSCorrelationID = '1' AND JMSPriority = 1"
    # Example with custom properties
    custom_selector = "myProperty = 'myValue'"
    # Example using LIKE operator with wildcards
    wildcard_selector = "myProperty LIKE 'prefix%'"
    # Create browser with message selector
    browser = messaging_service.create_message_queue_browser_builder() \
        .with_message_selector(custom_selector) \
        .build(durable_non_exclusive_queue)
    browser.start()
    # Only messages with matching message properties from a selector expression will be received
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    if message_received:
        print(f"Received message with selector: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()
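When selector values come from variables rather than literals, it helps to build the expression in one place. The sketch below joins simple equality tests with AND. The name build_selector is hypothetical, and the quoting rule (doubling a single quote inside a string literal) is an assumption based on SQL-92 conventions, so verify it against the selector syntax your broker accepts. Property names are not validated.

```python
from typing import Mapping, Union


def build_selector(properties: Mapping[str, Union[str, int, float, bool]]) -> str:
    """Build an AND-joined equality selector from property names and values.

    Hypothetical helper: strings are single-quoted with embedded quotes
    doubled (assumed SQL-92 style); numbers and booleans are left unquoted.
    """
    clauses = []
    for name, value in properties.items():
        if isinstance(value, bool):
            # Check bool before int, because bool is a subclass of int.
            literal = "TRUE" if value else "FALSE"
        elif isinstance(value, (int, float)):
            literal = str(value)
        else:
            literal = "'" + str(value).replace("'", "''") + "'"
        clauses.append(f"{name} = {literal}")
    return " AND ".join(clauses)
```

The result can be passed to with_message_selector() in place of a hand-written string. For instance, build_selector({"myProperty": "myValue"}) yields the same expression as custom_selector above.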
Optimizing Performance with Window Size
To optimize performance when browsing large queues, you can configure the window size to control how many messages are fetched from the broker at once. The window size (ranging from 1 to 255) determines the number of messages that will be prefetched into the client's local buffer. A larger window size can improve throughput but uses more memory, while a smaller window size uses less memory but may require more network round-trips:
try:
    # Configure window size (controls how many messages are fetched at once)
    browser = messaging_service.create_message_queue_browser_builder() \
        .with_queue_browser_window_size(10) \
        .build(durable_non_exclusive_queue)
    browser.start()
    # Process messages
    while True:
        message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
        if message_received is None:
            break
        print(f"Received message: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()
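If the window size comes from configuration or user input, checking it against the documented range before building the browser gives a clearer error than a failure deep inside the builder. This is a small sketch. The name validated_window_size is hypothetical, and the bounds are simply the 1 to 255 range stated above.

```python
MIN_WINDOW_SIZE = 1    # lower bound stated on this page
MAX_WINDOW_SIZE = 255  # upper bound stated on this page


def validated_window_size(size: int) -> int:
    """Return size unchanged if it is an integer in the range 1 to 255.

    Hypothetical helper: raises TypeError for non-integers and
    ValueError for out-of-range values.
    """
    if isinstance(size, bool) or not isinstance(size, int):
        raise TypeError(f"window size must be an int, got {type(size).__name__}")
    if not MIN_WINDOW_SIZE <= size <= MAX_WINDOW_SIZE:
        raise ValueError(
            f"window size must be between {MIN_WINDOW_SIZE} and {MAX_WINDOW_SIZE}, got {size}"
        )
    return size
```

The checked value can then be passed on, for example with_queue_browser_window_size(validated_window_size(configured_size)), where configured_size is whatever value your application loaded.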
Configuring Reconnection Behavior
For production environments with potential network instability, you can configure reconnection behavior and other advanced options using properties. This allows you to fine-tune how the browser handles disconnections, how many reconnection attempts it should make, and how long to wait between attempts. You can combine multiple configuration options in a single properties object:
# Import the necessary properties
from solace.messaging.config.solace_properties.queue_browser_properties import QUEUE_BROWSER_MESSAGE_SELECTOR_QUERY, \
    QUEUE_BROWSER_WINDOW_SIZE, QUEUE_BROWSER_RECONNECTION_ATTEMPTS, QUEUE_BROWSER_RECONNECTION_ATTEMPTS_WAIT_INTERVAL
try:
    # Configure browser with multiple properties
    browser = messaging_service.create_message_queue_browser_builder() \
        .from_properties({
            QUEUE_BROWSER_RECONNECTION_ATTEMPTS: 3,
            QUEUE_BROWSER_RECONNECTION_ATTEMPTS_WAIT_INTERVAL: 5000,  # milliseconds
            QUEUE_BROWSER_WINDOW_SIZE: 10,
            QUEUE_BROWSER_MESSAGE_SELECTOR_QUERY: "myProperty = 'myValue'"
        }) \
        .build(durable_non_exclusive_queue)
    browser.start()
    # Process messages
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    if message_received:
        print(f"Received message: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()
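When choosing values for the attempt count and wait interval, a rough estimate of how long the browser may spend waiting between attempts can be useful. The sketch below multiplies the two settings. The function name is hypothetical, and the result is only an approximation: it assumes a finite, non-negative attempt count and ignores the time each connection attempt itself takes.

```python
def max_reconnect_wait_ms(attempts: int, wait_interval_ms: int) -> int:
    """Estimate the total time spent waiting between reconnection attempts.

    Hypothetical helper: an approximation that ignores how long each
    attempt itself takes.
    """
    if attempts < 0 or wait_interval_ms < 0:
        raise ValueError("attempts and wait_interval_ms must be non-negative")
    return attempts * wait_interval_ms
```

With the values used above (3 attempts, 5000 ms apart), the estimate is 15000 ms, or about 15 seconds of waiting before the browser gives up.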
Implementing Error Handling
To implement robust error handling and resource management, you can monitor browser lifecycle events using termination notification listeners. These listeners allow your application to react to browser state changes, such as unexpected disconnections or normal termination. You can check the browser's state at any time using methods like is_running() and is_terminated(). This is particularly important in long-running applications where you need to ensure proper cleanup of resources:
# Define a termination listener
class TermListener:
    def on_termination(self, event):
        print(f"Browser terminated: {event.message}")
        # event.cause contains the reason for termination
try:
    browser = messaging_service.create_message_queue_browser_builder() \
        .build(durable_non_exclusive_queue)
    # Set the termination notification listener
    browser.set_termination_notification_listener(TermListener())
    browser.start()
    # Check browser state
    if browser.is_running():
        print("Browser is running")
    # Process messages
    message_received = browser.receive_message(timeout=DEFAULT_TIMEOUT)
    if message_received:
        print(f"Received message: {message_received.get_payload_as_string()}")
finally:
    # Always terminate the browser when done
    browser.terminate()
# After termination
if browser.is_terminated():
    print("Browser is terminated")
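In a long-running application, printing from the listener is rarely enough, because another part of the program usually needs to know that termination happened. The sketch below records each event and lets other threads wait for it. The class name RecordingTerminationListener and its wait_for_termination() method are hypothetical. It is written as a plain class so it runs without the Solace library, and a real application would likely need to derive it from the API's termination listener base class.

```python
import threading


class RecordingTerminationListener:
    """Record termination events and signal waiters when one arrives.

    Hypothetical helper: uses only the on_termination(event) callback
    shape shown on this page.
    """

    def __init__(self):
        self.events = []
        self._terminated = threading.Event()

    def on_termination(self, event):
        # Keep the event so its message and cause can be inspected later.
        self.events.append(event)
        self._terminated.set()

    def wait_for_termination(self, timeout_s=None) -> bool:
        """Block until a termination event arrives or the timeout elapses."""
        return self._terminated.wait(timeout_s)
```

After registering an instance with set_termination_notification_listener(), a supervising thread could call wait_for_termination(timeout_s=30) and then examine events[-1].cause to decide whether to rebuild the browser.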