"""File management API routes for agent workspaces.""" import os import uuid from pathlib import Path import aiofiles from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import FileResponse from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from pydantic import BaseModel from app.config import get_settings from app.core.permissions import check_agent_access from app.core.security import get_current_user from app.database import get_db from app.models.user import User from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession settings = get_settings() router = APIRouter(prefix="/agents/{agent_id}/files", tags=["files"]) class FileInfo(BaseModel): name: str path: str is_dir: bool size: int = 0 modified_at: str = "" url: str | None = None class FileContent(BaseModel): path: str content: str class FileWrite(BaseModel): content: str def _agent_base_dir(agent_id: uuid.UUID) -> Path: return Path(settings.AGENT_DATA_DIR) / str(agent_id) def _safe_path(agent_id: uuid.UUID, rel_path: str) -> Path: """Ensure the path is within the agent's directory (no path traversal).""" base = _agent_base_dir(agent_id) full = (base / rel_path).resolve() if not str(full).startswith(str(base.resolve())): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Path traversal not allowed") return full @router.get("/", response_model=list[FileInfo]) async def list_files( agent_id: uuid.UUID, path: str = "", current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """List files and directories in an agent's file system.""" await check_agent_access(db, current_user, agent_id) target = _safe_path(agent_id, path) if not target.exists(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Path not found") if not target.is_dir(): raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Path is not a directory") items = [] base_abs = _agent_base_dir(agent_id).resolve() for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name)): if entry.name == '.gitkeep': continue rel = str(entry.resolve().relative_to(base_abs)) stat = entry.stat() items.append(FileInfo( name=entry.name, path=rel, is_dir=entry.is_dir(), size=stat.st_size if entry.is_file() else 0, modified_at=str(stat.st_mtime), url=f"/api/agents/{agent_id}/files/download?path={rel}" if not entry.is_dir() else None )) return items @router.get("/content", response_model=FileContent) async def read_file( agent_id: uuid.UUID, path: str, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Read the content of a file.""" await check_agent_access(db, current_user, agent_id) target = _safe_path(agent_id, path) if not target.exists() or not target.is_file(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found") try: async with aiofiles.open(target, "r", encoding="utf-8") as f: content = await f.read() return FileContent(path=path, content=content) except UnicodeDecodeError: return FileContent(path=path, content=f"[二进制文件: {target.name}, {target.stat().st_size} bytes]") @router.get("/download") async def download_file( agent_id: uuid.UUID, path: str, token: str = "", credentials: HTTPAuthorizationCredentials | None = Depends(HTTPBearer(auto_error=False)), db: AsyncSession = Depends(get_db), ): """Download / serve a file from the agent workspace (browser-friendly). Auth via Bearer header OR `token` query parameter (for tags). """ from app.core.security import decode_access_token # Resolve JWT token from either Bearer header or query param jwt_token = None if credentials: jwt_token = credentials.credentials elif token: jwt_token = token if not jwt_token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required") payload = decode_access_token(jwt_token) user_id = payload.get("sub") if not user_id: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") result = await db.execute(select(User).where(User.id == uuid.UUID(user_id))) user = result.scalar_one_or_none() if not user or not user.is_active: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive") await check_agent_access(db, user, agent_id) target = _safe_path(agent_id, path) if not target.exists() or not target.is_file(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found") return FileResponse(path=str(target), filename=target.name) @router.put("/content") async def write_file( agent_id: uuid.UUID, path: str, data: FileWrite, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Write content to a file (create or overwrite).""" await check_agent_access(db, current_user, agent_id) target = _safe_path(agent_id, path) target.parent.mkdir(parents=True, exist_ok=True) async with aiofiles.open(target, "w", encoding="utf-8") as f: await f.write(data.content) return {"status": "ok", "path": path} @router.delete("/content") async def delete_file( agent_id: uuid.UUID, path: str, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Delete a file.""" await check_agent_access(db, current_user, agent_id) target = _safe_path(agent_id, path) if not target.exists(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found") if target.is_dir(): import shutil shutil.rmtree(target) else: target.unlink() return {"status": "ok", "path": path} class ImportSkillBody(BaseModel): skill_id: str @router.post("/import-skill") async def import_skill_to_agent( agent_id: uuid.UUID, body: ImportSkillBody, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Import a global skill into this agent's skills/ workspace folder. Copies all files from the global skill registry into /skills//. """ await check_agent_access(db, current_user, agent_id) from sqlalchemy.orm import selectinload from app.models.skill import Skill, SkillFile # Load the global skill with its files result = await db.execute( select(Skill).where(Skill.id == body.skill_id).options(selectinload(Skill.files)) ) skill = result.scalar_one_or_none() if not skill: raise HTTPException(status_code=404, detail="Skill not found") if not skill.files: raise HTTPException(status_code=400, detail="Skill has no files") # Write each file into the agent's workspace base = _agent_base_dir(agent_id) skill_dir = base / "skills" / skill.folder_name skill_dir.mkdir(parents=True, exist_ok=True) written = [] for f in skill.files: file_path = (skill_dir / f.path).resolve() # Safety check if not str(file_path).startswith(str(base.resolve())): continue file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(f.content, encoding="utf-8") written.append(f.path) return { "status": "ok", "skill_name": skill.name, "folder_name": skill.folder_name, "files_written": len(written), "files": written, } # Separate router for file uploads (binary) since we need UploadFile from fastapi import File as FastFile, UploadFile as UploadFileType upload_router = APIRouter(prefix="/agents/{agent_id}/files", tags=["files"]) @upload_router.post("/upload") async def upload_file_to_workspace( agent_id: uuid.UUID, file: UploadFileType = FastFile(...), path: str = "workspace/knowledge_base", current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Upload a binary file to agent workspace.""" await check_agent_access(db, current_user, agent_id) # Validate path prefix if not path.startswith(("workspace/", "skills/")): raise HTTPException(status_code=400, detail="只能上传到 workspace/ 或 skills/ 目录") base = _agent_base_dir(agent_id) target_dir = (base / path).resolve() if not str(target_dir).startswith(str(base.resolve())): raise HTTPException(status_code=403, detail="Path traversal not allowed") target_dir.mkdir(parents=True, exist_ok=True) filename = file.filename or "unnamed" # Sanitize filename filename = filename.replace("/", "_").replace("\\", "_") save_path = target_dir / filename content = await file.read() save_path.write_bytes(content) # Auto-extract text from non-text files extracted_path = None from app.services.text_extractor import needs_extraction, save_extracted_text if needs_extraction(filename): txt_file = save_extracted_text(save_path, content, filename) if txt_file: base_abs = base.resolve() extracted_path = str(txt_file.resolve().relative_to(base_abs)) return { "status": "ok", "path": f"{path}/{filename}", "url": f"/api/agents/{agent_id}/files/download?path={path}/{filename}", "filename": filename, "size": len(content), "extracted_text_path": extracted_path, } # ─── Enterprise Knowledge Base ───────────────────────────────── enterprise_kb_router = APIRouter(prefix="/enterprise/knowledge-base", tags=["enterprise"]) def _enterprise_kb_dir(tenant_id: str) -> Path: return Path(settings.AGENT_DATA_DIR) / f"enterprise_info_{tenant_id}" / "knowledge_base" def _enterprise_info_dir(tenant_id: str) -> Path: return Path(settings.AGENT_DATA_DIR) / f"enterprise_info_{tenant_id}" @enterprise_kb_router.get("/files") async def list_enterprise_kb_files( path: str = "", current_user: User = Depends(get_current_user), ): """List files in enterprise knowledge base (tenant-scoped).""" if not current_user.tenant_id: return [] info_dir = _enterprise_info_dir(str(current_user.tenant_id)).resolve() info_dir.mkdir(parents=True, exist_ok=True) if path: target = (info_dir / path).resolve() else: target = info_dir if not str(target).startswith(str(info_dir)): raise HTTPException(status_code=403, detail="Path traversal not allowed") if not target.exists() or not target.is_dir(): return [] items = [] for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name)): if entry.name == '.gitkeep': continue rel = str(entry.resolve().relative_to(info_dir.resolve())) stat = entry.stat() items.append({ "name": entry.name, "path": rel, "is_dir": entry.is_dir(), "size": stat.st_size if entry.is_file() else 0, "url": f"/api/enterprise/knowledge-base/download?path={rel}" if not entry.is_dir() else None }) return items @enterprise_kb_router.post("/upload") async def upload_enterprise_kb_file( file: UploadFileType = FastFile(...), sub_path: str = "", current_user: User = Depends(get_current_user), ): """Upload a file to enterprise knowledge base (tenant-scoped).""" from app.core.security import require_role # Only admin can upload to enterprise KB if current_user.role not in ("platform_admin", "org_admin"): raise HTTPException(status_code=403, detail="Only admins can upload to enterprise knowledge base") if not current_user.tenant_id: raise HTTPException(status_code=400, detail="No tenant associated") info_dir = _enterprise_info_dir(str(current_user.tenant_id)) target_dir = (info_dir / sub_path).resolve() if not str(target_dir).startswith(str(info_dir.resolve())): raise HTTPException(status_code=403, detail="Path traversal not allowed") target_dir.mkdir(parents=True, exist_ok=True) filename = file.filename or "unnamed" filename = filename.replace("/", "_").replace("\\", "_") save_path = target_dir / filename content = await file.read() save_path.write_bytes(content) # Auto-extract text from non-text files extracted_path = None from app.services.text_extractor import needs_extraction, save_extracted_text if needs_extraction(filename): txt_file = save_extracted_text(save_path, content, filename) if txt_file: extracted_path = str(txt_file.resolve().relative_to(info_dir.resolve())) rel_path = f"{sub_path}/{filename}" if sub_path else filename return { "status": "ok", "path": rel_path, "url": f"/api/enterprise/knowledge-base/download?path={rel_path}", "filename": filename, "size": len(content), "extracted_text_path": extracted_path, } @enterprise_kb_router.get("/content") async def read_enterprise_file( path: str, current_user: User = Depends(get_current_user), ): """Read content of an enterprise knowledge base file (tenant-scoped).""" if not current_user.tenant_id: raise HTTPException(status_code=400, detail="No tenant associated") info_dir = _enterprise_info_dir(str(current_user.tenant_id)) target = (info_dir / path).resolve() if not str(target).startswith(str(info_dir.resolve())): raise HTTPException(status_code=403, detail="Path traversal not allowed") if not target.exists() or not target.is_file(): raise HTTPException(status_code=404, detail="File not found") try: content = target.read_text(encoding="utf-8", errors="replace") return {"path": path, "content": content} except Exception: return {"path": path, "content": f"[二进制文件: {target.name}, {target.stat().st_size} bytes]"} @enterprise_kb_router.put("/content") async def write_enterprise_file( path: str, data: FileWrite, current_user: User = Depends(get_current_user), ): """Write content to an enterprise file (tenant-scoped).""" if current_user.role not in ("platform_admin", "org_admin"): raise HTTPException(status_code=403, detail="Only admins can edit enterprise knowledge base") if not current_user.tenant_id: raise HTTPException(status_code=400, detail="No tenant associated") info_dir = _enterprise_info_dir(str(current_user.tenant_id)) target = (info_dir / path).resolve() if not str(target).startswith(str(info_dir.resolve())): raise HTTPException(status_code=403, detail="Path traversal not allowed") target.parent.mkdir(parents=True, exist_ok=True) async with aiofiles.open(target, "w", encoding="utf-8") as f: await f.write(data.content) return {"status": "ok", "path": path} @enterprise_kb_router.delete("/content") async def delete_enterprise_file( path: str, current_user: User = Depends(get_current_user), ): """Delete an enterprise knowledge base file (tenant-scoped).""" if current_user.role not in ("platform_admin", "org_admin"): raise HTTPException(status_code=403, detail="Only admins can delete enterprise knowledge base files") if not current_user.tenant_id: raise HTTPException(status_code=400, detail="No tenant associated") info_dir = _enterprise_info_dir(str(current_user.tenant_id)) target = (info_dir / path).resolve() if not str(target).startswith(str(info_dir.resolve())): raise HTTPException(status_code=403, detail="Path traversal not allowed") if not target.exists(): raise HTTPException(status_code=404, detail="File not found") if target.is_dir(): import shutil shutil.rmtree(target) else: target.unlink() return {"status": "ok", "path": path} # ─── Agent-level ClawHub / URL Skill Import ───────────────── class ClawhubImportBody(BaseModel): slug: str class UrlImportBody(BaseModel): url: str @router.post("/import-from-clawhub") async def agent_import_from_clawhub( agent_id: uuid.UUID, body: ClawhubImportBody, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Import a skill from ClawHub directly into this agent's skills/ workspace.""" await check_agent_access(db, current_user, agent_id) from app.api.skills import ( CLAWHUB_BASE, _fetch_github_directory, _parse_skill_md_frontmatter, _get_github_token, ) import httpx slug = body.slug # 1. Fetch metadata from ClawHub try: async with httpx.AsyncClient(timeout=15) as client: resp = await client.get(f"{CLAWHUB_BASE}/v1/skills/{slug}") if resp.status_code == 429: raise HTTPException(429, "ClawHub rate limit exceeded. Please wait and try again.") if resp.status_code != 200: raise HTTPException(502, f"ClawHub API error: {resp.status_code}") meta = resp.json() except HTTPException: raise except Exception as e: raise HTTPException(502, f"Failed to connect to ClawHub: {e}") skill_info = meta.get("skill", {}) owner_info = meta.get("owner", {}) handle = owner_info.get("handle", "").lower() if not handle: raise HTTPException(400, "Could not determine skill owner from ClawHub metadata") # 2. Fetch files from GitHub github_path = f"skills/{handle}/{slug}" tenant_id = str(current_user.tenant_id) if current_user.tenant_id else None token = await _get_github_token(tenant_id) files = await _fetch_github_directory("openclaw", "skills", github_path, "main", token) # 3. Write to agent workspace: skills// base = _agent_base_dir(agent_id) folder_name = slug skill_dir = base / "skills" / folder_name skill_dir.mkdir(parents=True, exist_ok=True) written = [] for f in files: file_path = (skill_dir / f["path"]).resolve() if not str(file_path).startswith(str(base.resolve())): continue file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(f["content"], encoding="utf-8") written.append(f["path"]) return { "status": "ok", "skill_name": skill_info.get("displayName", slug), "folder_name": folder_name, "files_written": len(written), "files": written, } @router.post("/import-from-url") async def agent_import_from_url( agent_id: uuid.UUID, body: UrlImportBody, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Import a skill from a GitHub URL directly into this agent's skills/ workspace.""" await check_agent_access(db, current_user, agent_id) from app.api.skills import _parse_github_url, _fetch_github_directory, _get_github_token parsed = _parse_github_url(body.url) if not parsed: raise HTTPException(400, "Invalid GitHub URL") owner, repo, branch, path = parsed["owner"], parsed["repo"], parsed["branch"], parsed["path"] tenant_id = str(current_user.tenant_id) if current_user.tenant_id else None token = await _get_github_token(tenant_id) files = await _fetch_github_directory(owner, repo, path, branch, token) if not files: raise HTTPException(404, "No files found") # Derive folder name folder_name = path.rstrip("/").split("/")[-1] if path else repo # Write to agent workspace base = _agent_base_dir(agent_id) skill_dir = base / "skills" / folder_name skill_dir.mkdir(parents=True, exist_ok=True) written = [] for f in files: file_path = (skill_dir / f["path"]).resolve() if not str(file_path).startswith(str(base.resolve())): continue file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(f["content"], encoding="utf-8") written.append(f["path"]) return { "status": "ok", "folder_name": folder_name, "files_written": len(written), "files": written, }