将 thread_summary.py 中的 _strip_code_fence、_extract_json_object、_escape_inner_quotes_in_json_strings 三个函数提取到新建的 json_utils.py 共享模块,thread_updater.py 同步使用统一接口。
96 lines
2.5 KiB
Python
96 lines
2.5 KiB
Python
"""JSON extraction helpers for LLM-generated memory payloads."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from typing import Any
|
|
|
|
|
|
def strip_code_fence(text: str) -> str:
    """Return *text* without a surrounding Markdown code fence.

    Leading and trailing whitespace is always trimmed. Text that does not
    begin with three backticks is returned as-is after trimming. Otherwise
    the whole first line (the opening fence and anything else on it) is
    dropped, and the last line is dropped as well when it consists solely of
    a closing fence.
    """
    body = text.strip()
    if body.startswith("```"):
        # Everything on the opening-fence line is discarded.
        _opening, *rows = body.split("\n")
        # Only a fence on the very last line counts as the closing fence.
        if rows and rows[-1].strip() == "```":
            rows.pop()
        return "\n".join(rows).strip()
    return body
|
|
|
|
|
|
def escape_inner_quotes_in_json_strings(text: str) -> str:
    """Heuristically backslash-escape stray double quotes inside JSON strings.

    The text is scanned once, left to right. Outside a string every character
    is copied and a double quote opens a string. Inside a string, a backslash
    and the character following it are copied untouched. Any other double
    quote closes the string only when the next non-whitespace character is a
    colon, comma, closing brace, closing bracket, or the end of the text;
    otherwise it is emitted with a preceding backslash and the string stays
    open.
    """
    closers = (":", ",", "}", "]", "")
    size = len(text)
    pieces: list[str] = []
    inside = False
    after_backslash = False

    for pos, char in enumerate(text):
        if not inside:
            pieces.append(char)
            inside = char == '"'
        elif after_backslash:
            # The character following a backslash is never reinterpreted.
            pieces.append(char)
            after_backslash = False
        elif char == "\\":
            pieces.append(char)
            after_backslash = True
        elif char != '"':
            pieces.append(char)
        else:
            # Look past whitespace to decide whether this quote ends the string.
            peek = pos + 1
            while peek < size and text[peek].isspace():
                peek += 1
            follower = text[peek] if peek < size else ""
            if follower in closers:
                pieces.append(char)
                inside = False
            else:
                pieces.append('\\"')

    return "".join(pieces)
|
|
|
|
|
|
def _loads_with_quote_repair(payload: str) -> tuple[bool, Any]:
    """Decode *payload* as JSON, retrying once after repairing inner quotes.

    Returns ``(True, value)`` when the raw or the repaired text decodes and
    ``(False, None)`` when neither does. The repaired text is only tried when
    the repair actually changed something.
    """
    try:
        return True, json.loads(payload)
    except json.JSONDecodeError:
        patched = escape_inner_quotes_in_json_strings(payload)
    if patched == payload:
        return False, None
    try:
        return True, json.loads(patched)
    except json.JSONDecodeError:
        return False, None


def extract_json_object(text: str) -> dict[str, Any] | None:
    """Extract a JSON object from *text*, or return ``None``.

    The text is first stripped of a surrounding code fence and decoded as a
    whole (with one quote-repair retry). If that decodes, the result is
    returned when it is a dict and ``None`` otherwise. Only when the whole
    text cannot be decoded is the span from the first ``{`` to the last ``}``
    tried the same way.
    """
    body = strip_code_fence(text)
    decoded, value = _loads_with_quote_repair(body)
    if not decoded:
        # Greedy match: from the first opening brace to the last closing one.
        braced = re.search(r"\{.*\}", body, flags=re.DOTALL)
        if braced is None:
            return None
        decoded, value = _loads_with_quote_repair(braced.group(0))
    if decoded and isinstance(value, dict):
        return value
    return None
|