# Files
# claw/tests/skill_script_hotlist_extractor_test.py
# 2026-03-30 08:29:44 +08:00
#
# 128 lines
# 3.9 KiB
# Python

import json
import subprocess
import textwrap
import unittest
from pathlib import Path
# Parent of the directory that holds this test file.
REPO_ROOT = Path(__file__).resolve().parents[1]

# The extractor script under test lives in a sibling ``skill_lib`` tree,
# i.e. next to REPO_ROOT rather than inside it.
EXTRACTOR_PATH = REPO_ROOT.parent.joinpath(
    "skill_lib",
    "skills",
    "zhihu-hotlist",
    "scripts",
    "extract_hotlist.js",
)
def run_extractor(*, body_text: str, selectors: dict[str, list[dict]] | None = None) -> dict:
    """Run the hot-list extractor script under Node.js against a stubbed DOM.

    A small ES-module driver is generated and passed to ``node -e``. The
    driver reads ``EXTRACTOR_PATH``, wraps its source in an immediately
    invoked function and evaluates it with ``vm.runInNewContext`` using a
    hand-built context that provides:

    * ``args`` fixed to ``{top_n: '10'}``;
    * ``location`` fixed to origin ``https://www.zhihu.com`` and pathname
      ``/hot``;
    * ``document.body`` whose ``textContent``/``innerText`` is *body_text*;
    * ``document.querySelector`` returning the body node for ``body``,
      ``#root`` and ``main`` and ``null`` for anything else;
    * ``document.querySelectorAll`` returning the nodes described by
      *selectors* for that selector, or an empty list.

    Args:
        body_text: Text exposed as the page body's ``textContent`` and
            ``innerText``.
        selectors: Optional mapping of selector string to a list of node
            specs. Each spec may carry ``text`` and a nested ``selectors``
            mapping (selector -> spec) served by that node's
            ``querySelector``. ``None`` or empty means no selector matches.

    Returns:
        The value produced by the extractor script, decoded from JSON.

    Raises:
        AssertionError: If Node exits with a non-zero status, or if it
            produces no parseable JSON on stdout (the message then carries
            the exit code and stderr so the real failure is visible).
    """
    selector_map = selectors or {}
    # json.dumps output is a valid JavaScript literal and never contains a raw
    # newline, so interpolating it keeps every script line on its own indent
    # level for textwrap.dedent.
    node_script = textwrap.dedent(
        f"""
        import fs from 'node:fs';
        import vm from 'node:vm';
        const extractorPath = {json.dumps(str(EXTRACTOR_PATH))};
        const selectorMap = {json.dumps(selector_map, ensure_ascii=False)};
        const bodyText = {json.dumps(body_text, ensure_ascii=False)};
        const source = fs.readFileSync(extractorPath, 'utf8');
        function createNode(spec) {{
          const text = String(spec?.text ?? '');
          const children = spec?.selectors ?? {{}};
          return {{
            textContent: text,
            innerText: text,
            querySelector(selector) {{
              const value = children[selector];
              if (!value) {{
                return null;
              }}
              return createNode(value);
            }},
          }};
        }}
        const bodyNode = createNode({{text: bodyText}});
        const context = {{
          args: {{top_n: '10'}},
          location: {{origin: 'https://www.zhihu.com', pathname: '/hot'}},
          document: {{
            body: bodyNode,
            querySelector(selector) {{
              if (selector === 'body' || selector === '#root' || selector === 'main') {{
                return bodyNode;
              }}
              return null;
            }},
            querySelectorAll(selector) {{
              return (selectorMap[selector] || []).map((item) => createNode(item));
            }},
          }},
          console,
          JSON,
          Math,
          Number,
          Object,
          RegExp,
          Set,
          String,
          Array,
          Error,
        }};
        try {{
          const result = vm.runInNewContext(`(function(){{\\n${{source}}\\n}})()`, context);
          process.stdout.write(JSON.stringify({{ok: true, result}}));
        }} catch (error) {{
          process.stdout.write(JSON.stringify({{
            ok: false,
            error: String(error && error.message ? error.message : error),
          }}));
          process.exitCode = 1;
        }}
        """
    )
    completed = subprocess.run(
        ["node", "--input-type=module", "-e", node_script],
        check=False,
        capture_output=True,
        text=True,
        # Node writes UTF-8; decoding with the locale default would corrupt
        # the non-ASCII titles on systems whose locale is not UTF-8.
        encoding="utf-8",
    )
    try:
        payload = json.loads(completed.stdout)
    except json.JSONDecodeError as exc:
        # Failures outside the script's own try/catch (unreadable extractor
        # file, module/syntax errors) leave stdout empty; surface stderr
        # instead of an opaque JSON parsing error.
        raise AssertionError(
            f"node exited with code {completed.returncode} without a JSON payload; "
            f"stderr: {completed.stderr.strip()!r}"
        ) from exc
    if completed.returncode != 0:
        raise AssertionError(payload["error"])
    return payload["result"]
class SkillScriptHotlistExtractorTest(unittest.TestCase):
    """Tests for the hot-list extractor script, driven through ``run_extractor``."""

    def test_extracts_hotlist_from_page_text_when_legacy_dom_classes_are_missing(self):
        """Only body text is supplied (no selector data); rows must still be extracted."""
        # Arrange: page text with a heading followed by rank / title / heat triples.
        page_text = textwrap.dedent(
            """
            知乎热榜
            1
            如何看待张雪机车在 2026 年 WSBK 葡萄牙站夺冠?
            1707 万热度
            2
            李荣浩摆证据 4 连质问单依纯
            1150 万热度
            3
            日本拟动用外储做空国际原油
            601 万热度
            """
        )
        expected_top_three = [
            [1, "如何看待张雪机车在 2026 年 WSBK 葡萄牙站夺冠?", "1707万"],
            [2, "李荣浩摆证据 4 连质问单依纯", "1150万"],
            [3, "日本拟动用外储做空国际原油", "601万"],
        ]

        # Act
        extracted = run_extractor(body_text=page_text)

        # Assert: sheet metadata first, then the leading three rows.
        self.assertEqual(extracted["sheet_name"], "知乎热榜")
        self.assertEqual(extracted["columns"], ["rank", "title", "heat"])
        self.assertEqual(extracted["rows"][:3], expected_top_three)
if __name__ == "__main__":
    # Allow running this file directly as a script, not only via a test runner.
    unittest.main()