Context Propagation for Distributed Tracing in the Python API

Distributed tracing allows your enterprise applications to trace the flow of messages as they travel from your publisher, through the event mesh and to the receiving application. For a detailed overview see Distributed Tracing . For information about version requirements, see Distributed Tracing Version Compatibility.

  • The PubSub+ OpenTelemetry API Libraries support W3C propagators only.
  • For information about configuring OpenTelemetry SDK environment variables see OpenTelemetry SDK Configuration.
  • By default, traces include command line parameters visible to backend applications like Jaeger. Solace recommends disabling this feature for security purposes because these parameters may contain sensitive information such as your user name and password. For instructions, see Disabling Automatic Resource Providers in the OpenTelemetry documentation in GitHub.

Instrumenting Python for Distributed Tracing

Manual Instrumentation involves making changes to your enterprise application's source code, and allows you to inject and extract additional context, such as baggage and trace states, into and from messages. Context propagation makes it easy to debug and optimize your application. For more information about context propagation in Solace event messages, see Distributed Tracing Context Propagation. The following examples show you how to create spans using the OpenTelemetry API.

Understanding How Context Propagation Enables Distributed Tracing in the Python PubSub+ API

In your client application, you can use the OpenTelemetry API to create a span, which contains metadata about an operation in a distributed system. This span is associated with a context, which includes a unique TraceID. Next, when you use a PubSub+ message producer to publish a message, the Solace OTel integration package injects the context, which contains the TraceID, into the message. As the message travels through the event broker and is received by a consuming application, spans are generated at each step and have the same TraceID present in the original message context. When each span is closed in the publishing or consuming application, the Python OpenTelemetry API sends it to an OpenTelemetry collector, which collects, processes and exports the spans to a backend application that correlates the spans using their unique TraceID. A backend application uses the correlated spans to create a trace, which is an end-to-end snapshot detailing how the message traveled through the distributed system. If you do not use context propagation, then backend applications cannot use a unique TraceID to link the spans, making it difficult to trace the flow of messages through the distributed system.

Dependencies

To enable context propagation for distributed tracing, you must first add the Solace PubSub+ OpenTelemetry Python Integration package as a dependency in your application. You can also add this package with the following command:

pip install pubsubplus-opentelemetry-integration

Then add the OpenTelemetry API and SDK libraries required for context propagation with the following commands:

pip install opentelemetry-api
pip install opentelemetry-sdk	

For OpenTelemetry version compatibility, see Distributed Tracing Version Compatibility. Adding these libraries gives your application access to the following:

  • InboundMessageCarrier—OpenTelemetry message carrier and Solace message wrapper for inbound messages
  • InboundMessageGetter—enables the API to extract propagated fields from an InboundMessageCarrier
  • OutboundMessageCarrier—OpenTelemetry message carrier and Solace message wrapper for outbound messages
  • OutboundMessageSetter—enables the API to inject propagated fields into an OutboundMessageCarrier
  • OutboundMessageGetter—enables the API to extract propagated fields from an OutboundMessageCarrier

This guide presumes you are familiar with configuring an instance of the OpenTelemetry class. For instructions for configuring OpenTelemetry objects, see OpenTelemetry Manual Instrumentation in Python in the OpenTelemetry documentation.

To use context propagation in the Python PubSub+ API, include the following packages in your application:

from opentelemetry.trace import StatusCode, Status  # Trace statuses
from opentelemetry import context                   # Context functionality
from opentelemetry import propagate                 # Trace context propagation
from opentelemetry import trace, baggage            # Trace and baggage functionality
from opentelemetry.sdk.resources import Resource    # Define resources in tracing
from opentelemetry.sdk.trace import TracerProvider  # Trace functionality
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter  # Span exporters for OTLP server using gRPC
from opentelemetry.sdk.trace.export import (        # Generic span exporters
    BatchSpanProcessor,
    ConsoleSpanExporter,
    SimpleSpanProcessor
)
from opentelemetry.semconv.trace import (               # Standard semantic conventions for tracing
    SpanAttributes,
    MessagingDestinationKindValues,
    MessagingOperationValues
)
from solace_otel.messaging.trace.propagation import (   # Solace message carriers for trace propagation
    OutboundMessageCarrier,
    OuboundMessageGetter, 
    OutboundMessageSetter,
    InboundMessageCarrier,
    InboundMessageGetter		
)

Generating a Send Span on Message Publish

Your publishing application can generate a send span and export it to the OpenTelemetry Collector. To inject context into a message and generate a send span for a published message, perform these steps:

  1. Create an instance of an OutboundMessageSetter, which enables the API to inject propagated fields into a Solace outbound message carrier. Next, create a propagator instance with the get_global_textmap() function. This function returns a global text map propagator which makes the TraceContext and Baggage structures available across your application.
  2. default_setter = OutboundMessageSetter()
    propagator = propagate.get_global_textmap()
  3. Use the get_tracer() function to create a tracer instance. A tracer instance allows your application to create, start, and end spans.
  4. tracer = trace.get_tracer("my_tracer")	
  5. Create your OutboundMessage to publish, and use your tracer instance to create and start a span with the start_as_current_span() function. The span automatically ends when the code in the with block completes:

  6. outbound_message = message_builder.build('my message body', additional_message_properties=additional_properties)
    with tracer.start_as_current_span(f"{my_topic}_publish") as span:	
  7. Set span attributes with the set_attribute() function, which lets you attach additional context and meta-data to a span in the form of key-value pairs:
  8.     span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "PubSub+")
        span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.TOPIC.value)
        span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, my_topic)
        span.set_attribute(SpanAttributes.MESSAGING_OPERATION, "publish")
        span.set_attribute(SpanAttributes.MESSAGING_PROTOCOL, "SMF")
        span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, outbound_message.get_message_id())
  9. (Optional) Attach baggage to the span with the set_baggage() function. The attach() function implicitly retrieves the current context if you do not explicitly provide context. This function uses key-value pairs:
  10.     BAGGAGE_KEY = "my_key"
        BAGGAGE_VALUE = "my_value"
        context.attach(baggage.set_baggage(BAGGAGE_KEY, BAGGAGE_VALUE))	
  11. Create an OutboundMessageCarrier instance and pass in your outbound message object. Then use the inject() function to inject context into your message:
  12.     carrier = OutboundMessageCarrier(outbound_message)
        propagator.inject(carrier=carrier, setter=default_setter)			
  13. Publish the message:
  14.     try:
            direct_publisher.publish(
                destination=Topic.of("my/sample/topic"),
                message=outbound_message)
            span.set_status(Status(StatusCode.OK))
        except Exception as ex:
            span.set_status(Status(StatusCode.ERROR))
            span.record_exception(ex)		

In the code snippets shown above, the span ends when the code in the with block completes.If you do not use a with code block as demonstrated in step 3, you must explicitly call span.end() to end the span.

Generating a Receive Span on Message Receive

Your consuming application can generate a receive span and then export it to the OpenTelemetry Collector. To extract tracing context from a message and generate a receive span for a received message, perform these steps:

  1. Create an instance of an InboundMessageGetter, which enables the API to extract propagated fields from a Solace InboundMessageCarrier.
  2. default_getter = InboundMessageGetter()
    
  3. Create a MessageHandler to receive messages. In your handler, use the get_tracer() function to create a tracer instance. A tracer instance allows your application to create, start, and end spans. Then, create a propagator instance with the get_global_textmap() function. This function returns a global text map propagator which makes the TraceContext and Baggage structures available across your application.
  4. class MessageHandlerImpl(MessageHandler):
        def on_message(self, message: 'InboundMessage'):
            tracer = trace.get_tracer("my.tracer")
            propagator = propagate.get_global_textmap()		
  5. Create an InboundMessageCarrier instance and pass in your inbound message object. Then use the extract() function to extract context from the received message:
  6. carrier = InboundMessageCarrier(message)
    extracted_ctx = propagator.extract(carrier=carrier, getter=default_getter)			
  7. (Optional) Extract baggage from the span:
  8. baggage_entries = baggage.get_all(extracted_ctx) 
  9. Use the attach() function to link the extracted trace context to the current context. This allows you to propagate the trace context across distributed components in your application. Assign this to a token instance, which you use later to detach the trace context from the current context when the receive message operation is complete. This keeps changes made to the current context isolated to each individual message receive operation, which means subsequent operations are not affected by the trace context of earlier operations.
  10. token = context.attach(extracted_ctx)
  11. Use your tracer instance to create and start a span with the start_as_current_span() function. Create a new span for every message. The span automatically ends when the code in the with block completes:
  12. try:			
        with tracer.start_as_current_span("{topicName} process".format(topicName=message.get_destination_name())) as span:
  13. Set span attributes with the set_attribute() function, which lets you attach additional context and meta-data to a span in the form of key-value pairs:
  14.         span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "PubSub+")
            span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.TOPIC.value)
            span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, message.get_destination_name())
            span.set_attribute(SpanAttributes.MESSAGING_OPERATION, MessagingOperationValues.PROCESS.value)
            span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message.get_application_message_id())
    
  15. Process the received message:
  16.     try:
            # Process received messages here
            span.set_status(Status(StatusCode.OK))
        except Exception as ex:
            span.set_status(Status(StatusCode.ERROR))
            span.record_exception(ex)
  17. Call the detach() function to end the current context, which marks the end of the scope for this step in the tracing process. detach() restores the context to its state before the call to attach() in step 5, which means that any modifications made between attach() and detach() are no longer part of the active context. This allows subsequent operations to proceed without being affected by changes made during this period.
  18.     finally:
            context.detach(token)
    

In the code snippets shown above, the span ends when the code in the with block completes.If you do not use a with code block as demonstrated in step 6, you must explicitly call span.end() to end the span.