"""
AdalFlow tracing setup and providers module with OpenAI Agents SDK compatibility.
This module provides the core tracing infrastructure for AdalFlow, including trace providers
and multi-processor management following OpenAI Agents SDK patterns.
References:
- OpenAI Agents SDK: https://github.com/openai/openai-agents-python/blob/main/src/agents/tracing/setup.py
"""
from __future__ import annotations
import logging
import os
import threading
from typing import Any, Optional, Dict, Union
from . import util
from .processor_interface import TracingProcessor
from .scope import Scope
from .spans import NoOpSpan, Span, SpanImpl, TSpanData
from .traces import NoOpTrace, Trace, TraceImpl
logger = logging.getLogger(__name__)
[docs]
class SynchronousMultiTracingProcessor(TracingProcessor):
"""
Forwards all calls to a list of TracingProcessors, in order of registration.
"""
def __init__(self):
# Using a tuple to avoid race conditions when iterating over processors
self._processors: tuple[TracingProcessor, ...] = ()
self._lock = threading.Lock()
[docs]
def add_tracing_processor(self, tracing_processor: TracingProcessor):
"""
Add a processor to the list of processors. Each processor will receive all traces/spans.
"""
with self._lock:
self._processors += (tracing_processor,)
[docs]
def set_processors(self, processors: list[TracingProcessor]):
"""
Set the list of processors. This will replace the current list of processors.
"""
with self._lock:
self._processors = tuple(processors)
[docs]
def on_trace_start(self, trace: Trace) -> None:
"""
Called when a trace is started.
"""
for processor in self._processors:
processor.on_trace_start(trace)
[docs]
def on_trace_end(self, trace: Trace) -> None:
"""
Called when a trace is finished.
"""
for processor in self._processors:
processor.on_trace_end(trace)
[docs]
def on_span_start(self, span: Span[Any]) -> None:
"""
Called when a span is started.
"""
for processor in self._processors:
processor.on_span_start(span)
[docs]
def on_span_end(self, span: Span[Any]) -> None:
"""
Called when a span is finished.
"""
for processor in self._processors:
processor.on_span_end(span)
[docs]
def shutdown(self) -> None:
"""
Called when the application stops.
"""
for processor in self._processors:
try:
logger.debug(f"Shutting down trace processor {processor}")
except (ValueError, OSError):
# Logging system may be closed during shutdown
pass
processor.shutdown()
[docs]
def force_flush(self):
"""
Force the processors to flush their buffers.
"""
for processor in self._processors:
processor.force_flush()
[docs]
class TraceProvider:
def __init__(self):
self._multi_processor = SynchronousMultiTracingProcessor()
# check if ADALFLOW_DISABLE_TRACING is set to true or 1
# if disabled the provider will just return a NoOp Trace and Span
self._disabled = os.environ.get("ADALFLOW_DISABLE_TRACING", "true").lower() in (
"true",
"1",
)
[docs]
def register_processor(self, processor: TracingProcessor):
"""
Add a processor to the list of processors. Each processor will receive all traces/spans.
"""
self._multi_processor.add_tracing_processor(processor)
[docs]
def set_processors(self, processors: list[TracingProcessor]):
"""
Set the list of processors. This will replace the current list of processors.
"""
self._multi_processor.set_processors(processors)
[docs]
def get_current_trace(self) -> Optional[Trace]:
"""
Returns the currently active trace, if any.
"""
return Scope.get_current_trace()
[docs]
def get_current_span(self) -> Optional[Span[Any]]:
"""
Returns the currently active span, if any.
"""
return Scope.get_current_span()
[docs]
def set_disabled(self, disabled: bool) -> None:
"""
Set whether tracing is disabled.
"""
self._disabled = disabled
[docs]
def create_trace(
self,
name: str,
trace_id: Optional[str] = None,
group_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
disabled: bool = False,
) -> Trace:
"""
Create a new trace.
"""
if self._disabled or disabled:
logger.debug(f"Tracing is disabled. Not creating trace {name}")
return NoOpTrace()
trace_id = trace_id or util.gen_trace_id()
logger.debug(f"Creating trace {name} with id {trace_id}")
return TraceImpl(
name=name,
trace_id=trace_id,
group_id=group_id,
metadata=metadata,
processor=self._multi_processor,
)
[docs]
def create_span(
self,
span_data: TSpanData,
span_id: Optional[str] = None,
parent: Optional[Union[Trace, Span[Any]]] = None,
disabled: bool = False,
) -> Span[TSpanData]:
"""
Create a new span.
"""
if self._disabled or disabled:
logger.debug(f"Tracing is disabled. Not creating span {span_data}")
return NoOpSpan(span_data)
if not parent:
current_span = Scope.get_current_span()
current_trace = Scope.get_current_trace()
if current_trace is None:
logger.error(
"No active trace. Make sure to start a trace with `trace()` first"
"Returning NoOpSpan."
)
return NoOpSpan(span_data)
elif isinstance(current_trace, NoOpTrace) or isinstance(
current_span, NoOpSpan
):
logger.debug(
f"Parent {current_span} or {current_trace} is no-op, returning NoOpSpan"
)
return NoOpSpan(span_data)
parent_id = current_span.span_id if current_span else None
trace_id = current_trace.trace_id
elif isinstance(parent, Trace):
if isinstance(parent, NoOpTrace):
logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
return NoOpSpan(span_data)
trace_id = parent.trace_id
parent_id = None
elif isinstance(parent, Span):
if isinstance(parent, NoOpSpan):
logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
return NoOpSpan(span_data)
parent_id = parent.span_id
trace_id = parent.trace_id
logger.debug(f"Creating span {span_data} with id {span_id}")
return SpanImpl(
trace_id=trace_id,
span_id=span_id,
parent_id=parent_id,
processor=self._multi_processor,
span_data=span_data,
)
[docs]
def shutdown(self) -> None:
if self._disabled:
return
try:
try:
logger.debug("Shutting down trace provider")
except (ValueError, OSError):
# Logging system may be closed during shutdown
pass
self._multi_processor.shutdown()
except Exception as e:
try:
logger.error(f"Error shutting down trace provider: {e}")
except (ValueError, OSError):
# Logging system may be closed during shutdown
pass
# Lazy initialization - provider will be created on first access
_GLOBAL_TRACE_PROVIDER = None
[docs]
def get_global_trace_provider():
"""Get the global trace provider, creating it if necessary."""
global _GLOBAL_TRACE_PROVIDER
if _GLOBAL_TRACE_PROVIDER is None:
_GLOBAL_TRACE_PROVIDER = TraceProvider()
return _GLOBAL_TRACE_PROVIDER
# For backward compatibility, create a property-like access
class _GlobalProviderProxy:
"""Proxy to provide backward-compatible access to GLOBAL_TRACE_PROVIDER."""
def __getattr__(self, name):
return getattr(get_global_trace_provider(), name)
def __setattr__(self, name, value):
return setattr(get_global_trace_provider(), name, value)
GLOBAL_TRACE_PROVIDER = _GlobalProviderProxy()