"""Heartbeat service — proactive agent awareness loop.
|
|
|
|
Periodically triggers agents to check their environment (tasks, plaza,
|
|
etc.) and take autonomous actions. Inspired by OpenClaw's heartbeat
|
|
mechanism.
|
|
|
|
Runs as a background task inside the FastAPI process.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
from loguru import logger
|
|
from sqlalchemy import select, update
|
|
|
|
# Default heartbeat instruction used when HEARTBEAT.md doesn't exist.
# This becomes the user message of the heartbeat LLM call; _execute_heartbeat
# appends recent-activity and inbox context to it before dispatch. If the agent
# supplies its own HEARTBEAT.md, only the privacy/posting-limit rules below are
# re-appended to the custom text.
DEFAULT_HEARTBEAT_INSTRUCTION = """[Heartbeat Check]

This is your periodic heartbeat — a moment to be aware, explore, and contribute.

## Phase 1: Review Context & Discover Interest Points

First, review your **recent conversations** (provided below if available) and your **role/responsibilities**.
Identify topics or questions that:
- Are directly relevant to your role and current work
- Were mentioned by users but not fully explored at the time
- Represent emerging trends or changes in your professional domain
- Could improve your ability to serve your users

If no genuine, informative topics emerge from recent context, **skip exploration** and go directly to Phase 3.
Do NOT search for generic or obvious topics just to fill time. Quality over quantity.

## Phase 2: Targeted Exploration (Conditional)

Only if you identified genuine interest points in Phase 1:

1. Use `web_search` to investigate (maximum 5 searches per heartbeat)
2. Keep searches **tightly scoped** to your role and recent work topics
3. For each discovery worth keeping:
- Record it using `write_file` to `memory/curiosity_journal.md`
- Include the **source URL** and a brief note on **why it matters to your work**
- Rate its relevance (high/medium/low) to your current responsibilities

Format for curiosity_journal.md entries:
```
### [Date] - [Topic]
- **Finding**: [What you learned]
- **Source**: [URL]
- **Relevance**: [high/medium/low] — [Why it matters to your work]
- **Follow-up**: [Optional: questions this raises for next time]
```

## Phase 3: Agent Plaza

1. Call `plaza_get_new_posts` to check recent activity
2. If you found something genuinely valuable in Phase 2:
- Share the most impactful discovery to plaza (max 1 post)
- **Always include the source URL** when sharing internet findings
- Frame it in terms of how it's relevant to your team/domain
3. Comment on relevant existing posts (max 2 comments)

## Phase 4: Wrap Up

- If nothing needed attention and no exploration was warranted: reply with HEARTBEAT_OK
- Otherwise, briefly summarize what you explored and why

⚠️ KEY PRINCIPLES:
- Always ground exploration in YOUR role and YOUR recent work context
- Never search for random unrelated topics out of idle curiosity
- If you don't have a specific angle worth investigating, don't search
- Prefer depth over breadth — one thoroughly explored topic > five surface-level queries
- Generate follow-up questions only when you genuinely want to know more

⚠️ PRIVACY RULES — STRICTLY FOLLOW:
- NEVER share information from private user conversations
- NEVER share content from memory/memory.md
- NEVER share content from workspace/ files
- NEVER share task details from tasks.json
- You may ONLY share: general work insights, public information, opinions on plaza posts
- If unsure whether something is private, do NOT share it

⚠️ POSTING LIMITS per heartbeat:
- Maximum 1 new post
- Maximum 2 comments on existing posts
- Do NOT post trivial or repetitive content
"""
|
|
|
|
|
|
def _is_in_active_hours(active_hours: str, tz_name: str = "UTC") -> bool:
|
|
"""Check if current time is within the agent's active hours.
|
|
|
|
Format: "HH:MM-HH:MM" (e.g., "09:00-18:00")
|
|
Uses agent's configured timezone (defaults to UTC).
|
|
"""
|
|
try:
|
|
from zoneinfo import ZoneInfo
|
|
start_str, end_str = active_hours.split("-")
|
|
sh, sm = map(int, start_str.strip().split(":"))
|
|
eh, em = map(int, end_str.strip().split(":"))
|
|
try:
|
|
tz = ZoneInfo(tz_name)
|
|
except (KeyError, Exception):
|
|
tz = ZoneInfo("UTC")
|
|
now = datetime.now(tz)
|
|
current_minutes = now.hour * 60 + now.minute
|
|
start_minutes = sh * 60 + sm
|
|
end_minutes = eh * 60 + em
|
|
if start_minutes <= end_minutes:
|
|
return start_minutes <= current_minutes < end_minutes
|
|
else:
|
|
# Overnight range (e.g., "22:00-06:00")
|
|
return current_minutes >= start_minutes or current_minutes < end_minutes
|
|
except Exception:
|
|
return True # Default to active if parsing fails
|
|
|
|
|
|
async def _execute_heartbeat(agent_id: uuid.UUID):
    """Execute a single heartbeat for an agent.

    Uses three short DB transactions to avoid holding connections
    during long-running LLM calls:
    Phase 1: Read agent, model, context, notifications → commit
    Phase 2: LLM tool loop (no DB connection held)
    Phase 3: Write token usage + last_heartbeat_at → commit

    All exceptions are swallowed (logged) so a failing heartbeat never
    propagates into the scheduler loop. Returns None in every path.
    """
    try:
        # Project imports are function-local so this module can be imported
        # before the app package is fully initialized.
        from app.database import async_session
        from app.models.agent import Agent
        from app.models.llm import LLMModel

        # ── Phase 1: Read all context from DB (short transaction) ──
        # Pre-initialize everything read from ORM objects so Phase 2/3 never
        # touch a detached instance after the session closes.
        agent_name = ""
        agent_role = ""
        agent_creator_id = None
        model_provider = ""
        model_api_key = ""
        model_model = ""
        model_base_url = None
        model_temperature = None
        model_max_output_tokens = None
        heartbeat_instruction = DEFAULT_HEARTBEAT_INSTRUCTION

        async with async_session() as db:
            result = await db.execute(select(Agent).where(Agent.id == agent_id))
            agent = result.scalar_one_or_none()
            if not agent:
                # Agent was deleted between scheduling and execution.
                return

            # Prefer the primary model; fall back to the configured fallback.
            model_id = agent.primary_model_id or agent.fallback_model_id
            if not model_id:
                return

            model_result = await db.execute(select(LLMModel).where(LLMModel.id == model_id))
            model = model_result.scalar_one_or_none()
            if not model:
                return

            # Cache values we need for Phase 2 (after DB session closes)
            agent_name = agent.name
            agent_role = agent.role_description or ""
            agent_creator_id = agent.creator_id
            model_provider = model.provider
            # NOTE(review): attribute name says "encrypted" but the value is
            # passed straight to create_llm_client below — confirm whether
            # decryption happens inside the client factory.
            model_api_key = model.api_key_encrypted
            model_model = model.model
            model_base_url = model.base_url
            model_temperature = model.temperature
            # getattr guards against older model rows/schemas lacking these columns.
            model_max_output_tokens = getattr(model, 'max_output_tokens', None)
            model_request_timeout = getattr(model, 'request_timeout', None)

            # Read HEARTBEAT.md if it exists, otherwise use default
            from pathlib import Path
            from app.config import get_settings
            settings = get_settings()

            ws_root = Path(settings.AGENT_DATA_DIR) / str(agent_id)
            hb_file = ws_root / "HEARTBEAT.md"
            if hb_file.exists():
                try:
                    custom = hb_file.read_text(encoding="utf-8", errors="replace").strip()
                    if custom:
                        # Prepend privacy rules to custom heartbeat
                        # (custom instructions must not be able to opt out
                        # of the privacy/posting-limit policy).
                        heartbeat_instruction = custom + """

⚠️ PRIVACY RULES — STRICTLY FOLLOW:
- NEVER share information from private user conversations
- NEVER share content from memory/memory.md
- NEVER share content from workspace/ files
- NEVER share task details from tasks.json
- You may ONLY share: general work insights, public information, opinions on plaza posts

⚠️ POSTING LIMITS per heartbeat:
- Maximum 1 new post
- Maximum 2 comments on existing posts
- Do NOT post trivial or repetitive content
"""
                except Exception:
                    # Unreadable HEARTBEAT.md → silently fall back to default.
                    pass

            # Build context (static system prompt + per-heartbeat dynamic part).
            from app.services.agent_context import build_agent_context
            static_prompt, dynamic_prompt = await build_agent_context(agent_id, agent_name, agent_role)

            # Fetch recent activity to give heartbeat context for curiosity exploration
            from app.models.activity_log import AgentActivityLog
            recent_context = ""
            try:
                recent_result = await db.execute(
                    select(AgentActivityLog)
                    .where(AgentActivityLog.agent_id == agent_id)
                    .where(AgentActivityLog.action_type.in_(["chat_reply", "tool_call", "task_created", "task_updated"]))
                    .order_by(AgentActivityLog.created_at.desc())
                    .limit(50)
                )
                recent_activities = recent_result.scalars().all()
                if recent_activities:
                    itms = []
                    for act in reversed(recent_activities):  # chronological order
                        ts = act.created_at.strftime("%m-%d %H:%M") if act.created_at else ""
                        # Truncate each summary to keep the prompt small.
                        itms.append(f"- [{ts}] {act.action_type}: {act.summary[:120]}")
                    # NOTE(review): "\\n" is a literal backslash-n in the prompt,
                    # not a newline — confirm whether "\n" was intended here.
                    recent_context = "\\n\\n---\\n## Recent Activity Context\\nHere are your recent interactions and work to help you identify relevant topics:\\n\\n" + "\\n".join(itms)
            except Exception as e:
                # Context is best-effort; heartbeat proceeds without it.
                logger.warning(f"Failed to fetch recent activity for heartbeat context: {e}")

            # Fetch unread notifications for this agent (plaza replies, mentions, broadcasts)
            inbox_context = ""
            notif_lines = []
            try:
                from app.models.notification import Notification
                notif_result = await db.execute(
                    select(Notification).where(
                        Notification.agent_id == agent_id,
                        Notification.is_read == False,
                    ).order_by(Notification.created_at).limit(10)
                )
                unread = notif_result.scalars().all()
                if unread:
                    # NOTE(review): same literal-"\\n" question as above.
                    notif_lines = ["\\n\\n---\\n## Inbox (new messages for you — please review and respond if appropriate)"]
                    for n in unread:
                        sender = f"from {n.sender_name}" if n.sender_name else ""
                        notif_lines.append(f"- [{n.type}] {n.title} {sender}: {(n.body or '')[:150]}")
                        # Mark drained: persisted by the commit below, so a
                        # notification is delivered to at most one heartbeat.
                        n.is_read = True
            except Exception as e:
                logger.warning(f"Failed to drain agent notifications: {e}")

            inbox_context = "\\n".join(notif_lines)

            # Commit Phase 1: release the DB connection before LLM calls
            await db.commit()
        # DB session is now closed — connection returned to pool

        # ── Phase 2: LLM calls (no DB connection held) ──
        full_instruction = heartbeat_instruction + recent_context + inbox_context

        # Call LLM with tools using unified client
        from app.services.llm_utils import create_llm_client, get_max_tokens, LLMMessage, LLMError
        from app.services.agent_tools import execute_tool, get_agent_tools_for_llm

        try:
            client = create_llm_client(
                provider=model_provider,
                api_key=model_api_key,
                model=model_model,
                base_url=model_base_url,
                timeout=float(model_request_timeout or 120.0),
            )
        except Exception as e:
            logger.error(f"Failed to create LLM client: {e}")
            return

        tools_for_llm = await get_agent_tools_for_llm(agent_id)

        reply = ""
        plaza_posts_made = 0  # hard limit: 1 new post per heartbeat
        plaza_comments_made = 0  # hard limit: 2 comments per heartbeat
        _hb_accumulated_tokens = 0  # summed across all tool-loop rounds

        # Token tracking helpers
        from app.services.token_tracker import record_token_usage, extract_usage_tokens, estimate_tokens_from_chars

        # Convert messages to LLMMessage format
        llm_messages = [
            LLMMessage(role="system", content=static_prompt, dynamic_content=dynamic_prompt),
            LLMMessage(role="user", content=full_instruction)
        ]

        for round_i in range(20):  # More rounds for search + write + plaza
            try:
                response = await client.complete(
                    messages=llm_messages,
                    tools=tools_for_llm,
                    temperature=model_temperature,
                    max_tokens=get_max_tokens(model_provider, model_model, model_max_output_tokens),
                )
            except LLMError as e:
                logger.error(f"LLM error in heartbeat: {e}")
                reply = ""
                break
            except Exception as e:
                logger.error(f"LLM call error in heartbeat: {e}")
                reply = ""
                break

            # Track tokens for this round: prefer real usage from the API,
            # otherwise estimate from total characters in play.
            real_tokens = extract_usage_tokens(response.usage)
            if real_tokens:
                _hb_accumulated_tokens += real_tokens
            else:
                round_chars = sum(len(m.content or '') for m in llm_messages) + len(response.content or '')
                _hb_accumulated_tokens += estimate_tokens_from_chars(round_chars)

            if response.tool_calls:
                # Add assistant message with tool calls
                llm_messages.append(LLMMessage(
                    role="assistant",
                    content=response.content or None,
                    tool_calls=[{
                        "id": tc["id"],
                        "type": "function",
                        "function": tc["function"],
                    } for tc in response.tool_calls],
                    reasoning_content=response.reasoning_content,
                ))

                # Tools that require arguments — if LLM sends empty args, skip and ask to retry
                # (aligned with call_llm in websocket.py)
                _TOOLS_REQUIRING_ARGS = {
                    "write_file", "read_file", "delete_file", "read_document",
                    "send_message_to_agent", "send_feishu_message", "send_email",
                    "web_search", "jina_search", "jina_read",
                }

                for tc in response.tool_calls:
                    fn = tc["function"]
                    tool_name = fn["name"]
                    raw_args = fn.get("arguments", "{}")
                    logger.info(f"[Heartbeat] Raw arguments for {tool_name} (len={len(raw_args) if raw_args else 0}): {repr(raw_args[:300]) if raw_args else 'None'}")
                    try:
                        args = json.loads(raw_args) if raw_args else {}
                    except json.JSONDecodeError as je:
                        # Malformed tool-call JSON → treat as empty args and
                        # let the guard below bounce it back to the LLM.
                        logger.warning(f"[Heartbeat] JSON parse failed for {tool_name}: {je}. Raw: {repr(raw_args[:200])}")
                        args = {}

                    # Guard: if a tool that requires arguments received empty args,
                    # return an error to LLM instead of executing
                    if not args and tool_name in _TOOLS_REQUIRING_ARGS:
                        logger.warning(f"[Heartbeat] Empty arguments for {tool_name}, asking LLM to retry")
                        llm_messages.append(LLMMessage(
                            role="tool",
                            tool_call_id=tc["id"],
                            content=f"Error: {tool_name} was called with empty arguments. You must provide the required parameters. Please retry with the correct arguments.",
                        ))
                        continue

                    # ── Hard rate limits for plaza actions ──
                    # Enforced in code, not just in the prompt, so a
                    # misbehaving model cannot spam the plaza.
                    if tool_name == "plaza_create_post":
                        if plaza_posts_made >= 1:
                            tool_result = "[BLOCKED] You have already made 1 plaza post this heartbeat. Do not post again."
                        else:
                            tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)
                            plaza_posts_made += 1
                    elif tool_name == "plaza_add_comment":
                        if plaza_comments_made >= 2:
                            tool_result = "[BLOCKED] You have already made 2 comments this heartbeat. Do not comment again."
                        else:
                            tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)
                            plaza_comments_made += 1
                    else:
                        tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)

                    llm_messages.append(LLMMessage(
                        role="tool",
                        tool_call_id=tc["id"],
                        content=str(tool_result),
                    ))
            else:
                # No tool calls → final answer; exit the tool loop.
                reply = response.content or ""
                break

        await client.close()

        # ── Phase 3: Write results back to DB (short transaction) ──
        async with async_session() as db:
            # Record accumulated heartbeat token usage
            # (record_token_usage manages its own persistence — it does not
            # take this session; presumably opens its own. TODO confirm.)
            if _hb_accumulated_tokens > 0:
                await record_token_usage(agent_id, _hb_accumulated_tokens)

            # Update last_heartbeat_at
            # Using an update statement is safer to avoid state drift if the object was updated elsewhere
            await db.execute(
                update(Agent)
                .where(Agent.id == agent_id)
                .values(last_heartbeat_at=datetime.now(timezone.utc))
            )
            await db.commit()

        # Log activity if not empty
        # replace(" ", "_") also matches replies like "HEARTBEAT OK".
        is_ok = "HEARTBEAT_OK" in reply.upper().replace(" ", "_") if reply else False
        if not is_ok and reply:
            from app.services.activity_logger import log_activity
            await log_activity(
                agent_id, "heartbeat",
                f"Heartbeat: {reply[:80]}",
                detail={"reply": reply[:500]},
            )

        logger.info(f"💓 Heartbeat for {agent_name}: {'OK' if is_ok else reply[:60]}")

    except Exception as e:
        # Last-resort guard: heartbeats run as fire-and-forget tasks, so any
        # escape here would otherwise vanish into an unobserved task error.
        logger.exception(f"Heartbeat error for agent {agent_id}: {e}")
|
|
|
|
|
|
async def _heartbeat_tick():
    """One heartbeat tick: find agents due for heartbeat and fire them.

    Scans all heartbeat-enabled agents, marks lifetime-expired agents as
    stopped, filters the rest by active hours (in each agent's timezone)
    and by the configured interval, then launches `_execute_heartbeat`
    as a fire-and-forget task for every agent that is due.
    """
    from app.database import async_session
    from app.models.agent import Agent
    from app.services.audit_logger import write_audit_log
    from app.services.timezone_utils import get_agent_timezone_sync
    from app.models.tenant import Tenant

    now = datetime.now(timezone.utc)

    try:
        async with async_session() as db:
            # `== True` is intentional SQLAlchemy column comparison syntax.
            result = await db.execute(
                select(Agent).where(
                    Agent.heartbeat_enabled == True,
                    Agent.status.in_(["running", "idle"]),
                )
            )
            agents = result.scalars().all()

            # Pre-load tenants in a single query for timezone resolution
            tenant_ids = {a.tenant_id for a in agents if a.tenant_id}
            tenants_by_id = {}
            if tenant_ids:
                t_result = await db.execute(select(Tenant).where(Tenant.id.in_(tenant_ids)))
                tenants_by_id = {t.id: t for t in t_result.scalars().all()}

            triggered = 0
            expired_count = 0
            for agent in agents:
                # Skip agents already marked expired
                if agent.is_expired:
                    continue
                # Lifetime elapsed → stop the agent and disable heartbeats.
                # (Assumes expires_at is timezone-aware like `now` — TODO confirm.)
                if agent.expires_at and now >= agent.expires_at:
                    agent.is_expired = True
                    agent.heartbeat_enabled = False
                    agent.status = "stopped"
                    expired_count += 1
                    continue

                # Resolve timezone (tenant default when the agent has none)
                tenant = tenants_by_id.get(agent.tenant_id)
                tz_name = get_agent_timezone_sync(agent, tenant)

                # Check active hours (in agent's timezone)
                if not _is_in_active_hours(agent.heartbeat_active_hours or "09:00-18:00", tz_name):
                    continue

                # Check interval since the last completed heartbeat
                interval = timedelta(minutes=agent.heartbeat_interval_minutes or 240)
                if agent.last_heartbeat_at and (now - agent.last_heartbeat_at) < interval:
                    continue

                # Fire heartbeat as a detached task so one slow agent cannot
                # block the tick. NOTE(review): the task reference is not kept,
                # so it could in principle be garbage-collected mid-flight —
                # consider holding references in a module-level set.
                logger.info(f"💓 Triggering heartbeat for {agent.name}")
                await write_audit_log("heartbeat_fire", {"agent_name": agent.name}, agent_id=agent.id)
                asyncio.create_task(_execute_heartbeat(agent.id))
                triggered += 1

            # Bug fix: persist the expiry mutations. Previously the session
            # exited without commit, so is_expired/heartbeat_enabled/status
            # changes were discarded and expired agents were re-expired on
            # every tick without ever being stopped in the database.
            if expired_count:
                await db.commit()

            if triggered:
                await write_audit_log("heartbeat_tick", {"eligible_agents": len(agents), "triggered": triggered})

    except Exception as e:
        logger.exception(f"Heartbeat tick error: {e}")
        await write_audit_log("heartbeat_error", {"error": str(e)[:300]})
|
|
|
|
|
|
async def start_heartbeat():
    """Run the heartbeat loop forever: one tick, then a 60-second pause.

    Intended to be scheduled exactly once from the FastAPI startup hook;
    it never returns.
    """
    tick_seconds = 60
    logger.info("💓 Agent heartbeat service started (60s tick)")
    while True:
        await _heartbeat_tick()
        await asyncio.sleep(tick_seconds)
|