Files
claw/scripts/validate_skill_lib.py
2026-03-30 08:29:44 +08:00

533 lines
16 KiB
Python

import argparse
import re
import sys
import tomllib
from pathlib import Path
from typing import NamedTuple
# Largest markdown/TOML file we will scan statically; bigger files are flagged.
MAX_TEXT_FILE_BYTES = 512 * 1024

# Suffixes treated as shell/batch scripts (blocked unless --allow-scripts).
SCRIPT_SUFFIXES = (
    ".sh",
    ".bash",
    ".zsh",
    ".ksh",
    ".fish",
    ".ps1",
    ".bat",
    ".cmd",
)

# (compiled regex, finding label) pairs for obviously dangerous command snippets.
# All patterns are case-insensitive and multi-line.
HIGH_RISK_PATTERNS = (
    (re.compile(r"(?im)\bcurl\b[^\n|]{0,200}\|\s*(?:sh|bash|zsh)\b"), "curl-pipe-shell"),
    (re.compile(r"(?im)\bwget\b[^\n|]{0,200}\|\s*(?:sh|bash|zsh)\b"), "wget-pipe-shell"),
    (re.compile(r"(?im)\b(?:invoke-expression|iex)\b"), "powershell-iex"),
    (re.compile(r"(?im)\brm\s+-rf\s+/"), "destructive-rm-rf-root"),
    (re.compile(r"(?im)\bnc(?:at)?\b[^\n]{0,120}\s-e\b"), "netcat-remote-exec"),
    (re.compile(r"(?im)\bdd\s+if="), "disk-overwrite-dd"),
    (re.compile(r"(?im)\bmkfs(?:\.[a-z0-9]+)?\b"), "filesystem-format"),
    (re.compile(r"(?im):\(\)\s*\{\s*:\|\:&\s*\};:"), "fork-bomb"),
)

# Inline markdown links: captures the `target` of `[text](target)`.
MARKDOWN_LINK_RE = re.compile(r"\[[^\]]*\]\(([^)]+)\)")

# Layout: this script lives under <repo>/scripts/, so parents[1] is the repo
# root; skill_lib is expected as a SIBLING of the repo root (note .parent).
REPO_ROOT = Path(__file__).resolve().parents[1]
SKILL_LIB_ROOT = REPO_ROOT.parent / "skill_lib"
SKILLS_DIR = SKILL_LIB_ROOT / "skills"
class SkillRecord(NamedTuple):
    """Parsed metadata plus prompt text for one skill on disk."""

    name: str  # skill name; load_skill falls back to the directory name
    description: str  # summary text; may be derived from the prompt body
    version: str  # version string; defaults to "0.1.0" when unset
    author: str | None  # optional author; None when unspecified
    tags: list[str]  # free-form tag labels
    prompt_body: str  # markdown body, or joined manifest prompts
    location: Path  # file the record was loaded from (SKILL.toml or SKILL.md)
class AuditReport(NamedTuple):
    """Outcome of a static audit over one skill directory."""

    files_scanned: int  # number of paths visited (directories are counted too)
    findings: list[str]  # human-readable policy violations; empty means clean
class ValidationResult(NamedTuple):
    """Pairs a loaded skill record with its audit outcome."""

    record: SkillRecord  # parsed metadata for the skill
    report: AuditReport  # audit result for the skill directory
    ok: bool  # True when the audit produced no findings
def discover_skill_dirs(skills_dir: Path | None = None) -> list[Path]:
root = skills_dir or SKILLS_DIR
if not root.exists():
return []
return sorted(path for path in root.iterdir() if path.is_dir())
def load_skill(skill_dir: Path) -> SkillRecord:
    """Load a skill's metadata from SKILL.toml (preferred) or SKILL.md.

    A TOML manifest takes its prompt body from SKILL.md when present,
    otherwise from the manifest's `prompts` list. Markdown-only skills are
    parsed from frontmatter. Missing fields fall back to defaults: the
    directory name, version "0.1.0", and the first body line as description.

    Robustness fix: a directory containing neither SKILL.toml nor SKILL.md
    previously crashed with FileNotFoundError on the markdown read; it now
    yields a default record so the audit stage can report the missing
    manifest as a finding instead of the whole run aborting.
    """
    manifest_path = skill_dir / "SKILL.toml"
    markdown_path = skill_dir / "SKILL.md"
    if manifest_path.is_file():
        manifest = tomllib.loads(manifest_path.read_text(encoding="utf-8"))
        skill_meta = manifest.get("skill", {})
        prompts = manifest.get("prompts", [])
        body = ""
        if markdown_path.is_file():
            # The markdown body wins over manifest prompts when both exist.
            _, body = parse_skill_markdown(markdown_path.read_text(encoding="utf-8"))
        elif prompts:
            body = "\n\n".join(str(prompt) for prompt in prompts)
        description = skill_meta.get("description")
        if not description or not str(description).strip():
            description = extract_description(body)
        return SkillRecord(
            name=skill_meta.get("name") or skill_dir.name,
            description=str(description),
            version=str(skill_meta.get("version") or "0.1.0"),
            author=skill_meta.get("author") or None,
            tags=list(skill_meta.get("tags", [])),
            prompt_body=body,
            location=manifest_path,
        )
    # Markdown-only (or empty) skill directory.
    content = markdown_path.read_text(encoding="utf-8") if markdown_path.is_file() else ""
    meta, body = parse_skill_markdown(content)
    description = meta["description"]
    if not description or not description.strip():
        description = extract_description(body)
    return SkillRecord(
        name=meta["name"] or skill_dir.name,
        description=description,
        version=meta["version"] or "0.1.0",
        author=meta["author"] or None,
        tags=list(meta["tags"]),
        prompt_body=body,
        location=markdown_path,
    )
def validate_all_skills(allow_scripts: bool = False) -> list[ValidationResult]:
    """Load and audit every discovered skill.

    A skill is `ok` exactly when its audit produced no findings.
    """
    outcomes: list[ValidationResult] = []
    for directory in discover_skill_dirs():
        loaded = load_skill(directory)
        audit = audit_skill_directory(directory, allow_scripts=allow_scripts)
        passed = len(audit.findings) == 0
        outcomes.append(ValidationResult(record=loaded, report=audit, ok=passed))
    return outcomes
def parse_skill_markdown(content: str) -> tuple[dict[str, object], str]:
    """Split a SKILL.md document into (metadata dict, body text)."""
    split = split_skill_frontmatter(content)
    if split is None:
        # No frontmatter fence: the whole document is the body.
        return empty_meta(), content
    raw_meta, body = split
    return parse_simple_frontmatter(raw_meta), body
def split_skill_frontmatter(content: str) -> tuple[str, str] | None:
normalized = content.replace("\r\n", "\n")
if not normalized.startswith("---\n"):
return None
rest = normalized[len("---\n") :]
marker = "\n---\n"
idx = rest.find(marker)
if idx != -1:
return rest[:idx], rest[idx + len(marker) :]
if rest.endswith("\n---"):
return rest[:-4], ""
return None
def parse_simple_frontmatter(frontmatter: str) -> dict[str, object]:
    """Parse a minimal YAML-like frontmatter block into a metadata dict.

    Supports scalar keys (name/description/version/author) with optional
    single/double quoting, plus tags either inline (`tags: [a, b]`) or as a
    dash list on the lines following a bare `tags:`. Unknown keys and
    non-key lines are ignored.
    """
    meta = empty_meta()
    collecting_tags = False  # True while consuming `- item` lines after a bare `tags:`
    for raw_line in frontmatter.splitlines():
        if collecting_tags:
            trimmed = raw_line.strip()
            if trimmed.startswith("- "):
                tag = trimmed[2:].strip().strip('"').strip("'")
                if tag:
                    meta["tags"].append(tag)
                continue
            # A non-dash line ends the tag list; fall through to parse it normally.
            collecting_tags = False
        if ":" not in raw_line:
            continue
        key, value = raw_line.split(":", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key == "name":
            meta["name"] = value
        elif key == "description":
            meta["description"] = value
        elif key == "version":
            meta["version"] = value
        elif key == "author":
            meta["author"] = value
        elif key == "tags":
            if not value:
                # Bare `tags:` — items follow as `- item` lines.
                collecting_tags = True
            else:
                # Inline list form: strip brackets, split on commas, drop empties.
                cleaned = value.lstrip("[").rstrip("]")
                meta["tags"] = [
                    item.strip().strip('"').strip("'")
                    for item in cleaned.split(",")
                    if item.strip().strip('"').strip("'")
                ]
    return meta
def empty_meta() -> dict[str, object]:
    """Fresh metadata mapping with every scalar unset and an empty tag list."""
    meta: dict[str, object] = dict.fromkeys(
        ("name", "description", "version", "author")
    )
    meta["tags"] = []
    return meta
def extract_description(body: str) -> str:
    """First non-blank, non-heading line of *body*, or a placeholder."""
    candidates = (
        line.strip()
        for line in body.splitlines()
        if not line.startswith("#") and line.strip()
    )
    return next(candidates, "No description")
def audit_skill_directory(skill_dir: Path, allow_scripts: bool = False) -> AuditReport:
    """Walk a skill directory and collect security findings for every path.

    Raises FileNotFoundError when the source is missing and
    NotADirectoryError when it is not a directory.
    """
    if not skill_dir.exists():
        raise FileNotFoundError(f"Skill source does not exist: {skill_dir}")
    if not skill_dir.is_dir():
        raise NotADirectoryError(f"Skill source must be a directory: {skill_dir}")
    root = skill_dir.resolve()
    issues: list[str] = []
    if not ((root / "SKILL.md").is_file() or (root / "SKILL.toml").is_file()):
        issues.append(
            "Skill root must include SKILL.md or SKILL.toml for deterministic auditing."
        )
    scanned = 0
    for entry in collect_paths_depth_first(root):
        scanned += 1
        issues.extend(audit_path(root, entry, allow_scripts=allow_scripts))
    return AuditReport(files_scanned=scanned, findings=issues)
def collect_paths_depth_first(root: Path) -> list[Path]:
    """List *root* and every descendant in depth-first pre-order.

    Siblings are visited in sorted order, so the traversal is deterministic.
    """
    ordered: list[Path] = []

    def visit(node: Path) -> None:
        # Pre-order: record the node before descending into it.
        ordered.append(node)
        if node.is_dir():
            for child in sorted(node.iterdir()):
                visit(child)

    visit(root)
    return ordered
def audit_path(root: Path, path: Path, allow_scripts: bool) -> list[str]:
    """Audit one filesystem entry and return zero or more finding messages.

    Symlinks are rejected outright; directories produce no findings; only
    markdown and TOML files are content-scanned, and only when small enough.
    """
    stat_info = path.lstat()
    label = relative_display(root, path)
    issues: list[str] = []
    if path.is_symlink():
        return [f"{label}: symlinks are not allowed in installed skills."]
    if path.is_dir():
        return issues
    if not allow_scripts and is_unsupported_script_file(path):
        issues.append(f"{label}: script-like files are blocked by skill security policy.")
    text_like = is_markdown_file(path) or is_toml_file(path)
    if text_like and stat_info.st_size > MAX_TEXT_FILE_BYTES:
        issues.append(f"{label}: file is too large for static audit (>{MAX_TEXT_FILE_BYTES} bytes).")
        return issues
    if is_markdown_file(path):
        issues.extend(audit_markdown_file(root, path))
    elif is_toml_file(path):
        issues.extend(audit_manifest_file(root, path))
    return issues
def audit_markdown_file(root: Path, path: Path) -> list[str]:
    """Scan a markdown file for risky command snippets and audit its links."""
    text = path.read_text(encoding="utf-8")
    label = relative_display(root, path)
    issues: list[str] = []
    risk = detect_high_risk_snippet(text)
    if risk is not None:
        issues.append(f"{label}: detected high-risk command pattern ({risk}).")
    for link in extract_markdown_links(text):
        issues += audit_markdown_link_target(root, path, link)
    return issues
def audit_manifest_file(root: Path, path: Path) -> list[str]:
    """Scan a TOML manifest for risky snippets and shell chaining operators."""
    text = path.read_text(encoding="utf-8")
    label = relative_display(root, path)
    issues: list[str] = []
    risk = detect_high_risk_snippet(text)
    if risk is not None:
        issues.append(f"{label}: detected high-risk command pattern ({risk}).")
    chaining = ("&&", "||", ";", "`", "$(")
    if any(operator in text for operator in chaining):
        issues.append(f"{label}: manifest content uses shell chaining operators, which are blocked.")
    return issues
def extract_markdown_links(content: str) -> list[str]:
    """Collect the stripped target of every inline `[text](target)` link."""
    targets: list[str] = []
    for match in MARKDOWN_LINK_RE.finditer(content):
        targets.append(match.group(1).strip())
    return targets
def audit_markdown_link_target(root: Path, source: Path, raw_target: str) -> list[str]:
    """Audit one markdown link target and return any policy findings.

    Policy, in evaluation order:
    - empty targets and pure fragments (``#...``) are ignored;
    - http/https/mailto URLs are allowed unless they point at a markdown
      file; any other URL scheme is blocked;
    - absolute paths are blocked; links to script files are blocked;
    - non-markdown relative links are not resolved further;
    - markdown links must resolve to an existing file inside this skill's
      root, or inside the shared skills root (cross-skill references).
    """
    findings: list[str] = []
    normalized = normalize_markdown_target(raw_target)
    if not normalized or normalized.startswith("#"):
        return findings
    rel = relative_display(root, source)
    scheme = url_scheme(normalized)
    if scheme:
        if scheme in {"http", "https", "mailto"}:
            if has_markdown_suffix(normalized):
                # Remote markdown could smuggle further instructions; block it.
                findings.append(
                    f"{rel}: remote markdown links are blocked by skill security audit ({normalized})."
                )
            return findings
        findings.append(f"{rel}: unsupported URL scheme in markdown link ({normalized}).")
        return findings
    stripped = strip_query_and_fragment(normalized)
    if not stripped:
        return findings
    if looks_like_absolute_path(stripped):
        findings.append(f"{rel}: absolute markdown link paths are not allowed ({normalized}).")
        return findings
    if has_script_suffix(stripped):
        findings.append(f"{rel}: markdown links to script files are blocked ({normalized}).")
    if not has_markdown_suffix(stripped):
        # Only markdown targets are resolved against the filesystem below.
        return findings
    base_dir = source.parent
    linked_path = base_dir / stripped
    try:
        canonical_target = linked_path.resolve(strict=True)
    except FileNotFoundError:
        if is_cross_skill_reference(stripped):
            # Reference into a sibling skill that may not be installed yet.
            return findings
        findings.append(f"{rel}: markdown link points to a missing file ({normalized}).")
        return findings
    if not is_subpath(canonical_target, root):
        skills_root = skills_root_for(root)
        if skills_root and is_subpath(canonical_target, skills_root):
            # Cross-skill link within the shared skills tree: allowed if a file.
            if not canonical_target.is_file():
                findings.append(f"{rel}: markdown link must point to a file ({normalized}).")
            return findings
        findings.append(f"{rel}: markdown link escapes skill root ({normalized}).")
        return findings
    if not canonical_target.is_file():
        findings.append(f"{rel}: markdown link must point to a file ({normalized}).")
    return findings
def detect_high_risk_snippet(content: str) -> str | None:
    """Label of the first high-risk command pattern matched, else None."""
    return next(
        (label for pattern, label in HIGH_RISK_PATTERNS if pattern.search(content)),
        None,
    )
def normalize_markdown_target(raw_target: str) -> str:
    """Normalize a raw link target: trim, drop angle brackets and title text.

    Returns an empty string when nothing remains after trimming.
    """
    candidate = raw_target.strip()
    candidate = candidate.removeprefix("<").removesuffix(">")
    pieces = candidate.split()
    if not pieces:
        return ""
    return pieces[0]
def strip_query_and_fragment(target: str) -> str:
    """Cut *target* at the first '#' or '?', whichever comes earlier."""
    cut = len(target)
    for separator in ("#", "?"):
        found = target.find(separator)
        if found != -1 and found < cut:
            cut = found
    return target[:cut]
def url_scheme(target: str) -> str | None:
if ":" not in target:
return None
scheme, rest = target.split(":", 1)
if not scheme or not rest:
return None
if not all(ch.isalnum() or ch in "+-." for ch in scheme):
return None
return scheme
def looks_like_absolute_path(target: str) -> bool:
    """True for POSIX-absolute, Windows-drive, or home-relative (~/) paths."""
    if Path(target).is_absolute():
        return True
    # Windows drive form like `C:\...` or `C:/...` even on POSIX hosts.
    drive_like = (
        len(target) >= 3
        and target[0].isalpha()
        and target[1] == ":"
        and target[2] in "\\/"
    )
    return drive_like or target.startswith("~/")
def is_cross_skill_reference(target: str) -> bool:
    """True when a link plausibly targets another skill.

    That is either a path traversing upward (contains `..`) or a bare
    markdown filename with no directory separators (a sibling-skill doc).
    """
    if ".." in Path(target).parts:
        return True
    bare = target.removeprefix("./")
    has_separator = "/" in bare or "\\" in bare
    is_markdown = bare.lower().endswith((".md", ".markdown"))
    return not has_separator and is_markdown
def skills_root_for(root: Path) -> Path | None:
current = root
while True:
if current.name == "skills":
return current
if current.parent == current:
return None
current = current.parent
def relative_display(root: Path, path: Path) -> str:
    """Render *path* relative to *root*; '.' for root itself.

    A path outside *root* is shown as-is rather than raising.
    """
    try:
        relative = path.relative_to(root)
    except ValueError:
        return str(path)
    text = str(relative)
    return "." if text == "." else text
def is_markdown_file(path: Path) -> bool:
    """True for .md/.markdown files (suffix compared case-insensitively)."""
    return path.suffix.lower() in (".md", ".markdown")
def is_toml_file(path: Path) -> bool:
    """True when the path's suffix is .toml, ignoring case."""
    return ".toml" == path.suffix.lower()
def is_unsupported_script_file(path: Path) -> bool:
    """A file is script-like if its name has a script suffix or a shell shebang."""
    if has_script_suffix(str(path).lower()):
        return True
    return has_shell_shebang(path)
def has_script_suffix(raw: str) -> bool:
    """True when *raw* (a filename or path string) ends with a script suffix.

    Idiom: str.endswith accepts a tuple of suffixes, so a single C-level
    call replaces the generator-expression scan.
    """
    return raw.lower().endswith(SCRIPT_SUFFIXES)
def has_shell_shebang(path: Path) -> bool:
    """True when the file's first line is a shebang for a known shell.

    Reads at most 128 bytes; unreadable files are treated as non-scripts.

    Bug fix: decoding with errors="ignore" can produce an empty string from
    a non-empty binary prefix (e.g. all invalid UTF-8 bytes), in which case
    splitlines() is empty and the old unconditional [0] raised IndexError.
    """
    try:
        prefix = path.read_bytes()[:128]
    except OSError:
        return False
    lines = prefix.decode("utf-8", errors="ignore").splitlines()
    first_line = lines[0].strip().lower() if lines else ""
    interpreter = shebang_interpreter(first_line)
    return interpreter in {"sh", "bash", "zsh", "ksh", "fish", "pwsh", "powershell"}
def shebang_interpreter(line: str) -> str | None:
if not line.startswith("#!"):
return None
shebang = line[2:].strip()
if not shebang:
return None
parts = shebang.split()
first = Path(parts[0]).name
if first == "env":
for part in parts[1:]:
if part.startswith("-"):
continue
return Path(part).name
return None
return first
def has_markdown_suffix(target: str) -> bool:
    """True when *target* names a markdown document (.md or .markdown)."""
    return target.lower().endswith((".md", ".markdown"))
def is_subpath(path: Path, root: Path) -> bool:
    """True when *path* equals or lies beneath *root* (purely lexical test)."""
    try:
        path.relative_to(root)
    except ValueError:
        return False
    return True
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: validate all skills and print a PASS/FAIL summary.

    Returns 0 when every skill passes, 1 otherwise (including when no
    skills are discovered at all).
    """
    parser = argparse.ArgumentParser(description="Validate the sibling skill_lib against ZeroClaw-like rules.")
    parser.add_argument(
        "--allow-scripts",
        action="store_true",
        help="Allow shell-script files during auditing.",
    )
    options = parser.parse_args(argv)
    outcomes = validate_all_skills(allow_scripts=options.allow_scripts)
    if not outcomes:
        print(f"FAIL no skills discovered under {SKILLS_DIR}")
        return 1
    failures = 0
    for outcome in outcomes:
        print(f"{'PASS' if outcome.ok else 'FAIL'} {outcome.record.name}")
        for finding in outcome.report.findings:
            print(f" - {finding}")
        if not outcome.ok:
            failures += 1
    print(f"Checked {len(outcomes)} skills in {SKILL_LIB_ROOT}")
    return 1 if failures else 0
if __name__ == "__main__":
    # Run as a script: the process exit status mirrors the validation result.
    sys.exit(main())