Clawith/backend/app/services/trigger_daemon.py

844 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Trigger Daemon — evaluates all agent triggers in a single background loop.
Replaces the separate heartbeat, scheduler, and supervision reminder services
with a unified trigger evaluation engine. Runs as an asyncio background task.
Every 15 seconds:
1. Load all enabled triggers from DB
2. Evaluate each trigger (cron/once/interval/poll/on_message/webhook)
3. Group fired triggers by agent_id (30s dedup window)
4. Invoke each agent once with all its fired triggers as context
"""
import asyncio
import ipaddress
import json as _json
import uuid
from datetime import datetime, timezone, timedelta
from urllib.parse import urlparse
from croniter import croniter
from loguru import logger
from sqlalchemy import select
from app.database import async_session
from app.models.trigger import AgentTrigger
from app.models.agent import Agent
TICK_INTERVAL = 15 # seconds
DEDUP_WINDOW = 30 # seconds — same agent won't be invoked twice within this window
MAX_AGENT_CHAIN_DEPTH = 5 # A→B→A→B→A max depth before stopping
MIN_POLL_INTERVAL_MINUTES = 5 # minimum poll interval to prevent abuse
_last_invoke: dict[uuid.UUID, datetime] = {}
_A2A_WAKE_CHAIN: dict[str, int] = {}
_A2A_WAKE_CHAIN_TTL = 300
_A2A_MAX_WAKE_DEPTH = 3
def _cleanup_stale_invoke_cache():
now = datetime.now(timezone.utc)
stale = [k for k, v in _last_invoke.items() if (now - v).total_seconds() > DEDUP_WINDOW * 2]
for k in stale:
del _last_invoke[k]
# Webhook rate limiter: token -> list of timestamps
_webhook_hits: dict[str, list[float]] = {}
WEBHOOK_RATE_LIMIT = 5 # max hits per minute per token
# ── SSRF Protection ─────────────────────────────────────────────────
def _is_private_url(url: str) -> bool:
"""Block private/internal URLs to prevent SSRF attacks."""
try:
parsed = urlparse(url)
hostname = parsed.hostname
if not hostname:
return True
# Block obvious private hostnames
if hostname in ("localhost", "127.0.0.1", "::1", "0.0.0.0"):
return True
# Try to resolve hostname and check IP
import socket
try:
infos = socket.getaddrinfo(hostname, None)
for info in infos:
ip = ipaddress.ip_address(info[4][0])
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
return True
except (socket.gaierror, ValueError):
return True # Cannot resolve = block
return False
except Exception:
return True # Block on any parsing error
# ── Trigger Evaluation ──────────────────────────────────────────────
async def _evaluate_trigger(trigger: AgentTrigger, now: datetime) -> bool:
    """Return True if this trigger should fire right now.

    Common gates (enabled / expiry / fire budget / cooldown) are applied
    first; the remainder dispatches on ``trigger.type``:

    - cron:       fire once ``now`` passes the next cron occurrence after
                  the last fire, evaluated in the agent's timezone
    - once:       fire exactly once at the configured ISO timestamp
    - interval:   fire every ``config.minutes`` minutes
    - poll:       fire when an HTTP poll detects a change/match
    - on_message: fire when a matching chat message has arrived
    - webhook:    fire when a webhook payload is pending
    """
    if not trigger.is_enabled:
        return False
    if trigger.expires_at and now >= trigger.expires_at:
        # Expired — never fires; _tick() performs the actual auto-disable.
        return False
    if trigger.max_fires is not None and trigger.fire_count >= trigger.max_fires:
        return False
    # Cooldown check
    if trigger.last_fired_at:
        cooldown = timedelta(seconds=trigger.cooldown_seconds)
        if (now - trigger.last_fired_at) < cooldown:
            return False
    cfg = trigger.config or {}
    t = trigger.type
    if t == "cron":
        expr = cfg.get("expr", "* * * * *")
        base = trigger.last_fired_at or trigger.created_at
        try:
            # Resolve timezone: trigger config → agent → tenant → UTC
            tz_name = cfg.get("timezone")
            if not tz_name:
                from app.services.timezone_utils import get_agent_timezone
                tz_name = await get_agent_timezone(trigger.agent_id)
            from zoneinfo import ZoneInfo
            try:
                tz = ZoneInfo(tz_name)
            except Exception:
                # Was `except (KeyError, Exception)` — Exception already
                # subsumes KeyError, so the tuple was redundant.
                tz = ZoneInfo("UTC")
            # Evaluate cron in agent's timezone
            local_now = now.astimezone(tz)
            # NOTE(review): a naive `base` is reinterpreted as local
            # wall-clock time — confirm DB timestamps are not naive UTC.
            local_base = base.astimezone(tz) if base.tzinfo else base.replace(tzinfo=tz)
            cron = croniter(expr, local_base)
            next_run = cron.get_next(datetime)
            return local_now >= next_run
        except Exception as e:
            logger.warning(f"Invalid cron expr '{expr}' for trigger {trigger.name}: {e}")
            return False
    elif t == "once":
        at_str = cfg.get("at")
        if not at_str:
            return False
        try:
            at = datetime.fromisoformat(at_str)
            if at.tzinfo is None:
                at = at.replace(tzinfo=timezone.utc)  # naive timestamps assumed UTC
            return now >= at and trigger.fire_count == 0
        except Exception:
            return False
    elif t == "interval":
        minutes = cfg.get("minutes", 30)
        base = trigger.last_fired_at or trigger.created_at
        return (now - base) >= timedelta(minutes=minutes)
    elif t == "poll":
        # Clamp configured interval to the abuse floor before checking.
        interval_min = max(cfg.get("interval_min", 5), MIN_POLL_INTERVAL_MINUTES)
        base = trigger.last_fired_at or trigger.created_at
        if (now - base) < timedelta(minutes=interval_min):
            return False
        # Actual HTTP poll + change detection
        return await _poll_check(trigger)
    elif t == "on_message":
        return await _check_new_agent_messages(trigger)
    elif t == "webhook":
        # Fire only when the webhook endpoint has marked a payload pending.
        if cfg.get("_webhook_pending"):
            return True
        return False
    return False
async def _poll_check(trigger: AgentTrigger) -> bool:
    """HTTP poll: fetch URL, extract value via json_path, detect change.

    Persists ``_last_value`` into the trigger's config JSONB so it survives
    across process restarts. The DB write is skipped when the observed value
    has not changed, avoiding a pointless UPDATE on every poll cycle.

    Returns True when the trigger should fire (value changed, or matched
    ``match_value`` when ``fire_on == "match"``); False on any error.
    """
    import httpx
    cfg = trigger.config or {}
    url = cfg.get("url")
    if not url:
        return False
    # SSRF protection: block private/internal URLs
    if _is_private_url(url):
        logger.warning(f"Poll blocked for trigger {trigger.name}: private/internal URL '{url}'")
        return False
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.request(cfg.get("method", "GET"), url, headers=cfg.get("headers", {}))
            resp.raise_for_status()
            data = resp.json()
            json_path = cfg.get("json_path", "$")
            current_value = _extract_json_path(data, json_path)
            current_str = str(current_value)
            last_value = cfg.get("_last_value")
            fire_on = cfg.get("fire_on", "change")
            if fire_on == "match":
                should_fire = current_str == str(cfg.get("match_value", ""))
            else:  # "change"
                # First poll (no baseline yet) — don't fire, just record it.
                should_fire = last_value is not None and current_str != last_value
            # Persist _last_value to DB so it survives restarts; skip the
            # write entirely when the value is unchanged.
            if current_str != last_value:
                cfg["_last_value"] = current_str
                try:
                    from sqlalchemy import update
                    async with async_session() as db:
                        await db.execute(
                            update(AgentTrigger)
                            .where(AgentTrigger.id == trigger.id)
                            .values(config=cfg)
                        )
                        await db.commit()
                except Exception as e:
                    logger.warning(f"Failed to persist poll _last_value for {trigger.name}: {e}")
            return should_fire
    except Exception as e:
        logger.warning(f"Poll failed for trigger {trigger.name}: {e}")
        return False
def _extract_json_path(data, path: str):
"""Simple JSONPath extraction: $.key.subkey → data['key']['subkey']."""
if path == "$" or not path:
return data
parts = path.lstrip("$.").split(".")
current = data
for part in parts:
if isinstance(current, dict):
current = current.get(part)
elif isinstance(current, list) and part.isdigit():
current = current[int(part)]
else:
return None
return current
async def _check_new_agent_messages(trigger: AgentTrigger) -> bool:
    """Check if there are new messages matching this trigger.
    Supports two modes:
    - from_agent_name: check for agent-to-agent messages
    - from_user_name: check for human user messages (Feishu/Slack/Discord)
    Stores the actual message content in trigger.config['_matched_message']
    so the invocation context can include it.
    """
    from app.models.audit import ChatMessage
    from app.models.chat_session import ChatSession
    cfg = trigger.config or {}
    from_agent_name = cfg.get("from_agent_name")
    from_user_name = cfg.get("from_user_name")
    if not from_agent_name and not from_user_name:
        return False
    # Only messages strictly newer than `since` count as "new".
    since = trigger.last_fired_at or trigger.created_at
    # Use _since_ts snapshot from trigger creation (set by _handle_set_trigger)
    # This is more precise than the old 5-minute lookback which caused false positives
    if trigger.fire_count == 0 and not trigger.last_fired_at:
        since_ts_str = cfg.get("_since_ts")
        if since_ts_str:
            try:
                since = datetime.fromisoformat(since_ts_str)
            except Exception:
                since = trigger.created_at
    # No _since_ts and no last_fired_at → use trigger.created_at (no lookback)
    try:
        async with async_session() as db:
            if from_agent_name:
                # --- Agent-to-agent message check (existing logic) ---
                from app.models.participant import Participant
                from app.models.agent import Agent as AgentModel
                # Neutralize LIKE wildcards in the user-supplied name so it
                # cannot broaden the ilike match.
                safe_agent_name = from_agent_name.replace("%", "").replace("_", r"\_")
                agent_r = await db.execute(
                    select(AgentModel).where(AgentModel.name.ilike(f"%{safe_agent_name}%"))
                )
                source_agent = agent_r.scalars().first()
                if not source_agent:
                    return False
                # Resolve the sending agent's participant row; messages are
                # attributed by participant_id, not agent_id.
                result = await db.execute(
                    select(Participant.id).where(
                        Participant.type == "agent",
                        Participant.ref_id == source_agent.id,
                    )
                )
                from_participant = result.scalar_one_or_none()
                if not from_participant:
                    return False
                from sqlalchemy import cast as sa_cast, String as SaString
                # Latest assistant message from that participant since `since`.
                # ChatMessage.conversation_id is a string, so the join casts
                # ChatSession.id to match.
                result = await db.execute(
                    select(ChatMessage).join(
                        ChatSession, ChatMessage.conversation_id == sa_cast(ChatSession.id, SaString)
                    ).where(
                        ChatMessage.participant_id == from_participant,
                        ChatMessage.created_at > since,
                        ChatMessage.role == "assistant",
                    ).order_by(ChatMessage.created_at.desc()).limit(1)
                )
                msg = result.scalar_one_or_none()
                if not msg:
                    return False
                # Stash matched message in the in-memory config so the
                # invocation context (same tick) can include it.
                cfg["_matched_message"] = (msg.content or "")[:2000]
                cfg["_matched_from"] = from_agent_name
                return True
            elif from_user_name:
                # --- Human user message check (Feishu/Slack/Discord) ---
                # Find sessions for this agent from external channels
                from sqlalchemy import cast as sa_cast, String as SaString
                from app.models.user import User
                from app.models.agent import Agent as AgentModel
                # 0. Get agent for tenant scoping
                agent_r = await db.execute(select(AgentModel).where(AgentModel.id == trigger.agent_id))
                agent = agent_r.scalar_one_or_none()
                # Look up user by display name or username within tenant
                from sqlalchemy import or_
                from app.models.user import User, Identity
                safe_user_name = from_user_name.replace("%", "").replace("_", r"\_")
                query = (
                    select(User)
                    .join(User.identity)
                    .where(
                        or_(
                            User.display_name.ilike(f"%{safe_user_name}%"),
                            Identity.username.ilike(f"%{safe_user_name}%"),
                        )
                    )
                )
                if agent and agent.tenant_id:
                    query = query.where(User.tenant_id == agent.tenant_id)
                user_r = await db.execute(query)
                target_user = user_r.scalars().first()
                if target_user:
                    # Find channel sessions for this user with this agent
                    result = await db.execute(
                        select(ChatMessage).join(
                            ChatSession, ChatMessage.conversation_id == sa_cast(ChatSession.id, SaString)
                        ).where(
                            ChatSession.agent_id == trigger.agent_id,
                            ChatSession.user_id == target_user.id,
                            ChatSession.source_channel.in_(["feishu", "slack", "discord"]),
                            ChatMessage.role == "user",
                            ChatMessage.created_at > since,
                        ).order_by(ChatMessage.created_at.desc()).limit(1)
                    )
                else:
                    # Fallback: search by message content or session title containing the name
                    # NOTE(review): this branch actually matches ANY user's channel
                    # message for this agent (no per-user filter) — confirm this
                    # loose fallback is intended.
                    result = await db.execute(
                        select(ChatMessage).join(
                            ChatSession, ChatMessage.conversation_id == sa_cast(ChatSession.id, SaString)
                        ).where(
                            ChatSession.agent_id == trigger.agent_id,
                            ChatSession.source_channel.in_(["feishu", "slack", "discord"]),
                            ChatMessage.role == "user",
                            ChatMessage.created_at > since,
                        ).order_by(ChatMessage.created_at.desc()).limit(1)
                    )
                msg = result.scalar_one_or_none()
                if not msg:
                    return False
                cfg["_matched_message"] = (msg.content or "")[:2000]
                cfg["_matched_from"] = from_user_name
                return True
    except Exception as e:
        logger.warning(f"on_message check failed for trigger {trigger.name}: {e}")
        return False
# ── Agent Invocation ────────────────────────────────────────────────
async def _invoke_agent_for_triggers(agent_id: uuid.UUID, triggers: list[AgentTrigger]):
    """Invoke an agent with context from one or more fired triggers.
    Creates a Reflection Session and calls the LLM.

    Flow: load agent + model → build a trigger-context prompt → create a
    "trigger"-channel ChatSession and persist the prompt → call the LLM
    (outside the DB session) → persist the reply → push a notification to
    the user's web session(s) and WebSocket connections → audit-log.
    All failures are logged; nothing is raised to the caller.
    """
    from app.api.websocket import call_llm
    # NOTE(review): build_agent_context is imported but never used below.
    from app.services.agent_context import build_agent_context
    from app.models.llm import LLMModel
    from app.models.audit import ChatMessage
    from app.models.chat_session import ChatSession
    from app.models.participant import Participant
    from app.services.audit_logger import write_audit_log
    try:
        async with async_session() as db:
            # Load agent
            result = await db.execute(select(Agent).where(Agent.id == agent_id))
            agent = result.scalar_one_or_none()
            if not agent or agent.is_expired:
                return
            # Load LLM model
            if not agent.primary_model_id:
                logger.warning(f"Agent {agent.name} has no LLM model, skipping trigger invocation")
                return
            result = await db.execute(select(LLMModel).where(LLMModel.id == agent.primary_model_id))
            model = result.scalar_one_or_none()
            if not model:
                return
            # Skip invocation if model is disabled by admin
            if not model.enabled:
                logger.warning(f"Agent {agent.name}'s model {model.model} is disabled, skipping trigger invocation")
                return
            # Build trigger context: one section per fired trigger.
            context_parts = []
            trigger_names = []
            for t in triggers:
                part = f"触发器:{t.name} ({t.type})\n原因:{t.reason}"
                if t.focus_ref:
                    part += f"\n关联 Focus{t.focus_ref}"
                # Include matched message for on_message triggers
                cfg = t.config or {}
                if t.type == "on_message" and cfg.get("_matched_message"):
                    part += f"\n收到来自 {cfg.get('_matched_from', '?')} 的消息:\n\"{cfg['_matched_message'][:500]}\""
                # Include webhook payload
                if t.type == "webhook" and cfg.get("_webhook_payload"):
                    payload_str = cfg["_webhook_payload"]
                    if len(payload_str) > 2000:
                        payload_str = payload_str[:2000] + "... (truncated)"
                    part += f"\nWebhook Payload:\n{payload_str}"
                context_parts.append(part)
                trigger_names.append(t.name)
            trigger_context = (
                "===== 本次唤醒上下文 =====\n"
                f"唤醒来源trigger{'多个触发器同时触发' if len(triggers) > 1 else '触发器触发'}\n\n"
                + "\n---\n".join(context_parts)
                + "\n==========================="
            )
            # Create Reflection Session
            title = f"🤖 内心独白:{', '.join(trigger_names)}"
            # Find agent's participant
            result = await db.execute(
                select(Participant).where(Participant.type == "agent", Participant.ref_id == agent_id)
            )
            agent_participant = result.scalar_one_or_none()
            session = ChatSession(
                agent_id=agent_id,
                user_id=agent.creator_id,
                participant_id=agent_participant.id if agent_participant else None,
                source_channel="trigger",
                title=title[:200],
            )
            db.add(session)
            await db.flush()
            session_id = session.id
            # Messages: trigger context only (call_llm builds system prompt internally)
            messages = [
                {"role": "user", "content": trigger_context},
            ]
            # Store trigger context as a message in the session
            db.add(ChatMessage(
                agent_id=agent_id,
                conversation_id=str(session_id),
                role="user",
                content=trigger_context,
                user_id=agent.creator_id,
                participant_id=agent_participant.id if agent_participant else None,
            ))
            await db.commit()
            # Cache participant ID for callbacks (used after the session closes)
            agent_participant_id = agent_participant.id if agent_participant else None
        # Call LLM (outside the DB session to avoid long transactions)
        collected_content = []
        async def on_chunk(text):
            # Accumulate streamed chunks as a fallback if call_llm returns None.
            collected_content.append(text)
        # Persist tool calls into Reflection Session for Reflections visibility
        async def on_tool_call(data):
            try:
                async with async_session() as _tc_db:
                    if data["status"] == "running":
                        # Record the tool invocation (name + args).
                        _tc_db.add(ChatMessage(
                            agent_id=agent_id,
                            conversation_id=str(session_id),
                            role="tool_call",
                            content=_json.dumps({"name": data["name"], "args": data["args"]}, ensure_ascii=False, default=str),
                            user_id=agent.creator_id,
                            participant_id=agent_participant_id,
                        ))
                    elif data["status"] == "done":
                        # Record the tool result, truncated to 2000 chars.
                        result_str = str(data.get("result", ""))[:2000]
                        _tc_db.add(ChatMessage(
                            agent_id=agent_id,
                            conversation_id=str(session_id),
                            role="tool_call",
                            content=_json.dumps({"name": data["name"], "result": result_str}, ensure_ascii=False, default=str),
                            user_id=agent.creator_id,
                            participant_id=agent_participant_id,
                        ))
                    await _tc_db.commit()
            except Exception as e:
                logger.warning(f"Failed to persist tool call for trigger session: {e}")
        # A2A wakes get a reduced tool budget (see max_tool_rounds_override).
        _is_a2a_wake = all(t.name == "a2a_wake" for t in triggers)
        reply = await call_llm(
            model=model,
            messages=messages,
            agent_name=agent.name,
            role_description=agent.role_description or "",
            agent_id=agent_id,
            user_id=agent.creator_id,
            on_chunk=on_chunk,
            on_tool_call=on_tool_call,
            max_tool_rounds_override=2 if _is_a2a_wake else None,
        )
        # Save assistant reply to Reflection session
        async with async_session() as db:
            result = await db.execute(
                select(Participant).where(Participant.type == "agent", Participant.ref_id == agent_id)
            )
            agent_participant = result.scalar_one_or_none()
            db.add(ChatMessage(
                agent_id=agent_id,
                conversation_id=str(session_id),
                role="assistant",
                content=reply or "".join(collected_content),
                user_id=agent.creator_id,
                participant_id=agent_participant.id if agent_participant else None,
            ))
            # NOTE: trigger state (last_fired_at, fire_count, auto-disable)
            # is already updated in _tick() BEFORE this task was launched,
            # to prevent race-condition duplicate fires.
            await db.commit()
        # Push trigger result to user's active WebSocket connections
        final_reply = reply or "".join(collected_content)
        # Purely internal agent-to-agent wakes are never surfaced to the user.
        is_a2a_internal = all(t.name == "a2a_wake" for t in triggers)
        if final_reply and not is_a2a_internal:
            try:
                from app.api.websocket import manager as ws_manager
                agent_id_str = str(agent_id)
                # Build notification message with trigger badge
                trigger_reasons = []
                for t in triggers:
                    # Prefer an explicit summary; fall back to a (truncated) reason.
                    ns = (t.config or {}).get("_notification_summary", "").strip()
                    if ns:
                        trigger_reasons.append(ns)
                    else:
                        r = (t.reason or "").strip()
                        if r and len(r) <= 80:
                            trigger_reasons.append(r)
                        elif r:
                            trigger_reasons.append(r[:77] + "...")
                summary = trigger_reasons[0] if trigger_reasons else "有新的事件需要处理"
                _is_a2a_wait = any(t.name.startswith("a2a_wait_") for t in triggers)
                if _is_a2a_wait:
                    # Strip internal bookkeeping chatter (trigger/focus jargon)
                    # from the reply before showing it to the user.
                    import re as _re
                    cleaned = final_reply
                    _internal_patterns = [
                        r'\b(a2a_wait_\w+|a2a_wake)\b',
                        r'\bwait_?\w+_?(task|reply|followup|meeting|sync|api_key)\w*\b',
                        r'\bresolve_\w+\b',
                        r'\bfocus[_ ]?item\b',
                        r'\btask_delegate\b',
                        r'\bfocus_ref\b',
                        r'\s*(a2a\w+|wait\w+|触发器\w*|focus\w*).*(?:已取消|已为|保持|活跃|完成状态)[^\n]*',
                        r'[\-•]\s*(?:触发器|trigger|focus|wait_\w+|a2a\w+).*[^\n]*',
                        r'(?:触发器|trigger)\s+\S+\s*(?:已取消|保持活跃|已为完成状态|fired)',
                        r'已静默清理触发器',
                        r'已静默处理完毕',
                        r'继续待命[。,]?\s*',
                        # NOTE(review): the leading '?' makes this pattern an
                        # invalid regex ("nothing to repeat"); re.sub raises
                        # re.error, which is swallowed by the except below and
                        # aborts the whole notification push. Possibly a mangled
                        # full-width character — confirm against the original file.
                        r'?\s*(?:继续)?待命。',
                    ]
                    for _pat in _internal_patterns:
                        cleaned = _re.sub(_pat, '', cleaned, flags=_re.IGNORECASE)
                    cleaned = _re.sub(r'\n{3,}', '\n\n', cleaned).strip()
                    cleaned = _re.sub(r'[。,]\s*$', '', cleaned).strip()
                    if not cleaned:
                        # Everything was stripped — fall back to the raw reply.
                        cleaned = final_reply
                else:
                    cleaned = final_reply
                notification = f"{summary}\n\n{cleaned}"
                # Save to user's active chat session(s) for persistence
                async with async_session() as db:
                    from app.models.chat_session import ChatSession
                    from sqlalchemy import func
                    # Prefer the session the user currently has open (via WS)
                    active_session_ids = ws_manager.get_active_session_ids(agent_id_str)
                    target_session_ids = []
                    if active_session_ids:
                        target_session_ids = active_session_ids
                        logger.info(f"[Trigger] Saving notification to {len(active_session_ids)} active session(s)")
                    else:
                        # Fallback: most recent web session for this agent
                        _sr = await db.execute(
                            select(ChatSession.id)
                            .where(
                                ChatSession.agent_id == agent_id,
                                ChatSession.user_id == agent.creator_id,
                                ChatSession.source_channel.notin_(["trigger"]),
                            )
                            .order_by(
                                func.coalesce(ChatSession.last_message_at, ChatSession.created_at).desc()
                            )
                            .limit(1)
                        )
                        row = _sr.scalar_one_or_none()
                        if row:
                            target_session_ids = [str(row)]
                            logger.info(f"[Trigger] No active WS, saving to most recent session {row}")
                        else:
                            logger.warning(f"[Trigger] No web session found for agent {agent.name}")
                    for sid in target_session_ids:
                        db.add(ChatMessage(
                            agent_id=agent_id,
                            conversation_id=sid,
                            role="assistant",
                            content=notification,
                            user_id=agent.creator_id,
                        ))
                    if target_session_ids:
                        await db.commit()
                # Push to all active WebSocket connections for this agent
                if agent_id_str in ws_manager.active_connections:
                    for ws, _sid in list(ws_manager.active_connections[agent_id_str]):
                        try:
                            await ws.send_json({
                                "type": "trigger_notification",
                                "content": notification,
                                "triggers": [t.name for t in triggers],
                            })
                        except Exception:
                            pass  # Connection may have closed
            except Exception as e:
                logger.error(f"Failed to push trigger result to WebSocket: {e}")
                import traceback
                traceback.print_exc()
        # Audit log
        await write_audit_log("trigger_fired", {
            "agent_name": agent.name,
            "triggers": [{"name": t.name, "type": t.type} for t in triggers],
        }, agent_id=agent_id)
        logger.info(f"⚡ Triggers fired for {agent.name}: {[t.name for t in triggers]}")
    except Exception as e:
        logger.error(f"Failed to invoke agent {agent_id} for triggers: {e}")
        import traceback
        traceback.print_exc()
# ── Main Tick Loop ──────────────────────────────────────────────────
async def _tick():
    """One daemon tick: evaluate all triggers, group by agent, invoke.

    Expired triggers are disabled in a single batched UPDATE (previously
    one DB session per expired trigger). Trigger state (last_fired_at,
    fire_count, auto-disable) is committed BEFORE the invocation task is
    launched so a long-running LLM call cannot cause duplicate fires on
    the next tick.
    """
    now = datetime.now(timezone.utc)
    async with async_session() as db:
        result = await db.execute(
            select(AgentTrigger).where(AgentTrigger.is_enabled == True)  # noqa: E712 — SQLAlchemy expression
        )
        all_triggers = result.scalars().all()
    if not all_triggers:
        return
    # Evaluate and group fired triggers by agent
    fired_by_agent: dict[uuid.UUID, list[AgentTrigger]] = {}
    expired_ids: list[uuid.UUID] = []
    for trigger in all_triggers:
        # Collect expired triggers for a batched auto-disable below
        if trigger.expires_at and now >= trigger.expires_at:
            expired_ids.append(trigger.id)
            continue
        try:
            if await _evaluate_trigger(trigger, now):
                fired_by_agent.setdefault(trigger.agent_id, []).append(trigger)
        except Exception as e:
            logger.warning(f"Error evaluating trigger {trigger.name}: {e}")
    # Auto-disable expired triggers in one UPDATE instead of one session each
    if expired_ids:
        try:
            from sqlalchemy import update
            async with async_session() as db:
                await db.execute(
                    update(AgentTrigger)
                    .where(AgentTrigger.id.in_(expired_ids))
                    .values(is_enabled=False)
                )
                await db.commit()
        except Exception as e:
            logger.warning(f"Failed to disable expired triggers: {e}")
    # Invoke each agent (with dedup window)
    for agent_id, agent_triggers in fired_by_agent.items():
        last = _last_invoke.get(agent_id)
        if last and (now - last).total_seconds() < DEDUP_WINDOW:
            continue  # Skip — invoked too recently
        _last_invoke[agent_id] = now
        # ── Immediately update trigger state BEFORE launching async task ──
        # This prevents the next tick from re-evaluating the same trigger as
        # "should fire" while the LLM call is still running (which can take
        # minutes). Without this, the 15s tick interval + 30s dedup window
        # would cause repeated invocations for long-running triggers.
        try:
            async with async_session() as db:
                for t in agent_triggers:
                    result = await db.execute(
                        select(AgentTrigger).where(AgentTrigger.id == t.id)
                    )
                    trigger = result.scalar_one_or_none()
                    if trigger:
                        trigger.last_fired_at = now
                        trigger.fire_count += 1
                        # Auto-disable single-shot types only
                        if trigger.type == "once":
                            trigger.is_enabled = False
                        if trigger.type == "webhook" and trigger.config:
                            # Clear the consumed webhook payload
                            trigger.config = {
                                **trigger.config,
                                "_webhook_pending": False,
                                "_webhook_payload": None,
                            }
                        # `is not None` (not truthiness) so this stays consistent
                        # with the budget check in _evaluate_trigger.
                        if trigger.max_fires is not None and trigger.fire_count >= trigger.max_fires:
                            trigger.is_enabled = False
                await db.commit()
        except Exception as e:
            logger.warning(f"Failed to pre-update trigger state: {e}")
        asyncio.create_task(_invoke_agent_for_triggers(agent_id, agent_triggers))
async def wake_agent_with_context(agent_id: uuid.UUID, message_context: str, *, from_agent_id: uuid.UUID | None = None, skip_dedup: bool = False) -> None:
    """Public API: wake an agent asynchronously with a message context.

    Creates a synthetic trigger invocation so the agent processes the
    message in a Reflection Session via the standard trigger path.
    Safe to call from any async context.

    Args:
        agent_id: The agent to wake.
        message_context: The message to deliver (truncated to 2000 chars).
        from_agent_id: The agent that initiated this wake (for chain depth tracking).
        skip_dedup: If True, bypass the dedup window check. Use this for
            genuine message deliveries (e.g. a task_delegate callback)
            where skipping the wake would lose a real message.
    """
    now = datetime.now(timezone.utc)
    if from_agent_id:
        # Bound A→B wake chains to prevent two agents waking each other forever.
        chain_key = f"{from_agent_id}->{agent_id}"
        current_depth = _A2A_WAKE_CHAIN.get(chain_key, 0)
        if current_depth >= _A2A_MAX_WAKE_DEPTH:
            logger.warning(
                f"[A2A] Wake chain depth {current_depth} reached for {chain_key}, "
                f"stopping to prevent wake storm"
            )
            return
        _A2A_WAKE_CHAIN[chain_key] = current_depth + 1
        def _decay_chain():
            # Depth decays after the TTL so chains can restart later.
            _A2A_WAKE_CHAIN.pop(chain_key, None)
        asyncio.get_running_loop().call_later(_A2A_WAKE_CHAIN_TTL, _decay_chain)
    if not skip_dedup and agent_id in _last_invoke:
        elapsed = (now - _last_invoke[agent_id]).total_seconds()
        if elapsed < DEDUP_WINDOW:
            # Separator added: the two adjacent f-strings previously rendered
            # as "...agent <id>invoked Ns ago".
            logger.info(
                f"[A2A] Skipping wake for agent {agent_id} — "
                f"invoked {elapsed:.0f}s ago (dedup window {DEDUP_WINDOW}s)"
            )
            return
    _last_invoke[agent_id] = now
    # Synthetic, never-persisted trigger that rides the normal invocation path.
    dummy_trigger = AgentTrigger(
        id=uuid.uuid4(),
        agent_id=agent_id,
        name="a2a_wake",
        type="on_message",
        config={"from_agent_name": "", "_matched_message": message_context[:2000], "_matched_from": "agent"},
        reason=(
            "You received a notification from another agent. "
            "Read the message content above, update your focus and memory if needed, "
            "and take any action you deem necessary. "
            "Do NOT reply back to the sender unless you have a genuine question — "
            "this was a notification, not a request for response."
        ),
        is_enabled=True,
        last_fired_at=now,
        fire_count=0,
    )
    asyncio.create_task(_invoke_agent_for_triggers(agent_id, [dummy_trigger]))
async def start_trigger_daemon():
    """Run the background trigger daemon loop forever.

    Called from FastAPI startup. Executes one _tick() pass every
    TICK_INTERVAL seconds; every 4th pass (~60 s) it also prunes the
    invoke dedup cache and runs the heartbeat service.
    """

    async def _run_heartbeat() -> None:
        # Prune stale dedup entries, then delegate to the heartbeat service.
        _cleanup_stale_invoke_cache()
        try:
            from app.services.heartbeat import _heartbeat_tick
            await _heartbeat_tick()
        except Exception as e:
            logger.error(f"Heartbeat tick error: {e}")

    logger.info("⚡ Trigger Daemon started (15s tick, heartbeat every ~60s)")
    ticks_since_heartbeat = 0
    while True:
        try:
            await _tick()
        except Exception as e:
            logger.error(f"Trigger Daemon error: {e}")
            import traceback
            traceback.print_exc()
        ticks_since_heartbeat += 1
        if ticks_since_heartbeat >= 4:
            ticks_since_heartbeat = 0
            await _run_heartbeat()
        await asyncio.sleep(TICK_INTERVAL)