diff --git a/examples/tracing/openai/openai_agents_tracing.ipynb b/examples/tracing/openai/openai_agents_tracing.ipynb index b28c003f..32ea7036 100644 --- a/examples/tracing/openai/openai_agents_tracing.ipynb +++ b/examples/tracing/openai/openai_agents_tracing.ipynb @@ -467,12 +467,95 @@ "response\n" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 11. Add an input guardrail\n", + "\n", + "Guardrails run *before* the main agent and can short-circuit a request. Here we add a relevance guardrail that blocks questions unrelated to airline customer service. A small classifier agent decides whether the message is on-topic; if not, the guardrail trips its tripwire and the SDK raises `InputGuardrailTripwireTriggered`.\n", + "\n", + "In Openlayer, the guardrail appears as a dedicated `GUARDRAIL` step whose `metadata.triggered` reflects the tripwire result." + ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "from agents import (\n", + " GuardrailFunctionOutput,\n", + " InputGuardrailTripwireTriggered,\n", + " input_guardrail,\n", + ")\n", + "\n", + "\n", + "class RelevanceCheck(BaseModel):\n", + " \"\"\"Output schema for the relevance classifier.\"\"\"\n", + " is_relevant: bool\n", + " reasoning: str\n", + "\n", + "\n", + "relevance_agent = Agent(\n", + " name=\"Relevance guardrail\",\n", + " instructions=(\n", + " \"Decide whether the user's message is related to airline customer \"\n", + " \"service (flights, seats, baggage, check-in, wifi, etc.).\"\n", + " ),\n", + " output_type=RelevanceCheck,\n", + ")\n", + "\n", + "\n", + "@input_guardrail\n", + "async def relevance_guardrail(context, agent, user_input): # noqa: ARG001\n", + " \"\"\"Trip the tripwire when the request is off-topic.\"\"\"\n", + " check = await Runner.run(relevance_agent, user_input, context=context.context)\n", + " return GuardrailFunctionOutput(\n", + " output_info=check.final_output,\n", + " tripwire_triggered=not check.final_output.is_relevant,\n", + " )\n", + "\n", + "\n", + "# Attach the guardrail to the triage agent (the entry point of the workflow).\n", + "triage_agent.input_guardrails.append(relevance_guardrail)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Test 4: guardrail trips on an off-topic request\n", + "\n", + "The first request is unrelated to the airline and should be blocked; the second is on-topic and should pass through to the triage agent as usual." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def run_guarded(user_input: str) -> str:\n", + " \"\"\"Run the triage agent with the relevance guardrail enabled.\"\"\"\n", + " with agent_trace(\n", + " \"Customer service (guarded)\", group_id=uuid.uuid4().hex[:16]\n", + " ):\n", + " try:\n", + " result = await Runner.run(\n", + " triage_agent, user_input, context=AirlineAgentContext()\n", + " )\n", + " return f\"✅ Allowed → {result.final_output}\"\n", + " except InputGuardrailTripwireTriggered:\n", + " return \"🛑 Guardrail tripped — off-topic request blocked.\"\n", + "\n", + "\n", + "# Off-topic: should trip the guardrail\n", + "print(await run_guarded(\"Can you give me a recipe for chocolate cake?\")) # noqa: T201\n", + "\n", + "# On-topic: should pass\n", + "print(await run_guarded(\"What are the baggage restrictions?\")) # noqa: T201" + ] } ], "metadata": { diff --git a/src/openlayer/lib/integrations/openai_agents.py b/src/openlayer/lib/integrations/openai_agents.py index c4e5e040..f334965b 100644 --- a/src/openlayer/lib/integrations/openai_agents.py +++ b/src/openlayer/lib/integrations/openai_agents.py @@ -284,6 +284,61 @@ def parse_span_data(span_data: Any) -> ParsedSpanData: output_data = data.get("output") metadata.pop("data", None) + elif span_type == "guardrail": + # GuardrailSpanData: name + triggered (tripwire) flag. + triggered = getattr(span_data, "triggered", None) + if triggered is None: + triggered = content.get("triggered", False) + input_data = {"guardrail_name": name} + output_data = {"output": "triggered" if triggered else "passed"} + metadata["triggered"] = bool(triggered) + + elif span_type == "mcp_tools": + # MCPListToolsSpanData: server + the list of tool names it exposes. + server = getattr(span_data, "server", None) or content.get("server") + result = getattr(span_data, "result", None) + if result is None: + result = content.get("result") + tool_names = result if isinstance(result, list) else [] + input_data = {"server": server} if server else {} + output_data = { + "output": ", ".join(tool_names) if tool_names else "No tools listed", + "tools": tool_names, + } + metadata.pop("result", None) + + elif span_type == "transcription": + # TranscriptionSpanData: speech-to-text model call. + model = getattr(span_data, "model", None) + provider = "OpenAI" + input_format = getattr(span_data, "input_format", None) + transcript = getattr(span_data, "output", None) + # The raw input is (potentially large) audio data, so we only surface its + # format rather than embedding the payload itself. + input_data = {"input": f"[audio input: {input_format}]"} + output_data = ( + {"output": transcript} if transcript is not None else None + ) + metadata.pop("input", None) + metadata.pop("output", None) + + elif span_type == "speech": + # SpeechSpanData: text-to-speech model call. Input is the text to speak; + # output is audio data, so we only surface its format. + model = getattr(span_data, "model", None) + provider = "OpenAI" + text_input = getattr(span_data, "input", None) + output_format = getattr(span_data, "output_format", None) + input_data = {"input": text_input} if text_input is not None else {} + output_data = {"output": f"[audio output: {output_format}]"} + metadata.pop("input", None) + metadata.pop("output", None) + + elif span_type == "speech_group": + # SpeechGroupSpanData: container span grouping speech sub-spans. + text_input = getattr(span_data, "input", None) + input_data = {"input": text_input} if text_input is not None else {} + # Ensure input/output are dictionaries if input_data is not None and not isinstance(input_data, dict): input_data = {"input": input_data} @@ -323,6 +378,16 @@ def _get_span_name(span_data: Any, span_type: str) -> str: return "Agent" elif span_type == "function": return "Function" + elif span_type == "guardrail": + return "Guardrail" + elif span_type == "mcp_tools": + return "MCP List Tools" + elif span_type == "transcription": + return "Transcription" + elif span_type == "speech": + return "Speech" + elif span_type == "speech_group": + return "Speech Group" else: return span_type.title() @@ -697,6 +762,34 @@ def __init__(self, **kwargs: Any) -> None: global _active_openlayer_processor _active_openlayer_processor = self + def _finalize_dangling_steps( + self, steps_list: List[steps.Step], cap_end_time: float + ) -> None: + """Backfill end_time/latency for steps that never received on_span_end. + + When a span is aborted before the SDK emits its span-end event (for + example, an LLM response cancelled because an input guardrail tripped), + its end_time stays None. We close such steps at ``cap_end_time`` (the + parent's end time, or the trace end time at the top level) so they render + with a bounded duration that never exceeds their parent, and flag them + with ``metadata.incomplete``. + """ + for step in steps_list: + if getattr(step, "end_time", None) is None: + start = getattr(step, "start_time", None) + if start is None: + start = cap_end_time + step.end_time = max(start, cap_end_time) + step.latency = (step.end_time - start) * 1000 # ms + if isinstance(getattr(step, "metadata", None), dict): + step.metadata["incomplete"] = True + + # Children can end no later than this step's (now finalized) end time. + nested = getattr(step, "steps", None) + if nested: + child_cap = getattr(step, "end_time", None) or cap_end_time + self._finalize_dangling_steps(nested, child_cap) + def on_trace_start(self, trace: "tracing.Trace") -> None: """Handle the start of a trace (root agent workflow).""" try: @@ -721,9 +814,8 @@ def on_trace_end(self, trace: "tracing.Trace") -> None: if not trace_data: return - # Calculate total duration + # Capture the trace end time (used below to set the root step's latency). end_time = time.time() - duration = end_time - trace_data["start_time"] # Get all collected root steps for this trace steps_list = self._trace_root_steps.pop(trace.trace_id, []) @@ -740,6 +832,12 @@ def on_trace_end(self, trace: "tracing.Trace") -> None: steps_list = list(unique_steps.values()) + # Close any spans that never received an on_span_end callback. The SDK + # aborts in-flight spans (e.g. an LLM response running concurrently with + # an input guardrail) when a guardrail tripwire fires, so their end_time + # would otherwise stay None and they would render as incomplete. + self._finalize_dangling_steps(steps_list, end_time) + if steps_list: # Create a root step that encompasses all collected steps trace_name = trace_data.get("trace_name", "Agent Workflow") @@ -778,9 +876,31 @@ def on_trace_end(self, trace: "tracing.Trace") -> None: for step in steps_list: root_step.add_nested_step(step) - # Set the end time to match the trace end time + # Anchor the root step to the real trace start time. + # + # ``steps.Step`` sets ``start_time = time.time()`` in its + # constructor, but this root step is created here inside + # ``on_trace_end`` (i.e. once the workflow has already finished), + # so its default start time is ~``end_time``. That would place the + # parent step *after* its own children on the timeline and produce + # an incorrect ``inferenceTimestamp`` for the whole trace. + # + # Use the start time captured in ``on_trace_start``, clamped to the + # earliest child start time so the parent can never start after a + # nested step even if the SDK's span clock runs slightly ahead of + # the wall clock we sampled at trace start. + trace_start_time = trace_data["start_time"] + child_start_times = [ + step.start_time + for step in steps_list + if getattr(step, "start_time", None) is not None + ] + if child_start_times: + trace_start_time = min(trace_start_time, *child_start_times) + + root_step.start_time = trace_start_time root_step.end_time = end_time - root_step.latency = duration * 1000 # Convert to ms + root_step.latency = (end_time - trace_start_time) * 1000 # ms # Clean up trace-specific data self._current_user_inputs.pop(trace.trace_id, None) @@ -971,7 +1091,15 @@ def _create_step_for_span( return self._create_handoff_step(parsed_data, start_time, metadata) elif parsed_data.span_type == "response": return self._create_response_step(parsed_data, start_time, metadata) + elif parsed_data.span_type == "guardrail": + return self._create_guardrail_step(parsed_data, start_time, metadata) + elif parsed_data.span_type == "mcp_tools": + return self._create_mcp_tools_step(parsed_data, start_time, metadata) + elif parsed_data.span_type in ("speech", "transcription"): + return self._create_voice_step(parsed_data, start_time, metadata) else: + # Includes "speech_group" (a container span) and any future/unknown + # span types the SDK may introduce. return self._create_generic_step(parsed_data, start_time, metadata) except Exception as e: @@ -1021,13 +1149,18 @@ def _create_function_step( inputs = function_input if function_input else {} output = function_output if function_output else "Function completed" - # Create step without immediately sending to Openlayer - step = steps.UserCallStep( + # Create step without immediately sending to Openlayer. Use a ToolStep so the + # backend renders this as a tool call (TraceType.Tool) rather than a generic + # user call. + step = steps.step_factory( + enums.StepType.TOOL, name=f"Tool Call: {function_name}", inputs=inputs, output=output, metadata=metadata, ) + step.function_name = function_name + step.arguments = inputs step.start_time = start_time return step @@ -1059,10 +1192,17 @@ def _create_agent_step( else: output = f"Agent {agent_name} initialized and ready" - # Create step without immediately sending to Openlayer - step = steps.UserCallStep( - name=f"Agent: {agent_name}", inputs=inputs, output=output, metadata=metadata + # Create step without immediately sending to Openlayer. Use an AgentStep so the + # backend renders this as an agent (TraceType.Agent) rather than a generic user + # call. + step = steps.step_factory( + enums.StepType.AGENT, + name=f"Agent: {agent_name}", + inputs=inputs, + output=output, + metadata=metadata, ) + step.agent_type = output_type step.start_time = start_time return step @@ -1084,13 +1224,18 @@ def _create_handoff_step( inputs = {"from_agent": from_agent, "to_agent": to_agent} - # Create step without immediately sending to Openlayer - step = steps.UserCallStep( + # Create step without immediately sending to Openlayer. Use a HandoffStep so the + # backend renders this as a handoff (TraceType.Handoff) rather than a generic + # user call. + step = steps.step_factory( + enums.StepType.HANDOFF, name=f"Handoff: {from_agent} → {to_agent}", inputs=inputs, output=f"Handed off from {from_agent} to {to_agent}", metadata=metadata, ) + step.from_component = from_agent + step.to_component = to_agent step.start_time = start_time return step @@ -1180,6 +1325,77 @@ def _create_response_step( return step + def _create_guardrail_step( + self, parsed_data: ParsedSpanData, start_time: float, metadata: Dict[str, Any] + ) -> steps.Step: + """Create a guardrail step from GuardrailSpanData.""" + guardrail_name = parsed_data.name or "Guardrail" + inputs = parsed_data.input_data or {"guardrail_name": guardrail_name} + output = self._extract_output_from_parsed_data(parsed_data, "passed") + + # Use a GuardrailStep so the backend renders this as a guardrail + # (TraceType.Guardrail). The tripwire result is also recorded in metadata as + # ``triggered`` (see parse_span_data). + step = steps.step_factory( + enums.StepType.GUARDRAIL, + name=f"Guardrail: {guardrail_name}", + inputs=inputs, + output=output, + metadata=metadata, + ) + step.action = output # "triggered" or "passed" + step.start_time = start_time + return step + + def _create_mcp_tools_step( + self, parsed_data: ParsedSpanData, start_time: float, metadata: Dict[str, Any] + ) -> steps.Step: + """Create an MCP list-tools step from MCPListToolsSpanData.""" + server = (parsed_data.input_data or {}).get("server", "unknown") + inputs = parsed_data.input_data or {} + output = self._extract_output_from_parsed_data(parsed_data, "No tools listed") + + # Use a ToolStep so the backend renders MCP tool discovery under + # TraceType.Tool alongside other tool calls. + step = steps.step_factory( + enums.StepType.TOOL, + name=f"MCP List Tools: {server}", + inputs=inputs, + output=output, + metadata=metadata, + ) + step.function_name = "list_tools" + step.arguments = inputs + step.start_time = start_time + return step + + def _create_voice_step( + self, parsed_data: ParsedSpanData, start_time: float, metadata: Dict[str, Any] + ) -> steps.Step: + """Create a step for voice model calls (speech / transcription spans).""" + model = parsed_data.model or "unknown" + inputs = parsed_data.input_data or {} + output = self._extract_output_from_parsed_data(parsed_data, "") + model_config = parsed_data.metadata.get("model_config", {}) + + step = steps.ChatCompletionStep( + name=f"{parsed_data.span_type.title()} ({model})", + inputs=inputs, + output=output, + metadata=metadata, + ) + + _configure_chat_completion_step( + step=step, + start_time=start_time, + model=model, + provider=parsed_data.provider or "OpenAI", + usage=parsed_data.usage or {}, + model_parameters=model_config, + ) + + return step + def _extract_function_calls_from_messages( self, messages: List[Dict[str, Any]], metadata: Dict[str, Any] ) -> None: @@ -1296,12 +1512,15 @@ def _create_tool_call_step_from_message( inputs = {"arguments": arguments} # Create the Tool Call step - step = steps.UserCallStep( + step = steps.step_factory( + enums.StepType.TOOL, name=f"Tool Call: {function_name}", inputs=inputs, output=output, metadata=metadata, ) + step.function_name = function_name + step.arguments = inputs step.start_time = time.time() step.end_time = time.time() step.latency = 0 # Minimal latency for extracted function calls @@ -1453,6 +1672,10 @@ def _update_step_with_span_data( # Keep original string format if parsing fails pass + # Keep the typed ToolStep arguments in sync. + if isinstance(step, steps.ToolStep): + step.arguments = step.inputs + # Update output if it's still generic if parsed_data.output_data: updated_output = self._extract_output_from_parsed_data(parsed_data, "") @@ -1518,6 +1741,13 @@ def _update_step_with_span_data( if from_agent: step.inputs["from_agent"] = from_agent + # Keep the typed HandoffStep fields in sync with the resolved + # target so the backend renders the correct handoff edge. + if isinstance(step, steps.HandoffStep): + step.to_component = to_agent + if from_agent: + step.from_component = from_agent + # Update the step name and output to reflect the correct handoff step.name = f"Handoff: {from_agent} → {to_agent}" step.output = f"Handed off from {from_agent} to {to_agent}"