import logging import mimetypes import re import unicodedata import zipfile from pathlib import Path from urllib.parse import quote from fastapi import APIRouter, HTTPException, Request from fastapi.responses import FileResponse, PlainTextResponse, Response from app.gateway.path_utils import resolve_thread_virtual_path logger = logging.getLogger(__name__) router = APIRouter(prefix="/api", tags=["artifacts"]) ACTIVE_CONTENT_MIME_TYPES = { "text/html", "application/xhtml+xml", "image/svg+xml", } _DASH_VARIANTS_RE = re.compile(r"\s*[-\u2010\u2011\u2012\u2013\u2014\u2212]\s*") _WHITESPACE_RE = re.compile(r"\s+") def _build_content_disposition(disposition_type: str, filename: str) -> str: """Build an RFC 5987 encoded Content-Disposition header value.""" return f"{disposition_type}; filename*=UTF-8''{quote(filename)}" def _build_attachment_headers(filename: str, extra_headers: dict[str, str] | None = None) -> dict[str, str]: headers = {"Content-Disposition": _build_content_disposition("attachment", filename)} if extra_headers: headers.update(extra_headers) return headers def _canonicalize_filename_for_lookup(filename: str) -> str: """Canonical form used for conservative compatibility lookup.""" normalized = unicodedata.normalize("NFKC", filename).strip() normalized = _DASH_VARIANTS_RE.sub("-", normalized) normalized = _WHITESPACE_RE.sub(" ", normalized) return normalized def _find_compat_filename_match(missing_path: Path) -> Path | None: """Find a same-directory file whose canonicalized name uniquely matches.""" parent = missing_path.parent if not parent.is_dir(): return None target_name = _canonicalize_filename_for_lookup(missing_path.name) matches: list[Path] = [] for candidate in parent.iterdir(): if not candidate.is_file(): continue if _canonicalize_filename_for_lookup(candidate.name) == target_name: matches.append(candidate) return matches[0] if len(matches) == 1 else None def is_text_file_by_content(path: Path, sample_size: int = 8192) -> bool: """Check if file is text by examining content for null bytes.""" try: with open(path, "rb") as f: chunk = f.read(sample_size) # Text files shouldn't contain null bytes return b"\x00" not in chunk except Exception: return False def _extract_file_from_skill_archive(zip_path: Path, internal_path: str) -> bytes | None: """Extract a file from a .skill ZIP archive. Args: zip_path: Path to the .skill file (ZIP archive). internal_path: Path to the file inside the archive (e.g., "SKILL.md"). Returns: The file content as bytes, or None if not found. """ if not zipfile.is_zipfile(zip_path): return None try: with zipfile.ZipFile(zip_path, "r") as zip_ref: # List all files in the archive namelist = zip_ref.namelist() # Try direct path first if internal_path in namelist: return zip_ref.read(internal_path) # Try with any top-level directory prefix (e.g., "skill-name/SKILL.md") for name in namelist: if name.endswith("/" + internal_path) or name == internal_path: return zip_ref.read(name) # Not found return None except (zipfile.BadZipFile, KeyError): return None @router.get( "/threads/{thread_id}/artifacts/{path:path}", summary="Get Artifact File", description="Retrieve an artifact file generated by the AI agent. Text and binary files can be viewed inline, while active web content is always downloaded.", ) async def get_artifact(thread_id: str, path: str, request: Request, download: bool = False) -> Response: """Get an artifact file by its path. The endpoint automatically detects file types and returns appropriate content types. Use the `download` query parameter to force file download for non-active content. Args: thread_id: The thread ID. path: The artifact path with virtual prefix (e.g., mnt/user-data/outputs/file.txt). request: FastAPI request object (automatically injected). Returns: The file content as a FileResponse with appropriate content type: - Active content (HTML/XHTML/SVG): Served as download attachment - Text files: Plain text with proper MIME type - Binary files: Inline display with download option Raises: HTTPException: - 400 if path is invalid or not a file - 403 if access denied (path traversal detected) - 404 if file not found Query Parameters: download (bool): If true, forces attachment download for file types that are otherwise returned inline or as plain text. Active HTML/XHTML/SVG content is always downloaded regardless of this flag. Example: - Get text file inline: `/api/threads/abc123/artifacts/mnt/user-data/outputs/notes.txt` - Download file: `/api/threads/abc123/artifacts/mnt/user-data/outputs/data.csv?download=true` - Active web content such as `.html`, `.xhtml`, and `.svg` artifacts is always downloaded """ # Check if this is a request for a file inside a .skill archive (e.g., xxx.skill/SKILL.md) if ".skill/" in path: # Split the path at ".skill/" to get the ZIP file path and internal path skill_marker = ".skill/" marker_pos = path.find(skill_marker) skill_file_path = path[: marker_pos + len(".skill")] # e.g., "mnt/user-data/outputs/my-skill.skill" internal_path = path[marker_pos + len(skill_marker) :] # e.g., "SKILL.md" actual_skill_path = resolve_thread_virtual_path(thread_id, skill_file_path) if not actual_skill_path.exists(): raise HTTPException(status_code=404, detail=f"Skill file not found: {skill_file_path}") if not actual_skill_path.is_file(): raise HTTPException(status_code=400, detail=f"Path is not a file: {skill_file_path}") # Extract the file from the .skill archive content = _extract_file_from_skill_archive(actual_skill_path, internal_path) if content is None: raise HTTPException(status_code=404, detail=f"File '{internal_path}' not found in skill archive") # Determine MIME type based on the internal file mime_type, _ = mimetypes.guess_type(internal_path) # Add cache headers to avoid repeated ZIP extraction (cache for 5 minutes) cache_headers = {"Cache-Control": "private, max-age=300"} download_name = Path(internal_path).name or actual_skill_path.stem if download or mime_type in ACTIVE_CONTENT_MIME_TYPES: return Response(content=content, media_type=mime_type or "application/octet-stream", headers=_build_attachment_headers(download_name, cache_headers)) if mime_type and mime_type.startswith("text/"): return PlainTextResponse(content=content.decode("utf-8"), media_type=mime_type, headers=cache_headers) # Default to plain text for unknown types that look like text try: return PlainTextResponse(content=content.decode("utf-8"), media_type="text/plain", headers=cache_headers) except UnicodeDecodeError: return Response(content=content, media_type=mime_type or "application/octet-stream", headers=cache_headers) actual_path = resolve_thread_virtual_path(thread_id, path) logger.info(f"Resolving artifact path: thread_id={thread_id}, requested_path={path}, actual_path={actual_path}") if not actual_path.exists(): compat_path = _find_compat_filename_match(actual_path) if compat_path is None: raise HTTPException(status_code=404, detail=f"Artifact not found: {path}") logger.info( "Artifact compatibility fallback applied: requested_path=%s, resolved_path=%s", actual_path, compat_path, ) actual_path = compat_path if not actual_path.is_file(): raise HTTPException(status_code=400, detail=f"Path is not a file: {path}") mime_type, _ = mimetypes.guess_type(actual_path) if download: return FileResponse(path=actual_path, filename=actual_path.name, media_type=mime_type, headers=_build_attachment_headers(actual_path.name)) # Always force download for active content types to prevent script execution # in the application origin when users open generated artifacts. if mime_type in ACTIVE_CONTENT_MIME_TYPES: return FileResponse(path=actual_path, filename=actual_path.name, media_type=mime_type, headers=_build_attachment_headers(actual_path.name)) if mime_type and mime_type.startswith("text/"): return PlainTextResponse(content=actual_path.read_text(encoding="utf-8"), media_type=mime_type) if is_text_file_by_content(actual_path): try: return PlainTextResponse(content=actual_path.read_text(encoding="utf-8"), media_type=mime_type) except UnicodeDecodeError: # Some binary formats (e.g. certain PDFs) may not contain NUL bytes in # the sampled chunk and be misclassified as text. Fall back to binary. logger.debug("Artifact looked like text but is not valid UTF-8: %s", actual_path, exc_info=True) return Response(content=actual_path.read_bytes(), media_type=mime_type, headers={"Content-Disposition": _build_content_disposition("inline", actual_path.name)})