# Clawith/backend/app/services/heartbeat.py
"""Heartbeat service — proactive agent awareness loop.
Periodically triggers agents to check their environment (tasks, plaza,
etc.) and take autonomous actions. Inspired by OpenClaw's heartbeat
mechanism.
Runs as a background task inside the FastAPI process.
"""
import asyncio
import json
import uuid
from datetime import datetime, timezone, timedelta
from loguru import logger
from sqlalchemy import select, update
# Default heartbeat instruction used when HEARTBEAT.md doesn't exist.
# Sent verbatim as the user message of every heartbeat LLM call; it defines
# the four-phase protocol (context review, optional exploration, plaza
# participation, wrap-up) plus the privacy rules and posting limits that
# _execute_heartbeat enforces with hard counters.
DEFAULT_HEARTBEAT_INSTRUCTION: str = """[Heartbeat Check]
This is your periodic heartbeat — a moment to be aware, explore, and contribute.
## Phase 1: Review Context & Discover Interest Points
First, review your **recent conversations** (provided below if available) and your **role/responsibilities**.
Identify topics or questions that:
- Are directly relevant to your role and current work
- Were mentioned by users but not fully explored at the time
- Represent emerging trends or changes in your professional domain
- Could improve your ability to serve your users
If no genuine, informative topics emerge from recent context, **skip exploration** and go directly to Phase 3.
Do NOT search for generic or obvious topics just to fill time. Quality over quantity.
## Phase 2: Targeted Exploration (Conditional)
Only if you identified genuine interest points in Phase 1:
1. Use `web_search` to investigate (maximum 5 searches per heartbeat)
2. Keep searches **tightly scoped** to your role and recent work topics
3. For each discovery worth keeping:
- Record it using `write_file` to `memory/curiosity_journal.md`
- Include the **source URL** and a brief note on **why it matters to your work**
- Rate its relevance (high/medium/low) to your current responsibilities
Format for curiosity_journal.md entries:
```
### [Date] - [Topic]
- **Finding**: [What you learned]
- **Source**: [URL]
- **Relevance**: [high/medium/low] — [Why it matters to your work]
- **Follow-up**: [Optional: questions this raises for next time]
```
## Phase 3: Agent Plaza
1. Call `plaza_get_new_posts` to check recent activity
2. If you found something genuinely valuable in Phase 2:
- Share the most impactful discovery to plaza (max 1 post)
- **Always include the source URL** when sharing internet findings
- Frame it in terms of how it's relevant to your team/domain
3. Comment on relevant existing posts (max 2 comments)
## Phase 4: Wrap Up
- If nothing needed attention and no exploration was warranted: reply with HEARTBEAT_OK
- Otherwise, briefly summarize what you explored and why
⚠️ KEY PRINCIPLES:
- Always ground exploration in YOUR role and YOUR recent work context
- Never search for random unrelated topics out of idle curiosity
- If you don't have a specific angle worth investigating, don't search
- Prefer depth over breadth — one thoroughly explored topic > five surface-level queries
- Generate follow-up questions only when you genuinely want to know more
⚠️ PRIVACY RULES — STRICTLY FOLLOW:
- NEVER share information from private user conversations
- NEVER share content from memory/memory.md
- NEVER share content from workspace/ files
- NEVER share task details from tasks.json
- You may ONLY share: general work insights, public information, opinions on plaza posts
- If unsure whether something is private, do NOT share it
⚠️ POSTING LIMITS per heartbeat:
- Maximum 1 new post
- Maximum 2 comments on existing posts
- Do NOT post trivial or repetitive content
"""
def _is_in_active_hours(active_hours: str, tz_name: str = "UTC") -> bool:
"""Check if current time is within the agent's active hours.
Format: "HH:MM-HH:MM" (e.g., "09:00-18:00")
Uses agent's configured timezone (defaults to UTC).
"""
try:
from zoneinfo import ZoneInfo
start_str, end_str = active_hours.split("-")
sh, sm = map(int, start_str.strip().split(":"))
eh, em = map(int, end_str.strip().split(":"))
try:
tz = ZoneInfo(tz_name)
except (KeyError, Exception):
tz = ZoneInfo("UTC")
now = datetime.now(tz)
current_minutes = now.hour * 60 + now.minute
start_minutes = sh * 60 + sm
end_minutes = eh * 60 + em
if start_minutes <= end_minutes:
return start_minutes <= current_minutes < end_minutes
else:
# Overnight range (e.g., "22:00-06:00")
return current_minutes >= start_minutes or current_minutes < end_minutes
except Exception:
return True # Default to active if parsing fails
async def _execute_heartbeat(agent_id: uuid.UUID) -> None:
    """Execute a single heartbeat for an agent.

    Uses three short DB transactions to avoid holding connections
    during long-running LLM calls:
      Phase 1: Read agent, model, context, notifications → commit
      Phase 2: LLM tool loop (no DB connection held)
      Phase 3: Write token usage + last_heartbeat_at → commit

    All failures are caught and logged; a heartbeat must never crash the
    tick loop that spawned it.
    """
    try:
        from app.database import async_session
        from app.models.agent import Agent
        from app.models.llm import LLMModel
        # ── Phase 1: Read all context from DB (short transaction) ──
        # Defaults cached here survive after the session below closes.
        agent_name = ""
        agent_role = ""
        agent_creator_id = None
        model_provider = ""
        model_api_key = ""
        model_model = ""
        model_base_url = None
        model_temperature = None
        model_max_output_tokens = None
        heartbeat_instruction = DEFAULT_HEARTBEAT_INSTRUCTION
        async with async_session() as db:
            result = await db.execute(select(Agent).where(Agent.id == agent_id))
            agent = result.scalar_one_or_none()
            if not agent:
                # Agent deleted between tick and execution — nothing to do.
                return
            # Prefer the primary model; fall back to the configured fallback.
            model_id = agent.primary_model_id or agent.fallback_model_id
            if not model_id:
                return
            model_result = await db.execute(select(LLMModel).where(LLMModel.id == model_id))
            model = model_result.scalar_one_or_none()
            if not model:
                return
            # Cache values we need for Phase 2 (after DB session closes)
            agent_name = agent.name
            agent_role = agent.role_description or ""
            agent_creator_id = agent.creator_id
            model_provider = model.provider
            model_api_key = model.api_key_encrypted
            model_model = model.model
            model_base_url = model.base_url
            model_temperature = model.temperature
            # getattr with default tolerates older model rows/schemas that
            # lack these columns.
            model_max_output_tokens = getattr(model, 'max_output_tokens', None)
            model_request_timeout = getattr(model, 'request_timeout', None)
            # Read HEARTBEAT.md if it exists, otherwise use default
            from pathlib import Path
            from app.config import get_settings
            settings = get_settings()
            ws_root = Path(settings.AGENT_DATA_DIR) / str(agent_id)
            hb_file = ws_root / "HEARTBEAT.md"
            if hb_file.exists():
                try:
                    custom = hb_file.read_text(encoding="utf-8", errors="replace").strip()
                    if custom:
                        # A custom heartbeat replaces the default instruction,
                        # but the privacy rules and posting limits are always
                        # re-appended so they cannot be opted out of.
                        heartbeat_instruction = custom + """
⚠️ PRIVACY RULES — STRICTLY FOLLOW:
- NEVER share information from private user conversations
- NEVER share content from memory/memory.md
- NEVER share content from workspace/ files
- NEVER share task details from tasks.json
- You may ONLY share: general work insights, public information, opinions on plaza posts
⚠️ POSTING LIMITS per heartbeat:
- Maximum 1 new post
- Maximum 2 comments on existing posts
- Do NOT post trivial or repetitive content
"""
                except Exception:
                    # Best-effort: an unreadable HEARTBEAT.md falls back to
                    # the default instruction rather than aborting.
                    pass
            # Build the system prompt (static + dynamic parts) for this agent.
            from app.services.agent_context import build_agent_context
            static_prompt, dynamic_prompt = await build_agent_context(agent_id, agent_name, agent_role)
            # Fetch recent activity to give heartbeat context for curiosity exploration
            from app.models.activity_log import AgentActivityLog
            recent_context = ""
            try:
                recent_result = await db.execute(
                    select(AgentActivityLog)
                    .where(AgentActivityLog.agent_id == agent_id)
                    .where(AgentActivityLog.action_type.in_(["chat_reply", "tool_call", "task_created", "task_updated"]))
                    .order_by(AgentActivityLog.created_at.desc())
                    .limit(50)
                )
                recent_activities = recent_result.scalars().all()
                if recent_activities:
                    itms = []
                    for act in reversed(recent_activities):  # chronological order
                        ts = act.created_at.strftime("%m-%d %H:%M") if act.created_at else ""
                        # Summaries are truncated to keep the prompt compact.
                        itms.append(f"- [{ts}] {act.action_type}: {act.summary[:120]}")
                    # NOTE(review): these "\\n" escapes produce literal
                    # backslash-n characters, not newlines, in the prompt —
                    # looks like accidental double-escaping; confirm intended.
                    recent_context = "\\n\\n---\\n## Recent Activity Context\\nHere are your recent interactions and work to help you identify relevant topics:\\n\\n" + "\\n".join(itms)
            except Exception as e:
                logger.warning(f"Failed to fetch recent activity for heartbeat context: {e}")
            # Fetch unread notifications for this agent (plaza replies, mentions, broadcasts)
            inbox_context = ""
            notif_lines = []
            try:
                from app.models.notification import Notification
                notif_result = await db.execute(
                    select(Notification).where(
                        Notification.agent_id == agent_id,
                        Notification.is_read == False,
                    ).order_by(Notification.created_at).limit(10)
                )
                unread = notif_result.scalars().all()
                if unread:
                    notif_lines = ["\\n\\n---\\n## Inbox (new messages for you — please review and respond if appropriate)"]
                    for n in unread:
                        sender = f"from {n.sender_name}" if n.sender_name else ""
                        notif_lines.append(f"- [{n.type}] {n.title} {sender}: {(n.body or '')[:150]}")
                        # Mark drained notifications read; persisted by the
                        # commit below.
                        n.is_read = True
            except Exception as e:
                logger.warning(f"Failed to drain agent notifications: {e}")
            inbox_context = "\\n".join(notif_lines)
            # Commit Phase 1: release the DB connection before LLM calls
            await db.commit()
        # DB session is now closed — connection returned to pool
        # ── Phase 2: LLM calls (no DB connection held) ──
        full_instruction = heartbeat_instruction + recent_context + inbox_context
        # Call LLM with tools using unified client
        from app.services.llm_utils import create_llm_client, get_max_tokens, LLMMessage, LLMError
        from app.services.agent_tools import execute_tool, get_agent_tools_for_llm
        try:
            client = create_llm_client(
                provider=model_provider,
                api_key=model_api_key,
                model=model_model,
                base_url=model_base_url,
                timeout=float(model_request_timeout or 120.0),
            )
        except Exception as e:
            logger.error(f"Failed to create LLM client: {e}")
            return
        tools_for_llm = await get_agent_tools_for_llm(agent_id)
        reply = ""
        plaza_posts_made = 0  # hard limit: 1 new post per heartbeat
        plaza_comments_made = 0  # hard limit: 2 comments per heartbeat
        _hb_accumulated_tokens = 0
        # Token tracking helpers
        from app.services.token_tracker import record_token_usage, extract_usage_tokens, estimate_tokens_from_chars
        # Convert messages to LLMMessage format
        llm_messages = [
            LLMMessage(role="system", content=static_prompt, dynamic_content=dynamic_prompt),
            LLMMessage(role="user", content=full_instruction)
        ]
        # Tool loop: up to 20 rounds for search + write + plaza actions.
        for round_i in range(20):
            try:
                response = await client.complete(
                    messages=llm_messages,
                    tools=tools_for_llm,
                    temperature=model_temperature,
                    max_tokens=get_max_tokens(model_provider, model_model, model_max_output_tokens),
                )
            except LLMError as e:
                logger.error(f"LLM error in heartbeat: {e}")
                reply = ""
                break
            except Exception as e:
                logger.error(f"LLM call error in heartbeat: {e}")
                reply = ""
                break
            # Track tokens for this round: prefer the provider's reported
            # usage, otherwise estimate from character counts.
            real_tokens = extract_usage_tokens(response.usage)
            if real_tokens:
                _hb_accumulated_tokens += real_tokens
            else:
                round_chars = sum(len(m.content or '') for m in llm_messages) + len(response.content or '')
                _hb_accumulated_tokens += estimate_tokens_from_chars(round_chars)
            if response.tool_calls:
                # Add assistant message with tool calls
                llm_messages.append(LLMMessage(
                    role="assistant",
                    content=response.content or None,
                    tool_calls=[{
                        "id": tc["id"],
                        "type": "function",
                        "function": tc["function"],
                    } for tc in response.tool_calls],
                    reasoning_content=response.reasoning_content,
                ))
                # Tools that require arguments — if LLM sends empty args, skip and ask to retry
                # (aligned with call_llm in websocket.py)
                _TOOLS_REQUIRING_ARGS = {
                    "write_file", "read_file", "delete_file", "read_document",
                    "send_message_to_agent", "send_feishu_message", "send_email",
                    "web_search", "jina_search", "jina_read",
                }
                for tc in response.tool_calls:
                    fn = tc["function"]
                    tool_name = fn["name"]
                    raw_args = fn.get("arguments", "{}")
                    logger.info(f"[Heartbeat] Raw arguments for {tool_name} (len={len(raw_args) if raw_args else 0}): {repr(raw_args[:300]) if raw_args else 'None'}")
                    try:
                        args = json.loads(raw_args) if raw_args else {}
                    except json.JSONDecodeError as je:
                        logger.warning(f"[Heartbeat] JSON parse failed for {tool_name}: {je}. Raw: {repr(raw_args[:200])}")
                        args = {}
                    # Guard: if a tool that requires arguments received empty args,
                    # return an error to LLM instead of executing
                    if not args and tool_name in _TOOLS_REQUIRING_ARGS:
                        logger.warning(f"[Heartbeat] Empty arguments for {tool_name}, asking LLM to retry")
                        llm_messages.append(LLMMessage(
                            role="tool",
                            tool_call_id=tc["id"],
                            content=f"Error: {tool_name} was called with empty arguments. You must provide the required parameters. Please retry with the correct arguments.",
                        ))
                        continue
                    # ── Hard rate limits for plaza actions ──
                    # Enforced in code (not just prompt text) so a
                    # misbehaving model cannot spam the plaza.
                    if tool_name == "plaza_create_post":
                        if plaza_posts_made >= 1:
                            tool_result = "[BLOCKED] You have already made 1 plaza post this heartbeat. Do not post again."
                        else:
                            tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)
                            plaza_posts_made += 1
                    elif tool_name == "plaza_add_comment":
                        if plaza_comments_made >= 2:
                            tool_result = "[BLOCKED] You have already made 2 comments this heartbeat. Do not comment again."
                        else:
                            tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)
                            plaza_comments_made += 1
                    else:
                        tool_result = await execute_tool(tool_name, args, agent_id, agent_creator_id)
                    llm_messages.append(LLMMessage(
                        role="tool",
                        tool_call_id=tc["id"],
                        content=str(tool_result),
                    ))
            else:
                # No tool calls → final answer; stop the loop.
                reply = response.content or ""
                break
        # NOTE(review): not in a try/finally — if execute_tool raises, the
        # client is never closed; consider wrapping the loop. Confirm.
        await client.close()
        # ── Phase 3: Write results back to DB (short transaction) ──
        async with async_session() as db:
            # Record accumulated heartbeat token usage
            if _hb_accumulated_tokens > 0:
                await record_token_usage(agent_id, _hb_accumulated_tokens)
            # Update last_heartbeat_at
            # Using an update statement is safer to avoid state drift if the object was updated elsewhere
            await db.execute(
                update(Agent)
                .where(Agent.id == agent_id)
                .values(last_heartbeat_at=datetime.now(timezone.utc))
            )
            await db.commit()
        # Log activity if not empty. "HEARTBEAT OK" (with a space) is also
        # accepted thanks to the replace().
        is_ok = "HEARTBEAT_OK" in reply.upper().replace(" ", "_") if reply else False
        if not is_ok and reply:
            from app.services.activity_logger import log_activity
            await log_activity(
                agent_id, "heartbeat",
                f"Heartbeat: {reply[:80]}",
                detail={"reply": reply[:500]},
            )
        logger.info(f"💓 Heartbeat for {agent_name}: {'OK' if is_ok else reply[:60]}")
    except Exception as e:
        # Top-level boundary: heartbeats are fire-and-forget tasks, so log
        # with traceback instead of propagating.
        logger.exception(f"Heartbeat error for agent {agent_id}: {e}")
# Strong references to in-flight heartbeat tasks. asyncio keeps only weak
# references to tasks, so without this set a heartbeat task could be
# garbage-collected before it finishes.
_heartbeat_tasks: set = set()


async def _heartbeat_tick():
    """One heartbeat tick: find agents due for heartbeat and fire them.

    An agent is due when ALL of the following hold:
      - ``heartbeat_enabled`` and status in {"running", "idle"}
      - not expired (newly-expired agents are flagged + disabled here)
      - current time is inside its active hours (agent/tenant timezone)
      - at least ``heartbeat_interval_minutes`` elapsed since the last beat

    Heartbeats run as detached asyncio tasks so one slow agent cannot
    block the tick loop. Errors are logged and audited, never raised.
    """
    from app.database import async_session
    from app.models.agent import Agent
    from app.services.audit_logger import write_audit_log
    from app.services.timezone_utils import get_agent_timezone_sync
    from app.models.tenant import Tenant
    now = datetime.now(timezone.utc)
    try:
        async with async_session() as db:
            result = await db.execute(
                select(Agent).where(
                    Agent.heartbeat_enabled == True,
                    Agent.status.in_(["running", "idle"]),
                )
            )
            agents = result.scalars().all()
            # Pre-load tenants in one query for timezone resolution.
            tenant_ids = {a.tenant_id for a in agents if a.tenant_id}
            tenants_by_id = {}
            if tenant_ids:
                t_result = await db.execute(select(Tenant).where(Tenant.id.in_(tenant_ids)))
                tenants_by_id = {t.id: t for t in t_result.scalars().all()}
            triggered = 0
            expired_any = False
            for agent in agents:
                # Skip agents already flagged as expired.
                if agent.is_expired:
                    continue
                if agent.expires_at and now >= agent.expires_at:
                    # Flag newly-expired agents; persisted by the commit below.
                    agent.is_expired = True
                    agent.heartbeat_enabled = False
                    agent.status = "stopped"
                    expired_any = True
                    continue
                # Resolve timezone (tenant default, agent override).
                tenant = tenants_by_id.get(agent.tenant_id)
                tz_name = get_agent_timezone_sync(agent, tenant)
                # Check active hours (in agent's timezone).
                if not _is_in_active_hours(agent.heartbeat_active_hours or "09:00-18:00", tz_name):
                    continue
                # Check interval (default: every 4 hours).
                interval = timedelta(minutes=agent.heartbeat_interval_minutes or 240)
                if agent.last_heartbeat_at and (now - agent.last_heartbeat_at) < interval:
                    continue
                # Fire heartbeat as a detached task; keep a strong reference
                # until it completes (tasks are only weakly held by asyncio).
                logger.info(f"💓 Triggering heartbeat for {agent.name}")
                await write_audit_log("heartbeat_fire", {"agent_name": agent.name}, agent_id=agent.id)
                task = asyncio.create_task(_execute_heartbeat(agent.id))
                _heartbeat_tasks.add(task)
                task.add_done_callback(_heartbeat_tasks.discard)
                triggered += 1
            if expired_any:
                # BUGFIX: previously the session closed without committing,
                # silently discarding the expiry flags set above, so expired
                # agents were re-processed on every tick.
                await db.commit()
        if triggered:
            await write_audit_log("heartbeat_tick", {"eligible_agents": len(agents), "triggered": triggered})
    except Exception as e:
        logger.exception(f"Heartbeat tick error: {e}")
        await write_audit_log("heartbeat_error", {"error": str(e)[:300]})
async def start_heartbeat():
    """Run the heartbeat scheduler forever: one tick, then a 60-second pause.

    Intended to be launched once from the FastAPI startup hook; never
    returns. Tick errors are handled inside ``_heartbeat_tick`` itself.
    """
    tick_seconds = 60
    logger.info("💓 Agent heartbeat service started (60s tick)")
    while True:
        await _heartbeat_tick()
        await asyncio.sleep(tick_seconds)