Spaces:
Running
Running
| """ | |
| OpenTelemetry tracing infrastructure for multi-agent MCTS framework. | |
| Provides: | |
| - OpenTelemetry SDK integration | |
| - Automatic span creation for key operations | |
| - Trace context propagation | |
| - OTLP exporter configuration from environment | |
| - Custom attributes for MCTS metrics | |
| - httpx instrumentation for LLM calls | |
| """ | |
| import functools | |
| import os | |
| from contextlib import asynccontextmanager, contextmanager | |
| from typing import Any, Optional | |
| from opentelemetry import trace | |
| from opentelemetry.context import Context | |
| from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | |
| from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor | |
| from opentelemetry.propagate import extract, inject | |
| from opentelemetry.sdk.resources import SERVICE_NAME, Resource | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry.sdk.trace.export import ( | |
| BatchSpanProcessor, | |
| ConsoleSpanExporter, | |
| SimpleSpanProcessor, | |
| ) | |
| from opentelemetry.trace import Span, SpanKind, Status, StatusCode | |
| from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator | |
| from .logging import get_correlation_id | |
| class TracingManager: | |
| """ | |
| Manages OpenTelemetry tracing configuration and lifecycle. | |
| Environment Variables: | |
| OTEL_EXPORTER_OTLP_ENDPOINT: OTLP collector endpoint (default: localhost:4317) | |
| OTEL_SERVICE_NAME: Service name for traces (default: mcts-framework) | |
| OTEL_EXPORTER_TYPE: Exporter type (otlp, console, none) (default: otlp) | |
| OTEL_TRACE_SAMPLE_RATE: Sampling rate 0.0-1.0 (default: 1.0) | |
| """ | |
| _instance: Optional["TracingManager"] = None | |
| _provider: TracerProvider | None = None | |
| def __init__(self): | |
| self._initialized = False | |
| self._httpx_instrumented = False | |
| def get_instance(cls) -> "TracingManager": | |
| """Get singleton instance of TracingManager.""" | |
| if cls._instance is None: | |
| cls._instance = cls() | |
| return cls._instance | |
| def initialize( | |
| self, | |
| service_name: str | None = None, | |
| otlp_endpoint: str | None = None, | |
| exporter_type: str | None = None, | |
| additional_resources: dict[str, str] | None = None, | |
| ) -> None: | |
| """ | |
| Initialize OpenTelemetry tracing. | |
| Args: | |
| service_name: Service name for traces | |
| otlp_endpoint: OTLP collector endpoint | |
| exporter_type: Type of exporter (otlp, console, none) | |
| additional_resources: Additional resource attributes | |
| """ | |
| if self._initialized: | |
| return | |
| # Get configuration from environment or parameters | |
| service_name = service_name or os.environ.get("OTEL_SERVICE_NAME", "mcts-framework") | |
| otlp_endpoint = otlp_endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317") | |
| exporter_type = exporter_type or os.environ.get("OTEL_EXPORTER_TYPE", "otlp") | |
| # Build resource attributes | |
| resource_attrs = { | |
| SERVICE_NAME: service_name, | |
| "service.version": os.environ.get("SERVICE_VERSION", "0.1.0"), | |
| "deployment.environment": os.environ.get("ENVIRONMENT", "development"), | |
| } | |
| if additional_resources: | |
| resource_attrs.update(additional_resources) | |
| resource = Resource.create(resource_attrs) | |
| # Create tracer provider | |
| self._provider = TracerProvider(resource=resource) | |
| # Configure exporter based on type | |
| if exporter_type.lower() == "otlp": | |
| exporter = OTLPSpanExporter( | |
| endpoint=otlp_endpoint, | |
| insecure=os.environ.get("OTEL_EXPORTER_OTLP_INSECURE", "true").lower() == "true", | |
| ) | |
| processor = BatchSpanProcessor(exporter) | |
| elif exporter_type.lower() == "console": | |
| exporter = ConsoleSpanExporter() | |
| processor = SimpleSpanProcessor(exporter) | |
| elif exporter_type.lower() == "none": | |
| processor = None | |
| else: | |
| raise ValueError(f"Unknown exporter type: {exporter_type}") | |
| if processor: | |
| self._provider.add_span_processor(processor) | |
| # Set as global provider | |
| trace.set_tracer_provider(self._provider) | |
| # Instrument httpx for LLM calls | |
| self._instrument_httpx() | |
| self._initialized = True | |
| def _instrument_httpx(self) -> None: | |
| """Instrument httpx client for automatic tracing of HTTP requests.""" | |
| if self._httpx_instrumented: | |
| return | |
| try: | |
| HTTPXClientInstrumentor().instrument() | |
| self._httpx_instrumented = True | |
| except Exception: | |
| # httpx instrumentation is optional | |
| pass | |
| def shutdown(self) -> None: | |
| """Shutdown tracing provider.""" | |
| if self._provider: | |
| self._provider.shutdown() | |
| self._initialized = False | |
| def get_tracer(self, name: str = "mcts-framework") -> trace.Tracer: | |
| """Get a tracer instance.""" | |
| if not self._initialized: | |
| self.initialize() | |
| return trace.get_tracer(name) | |
| def get_tracer(name: str = "mcts-framework") -> trace.Tracer: | |
| """Get a tracer instance from the global TracingManager.""" | |
| return TracingManager.get_instance().get_tracer(name) | |
| def add_mcts_attributes(span: Span, **attributes: Any) -> None: | |
| """ | |
| Add MCTS-specific attributes to a span. | |
| Common attributes: | |
| - mcts.iteration: Current MCTS iteration number | |
| - mcts.node_visits: Number of visits to current node | |
| - mcts.node_value: Value of current node | |
| - mcts.ucb_score: UCB score for selection | |
| - mcts.exploration_weight: Exploration weight parameter | |
| - mcts.tree_depth: Current depth in tree | |
| - agent.name: Name of the agent | |
| - agent.confidence: Agent confidence score | |
| """ | |
| for key, value in attributes.items(): | |
| if value is not None: | |
| # Prefix non-standard attributes | |
| if not key.startswith(("mcts.", "agent.", "framework.")): | |
| key = f"custom.{key}" | |
| span.set_attribute(key, value) | |
| def trace_span( | |
| name: str, | |
| kind: SpanKind = SpanKind.INTERNAL, | |
| attributes: dict[str, Any] | None = None, | |
| record_exception: bool = True, | |
| set_status_on_exception: bool = True, | |
| ): | |
| """ | |
| Context manager for creating a traced span. | |
| Args: | |
| name: Name of the span | |
| kind: Span kind (INTERNAL, CLIENT, SERVER, PRODUCER, CONSUMER) | |
| attributes: Initial attributes for the span | |
| record_exception: Record exceptions as span events | |
| set_status_on_exception: Set span status to ERROR on exception | |
| Example: | |
| with trace_span("mcts.selection", attributes={"mcts.iteration": 5}) as span: | |
| # Perform selection | |
| span.set_attribute("mcts.selected_node", node_id) | |
| """ | |
| tracer = get_tracer() | |
| with tracer.start_as_current_span( | |
| name, | |
| kind=kind, | |
| attributes=attributes or {}, | |
| record_exception=record_exception, | |
| set_status_on_exception=set_status_on_exception, | |
| ) as span: | |
| # Add correlation ID as attribute | |
| span.set_attribute("correlation_id", get_correlation_id()) | |
| yield span | |
| async def async_trace_span( | |
| name: str, | |
| kind: SpanKind = SpanKind.INTERNAL, | |
| attributes: dict[str, Any] | None = None, | |
| record_exception: bool = True, | |
| set_status_on_exception: bool = True, | |
| ): | |
| """ | |
| Async context manager for creating a traced span. | |
| Same as trace_span but for async contexts. | |
| """ | |
| tracer = get_tracer() | |
| with tracer.start_as_current_span( | |
| name, | |
| kind=kind, | |
| attributes=attributes or {}, | |
| record_exception=record_exception, | |
| set_status_on_exception=set_status_on_exception, | |
| ) as span: | |
| # Add correlation ID as attribute | |
| span.set_attribute("correlation_id", get_correlation_id()) | |
| yield span | |
| def trace_operation( | |
| name: str | None = None, | |
| kind: SpanKind = SpanKind.INTERNAL, | |
| attributes: dict[str, Any] | None = None, | |
| ): | |
| """ | |
| Decorator for tracing function execution. | |
| Args: | |
| name: Span name (defaults to function name) | |
| kind: Span kind | |
| attributes: Additional attributes | |
| Example: | |
| @trace_operation(attributes={"component": "mcts"}) | |
| async def select_best_child(node): | |
| ... | |
| """ | |
| def decorator(func): | |
| span_name = name or f"{func.__module__}.{func.__name__}" | |
| def sync_wrapper(*args, **kwargs): | |
| with trace_span(span_name, kind=kind, attributes=attributes) as span: | |
| # Add function arguments as attributes (limited) | |
| span.set_attribute("function.args_count", len(args)) | |
| span.set_attribute("function.kwargs_count", len(kwargs)) | |
| result = func(*args, **kwargs) | |
| # Mark as successful | |
| span.set_status(Status(StatusCode.OK)) | |
| return result | |
| async def async_wrapper(*args, **kwargs): | |
| async with async_trace_span(span_name, kind=kind, attributes=attributes) as span: | |
| # Add function arguments as attributes (limited) | |
| span.set_attribute("function.args_count", len(args)) | |
| span.set_attribute("function.kwargs_count", len(kwargs)) | |
| result = await func(*args, **kwargs) | |
| # Mark as successful | |
| span.set_status(Status(StatusCode.OK)) | |
| return result | |
| if asyncio.iscoroutinefunction(func): | |
| return async_wrapper | |
| return sync_wrapper | |
| return decorator | |
| class SpanContextPropagator: | |
| """ | |
| Utility for propagating trace context across service boundaries. | |
| Example: | |
| # Inject context into headers | |
| headers = {} | |
| propagator = SpanContextPropagator() | |
| propagator.inject(headers) | |
| # Extract context from headers | |
| context = propagator.extract(headers) | |
| with trace_span("operation", context=context): | |
| ... | |
| """ | |
| def __init__(self): | |
| self._propagator = TraceContextTextMapPropagator() | |
| def inject(self, carrier: dict[str, str], context: Context | None = None) -> None: | |
| """Inject trace context into a carrier (e.g., HTTP headers).""" | |
| inject(carrier, context=context) | |
| def extract(self, carrier: dict[str, str]) -> Context: | |
| """Extract trace context from a carrier.""" | |
| return extract(carrier) | |
| def get_trace_parent(self) -> str | None: | |
| """Get the traceparent header value for the current span.""" | |
| carrier = {} | |
| self.inject(carrier) | |
| return carrier.get("traceparent") | |
| def record_mcts_iteration( | |
| iteration: int, | |
| selected_node_id: str, | |
| ucb_score: float, | |
| node_visits: int, | |
| node_value: float, | |
| tree_depth: int, | |
| ) -> None: | |
| """ | |
| Record MCTS iteration as a span event. | |
| Call this within an active span to add iteration details. | |
| """ | |
| current_span = trace.get_current_span() | |
| if current_span: | |
| current_span.add_event( | |
| "mcts.iteration", | |
| attributes={ | |
| "mcts.iteration": iteration, | |
| "mcts.selected_node_id": selected_node_id, | |
| "mcts.ucb_score": ucb_score, | |
| "mcts.node_visits": node_visits, | |
| "mcts.node_value": node_value, | |
| "mcts.tree_depth": tree_depth, | |
| }, | |
| ) | |
| def record_agent_execution( | |
| agent_name: str, | |
| confidence: float, | |
| execution_time_ms: float, | |
| success: bool, | |
| error: str | None = None, | |
| ) -> None: | |
| """ | |
| Record agent execution as a span event. | |
| Call this within an active span to add agent execution details. | |
| """ | |
| current_span = trace.get_current_span() | |
| if current_span: | |
| attrs = { | |
| "agent.name": agent_name, | |
| "agent.confidence": confidence, | |
| "agent.execution_time_ms": execution_time_ms, | |
| "agent.success": success, | |
| } | |
| if error: | |
| attrs["agent.error"] = error | |
| current_span.add_event("agent.execution", attributes=attrs) | |
| # Import asyncio for decorator | |
| import asyncio # noqa: E402 | |