Source code for tracing.mlflow_integration

"""MLflow integration helpers for AdalFlow tracing."""

import logging
import os
from pathlib import Path
from typing import Optional

from adalflow.utils import get_adalflow_default_root_path

log = logging.getLogger(__name__)

# Check if MLflow is available
try:
    # Do NOT set ADALFLOW_DISABLE_TRACING here - it should be set before imports
    import mlflow
    from mlflow.openai._agent_tracer import MlflowOpenAgentTracingProcessor

    MLFLOW_AVAILABLE = True
except ImportError:
    MLFLOW_AVAILABLE = False
    log.warning("MLflow not available. Install with: pip install mlflow")


from .setup import GLOBAL_TRACE_PROVIDER

# ═══════════════════════════════════════════════════════════════════════════════
# Convenience functions for managing trace processors
# ═══════════════════════════════════════════════════════════════════════════════


def set_trace_processors(processors):
    """Set the global trace processors."""
    GLOBAL_TRACE_PROVIDER.set_processors(processors)

def enable_mlflow_local(
    tracking_uri: Optional[str] = None,
    experiment_name: str = "AdalFlow-Agent-Experiment",
    project_name: str = "AdalFlow-Agent-Project",
    port: int = 8080,
) -> bool:
    """Enable MLflow local tracing without auto-starting a server.

    This function sets up MLflow tracing but does NOT automatically start
    an MLflow server. You need to have an MLflow server already running.

    To start an MLflow server:

    ```bash
    mlflow server --host 127.0.0.1 --port 8080
    ```

    Args:
        tracking_uri: MLflow tracking server URI. If None, defaults to
            http://localhost:{port}
        experiment_name: Name of the MLflow experiment to create/use
        project_name: Project name for the MLflow tracing processor
        port: Port for the default tracking URI if tracking_uri is None

    Returns:
        bool: True if MLflow was successfully enabled, False otherwise

    Example:
        >>> from adalflow.tracing import enable_mlflow_local
        >>> # Use with existing MLflow server on default port
        >>> enable_mlflow_local()
        >>> # Or with custom tracking URI:
        >>> enable_mlflow_local(tracking_uri="http://localhost:5000")
    """
    if not MLFLOW_AVAILABLE:
        log.error("MLflow is not installed. Cannot enable MLflow tracing.")
        return False

    try:
        # Set the environment variable BEFORE accessing the provider.
        # This ensures the TraceProvider is created with tracing enabled.
        os.environ["ADALFLOW_DISABLE_TRACING"] = "False"

        # Now access the provider - it will be created with tracing enabled.
        GLOBAL_TRACE_PROVIDER.set_disabled(False)

        # Determine the tracking URI
        if tracking_uri is None:
            # Default to localhost with the specified port
            tracking_uri = f"http://localhost:{port}"

        # Set the MLflow tracking URI
        if tracking_uri:
            mlflow.set_tracking_uri(tracking_uri)
            log.info(f"MLflow tracking URI set to: {tracking_uri}")

        # Create or set the experiment
        experiment = mlflow.set_experiment(experiment_name)
        log.info(
            f"MLflow experiment set: {experiment_name} (ID: {experiment.experiment_id})"
        )

        # Create the MLflow processor
        mlflow_processor = MlflowOpenAgentTracingProcessor(project_name=project_name)
        log.info(f"MLflow processor created for project: {project_name}")

        # Replace all existing processors with the MLflow processor
        set_trace_processors([mlflow_processor])

        log.info(
            f"✅ MLflow tracing enabled: tracking_uri={tracking_uri}, "
            f"experiment={experiment_name}, project={project_name}"
        )
        return True

    except Exception as e:
        log.error(f"Failed to enable MLflow tracing: {e}")
        return False

def enable_mlflow_local_with_server(
    tracking_uri: Optional[str] = None,
    experiment_name: str = "AdalFlow-Agent-Experiment",
    project_name: str = "AdalFlow-Agent-Project",
    use_adalflow_dir: bool = True,
    port: int = 8080,
    kill_existing_server: bool = False,
) -> bool:
    """Enable MLflow local tracing with an auto-started server.

    This function sets up MLflow tracing and automatically starts an MLflow
    server if needed.

    Args:
        tracking_uri: MLflow tracking server URI. If None, a server will be
            auto-started
        experiment_name: Name of the MLflow experiment to create/use
        project_name: Project name for the MLflow tracing processor
        use_adalflow_dir: If True and the server fails to start, use
            ~/.adalflow/mlruns as a fallback
        port: Port to run the MLflow server on
        kill_existing_server: If True, kill any existing MLflow server on the
            port before starting

    Returns:
        bool: True if MLflow was successfully enabled, False otherwise

    Example:
        >>> from adalflow.tracing import enable_mlflow_local_with_server
        >>> # Auto-start server on port 8080
        >>> enable_mlflow_local_with_server()
        >>> # Or kill existing and start fresh:
        >>> enable_mlflow_local_with_server(kill_existing_server=True)
        >>> # Or use custom port:
        >>> enable_mlflow_local_with_server(port=5000)
    """
    if not MLFLOW_AVAILABLE:
        log.error("MLflow is not installed. Cannot enable MLflow tracing.")
        return False

    try:
        # Set the environment variable BEFORE accessing the provider.
        # This ensures the TraceProvider is created with tracing enabled.
        os.environ["ADALFLOW_DISABLE_TRACING"] = "False"

        # Now access the provider - it will be created with tracing enabled.
        GLOBAL_TRACE_PROVIDER.set_disabled(False)

        # Determine whether we need to start a server
        if tracking_uri is None:
            # Try to start an MLflow server with the AdalFlow backend
            if start_mlflow_server(port=port, kill_existing=kill_existing_server):
                tracking_uri = f"http://localhost:{port}"
                log.info(f"Successfully started MLflow server on port {port}")
            else:
                log.warning(
                    "Failed to auto-start MLflow server, falling back to file backend"
                )
                if use_adalflow_dir:
                    # Use the AdalFlow root dir/mlruns for local file storage
                    adalflow_dir = Path(get_adalflow_default_root_path()) / "mlruns"
                    adalflow_dir.mkdir(parents=True, exist_ok=True)
                    tracking_uri = str(adalflow_dir.absolute())
                    log.info(
                        f"Using AdalFlow directory for MLflow artifacts: {adalflow_dir}"
                    )
                else:
                    # Still try the localhost URL in case a server is running externally
                    tracking_uri = f"http://localhost:{port}"

        # Set the MLflow tracking URI
        if tracking_uri:
            mlflow.set_tracking_uri(tracking_uri)
            log.info(f"MLflow tracking URI set to: {tracking_uri}")

        # Create or set the experiment
        experiment = mlflow.set_experiment(experiment_name)
        log.info(
            f"MLflow experiment set: {experiment_name} (ID: {experiment.experiment_id})"
        )

        # Create the MLflow processor
        mlflow_processor = MlflowOpenAgentTracingProcessor(project_name=project_name)
        log.info(f"MLflow processor created for project: {project_name}")

        # Replace all existing processors with the MLflow processor
        set_trace_processors([mlflow_processor])

        log.info(
            f"✅ MLflow tracing enabled with server: tracking_uri={tracking_uri}, "
            f"experiment={experiment_name}, project={project_name}"
        )
        return True

    except Exception as e:
        log.error(f"Failed to enable MLflow tracing with server: {e}")
        return False

def get_mlflow_server_command(host: str = "0.0.0.0", port: int = 8080) -> str:
    """Get the MLflow server command with the AdalFlow backend store.

    Args:
        host: Host to bind the server to (default: "0.0.0.0")
        port: Port to run the server on (default: 8080)

    Returns:
        str: The MLflow server command to run

    Example:
        >>> from adalflow.tracing import get_mlflow_server_command
        >>> print(get_mlflow_server_command())
        mlflow server --backend-store-uri /Users/yourname/.adalflow/mlruns --host 0.0.0.0 --port 8080
    """
    adalflow_dir = Path(get_adalflow_default_root_path()) / "mlruns"
    adalflow_dir.mkdir(parents=True, exist_ok=True)

    # Use the absolute path without the file:// prefix for the command
    backend_path = str(adalflow_dir.absolute())

    return (
        f"mlflow server --backend-store-uri {backend_path} --host {host} --port {port}"
    )

def start_mlflow_server(
    host: str = "0.0.0.0",
    port: int = 8080,
    wait: bool = True,
    kill_existing: bool = False,
) -> bool:
    """Start MLflow server with AdalFlow backend store.

    Args:
        host: Host to bind the server to (default: "0.0.0.0")
        port: Port to run the server on (default: 8080)
        wait: If True, wait for the server to be ready before returning
        kill_existing: If True, kill any existing process on the port before starting

    Returns:
        bool: True if the server started successfully, False otherwise

    Example:
        >>> from adalflow.tracing import start_mlflow_server
        >>> start_mlflow_server()  # Starts server on http://0.0.0.0:8080
        >>> # Or with custom settings:
        >>> start_mlflow_server(port=5000)
    """
    if not MLFLOW_AVAILABLE:
        log.error("MLflow is not installed. Cannot start MLflow server.")
        return False

    import subprocess
    import time
    import urllib.request
    import signal

    try:
        import psutil

        PSUTIL_AVAILABLE = True
    except ImportError:
        PSUTIL_AVAILABLE = False

    # Check if a server is already running
    try:
        with urllib.request.urlopen(f"http://localhost:{port}", timeout=1) as response:
            if response.status == 200:
                if kill_existing:
                    log.info(f"Killing existing MLflow server on port {port}")
                    # Find and kill processes using the port
                    if PSUTIL_AVAILABLE:
                        for proc in psutil.process_iter(["pid", "name", "cmdline"]):
                            try:
                                cmdline = proc.info.get("cmdline") or []
                                joined = " ".join(cmdline)
                                if "mlflow" in joined and str(port) in joined:
                                    log.info(f"Killing MLflow process {proc.pid}")
                                    proc.kill()
                                    time.sleep(1)  # Give it time to shut down
                            except (psutil.NoSuchProcess, psutil.AccessDenied):
                                continue
                    else:
                        # Alternative: use lsof on Unix-like systems
                        try:
                            result = subprocess.run(
                                ["lsof", "-ti", f":{port}"],
                                capture_output=True,
                                text=True,
                            )
                            if result.stdout:
                                pids = result.stdout.strip().split("\n")
                                for pid in pids:
                                    try:
                                        os.kill(int(pid), signal.SIGTERM)
                                    except (ProcessLookupError, ValueError, OSError):
                                        pass
                                time.sleep(1)
                        except Exception:
                            log.warning("Could not kill existing server")
                else:
                    log.info(f"MLflow server already running on port {port}")
                    return True
    except Exception:
        pass

    # Get the server command
    cmd = get_mlflow_server_command(host, port)

    try:
        # Start the server in the background
        log.info(f"Starting MLflow server: {cmd}")
        subprocess.Popen(
            cmd.split(), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )

        if wait:
            # Wait for the server to be ready (max 10 seconds)
            for _ in range(20):
                time.sleep(0.5)
                try:
                    with urllib.request.urlopen(
                        f"http://localhost:{port}", timeout=1
                    ) as response:
                        if response.status == 200:
                            log.info(
                                f"✅ MLflow server started successfully at http://{host}:{port}"
                            )
                            return True
                except Exception:
                    continue
            log.warning("MLflow server started but may not be ready yet")
        else:
            log.info(f"MLflow server starting in background at http://{host}:{port}")

        return True

    except Exception as e:
        log.error(f"Failed to start MLflow server: {e}")
        return False
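

# Minimal usage sketch: running this module directly wires the pieces above
# together. It assumes port 8080 is free (or holds a reusable MLflow server)
# and that MLflow is installed; it is an illustration, not required behavior.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Auto-start a local server and route AdalFlow traces to it
    if enable_mlflow_local_with_server(port=8080):
        print("Open http://localhost:8080 to browse AdalFlow traces.")
    else:
        print("Could not enable MLflow tracing; see the log output above.")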