"""Prompt and formatting helpers for per-thread memory.""" from __future__ import annotations import json from typing import Any from deerflow.agents.memory.prompt import _coerce_confidence, _count_tokens, format_conversation_for_update THREAD_MEMORY_UPDATE_PROMPT = """You are a user profile memory system. Current per-thread memory: {existing_memory} Conversation: {conversation} Return JSON only with this schema: {{ "user": {{ "workContext": {{"summary": string, "updatedAt": string}}, "personalContext": {{"summary": string, "updatedAt": string}}, "topOfMind": {{"summary": string, "updatedAt": string}} }}, "history": {{ "recentMonths": {{"summary": string, "updatedAt": string}}, "earlierContext": {{"summary": string, "updatedAt": string}}, "longTermBackground": {{"summary": string, "updatedAt": string}} }}, "facts": [ {{ "content": string, "category": "tech_stack"|"preference"|"personal"|"context"|"goal", "confidence": number }} ] }} Rules: - Keep only stable and useful user profile facts. - Do not store sensitive personal data (phone/email/address/password/token/id/bank). - Deduplicate and keep high-confidence facts. - Return valid JSON only. """ def create_empty_thread_memory() -> dict[str, Any]: return { "user": { "workContext": {"summary": "", "updatedAt": ""}, "personalContext": {"summary": "", "updatedAt": ""}, "topOfMind": {"summary": "", "updatedAt": ""}, }, "history": { "recentMonths": {"summary": "", "updatedAt": ""}, "earlierContext": {"summary": "", "updatedAt": ""}, "longTermBackground": {"summary": "", "updatedAt": ""}, }, "facts": [], } def format_thread_memory_for_injection(memory_data: dict[str, Any], max_tokens: int = 2000) -> str: if not memory_data: return "" user = memory_data.get("user") or {} history = memory_data.get("history") or {} facts = memory_data.get("facts") or [] user_lines: list[str] = [] for key, label in (("workContext", "Work Context"), ("personalContext", "Personal Context"), ("topOfMind", "Top Of Mind")): section = user.get(key) if isinstance(user, dict) else None if isinstance(section, dict): summary = section.get("summary") if isinstance(summary, str) and summary.strip(): user_lines.append(f"- {label}: {summary.strip()}") history_lines: list[str] = [] for key, label in (("recentMonths", "Recent Months"), ("earlierContext", "Earlier Context"), ("longTermBackground", "Long-Term Background")): section = history.get(key) if isinstance(history, dict) else None if isinstance(section, dict): summary = section.get("summary") if isinstance(summary, str) and summary.strip(): history_lines.append(f"- {label}: {summary.strip()}") sections: list[str] = [] if user_lines: sections.append("User:\n" + "\n".join(user_lines)) if history_lines: sections.append("History:\n" + "\n".join(history_lines)) # Facts are lowest priority: include by confidence/recency and trim by token budget. ranked_facts = sorted( ( f for f in facts if isinstance(f, dict) and isinstance(f.get("content"), str) and f.get("content", "").strip() ), key=lambda f: (_coerce_confidence(f.get("confidence"), default=0.0), str(f.get("createdAt", ""))), reverse=True, ) base = "\n\n".join(sections) running = _count_tokens(base) if base else 0 fact_lines: list[str] = [] if ranked_facts: running += _count_tokens("\n\nFacts:\n" if base else "Facts:\n") for fact in ranked_facts: line = ( f"- [{str(fact.get('category', 'context')).strip() or 'context'} | " f"{_coerce_confidence(fact.get('confidence'), default=0.0):.2f}] {fact.get('content').strip()}" ) candidate = ("\n" + line) if fact_lines else line cost = _count_tokens(candidate) if running + cost > max_tokens: break fact_lines.append(line) running += cost if fact_lines: sections.append("Facts:\n" + "\n".join(fact_lines)) return "\n\n".join(sections) def build_thread_memory_prompt(existing_memory: dict[str, Any], messages: list[Any]) -> str: return THREAD_MEMORY_UPDATE_PROMPT.format( existing_memory=json.dumps(existing_memory, ensure_ascii=False, indent=2), conversation=format_conversation_for_update(messages), )