From 2deef85dbb49422a6c15246b3158ba4cdd86e989 Mon Sep 17 00:00:00 2001 From: Vinicius Mello Date: Tue, 2 Jun 2026 20:53:17 -0300 Subject: [PATCH] fix(openai-agents): correct root step timestamp and use semantic step types Review of the OpenAI Agents SDK tracing processor (OPEN-10883). Addresses several issues affecting how traces render in Openlayer: 1. Root step timestamp (the reported bug): the synthetic "Agent Workflow" root step was constructed inside on_trace_end, so steps.Step.__init__ stamped its start_time with time.time() at trace-end. This placed the parent step after its own children on the timeline and produced an incorrect inferenceTimestamp for the whole trace. The root step is now anchored to the start time captured in on_trace_start, clamped to the earliest child start time. This confirms the issue was in the instrumentation rather than the UI. 2. Step types: agent, function and handoff spans were all emitted as generic UserCallStep, so the backend bucketed them as user_call and lost dedicated rendering. They now use the SDK's typed steps via step_factory (AgentStep/ToolStep/HandoffStep) with their typed fields populated, matching the convention used by the langchain/google_adk/claude_agent_sdk integrations. 3. New SDK span types (openai-agents 0.4.2): guardrail, mcp_tools, speech, transcription and speech_group spans previously fell through to the generic handler. They are now parsed and mapped to GuardrailStep, ToolStep and ChatCompletionStep (speech/transcription) respectively; speech_group remains a generic container step. The TracingProcessor interface itself is unchanged. 4. Dangling spans: when a guardrail tripwire fires the SDK aborts in-flight spans without emitting on_span_end, leaving end_time None. on_trace_end now backfills those steps (bounded by their parent's end time) and flags them with metadata.incomplete so they render with a duration instead of no end. Also adds a guardrail scenario to the existing examples/tracing/openai/openai_agents_tracing.ipynb notebook so it exercises the GUARDRAIL step type alongside the existing agent/tool/handoff steps. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../openai/openai_agents_tracing.ipynb | 85 +++++- .../lib/integrations/openai_agents.py | 254 +++++++++++++++++- 2 files changed, 326 insertions(+), 13 deletions(-) diff --git a/examples/tracing/openai/openai_agents_tracing.ipynb b/examples/tracing/openai/openai_agents_tracing.ipynb index b28c003f..32ea7036 100644 --- a/examples/tracing/openai/openai_agents_tracing.ipynb +++ b/examples/tracing/openai/openai_agents_tracing.ipynb @@ -467,12 +467,95 @@ "response\n" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 11. Add an input guardrail\n", + "\n", + "Guardrails run *before* the main agent and can short-circuit a request. Here we add a relevance guardrail that blocks questions unrelated to airline customer service. A small classifier agent decides whether the message is on-topic; if not, the guardrail trips its tripwire and the SDK raises `InputGuardrailTripwireTriggered`.\n", + "\n", + "In Openlayer, the guardrail appears as a dedicated `GUARDRAIL` step whose `metadata.triggered` reflects the tripwire result." + ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "from agents import (\n", + " GuardrailFunctionOutput,\n", + " InputGuardrailTripwireTriggered,\n", + " input_guardrail,\n", + ")\n", + "\n", + "\n", + "class RelevanceCheck(BaseModel):\n", + " \"\"\"Output schema for the relevance classifier.\"\"\"\n", + " is_relevant: bool\n", + " reasoning: str\n", + "\n", + "\n", + "relevance_agent = Agent(\n", + " name=\"Relevance guardrail\",\n", + " instructions=(\n", + " \"Decide whether the user's message is related to airline customer \"\n", + " \"service (flights, seats, baggage, check-in, wifi, etc.).\"\n", + " ),\n", + " output_type=RelevanceCheck,\n", + ")\n", + "\n", + "\n", + "@input_guardrail\n", + "async def relevance_guardrail(context, agent, user_input): # noqa: ARG001\n", + " \"\"\"Trip the tripwire when the request is off-topic.\"\"\"\n", + " check = await Runner.run(relevance_agent, user_input, context=context.context)\n", + " return GuardrailFunctionOutput(\n", + " output_info=check.final_output,\n", + " tripwire_triggered=not check.final_output.is_relevant,\n", + " )\n", + "\n", + "\n", + "# Attach the guardrail to the triage agent (the entry point of the workflow).\n", + "triage_agent.input_guardrails.append(relevance_guardrail)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Test 4: guardrail trips on an off-topic request\n", + "\n", + "The first request is unrelated to the airline and should be blocked; the second is on-topic and should pass through to the triage agent as usual." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def run_guarded(user_input: str) -> str:\n", + " \"\"\"Run the triage agent with the relevance guardrail enabled.\"\"\"\n", + " with agent_trace(\n", + " \"Customer service (guarded)\", group_id=uuid.uuid4().hex[:16]\n", + " ):\n", + " try:\n", + " result = await Runner.run(\n", + " triage_agent, user_input, context=AirlineAgentContext()\n", + " )\n", + " return f\"✅ Allowed → {result.final_output}\"\n", + " except InputGuardrailTripwireTriggered:\n", + " return \"🛑 Guardrail tripped — off-topic request blocked.\"\n", + "\n", + "\n", + "# Off-topic: should trip the guardrail\n", + "print(await run_guarded(\"Can you give me a recipe for chocolate cake?\")) # noqa: T201\n", + "\n", + "# On-topic: should pass\n", + "print(await run_guarded(\"What are the baggage restrictions?\")) # noqa: T201" + ] } ], "metadata": { diff --git a/src/openlayer/lib/integrations/openai_agents.py b/src/openlayer/lib/integrations/openai_agents.py index c4e5e040..f334965b 100644 --- a/src/openlayer/lib/integrations/openai_agents.py +++ b/src/openlayer/lib/integrations/openai_agents.py @@ -284,6 +284,61 @@ def parse_span_data(span_data: Any) -> ParsedSpanData: output_data = data.get("output") metadata.pop("data", None) + elif span_type == "guardrail": + # GuardrailSpanData: name + triggered (tripwire) flag. + triggered = getattr(span_data, "triggered", None) + if triggered is None: + triggered = content.get("triggered", False) + input_data = {"guardrail_name": name} + output_data = {"output": "triggered" if triggered else "passed"} + metadata["triggered"] = bool(triggered) + + elif span_type == "mcp_tools": + # MCPListToolsSpanData: server + the list of tool names it exposes. + server = getattr(span_data, "server", None) or content.get("server") + result = getattr(span_data, "result", None) + if result is None: + result = content.get("result") + tool_names = result if isinstance(result, list) else [] + input_data = {"server": server} if server else {} + output_data = { + "output": ", ".join(tool_names) if tool_names else "No tools listed", + "tools": tool_names, + } + metadata.pop("result", None) + + elif span_type == "transcription": + # TranscriptionSpanData: speech-to-text model call. + model = getattr(span_data, "model", None) + provider = "OpenAI" + input_format = getattr(span_data, "input_format", None) + transcript = getattr(span_data, "output", None) + # The raw input is (potentially large) audio data, so we only surface its + # format rather than embedding the payload itself. + input_data = {"input": f"[audio input: {input_format}]"} + output_data = ( + {"output": transcript} if transcript is not None else None + ) + metadata.pop("input", None) + metadata.pop("output", None) + + elif span_type == "speech": + # SpeechSpanData: text-to-speech model call. Input is the text to speak; + # output is audio data, so we only surface its format. + model = getattr(span_data, "model", None) + provider = "OpenAI" + text_input = getattr(span_data, "input", None) + output_format = getattr(span_data, "output_format", None) + input_data = {"input": text_input} if text_input is not None else {} + output_data = {"output": f"[audio output: {output_format}]"} + metadata.pop("input", None) + metadata.pop("output", None) + + elif span_type == "speech_group": + # SpeechGroupSpanData: container span grouping speech sub-spans. + text_input = getattr(span_data, "input", None) + input_data = {"input": text_input} if text_input is not None else {} + # Ensure input/output are dictionaries if input_data is not None and not isinstance(input_data, dict): input_data = {"input": input_data} @@ -323,6 +378,16 @@ def _get_span_name(span_data: Any, span_type: str) -> str: return "Agent" elif span_type == "function": return "Function" + elif span_type == "guardrail": + return "Guardrail" + elif span_type == "mcp_tools": + return "MCP List Tools" + elif span_type == "transcription": + return "Transcription" + elif span_type == "speech": + return "Speech" + elif span_type == "speech_group": + return "Speech Group" else: return span_type.title() @@ -697,6 +762,34 @@ def __init__(self, **kwargs: Any) -> None: global _active_openlayer_processor _active_openlayer_processor = self + def _finalize_dangling_steps( + self, steps_list: List[steps.Step], cap_end_time: float + ) -> None: + """Backfill end_time/latency for steps that never received on_span_end. + + When a span is aborted before the SDK emits its span-end event (for + example, an LLM response cancelled because an input guardrail tripped), + its end_time stays None. We close such steps at ``cap_end_time`` (the + parent's end time, or the trace end time at the top level) so they render + with a bounded duration that never exceeds their parent, and flag them + with ``metadata.incomplete``. + """ + for step in steps_list: + if getattr(step, "end_time", None) is None: + start = getattr(step, "start_time", None) + if start is None: + start = cap_end_time + step.end_time = max(start, cap_end_time) + step.latency = (step.end_time - start) * 1000 # ms + if isinstance(getattr(step, "metadata", None), dict): + step.metadata["incomplete"] = True + + # Children can end no later than this step's (now finalized) end time. + nested = getattr(step, "steps", None) + if nested: + child_cap = getattr(step, "end_time", None) or cap_end_time + self._finalize_dangling_steps(nested, child_cap) + def on_trace_start(self, trace: "tracing.Trace") -> None: """Handle the start of a trace (root agent workflow).""" try: @@ -721,9 +814,8 @@ def on_trace_end(self, trace: "tracing.Trace") -> None: if not trace_data: return - # Calculate total duration + # Capture the trace end time (used below to set the root step's latency). end_time = time.time() - duration = end_time - trace_data["start_time"] # Get all collected root steps for this trace steps_list = self._trace_root_steps.pop(trace.trace_id, []) @@ -740,6 +832,12 @@ def on_trace_end(self, trace: "tracing.Trace") -> None: steps_list = list(unique_steps.values()) + # Close any spans that never received an on_span_end callback. The SDK + # aborts in-flight spans (e.g. an LLM response running concurrently with + # an input guardrail) when a guardrail tripwire fires, so their end_time + # would otherwise stay None and they would render as incomplete. + self._finalize_dangling_steps(steps_list, end_time) + if steps_list: # Create a root step that encompasses all collected steps trace_name = trace_data.get("trace_name", "Agent Workflow") @@ -778,9 +876,31 @@ def on_trace_end(self, trace: "tracing.Trace") -> None: for step in steps_list: root_step.add_nested_step(step) - # Set the end time to match the trace end time + # Anchor the root step to the real trace start time. + # + # ``steps.Step`` sets ``start_time = time.time()`` in its + # constructor, but this root step is created here inside + # ``on_trace_end`` (i.e. once the workflow has already finished), + # so its default start time is ~``end_time``. That would place the + # parent step *after* its own children on the timeline and produce + # an incorrect ``inferenceTimestamp`` for the whole trace. + # + # Use the start time captured in ``on_trace_start``, clamped to the + # earliest child start time so the parent can never start after a + # nested step even if the SDK's span clock runs slightly ahead of + # the wall clock we sampled at trace start. + trace_start_time = trace_data["start_time"] + child_start_times = [ + step.start_time + for step in steps_list + if getattr(step, "start_time", None) is not None + ] + if child_start_times: + trace_start_time = min(trace_start_time, *child_start_times) + + root_step.start_time = trace_start_time root_step.end_time = end_time - root_step.latency = duration * 1000 # Convert to ms + root_step.latency = (end_time - trace_start_time) * 1000 # ms # Clean up trace-specific data self._current_user_inputs.pop(trace.trace_id, None) @@ -971,7 +1091,15 @@ def _create_step_for_span( return self._create_handoff_step(parsed_data, start_time, metadata) elif parsed_data.span_type == "response": return self._create_response_step(parsed_data, start_time, metadata) + elif parsed_data.span_type == "guardrail": + return self._create_guardrail_step(parsed_data, start_time, metadata) + elif parsed_data.span_type == "mcp_tools": + return self._create_mcp_tools_step(parsed_data, start_time, metadata) + elif parsed_data.span_type in ("speech", "transcription"): + return self._create_voice_step(parsed_data, start_time, metadata) else: + # Includes "speech_group" (a container span) and any future/unknown + # span types the SDK may introduce. return self._create_generic_step(parsed_data, start_time, metadata) except Exception as e: @@ -1021,13 +1149,18 @@ def _create_function_step( inputs = function_input if function_input else {} output = function_output if function_output else "Function completed" - # Create step without immediately sending to Openlayer - step = steps.UserCallStep( + # Create step without immediately sending to Openlayer. Use a ToolStep so the + # backend renders this as a tool call (TraceType.Tool) rather than a generic + # user call. + step = steps.step_factory( + enums.StepType.TOOL, name=f"Tool Call: {function_name}", inputs=inputs, output=output, metadata=metadata, ) + step.function_name = function_name + step.arguments = inputs step.start_time = start_time return step @@ -1059,10 +1192,17 @@ def _create_agent_step( else: output = f"Agent {agent_name} initialized and ready" - # Create step without immediately sending to Openlayer - step = steps.UserCallStep( - name=f"Agent: {agent_name}", inputs=inputs, output=output, metadata=metadata + # Create step without immediately sending to Openlayer. Use an AgentStep so the + # backend renders this as an agent (TraceType.Agent) rather than a generic user + # call. + step = steps.step_factory( + enums.StepType.AGENT, + name=f"Agent: {agent_name}", + inputs=inputs, + output=output, + metadata=metadata, ) + step.agent_type = output_type step.start_time = start_time return step @@ -1084,13 +1224,18 @@ def _create_handoff_step( inputs = {"from_agent": from_agent, "to_agent": to_agent} - # Create step without immediately sending to Openlayer - step = steps.UserCallStep( + # Create step without immediately sending to Openlayer. Use a HandoffStep so the + # backend renders this as a handoff (TraceType.Handoff) rather than a generic + # user call. + step = steps.step_factory( + enums.StepType.HANDOFF, name=f"Handoff: {from_agent} → {to_agent}", inputs=inputs, output=f"Handed off from {from_agent} to {to_agent}", metadata=metadata, ) + step.from_component = from_agent + step.to_component = to_agent step.start_time = start_time return step @@ -1180,6 +1325,77 @@ def _create_response_step( return step + def _create_guardrail_step( + self, parsed_data: ParsedSpanData, start_time: float, metadata: Dict[str, Any] + ) -> steps.Step: + """Create a guardrail step from GuardrailSpanData.""" + guardrail_name = parsed_data.name or "Guardrail" + inputs = parsed_data.input_data or {"guardrail_name": guardrail_name} + output = self._extract_output_from_parsed_data(parsed_data, "passed") + + # Use a GuardrailStep so the backend renders this as a guardrail + # (TraceType.Guardrail). The tripwire result is also recorded in metadata as + # ``triggered`` (see parse_span_data). + step = steps.step_factory( + enums.StepType.GUARDRAIL, + name=f"Guardrail: {guardrail_name}", + inputs=inputs, + output=output, + metadata=metadata, + ) + step.action = output # "triggered" or "passed" + step.start_time = start_time + return step + + def _create_mcp_tools_step( + self, parsed_data: ParsedSpanData, start_time: float, metadata: Dict[str, Any] + ) -> steps.Step: + """Create an MCP list-tools step from MCPListToolsSpanData.""" + server = (parsed_data.input_data or {}).get("server", "unknown") + inputs = parsed_data.input_data or {} + output = self._extract_output_from_parsed_data(parsed_data, "No tools listed") + + # Use a ToolStep so the backend renders MCP tool discovery under + # TraceType.Tool alongside other tool calls. + step = steps.step_factory( + enums.StepType.TOOL, + name=f"MCP List Tools: {server}", + inputs=inputs, + output=output, + metadata=metadata, + ) + step.function_name = "list_tools" + step.arguments = inputs + step.start_time = start_time + return step + + def _create_voice_step( + self, parsed_data: ParsedSpanData, start_time: float, metadata: Dict[str, Any] + ) -> steps.Step: + """Create a step for voice model calls (speech / transcription spans).""" + model = parsed_data.model or "unknown" + inputs = parsed_data.input_data or {} + output = self._extract_output_from_parsed_data(parsed_data, "") + model_config = parsed_data.metadata.get("model_config", {}) + + step = steps.ChatCompletionStep( + name=f"{parsed_data.span_type.title()} ({model})", + inputs=inputs, + output=output, + metadata=metadata, + ) + + _configure_chat_completion_step( + step=step, + start_time=start_time, + model=model, + provider=parsed_data.provider or "OpenAI", + usage=parsed_data.usage or {}, + model_parameters=model_config, + ) + + return step + def _extract_function_calls_from_messages( self, messages: List[Dict[str, Any]], metadata: Dict[str, Any] ) -> None: @@ -1296,12 +1512,15 @@ def _create_tool_call_step_from_message( inputs = {"arguments": arguments} # Create the Tool Call step - step = steps.UserCallStep( + step = steps.step_factory( + enums.StepType.TOOL, name=f"Tool Call: {function_name}", inputs=inputs, output=output, metadata=metadata, ) + step.function_name = function_name + step.arguments = inputs step.start_time = time.time() step.end_time = time.time() step.latency = 0 # Minimal latency for extracted function calls @@ -1453,6 +1672,10 @@ def _update_step_with_span_data( # Keep original string format if parsing fails pass + # Keep the typed ToolStep arguments in sync. + if isinstance(step, steps.ToolStep): + step.arguments = step.inputs + # Update output if it's still generic if parsed_data.output_data: updated_output = self._extract_output_from_parsed_data(parsed_data, "") @@ -1518,6 +1741,13 @@ def _update_step_with_span_data( if from_agent: step.inputs["from_agent"] = from_agent + # Keep the typed HandoffStep fields in sync with the resolved + # target so the backend renders the correct handoff edge. + if isinstance(step, steps.HandoffStep): + step.to_component = to_agent + if from_agent: + step.from_component = from_agent + # Update the step name and output to reflect the correct handoff step.name = f"Handoff: {from_agent} → {to_agent}" step.output = f"Handed off from {from_agent} to {to_agent}"