Context Propagation for Distributed Tracing in the Python API
Distributed tracing allows your enterprise applications to trace the flow of messages as they travel from your publisher, through the event mesh, and to the receiving application. For a detailed overview, see Distributed Tracing. For information about version requirements, see Distributed Tracing Version Compatibility.
- The PubSub+ OpenTelemetry API Libraries support W3C propagators only.
- For information about configuring OpenTelemetry SDK environment variables see OpenTelemetry SDK Configuration.
- By default, traces include command line parameters visible to backend applications like Jaeger. Solace recommends disabling this feature for security purposes because these parameters may contain sensitive information such as your user name and password. For instructions, see Disabling Automatic Resource Providers in the OpenTelemetry documentation in GitHub.
Instrumenting Python for Distributed Tracing
Manual instrumentation involves making changes to your enterprise application's source code, and allows you to inject and extract additional context, such as baggage and trace states, into and from messages. Context propagation makes it easier to debug and optimize your application. For more information about context propagation in Solace event messages, see Distributed Tracing Context Propagation. The following examples show you how to create spans using the OpenTelemetry API.
Understanding How Context Propagation Enables Distributed Tracing in the Python PubSub+ API
In your client application, you can use the OpenTelemetry API to create a span, which contains metadata about an operation in a distributed system. This span is associated with a context, which includes a unique TraceID. Next, when you use a PubSub+ message producer to publish a message, the Solace OTel integration package injects the context, which contains the TraceID, into the message. As the message travels through the event broker and is received by a consuming application, spans are generated at each step and have the same TraceID present in the original message context. When each span is closed in the publishing or consuming application, the Python OpenTelemetry API sends it to an OpenTelemetry collector, which collects, processes, and exports the spans to a backend application that correlates the spans using their unique TraceID. A backend application uses the correlated spans to create a trace, which is an end-to-end snapshot detailing how the message traveled through the distributed system. If you do not use context propagation, then backend applications cannot use a unique TraceID to link the spans, making it difficult to trace the flow of messages through the distributed system.
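Because the libraries support W3C propagators, the context that travels with a message can be pictured as a W3C Trace Context traceparent value: a version, a 32-character trace ID, a 16-character span ID, and a flags byte. The following is a minimal standard-library sketch of that layout, not the integration package's implementation. The helper names (new_traceparent, child_traceparent, trace_id_of) are hypothetical and exist only for this illustration.

```python
import re
import secrets

# W3C Trace Context "traceparent" layout: version-traceid-spanid-flags
_TRACEPARENT_RE = re.compile(
    r"^(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<span_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})$"
)


def new_traceparent(sampled: bool = True) -> str:
    """Build a traceparent value with a fresh random trace ID and span ID."""
    trace_id = secrets.token_hex(16)  # 16 bytes -> 32 hex characters
    span_id = secrets.token_hex(8)    # 8 bytes -> 16 hex characters
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def child_traceparent(parent: str) -> str:
    """Keep the parent's trace ID but mint a new span ID for the next hop."""
    match = _TRACEPARENT_RE.match(parent)
    if match is None:
        raise ValueError(f"not a valid traceparent: {parent!r}")
    return f"00-{match['trace_id']}-{secrets.token_hex(8)}-{match['flags']}"


def trace_id_of(traceparent: str) -> str:
    """Return the trace ID portion, which a backend uses to correlate spans."""
    match = _TRACEPARENT_RE.match(traceparent)
    if match is None:
        raise ValueError(f"not a valid traceparent: {traceparent!r}")
    return match["trace_id"]


if __name__ == "__main__":
    publisher_ctx = new_traceparent()
    consumer_ctx = child_traceparent(publisher_ctx)
    # Both hops share one trace ID, so their spans can be linked into a trace.
    print(trace_id_of(publisher_ctx) == trace_id_of(consumer_ctx))
```

Each hop gets its own span ID while the trace ID stays constant, which is what allows a backend to link the publish and receive spans into one trace.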
Dependencies
To enable context propagation for distributed tracing, you must first add the Solace PubSub+ OpenTelemetry Python Integration package as a dependency in your application. You can add this package with the following command:
pip install pubsubplus-opentelemetry-integration
Then add the OpenTelemetry API and SDK libraries required for context propagation with the following commands:
pip install opentelemetry-api
pip install opentelemetry-sdk
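After installing, it can be useful to confirm that all three distributions are visible to the interpreter your application runs under. The following is a small sketch that uses only the standard library. The distribution names are the ones from the pip commands above, and the installed_versions helper is a hypothetical name introduced for this example.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names taken from the pip commands in this section.
REQUIRED = (
    "pubsubplus-opentelemetry-integration",
    "opentelemetry-api",
    "opentelemetry-sdk",
)


def installed_versions(names=REQUIRED):
    """Map each distribution name to its installed version, or None if missing."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```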
The integration package provides the following classes for context propagation:
- InboundMessageCarrier: OpenTelemetry message carrier and Solace message wrapper for inbound messages
- InboundMessageGetter: enables the API to extract propagated fields from an InboundMessageCarrier
- OutboundMessageCarrier: OpenTelemetry message carrier and Solace message wrapper for outbound messages
- OutboundMessageSetter: enables the API to inject propagated fields into an OutboundMessageCarrier
- OutboundMessageGetter: enables the API to extract propagated fields from an OutboundMessageCarrier
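The carrier, setter, and getter classes listed above follow the general OpenTelemetry pattern in which a propagator never touches a message directly: it writes fields through a setter and reads them through a getter. The following sketch mimics that pattern with plain Python objects. Every class and function here (FakeMessage, FakeCarrier, FakeSetter, FakeGetter, inject_fields, extract_fields) is a hypothetical stand-in for illustration and is not part of the Solace or OpenTelemetry packages.

```python
from typing import Dict, List, Optional


class FakeMessage:
    """Hypothetical stand-in for a message; NOT a Solace class."""

    def __init__(self, payload: str) -> None:
        self.payload = payload
        self.trace_fields: Dict[str, str] = {}


class FakeCarrier:
    """Wraps a message so that a propagator can reach its trace fields."""

    def __init__(self, message: FakeMessage) -> None:
        self.message = message


class FakeSetter:
    """Writes one propagated field into the carrier."""

    def set(self, carrier: FakeCarrier, key: str, value: str) -> None:
        carrier.message.trace_fields[key] = value


class FakeGetter:
    """Reads propagated fields back out of the carrier."""

    def get(self, carrier: FakeCarrier, key: str) -> Optional[List[str]]:
        value = carrier.message.trace_fields.get(key)
        return None if value is None else [value]

    def keys(self, carrier: FakeCarrier) -> List[str]:
        return list(carrier.message.trace_fields)


def inject_fields(fields: Dict[str, str], carrier: FakeCarrier, setter: FakeSetter) -> None:
    """What a propagator does on publish: write every field through the setter."""
    for key, value in fields.items():
        setter.set(carrier, key, value)


def extract_fields(carrier: FakeCarrier, getter: FakeGetter) -> Dict[str, str]:
    """What a propagator does on receive: read every field through the getter."""
    extracted = {}
    for key in getter.keys(carrier):
        values = getter.get(carrier, key)
        if values:
            extracted[key] = values[0]
    return extracted


if __name__ == "__main__":
    message = FakeMessage("my message body")
    inject_fields({"baggage": "my_key=my_value"}, FakeCarrier(message), FakeSetter())
    print(extract_fields(FakeCarrier(message), FakeGetter()))
```

Keeping the read and write logic in separate getter and setter objects is what lets the same propagator work against different message types.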
This guide presumes you are familiar with configuring an instance of the OpenTelemetry class. For instructions for configuring OpenTelemetry objects, see OpenTelemetry Manual Instrumentation in Python in the OpenTelemetry documentation.
To use context propagation in the Python PubSub+ API, add the following imports to your application:
from opentelemetry.trace import StatusCode, Status # Trace statuses
from opentelemetry import context # Context functionality
from opentelemetry import propagate # Trace context propagation
from opentelemetry import trace, baggage # Trace and baggage functionality
from opentelemetry.sdk.resources import Resource # Define resources in tracing
from opentelemetry.sdk.trace import TracerProvider # Trace functionality
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter # Span exporters for OTLP server using gRPC
from opentelemetry.sdk.trace.export import ( # Generic span exporters
    BatchSpanProcessor,
    ConsoleSpanExporter,
    SimpleSpanProcessor
)
from opentelemetry.semconv.trace import ( # Standard semantic conventions for tracing
    SpanAttributes,
    MessagingDestinationKindValues,
    MessagingOperationValues
)
from solace_otel.messaging.trace.propagation import ( # Solace message carriers for trace propagation
    OutboundMessageCarrier,
    OutboundMessageGetter,
    OutboundMessageSetter,
    InboundMessageCarrier,
    InboundMessageGetter
)
Generating a Send Span on Message Publish
Your publishing application can generate a send span and export it to the OpenTelemetry Collector. To inject context into a message and generate a send span for a published message, perform these steps:
1. Create an instance of an OutboundMessageSetter, which enables the API to inject propagated fields into a Solace outbound message carrier. Next, create a propagator instance with the get_global_textmap() function. This function returns a global text map propagator, which makes the TraceContext and Baggage structures available across your application.
2. Use the get_tracer() function to create a tracer instance. A tracer instance allows your application to create, start, and end spans.
3. Create your OutboundMessage to publish, and use your tracer instance to create and start a span with the start_as_current_span() function. The span automatically ends when the code in the with block completes.
4. Set span attributes with the set_attribute() function, which lets you attach additional context and metadata to a span in the form of key-value pairs.
5. (Optional) Attach baggage to the current context. The set_baggage() function takes a key-value pair, uses the current context if you do not explicitly provide one, and returns a new context that contains the baggage. Pass that context to the attach() function to make it the current context.
6. Create an OutboundMessageCarrier instance and pass in your outbound message object. Then use the inject() function to inject context into your message.
7. Publish the message.
The following code shows these steps in order:
# message_builder, additional_properties, my_topic, and direct_publisher are
# assumed to be defined elsewhere in your application.

# 1. Create the setter and the global text map propagator
default_setter = OutboundMessageSetter()
propagator = propagate.get_global_textmap()

# 2. Create a tracer
tracer = trace.get_tracer("my_tracer")

# 3. Build the message and start the send span
outbound_message = message_builder.build('my message body', additional_message_properties=additional_properties)
with tracer.start_as_current_span(f"{my_topic}_publish") as span:
    # 4. Set span attributes
    span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "PubSub+")
    span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.TOPIC.value)
    span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, my_topic)
    span.set_attribute(SpanAttributes.MESSAGING_OPERATION, "publish")
    span.set_attribute(SpanAttributes.MESSAGING_PROTOCOL, "SMF")
    span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, outbound_message.get_message_id())

    # 5. (Optional) Attach baggage to the current context
    BAGGAGE_KEY = "my_key"
    BAGGAGE_VALUE = "my_value"
    context.attach(baggage.set_baggage(BAGGAGE_KEY, BAGGAGE_VALUE))

    # 6. Inject the current context into the outbound message
    carrier = OutboundMessageCarrier(outbound_message)
    propagator.inject(carrier=carrier, setter=default_setter)

    # 7. Publish the message and record the outcome on the span
    try:
        direct_publisher.publish(
            destination=Topic.of("my/sample/topic"),
            message=outbound_message)
        span.set_status(Status(StatusCode.OK))
    except Exception as ex:
        span.set_status(Status(StatusCode.ERROR))
        span.record_exception(ex)
In the code snippets shown above, the span ends when the code in the with block completes. If you do not use a with code block as demonstrated in step 3, you must explicitly call span.end() to end the span.
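The optional baggage step above attaches key-value pairs that travel with the trace context. As a rough illustration of what such pairs look like in the W3C Baggage text format (comma-separated key=value members with percent-encoded values), the following standard-library sketch encodes and decodes them. It is a simplified model and makes no claim about how the Solace carrier stores baggage inside a message. The function names are hypothetical, and keys are assumed to be plain tokens that need no encoding.

```python
from typing import Dict
from urllib.parse import quote, unquote


def encode_baggage(entries: Dict[str, str]) -> str:
    """Serialize entries as comma-separated key=value members.

    Values are percent-encoded so that commas, spaces, and semicolons in a
    value cannot be confused with the separators of the format.
    """
    return ",".join(
        f"{key}={quote(str(value), safe='')}" for key, value in entries.items()
    )


def decode_baggage(header: str) -> Dict[str, str]:
    """Parse a baggage string back into a dictionary.

    Optional per-member properties (anything after ';') are dropped, and
    members without a key or an '=' sign are skipped.
    """
    entries: Dict[str, str] = {}
    for member in header.split(","):
        pair = member.split(";", 1)[0].strip()
        key, sep, value = pair.partition("=")
        if not sep or not key.strip():
            continue
        entries[key.strip()] = unquote(value.strip())
    return entries


if __name__ == "__main__":
    header = encode_baggage({"my_key": "my_value"})
    print(header)
    print(decode_baggage(header))
```

Because baggage is delivered to every downstream consumer of the message, it is best kept small and free of sensitive values.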
Generating a Receive Span on Message Receive
Your consuming application can generate a receive span and then export it to the OpenTelemetry Collector. To extract tracing context from a message and generate a receive span for a received message, perform these steps:
1. Create an instance of an InboundMessageGetter, which enables the API to extract propagated fields from a Solace InboundMessageCarrier.
2. Create a MessageHandler to receive messages. In your handler, use the get_tracer() function to create a tracer instance. A tracer instance allows your application to create, start, and end spans. Then, create a propagator instance with the get_global_textmap() function. This function returns a global text map propagator, which makes the TraceContext and Baggage structures available across your application.
3. Create an InboundMessageCarrier instance and pass in your inbound message object. Then use the extract() function to extract context from the received message.
4. (Optional) Extract baggage from the extracted context.
5. Use the attach() function to link the extracted trace context to the current context. This allows you to propagate the trace context across distributed components in your application. Assign the return value to a token, which you use later to detach the trace context from the current context when the receive message operation is complete. This keeps changes made to the current context isolated to each individual message receive operation, which means subsequent operations are not affected by the trace context of earlier operations.
6. Use your tracer instance to create and start a span with the start_as_current_span() function. Create a new span for every message. The span automatically ends when the code in the with block completes.
7. Set span attributes with the set_attribute() function, which lets you attach additional context and metadata to a span in the form of key-value pairs.
8. Process the received message.
9. Call the detach() function to end the current context, which marks the end of the scope for this step in the tracing process. detach() restores the context to its state before the call to attach() in step 5, which means that any modifications made between attach() and detach() are no longer part of the active context. This allows subsequent operations to proceed without being affected by changes made during this period.
The following code shows these steps in order:
# 1. Create the getter
default_getter = InboundMessageGetter()

# 2. Create a message handler with a tracer and a propagator
class MessageHandlerImpl(MessageHandler):
    def on_message(self, message: 'InboundMessage'):
        tracer = trace.get_tracer("my.tracer")
        propagator = propagate.get_global_textmap()

        # 3. Extract the propagated context from the received message
        carrier = InboundMessageCarrier(message)
        extracted_ctx = propagator.extract(carrier=carrier, getter=default_getter)

        # 4. (Optional) Read the baggage entries from the extracted context
        baggage_entries = baggage.get_all(extracted_ctx)

        # 5. Make the extracted context current and keep the token for detach()
        token = context.attach(extracted_ctx)
        try:
            # 6. Start a new receive span for this message
            with tracer.start_as_current_span("{topicName} process".format(topicName=message.get_destination_name())) as span:
                # 7. Set span attributes
                span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "PubSub+")
                span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.TOPIC.value)
                span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, message.get_destination_name())
                span.set_attribute(SpanAttributes.MESSAGING_OPERATION, MessagingOperationValues.PROCESS.value)
                span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message.get_application_message_id())

                # 8. Process the message and record the outcome on the span
                try:
                    # Process received messages here
                    span.set_status(Status(StatusCode.OK))
                except Exception as ex:
                    span.set_status(Status(StatusCode.ERROR))
                    span.record_exception(ex)
        finally:
            # 9. Restore the context that was current before attach()
            context.detach(token)
In the code snippets shown above, the span ends when the code in the with block completes. If you do not use a with code block as demonstrated in step 6, you must explicitly call span.end() to end the span.
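The attach() and detach() pairing in the receive steps is a token-based save-and-restore pattern, similar in spirit to the contextvars module in the Python standard library. The sketch below reproduces the pattern with contextvars only, to show why detaching in a finally block keeps one message's context from leaking into the next. The attach, detach, get_current, and handle_message functions here are hypothetical stand-ins, not the OpenTelemetry functions of the same name.

```python
import contextvars
from typing import Dict, List

# Holds the "current" context for this illustration; empty when nothing is attached.
_current: contextvars.ContextVar = contextvars.ContextVar(
    "current_trace_context", default={}
)


def attach(new_ctx: Dict[str, str]) -> contextvars.Token:
    """Make new_ctx current and return a token that remembers the old value."""
    return _current.set(new_ctx)


def detach(token: contextvars.Token) -> None:
    """Restore whatever was current before the matching attach() call."""
    _current.reset(token)


def get_current() -> Dict[str, str]:
    return _current.get()


def handle_message(extracted_ctx: Dict[str, str], seen: List[str]) -> None:
    """Process one message with its own context, then restore the previous one."""
    token = attach(extracted_ctx)
    try:
        # Work done here sees the context extracted from this message.
        seen.append(get_current()["trace_id"])
    finally:
        # Runs even if processing raises, so the next message starts clean.
        detach(token)


if __name__ == "__main__":
    seen: List[str] = []
    handle_message({"trace_id": "aaa"}, seen)
    handle_message({"trace_id": "bbb"}, seen)
    print(seen, get_current())
```

Placing detach() in a finally block means the previous context is restored even when message processing raises an exception.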