"""Utilities for parsing YAML-defined skill package structures. This module supports turning a YAML document describing files/directories into real filesystem content under a thread's virtual path (for example, ``/mnt/user-data/uploads/skill``). """ from __future__ import annotations import argparse import json import sys from dataclasses import dataclass from pathlib import Path import yaml # type: ignore[import-not-found] @dataclass(frozen=True) class ParsedSkillTree: """Normalized parsed structure from YAML spec.""" directories: set[str] files: dict[str, str] def _pick_first_existing(data: dict, keys: tuple[str, ...]): for key in keys: if key in data: return data[key] return None def _extract_spec_root(data: dict) -> dict: """Extract the effective spec root. Supports nested wrappers like: - skill: { ... } - package: { ... } - spec: { ... } """ if not isinstance(data, dict): raise ValueError("YAML root must be an object") known_keys = { "entries", "files", "directories", "dirs", "tree", "structure", "file_tree", "fileTree", "file_structure", "paths", } if any(k in data for k in known_keys): return data wrapper_candidates = ("skill", "package", "spec", "data", "content", "payload") for wrapper in wrapper_candidates: candidate = data.get(wrapper) if isinstance(candidate, dict) and any(k in candidate for k in known_keys): return candidate # Fallback: if exactly one nested object exists, try it as spec root. nested_dicts = [v for v in data.values() if isinstance(v, dict)] if len(nested_dicts) == 1: return nested_dicts[0] return data def _normalize_relative_path(path: str) -> str: """Normalize and validate a relative path. Raises: ValueError: If path is unsafe or invalid. """ if not isinstance(path, str): raise ValueError("Path must be a string") normalized = path.strip().replace("\\", "/") if normalized in {"/", ".", "./"}: return "" if not normalized: raise ValueError("Path cannot be empty") if normalized.startswith("/"): raise ValueError(f"Path must be relative, got absolute path: {path}") if ":" in normalized: raise ValueError(f"Path cannot contain ':' (possible drive path): {path}") parts = [part for part in normalized.split("/") if part] if not parts: raise ValueError("Path cannot be empty") if any(part in {".", ".."} for part in parts): raise ValueError(f"Path traversal is not allowed: {path}") return "/".join(parts) def _add_directory(path: str, directories: set[str]) -> None: normalized = _normalize_relative_path(path) if not normalized: return directories.add(normalized) def _add_file(path: str, content: str, files: dict[str, str], directories: set[str]) -> None: normalized = _normalize_relative_path(path) if not normalized: raise ValueError("File path cannot be root ('/')") if not isinstance(content, str): raise ValueError(f"File content must be a string for '{normalized}'") parent = Path(normalized).parent if str(parent) != ".": directories.add(str(parent).replace("\\", "/")) files[normalized] = content def _walk_tree_dict(tree: dict, base: str, files: dict[str, str], directories: set[str]) -> None: for name, value in tree.items(): if not isinstance(name, str): raise ValueError("Tree keys must be strings") if name.strip() in {"/", ".", "./"}: if isinstance(value, dict): _walk_tree_dict(value, base, files, directories) continue raise ValueError("Root sentinel '/' can only be used for directory/object nodes") node_path = f"{base}/{name}" if base else name if isinstance(value, dict): _add_directory(node_path, directories) _walk_tree_dict(value, _normalize_relative_path(node_path), files, directories) elif isinstance(value, str): _add_file(node_path, value, files, directories) else: raise ValueError( f"Unsupported tree node type for '{node_path}': {type(value).__name__}. " "Use object (directory) or string (file content)." ) def _parse_entries_node( node: dict, base: str, files: dict[str, str], directories: set[str], ) -> None: raw_path = node.get("path") raw_name = node.get("name") if raw_path is None and raw_name is None: raise ValueError("Each entry must have at least one of: 'path' or 'name'") if raw_path is not None and not isinstance(raw_path, str): raise ValueError("Entry 'path' must be a string") if raw_name is not None and not isinstance(raw_name, str): raise ValueError("Entry 'name' must be a string") # Common schema compatibility: # - `path` is parent directory (e.g. "/") # - `name` is current node name (e.g. "README.md") # Build parent then append name when both are present. parent = base if isinstance(raw_path, str) and raw_path.strip(): rp = raw_path.strip() if rp not in {"/", ".", "./"}: parent = _normalize_relative_path(f"{base}/{rp}" if base else rp) if isinstance(raw_name, str) and raw_name.strip(): if parent: node_path = _normalize_relative_path(f"{parent}/{raw_name.strip()}") else: node_path = _normalize_relative_path(raw_name.strip()) else: # Fallback: only path provided if not isinstance(raw_path, str) or not raw_path.strip(): raise ValueError("Each entry must have a non-empty 'path' or 'name'") rp = raw_path.strip() if rp in {"/", ".", "./"}: node_path = base else: node_path = _normalize_relative_path(f"{base}/{rp}" if base else rp) node_type = node.get("type") content = node.get("content") children = node.get("children") inferred_type = "directory" if isinstance(children, list) else "file" if content is not None else None final_type = node_type or inferred_type if final_type == "directory": _add_directory(node_path, directories) if children is None: return if not isinstance(children, list): raise ValueError(f"Entry '{node_path}' children must be a list") for child in children: if not isinstance(child, dict): raise ValueError(f"Entry '{node_path}' children must be objects") _parse_entries_node(child, node_path, files, directories) return if final_type == "file": if content is None: raise ValueError(f"File entry '{node_path}' is missing 'content'") _add_file(node_path, content, files, directories) return raise ValueError( f"Unable to infer entry type for '{node_path}'. Set 'type' to 'file' or 'directory'." ) def parse_skill_yaml_spec(yaml_text: str) -> ParsedSkillTree: """Parse YAML text into normalized directories and files. Supported forms: - entries: [{type,path/content/children}, ...] - files: {"path/to/file": "text"} + optional directories/dirs - tree/structure: nested dict where dict=directory and string=file content """ try: data = yaml.safe_load(yaml_text) except yaml.YAMLError as e: raise ValueError(f"Invalid YAML: {e}") from e if data is None: raise ValueError("YAML is empty") if not isinstance(data, dict): raise ValueError("YAML root must be an object") data = _extract_spec_root(data) directories: set[str] = set() files: dict[str, str] = {} # Form 1: explicit entries list entries = _pick_first_existing(data, ("entries", "nodes", "items")) if entries is not None: if not isinstance(entries, list): raise ValueError("'entries' must be a list") for entry in entries: if not isinstance(entry, dict): raise ValueError("Each item in 'entries' must be an object") _parse_entries_node(entry, "", files, directories) # Form 2: files + directories file_map = _pick_first_existing(data, ("files", "paths", "file_map", "fileMap", "documents")) if file_map is not None: if isinstance(file_map, dict): for path, content in file_map.items(): _add_file(path, content, files, directories) elif isinstance(file_map, list): for item in file_map: if not isinstance(item, dict): raise ValueError("Each item in 'files' list must be an object") path = item.get("path") or item.get("name") or item.get("file") content = item.get("content") if content is None: content = item.get("text") if content is None: content = item.get("body") if path is None or content is None: raise ValueError("Each file item needs 'path' and 'content'") _add_file(path, content, files, directories) else: raise ValueError("'files' must be a map or list") directory_list = _pick_first_existing(data, ("directories", "dirs", "folders", "folder_paths")) if directory_list is not None: if not isinstance(directory_list, list): raise ValueError("'directories'/'dirs' must be a list") for path in directory_list: _add_directory(path, directories) # Form 3: nested tree tree = _pick_first_existing(data, ("tree", "structure", "file_tree", "fileTree", "file_structure")) if tree is not None: if isinstance(tree, dict): _walk_tree_dict(tree, "", files, directories) elif isinstance(tree, list): for item in tree: if not isinstance(item, dict): raise ValueError("Items in 'tree' list must be objects") _parse_entries_node(item, "", files, directories) else: raise ValueError("'tree'/'structure' must be an object or list") # Heuristic fallback: treat root as path->content map when possible. if not files and not directories: candidate_keys = [k for k in data.keys() if isinstance(k, str)] if candidate_keys and all(isinstance(data[k], str) for k in candidate_keys): for path, content in data.items(): _add_file(path, content, files, directories) if not files and not directories: raise ValueError( "No content found. Provide at least one of: entries, files, directories/dirs, tree/structure" ) # Ensure parent directories exist for every file for rel_file in files: parent = Path(rel_file).parent if str(parent) != ".": directories.add(str(parent).replace("\\", "/")) return ParsedSkillTree(directories=directories, files=files) def materialize_skill_tree(parsed: ParsedSkillTree, target_root: Path, clear_target: bool = True) -> None: """Create parsed directories/files under target root.""" if clear_target and target_root.exists(): import shutil shutil.rmtree(target_root) target_root.mkdir(parents=True, exist_ok=True) for rel_dir in sorted(parsed.directories, key=lambda p: (p.count("/"), p)): (target_root / rel_dir).mkdir(parents=True, exist_ok=True) for rel_file, content in parsed.files.items(): file_path = target_root / rel_file file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(content, encoding="utf-8") def _build_cli_parser() -> argparse.ArgumentParser: """Build command-line argument parser. CLI usage: python skill_yaml_importer.py [options] """ parser = argparse.ArgumentParser(description="Parse and validate a skill YAML spec file") parser.add_argument("input_path", help="Path to a YAML file or a directory containing YAML files") parser.add_argument( "--show-files", action="store_true", help="Print sorted parsed file paths", ) parser.add_argument( "--show-directories", action="store_true", help="Print sorted parsed directory paths", ) parser.add_argument( "--json", action="store_true", help="Print parsed summary as JSON", ) parser.add_argument( "--recursive", action="store_true", help="When input path is a directory, scan YAML files recursively", ) parser.add_argument( "--log-file", default=None, help="Optional path to save full execution results and summary as JSON", ) return parser def _collect_yaml_files(input_path: Path, recursive: bool) -> list[Path]: if input_path.is_file(): return [input_path] if not input_path.is_dir(): return [] patterns = ("*.yaml", "*.yml") files: list[Path] = [] for pattern in patterns: iterator = input_path.rglob(pattern) if recursive else input_path.glob(pattern) files.extend(iterator) # Stable order for deterministic output return sorted({p.resolve() for p in files}) def _parse_one_yaml_file(yaml_path: Path, show_files: bool, show_directories: bool) -> dict: yaml_text = yaml_path.read_text(encoding="utf-8") parsed = parse_skill_yaml_spec(yaml_text) directories = sorted(parsed.directories) files = sorted(parsed.files.keys()) return { "yaml_file": str(yaml_path), "directories_count": len(directories), "files_count": len(files), "directories": directories if show_directories else None, "files": files if show_files else None, } def _main() -> int: """CLI entrypoint for parsing one YAML file or a batch of YAML files. Exit codes: 0: all files parsed successfully 1: invalid input path or no YAML files found 2: processed completed with one or more parse failures """ args = _build_cli_parser().parse_args() input_path = Path(args.input_path) if not input_path.exists(): print(f"Input path not found: {input_path}", file=sys.stderr) return 1 yaml_files = _collect_yaml_files(input_path, recursive=args.recursive) if not yaml_files: print(f"No YAML files found under: {input_path}", file=sys.stderr) return 1 successes: list[dict] = [] failures: list[dict[str, str]] = [] for yaml_path in yaml_files: try: result = _parse_one_yaml_file( yaml_path, show_files=args.show_files, show_directories=args.show_directories, ) successes.append(result) if not args.json: print(f"OK: {yaml_path}") print(f" Directories: {result['directories_count']}") print(f" Files: {result['files_count']}") except Exception as e: # noqa: BLE001 failures.append({"yaml_file": str(yaml_path), "error": str(e)}) print(f"ERROR: {yaml_path}: {e}", file=sys.stderr) summary = { "input_path": str(input_path), "total": len(yaml_files), "success": len(successes), "failed": len(failures), } report = {"summary": summary, "successes": successes, "failures": failures} if args.log_file: try: log_path = Path(args.log_file) log_path.parent.mkdir(parents=True, exist_ok=True) log_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") print(f"Log saved: {log_path}") except Exception as e: # noqa: BLE001 print(f"Failed to write log file '{args.log_file}': {e}", file=sys.stderr) if args.json: print(json.dumps(report, ensure_ascii=False, indent=2)) else: print("\n[Summary]") print(f"Input: {summary['input_path']}") print(f"Total: {summary['total']}") print(f"Success: {summary['success']}") print(f"Failed: {summary['failed']}") return 0 if not failures else 2 if __name__ == "__main__": raise SystemExit(_main())