Source code for tracing.spans

"""
AdalFlow spans module with OpenAI Agents SDK compatible interface.

This module provides span implementations for AdalFlow tracing that follow
the OpenAI Agents SDK patterns for maximum compatibility with existing
observability backends.

References:
- OpenAI Agents SDK: https://github.com/openai/openai-agents-python/blob/main/src/agents/tracing/spans.py
"""

from __future__ import annotations

import abc
import contextvars
import logging
from typing import Any, Generic, TypeVar, Optional, Dict

from typing_extensions import TypedDict

logger = logging.getLogger(__name__)

from . import util
from .processor_interface import TracingProcessor
from .span_data import SpanData

TSpanData = TypeVar("TSpanData", bound=SpanData)


[docs] class SpanError(TypedDict): message: str data: Optional[Dict[str, Any]]
[docs] class Span(abc.ABC, Generic[TSpanData]): @property @abc.abstractmethod def trace_id(self) -> str: pass @property @abc.abstractmethod def span_id(self) -> str: pass @property @abc.abstractmethod def span_data(self) -> TSpanData: pass
[docs] @abc.abstractmethod def start(self, mark_as_current: bool = False): """ Start the span. Args: mark_as_current: If true, the span will be marked as the current span. """ pass
[docs] @abc.abstractmethod def finish(self, reset_current: bool = False) -> None: """ Finish the span. Args: reset_current: If true, the span will be reset as the current span. """ pass
@abc.abstractmethod def __enter__(self) -> Span[TSpanData]: pass @abc.abstractmethod def __exit__(self, exc_type, exc_val, exc_tb): pass @property @abc.abstractmethod def parent_id(self) -> Optional[str]: pass
[docs] @abc.abstractmethod def set_error(self, error: SpanError) -> None: pass
@property @abc.abstractmethod def error(self) -> Optional[SpanError]: pass
[docs] @abc.abstractmethod def export(self) -> Optional[Dict[str, Any]]: pass
@property @abc.abstractmethod def started_at(self) -> Optional[str]: pass @property @abc.abstractmethod def ended_at(self) -> Optional[str]: pass
[docs] class NoOpSpan(Span[TSpanData]): __slots__ = ("_span_data", "_prev_span_token") def __init__(self, span_data: TSpanData): self._span_data = span_data self._prev_span_token: Optional[ contextvars.Token[Optional[Span[TSpanData]]] ] = None @property def trace_id(self) -> str: return "no-op" @property def span_id(self) -> str: return "no-op" @property def span_data(self) -> TSpanData: return self._span_data @property def parent_id(self) -> Optional[str]: return None
[docs] def start(self, mark_as_current: bool = False): from .scope import Scope if mark_as_current: self._prev_span_token = Scope.set_current_span(self)
[docs] def finish(self, reset_current: bool = False) -> None: from .scope import Scope if reset_current and self._prev_span_token is not None: Scope.reset_current_span(self._prev_span_token) self._prev_span_token = None
def __enter__(self) -> Span[TSpanData]: self.start(mark_as_current=True) return self def __exit__(self, exc_type, exc_val, exc_tb): reset_current = True if exc_type is GeneratorExit: logger.debug("GeneratorExit, skipping span reset") reset_current = False self.finish(reset_current=reset_current)
[docs] def set_error(self, error: SpanError) -> None: pass
@property def error(self) -> Optional[SpanError]: return None
[docs] def export(self) -> Optional[Dict[str, Any]]: return None
@property def started_at(self) -> Optional[str]: return None @property def ended_at(self) -> Optional[str]: return None
[docs] class SpanImpl(Span[TSpanData]): __slots__ = ( "_trace_id", "_span_id", "_parent_id", "_started_at", "_ended_at", "_error", "_prev_span_token", "_processor", "_span_data", ) def __init__( self, trace_id: str, span_id: Optional[str], parent_id: Optional[str], processor: TracingProcessor, span_data: TSpanData, ): self._trace_id = trace_id self._span_id = span_id or util.gen_span_id() self._parent_id = parent_id self._started_at: Optional[str] = None self._ended_at: Optional[str] = None self._processor = processor self._error: Optional[SpanError] = None self._prev_span_token: Optional[ contextvars.Token[Optional[Span[TSpanData]]] ] = None self._span_data = span_data @property def trace_id(self) -> str: return self._trace_id @property def span_id(self) -> str: return self._span_id @property def span_data(self) -> TSpanData: return self._span_data @property def parent_id(self) -> Optional[str]: return self._parent_id
[docs] def start(self, mark_as_current: bool = False): from .scope import Scope if self.started_at is not None: logger.warning("Span already started") return self._started_at = util.time_iso() self._processor.on_span_start(self) if mark_as_current: self._prev_span_token = Scope.set_current_span(self)
[docs] def finish(self, reset_current: bool = False) -> None: from .scope import Scope if self.ended_at is not None: logger.warning("Span already finished") return self._ended_at = util.time_iso() self._processor.on_span_end(self) if reset_current and self._prev_span_token is not None: Scope.reset_current_span(self._prev_span_token) self._prev_span_token = None
def __enter__(self) -> Span[TSpanData]: self.start(mark_as_current=True) return self def __exit__(self, exc_type, exc_val, exc_tb): reset_current = True if exc_type is GeneratorExit: logger.debug("GeneratorExit, skipping span reset") reset_current = False self.finish(reset_current=reset_current)
[docs] def set_error(self, error: SpanError) -> None: self._error = error
@property def error(self) -> Optional[SpanError]: return self._error @property def started_at(self) -> Optional[str]: return self._started_at @property def ended_at(self) -> Optional[str]: return self._ended_at
[docs] def export(self) -> Optional[Dict[str, Any]]: return { "object": "trace.span", "id": self.span_id, "trace_id": self.trace_id, "parent_id": self._parent_id, "started_at": self._started_at, "ended_at": self._ended_at, "span_data": self.span_data.export(), "error": self._error, }