829 lines
34 KiB
Python
829 lines
34 KiB
Python
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from functools import partial
|
|
from typing import Annotated, Literal
|
|
|
|
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
|
|
from langchain_core.runnables import RunnableConfig
|
|
from langchain_core.tools import tool
|
|
from langchain_mcp_adapters.client import MultiServerMCPClient
|
|
from langgraph.types import Command, interrupt
|
|
|
|
from src.agents import create_agent
|
|
from src.config.agents import AGENT_LLM_MAP
|
|
from src.config.configuration import Configuration
|
|
from src.llms.llm import get_llm_by_type, get_llm_token_limit_by_type
|
|
from src.prompts.planner_model import Plan
|
|
from src.prompts.template import apply_prompt_template
|
|
from src.tools import (
|
|
crawl_tool,
|
|
get_retriever_tool,
|
|
get_web_search_tool,
|
|
python_repl_tool,
|
|
)
|
|
from src.tools.search import LoggedTavilySearch
|
|
from src.utils.context_manager import ContextManager
|
|
from src.utils.json_utils import repair_json_output
|
|
|
|
from ..config import SELECTED_SEARCH_ENGINE, SearchEngine
|
|
from .types import State
|
|
|
|
# Module-level logger named after this module; used by every node below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@tool
def handoff_to_planner(
    research_topic: Annotated[str, "The topic of the research task to be handed off."],
    locale: Annotated[str, "The user's detected language locale (e.g., en-US, zh-CN)."],
):
    """Handoff to planner agent to do plan."""
    # NOTE(review): the docstring above and the Annotated descriptions are
    # presumably surfaced to the LLM by the @tool decorator as the tool's
    # description/schema -- treat them as runtime text and confirm before
    # rewording them.
    # This tool is not returning anything: we're just using it
    # as a way for LLM to signal that it needs to hand off to planner agent
    return
|
|
|
|
|
|
@tool
def handoff_after_clarification(
    locale: Annotated[str, "The user's detected language locale (e.g., en-US, zh-CN)."],
):
    """Handoff to planner after clarification rounds are complete. Pass all clarification history to planner for analysis."""
    # NOTE(review): as with handoff_to_planner, the docstring and the Annotated
    # description are presumably exposed to the LLM by @tool -- confirm before
    # rewording. Unlike handoff_to_planner, this tool only carries `locale`.
    # The body is intentionally empty: the call itself is the signal.
    return
|
|
|
|
|
|
def needs_clarification(state: dict) -> bool:
    """Report whether the clarification dialogue should keep going.

    Args:
        state: Workflow state mapping. Reads ``enable_clarification``
            (default ``False``), ``clarification_rounds`` (default ``0``),
            ``is_clarification_complete`` (default ``False``) and
            ``max_clarification_rounds`` (default ``3``).

    Returns:
        ``True`` only when clarification is enabled, at least one round has
        been asked, the dialogue is not marked complete, and the round
        counter has not gone past the configured maximum.
    """
    if not state.get("enable_clarification", False):
        return False

    rounds_asked = state.get("clarification_rounds", 0)
    already_complete = state.get("is_clarification_complete", False)
    round_cap = state.get("max_clarification_rounds", 3)

    # Nothing has been asked yet, or the dialogue was already closed.
    if not (rounds_asked > 0) or already_complete:
        return False

    # Inclusive bound: after the Nth question we still wait for the Nth answer.
    return rounds_asked <= round_cap
|
|
|
|
|
|
def background_investigation_node(state: State, config: RunnableConfig):
    """Run a preliminary web search on the research topic.

    Args:
        state: Workflow state; ``research_topic`` is used as the search query.
        config: Runnable config from which ``max_search_results`` is read.

    Returns:
        A dict with the single key ``background_investigation_results``.
        For a well-formed Tavily response this is a Markdown string with one
        ``## title`` section per hit; otherwise it is the JSON serialization
        of whatever result object is available.
    """
    logger.info("background investigation node is running.")
    configurable = Configuration.from_runnable_config(config)
    query = state.get("research_topic")

    raw_results = None
    if SELECTED_SEARCH_ENGINE != SearchEngine.TAVILY.value:
        # Any non-Tavily engine: hand back the tool's raw result as JSON below.
        search_tool = get_web_search_tool(configurable.max_search_results)
        raw_results = search_tool.invoke(query)
    else:
        tavily = LoggedTavilySearch(max_results=configurable.max_search_results)
        searched_content = tavily.invoke(query)
        # The search may hand back a tuple; only its first element is used.
        if isinstance(searched_content, tuple):
            searched_content = searched_content[0]

        if not isinstance(searched_content, list):
            # NOTE(review): after this log the function falls through and
            # serializes ``None`` (i.e. the string "null"); kept as-is.
            logger.error(
                f"Tavily search returned malformed response: {searched_content}"
            )
        else:
            sections = [
                f"## {elem['title']}\n\n{elem['content']}" for elem in searched_content
            ]
            return {"background_investigation_results": "\n\n".join(sections)}

    serialized = json.dumps(raw_results, ensure_ascii=False)
    return {"background_investigation_results": serialized}
|
|
|
|
|
|
def planner_node(
    state: State, config: RunnableConfig
) -> Command[Literal["human_feedback", "reporter"]]:
    """Ask the planner LLM for a research plan and route on its answer.

    Args:
        state: Workflow state (messages, plan iteration counter, optional
            clarified question and background investigation results).
        config: Runnable config carrying planner options.

    Returns:
        A ``Command`` that goes to ``reporter`` when the iteration limit is
        reached or the plan reports ``has_enough_context``; to
        ``human_feedback`` with the raw plan text otherwise. When the LLM
        output is not valid JSON it goes to ``reporter`` if a previous plan
        iteration exists and to ``__end__`` if not.
    """
    logger.info("Planner generating full plan")
    configurable = Configuration.from_runnable_config(config)
    iterations_done = state.get("plan_iterations", 0) or 0

    # Clarification mode feeds the planner only the final clarified question.
    clarified = (
        state.get("clarified_question")
        if state.get("enable_clarification", False)
        else None
    )
    if clarified:
        prompt_state = {
            "messages": [{"role": "user", "content": clarified}],
            "locale": state.get("locale", "en-US"),
            "research_topic": clarified,
        }
        messages = apply_prompt_template("planner", prompt_state, configurable)
        logger.info(f"Clarification mode: Using clarified question: {clarified}")
    else:
        # Normal mode: the planner sees the whole conversation.
        messages = apply_prompt_template("planner", state, configurable)

    background = state.get("background_investigation_results")
    if state.get("enable_background_investigation") and background:
        messages.append(
            {
                "role": "user",
                "content": (
                    "background investigation results of user query:\n"
                    + background
                    + "\n"
                ),
            }
        )

    # Structured (non-streaming) output is used only for the "basic" planner
    # model when deep thinking is off.
    deep_thinking = configurable.enable_deep_thinking
    if deep_thinking:
        llm = get_llm_by_type("reasoning")
    elif AGENT_LLM_MAP["planner"] == "basic":
        llm = get_llm_by_type("basic").with_structured_output(
            Plan,
            method="json_mode",
        )
    else:
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"])

    # Planning budget exhausted: go straight to the reporter.
    if iterations_done >= configurable.max_plan_iterations:
        return Command(goto="reporter")

    if AGENT_LLM_MAP["planner"] == "basic" and not deep_thinking:
        structured_plan = llm.invoke(messages)
        full_response = structured_plan.model_dump_json(indent=4, exclude_none=True)
    else:
        full_response = ""
        for chunk in llm.stream(messages):
            full_response += chunk.content
    logger.debug(f"Current state messages: {state['messages']}")
    logger.info(f"Planner response: {full_response}")

    try:
        curr_plan = json.loads(repair_json_output(full_response))
    except json.JSONDecodeError:
        logger.warning("Planner response is not a valid JSON")
        return Command(goto="reporter" if iterations_done > 0 else "__end__")

    if not (isinstance(curr_plan, dict) and curr_plan.get("has_enough_context")):
        # Plan still needs review; it is stored as raw text for the next node.
        return Command(
            update={
                "messages": [AIMessage(content=full_response, name="planner")],
                "current_plan": full_response,
            },
            goto="human_feedback",
        )

    logger.info("Planner response has enough context.")
    validated_plan = Plan.model_validate(curr_plan)
    return Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": validated_plan,
        },
        goto="reporter",
    )
|
|
|
|
|
|
def human_feedback_node(
    state: State, config: RunnableConfig
) -> Command[Literal["planner", "research_team", "reporter", "__end__"]]:
    """Collect the user's verdict on the plan and route accordingly.

    Args:
        state: Workflow state; reads ``current_plan``, ``auto_accepted_plan``
            and ``plan_iterations``.
        config: Runnable config (unused here).

    Returns:
        A ``Command`` going back to ``planner`` on ``[EDIT_PLAN]`` feedback,
        or to ``research_team`` with the parsed plan once it is accepted.
        If the stored plan is not valid JSON it goes to ``reporter`` when
        the incremented iteration counter is above 1, else to ``__end__``.

    Raises:
        TypeError: If the interrupt value starts with neither
            ``[EDIT_PLAN]`` nor ``[ACCEPTED]`` (case-insensitive).
    """
    plan_text = state.get("current_plan", "")

    # Unless plans are auto-accepted, pause the graph and ask the user.
    if not state.get("auto_accepted_plan", False):
        feedback = interrupt("Please Review the Plan.")
        verdict = str(feedback).upper() if feedback else ""

        if verdict.startswith("[EDIT_PLAN]"):
            # Send the feedback back to the planner for another attempt.
            return Command(
                update={
                    "messages": [
                        HumanMessage(content=feedback, name="feedback"),
                    ],
                },
                goto="planner",
            )
        if not verdict.startswith("[ACCEPTED]"):
            raise TypeError(f"Interrupt value of {feedback} is not supported.")
        logger.info("Plan is accepted by user.")

    # Accepted (or auto-accepted): parse the plan and move on.
    iteration_count = state.get("plan_iterations", 0) or 0
    try:
        plan_text = repair_json_output(plan_text)
        # The counter is bumped before parsing, so a parse failure below
        # compares against the already-incremented value.
        iteration_count += 1
        parsed_plan = json.loads(plan_text)
    except json.JSONDecodeError:
        logger.warning("Planner response is not a valid JSON")
        return Command(goto="reporter" if iteration_count > 1 else "__end__")

    return Command(
        update={
            "current_plan": Plan.model_validate(parsed_plan),
            "plan_iterations": iteration_count,
            "locale": parsed_plan["locale"],
        },
        goto="research_team",
    )
|
|
|
|
|
|
def _extract_user_reply(message):
    """Return ``(found, content)`` for a message that counts as a user reply.

    A dict counts only when its ``role`` is ``"user"``. A non-dict object
    counts when it has ``role == "user"`` or simply exposes a ``content``
    attribute (the same acceptance rules the coordinator has always used).
    """
    if isinstance(message, dict):
        if message.get("role") == "user":
            return True, message.get("content", "No response")
        return False, None
    if hasattr(message, "role") and message.role == "user":
        return True, message.content
    if hasattr(message, "content"):
        return True, message.content
    return False, None


def _apply_handoff_tool_calls(tool_calls, goto, locale, research_topic):
    """Return ``(goto, locale, research_topic)`` after honoring the first handoff call.

    ``locale`` and ``research_topic`` are taken from the tool arguments
    independently, so ``handoff_after_clarification`` (which only carries
    ``locale``) is honored too. Any error while reading the tool calls is
    logged and treated as a handoff to the planner.
    """
    try:
        for tool_call in tool_calls:
            tool_name = tool_call.get("name", "")
            tool_args = tool_call.get("args", {})

            if tool_name in ("handoff_to_planner", "handoff_after_clarification"):
                logger.info("Handing off to planner")
                goto = "planner"
                if tool_args.get("locale"):
                    locale = tool_args.get("locale")
                if tool_args.get("research_topic"):
                    research_topic = tool_args.get("research_topic")
                break
    except Exception as e:
        # Deliberate best-effort: a malformed tool call still proceeds to planning.
        logger.error(f"Error processing tool calls: {e}")
        goto = "planner"
    return goto, locale, research_topic


def coordinator_node(
    state: State, config: RunnableConfig
) -> Command[Literal["planner", "background_investigator", "coordinator", "__end__"]]:
    """Coordinator node that communicate with customers and handle clarification.

    Two modes, selected by ``state["enable_clarification"]``:

    * Disabled (legacy): the LLM is told to call ``handoff_to_planner``
      immediately; only that tool is bound.
    * Enabled: the LLM may ask clarifying questions (no tool call) until
      ``max_clarification_rounds`` is reached, or call one of the two
      handoff tools to finish.

    Fixes relative to the previous implementation:

    * When the round limit is reached and the LLM still asks a question, the
      handoff to the planner is no longer overwritten with ``__end__``.
    * The LLM prompt list is kept in its own variable and is never replaced
      by ``state["messages"]`` while looking for the latest user reply.
    * ``state["messages"]`` and ``state["clarification_history"]`` are copied
      before being extended instead of being mutated in place.
    * ``locale`` / ``research_topic`` from tool arguments are applied
      independently of each other.

    Args:
        state: Workflow state.
        config: Runnable config; ``resources`` is copied into the update.

    Returns:
        A ``Command`` whose ``goto`` is ``__end__`` (a question was asked or
        no handoff happened), ``planner``, or ``background_investigator``
        (handoff with ``enable_background_investigation`` set).
    """
    logger.info("Coordinator talking.")
    configurable = Configuration.from_runnable_config(config)

    enable_clarification = state.get("enable_clarification", False)

    # Routing defaults shared by both modes.
    goto = "__end__"
    locale = state.get("locale", "en-US")
    research_topic = state.get("research_topic", "")
    # Legacy mode reports these neutral values in the state update.
    clarification_rounds = 0
    clarification_history = []

    if not enable_clarification:
        # ---- Clarification DISABLED (legacy mode) ----
        prompt_messages = apply_prompt_template("coordinator", state)
        prompt_messages.append(
            {
                "role": "system",
                "content": "CRITICAL: Clarification is DISABLED. You MUST immediately call handoff_to_planner tool with the user's query as-is. Do NOT ask questions or mention needing more information.",
            }
        )
        response = (
            get_llm_by_type(AGENT_LLM_MAP["coordinator"])
            .bind_tools([handoff_to_planner])
            .invoke(prompt_messages)
        )
    else:
        # ---- Clarification ENABLED ----
        clarification_rounds = state.get("clarification_rounds", 0)
        # Copy so the list held by the incoming state is not mutated.
        clarification_history = list(state.get("clarification_history") or [])
        max_clarification_rounds = state.get("max_clarification_rounds", 3)

        prompt_messages = apply_prompt_template("coordinator", state)

        if clarification_rounds == 0:
            prompt_messages.append(
                {
                    "role": "system",
                    "content": "Clarification mode is ENABLED. Follow the 'Clarification Process' guidelines in your instructions.",
                }
            )
        elif clarification_rounds > 0:
            logger.info(
                f"Clarification enabled (rounds: {clarification_rounds}/{max_clarification_rounds}): Continuing conversation"
            )

            # Record the user's latest answer in the clarification history.
            last_message = None
            if state.get("messages"):
                last_message = state["messages"][-1]
                if isinstance(last_message, dict):
                    content = last_message.get("content", "No content")
                else:
                    content = getattr(last_message, "content", "No content")
                logger.info(f"Last message content: {content}")

                found, reply = _extract_user_reply(last_message)
                if found:
                    clarification_history.append(reply)
                    logger.info(
                        f"Added user response to clarification history: {reply}"
                    )

            # Work out what to present as the user's latest response; fall
            # back to the most recent earlier reply if the last message is
            # not one.
            current_response = "No response"
            if last_message:
                found, reply = _extract_user_reply(last_message)
                if not found:
                    for earlier in reversed(state.get("messages") or []):
                        found, reply = _extract_user_reply(earlier)
                        if found:
                            break
                if found:
                    current_response = reply

            clarification_context = (
                f"Continuing clarification (round {clarification_rounds}/{max_clarification_rounds}):\n"
                f"User's latest response: {current_response}\n"
                "Ask for remaining missing dimensions. Do NOT repeat questions or start new topics."
            )
            logger.info(f"Clarification context: {clarification_context}")
            prompt_messages.append({"role": "system", "content": clarification_context})

        response = (
            get_llm_by_type(AGENT_LLM_MAP["coordinator"])
            .bind_tools([handoff_to_planner, handoff_after_clarification])
            .invoke(prompt_messages)
        )
        logger.debug(f"Current state messages: {state['messages']}")

        if not response.tool_calls and response.content:
            # No tool call: the LLM is asking a clarifying question.
            if clarification_rounds < max_clarification_rounds:
                clarification_rounds += 1
                # Only user answers go into clarification_history, not questions.
                logger.info(
                    f"Clarification response: {clarification_rounds}/{max_clarification_rounds}: {response.content}"
                )

                question_messages = list(state.get("messages") or [])
                question_messages.append(
                    HumanMessage(content=response.content, name="coordinator")
                )
                return Command(
                    update={
                        "messages": question_messages,
                        "locale": locale,
                        "research_topic": research_topic,
                        "resources": configurable.resources,
                        "clarification_rounds": clarification_rounds,
                        "clarification_history": clarification_history,
                        "is_clarification_complete": False,
                        "clarified_question": "",
                        "goto": goto,
                        # NOTE(review): kept verbatim from the previous
                        # implementation; how this key is consumed is not
                        # visible here.
                        "__interrupt__": [("coordinator", response.content)],
                    },
                    goto=goto,
                )

            # Round limit reached: stop asking and hand off to the planner.
            logger.warning(
                f"Max clarification rounds ({max_clarification_rounds}) reached. Handing off to planner."
            )
            goto = "planner"
        elif response.tool_calls:
            logger.info(
                f"Clarification completed after {clarification_rounds} rounds. LLM called handoff tool."
            )
        else:
            logger.warning("LLM response has no content and no tool calls.")

    # ---- Final: build and return the Command (both modes) ----
    messages = list(state.get("messages") or [])
    if response.content:
        messages.append(HumanMessage(content=response.content, name="coordinator"))

    if response.tool_calls:
        goto, locale, research_topic = _apply_handoff_tool_calls(
            response.tool_calls, goto, locale, research_topic
        )
    elif goto == "__end__":
        # Only warn when nothing upstream (e.g. the round limit) already
        # decided on a handoff; never overwrite such a decision.
        logger.warning("LLM didn't call any tools. Staying at __end__.")

    # A planner handoff is routed through background investigation if enabled.
    if goto == "planner" and state.get("enable_background_investigation"):
        goto = "background_investigator"

    return Command(
        update={
            "messages": messages,
            "locale": locale,
            "research_topic": research_topic,
            "resources": configurable.resources,
            "clarification_rounds": clarification_rounds,
            "clarification_history": clarification_history,
            "is_clarification_complete": goto != "coordinator",
            "clarified_question": research_topic if goto != "coordinator" else "",
            "goto": goto,
        },
        goto=goto,
    )
|
|
|
|
|
|
def reporter_node(state: State, config: RunnableConfig):
    """Produce the final report from the plan and the collected observations.

    Args:
        state: Workflow state; reads ``current_plan`` (its ``title`` and
            ``thought`` attributes), ``locale`` and ``observations``.
        config: Runnable config passed on to the prompt template.

    Returns:
        A dict with the single key ``final_report`` holding the LLM output.
    """
    logger.info("Reporter write final report")
    configurable = Configuration.from_runnable_config(config)
    plan = state.get("current_plan")

    requirements = HumanMessage(
        f"# Research Requirements\n\n## Task\n\n{plan.title}\n\n## Description\n\n{plan.thought}"
    )
    input_ = {
        "messages": [requirements],
        "locale": state.get("locale", "en-US"),
    }
    invoke_messages = apply_prompt_template("reporter", input_, configurable)
    observations = state.get("observations", [])

    # Reminder about report structure, citation style and table usage.
    format_reminder = (
        "IMPORTANT: Structure your report according to the format in the prompt. Remember to include:\n\n"
        "1. Key Points - A bulleted list of the most important findings\n"
        "2. Overview - A brief introduction to the topic\n"
        "3. Detailed Analysis - Organized into logical sections\n"
        "4. Survey Note (optional) - For more comprehensive reports\n"
        "5. Key Citations - List all references at the end\n\n"
        "For citations, DO NOT include inline citations in the text. Instead, place all citations in the 'Key Citations' section at the end using the format: `- [Source Title](URL)`. Include an empty line between each citation for better readability.\n\n"
        "PRIORITIZE USING MARKDOWN TABLES for data presentation and comparison. Use tables whenever presenting comparative data, statistics, features, or options. Structure tables with clear headers and aligned columns. Example table format:\n\n"
        "| Feature | Description | Pros | Cons |\n"
        "|---------|-------------|------|------|\n"
        "| Feature 1 | Description 1 | Pros 1 | Cons 1 |\n"
        "| Feature 2 | Description 2 | Pros 2 | Cons 2 |"
    )
    invoke_messages.append(HumanMessage(content=format_reminder, name="system"))

    observation_messages = [
        HumanMessage(
            content=f"Below are some observations for the research task:\n\n{observation}",
            name="observation",
        )
        for observation in observations
    ]

    # Compress only the observation messages against the reporter model's
    # token limit, then add them to the prompt.
    token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP["reporter"])
    compressor = ContextManager(token_limit)
    compressed_state = compressor.compress_messages({"messages": observation_messages})
    invoke_messages += compressed_state.get("messages", [])

    logger.debug(f"Current invoke messages: {invoke_messages}")
    reporter_llm = get_llm_by_type(AGENT_LLM_MAP["reporter"])
    response_content = reporter_llm.invoke(invoke_messages).content
    logger.info(f"reporter response: {response_content}")

    return {"final_report": response_content}
|
|
|
|
|
|
def research_team_node(state: State):
    """Hub node for the research team; it only logs and changes no state."""
    logger.info("Research team is collaborating on tasks.")
    return None
|
|
|
|
|
|
def _agent_recursion_limit(default_limit: int = 25) -> int:
    """Read ``AGENT_RECURSION_LIMIT`` from the environment, falling back to *default_limit*."""
    configured = os.getenv("AGENT_RECURSION_LIMIT", str(default_limit))
    try:
        parsed = int(configured)
    except ValueError:
        logger.warning(
            f"Invalid AGENT_RECURSION_LIMIT value: '{configured}'. "
            f"Using default value {default_limit}."
        )
        return default_limit

    if parsed <= 0:
        logger.warning(
            f"AGENT_RECURSION_LIMIT value '{configured}' (parsed as {parsed}) is not positive. "
            f"Using default value {default_limit}."
        )
        return default_limit

    logger.info(f"Recursion limit set to: {parsed}")
    return parsed


async def _execute_agent_step(
    state: State, agent, agent_name: str
) -> Command[Literal["research_team"]]:
    """Run the first plan step that has no result yet with the given agent.

    Args:
        state: Workflow state; reads ``current_plan``, ``observations``,
            ``locale`` and (for the researcher) ``resources``.
        agent: Object exposing an async ``ainvoke(input=..., config=...)``.
        agent_name: Agent label used in logs and as the message name.

    Returns:
        A ``Command`` back to ``research_team``. On success or agent failure
        it carries one new message and the extended ``observations`` list;
        the step's ``execution_res`` is set to the response (or to an error
        description). If every step already has a result, no update is sent.
    """
    current_plan = state.get("current_plan")
    plan_title = current_plan.title
    observations = state.get("observations", [])

    # Locate the first step without a result; everything before it is done.
    steps = current_plan.steps
    pending_index = next(
        (idx for idx, step in enumerate(steps) if not step.execution_res), None
    )
    if pending_index is None:
        logger.warning("No unexecuted step found")
        return Command(goto="research_team")
    current_step = steps[pending_index]
    completed_steps = steps[:pending_index]

    logger.info(f"Executing step: {current_step.title}, agent: {agent_name}")

    # Summarize earlier findings so the agent can build on them.
    completed_steps_info = ""
    if completed_steps:
        completed_steps_info = "# Completed Research Steps\n\n" + "".join(
            f"## Completed Step {number}: {step.title}\n\n"
            f"<finding>\n{step.execution_res}\n</finding>\n\n"
            for number, step in enumerate(completed_steps, start=1)
        )

    locale = state.get("locale", "en-US")
    task_message = HumanMessage(
        content=f"# Research Topic\n\n{plan_title}\n\n{completed_steps_info}# Current Step\n\n## Title\n\n{current_step.title}\n\n## Description\n\n{current_step.description}\n\n## Locale\n\n{locale}"
    )
    agent_input = {"messages": [task_message]}

    # The researcher gets extra guidance on resource files and citations.
    if agent_name == "researcher":
        resources = state.get("resources")
        if resources:
            resources_info = (
                "**The user mentioned the following resource files:**\n\n"
                + "".join(
                    f"- {resource.title} ({resource.description})\n"
                    for resource in resources
                )
            )
            agent_input["messages"].append(
                HumanMessage(
                    content=resources_info
                    + "\n\n"
                    + "You MUST use the **local_search_tool** to retrieve the information from the resource files.",
                )
            )

        agent_input["messages"].append(
            HumanMessage(
                content="IMPORTANT: DO NOT include inline citations in the text. Instead, track all sources and include a References section at the end using link reference format. Include an empty line between each citation for better readability. Use this format for each reference:\n- [Source Title](URL)\n\n- [Another Source](URL)",
                name="system",
            )
        )

    recursion_limit = _agent_recursion_limit(25)

    logger.info(f"Agent input: {agent_input}")
    try:
        result = await agent.ainvoke(
            input=agent_input, config={"recursion_limit": recursion_limit}
        )
    except Exception as e:
        import traceback

        error_traceback = traceback.format_exc()
        error_message = f"Error executing {agent_name} agent for step '{current_step.title}': {str(e)}"
        logger.exception(error_message)
        logger.error(f"Full traceback:\n{error_traceback}")

        # Record the failure as this step's result so the workflow moves on.
        detailed_error = f"[ERROR] {agent_name.capitalize()} Agent Error\n\nStep: {current_step.title}\n\nError Details:\n{str(e)}\n\nPlease check the logs for more information."
        current_step.execution_res = detailed_error
        failure_message = HumanMessage(content=detailed_error, name=agent_name)
        return Command(
            update={
                "messages": [failure_message],
                "observations": observations + [detailed_error],
            },
            goto="research_team",
        )

    # The agent's last message is taken as the step result.
    response_content = result["messages"][-1].content
    logger.debug(f"{agent_name.capitalize()} full response: {response_content}")

    current_step.execution_res = response_content
    logger.info(f"Step '{current_step.title}' execution completed by {agent_name}")

    success_message = HumanMessage(content=response_content, name=agent_name)
    return Command(
        update={
            "messages": [success_message],
            "observations": observations + [response_content],
        },
        goto="research_team",
    )
|
|
|
|
|
|
async def _setup_and_execute_agent_step(
    state: State,
    config: RunnableConfig,
    agent_type: str,
    default_tools: list,
) -> Command[Literal["research_team"]]:
    """Build an agent of the given type and run it on the current plan step.

    Shared by ``researcher_node`` and ``coder_node``:

    1. Collect the MCP servers configured for ``agent_type``.
    2. If any exist, extend a copy of ``default_tools`` with their enabled
       tools; otherwise use ``default_tools`` as given.
    3. Create the agent and delegate to ``_execute_agent_step``.

    Args:
        state: The current state.
        config: The runnable config (source of ``mcp_settings``).
        agent_type: The type of agent ("researcher" or "coder").
        default_tools: The default tools to add to the agent.

    Returns:
        Command to update state and go to research_team.
    """
    configurable = Configuration.from_runnable_config(config)
    allowed_keys = ("transport", "command", "args", "url", "env", "headers")
    mcp_servers = {}
    enabled_tools = {}

    # Pick the servers that enable at least one tool and target this agent.
    if configurable.mcp_settings:
        for server_name, server_config in configurable.mcp_settings["servers"].items():
            if not (
                server_config["enabled_tools"]
                and agent_type in server_config["add_to_agents"]
            ):
                continue
            mcp_servers[server_name] = {
                key: value
                for key, value in server_config.items()
                if key in allowed_keys
            }
            # Remember which server each enabled tool comes from.
            enabled_tools.update(
                dict.fromkeys(server_config["enabled_tools"], server_name)
            )

    agent_tools = default_tools
    if mcp_servers:
        client = MultiServerMCPClient(mcp_servers)
        # Work on a copy so the caller's default tool list is left alone.
        agent_tools = default_tools[:]
        for mcp_tool in await client.get_tools():
            if mcp_tool.name not in enabled_tools:
                continue
            mcp_tool.description = (
                f"Powered by '{enabled_tools[mcp_tool.name]}'.\n{mcp_tool.description}"
            )
            agent_tools.append(mcp_tool)

    # Same agent construction whether or not MCP tools were added.
    llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP[agent_type])
    pre_model_hook = partial(ContextManager(llm_token_limit, 3).compress_messages)
    agent = create_agent(agent_type, agent_type, agent_tools, agent_type, pre_model_hook)
    return await _execute_agent_step(state, agent, agent_type)
|
|
|
|
|
|
async def researcher_node(
    state: State, config: RunnableConfig
) -> Command[Literal["research_team"]]:
    """Run the current plan step with the researcher agent.

    The tool set is web search plus the crawl tool, preceded by a retriever
    tool when ``get_retriever_tool`` returns one for ``state["resources"]``.
    """
    logger.info("Researcher node is researching.")
    configurable = Configuration.from_runnable_config(config)

    web_search_tool = get_web_search_tool(configurable.max_search_results)
    retriever_tool = get_retriever_tool(state.get("resources", []))

    # The retriever, when available, is listed first.
    tools = [web_search_tool, crawl_tool]
    if retriever_tool:
        tools = [retriever_tool, *tools]
    logger.info(f"Researcher tools: {tools}")

    return await _setup_and_execute_agent_step(
        state,
        config,
        agent_type="researcher",
        default_tools=tools,
    )
|
|
|
|
|
|
async def coder_node(
    state: State, config: RunnableConfig
) -> Command[Literal["research_team"]]:
    """Run the current plan step with the coder agent (Python REPL tool only)."""
    logger.info("Coder node is coding.")
    coder_tools = [python_repl_tool]
    return await _setup_and_execute_agent_step(
        state,
        config,
        agent_type="coder",
        default_tools=coder_tools,
    )
|