diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/response.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/response.py new file mode 100644 index 00000000..0e4ae9fd --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/models/response.py @@ -0,0 +1,12 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from dataclasses import dataclass + + +@dataclass +class Response: + """Response details from agent execution.""" + + """The list of response messages from the agent.""" + messages: list[str] diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py index eb92c81f..90b3e84b 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py @@ -10,7 +10,14 @@ from typing import TYPE_CHECKING, Any from opentelemetry import baggage, context, trace -from opentelemetry.trace import Span, SpanKind, Status, StatusCode, Tracer, set_span_in_context +from opentelemetry.trace import ( + Span, + SpanKind, + Status, + StatusCode, + Tracer, + set_span_in_context, +) from .constants import ( ENABLE_A365_OBSERVABILITY, @@ -32,6 +39,7 @@ SOURCE_NAME, TENANT_ID_KEY, ) +from .utils import parse_parent_id_to_context if TYPE_CHECKING: from .agent_details import AgentDetails @@ -71,6 +79,7 @@ def __init__( activity_name: str, agent_details: "AgentDetails | None" = None, tenant_details: "TenantDetails | None" = None, + parent_id: str | None = None, ): """Initialize the OpenTelemetry scope. @@ -80,6 +89,8 @@ def __init__( activity_name: The name of the activity for display purposes agent_details: Optional agent details tenant_details: Optional tenant details + parent_id: Optional parent Activity ID used to link this span to an upstream + operation """ self._span: Span | None = None self._start_time = time.time() @@ -102,12 +113,13 @@ def __init__( elif kind.lower() == "consumer": activity_kind = SpanKind.CONSUMER - # Get current context for parent relationship - current_context = context.get_current() + # Get context for parent relationship + # If parent_id is provided, parse it and use it as the parent context + # Otherwise, use the current context + parent_context = parse_parent_id_to_context(parent_id) + span_context = parent_context if parent_context else context.get_current() - self._span = tracer.start_span( - activity_name, kind=activity_kind, context=current_context - ) + self._span = tracer.start_span(activity_name, kind=activity_kind, context=span_context) # Log span creation if self._span: diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/__init__.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py new file mode 100644 index 00000000..6f01c42c --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py @@ -0,0 +1,78 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from ..agent_details import AgentDetails +from ..constants import GEN_AI_OUTPUT_MESSAGES_KEY +from ..models.response import Response +from ..opentelemetry_scope import OpenTelemetryScope +from ..tenant_details import TenantDetails +from ..utils import safe_json_dumps + +OUTPUT_OPERATION_NAME = "output_messages" + + +class OutputScope(OpenTelemetryScope): + """Provides OpenTelemetry tracing scope for output messages.""" + + @staticmethod + def start( + agent_details: AgentDetails, + tenant_details: TenantDetails, + response: Response, + parent_id: str | None = None, + ) -> "OutputScope": + """Creates and starts a new scope for output tracing. + + Args: + agent_details: The details of the agent + tenant_details: The details of the tenant + response: The response details from the agent + parent_id: Optional parent Activity ID used to link this span to an upstream + operation + + Returns: + A new OutputScope instance + """ + return OutputScope(agent_details, tenant_details, response, parent_id) + + def __init__( + self, + agent_details: AgentDetails, + tenant_details: TenantDetails, + response: Response, + parent_id: str | None = None, + ): + """Initialize the output scope. + + Args: + agent_details: The details of the agent + tenant_details: The details of the tenant + response: The response details from the agent + parent_id: Optional parent Activity ID used to link this span to an upstream + operation + """ + super().__init__( + kind="Client", + operation_name=OUTPUT_OPERATION_NAME, + activity_name=(f"{OUTPUT_OPERATION_NAME} {agent_details.agent_id}"), + agent_details=agent_details, + tenant_details=tenant_details, + parent_id=parent_id, + ) + + # Initialize accumulated messages list + self._output_messages: list[str] = list(response.messages) + + # Set response messages + self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(self._output_messages)) + + def record_output_messages(self, messages: list[str]) -> None: + """Records the output messages for telemetry tracking. + + Appends the provided messages to the accumulated output messages list. + + Args: + messages: List of output messages to append + """ + self._output_messages.extend(messages) + self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(self._output_messages)) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/utils.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/utils.py index 73622815..571b95c5 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/utils.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/utils.py @@ -13,11 +13,12 @@ from threading import RLock from typing import Any, Generic, TypeVar, cast +from opentelemetry import context from opentelemetry.semconv.attributes.exception_attributes import ( EXCEPTION_MESSAGE, EXCEPTION_STACKTRACE, ) -from opentelemetry.trace import Span +from opentelemetry.trace import NonRecordingSpan, Span, SpanContext, TraceFlags, set_span_in_context from opentelemetry.util.types import AttributeValue from wrapt import ObjectProxy @@ -27,6 +28,128 @@ logger.addHandler(logging.NullHandler()) +# W3C Trace Context constants +W3C_TRACE_CONTEXT_VERSION = "00" +W3C_TRACE_ID_LENGTH = 32 # 32 hex chars = 128 bits +W3C_SPAN_ID_LENGTH = 16 # 16 hex chars = 64 bits + + +def validate_w3c_trace_context_version(version: str) -> bool: + """Validate W3C Trace Context version. + + Args: + version: The version string to validate + + Returns: + True if valid, False otherwise + """ + return version == W3C_TRACE_CONTEXT_VERSION + + +def _is_valid_hex(hex_string: str) -> bool: + """Check if a string contains only valid hexadecimal characters. + + Args: + hex_string: The string to validate + + Returns: + True if all characters are valid hexadecimal (0-9, a-f, A-F), False otherwise + """ + return all(c in "0123456789abcdefABCDEF" for c in hex_string) + + +def validate_trace_id(trace_id_hex: str) -> bool: + """Validate W3C Trace Context trace_id format. + + Args: + trace_id_hex: The trace_id hex string to validate (should be 32 hex chars) + + Returns: + True if valid (32 hex chars), False otherwise + """ + return len(trace_id_hex) == W3C_TRACE_ID_LENGTH and _is_valid_hex(trace_id_hex) + + +def validate_span_id(span_id_hex: str) -> bool: + """Validate W3C Trace Context span_id format. + + Args: + span_id_hex: The span_id hex string to validate (should be 16 hex chars) + + Returns: + True if valid (16 hex chars), False otherwise + """ + return len(span_id_hex) == W3C_SPAN_ID_LENGTH and _is_valid_hex(span_id_hex) + + +def parse_parent_id_to_context(parent_id: str | None) -> context.Context | None: + """Parse a W3C trace context parent ID and return a context with the parent span. + + The parent_id format is expected to be W3C Trace Context format: + "00-{trace_id}-{span_id}-{trace_flags}" + Example: "00-1234567890abcdef1234567890abcdef-abcdefabcdef1234-01" + + Args: + parent_id: The W3C Trace Context format parent ID string + + Returns: + A context containing the parent span, or None if parent_id is invalid + """ + if not parent_id: + return None + + try: + # W3C Trace Context format: "00-{trace_id}-{span_id}-{trace_flags}" + parts = parent_id.split("-") + if len(parts) != 4: + logger.warning(f"Invalid parent_id format (expected 4 parts): {parent_id}") + return None + + version, trace_id_hex, span_id_hex, trace_flags_hex = parts + + # Validate W3C Trace Context version + if not validate_w3c_trace_context_version(version): + logger.warning(f"Unsupported W3C Trace Context version: {version}") + return None + + # Validate trace_id (must be 32 hex chars) + if not validate_trace_id(trace_id_hex): + logger.warning( + f"Invalid trace_id (expected {W3C_TRACE_ID_LENGTH} hex chars): '{trace_id_hex}'" + ) + return None + + # Validate span_id (must be 16 hex chars) + if not validate_span_id(span_id_hex): + logger.warning( + f"Invalid span_id (expected {W3C_SPAN_ID_LENGTH} hex chars): '{span_id_hex}'" + ) + return None + + # Parse the hex values + trace_id = int(trace_id_hex, 16) + span_id = int(span_id_hex, 16) + trace_flags = TraceFlags(int(trace_flags_hex, 16)) + + # Create a SpanContext from the parsed values + parent_span_context = SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=True, + trace_flags=trace_flags, + ) + + # Create a NonRecordingSpan with the parent context + parent_span = NonRecordingSpan(parent_span_context) + + # Create a context with the parent span + return set_span_in_context(parent_span) + + except (ValueError, IndexError) as e: + logger.warning(f"Failed to parse parent_id '{parent_id}': {e}") + return None + + def safe_json_dumps(obj: Any, **kwargs: Any) -> str: return json.dumps(obj, default=str, ensure_ascii=False, **kwargs) diff --git a/tests/observability/core/test_output_scope.py b/tests/observability/core/test_output_scope.py new file mode 100644 index 00000000..c1886657 --- /dev/null +++ b/tests/observability/core/test_output_scope.py @@ -0,0 +1,148 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import os +import sys +import unittest +from pathlib import Path + +import pytest +from microsoft_agents_a365.observability.core import ( + AgentDetails, + TenantDetails, + configure, + get_tracer_provider, +) +from microsoft_agents_a365.observability.core.config import _telemetry_manager +from microsoft_agents_a365.observability.core.constants import GEN_AI_OUTPUT_MESSAGES_KEY +from microsoft_agents_a365.observability.core.models.response import Response +from microsoft_agents_a365.observability.core.opentelemetry_scope import OpenTelemetryScope +from microsoft_agents_a365.observability.core.spans_scopes.output_scope import OutputScope +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + +class TestOutputScope(unittest.TestCase): + """Unit tests for OutputScope.""" + + @classmethod + def setUpClass(cls): + """Set up test environment once for all tests.""" + os.environ["ENABLE_A365_OBSERVABILITY"] = "true" + + configure( + service_name="test-output-scope-service", + service_namespace="test-namespace", + ) + + cls.tenant_details = TenantDetails(tenant_id="12345678-1234-5678-1234-567812345678") + cls.agent_details = AgentDetails( + agent_id="test-agent-123", + agent_name="Test Agent", + agent_description="A test agent for output scope testing", + ) + + def setUp(self): + super().setUp() + + # Reset TelemetryManager state + _telemetry_manager._tracer_provider = None + _telemetry_manager._span_processors = {} + OpenTelemetryScope._tracer = None + + configure( + service_name="test-output-scope-service", + service_namespace="test-namespace", + ) + + self.span_exporter = InMemorySpanExporter() + tracer_provider = get_tracer_provider() + tracer_provider.add_span_processor(SimpleSpanProcessor(self.span_exporter)) + + def tearDown(self): + super().tearDown() + self.span_exporter.clear() + + def _get_last_span(self): + """Helper to get the last finished span and its attributes.""" + finished_spans = self.span_exporter.get_finished_spans() + self.assertTrue(finished_spans, "Expected at least one span to be created") + span = finished_spans[-1] + attributes = getattr(span, "attributes", {}) or {} + return span, attributes + + def test_output_scope_creates_span_with_messages(self): + """Test OutputScope creates span with output messages attribute.""" + response = Response(messages=["First message", "Second message"]) + + with OutputScope.start(self.agent_details, self.tenant_details, response): + pass + + span, attributes = self._get_last_span() + + # Verify span name contains operation name and agent id + self.assertIn("output_messages", span.name) + self.assertIn(self.agent_details.agent_id, span.name) + + # Verify output messages are set + self.assertIn(GEN_AI_OUTPUT_MESSAGES_KEY, attributes) + output_value = attributes[GEN_AI_OUTPUT_MESSAGES_KEY] + self.assertIn("First message", output_value) + self.assertIn("Second message", output_value) + + def test_record_output_messages_appends(self): + """Test record_output_messages appends to accumulated messages.""" + response = Response(messages=["Initial"]) + + with OutputScope.start(self.agent_details, self.tenant_details, response) as scope: + scope.record_output_messages(["Appended 1"]) + scope.record_output_messages(["Appended 2", "Appended 3"]) + + _, attributes = self._get_last_span() + + output_value = attributes[GEN_AI_OUTPUT_MESSAGES_KEY] + # All messages should be present (initial + all appended) + self.assertIn("Initial", output_value) + self.assertIn("Appended 1", output_value) + self.assertIn("Appended 2", output_value) + self.assertIn("Appended 3", output_value) + + def test_output_scope_with_parent_id(self): + """Test OutputScope uses parent_id to link span to parent context.""" + response = Response(messages=["Test"]) + parent_trace_id = "1234567890abcdef1234567890abcdef" + parent_span_id = "abcdefabcdef1234" + parent_id = f"00-{parent_trace_id}-{parent_span_id}-01" + + with OutputScope.start( + self.agent_details, self.tenant_details, response, parent_id=parent_id + ): + pass + + span, _ = self._get_last_span() + + # Verify span inherits parent's trace_id + span_trace_id = f"{span.context.trace_id:032x}" + self.assertEqual(span_trace_id, parent_trace_id) + + # Verify span's parent_span_id matches + self.assertIsNotNone(span.parent, "Expected span to have a parent") + self.assertTrue(hasattr(span.parent, "span_id"), "Expected parent to have span_id") + span_parent_id = f"{span.parent.span_id:016x}" + self.assertEqual(span_parent_id, parent_span_id) + + def test_output_scope_dispose(self): + """Test OutputScope dispose method ends the span.""" + response = Response(messages=["Test"]) + + scope = OutputScope.start(self.agent_details, self.tenant_details, response) + self.assertIsNotNone(scope) + scope.dispose() + + # Verify span was created and ended + finished_spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(finished_spans), 1) + + +if __name__ == "__main__": + sys.exit(pytest.main([str(Path(__file__))] + sys.argv[1:]))