"""Tests for the ``zhihu-hotlist`` skill's ``extract_hotlist.js`` script.

The extractor is JavaScript, so each test runs it under Node.js inside a
``vm`` context that exposes a small stubbed ``document``/``location``.
"""

import json
import subprocess
import textwrap
import unittest
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[1]
# The extractor lives in a sibling "skill_lib" directory next to REPO_ROOT.
EXTRACTOR_PATH = (
    REPO_ROOT.parent
    / "skill_lib"
    / "skills"
    / "zhihu-hotlist"
    / "scripts"
    / "extract_hotlist.js"
)


def run_extractor(*, body_text: str, selectors: dict[str, list[dict]] | None = None) -> dict:
    """Run ``extract_hotlist.js`` under Node.js against a stubbed DOM.

    The extractor source is wrapped in an immediately-invoked function and
    evaluated with ``vm.runInNewContext``; the value that function returns is
    serialized to JSON on stdout and handed back to the caller.

    Args:
        body_text: Text exposed as ``textContent``/``innerText`` of the stub
            body node (also returned for the ``body``, ``#root`` and ``main``
            selectors of ``document.querySelector``).
        selectors: Optional mapping from a selector string to the list of
            node specs that ``document.querySelectorAll`` returns for it.
            A spec may carry ``"text"`` and a nested ``"selectors"`` mapping
            used by the node's own ``querySelector``.

    Returns:
        The extractor's return value after a JSON round trip.

    Raises:
        AssertionError: If the extractor throws, or if the Node.js process
            does not produce a parseable JSON payload.
    """
    selector_map = selectors or {}
    # json.dumps output doubles as valid JavaScript literals, which keeps the
    # interpolated values safely quoted inside the generated script.
    node_script = textwrap.dedent(
        f"""
        import fs from 'node:fs';
        import vm from 'node:vm';

        const extractorPath = {json.dumps(str(EXTRACTOR_PATH))};
        const selectorMap = {json.dumps(selector_map, ensure_ascii=False)};
        const bodyText = {json.dumps(body_text, ensure_ascii=False)};
        const source = fs.readFileSync(extractorPath, 'utf8');

        function createNode(spec) {{
          const text = String(spec?.text ?? '');
          const children = spec?.selectors ?? {{}};
          return {{
            textContent: text,
            innerText: text,
            querySelector(selector) {{
              const value = children[selector];
              if (!value) {{
                return null;
              }}
              return createNode(value);
            }},
          }};
        }}

        const bodyNode = createNode({{text: bodyText}});
        const context = {{
          args: {{top_n: '10'}},
          location: {{origin: 'https://www.zhihu.com', pathname: '/hot'}},
          document: {{
            body: bodyNode,
            querySelector(selector) {{
              if (selector === 'body' || selector === '#root' || selector === 'main') {{
                return bodyNode;
              }}
              return null;
            }},
            querySelectorAll(selector) {{
              return (selectorMap[selector] || []).map((item) => createNode(item));
            }},
          }},
          console,
          JSON,
          Math,
          Number,
          Object,
          RegExp,
          Set,
          String,
          Array,
          Error,
        }};

        try {{
          const result = vm.runInNewContext(`(function(){{\\n${{source}}\\n}})()`, context);
          process.stdout.write(JSON.stringify({{ok: true, result}}));
        }} catch (error) {{
          process.stdout.write(JSON.stringify({{
            ok: false,
            error: String(error && error.message ? error.message : error),
          }}));
          process.exitCode = 1;
        }}
        """
    )
    completed = subprocess.run(
        ["node", "--input-type=module", "-e", node_script],
        check=False,
        capture_output=True,
        text=True,
        # Node writes UTF-8; do not depend on the locale's preferred encoding
        # when decoding output that contains non-ASCII text.
        encoding="utf-8",
    )
    try:
        payload = json.loads(completed.stdout)
    except json.JSONDecodeError as exc:
        # Reached when Node fails outside the harness try/catch (stdout is
        # empty) or when something else wrote to stdout; surface stderr so
        # the real cause is visible instead of a bare decode error.
        raise AssertionError(
            f"extractor harness did not emit valid JSON "
            f"(exit code {completed.returncode}); "
            f"stdout={completed.stdout!r} stderr={completed.stderr!r}"
        ) from exc
    if completed.returncode != 0:
        detail = payload.get("error") if isinstance(payload, dict) else None
        raise AssertionError(
            detail
            or completed.stderr.strip()
            or f"node exited with code {completed.returncode}"
        )
    return payload["result"]


class SkillScriptHotlistExtractorTest(unittest.TestCase):
    """Checks the extractor's output for a page that only provides body text."""

    def test_extracts_hotlist_from_page_text_when_legacy_dom_classes_are_missing(self):
        """No selector matches are supplied, so only the body text is available."""
        # NOTE(review): the line breaks inside this fixture were reconstructed
        # (one token per line: rank, title, heat) — confirm they match the
        # intended page-text layout.
        result = run_extractor(
            body_text=textwrap.dedent(
                """
                知乎热榜
                1
                如何看待张雪机车在 2026 年 WSBK 葡萄牙站夺冠?
                1707 万热度
                2
                李荣浩摆证据 4 连质问单依纯
                1150 万热度
                3
                日本拟动用外储做空国际原油
                601 万热度
                """
            ),
        )

        self.assertEqual(result["sheet_name"], "知乎热榜")
        self.assertEqual(result["columns"], ["rank", "title", "heat"])
        self.assertEqual(
            result["rows"][:3],
            [
                [1, "如何看待张雪机车在 2026 年 WSBK 葡萄牙站夺冠?", "1707万"],
                [2, "李荣浩摆证据 4 连质问单依纯", "1150万"],
                [3, "日本拟动用外储做空国际原油", "601万"],
            ],
        )


if __name__ == "__main__":
    unittest.main()