Clawith/backend/app/api/gateway.py

744 lines
29 KiB
Python

"""Gateway API for OpenClaw agent communication.
OpenClaw agents authenticate via X-Api-Key header and use these endpoints
to poll for messages, report results, send messages, and send heartbeat pings.
"""
import asyncio
import hashlib
import secrets
import uuid
from datetime import datetime, timezone
from fastapi import APIRouter, Header, HTTPException, Depends, BackgroundTasks
from loguru import logger
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db, async_session
from app.models.agent import Agent
from app.models.gateway_message import GatewayMessage
from app.models.user import User
from app.schemas.schemas import (
GatewayPollResponse, GatewayMessageOut, GatewayReportRequest,
GatewayHistoryItem, GatewayRelationshipItem, GatewaySendMessageRequest,
)
router = APIRouter(prefix="/gateway", tags=["gateway"])
def _hash_key(key: str) -> str:
"""Hash an API key for storage."""
return hashlib.sha256(key.encode()).hexdigest()
async def _get_agent_by_key(api_key: str, db: AsyncSession) -> Agent:
"""Authenticate an OpenClaw agent by its API key."""
# First try plaintext (new behavior)
result = await db.execute(
select(Agent).where(
Agent.api_key_hash == api_key,
Agent.agent_type == "openclaw",
)
)
agent = result.scalar_one_or_none()
# Fallback to hashed (legacy behavior)
if not agent:
key_hash = _hash_key(api_key)
result = await db.execute(
select(Agent).where(
Agent.api_key_hash == key_hash,
Agent.agent_type == "openclaw",
)
)
agent = result.scalar_one_or_none()
if not agent:
raise HTTPException(status_code=401, detail="Invalid API key")
return agent
# ─── Generate / Regenerate API Key ──────────────────────
@router.post("/generate-key/{agent_id}")
async def generate_api_key(
agent_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
# JWT auth for this endpoint (requires the agent creator)
current_user: "User" = Depends(None), # placeholder, will use real dependency
):
"""Generate or regenerate an API key for an OpenClaw agent.
Called from the frontend by the agent creator.
"""
from app.api.agents import get_current_user
raise HTTPException(status_code=501, detail="Use the /agents/{id}/api-key endpoint instead")
@router.post("/agents/{agent_id}/api-key")
async def generate_agent_api_key(agent_id: uuid.UUID, db: AsyncSession = Depends(get_db)):
"""Generate or regenerate API key for an OpenClaw agent.
This is an internal endpoint called by the agents API.
"""
result = await db.execute(select(Agent).where(Agent.id == agent_id, Agent.agent_type == "openclaw"))
agent = result.scalar_one_or_none()
if not agent:
raise HTTPException(status_code=404, detail="OpenClaw agent not found")
# Generate a new key
raw_key = f"oc-{secrets.token_urlsafe(32)}"
agent.api_key_hash = _hash_key(raw_key)
await db.commit()
return {"api_key": raw_key, "message": "Save this key — it won't be shown again."}
# ─── Poll for messages ──────────────────────────────────
@router.get("/poll", response_model=GatewayPollResponse)
async def poll_messages(
x_api_key: str = Header(..., alias="X-Api-Key"),
db: AsyncSession = Depends(get_db),
):
"""OpenClaw agent polls for pending messages.
Returns all pending messages and marks them as delivered.
Also updates openclaw_last_seen for online status tracking.
"""
logger.info(f"[Gateway] poll called, key_prefix={x_api_key[:8]}...")
agent = await _get_agent_by_key(x_api_key, db)
# Update last seen
agent.openclaw_last_seen = datetime.now(timezone.utc)
agent.status = "running"
# Fetch pending messages
result = await db.execute(
select(GatewayMessage)
.where(GatewayMessage.agent_id == agent.id, GatewayMessage.status == "pending")
.order_by(GatewayMessage.created_at.asc())
)
messages = result.scalars().all()
# Mark as delivered
now = datetime.now(timezone.utc)
out = []
for msg in messages:
msg.status = "delivered"
msg.delivered_at = now
# Resolve sender names
sender_agent_name = None
sender_user_name = None
if msg.sender_agent_id:
r = await db.execute(select(Agent.name).where(Agent.id == msg.sender_agent_id))
sender_agent_name = r.scalar_one_or_none()
if msg.sender_user_id:
r = await db.execute(select(User.display_name).where(User.id == msg.sender_user_id))
sender_user_name = r.scalar_one_or_none()
# Fetch conversation history (last 10 messages) for context
history = []
if msg.conversation_id:
from app.models.audit import ChatMessage
hist_result = await db.execute(
select(ChatMessage)
.where(ChatMessage.conversation_id == msg.conversation_id)
.order_by(ChatMessage.created_at.desc())
.limit(10)
)
hist_msgs = list(reversed(hist_result.scalars().all()))
for h in hist_msgs:
# Resolve sender name for each history message
h_sender = None
if h.role == "user" and h.user_id:
r = await db.execute(select(User.display_name).where(User.id == h.user_id))
h_sender = r.scalar_one_or_none()
elif h.role == "assistant":
h_sender = agent.name
history.append(GatewayHistoryItem(
role=h.role,
content=h.content or "",
sender_name=h_sender,
created_at=h.created_at,
))
out.append(GatewayMessageOut(
id=msg.id,
conversation_id=msg.conversation_id,
sender_agent_name=sender_agent_name,
sender_user_name=sender_user_name,
sender_user_id=str(msg.sender_user_id) if msg.sender_user_id else None,
content=msg.content,
created_at=msg.created_at,
history=history,
))
# Fetch agent relationships for context
from app.models.org import AgentRelationship, AgentAgentRelationship
from sqlalchemy.orm import selectinload
rel_items = []
# Human relationships (with available channels)
h_result = await db.execute(
select(AgentRelationship)
.where(AgentRelationship.agent_id == agent.id)
.options(selectinload(AgentRelationship.member))
)
for r in h_result.scalars().all():
if r.member:
channels = []
if getattr(r.member, 'external_id', None) or getattr(r.member, 'open_id', None):
channels.append("feishu")
if getattr(r.member, 'email', None):
channels.append("email")
rel_items.append(GatewayRelationshipItem(
name=r.member.name,
type="human",
role=r.relation,
description=r.description or None,
channels=channels,
))
# Agent-to-agent relationships
a_result = await db.execute(
select(AgentAgentRelationship)
.where(AgentAgentRelationship.agent_id == agent.id)
.options(selectinload(AgentAgentRelationship.target_agent))
)
for r in a_result.scalars().all():
if r.target_agent:
rel_items.append(GatewayRelationshipItem(
name=r.target_agent.name,
type="agent",
role=r.relation,
description=r.description or None,
channels=["agent"],
))
await db.commit()
return GatewayPollResponse(messages=out, relationships=rel_items)
# ─── Report results ─────────────────────────────────────
@router.post("/report")
async def report_result(
body: GatewayReportRequest,
x_api_key: str = Header(None, alias="X-Api-Key"),
db: AsyncSession = Depends(get_db),
):
"""OpenClaw agent reports the result of a processed message."""
if not x_api_key:
raise HTTPException(status_code=401, detail="Missing X-Api-Key header")
logger.info(f"[Gateway] report called, key_prefix={x_api_key[:8]}..., msg_id={body.message_id}")
agent = await _get_agent_by_key(x_api_key, db)
result = await db.execute(
select(GatewayMessage).where(
GatewayMessage.id == body.message_id,
GatewayMessage.agent_id == agent.id,
)
)
msg = result.scalar_one_or_none()
if not msg:
raise HTTPException(status_code=404, detail="Message not found")
msg.status = "completed"
msg.result = body.result
msg.completed_at = datetime.now(timezone.utc)
# Update last seen
agent.openclaw_last_seen = datetime.now(timezone.utc)
# Save result as assistant chat message and push via WebSocket
# (works for both user-originated and agent-to-agent messages)
if body.result and msg.conversation_id:
from app.models.audit import ChatMessage
from app.models.participant import Participant
# Look up OpenClaw agent's participant_id
part_r = await db.execute(select(Participant).where(Participant.type == "agent", Participant.ref_id == agent.id))
participant = part_r.scalar_one_or_none()
assistant_msg = ChatMessage(
agent_id=agent.id,
user_id=msg.sender_user_id or getattr(agent, "creator_id", agent.id),
role="assistant",
content=body.result,
conversation_id=msg.conversation_id,
participant_id=participant.id if participant else None,
)
db.add(assistant_msg)
await db.commit()
# Push to WebSocket if user is connected
if body.result and msg.conversation_id and msg.sender_user_id:
try:
from app.api.websocket import manager
await manager.send_message(str(agent.id), {
"type": "done",
"role": "assistant",
"content": body.result,
})
except Exception:
pass # User may have disconnected
# If the original message was from another agent (OpenClaw-to-OpenClaw),
# write the reply back as a gateway_message for the sender agent to poll
if body.result and msg.sender_agent_id:
async with async_session() as reply_db:
conv_id = msg.conversation_id or f"gw_agent_{msg.sender_agent_id}_{agent.id}"
gw_reply = GatewayMessage(
agent_id=msg.sender_agent_id,
sender_agent_id=agent.id,
content=body.result,
status="pending",
conversation_id=conv_id,
)
reply_db.add(gw_reply)
await reply_db.commit()
logger.info(f"[Gateway] Reply routed back to sender agent {msg.sender_agent_id}")
return {"status": "ok"}
# ─── Heartbeat ──────────────────────────────────────────
@router.post("/heartbeat")
async def heartbeat(
x_api_key: str = Header(..., alias="X-Api-Key"),
db: AsyncSession = Depends(get_db),
):
"""Pure heartbeat ping — keeps the OpenClaw agent marked as online."""
agent = await _get_agent_by_key(x_api_key, db)
agent.openclaw_last_seen = datetime.now(timezone.utc)
agent.status = "running"
await db.commit()
return {"status": "ok", "agent_id": str(agent.id)}
# ─── Send message ───────────────────────────────────────
# Track background tasks to prevent garbage collection
_background_tasks: set = set()
async def _send_to_agent_background(
source_agent_id: str,
source_agent_name: str,
target_agent_id: str,
target_agent_name: str,
target_primary_model_id: str,
target_role_description: str,
target_creator_id: str,
content: str,
):
"""Background task: invoke target agent LLM and write reply to gateway_messages.
Accepts plain values (not ORM objects) to avoid stale session references
since this runs after the request's DB session has closed.
"""
logger.info(f"[Gateway] _send_to_agent_background started: {source_agent_name} -> {target_agent_name}")
try:
from app.api.websocket import call_llm
from app.models.llm import LLMModel
from app.models.audit import ChatMessage
from app.models.chat_session import ChatSession
async with async_session() as db:
# Load target agent's LLM model
if not target_primary_model_id:
logger.warning(f"Target agent {target_agent_name} has no LLM model")
return
result = await db.execute(select(LLMModel).where(LLMModel.id == target_primary_model_id))
model = result.scalar_one_or_none()
if not model:
return
# Skip if model is disabled by admin
if not model.enabled:
logger.warning(f"Target agent {target_agent_name}'s model {model.model} is disabled, skipping")
return
# Create or find a ChatSession for this agent pair
# Use deterministic UUID so the same pair always gets the same session
import uuid as _uuid
_ns = _uuid.UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
# Sort IDs so session is the same regardless of who initiates
session_agent_id = min(source_agent_id, target_agent_id, key=str)
session_peer_id = max(source_agent_id, target_agent_id, key=str)
session_uuid = _uuid.uuid5(_ns, f"{session_agent_id}_{session_peer_id}")
conv_id = str(session_uuid)
# Find or create the ChatSession
existing = await db.execute(
select(ChatSession).where(ChatSession.id == session_uuid)
)
session = existing.scalar_one_or_none()
if not session:
from datetime import datetime, timezone
session = ChatSession(
id=session_uuid,
agent_id=session_agent_id,
user_id=target_creator_id,
title=f"{source_agent_name}{target_agent_name}",
source_channel="agent",
peer_agent_id=session_peer_id,
created_at=datetime.now(timezone.utc),
)
db.add(session)
await db.commit()
await db.refresh(session)
# Migrate any existing messages from old gw_agent_ format
old_conv_id = f"gw_agent_{source_agent_id}_{target_agent_id}"
from sqlalchemy import update
await db.execute(
update(ChatMessage)
.where(ChatMessage.conversation_id == old_conv_id)
.values(conversation_id=conv_id)
)
await db.commit()
# Update last_message_at
from datetime import datetime, timezone
session.last_message_at = datetime.now(timezone.utc)
# Agent-to-agent communication context (injected as prefix to user message
# since call_llm builds the full system prompt internally)
agent_comm_alert = (
"--- Agent-to-Agent Communication Alert ---\n"
f"You are receiving a direct message from another digital employee ({source_agent_name}). "
"CRITICAL INSTRUCTION: Your direct text reply will automatically be delivered back to them. "
"DO NOT use the `send_message_to_agent` tool to reply to this conversation. Just reply naturally in text.\n"
"If they are asking you to create or analyze a file, deliver the file using `send_file_to_agent` after writing it."
)
# Load recent conversation history for context
hist_result = await db.execute(
select(ChatMessage)
.where(ChatMessage.conversation_id == conv_id)
.order_by(ChatMessage.created_at.desc())
.limit(10)
)
hist_msgs = list(reversed(hist_result.scalars().all()))
messages = []
for h in hist_msgs:
messages.append({"role": h.role, "content": h.content or ""})
# Add the new message with agent communication context
user_msg = f"{agent_comm_alert}\n\n[Message from agent: {source_agent_name}]\n{content}"
messages.append({"role": "user", "content": user_msg})
from app.models.participant import Participant
# Lookup participants for both agents
src_part_r = await db.execute(select(Participant).where(Participant.type == "agent", Participant.ref_id == source_agent_id))
tgt_part_r = await db.execute(select(Participant).where(Participant.type == "agent", Participant.ref_id == target_agent_id))
src_participant = src_part_r.scalar_one_or_none()
tgt_participant = tgt_part_r.scalar_one_or_none()
# Save user message to conversation
db.add(ChatMessage(
agent_id=target_agent_id,
conversation_id=conv_id,
role="user",
content=user_msg,
user_id=target_creator_id,
participant_id=src_participant.id if src_participant else None,
))
await db.commit()
# Call LLM
collected = []
async def on_chunk(text):
collected.append(text)
reply = await call_llm(
model=model,
messages=messages,
agent_name=target_agent_name,
role_description=target_role_description,
agent_id=target_agent_id,
user_id=target_creator_id,
on_chunk=on_chunk,
)
final_reply = reply or "".join(collected)
# Save assistant reply to conversation
async with async_session() as db:
from app.models.participant import Participant
tgt_part_r = await db.execute(select(Participant).where(Participant.type == "agent", Participant.ref_id == target_agent_id))
tgt_participant = tgt_part_r.scalar_one_or_none()
db.add(ChatMessage(
agent_id=target_agent_id,
conversation_id=conv_id,
role="assistant",
content=final_reply,
user_id=target_creator_id,
participant_id=tgt_participant.id if tgt_participant else None,
))
# Write reply to gateway_messages for source (OpenClaw) to poll
gw_reply = GatewayMessage(
agent_id=source_agent_id,
sender_agent_id=target_agent_id,
content=final_reply,
status="pending",
conversation_id=conv_id,
)
db.add(gw_reply)
await db.commit()
logger.info(f"[Gateway] Agent {target_agent_name} replied to {source_agent_name}")
except Exception as e:
logger.error(f"[Gateway] send_to_agent_background failed: {e}")
import traceback
traceback.print_exc()
@router.post("/send-message")
async def send_message(
body: GatewaySendMessageRequest,
x_api_key: str = Header(..., alias="X-Api-Key"),
db: AsyncSession = Depends(get_db),
):
"""OpenClaw agent sends a message to a person or another agent.
Routes automatically based on target type:
- Agent target: triggers LLM processing, reply returned via next poll
- Human target: sends via available channel (feishu, etc.)
"""
agent = await _get_agent_by_key(x_api_key, db)
agent.openclaw_last_seen = datetime.now(timezone.utc)
target_name = body.target.strip()
content = body.content.strip()
channel_hint = (body.channel or "").strip().lower()
# 1. Try to find target as another Agent
result = await db.execute(
select(Agent).where(Agent.name.ilike(f"%{target_name}%"))
)
target_agent = result.scalars().first()
logger.info(f"[Gateway] send_message: target='{target_name}', found_agent={target_agent.name if target_agent else None}, agent_type={getattr(target_agent, 'agent_type', None) if target_agent else None}, channel_hint='{channel_hint}'")
if target_agent and (not channel_hint or channel_hint == "agent"):
conv_id = f"gw_agent_{agent.id}_{target_agent.id}"
if getattr(target_agent, 'agent_type', None) == 'openclaw':
# OpenClaw-to-OpenClaw: write to gateway_messages directly
gw_msg = GatewayMessage(
agent_id=target_agent.id,
sender_agent_id=agent.id,
content=content,
status="pending",
conversation_id=conv_id,
)
db.add(gw_msg)
await db.commit()
return {
"status": "accepted",
"target": target_agent.name,
"type": "openclaw_agent",
"message": f"Message sent to {target_agent.name}. Reply will appear in your next poll.",
}
else:
# Native agent: async LLM processing
# Extract plain values before session closes to avoid stale ORM references
_src_id = str(agent.id)
_src_name = agent.name
_tgt_id = str(target_agent.id)
_tgt_name = target_agent.name
_tgt_model = str(target_agent.primary_model_id) if target_agent.primary_model_id else ""
_tgt_role = target_agent.role_description or ""
_tgt_creator = str(target_agent.creator_id) if target_agent.creator_id else ""
await db.commit()
task = asyncio.create_task(_send_to_agent_background(
_src_id, _src_name, _tgt_id, _tgt_name,
_tgt_model, _tgt_role, _tgt_creator, content,
))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
return {
"status": "accepted",
"target": target_agent.name,
"type": "agent",
"message": f"Message sent to {target_agent.name}. Reply will appear in your next poll.",
}
# 2. Try to find target as a human (via relationships)
from app.models.org import AgentRelationship
from sqlalchemy.orm import selectinload
rel_result = await db.execute(
select(AgentRelationship)
.where(AgentRelationship.agent_id == agent.id)
.options(selectinload(AgentRelationship.member))
)
rels = rel_result.scalars().all()
target_member = None
for r in rels:
if r.member and r.member.name == target_name:
target_member = r.member
break
# Fuzzy match if exact match fails
if not target_member:
for r in rels:
if r.member and target_name.lower() in r.member.name.lower():
target_member = r.member
break
if not target_member:
await db.commit()
raise HTTPException(
status_code=404,
detail=f"Target '{target_name}' not found. Check your relationships list."
)
# Send via feishu if available
if (target_member.external_id or target_member.open_id) and (not channel_hint or channel_hint == "feishu"):
from app.models.channel_config import ChannelConfig
from app.services.feishu_service import feishu_service
import json as _json
config_result = await db.execute(
select(ChannelConfig).where(ChannelConfig.agent_id == agent.id)
)
config = config_result.scalar_one_or_none()
if not config:
# Try to find any feishu config in the org
config_result = await db.execute(
select(ChannelConfig).where(ChannelConfig.channel == "feishu").limit(1)
)
config = config_result.scalar_one_or_none()
if not config:
await db.commit()
raise HTTPException(status_code=400, detail="No Feishu channel configured")
# Prefer user_id (tenant-stable, works across apps), fallback to open_id
resp = None
if target_member.external_id:
resp = await feishu_service.send_message(
config.app_id, config.app_secret,
receive_id=target_member.external_id,
msg_type="text",
content=_json.dumps({"text": content}, ensure_ascii=False),
receive_id_type="user_id",
)
if (resp is None or resp.get("code") != 0) and target_member.open_id:
resp = await feishu_service.send_message(
config.app_id, config.app_secret,
receive_id=target_member.open_id,
msg_type="text",
content=_json.dumps({"text": content}, ensure_ascii=False),
receive_id_type="open_id",
)
await db.commit()
if resp and resp.get("code") == 0:
return {
"status": "sent",
"target": target_member.name,
"type": "human",
"channel": "feishu",
}
else:
raise HTTPException(
status_code=502,
detail=f"Feishu send failed: {resp.get('msg') if resp else 'no ID available'} (code {resp.get('code') if resp else 'N/A'})"
)
await db.commit()
raise HTTPException(
status_code=400,
detail=f"No available channel to reach {target_member.name}. feishu_user_id={'yes' if target_member.external_id else 'no'}, feishu_open_id={'yes' if target_member.open_id else 'no'}"
)
# ─── Setup guide ────────────────────────────────────────
@router.get("/setup-guide/{agent_id}")
async def get_setup_guide(
agent_id: uuid.UUID,
x_api_key: str = Header(..., alias="X-Api-Key"),
db: AsyncSession = Depends(get_db),
):
"""Return the pre-filled Skill file and Heartbeat instruction for this agent."""
agent = await _get_agent_by_key(x_api_key, db)
if agent.id != agent_id:
raise HTTPException(status_code=403, detail="Key does not match this agent")
# Note: we use the raw key from the header since the agent already authenticated
base_url = "https://try.clawith.ai"
skill_content = f"""---
name: clawith_sync
description: Sync with Clawith platform — check inbox, submit results, and send messages.
---
# Clawith Sync
## When to use
Check for new messages from the Clawith platform during every heartbeat cycle.
You can also proactively send messages to people and agents in your relationships.
## Instructions
### 1. Check inbox
Make an HTTP GET request:
- URL: {base_url}/api/gateway/poll
- Header: X-Api-Key: {x_api_key}
The response contains a `messages` array. Each message includes:
- `id` — unique message ID (use this for reporting)
- `content` — the message text
- `sender_user_name` — name of the Clawith user who sent it
- `sender_user_id` — unique ID of the sender
- `conversation_id` — the conversation this message belongs to
- `history` — array of previous messages in this conversation for context
The response also contains a `relationships` array describing your colleagues:
- `name` — the person or agent name
- `type` — "human" or "agent"
- `role` — relationship type (e.g. collaborator, supervisor)
- `channels` — available communication channels (e.g. ["feishu"], ["agent"])
**IMPORTANT**: Use the `history` array to understand conversation context before replying.
Different `sender_user_name` values mean different people — address them accordingly.
### 2. Report results
For each completed message, make an HTTP POST request:
- URL: {base_url}/api/gateway/report
- Header: X-Api-Key: {x_api_key}
- Header: Content-Type: application/json
- Body: {{"message_id": "<id from the message>", "result": "<your response>"}}
### 3. Send a message to someone
To proactively contact a person or agent, make an HTTP POST request:
- URL: {base_url}/api/gateway/send-message
- Header: X-Api-Key: {x_api_key}
- Header: Content-Type: application/json
- Body: {{"target": "<name of person or agent>", "content": "<your message>"}}
The system auto-detects the best channel. For agents, the reply appears in your next poll.
For humans, the message is delivered via their available channel (e.g. Feishu).
"""
heartbeat_line = "- Check Clawith inbox using the clawith_sync skill and process any pending messages"
return {
"skill_filename": "clawith_sync.md",
"skill_content": skill_content,
"heartbeat_addition": heartbeat_line,
}