feat: add browser script skill execution
This commit is contained in:
532
scripts/validate_skill_lib.py
Normal file
532
scripts/validate_skill_lib.py
Normal file
@@ -0,0 +1,532 @@
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
import tomllib
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple
|
||||
|
||||
|
||||
# Largest markdown/TOML file (in bytes) the static audit will read in full;
# bigger files are reported instead of parsed.
MAX_TEXT_FILE_BYTES = 512 * 1024

# File suffixes treated as shell/batch scripts and blocked unless the
# --allow-scripts flag is passed.
SCRIPT_SUFFIXES = (
    ".sh",
    ".bash",
    ".zsh",
    ".ksh",
    ".fish",
    ".ps1",
    ".bat",
    ".cmd",
)

# (compiled regex, finding label) pairs for obviously dangerous command
# snippets; matching is case-insensitive and per-line ((?im)).
HIGH_RISK_PATTERNS = (
    (re.compile(r"(?im)\bcurl\b[^\n|]{0,200}\|\s*(?:sh|bash|zsh)\b"), "curl-pipe-shell"),
    (re.compile(r"(?im)\bwget\b[^\n|]{0,200}\|\s*(?:sh|bash|zsh)\b"), "wget-pipe-shell"),
    (re.compile(r"(?im)\b(?:invoke-expression|iex)\b"), "powershell-iex"),
    (re.compile(r"(?im)\brm\s+-rf\s+/"), "destructive-rm-rf-root"),
    (re.compile(r"(?im)\bnc(?:at)?\b[^\n]{0,120}\s-e\b"), "netcat-remote-exec"),
    (re.compile(r"(?im)\bdd\s+if="), "disk-overwrite-dd"),
    (re.compile(r"(?im)\bmkfs(?:\.[a-z0-9]+)?\b"), "filesystem-format"),
    (re.compile(r"(?im):\(\)\s*\{\s*:\|\:&\s*\};:"), "fork-bomb"),
)

# Matches inline markdown links: [text](target); group 1 is the raw target.
MARKDOWN_LINK_RE = re.compile(r"\[[^\]]*\]\(([^)]+)\)")

# Layout assumption: this script lives in <repo>/scripts/, and the skill
# library is a sibling checkout at <repo>/../skill_lib/skills/.
REPO_ROOT = Path(__file__).resolve().parents[1]
SKILL_LIB_ROOT = REPO_ROOT.parent / "skill_lib"
SKILLS_DIR = SKILL_LIB_ROOT / "skills"
class SkillRecord(NamedTuple):
    """Parsed metadata and prompt text for a single skill."""

    # Skill name; falls back to the directory name when unset in metadata.
    name: str
    # Human-readable summary; may be derived from the prompt body.
    description: str
    # Version string; defaults to "0.1.0" when not declared.
    version: str
    # Optional author; None when not declared.
    author: str | None
    # Free-form tag strings from the manifest or frontmatter.
    tags: list[str]
    # The prompt/instruction text of the skill.
    prompt_body: str
    # Path of the file the record was loaded from (SKILL.toml or SKILL.md).
    location: Path
class AuditReport(NamedTuple):
    """Outcome of a static audit over one skill directory."""

    # Number of paths visited (directories included).
    files_scanned: int
    # Human-readable policy violations; empty means the audit passed.
    findings: list[str]
class ValidationResult(NamedTuple):
    """A loaded skill paired with its audit report."""

    # Parsed skill metadata.
    record: SkillRecord
    # Static-audit outcome for the skill's directory.
    report: AuditReport
    # True iff the audit produced zero findings.
    ok: bool
def discover_skill_dirs(skills_dir: Path | None = None) -> list[Path]:
|
||||
root = skills_dir or SKILLS_DIR
|
||||
if not root.exists():
|
||||
return []
|
||||
return sorted(path for path in root.iterdir() if path.is_dir())
|
||||
|
||||
|
||||
def load_skill(skill_dir: Path) -> SkillRecord:
    """Load skill metadata from SKILL.toml (preferred) or SKILL.md.

    When a TOML manifest exists it wins, but a SKILL.md body still supplies
    the prompt text. With no manifest, SKILL.md frontmatter is parsed
    instead — note that on this fallback path SKILL.md is read
    unconditionally, so a directory with neither file raises
    FileNotFoundError.
    """
    manifest_path = skill_dir / "SKILL.toml"
    markdown_path = skill_dir / "SKILL.md"

    if manifest_path.is_file():
        manifest = tomllib.loads(manifest_path.read_text(encoding="utf-8"))
        skill_meta = manifest.get("skill", {})
        prompts = manifest.get("prompts", [])
        body = ""
        if markdown_path.is_file():
            # Markdown body takes precedence over [prompts] entries.
            _, body = parse_skill_markdown(markdown_path.read_text(encoding="utf-8"))
        elif prompts:
            body = "\n\n".join(str(prompt) for prompt in prompts)

        description = skill_meta.get("description")
        if not description or not str(description).strip():
            # Fall back to the first prose line of the prompt body.
            description = extract_description(body)

        return SkillRecord(
            name=skill_meta.get("name") or skill_dir.name,
            description=str(description),
            version=str(skill_meta.get("version") or "0.1.0"),
            author=skill_meta.get("author") or None,
            tags=list(skill_meta.get("tags", [])),
            prompt_body=body,
            location=manifest_path,
        )

    # Markdown-only skill: metadata comes from the simple frontmatter parser.
    skill_path = markdown_path
    content = skill_path.read_text(encoding="utf-8")
    meta, body = parse_skill_markdown(content)

    name = meta["name"] or skill_dir.name
    description = meta["description"]
    if not description or not description.strip():
        description = extract_description(body)

    version = meta["version"] or "0.1.0"
    author = meta["author"] or None
    tags = list(meta["tags"])

    return SkillRecord(
        name=name,
        description=description,
        version=version,
        author=author,
        tags=tags,
        prompt_body=body,
        location=skill_path,
    )
def validate_all_skills(allow_scripts: bool = False) -> list[ValidationResult]:
    """Load and audit every discovered skill.

    A result is ``ok`` exactly when its audit produced zero findings.
    """
    outcomes: list[ValidationResult] = []
    for directory in discover_skill_dirs():
        loaded = load_skill(directory)
        audit = audit_skill_directory(directory, allow_scripts=allow_scripts)
        passed = not audit.findings
        outcomes.append(ValidationResult(record=loaded, report=audit, ok=passed))
    return outcomes
def parse_skill_markdown(content: str) -> tuple[dict[str, object], str]:
    """Parse a SKILL.md document into (metadata dict, prompt body).

    A document without a frontmatter fence yields empty metadata and the
    entire content as the body.
    """
    parts = split_skill_frontmatter(content)
    if parts is None:
        return empty_meta(), content
    raw, body = parts
    meta = parse_simple_frontmatter(raw)
    return meta, body
def split_skill_frontmatter(content: str) -> tuple[str, str] | None:
|
||||
normalized = content.replace("\r\n", "\n")
|
||||
if not normalized.startswith("---\n"):
|
||||
return None
|
||||
rest = normalized[len("---\n") :]
|
||||
marker = "\n---\n"
|
||||
idx = rest.find(marker)
|
||||
if idx != -1:
|
||||
return rest[:idx], rest[idx + len(marker) :]
|
||||
if rest.endswith("\n---"):
|
||||
return rest[:-4], ""
|
||||
return None
|
||||
|
||||
|
||||
def parse_simple_frontmatter(frontmatter: str) -> dict[str, object]:
    """Parse a minimal YAML-like frontmatter block without a YAML dependency.

    Supports ``key: value`` pairs for name/description/version/author and
    two tag spellings: inline ``tags: [a, b]`` and a block list of ``- item``
    lines following a bare ``tags:`` key. Unrecognized keys are ignored.
    """
    meta = empty_meta()
    # True while consuming "- item" lines after a bare "tags:" key.
    collecting_tags = False

    for raw_line in frontmatter.splitlines():
        if collecting_tags:
            trimmed = raw_line.strip()
            if trimmed.startswith("- "):
                # Strip one layer of surrounding quotes from each tag.
                tag = trimmed[2:].strip().strip('"').strip("'")
                if tag:
                    meta["tags"].append(tag)
                continue
            # First non-list line ends tag collection; fall through so the
            # line can still be parsed as a normal "key: value" pair.
            collecting_tags = False

        if ":" not in raw_line:
            continue
        key, value = raw_line.split(":", 1)
        key = key.strip()
        value = value.strip().strip('"').strip("'")

        if key == "name":
            meta["name"] = value
        elif key == "description":
            meta["description"] = value
        elif key == "version":
            meta["version"] = value
        elif key == "author":
            meta["author"] = value
        elif key == "tags":
            if not value:
                # Bare "tags:" — expect a "- item" block list on later lines.
                collecting_tags = True
            else:
                # Inline form: tags: [a, "b", c]
                cleaned = value.lstrip("[").rstrip("]")
                meta["tags"] = [
                    item.strip().strip('"').strip("'")
                    for item in cleaned.split(",")
                    if item.strip().strip('"').strip("'")
                ]

    return meta
def empty_meta() -> dict[str, object]:
    """Return a fresh metadata mapping with every field unset."""
    meta: dict[str, object] = dict.fromkeys(("name", "description", "version", "author"))
    # A new list per call so callers can append tags safely.
    meta["tags"] = []
    return meta
def extract_description(body: str) -> str:
    """Return the first non-blank, non-heading line of *body*, stripped.

    Falls back to the literal ``"No description"`` when nothing qualifies.
    """
    candidates = (
        text.strip()
        for text in body.splitlines()
        if not text.startswith("#") and text.strip()
    )
    return next(candidates, "No description")
def audit_skill_directory(skill_dir: Path, allow_scripts: bool = False) -> AuditReport:
    """Statically audit one skill directory and return the findings.

    Raises FileNotFoundError / NotADirectoryError for an invalid source;
    policy violations are reported as findings, not exceptions.
    """
    if not skill_dir.exists():
        raise FileNotFoundError(f"Skill source does not exist: {skill_dir}")
    if not skill_dir.is_dir():
        raise NotADirectoryError(f"Skill source must be a directory: {skill_dir}")

    # Resolve the root once so containment checks compare canonical paths.
    canonical_root = skill_dir.resolve()
    findings: list[str] = []
    files_scanned = 0

    has_manifest = (canonical_root / "SKILL.md").is_file() or (canonical_root / "SKILL.toml").is_file()
    if not has_manifest:
        findings.append(
            "Skill root must include SKILL.md or SKILL.toml for deterministic auditing."
        )

    # NOTE: files_scanned counts every visited path, directories included.
    for path in collect_paths_depth_first(canonical_root):
        files_scanned += 1
        findings.extend(audit_path(canonical_root, path, allow_scripts=allow_scripts))

    return AuditReport(files_scanned=files_scanned, findings=findings)
def collect_paths_depth_first(root: Path) -> list[Path]:
    """Return *root* and everything beneath it in deterministic preorder.

    A directory is listed before its contents, and siblings are visited in
    sorted order so repeated audits scan files in a stable sequence.
    """
    ordered: list[Path] = [root]
    if root.is_dir():
        for entry in sorted(root.iterdir()):
            ordered.extend(collect_paths_depth_first(entry))
    return ordered
def audit_path(root: Path, path: Path, allow_scripts: bool) -> list[str]:
    """Audit one path inside the skill tree and return findings for it.

    Symlinks short-circuit (nothing else is checked on them); directories
    yield no findings themselves; oversized markdown/TOML files are
    reported and skipped rather than parsed.
    """
    findings: list[str] = []
    # lstat: size of the entry itself, without following symlinks.
    metadata = path.lstat()
    rel = relative_display(root, path)

    if path.is_symlink():
        findings.append(f"{rel}: symlinks are not allowed in installed skills.")
        return findings

    if path.is_dir():
        return findings

    if not allow_scripts and is_unsupported_script_file(path):
        findings.append(f"{rel}: script-like files are blocked by skill security policy.")

    # Size cap only applies to the text formats we would otherwise parse below.
    if metadata.st_size > MAX_TEXT_FILE_BYTES and (is_markdown_file(path) or is_toml_file(path)):
        findings.append(f"{rel}: file is too large for static audit (>{MAX_TEXT_FILE_BYTES} bytes).")
        return findings

    if is_markdown_file(path):
        findings.extend(audit_markdown_file(root, path))
    elif is_toml_file(path):
        findings.extend(audit_manifest_file(root, path))

    return findings
def audit_markdown_file(root: Path, path: Path) -> list[str]:
    """Scan one markdown file for dangerous snippets and bad link targets."""
    findings: list[str] = []
    content = path.read_text(encoding="utf-8")
    rel = relative_display(root, path)

    # Only the first matching pattern is reported per file.
    pattern = detect_high_risk_snippet(content)
    if pattern:
        findings.append(f"{rel}: detected high-risk command pattern ({pattern}).")

    for target in extract_markdown_links(content):
        findings.extend(audit_markdown_link_target(root, path, target))

    return findings
def audit_manifest_file(root: Path, path: Path) -> list[str]:
    """Scan one TOML manifest for dangerous snippets and shell chaining."""
    findings: list[str] = []
    content = path.read_text(encoding="utf-8")
    rel = relative_display(root, path)

    pattern = detect_high_risk_snippet(content)
    if pattern:
        findings.append(f"{rel}: detected high-risk command pattern ({pattern}).")

    # Raw-text scan: chaining operators anywhere in the file trigger this,
    # including inside comments or quoted values.
    if any(operator in content for operator in ("&&", "||", ";", "`", "$(")):
        findings.append(f"{rel}: manifest content uses shell chaining operators, which are blocked.")

    return findings
def extract_markdown_links(content: str) -> list[str]:
    """Collect the stripped target of every inline markdown link in *content*."""
    targets: list[str] = []
    for found in MARKDOWN_LINK_RE.finditer(content):
        targets.append(found.group(1).strip())
    return targets
def audit_markdown_link_target(root: Path, source: Path, raw_target: str) -> list[str]:
    """Audit one markdown link target from *source* and return findings.

    Policy enforced here: remote links to markdown files and non-http(s)/
    mailto schemes are blocked; absolute paths and links to script files
    are blocked; relative markdown links must resolve to an existing file
    inside the skill root (or, as an exception, inside the shared skills
    root for cross-skill references).
    """
    findings: list[str] = []
    normalized = normalize_markdown_target(raw_target)
    # Empty targets and pure in-page anchors are not auditable links.
    if not normalized or normalized.startswith("#"):
        return findings

    rel = relative_display(root, source)
    scheme = url_scheme(normalized)
    if scheme:
        if scheme in {"http", "https", "mailto"}:
            # Remote *markdown* is blocked; other remote resources pass.
            if has_markdown_suffix(normalized):
                findings.append(
                    f"{rel}: remote markdown links are blocked by skill security audit ({normalized})."
                )
            return findings
        findings.append(f"{rel}: unsupported URL scheme in markdown link ({normalized}).")
        return findings

    # From here on the target is a local path; ignore ?query and #fragment.
    stripped = strip_query_and_fragment(normalized)
    if not stripped:
        return findings

    if looks_like_absolute_path(stripped):
        findings.append(f"{rel}: absolute markdown link paths are not allowed ({normalized}).")
        return findings

    if has_script_suffix(stripped):
        findings.append(f"{rel}: markdown links to script files are blocked ({normalized}).")

    # Only markdown targets are resolved and containment-checked below.
    if not has_markdown_suffix(stripped):
        return findings

    base_dir = source.parent
    linked_path = base_dir / stripped

    try:
        canonical_target = linked_path.resolve(strict=True)
    except FileNotFoundError:
        # Cross-skill references may not exist in this checkout; tolerated.
        if is_cross_skill_reference(stripped):
            return findings
        findings.append(f"{rel}: markdown link points to a missing file ({normalized}).")
        return findings

    if not is_subpath(canonical_target, root):
        # Escaping the skill root is allowed only into the shared skills
        # root (cross-skill reference), and then only to a real file.
        skills_root = skills_root_for(root)
        if skills_root and is_subpath(canonical_target, skills_root):
            if not canonical_target.is_file():
                findings.append(f"{rel}: markdown link must point to a file ({normalized}).")
            return findings
        findings.append(f"{rel}: markdown link escapes skill root ({normalized}).")
        return findings

    if not canonical_target.is_file():
        findings.append(f"{rel}: markdown link must point to a file ({normalized}).")

    return findings
def detect_high_risk_snippet(content: str) -> str | None:
    """Return the label of the first dangerous pattern found in *content*, else None."""
    matches = (
        label
        for pattern, label in HIGH_RISK_PATTERNS
        if pattern.search(content)
    )
    return next(matches, None)
def normalize_markdown_target(raw_target: str) -> str:
    """Reduce a raw link target to its bare URL or path.

    Drops surrounding whitespace, one pair of angle brackets, and any
    trailing title text (e.g. ``url "title"``), keeping the first token.
    """
    candidate = raw_target.strip()
    if candidate.startswith("<"):
        candidate = candidate[1:]
    if candidate.endswith(">"):
        candidate = candidate[:-1]
    tokens = candidate.split()
    if not tokens:
        return ""
    return tokens[0]
def strip_query_and_fragment(target: str) -> str:
    """Return *target* truncated at the first ``#`` or ``?``, whichever comes first."""
    cut = len(target)
    for separator in ("#", "?"):
        position = target.find(separator)
        if position != -1 and position < cut:
            cut = position
    return target[:cut]
def url_scheme(target: str) -> str | None:
|
||||
if ":" not in target:
|
||||
return None
|
||||
scheme, rest = target.split(":", 1)
|
||||
if not scheme or not rest:
|
||||
return None
|
||||
if not all(ch.isalnum() or ch in "+-." for ch in scheme):
|
||||
return None
|
||||
return scheme
|
||||
|
||||
|
||||
def looks_like_absolute_path(target: str) -> bool:
    """True for host-absolute, Windows drive-letter, or ``~/``-prefixed paths."""
    if target.startswith("~/"):
        return True
    drive_like = (
        len(target) >= 3
        and target[0].isalpha()
        and target[1] == ":"
        and target[2] in "\\/"
    )
    if drive_like:
        return True
    # Platform-dependent check for the host's own absolute-path syntax.
    return Path(target).is_absolute()
def is_cross_skill_reference(target: str) -> bool:
    """Heuristic: does *target* point at another skill rather than this one?

    Any path containing ``..`` counts, as does a bare markdown filename
    with no directory separators (e.g. ``other-skill.md``).
    """
    stripped = target.removeprefix("./")
    if ".." in Path(target).parts:
        return True
    has_separator = "/" in stripped or "\\" in stripped
    return not has_separator and has_markdown_suffix(stripped)
def skills_root_for(root: Path) -> Path | None:
|
||||
current = root
|
||||
while True:
|
||||
if current.name == "skills":
|
||||
return current
|
||||
if current.parent == current:
|
||||
return None
|
||||
current = current.parent
|
||||
|
||||
|
||||
def relative_display(root: Path, path: Path) -> str:
    """Render *path* relative to *root* for findings output.

    Paths outside *root* (``relative_to`` raises ValueError) are shown
    in full so a finding is never lost just because the path escaped the
    audited tree. The skill root itself renders as ".".
    """
    try:
        rel = path.relative_to(root)
    except ValueError:
        return str(path)
    # The original's `"." if str(rel) == "." else str(rel)` was a no-op
    # ternary — both branches evaluate to str(rel).
    return str(rel)
def is_markdown_file(path: Path) -> bool:
    """True when the file extension marks a markdown document."""
    extension = path.suffix.lower()
    return extension == ".md" or extension == ".markdown"
def is_toml_file(path: Path) -> bool:
    """True when the file extension marks a TOML document."""
    extension = path.suffix.lower()
    return extension == ".toml"
def is_unsupported_script_file(path: Path) -> bool:
    """True when *path* looks like a shell/batch script by suffix or shebang."""
    if has_script_suffix(str(path).lower()):
        return True
    return has_shell_shebang(path)
def has_script_suffix(raw: str) -> bool:
    """True when *raw* (a path or link target) ends with a blocked script suffix."""
    # str.endswith accepts a tuple of suffixes, replacing the any() loop.
    return raw.lower().endswith(SCRIPT_SUFFIXES)
def has_shell_shebang(path: Path) -> bool:
    """True when the file at *path* starts with a shell-interpreter shebang.

    Only the first 128 bytes are inspected; unreadable files are treated
    as non-scripts rather than failing the audit.
    """
    try:
        prefix = path.read_bytes()[:128]
    except OSError:
        return False

    decoded = prefix.decode("utf-8", errors="ignore")
    lines = decoded.splitlines()
    # Bug fix: a non-empty binary prefix can decode to "" under
    # errors="ignore" (e.g. a file starting with bytes invalid in UTF-8),
    # in which case splitlines() is empty and the original
    # `splitlines()[0]` raised IndexError.
    first_line = lines[0].strip().lower() if lines else ""
    interpreter = shebang_interpreter(first_line)
    return interpreter in {"sh", "bash", "zsh", "ksh", "fish", "pwsh", "powershell"}
def shebang_interpreter(line: str) -> str | None:
|
||||
if not line.startswith("#!"):
|
||||
return None
|
||||
|
||||
shebang = line[2:].strip()
|
||||
if not shebang:
|
||||
return None
|
||||
|
||||
parts = shebang.split()
|
||||
first = Path(parts[0]).name
|
||||
|
||||
if first == "env":
|
||||
for part in parts[1:]:
|
||||
if part.startswith("-"):
|
||||
continue
|
||||
return Path(part).name
|
||||
return None
|
||||
|
||||
return first
|
||||
|
||||
|
||||
def has_markdown_suffix(target: str) -> bool:
    """True when *target* names a markdown file (.md/.markdown), case-insensitively."""
    return target.lower().endswith((".md", ".markdown"))
def is_subpath(path: Path, root: Path) -> bool:
    """True when *path* is *root* or lies beneath it (pure path comparison)."""
    try:
        path.relative_to(root)
    except ValueError:
        return False
    return True
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: audit every discovered skill and print PASS/FAIL lines.

    Returns a process exit code: 0 when all skills pass, 1 when any skill
    fails or when no skills are discovered at all.
    """
    parser = argparse.ArgumentParser(description="Validate the sibling skill_lib against ZeroClaw-like rules.")
    parser.add_argument(
        "--allow-scripts",
        action="store_true",
        help="Allow shell-script files during auditing.",
    )
    args = parser.parse_args(argv)

    results = validate_all_skills(allow_scripts=args.allow_scripts)
    if not results:
        # An empty skill library counts as failure so a missing sibling
        # checkout cannot silently pass.
        print(f"FAIL no skills discovered under {SKILLS_DIR}")
        return 1

    all_ok = True
    for result in results:
        status = "PASS" if result.ok else "FAIL"
        print(f"{status} {result.record.name}")
        for finding in result.report.findings:
            print(f"  - {finding}")
        all_ok = all_ok and result.ok

    print(f"Checked {len(results)} skills in {SKILL_LIB_ROOT}")
    return 0 if all_ok else 1
if __name__ == "__main__":
    # Propagate the audit outcome as the process exit status.
    sys.exit(main())
|
||||
Reference in New Issue
Block a user