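"""Validate the sibling skill_lib against ZeroClaw-like rules.

Discovers skill directories under skill_lib/skills, loads each skill's SKILL.toml
or SKILL.md metadata, and statically audits every file for symlinks, script files,
oversized text, high-risk command snippets, and unsafe markdown links.
"""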
import argparse
import re
import sys
import tomllib
from pathlib import Path
from typing import NamedTuple


MAX_TEXT_FILE_BYTES = 512 * 1024
SCRIPT_SUFFIXES = (
    ".sh",
    ".bash",
    ".zsh",
    ".ksh",
    ".fish",
    ".ps1",
    ".bat",
    ".cmd",
)
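# Each entry below flags a command idiom in skill text during the static audit,
# e.g. "curl https://example.com/install.sh | sh" matches "curl-pipe-shell".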
HIGH_RISK_PATTERNS = (
    (re.compile(r"(?im)\bcurl\b[^\n|]{0,200}\|\s*(?:sh|bash|zsh)\b"), "curl-pipe-shell"),
    (re.compile(r"(?im)\bwget\b[^\n|]{0,200}\|\s*(?:sh|bash|zsh)\b"), "wget-pipe-shell"),
    (re.compile(r"(?im)\b(?:invoke-expression|iex)\b"), "powershell-iex"),
    (re.compile(r"(?im)\brm\s+-rf\s+/"), "destructive-rm-rf-root"),
    (re.compile(r"(?im)\bnc(?:at)?\b[^\n]{0,120}\s-e\b"), "netcat-remote-exec"),
    (re.compile(r"(?im)\bdd\s+if="), "disk-overwrite-dd"),
    (re.compile(r"(?im)\bmkfs(?:\.[a-z0-9]+)?\b"), "filesystem-format"),
    (re.compile(r"(?im):\(\)\s*\{\s*:\|\:&\s*\};:"), "fork-bomb"),
)
MARKDOWN_LINK_RE = re.compile(r"\[[^\]]*\]\(([^)]+)\)")

REPO_ROOT = Path(__file__).resolve().parents[1]
SKILL_LIB_ROOT = REPO_ROOT.parent / "skill_lib"
SKILLS_DIR = SKILL_LIB_ROOT / "skills"


class SkillRecord(NamedTuple):
    name: str
    description: str
    version: str
    author: str | None
    tags: list[str]
    prompt_body: str
    location: Path


class AuditReport(NamedTuple):
    files_scanned: int
    findings: list[str]


class ValidationResult(NamedTuple):
    record: SkillRecord
    report: AuditReport
    ok: bool


def discover_skill_dirs(skills_dir: Path | None = None) -> list[Path]:
    root = skills_dir or SKILLS_DIR
    if not root.exists():
        return []
    return sorted(path for path in root.iterdir() if path.is_dir())


def load_skill(skill_dir: Path) -> SkillRecord:
    manifest_path = skill_dir / "SKILL.toml"
    markdown_path = skill_dir / "SKILL.md"

    if manifest_path.is_file():
        manifest = tomllib.loads(manifest_path.read_text(encoding="utf-8"))
        skill_meta = manifest.get("skill", {})
        prompts = manifest.get("prompts", [])
        body = ""
        if markdown_path.is_file():
            _, body = parse_skill_markdown(markdown_path.read_text(encoding="utf-8"))
        elif prompts:
            body = "\n\n".join(str(prompt) for prompt in prompts)

        description = skill_meta.get("description")
        if not description or not str(description).strip():
            description = extract_description(body)

        return SkillRecord(
            name=skill_meta.get("name") or skill_dir.name,
            description=str(description),
            version=str(skill_meta.get("version") or "0.1.0"),
            author=skill_meta.get("author") or None,
            tags=list(skill_meta.get("tags", [])),
            prompt_body=body,
            location=manifest_path,
        )

    skill_path = markdown_path
    content = skill_path.read_text(encoding="utf-8")
    meta, body = parse_skill_markdown(content)

    name = meta["name"] or skill_dir.name
    description = meta["description"]
    if not description or not description.strip():
        description = extract_description(body)

    version = meta["version"] or "0.1.0"
    author = meta["author"] or None
    tags = list(meta["tags"])

    return SkillRecord(
        name=name,
        description=description,
        version=version,
        author=author,
        tags=tags,
        prompt_body=body,
        location=skill_path,
    )


def validate_all_skills(allow_scripts: bool = False) -> list[ValidationResult]:
    results = []
    for skill_dir in discover_skill_dirs():
        record = load_skill(skill_dir)
        report = audit_skill_directory(skill_dir, allow_scripts=allow_scripts)
        results.append(ValidationResult(record=record, report=report, ok=not report.findings))
    return results


def parse_skill_markdown(content: str) -> tuple[dict[str, object], str]:
    frontmatter = split_skill_frontmatter(content)
    if frontmatter is None:
        return empty_meta(), content
    raw_frontmatter, body = frontmatter
    return parse_simple_frontmatter(raw_frontmatter), body


def split_skill_frontmatter(content: str) -> tuple[str, str] | None:
    normalized = content.replace("\r\n", "\n")
    if not normalized.startswith("---\n"):
        return None
    rest = normalized[len("---\n") :]
    marker = "\n---\n"
    idx = rest.find(marker)
    if idx != -1:
        return rest[:idx], rest[idx + len(marker) :]
    if rest.endswith("\n---"):
        return rest[:-4], ""
    return None


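# Illustrative SKILL.md frontmatter handled by the minimal parser below; tags may be
# written inline ("tags: [git, review]") or as an indented "- item" block:
#
# ---
# name: example-skill
# description: "One-line summary"
# version: 1.2.0
# author: someone
# tags:
#   - git
#   - review
# ---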
def parse_simple_frontmatter(frontmatter: str) -> dict[str, object]:
    meta = empty_meta()
    collecting_tags = False

    for raw_line in frontmatter.splitlines():
        if collecting_tags:
            trimmed = raw_line.strip()
            if trimmed.startswith("- "):
                tag = trimmed[2:].strip().strip('"').strip("'")
                if tag:
                    meta["tags"].append(tag)
                continue
            collecting_tags = False

        if ":" not in raw_line:
            continue
        key, value = raw_line.split(":", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")

        if key == "name":
            meta["name"] = value
        elif key == "description":
            meta["description"] = value
        elif key == "version":
            meta["version"] = value
        elif key == "author":
            meta["author"] = value
        elif key == "tags":
            if not value:
                collecting_tags = True
            else:
                cleaned = value.lstrip("[").rstrip("]")
                meta["tags"] = [
                    item.strip().strip('"').strip("'")
                    for item in cleaned.split(",")
                    if item.strip().strip('"').strip("'")
                ]

    return meta


def empty_meta() -> dict[str, object]:
    return {
        "name": None,
        "description": None,
        "version": None,
        "author": None,
        "tags": [],
    }


def extract_description(body: str) -> str:
    for line in body.splitlines():
        if line.startswith("#"):
            continue
        if not line.strip():
            continue
        return line.strip()
    return "No description"


def audit_skill_directory(skill_dir: Path, allow_scripts: bool = False) -> AuditReport:
    if not skill_dir.exists():
        raise FileNotFoundError(f"Skill source does not exist: {skill_dir}")
    if not skill_dir.is_dir():
        raise NotADirectoryError(f"Skill source must be a directory: {skill_dir}")

    canonical_root = skill_dir.resolve()
    findings: list[str] = []
    files_scanned = 0

    has_manifest = (canonical_root / "SKILL.md").is_file() or (canonical_root / "SKILL.toml").is_file()
    if not has_manifest:
        findings.append(
            "Skill root must include SKILL.md or SKILL.toml for deterministic auditing."
        )

    for path in collect_paths_depth_first(canonical_root):
        files_scanned += 1
        findings.extend(audit_path(canonical_root, path, allow_scripts=allow_scripts))

    return AuditReport(files_scanned=files_scanned, findings=findings)


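# Iterative pre-order traversal: children are pushed in reverse-sorted order so they
# pop off the stack in sorted order, which keeps the audit's file ordering deterministic.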
def collect_paths_depth_first(root: Path) -> list[Path]:
    stack = [root]
    discovered: list[Path] = []

    while stack:
        current = stack.pop()
        discovered.append(current)
        if not current.is_dir():
            continue
        children = sorted(current.iterdir())
        for child in reversed(children):
            stack.append(child)

    return discovered


def audit_path(root: Path, path: Path, allow_scripts: bool) -> list[str]:
    findings: list[str] = []
    metadata = path.lstat()
    rel = relative_display(root, path)

    if path.is_symlink():
        findings.append(f"{rel}: symlinks are not allowed in installed skills.")
        return findings

    if path.is_dir():
        return findings

    if not allow_scripts and is_unsupported_script_file(path):
        findings.append(f"{rel}: script-like files are blocked by skill security policy.")

    if metadata.st_size > MAX_TEXT_FILE_BYTES and (is_markdown_file(path) or is_toml_file(path)):
        findings.append(f"{rel}: file is too large for static audit (>{MAX_TEXT_FILE_BYTES} bytes).")
        return findings

    if is_markdown_file(path):
        findings.extend(audit_markdown_file(root, path))
    elif is_toml_file(path):
        findings.extend(audit_manifest_file(root, path))

    return findings


def audit_markdown_file(root: Path, path: Path) -> list[str]:
    findings: list[str] = []
    content = path.read_text(encoding="utf-8")
    rel = relative_display(root, path)

    pattern = detect_high_risk_snippet(content)
    if pattern:
        findings.append(f"{rel}: detected high-risk command pattern ({pattern}).")

    for target in extract_markdown_links(content):
        findings.extend(audit_markdown_link_target(root, path, target))

    return findings


def audit_manifest_file(root: Path, path: Path) -> list[str]:
    findings: list[str] = []
    content = path.read_text(encoding="utf-8")
    rel = relative_display(root, path)

    pattern = detect_high_risk_snippet(content)
    if pattern:
        findings.append(f"{rel}: detected high-risk command pattern ({pattern}).")

    if any(operator in content for operator in ("&&", "||", ";", "`", "$(")):
        findings.append(f"{rel}: manifest content uses shell chaining operators, which are blocked.")

    return findings


def extract_markdown_links(content: str) -> list[str]:
    return [match.group(1).strip() for match in MARKDOWN_LINK_RE.finditer(content)]


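# Link policy enforced below: fragment-only links are ignored; http/https/mailto URLs pass
# unless they point at remote markdown; other URL schemes, absolute paths, links to script
# files, and missing local markdown targets are reported. Resolved markdown targets must
# stay inside the skill root (or at least inside the shared skills/ root); a bare
# cross-skill reference such as "../other-skill/SKILL.md" is tolerated even when it does
# not resolve.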
def audit_markdown_link_target(root: Path, source: Path, raw_target: str) -> list[str]:
    findings: list[str] = []
    normalized = normalize_markdown_target(raw_target)
    if not normalized or normalized.startswith("#"):
        return findings

    rel = relative_display(root, source)
    scheme = url_scheme(normalized)
    if scheme:
        if scheme in {"http", "https", "mailto"}:
            if has_markdown_suffix(normalized):
                findings.append(
                    f"{rel}: remote markdown links are blocked by skill security audit ({normalized})."
                )
            return findings
        findings.append(f"{rel}: unsupported URL scheme in markdown link ({normalized}).")
        return findings

    stripped = strip_query_and_fragment(normalized)
    if not stripped:
        return findings

    if looks_like_absolute_path(stripped):
        findings.append(f"{rel}: absolute markdown link paths are not allowed ({normalized}).")
        return findings

    if has_script_suffix(stripped):
        findings.append(f"{rel}: markdown links to script files are blocked ({normalized}).")

    if not has_markdown_suffix(stripped):
        return findings

    base_dir = source.parent
    linked_path = base_dir / stripped

    try:
        canonical_target = linked_path.resolve(strict=True)
    except FileNotFoundError:
        if is_cross_skill_reference(stripped):
            return findings
        findings.append(f"{rel}: markdown link points to a missing file ({normalized}).")
        return findings

    if not is_subpath(canonical_target, root):
        skills_root = skills_root_for(root)
        if skills_root and is_subpath(canonical_target, skills_root):
            if not canonical_target.is_file():
                findings.append(f"{rel}: markdown link must point to a file ({normalized}).")
            return findings
        findings.append(f"{rel}: markdown link escapes skill root ({normalized}).")
        return findings

    if not canonical_target.is_file():
        findings.append(f"{rel}: markdown link must point to a file ({normalized}).")

    return findings


def detect_high_risk_snippet(content: str) -> str | None:
    for pattern, label in HIGH_RISK_PATTERNS:
        if pattern.search(content):
            return label
    return None


def normalize_markdown_target(raw_target: str) -> str:
    trimmed = raw_target.strip()
    if trimmed.startswith("<"):
        trimmed = trimmed[1:]
    if trimmed.endswith(">"):
        trimmed = trimmed[:-1]
    parts = trimmed.split()
    return parts[0] if parts else ""


def strip_query_and_fragment(target: str) -> str:
    end = len(target)
    hash_idx = target.find("#")
    if hash_idx != -1:
        end = min(end, hash_idx)
    query_idx = target.find("?")
    if query_idx != -1:
        end = min(end, query_idx)
    return target[:end]


def url_scheme(target: str) -> str | None:
    if ":" not in target:
        return None
    scheme, rest = target.split(":", 1)
    if not scheme or not rest:
        return None
    if not all(ch.isalnum() or ch in "+-." for ch in scheme):
        return None
    return scheme


def looks_like_absolute_path(target: str) -> bool:
    if Path(target).is_absolute():
        return True
    if len(target) >= 3 and target[0].isalpha() and target[1] == ":" and target[2] in "\\/":
        return True
    return target.startswith("~/")


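# Illustrative inputs: "../other-skill/SKILL.md" and a bare "SKILL.md" both count as
# cross-skill references; "docs/usage.md" does not, because it nests inside this skill.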
def is_cross_skill_reference(target: str) -> bool:
    normalized = target[2:] if target.startswith("./") else target
    path = Path(target)

    if ".." in path.parts:
        return True

    return "/" not in normalized and "\\" not in normalized and has_markdown_suffix(normalized)


def skills_root_for(root: Path) -> Path | None:
    current = root
    while True:
        if current.name == "skills":
            return current
        if current.parent == current:
            return None
        current = current.parent


def relative_display(root: Path, path: Path) -> str:
    try:
        rel = path.relative_to(root)
    except ValueError:
        return str(path)
    return "." if str(rel) == "." else str(rel)


def is_markdown_file(path: Path) -> bool:
    return path.suffix.lower() in {".md", ".markdown"}


def is_toml_file(path: Path) -> bool:
    return path.suffix.lower() == ".toml"


def is_unsupported_script_file(path: Path) -> bool:
    return has_script_suffix(str(path).lower()) or has_shell_shebang(path)


def has_script_suffix(raw: str) -> bool:
    lowered = raw.lower()
    return any(lowered.endswith(suffix) for suffix in SCRIPT_SUFFIXES)


def has_shell_shebang(path: Path) -> bool:
    try:
        prefix = path.read_bytes()[:128]
    except OSError:
        return False

    # Guard against binary prefixes that decode to an empty string, which would
    # otherwise raise IndexError when taking the first line.
    lines = prefix.decode("utf-8", errors="ignore").splitlines()
    first_line = lines[0].strip().lower() if lines else ""
    interpreter = shebang_interpreter(first_line)
    return interpreter in {"sh", "bash", "zsh", "ksh", "fish", "pwsh", "powershell"}


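# Illustrative results: "#!/bin/bash" -> "bash", "#!/usr/bin/env -S bash -eu" -> "bash",
# "#!/usr/bin/env python3" -> "python3" (not a shell, so has_shell_shebang returns False).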
def shebang_interpreter(line: str) -> str | None:
    if not line.startswith("#!"):
        return None

    shebang = line[2:].strip()
    if not shebang:
        return None

    parts = shebang.split()
    first = Path(parts[0]).name

    if first == "env":
        for part in parts[1:]:
            if part.startswith("-"):
                continue
            return Path(part).name
        return None

    return first


def has_markdown_suffix(target: str) -> bool:
    lowered = target.lower()
    return lowered.endswith(".md") or lowered.endswith(".markdown")


def is_subpath(path: Path, root: Path) -> bool:
    try:
        path.relative_to(root)
        return True
    except ValueError:
        return False


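# Typical invocation (file name and location are illustrative; the script assumes it
# lives one directory below the repository root and that "skill_lib/skills/" sits
# beside the repository):
#   python tools/validate_skills.py
#   python tools/validate_skills.py --allow-scripts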
def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description="Validate the sibling skill_lib against ZeroClaw-like rules.")
    parser.add_argument(
        "--allow-scripts",
        action="store_true",
        help="Allow shell-script files during auditing.",
    )
    args = parser.parse_args(argv)

    results = validate_all_skills(allow_scripts=args.allow_scripts)
    if not results:
        print(f"FAIL no skills discovered under {SKILLS_DIR}")
        return 1

    all_ok = True
    for result in results:
        status = "PASS" if result.ok else "FAIL"
        print(f"{status} {result.record.name}")
        for finding in result.report.findings:
            print(f" - {finding}")
        all_ok = all_ok and result.ok

    print(f"Checked {len(results)} skills in {SKILL_LIB_ROOT}")
    return 0 if all_ok else 1


if __name__ == "__main__":
    sys.exit(main())