258 lines
10 KiB
Python
258 lines
10 KiB
Python
"""Background task executor — runs LLM to complete tasks automatically.
|
|
|
|
Uses the same agent context (soul, memory, skills, relationships, tools)
|
|
as the chat dialog. Supports tool-calling loop for autonomous execution.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from loguru import logger
|
|
from sqlalchemy import select
|
|
|
|
from app.database import async_session
|
|
from app.models.agent import Agent
|
|
from app.models.llm import LLMModel
|
|
from app.models.task import Task, TaskLog
|
|
|
|
|
|
async def execute_task(task_id: uuid.UUID, agent_id: uuid.UUID) -> None:
    """Execute a task using the agent's configured LLM with full context.

    Uses the same context as chat dialog: build_agent_context for system prompt,
    agent tools for tool-calling, and a multi-round tool loop.

    Flow:
    - todo tasks: pending → doing → done
    - supervision tasks: pending → doing → pending (stays active, just logs result)

    Args:
        task_id: Primary key of the Task row to execute.
        agent_id: Primary key of the Agent that owns the task.
    """
    logger.info(f"[TaskExec] Starting task {task_id} for agent {agent_id}")

    # Step 1: Mark as doing, and snapshot the fields we need so we never
    # touch the ORM instance after its session is closed.
    async with async_session() as db:
        result = await db.execute(select(Task).where(Task.id == task_id))
        task = result.scalar_one_or_none()
        if not task:
            logger.warning(f"[TaskExec] Task {task_id} not found")
            return

        task.status = "doing"
        db.add(TaskLog(task_id=task_id, content="🤖 开始执行任务..."))
        await db.commit()
        task_title = task.title
        task_description = task.description or ""
        task_type = task.type  # 'todo' or 'supervision'
        supervision_target = task.supervision_target_name or ""

    async def _abort(message: str) -> None:
        # Shared failure path: record the error and put supervision tasks
        # back to 'pending' so they stay active for the next run.
        # (Previously this 4-line pattern was duplicated at every error site.)
        await _log_error(task_id, message)
        if task_type == 'supervision':
            await _restore_supervision_status(task_id)

    # Step 2: Load agent + model
    async with async_session() as db:
        agent_result = await db.execute(select(Agent).where(Agent.id == agent_id))
        agent = agent_result.scalar_one_or_none()
        if not agent:
            await _abort("数字员工未找到")
            return

        model_id = agent.primary_model_id or agent.fallback_model_id
        if not model_id:
            await _abort(f"{agent.name} 未配置 LLM 模型,无法执行任务")
            return

        model_result = await db.execute(
            select(LLMModel).where(LLMModel.id == model_id)
        )
        model = model_result.scalar_one_or_none()
        if not model:
            await _abort("配置的模型不存在")
            return

        agent_name = agent.name
        creator_id = agent.creator_id
        # Snapshot inside the session like the other fields (previously read
        # from the detached instance after the session closed).
        role_description = agent.role_description or ""

    # Step 3: Build full agent context (same as chat dialog)
    from app.services.agent_context import build_agent_context
    static_prompt, dynamic_prompt = await build_agent_context(agent_id, agent_name, role_description)

    # Add task-execution-specific instructions
    task_addendum = """

## Task Execution Mode

You are now in TASK EXECUTION MODE (not a conversation). A task has been assigned to you.
- Focus on completing the task as thoroughly as possible.
- Break down complex tasks into steps and execute each step.
- Use your tools actively to gather information, send messages, read/write files, etc.
- Provide a detailed execution report at the end.
- If the task involves contacting someone, use `send_feishu_message` to reach them.
- If the task requires data or information, use your tools to fetch it.
- Do NOT ask the user follow-up questions — take initiative and complete the task autonomously.
"""
    dynamic_prompt += task_addendum

    # Build user prompt
    if task_type == 'supervision':
        user_prompt = f"[督办任务] {task_title}"
        if task_description:
            user_prompt += f"\n任务描述: {task_description}"
        if supervision_target:
            user_prompt += f"\n督办对象: {supervision_target}"
        user_prompt += "\n\n请执行此督办任务:联系督办对象,了解进展,并汇报结果。"
    else:
        user_prompt = f"[任务执行] {task_title}"
        if task_description:
            user_prompt += f"\n任务描述: {task_description}"
        user_prompt += "\n\n请认真完成此任务,给出详细的执行结果。"

    # Step 4: Call LLM with tool loop
    from app.services.llm_utils import create_llm_client, get_max_tokens, LLMMessage, LLMError

    messages = [
        LLMMessage(role="system", content=static_prompt, dynamic_content=dynamic_prompt),
        LLMMessage(role="user", content=user_prompt),
    ]

    # Normalize base_url
    if not model.base_url:
        await _abort(f"未配置 {model.provider} 的 API 地址")
        return

    # Create unified LLM client
    try:
        client = create_llm_client(
            provider=model.provider,
            api_key=model.api_key_encrypted,
            model=model.model,
            base_url=model.base_url,
            timeout=float(getattr(model, 'request_timeout', None) or 1200.0),
        )
    except Exception as e:
        await _abort(f"创建 LLM 客户端失败: {e}")
        return

    # Load tools (same as chat dialog)
    from app.services.agent_tools import execute_tool, get_agent_tools_for_llm
    tools_for_llm = await get_agent_tools_for_llm(agent_id)

    try:
        logger.info(f"[TaskExec] Calling LLM with tools for task: {task_title}")
        reply = ""

        # Tool-calling loop (max 50 rounds for task execution)
        for round_i in range(50):
            try:
                response = await client.complete(
                    messages=messages,
                    tools=tools_for_llm if tools_for_llm else None,
                    temperature=model.temperature,
                    max_tokens=get_max_tokens(model.provider, model.model, getattr(model, 'max_output_tokens', None)),
                )
            except LLMError as e:
                await _abort(f"LLM 错误: {e}")
                return
            except Exception as e:
                await _abort(f"调用模型失败: {str(e)[:200]}")
                return

            if response.tool_calls:
                # Echo the assistant turn (including its tool calls) into the
                # transcript so the model sees its own requests next round.
                messages.append(LLMMessage(
                    role="assistant",
                    content=response.content or None,
                    tool_calls=[{
                        "id": tc["id"],
                        "type": "function",
                        "function": tc["function"],
                    } for tc in response.tool_calls],
                    reasoning_content=response.reasoning_content,
                ))

                for tc in response.tool_calls:
                    fn = tc["function"]
                    tool_name = fn["name"]
                    raw_args = fn.get("arguments", "{}")
                    logger.info(f"[TaskExec] Round {round_i+1} calling tool: {tool_name}({json.dumps(raw_args, ensure_ascii=False)[:100]})")
                    # Arguments arrive as a JSON string; degrade to no-args
                    # rather than aborting the task on malformed JSON.
                    try:
                        args = json.loads(raw_args) if raw_args else {}
                    except Exception:
                        args = {}

                    tool_result = await execute_tool(tool_name, args, agent_id, creator_id)
                    messages.append(LLMMessage(
                        role="tool",
                        tool_call_id=tc["id"],
                        content=str(tool_result),
                    ))
            else:
                # No tool calls: this is the final answer.
                reply = response.content or ""
                break
        else:
            # for/else: loop exhausted without the model producing an answer.
            reply = "(已达到最大工具调用轮数)"

        logger.info(f"[TaskExec] LLM reply: {reply[:80]}")
    except Exception as e:
        error_msg = str(e) or repr(e)
        logger.error(f"[TaskExec] Error: {error_msg}")
        await _abort(f"执行出错: {error_msg[:150]}")
        return
    finally:
        # BUGFIX: the client was previously only closed on the success path
        # and leaked on every error return above; close it unconditionally.
        try:
            await client.close()
        except Exception:
            logger.warning("[TaskExec] Failed to close LLM client")

    # Step 5: Save result and update status
    async with async_session() as db:
        result = await db.execute(select(Task).where(Task.id == task_id))
        task = result.scalar_one_or_none()
        if task:
            if task_type == 'supervision':
                # Supervision tasks stay active; just log the result
                task.status = "pending"
                db.add(TaskLog(task_id=task_id, content=f"✅ 督办执行完成\n\n{reply}"))
            else:
                task.status = "done"
                task.completed_at = datetime.now(timezone.utc)
                db.add(TaskLog(task_id=task_id, content=f"✅ 任务完成\n\n{reply}"))
            await db.commit()
            logger.info(f"[TaskExec] Task {task_id} {'logged' if task_type == 'supervision' else 'completed'}!")

    # Log activity
    from app.services.activity_logger import log_activity
    await log_activity(
        agent_id, "task_updated",
        f"{'督办' if task_type == 'supervision' else '任务'}执行: {task_title[:60]}",
        detail={"task_id": str(task_id), "task_type": task_type, "title": task_title, "reply": reply[:500]},
        related_id=task_id,
    )
|
|
|
|
|
|
async def _log_error(task_id: uuid.UUID, message: str) -> None:
    """Append an error entry to the task's log.

    Writes a "❌"-prefixed TaskLog row in its own short-lived session, and
    mirrors the message to the application logger.
    """
    logger.error(f"[TaskExec] Error for {task_id}: {message}")
    entry = TaskLog(task_id=task_id, content=f"❌ {message}")
    async with async_session() as db:
        db.add(entry)
        await db.commit()
|
|
|
|
|
|
async def _restore_supervision_status(task_id: uuid.UUID) -> None:
    """Move a supervision task stuck in "doing" back to "pending".

    Called after a failed execution so the supervision task remains active.
    A no-op when the task is missing or not currently in "doing".
    """
    async with async_session() as db:
        row = await db.execute(select(Task).where(Task.id == task_id))
        task = row.scalar_one_or_none()
        # Guard clauses: nothing to restore unless the task exists and is
        # still marked as in-progress.
        if task is None or task.status != "doing":
            return
        task.status = "pending"
        await db.commit()
|