# deerflow2/backend/packages/harness/deerflow/guardrails/builtin.py

"""Built-in guardrail providers that ship with DeerFlow."""
from __future__ import annotations
import fnmatch
import json
import logging
import re
import shlex
from datetime import UTC, datetime
from pathlib import PurePosixPath
from typing import Any
from deerflow.guardrails.provider import GuardrailDecision, GuardrailReason, GuardrailRequest
logger = logging.getLogger(__name__)


class AllowlistProvider:
    """Simple allowlist/denylist provider. No external dependencies."""

    name = "allowlist"

    def __init__(self, *, allowed_tools: list[str] | None = None, denied_tools: list[str] | None = None):
        self._allowed = set(allowed_tools) if allowed_tools else None
        self._denied = set(denied_tools) if denied_tools else set()

    def evaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        if self._allowed is not None and request.tool_name not in self._allowed:
            return GuardrailDecision(allow=False, reasons=[GuardrailReason(code="oap.tool_not_allowed", message=f"tool '{request.tool_name}' not in allowlist")])
        if request.tool_name in self._denied:
            return GuardrailDecision(allow=False, reasons=[GuardrailReason(code="oap.tool_not_allowed", message=f"tool '{request.tool_name}' is denied")])
        return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])

    async def aevaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        return self.evaluate(request)
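

# A minimal usage sketch for AllowlistProvider (illustrative only; it assumes
# GuardrailRequest exposes at least the tool_name attribute used above, and
# make_request(...) is a hypothetical helper that builds such a request):
#
#   provider = AllowlistProvider(allowed_tools=["read_file", "grep"])
#   provider.evaluate(make_request("read_file")).allow  # True
#   provider.evaluate(make_request("bash")).allow       # False -> oap.tool_not_allowed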


class SensitiveDataProvider:
    """Block tool calls that may access sensitive files such as .env and keys."""

    name = "sensitive-data"

    _DEFAULT_PROTECTED_TOOLS = {
        "read_file",
        "write_file",
        "str_replace",
        "ls",
        "glob",
        "grep",
        "bash",
    }
    _DEFAULT_DENY_BASENAMES = {".env"}
    _DEFAULT_DENY_GLOBS = {
        ".env.*",
        "*.pem",
        "*.key",
        "id_rsa*",
        "secrets.*",
        "credentials.*",
    }

    def __init__(
        self,
        *,
        protected_tools: list[str] | None = None,
        deny_basenames: list[str] | None = None,
        deny_globs: list[str] | None = None,
        block_skills_env: bool = True,
        **_: Any,
    ):
        self._protected_tools = {t.lower() for t in (protected_tools or list(self._DEFAULT_PROTECTED_TOOLS))}
        self._deny_basenames = {n.lower() for n in (deny_basenames or list(self._DEFAULT_DENY_BASENAMES))}
        self._deny_globs = {p.lower() for p in (deny_globs or list(self._DEFAULT_DENY_GLOBS))}
        self._block_skills_env = block_skills_env
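
    # Configuration sketch (illustrative; keyword-only arguments, and unknown
    # keys are silently absorbed by **_):
    #
    #   SensitiveDataProvider(
    #       deny_globs=["*.sqlite", "*.pem"],  # a non-empty list replaces the default glob set
    #       block_skills_env=False,
    #   )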

    def _normalize_candidate(self, raw: str | None) -> str:
        if not raw:
            return ""
        return str(raw).strip().strip("\"'")

    def _looks_sensitive_path(self, raw_path: str) -> bool:
        value = self._normalize_candidate(raw_path)
        if not value:
            return False
        lowered = value.lower()
        if self._block_skills_env and "/mnt/skills/" in lowered:
            basename = PurePosixPath(lowered).name
            if basename == ".env" or basename.startswith(".env."):
                return True
        basename = PurePosixPath(lowered).name
        if basename in self._deny_basenames:
            return True
        return any(fnmatch.fnmatch(basename, pat) for pat in self._deny_globs)
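
    # A rough sketch of the matching behaviour under the default deny lists
    # (illustrative comments, not doctests):
    #
    #   _looks_sensitive_path("config/.env")      -> True   (basename match)
    #   _looks_sensitive_path("keys/server.pem")  -> True   (glob match)
    #   _looks_sensitive_path("src/main.py")      -> False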

    def _extract_bash_candidates(self, command: str) -> list[str]:
        candidates: list[str] = []
        if not command:
            return candidates
        try:
            tokens = shlex.split(command)
        except ValueError:
            tokens = command.split()
        for token in tokens:
            t = token.strip()
            if not t:
                continue
            # Path-like tokens
            if "/" in t or t.startswith("."):
                candidates.append(t)
            # .env-style arguments may not contain a slash
            if t.lower().startswith(".env"):
                candidates.append(t)
        return candidates
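
    # A rough sketch of the extraction (illustrative, not a doctest):
    #
    #   _extract_bash_candidates("cat ./config/.env.local && ls /tmp")
    #   -> ["./config/.env.local", "/tmp"]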

    def _collect_candidates(self, request: GuardrailRequest) -> list[str]:
        args = request.tool_input if isinstance(request.tool_input, dict) else {}
        # Lowercase so the tool matching stays consistent with evaluate().
        tool = (request.tool_name or "").lower()
        candidates: list[str] = []
        if tool in {"read_file", "write_file", "str_replace", "ls"}:
            path = args.get("path")
            if isinstance(path, str):
                candidates.append(path)
        elif tool in {"glob", "grep"}:
            path = args.get("path")
            if isinstance(path, str):
                candidates.append(path)
            glob_pat = args.get("glob")
            if isinstance(glob_pat, str):
                candidates.append(glob_pat)
        elif tool == "bash":
            command = str(args.get("command") or "")
            candidates.extend(self._extract_bash_candidates(command))
            # Fast-path for common secret exposure commands
            if re.search(r"\b(printenv|env)\b", command, flags=re.IGNORECASE):
                candidates.append(".env")
        return candidates
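
    # Sketch of the per-tool candidate collection (defaults assumed, illustrative):
    #
    #   read_file {"path": "config/.env"}         -> ["config/.env"]
    #   grep      {"path": "src", "glob": "*.py"} -> ["src", "*.py"]
    #   bash      {"command": "printenv"}         -> [".env"]  (fast-path)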

    def _audit(self, request: GuardrailRequest, decision: GuardrailDecision) -> None:
        if decision.allow:
            return
        code = decision.reasons[0].code if decision.reasons else "oap.blocked_pattern"
        rec = {
            "timestamp": datetime.now(UTC).isoformat(),
            "provider": self.name,
            "tool_name": request.tool_name,
            "reason_code": code,
            "thread_id": request.thread_id,
            "agent_id": request.agent_id,
        }
        logger.warning("[SensitiveDataGuardrail] %s", json.dumps(rec, ensure_ascii=False))

    def evaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        tool = (request.tool_name or "").lower()
        if tool not in self._protected_tools:
            return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])
        candidates = self._collect_candidates(request)
        if any(self._looks_sensitive_path(c) for c in candidates):
            decision = GuardrailDecision(
                allow=False,
                reasons=[GuardrailReason(code="oap.blocked_pattern", message="sensitive path access is blocked by policy")],
                policy_id="sensitive-data.v1",
            )
            self._audit(request, decision)
            return decision
        return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])

    async def aevaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        return self.evaluate(request)
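

# A minimal end-to-end sketch (illustrative only; the real GuardrailRequest
# constructor lives in deerflow.guardrails.provider and may require more fields
# than the ones shown here):
#
#   request = GuardrailRequest(
#       tool_name="read_file",
#       tool_input={"path": "/workspace/.env"},
#       thread_id="thread-1",
#       agent_id="agent-1",
#   )
#   decision = SensitiveDataProvider().evaluate(request)
#   decision.allow               # False
#   decision.reasons[0].code     # "oap.blocked_pattern"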