172 lines
6.4 KiB
Python
172 lines
6.4 KiB
Python
"""Built-in guardrail providers that ship with DeerFlow."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import fnmatch
|
|
import json
|
|
import logging
|
|
import re
|
|
import shlex
|
|
from datetime import UTC, datetime
|
|
from pathlib import PurePosixPath
|
|
from typing import Any
|
|
|
|
from deerflow.guardrails.provider import GuardrailDecision, GuardrailReason, GuardrailRequest
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AllowlistProvider:
    """Guardrail provider backed by static allow and deny sets of tool names.

    A request is rejected when an allowlist is configured and the tool is not
    in it, or when the tool appears in the denylist. Everything else is
    allowed. Uses only the standard library.
    """

    name = "allowlist"

    def __init__(self, *, allowed_tools: list[str] | None = None, denied_tools: list[str] | None = None):
        """Store the configured tool names.

        Args:
            allowed_tools: Tool names that may run. ``None`` or an empty list
                disables the allowlist check entirely.
            denied_tools: Tool names that must never run. ``None`` or an empty
                list means nothing is denied.
        """
        # ``None`` (rather than an empty set) marks "no allowlist configured".
        self._allowed = None if not allowed_tools else set(allowed_tools)
        self._denied = set(denied_tools or ())

    def evaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        """Return the allow/deny decision for ``request.tool_name``.

        The allowlist is consulted before the denylist, so a tool missing from
        a configured allowlist is reported as "not in allowlist" even if it is
        also denied.
        """
        tool_name = request.tool_name
        rejection: str | None = None

        if self._allowed is not None and tool_name not in self._allowed:
            rejection = f"tool '{tool_name}' not in allowlist"
        elif tool_name in self._denied:
            rejection = f"tool '{tool_name}' is denied"

        if rejection is None:
            return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])
        return GuardrailDecision(
            allow=False,
            reasons=[GuardrailReason(code="oap.tool_not_allowed", message=rejection)],
        )

    async def aevaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        """Async entry point; delegates to the synchronous :meth:`evaluate`."""
        return self.evaluate(request)
|
|
|
|
|
|
class SensitiveDataProvider:
|
|
"""Block tool calls that may access sensitive files such as .env and keys."""
|
|
|
|
name = "sensitive-data"
|
|
|
|
_DEFAULT_PROTECTED_TOOLS = {
|
|
"read_file",
|
|
"write_file",
|
|
"str_replace",
|
|
"ls",
|
|
"glob",
|
|
"grep",
|
|
"bash",
|
|
}
|
|
_DEFAULT_DENY_BASENAMES = {".env"}
|
|
_DEFAULT_DENY_GLOBS = {
|
|
".env.*",
|
|
"*.pem",
|
|
"*.key",
|
|
"id_rsa*",
|
|
"secrets.*",
|
|
"credentials.*",
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
protected_tools: list[str] | None = None,
|
|
deny_basenames: list[str] | None = None,
|
|
deny_globs: list[str] | None = None,
|
|
block_skills_env: bool = True,
|
|
**_: Any,
|
|
):
|
|
self._protected_tools = {t.lower() for t in (protected_tools or list(self._DEFAULT_PROTECTED_TOOLS))}
|
|
self._deny_basenames = {n.lower() for n in (deny_basenames or list(self._DEFAULT_DENY_BASENAMES))}
|
|
self._deny_globs = {p.lower() for p in (deny_globs or list(self._DEFAULT_DENY_GLOBS))}
|
|
self._block_skills_env = block_skills_env
|
|
|
|
def _normalize_candidate(self, raw: str | None) -> str:
|
|
if not raw:
|
|
return ""
|
|
return str(raw).strip().strip("\"'")
|
|
|
|
def _looks_sensitive_path(self, raw_path: str) -> bool:
|
|
value = self._normalize_candidate(raw_path)
|
|
if not value:
|
|
return False
|
|
lowered = value.lower()
|
|
if self._block_skills_env and "/mnt/skills/" in lowered:
|
|
basename = PurePosixPath(lowered).name
|
|
if basename == ".env" or basename.startswith(".env."):
|
|
return True
|
|
basename = PurePosixPath(lowered).name
|
|
if basename in self._deny_basenames:
|
|
return True
|
|
return any(fnmatch.fnmatch(basename, pat) for pat in self._deny_globs)
|
|
|
|
def _extract_bash_candidates(self, command: str) -> list[str]:
|
|
candidates: list[str] = []
|
|
if not command:
|
|
return candidates
|
|
try:
|
|
tokens = shlex.split(command)
|
|
except ValueError:
|
|
tokens = command.split()
|
|
for token in tokens:
|
|
t = token.strip()
|
|
if not t:
|
|
continue
|
|
# Path-like tokens
|
|
if "/" in t or t.startswith("."):
|
|
candidates.append(t)
|
|
# file.env style arguments may not contain slash
|
|
if t.lower().startswith(".env"):
|
|
candidates.append(t)
|
|
return candidates
|
|
|
|
def _collect_candidates(self, request: GuardrailRequest) -> list[str]:
|
|
args = request.tool_input if isinstance(request.tool_input, dict) else {}
|
|
tool = request.tool_name
|
|
candidates: list[str] = []
|
|
if tool in {"read_file", "write_file", "str_replace", "ls"}:
|
|
path = args.get("path")
|
|
if isinstance(path, str):
|
|
candidates.append(path)
|
|
elif tool in {"glob", "grep"}:
|
|
path = args.get("path")
|
|
if isinstance(path, str):
|
|
candidates.append(path)
|
|
glob_pat = args.get("glob")
|
|
if isinstance(glob_pat, str):
|
|
candidates.append(glob_pat)
|
|
elif tool == "bash":
|
|
command = str(args.get("command") or "")
|
|
candidates.extend(self._extract_bash_candidates(command))
|
|
# Fast-path for common secret exposure commands
|
|
if re.search(r"\b(printenv|env)\b", command, flags=re.IGNORECASE):
|
|
candidates.append(".env")
|
|
return candidates
|
|
|
|
def _audit(self, request: GuardrailRequest, decision: GuardrailDecision) -> None:
|
|
if decision.allow:
|
|
return
|
|
code = decision.reasons[0].code if decision.reasons else "oap.blocked_pattern"
|
|
rec = {
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"provider": self.name,
|
|
"tool_name": request.tool_name,
|
|
"reason_code": code,
|
|
"thread_id": request.thread_id,
|
|
"agent_id": request.agent_id,
|
|
}
|
|
logger.warning("[SensitiveDataGuardrail] %s", json.dumps(rec, ensure_ascii=False))
|
|
|
|
def evaluate(self, request: GuardrailRequest) -> GuardrailDecision:
|
|
tool = (request.tool_name or "").lower()
|
|
if tool not in self._protected_tools:
|
|
return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])
|
|
|
|
candidates = self._collect_candidates(request)
|
|
if any(self._looks_sensitive_path(c) for c in candidates):
|
|
decision = GuardrailDecision(
|
|
allow=False,
|
|
reasons=[GuardrailReason(code="oap.blocked_pattern", message="sensitive path access is blocked by policy")],
|
|
policy_id="sensitive-data.v1",
|
|
)
|
|
self._audit(request, decision)
|
|
return decision
|
|
|
|
return GuardrailDecision(allow=True, reasons=[GuardrailReason(code="oap.allowed")])
|
|
|
|
async def aevaluate(self, request: GuardrailRequest) -> GuardrailDecision:
|
|
return self.evaluate(request)
|