Files
claw/src/compat/workflow_executor.rs
2026-04-10 17:21:13 +08:00

1196 lines
41 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::fs;
use std::path::Path;
use std::thread;
use std::time::Duration;
use regex::Regex;
use serde_json::{json, Value};
use zeroclaw::tools::Tool;
use crate::compat::openxml_office_tool::OpenXmlOfficeTool;
use crate::compat::runtime::CompatTaskContext;
use crate::compat::screen_html_export_tool::ScreenHtmlExportTool;
use crate::pipe::{
Action, AgentMessage, BrowserPipeTool, ConversationMessage, PipeError, Transport,
};
/// Host passed as the expected domain for browser commands issued against the main Zhihu site.
const ZHIHU_DOMAIN: &str = "www.zhihu.com";
/// Host passed as the expected domain for browser commands issued against the article editor.
const ZHIHU_EDITOR_DOMAIN: &str = "zhuanlan.zhihu.com";
/// Hot-list page; also used as a URL prefix to detect that a task already starts on the hot list.
const ZHIHU_HOT_URL: &str = "https://www.zhihu.com/hot";
/// Creator-center page that the article workflows navigate to first.
const ZHIHU_CREATOR_URL: &str = "https://www.zhihu.com/creator";
/// Editor URL used when the creator-entry script does not report a `next_url`.
const ZHIHU_EDITOR_URL: &str = "https://zhuanlan.zhihu.com/write";
/// Maximum number of body-text reads performed by one readiness poll.
const HOTLIST_READY_POLL_ATTEMPTS: usize = 10;
/// Pause between consecutive readiness reads (no pause after the final attempt).
const HOTLIST_READY_POLL_INTERVAL: Duration = Duration::from_millis(500);
/// Regex applied to the page text to decide that the hot list has rendered: a line that begins
/// with rank `1` and a separator, and later contains a number followed by a magnitude unit
/// (万/亿/k/K/m/M), optionally followed by 热度.
const HOTLIST_TEXT_READY_PATTERN: &str =
r"(?:^|\n)\s*1(?:[.、]|\s)+.+\d+(?:\.\d+)?\s*(?:万|亿|k|K|m|M)(?:热度)?";
/// Deterministic workflows that `detect_route` can select and `execute_route` can run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowRoute {
/// Collect the Zhihu hot list and export it through the `openxml_office` tool.
ZhihuHotlistExportXlsx,
/// Collect the Zhihu hot list and render it through the `screen_html_export` tool.
ZhihuHotlistScreen,
/// Open the Zhihu article editor without writing any content.
ZhihuArticleEntry,
/// Fill the article editor with a title and body (publish mode off).
ZhihuArticleDraft,
/// Fill the article editor and run the publish flow (publish mode on).
ZhihuArticlePublish,
}
/// One parsed hot-list row, as produced by `parse_hotlist_items_payload`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct HotlistItem {
// Rank reported by the extractor, or the 1-based insertion position when it is missing.
rank: u64,
// Trimmed, non-empty title text.
title: String,
// Trimmed, non-empty heat text, kept as a string exactly as the extractor returned it.
heat: String,
}
/// Title/body pair extracted from user text by `parse_article_draft`; both fields are non-empty.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ArticleDraft {
// Article title, trimmed.
title: String,
// Article body, trimmed.
body: String,
}
/// Chooses the deterministic workflow (if any) that matches the instruction and page context.
///
/// Hot-list tasks are checked first: dashboard-style wording wins over export wording. A
/// hot-list task that mentions neither falls through to the article checks, which are tried in
/// the order entry, publish, draft. Returns `None` when nothing matches.
pub fn detect_route(
    instruction: &str,
    page_url: Option<&str>,
    page_title: Option<&str>,
) -> Option<WorkflowRoute> {
    if crate::runtime::is_zhihu_hotlist_task(instruction, page_url, page_title) {
        let lowered = instruction.to_ascii_lowercase();

        let wants_screen = lowered.contains("dashboard")
            || ["大屏", "新标签页"]
                .iter()
                .any(|keyword| instruction.contains(*keyword));
        if wants_screen {
            return Some(WorkflowRoute::ZhihuHotlistScreen);
        }

        let wants_export = ["excel", "xlsx"]
            .iter()
            .any(|keyword| lowered.contains(*keyword))
            || instruction.contains("导出");
        if wants_export {
            return Some(WorkflowRoute::ZhihuHotlistExportXlsx);
        }
    }

    if task_requests_zhihu_article_entry(instruction, page_url, page_title) {
        Some(WorkflowRoute::ZhihuArticleEntry)
    } else if crate::runtime::task_requests_zhihu_article_publish(instruction, page_url, page_title)
    {
        Some(WorkflowRoute::ZhihuArticlePublish)
    } else if crate::runtime::is_zhihu_write_task(instruction, page_url, page_title) {
        Some(WorkflowRoute::ZhihuArticleDraft)
    } else {
        None
    }
}
/// Reports whether `route` should be executed directly by this module.
///
/// Every variant currently declared on [`WorkflowRoute`] prefers direct execution.
pub fn prefers_direct_execution(route: &WorkflowRoute) -> bool {
    match route {
        WorkflowRoute::ZhihuHotlistExportXlsx
        | WorkflowRoute::ZhihuHotlistScreen
        | WorkflowRoute::ZhihuArticleEntry
        | WorkflowRoute::ZhihuArticleDraft
        | WorkflowRoute::ZhihuArticlePublish => true,
    }
}
/// Decides whether the deterministic workflow should run after an agent produced `summary`.
///
/// A summary that mentions an `.xlsx` or `.html` artifact never triggers the fallback.
/// Otherwise the fallback runs when the summary looks like a denial/failure, or when `route`
/// is one of the listed variants. Every variant currently declared is listed, so for those the
/// result is `true` whenever no artifact is mentioned.
pub fn should_fallback_after_summary(summary: &str, route: &WorkflowRoute) -> bool {
    const ASCII_FAILURE_MARKERS: [&str; 4] = [
        "denied",
        "failed",
        "protocol error",
        "maximum tool iterations",
    ];
    const CJK_FAILURE_MARKERS: [&str; 3] = ["拒绝", "失败", "无法"];

    let lowered = summary.to_ascii_lowercase();

    let mentions_artifact = [".xlsx", ".html"]
        .iter()
        .any(|extension| lowered.contains(*extension));
    if mentions_artifact {
        return false;
    }

    let looks_like_denial = CJK_FAILURE_MARKERS
        .iter()
        .any(|marker| summary.contains(*marker))
        || ASCII_FAILURE_MARKERS
            .iter()
            .any(|marker| lowered.contains(*marker));
    if looks_like_denial {
        return true;
    }

    matches!(
        route,
        WorkflowRoute::ZhihuHotlistExportXlsx
            | WorkflowRoute::ZhihuHotlistScreen
            | WorkflowRoute::ZhihuArticleEntry
            | WorkflowRoute::ZhihuArticleDraft
            | WorkflowRoute::ZhihuArticlePublish
    )
}
/// Runs the deterministic workflow selected by `route` and returns its user-facing summary.
///
/// Hot-list routes first collect the items (top-N parsed from `instruction`) and then hand
/// them to the matching exporter; article routes delegate to the entry or draft/publish flow.
///
/// # Errors
/// Propagates any `PipeError` from the underlying steps, and returns a protocol error when the
/// hot-list collection yields no items.
pub fn execute_route<T: Transport + 'static>(
    transport: &T,
    browser_tool: &BrowserPipeTool<T>,
    workspace_root: &Path,
    skills_dir: &Path,
    instruction: &str,
    task_context: &CompatTaskContext,
    route: WorkflowRoute,
) -> Result<String, PipeError> {
    match route {
        WorkflowRoute::ZhihuArticleEntry => {
            execute_zhihu_article_entry_route(transport, browser_tool, skills_dir)
        }
        WorkflowRoute::ZhihuArticleDraft | WorkflowRoute::ZhihuArticlePublish => {
            let publish_mode = matches!(route, WorkflowRoute::ZhihuArticlePublish);
            execute_zhihu_article_route(
                transport,
                browser_tool,
                skills_dir,
                instruction,
                task_context,
                publish_mode,
            )
        }
        WorkflowRoute::ZhihuHotlistExportXlsx | WorkflowRoute::ZhihuHotlistScreen => {
            let requested = extract_top_n(instruction);
            let items = collect_hotlist_items(
                transport,
                browser_tool,
                skills_dir,
                requested,
                task_context,
            )?;
            if items.is_empty() {
                return Err(PipeError::Protocol(
                    "知乎热榜采集失败:未能从页面文本中解析到热榜条目".to_string(),
                ));
            }
            if matches!(route, WorkflowRoute::ZhihuHotlistScreen) {
                export_screen(transport, workspace_root, &items)
            } else {
                export_xlsx(transport, workspace_root, &items)
            }
        }
    }
}
fn collect_hotlist_items<T: Transport + 'static>(
transport: &T,
browser_tool: &BrowserPipeTool<T>,
skills_dir: &Path,
top_n: usize,
task_context: &CompatTaskContext,
) -> Result<Vec<HotlistItem>, PipeError> {
if let Some(items) =
ensure_hotlist_page_ready(transport, browser_tool, skills_dir, top_n, task_context)?
{
return Ok(items);
}
transport.send(&AgentMessage::LogEntry {
level: "info".to_string(),
message: "call zhihu-hotlist.extract_hotlist".to_string(),
})?;
let response = browser_tool.invoke(
Action::Eval,
json!({ "script": load_hotlist_extractor_script(skills_dir, top_n)? }),
ZHIHU_DOMAIN,
)?;
if !response.success {
return Err(PipeError::Protocol(format!(
"知乎热榜采集失败:{}",
response
.data
.get("error")
.and_then(|value| value.get("message"))
.and_then(Value::as_str)
.unwrap_or("browser script execution failed")
)));
}
parse_hotlist_items_payload(response.data.get("text").unwrap_or(&response.data))
}
/// Brings the browser to a readable hot-list page.
///
/// Returns `Ok(None)` when the page text looks ready (the caller still has to run the
/// extractor) and `Ok(Some(items))` when an extractor probe already produced items.
///
/// When the task starts on the hot list, the current page is polled and probed before any
/// navigation. After that, up to two navigate / poll / probe rounds are attempted.
///
/// # Errors
/// Returns a protocol error naming the last attempt when both rounds fail, and propagates
/// errors from navigation, polling, and probing.
fn ensure_hotlist_page_ready<T: Transport + 'static>(
    transport: &T,
    browser_tool: &BrowserPipeTool<T>,
    skills_dir: &Path,
    top_n: usize,
    task_context: &CompatTaskContext,
) -> Result<Option<Vec<HotlistItem>>, PipeError> {
    let url_is_hotlist = task_context
        .page_url
        .as_deref()
        .is_some_and(|url| url.starts_with(ZHIHU_HOT_URL));
    let title_is_hotlist = task_context
        .page_title
        .as_deref()
        .is_some_and(|title| title.contains("热榜"));

    if url_is_hotlist || title_is_hotlist {
        if poll_for_hotlist_readiness(browser_tool)? {
            return Ok(None);
        }
        let probed = probe_hotlist_extractor(transport, browser_tool, skills_dir, top_n)?;
        if probed.is_some() {
            return Ok(probed);
        }
    }

    let mut failure = PipeError::Protocol("知乎热榜页面未就绪".to_string());
    for attempt in 1..=2 {
        navigate_hotlist_page(transport, browser_tool)?;
        if poll_for_hotlist_readiness(browser_tool)? {
            return Ok(None);
        }
        let probed = probe_hotlist_extractor(transport, browser_tool, skills_dir, top_n)?;
        if probed.is_some() {
            return Ok(probed);
        }
        failure = PipeError::Protocol(format!(
            "知乎热榜页面已打开但在短轮询窗口内仍未出现可读热榜内容attempt={}",
            attempt
        ));
    }
    Err(failure)
}
fn probe_hotlist_extractor<T: Transport + 'static>(
transport: &T,
browser_tool: &BrowserPipeTool<T>,
skills_dir: &Path,
top_n: usize,
) -> Result<Option<Vec<HotlistItem>>, PipeError> {
transport.send(&AgentMessage::LogEntry {
level: "info".to_string(),
message: "call zhihu-hotlist.extract_hotlist".to_string(),
})?;
let response = browser_tool.invoke(
Action::Eval,
json!({ "script": load_hotlist_extractor_script(skills_dir, top_n)? }),
ZHIHU_DOMAIN,
)?;
if !response.success {
return Ok(None);
}
match parse_hotlist_items_payload(response.data.get("text").unwrap_or(&response.data)) {
Ok(items) if !items.is_empty() => Ok(Some(items)),
Ok(_) => Ok(None),
Err(_) => Ok(None),
}
}
fn navigate_hotlist_page<T: Transport + 'static>(
transport: &T,
browser_tool: &BrowserPipeTool<T>,
) -> Result<(), PipeError> {
transport.send(&AgentMessage::LogEntry {
level: "info".to_string(),
message: format!("navigate {ZHIHU_HOT_URL}"),
})?;
let response = browser_tool.invoke(
Action::Navigate,
json!({ "url": ZHIHU_HOT_URL }),
ZHIHU_DOMAIN,
)?;
if response.success {
Ok(())
} else {
Err(PipeError::Protocol(format!(
"navigate failed: {}",
response.data
)))
}
}
/// Repeatedly reads the page body text until it looks like a rendered hot list.
///
/// Performs at most `HOTLIST_READY_POLL_ATTEMPTS` reads, sleeping
/// `HOTLIST_READY_POLL_INTERVAL` between them. Unsuccessful reads count as "not ready yet".
/// Returns `Ok(true)` as soon as the text looks ready, `Ok(false)` when the budget runs out.
///
/// # Errors
/// Propagates errors from the browser invocation itself.
fn poll_for_hotlist_readiness<T: Transport + 'static>(
    browser_tool: &BrowserPipeTool<T>,
) -> Result<bool, PipeError> {
    let ready_pattern =
        Regex::new(HOTLIST_TEXT_READY_PATTERN).expect("hotlist readiness regex must compile");

    for attempt in 1..=HOTLIST_READY_POLL_ATTEMPTS {
        let response =
            browser_tool.invoke(Action::GetText, json!({ "selector": "body" }), ZHIHU_DOMAIN)?;

        let looks_ready = response.success
            && hotlist_text_looks_ready(
                response.data.get("text").unwrap_or(&response.data),
                &ready_pattern,
            );
        if looks_ready {
            return Ok(true);
        }

        // No point in waiting once the final attempt has been spent.
        if attempt < HOTLIST_READY_POLL_ATTEMPTS {
            thread::sleep(HOTLIST_READY_POLL_INTERVAL);
        }
    }

    Ok(false)
}
/// Returns `true` when `payload` is a string that mentions 热榜 and matches `ready_pattern`.
/// Non-string payloads are never considered ready.
fn hotlist_text_looks_ready(payload: &Value, ready_pattern: &Regex) -> bool {
    match payload.as_str() {
        Some(text) => text.contains("热榜") && ready_pattern.is_match(text),
        None => false,
    }
}
fn export_xlsx<T: Transport>(
transport: &T,
workspace_root: &Path,
items: &[HotlistItem],
) -> Result<String, PipeError> {
transport.send(&AgentMessage::LogEntry {
level: "info".to_string(),
message: "call openxml_office".to_string(),
})?;
let tool = OpenXmlOfficeTool::new(workspace_root.to_path_buf());
let rows = items
.iter()
.map(|item| json!([item.rank, item.title, item.heat]))
.collect::<Vec<_>>();
let runtime = tokio::runtime::Runtime::new()
.map_err(|err| PipeError::Protocol(format!("failed to create tokio runtime: {err}")))?;
let result = runtime
.block_on(tool.execute(json!({
"sheet_name": "知乎热榜",
"columns": ["rank", "title", "heat"],
"rows": rows,
})))
.map_err(|err| PipeError::Protocol(err.to_string()))?;
if !result.success {
return Err(PipeError::Protocol(
result
.error
.unwrap_or_else(|| "openxml_office failed".to_string()),
));
}
let payload: Value = serde_json::from_str(&result.output)
.map_err(|err| PipeError::Protocol(format!("invalid openxml_office output: {err}")))?;
let output_path = payload["output_path"].as_str().ok_or_else(|| {
PipeError::Protocol("openxml_office did not return output_path".to_string())
})?;
Ok(format!("已导出知乎热榜 Excel {output_path}"))
}
fn export_screen<T: Transport>(
transport: &T,
workspace_root: &Path,
items: &[HotlistItem],
) -> Result<String, PipeError> {
transport.send(&AgentMessage::LogEntry {
level: "info".to_string(),
message: "call screen_html_export".to_string(),
})?;
let tool = ScreenHtmlExportTool::new(workspace_root.to_path_buf());
let rows = items
.iter()
.map(|item| json!([item.rank, item.title, item.heat]))
.collect::<Vec<_>>();
let runtime = tokio::runtime::Runtime::new()
.map_err(|err| PipeError::Protocol(format!("failed to create tokio runtime: {err}")))?;
let result = runtime
.block_on(tool.execute(json!({ "rows": rows })))
.map_err(|err| PipeError::Protocol(err.to_string()))?;
if !result.success {
return Err(PipeError::Protocol(
result
.error
.unwrap_or_else(|| "screen_html_export failed".to_string()),
));
}
let payload: Value = serde_json::from_str(&result.output)
.map_err(|err| PipeError::Protocol(format!("invalid screen_html_export output: {err}")))?;
let output_path = payload["output_path"].as_str().ok_or_else(|| {
PipeError::Protocol("screen_html_export did not return output_path".to_string())
})?;
Ok(format!("已生成知乎热榜大屏 {output_path}"))
}
fn execute_zhihu_article_route<T: Transport + 'static>(
transport: &T,
browser_tool: &BrowserPipeTool<T>,
skills_dir: &Path,
instruction: &str,
task_context: &CompatTaskContext,
publish_mode: bool,
) -> Result<String, PipeError> {
let Some(article) = extract_article_draft(instruction, &task_context.messages) else {
return Ok(
"这类知乎文章任务需要同时提供标题和正文后我才能继续确定性写作流程。请按“标题:…\\n正文…”的格式补充内容。"
.to_string(),
);
};
if publish_mode && !has_explicit_publish_confirmation(instruction) {
return Ok(build_publish_confirmation_message(&article));
}
navigate_zhihu_page(transport, browser_tool, ZHIHU_CREATOR_URL)?;
transport.send(&AgentMessage::LogEntry {
level: "info".to_string(),
message: "call zhihu-navigate.open_creator_entry".to_string(),
})?;
let creator_state = execute_browser_skill_script(
browser_tool,
skills_dir,
"zhihu-navigate",
"open_creator_entry.js",
json!({ "desired_target": "article_editor" }),
ZHIHU_DOMAIN,
)?;
if is_login_required_payload(&creator_state) {
return Ok(build_login_block_message(payload_current_url(
&creator_state,
)));
}
if payload_status(&creator_state) == Some("creator_home") {
return Ok(build_creator_entry_missing_message(payload_current_url(
&creator_state,
)));
}
navigate_to_editor_after_creator_entry(transport, browser_tool, &creator_state)?;
transport.send(&AgentMessage::LogEntry {
level: "info".to_string(),
message: "call zhihu-write.prepare_article_editor".to_string(),
})?;
let editor_state = execute_browser_skill_script(
browser_tool,
skills_dir,
"zhihu-write",
"prepare_article_editor.js",
json!({ "desired_mode": if publish_mode { "publish" } else { "draft" } }),
ZHIHU_EDITOR_DOMAIN,
)?;
if is_login_required_payload(&editor_state) {
return Ok(build_login_block_message(payload_current_url(
&editor_state,
)));
}
if payload_status(&editor_state) != Some("editor_ready") {
return Ok(build_editor_unavailable_message(payload_current_url(
&editor_state,
)));
}
transport.send(&AgentMessage::LogEntry {
level: "info".to_string(),
message: "call zhihu-write.fill_article_draft".to_string(),
})?;
let fill_result = execute_browser_skill_script(
browser_tool,
skills_dir,
"zhihu-write",
"fill_article_draft.js",
json!({
"title": article.title,
"body": article.body,
"publish_mode": publish_mode.to_string(),
}),
ZHIHU_EDITOR_DOMAIN,
)?;
if is_login_required_payload(&fill_result) {
return Ok(build_login_block_message(payload_current_url(&fill_result)));
}
match payload_status(&fill_result) {
Some("draft_ready") => Ok(format!(
"已进入知乎文章编辑器并写入草稿《{}",
article.title
)),
Some("publish_clicked") | Some("publish_submitted") => {
Ok(format!("已提交知乎文章发布流程《{}", article.title))
}
Some("publish_button_missing") => Err(PipeError::Protocol(
"知乎文章流程失败:未找到发布按钮".to_string(),
)),
Some("editor_not_ready") => Err(PipeError::Protocol(
"知乎文章流程失败:编辑器尚未准备就绪".to_string(),
)),
_ => Err(PipeError::Protocol(format!(
"知乎文章流程失败:浏览器脚本返回了未知状态 {fill_result}"
))),
}
}
/// Opens the Zhihu article editor without writing any content.
///
/// Navigates to the creator center, runs the `open_creator_entry.js` skill script, moves on
/// to the editor when needed, and runs `prepare_article_editor.js` in draft mode. Situations
/// the user must resolve (not logged in, entry or editor unavailable) are returned as `Ok`
/// messages.
///
/// # Errors
/// Propagates transport, navigation, and script-execution errors.
fn execute_zhihu_article_entry_route<T: Transport + 'static>(
    transport: &T,
    browser_tool: &BrowserPipeTool<T>,
    skills_dir: &Path,
) -> Result<String, PipeError> {
    navigate_zhihu_page(transport, browser_tool, ZHIHU_CREATOR_URL)?;

    transport.send(&AgentMessage::LogEntry {
        level: "info".to_string(),
        message: "call zhihu-navigate.open_creator_entry".to_string(),
    })?;
    let creator_state = execute_browser_skill_script(
        browser_tool,
        skills_dir,
        "zhihu-navigate",
        "open_creator_entry.js",
        json!({ "desired_target": "article_editor" }),
        ZHIHU_DOMAIN,
    )?;
    let creator_url = payload_current_url(&creator_state);
    if is_login_required_payload(&creator_state) {
        return Ok(build_login_block_message(creator_url));
    }
    if matches!(payload_status(&creator_state), Some("creator_home")) {
        return Ok(build_creator_entry_missing_message(creator_url));
    }

    navigate_to_editor_after_creator_entry(transport, browser_tool, &creator_state)?;

    transport.send(&AgentMessage::LogEntry {
        level: "info".to_string(),
        message: "call zhihu-write.prepare_article_editor".to_string(),
    })?;
    let editor_state = execute_browser_skill_script(
        browser_tool,
        skills_dir,
        "zhihu-write",
        "prepare_article_editor.js",
        json!({ "desired_mode": "draft" }),
        ZHIHU_EDITOR_DOMAIN,
    )?;
    let editor_url = payload_current_url(&editor_state);

    let message = if is_login_required_payload(&editor_state) {
        build_login_block_message(editor_url)
    } else if matches!(payload_status(&editor_state), Some("editor_ready")) {
        "已进入知乎文章编辑器。".to_string()
    } else {
        build_editor_unavailable_message(editor_url)
    };
    Ok(message)
}
/// Loads the `zhihu-hotlist/extract_hotlist.js` skill script wrapped with its arguments.
///
/// `top_n` is handed to the script as a string under the `top_n` key.
fn load_hotlist_extractor_script(skills_dir: &Path, top_n: usize) -> Result<String, PipeError> {
    let script_args = json!({ "top_n": top_n.to_string() });
    load_browser_skill_script(
        skills_dir,
        "zhihu-hotlist",
        "extract_hotlist.js",
        script_args,
    )
}
/// Converts the extractor payload into hot-list items.
///
/// A string payload is first decoded as JSON (falling back to the raw string). The decoded
/// value must carry a `rows` array; each row must be an array of exactly three cells
/// `[rank, title, heat]`. Rows with a different shape, or with an empty/non-string title or
/// heat, are skipped. A rank that is neither a number nor a numeric string falls back to the
/// 1-based position among the items accepted so far.
///
/// # Errors
/// Returns a protocol error when `rows` is missing or when no row survives validation.
fn parse_hotlist_items_payload(payload: &Value) -> Result<Vec<HotlistItem>, PipeError> {
    let decoded = match payload.as_str() {
        Some(raw) => {
            serde_json::from_str::<Value>(raw).unwrap_or_else(|_| Value::String(raw.to_string()))
        }
        None => payload.clone(),
    };

    let Some(rows) = decoded.get("rows").and_then(Value::as_array) else {
        return Err(PipeError::Protocol(
            "知乎热榜采集失败:浏览器脚本未返回 rows".to_string(),
        ));
    };

    let mut items: Vec<HotlistItem> = Vec::new();
    for row in rows {
        let Some([rank_cell, title_cell, heat_cell]) = row.as_array().map(Vec::as_slice) else {
            continue;
        };

        let title = title_cell.as_str().unwrap_or_default().trim();
        let heat = heat_cell.as_str().unwrap_or_default().trim();
        if title.is_empty() || heat.is_empty() {
            continue;
        }

        let rank = match rank_cell.as_u64() {
            Some(number) => number,
            None => rank_cell
                .as_str()
                .and_then(|digits| digits.parse::<u64>().ok())
                .unwrap_or((items.len() + 1) as u64),
        };

        items.push(HotlistItem {
            rank,
            title: title.to_string(),
            heat: heat.to_string(),
        });
    }

    if items.is_empty() {
        Err(PipeError::Protocol(
            "知乎热榜采集失败:浏览器脚本未返回有效热榜条目".to_string(),
        ))
    } else {
        Ok(items)
    }
}
/// Extracts the requested item count from phrases such as `前5` or `top 20`.
///
/// Matching is done on the ASCII-lowercased instruction and captures at most two digits.
/// Falls back to 10 when nothing matches or the captured number is zero.
fn extract_top_n(instruction: &str) -> usize {
    const DEFAULT_TOP_N: usize = 10;

    let pattern = Regex::new(r"(?:前|top\s*)(\d{1,2})").expect("valid top-n regex");
    let lowered = instruction.to_ascii_lowercase();

    let Some(captures) = pattern.captures(&lowered) else {
        return DEFAULT_TOP_N;
    };
    match captures
        .get(1)
        .map(|digits| digits.as_str().parse::<usize>())
    {
        Some(Ok(count)) if count > 0 => count,
        _ => DEFAULT_TOP_N,
    }
}
/// Logs and performs a navigation to `url`, using the main Zhihu host as the expected domain.
///
/// # Errors
/// Returns a protocol error carrying the response data when the browser reports failure, and
/// propagates transport/invocation errors.
fn navigate_zhihu_page<T: Transport + 'static>(
    transport: &T,
    browser_tool: &BrowserPipeTool<T>,
    url: &str,
) -> Result<(), PipeError> {
    let log_entry = AgentMessage::LogEntry {
        level: "info".to_string(),
        message: format!("navigate {url}"),
    };
    transport.send(&log_entry)?;

    let response = browser_tool.invoke(Action::Navigate, json!({ "url": url }), ZHIHU_DOMAIN)?;
    if !response.success {
        return Err(PipeError::Protocol(format!(
            "navigate failed: {}",
            response.data
        )));
    }
    Ok(())
}
/// Loads a skill script, evaluates it in the browser, and returns its normalized payload.
///
/// The payload is the response's `text` field when present (otherwise the whole response
/// data), passed through `normalize_payload`.
///
/// # Errors
/// Returns a protocol error when the script cannot be read or the browser reports failure.
fn execute_browser_skill_script<T: Transport + 'static>(
    browser_tool: &BrowserPipeTool<T>,
    skills_dir: &Path,
    skill_name: &str,
    script_name: &str,
    args: Value,
    expected_domain: &str,
) -> Result<Value, PipeError> {
    let script = load_browser_skill_script(skills_dir, skill_name, script_name, args)?;
    let response =
        browser_tool.invoke(Action::Eval, json!({ "script": script }), expected_domain)?;

    if response.success {
        let raw = response.data.get("text").unwrap_or(&response.data);
        Ok(normalize_payload(raw))
    } else {
        Err(PipeError::Protocol(format!(
            "browser script failed: {}",
            response.data
        )))
    }
}
/// Moves the browser to the article editor when the creator-entry script asked for it.
///
/// Navigation happens only for the `creator_entry_clicked` and `creator_entry_found`
/// statuses, targeting the payload's `next_url` or the default editor URL. Any other status,
/// including `editor_ready`, is a no-op.
///
/// # Errors
/// Returns a protocol error when the navigation response reports failure, and propagates
/// transport/invocation errors.
fn navigate_to_editor_after_creator_entry<T: Transport + 'static>(
    transport: &T,
    browser_tool: &BrowserPipeTool<T>,
    creator_state: &Value,
) -> Result<(), PipeError> {
    match payload_status(creator_state) {
        Some("creator_entry_clicked") | Some("creator_entry_found") => {
            let destination = payload_next_url(creator_state).unwrap_or(ZHIHU_EDITOR_URL);

            transport.send(&AgentMessage::LogEntry {
                level: "info".to_string(),
                message: format!("navigate {destination}"),
            })?;
            let response = browser_tool.invoke(
                Action::Navigate,
                json!({ "url": destination }),
                ZHIHU_EDITOR_DOMAIN,
            )?;

            if response.success {
                Ok(())
            } else {
                Err(PipeError::Protocol(format!(
                    "navigate failed: {}",
                    response.data
                )))
            }
        }
        _ => Ok(()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipe::{BrowserMessage, Timing};
    use crate::security::MacPolicy;
    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    /// Transport double: records every sent message and replays canned browser responses.
    struct MockWorkflowTransport {
        sent: Mutex<Vec<AgentMessage>>,
        responses: Mutex<VecDeque<BrowserMessage>>,
    }

    impl MockWorkflowTransport {
        fn new(responses: Vec<BrowserMessage>) -> Self {
            Self {
                sent: Mutex::new(Vec::new()),
                responses: Mutex::new(VecDeque::from(responses)),
            }
        }

        fn sent_messages(&self) -> Vec<AgentMessage> {
            self.sent.lock().unwrap().clone()
        }
    }

    impl Transport for MockWorkflowTransport {
        fn send(&self, message: &AgentMessage) -> Result<(), PipeError> {
            self.sent.lock().unwrap().push(message.clone());
            Ok(())
        }

        fn recv_timeout(&self, _timeout: Duration) -> Result<BrowserMessage, PipeError> {
            self.responses
                .lock()
                .unwrap()
                .pop_front()
                .ok_or(PipeError::Timeout)
        }
    }

    fn zhihu_test_policy() -> MacPolicy {
        MacPolicy::from_json_str(
            &json!({
                "version": "1.0",
                "domains": { "allowed": ["www.zhihu.com"] },
                "pipe_actions": {
                    "allowed": ["navigate", "getText", "eval"],
                    "blocked": []
                }
            })
            .to_string(),
        )
        .unwrap()
    }

    fn success_browser_response(seq: u64, data: Value) -> BrowserMessage {
        BrowserMessage::Response {
            seq,
            success: true,
            data,
            aom_snapshot: vec![],
            timing: Timing {
                queue_ms: 1,
                exec_ms: 10,
            },
        }
    }

    /// Creates a throwaway skills directory holding a stub hot-list extractor script.
    ///
    /// `collect_hotlist_items` reads `<skills_dir>/zhihu-hotlist/scripts/extract_hotlist.js`
    /// from disk before every eval, so the file has to exist even though the mock transport
    /// never runs it. `label` keeps concurrently running tests in separate directories.
    fn stub_skills_dir(label: &str) -> std::path::PathBuf {
        let root = std::env::temp_dir().join(format!(
            "claw-workflow-executor-tests-{}-{label}",
            std::process::id()
        ));
        let scripts_dir = root.join("zhihu-hotlist").join("scripts");
        std::fs::create_dir_all(&scripts_dir).expect("stub skills dir should be creatable");
        std::fs::write(
            scripts_dir.join("extract_hotlist.js"),
            "return { rows: [] };\n",
        )
        .expect("stub extractor script should be writable");
        root
    }

    #[test]
    fn collect_hotlist_items_skips_navigation_when_hot_page_is_already_readable() {
        let transport = Arc::new(MockWorkflowTransport::new(vec![
            success_browser_response(
                1,
                json!({ "text": "知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度" }),
            ),
            success_browser_response(
                2,
                json!({
                    "text": {
                        "source": "https://www.zhihu.com/hot",
                        "sheet_name": "知乎热榜",
                        "columns": ["rank", "title", "heat"],
                        "rows": [[1, "问题一", "344万"], [2, "问题二", "266万"]]
                    }
                }),
            ),
        ]));
        let browser_tool =
            BrowserPipeTool::new(transport.clone(), zhihu_test_policy(), vec![1, 2, 3, 4])
                .with_response_timeout(Duration::from_secs(1));
        let task_context = CompatTaskContext {
            page_url: Some("https://www.zhihu.com/hot".to_string()),
            page_title: Some("知乎热榜".to_string()),
            ..CompatTaskContext::default()
        };
        let skills_dir = stub_skills_dir("already-readable");
        let items = collect_hotlist_items(
            transport.as_ref(),
            &browser_tool,
            &skills_dir,
            10,
            &task_context,
        )
        .expect("hotlist collection should succeed");
        assert_eq!(items.len(), 2);
        let sent = transport.sent_messages();
        assert!(sent.iter().any(|message| {
            matches!(
                message,
                AgentMessage::Command { action, .. } if action == &Action::GetText
            )
        }));
        assert!(sent.iter().any(|message| {
            matches!(
                message,
                AgentMessage::Command { action, .. } if action == &Action::Eval
            )
        }));
        assert!(!sent.iter().any(|message| {
            matches!(
                message,
                AgentMessage::Command { action, .. } if action == &Action::Navigate
            )
        }));
    }

    #[test]
    fn collect_hotlist_items_polls_after_navigation_before_retrying_navigation() {
        let transport = Arc::new(MockWorkflowTransport::new(vec![
            success_browser_response(1, json!({ "navigated": true })),
            success_browser_response(2, json!({ "text": "" })),
            success_browser_response(3, json!({ "text": "" })),
            success_browser_response(4, json!({ "text": "知乎热榜\n1 问题一 344万热度" })),
            success_browser_response(
                5,
                json!({
                    "text": {
                        "source": "https://www.zhihu.com/hot",
                        "sheet_name": "知乎热榜",
                        "columns": ["rank", "title", "heat"],
                        "rows": [[1, "问题一", "344万"]]
                    }
                }),
            ),
        ]));
        let browser_tool =
            BrowserPipeTool::new(transport.clone(), zhihu_test_policy(), vec![1, 2, 3, 4, 5])
                .with_response_timeout(Duration::from_secs(1));
        let task_context = CompatTaskContext {
            page_url: Some("https://www.zhihu.com/".to_string()),
            page_title: Some("知乎".to_string()),
            ..CompatTaskContext::default()
        };
        let skills_dir = stub_skills_dir("polls-after-navigation");
        let items = collect_hotlist_items(
            transport.as_ref(),
            &browser_tool,
            &skills_dir,
            10,
            &task_context,
        )
        .expect("hotlist collection should succeed after readiness polling");
        assert_eq!(items.len(), 1);
        let sent = transport.sent_messages();
        let actions = sent
            .iter()
            .filter_map(|message| match message {
                AgentMessage::Command { action, .. } => Some(action.clone()),
                _ => None,
            })
            .collect::<Vec<_>>();
        assert_eq!(
            actions,
            vec![
                Action::Navigate,
                Action::GetText,
                Action::GetText,
                Action::GetText,
                Action::Eval
            ]
        );
    }

    #[test]
    fn collect_hotlist_items_retries_navigation_after_short_readiness_budget_expires() {
        let transport = Arc::new(MockWorkflowTransport::new(vec![
            success_browser_response(1, json!({ "navigated": true })),
            success_browser_response(2, json!({ "text": "" })),
            success_browser_response(3, json!({ "text": "" })),
            success_browser_response(4, json!({ "text": "" })),
            success_browser_response(5, json!({ "text": "" })),
            success_browser_response(6, json!({ "text": "" })),
            success_browser_response(7, json!({ "text": "" })),
            success_browser_response(8, json!({ "text": "" })),
            success_browser_response(9, json!({ "text": "" })),
            success_browser_response(10, json!({ "text": "" })),
            success_browser_response(11, json!({ "text": "" })),
            success_browser_response(12, json!({ "text": { "rows": [] } })),
            success_browser_response(13, json!({ "navigated": true })),
            success_browser_response(
                14,
                json!({ "text": "知乎热榜\n1 问题一 344万热度" }),
            ),
            success_browser_response(
                15,
                json!({
                    "text": {
                        "source": "https://www.zhihu.com/hot",
                        "sheet_name": "知乎热榜",
                        "columns": ["rank", "title", "heat"],
                        "rows": [[1, "问题一", "344万"]]
                    }
                }),
            ),
        ]));
        let browser_tool = BrowserPipeTool::new(
            transport.clone(),
            zhihu_test_policy(),
            vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
        )
        .with_response_timeout(Duration::from_secs(1));
        let task_context = CompatTaskContext {
            page_url: Some("https://www.zhihu.com/".to_string()),
            page_title: Some("知乎".to_string()),
            ..CompatTaskContext::default()
        };
        let skills_dir = stub_skills_dir("retries-navigation");
        let items = collect_hotlist_items(
            transport.as_ref(),
            &browser_tool,
            &skills_dir,
            10,
            &task_context,
        )
        .expect("hotlist collection should succeed after one navigation retry");
        assert_eq!(items.len(), 1);
        let sent = transport.sent_messages();
        let navigate_count = sent
            .iter()
            .filter(|message| {
                matches!(
                    message,
                    AgentMessage::Command { action, .. } if action == &Action::Navigate
                )
            })
            .count();
        assert_eq!(navigate_count, 2);
    }

    #[test]
    fn collect_hotlist_items_uses_extractor_probe_before_second_navigation() {
        let transport = Arc::new(MockWorkflowTransport::new(vec![
            success_browser_response(1, json!({ "navigated": true })),
            success_browser_response(2, json!({ "text": "知乎热榜" })),
            success_browser_response(3, json!({ "text": "知乎热榜" })),
            success_browser_response(4, json!({ "text": "知乎热榜" })),
            success_browser_response(5, json!({ "text": "知乎热榜" })),
            success_browser_response(6, json!({ "text": "知乎热榜" })),
            success_browser_response(7, json!({ "text": "知乎热榜" })),
            success_browser_response(8, json!({ "text": "知乎热榜" })),
            success_browser_response(9, json!({ "text": "知乎热榜" })),
            success_browser_response(10, json!({ "text": "知乎热榜" })),
            success_browser_response(11, json!({ "text": "知乎热榜" })),
            success_browser_response(
                12,
                json!({
                    "text": {
                        "source": "https://www.zhihu.com/hot",
                        "sheet_name": "知乎热榜",
                        "columns": ["rank", "title", "heat"],
                        "rows": [[1, "问题一", "344万"]]
                    }
                }),
            ),
            success_browser_response(
                13,
                json!({
                    "text": {
                        "source": "https://www.zhihu.com/hot",
                        "sheet_name": "知乎热榜",
                        "columns": ["rank", "title", "heat"],
                        "rows": [[1, "问题一", "344万"]]
                    }
                }),
            ),
        ]));
        let browser_tool = BrowserPipeTool::new(
            transport.clone(),
            zhihu_test_policy(),
            vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
        )
        .with_response_timeout(Duration::from_secs(1));
        let task_context = CompatTaskContext {
            page_url: Some("https://www.zhihu.com/".to_string()),
            page_title: Some("知乎".to_string()),
            ..CompatTaskContext::default()
        };
        let skills_dir = stub_skills_dir("extractor-probe");
        let items = collect_hotlist_items(
            transport.as_ref(),
            &browser_tool,
            &skills_dir,
            10,
            &task_context,
        )
        .expect("hotlist collection should succeed via extractor probe");
        assert_eq!(items.len(), 1);
        let sent = transport.sent_messages();
        let navigate_count = sent
            .iter()
            .filter(|message| {
                matches!(
                    message,
                    AgentMessage::Command { action, .. } if action == &Action::Navigate
                )
            })
            .count();
        assert_eq!(navigate_count, 1);
    }
}
/// Reads `<skills_dir>/<skill_name>/scripts/<script_name>` and wraps it for evaluation.
///
/// The script body is placed inside an immediately invoked function whose first statement
/// binds `args` to the JSON rendering of the `args` value.
///
/// # Errors
/// Returns a protocol error naming the path when the script file cannot be read.
fn load_browser_skill_script(
    skills_dir: &Path,
    skill_name: &str,
    script_name: &str,
    args: Value,
) -> Result<String, PipeError> {
    let mut script_path = skills_dir.join(skill_name);
    script_path.push("scripts");
    script_path.push(script_name);

    let script = match fs::read_to_string(&script_path) {
        Ok(contents) => contents,
        Err(err) => {
            return Err(PipeError::Protocol(format!(
                "failed to read browser script {}: {err}",
                script_path.display()
            )))
        }
    };

    Ok(format!(
        "(function() {{\nconst args = {args};\n{script}\n}})()"
    ))
}
/// Decodes a string payload as JSON when possible; anything else is returned as a clone.
///
/// A string that is not valid JSON comes back unchanged as a JSON string value.
fn normalize_payload(payload: &Value) -> Value {
    match payload.as_str() {
        None => payload.clone(),
        Some(raw) => match serde_json::from_str::<Value>(raw) {
            Ok(decoded) => decoded,
            Err(_) => Value::String(raw.to_string()),
        },
    }
}
fn payload_status(payload: &Value) -> Option<&str> {
payload.get("status").and_then(Value::as_str)
}
fn payload_current_url(payload: &Value) -> Option<&str> {
payload.get("current_url").and_then(Value::as_str)
}
fn payload_next_url(payload: &Value) -> Option<&str> {
payload.get("next_url").and_then(Value::as_str)
}
/// Reports whether the payload's status is exactly `login_required`.
fn is_login_required_payload(payload: &Value) -> bool {
    matches!(payload_status(payload), Some("login_required"))
}
/// Builds the user-facing message shown when the Zhihu session is not logged in.
///
/// A non-empty `current_url` is appended as a "current page" suffix.
fn build_login_block_message(current_url: Option<&str>) -> String {
    let suffix = match current_url {
        Some(url) if !url.is_empty() => format!(" 当前页面:{url}"),
        _ => String::new(),
    };
    format!(
        "当前知乎浏览器会话未登录,无法进入创作者中心或发布文章。请先登录知乎后再继续。{suffix}"
    )
}
/// Builds the user-facing message shown when the article editor was not detected.
///
/// A non-empty `current_url` is appended as a "current page" suffix.
fn build_editor_unavailable_message(current_url: Option<&str>) -> String {
    let suffix = match current_url {
        Some(url) if !url.is_empty() => format!(" 当前页面:{url}"),
        _ => String::new(),
    };
    format!(
        "已进入知乎创作者流程,但当前未检测到文章编辑器。可能原因是页面仍在加载、当前账号暂未开放写作入口,或知乎页面结构发生变化。请确认当前知乎账号已登录且具备发文权限,然后在页面稳定后重试。{suffix}"
    )
}
/// Builds the user-facing message shown when the creator center has no "write article" entry.
///
/// A non-empty `current_url` is appended as a "current page" suffix.
fn build_creator_entry_missing_message(current_url: Option<&str>) -> String {
    let suffix = match current_url {
        Some(url) if !url.is_empty() => format!(" 当前页面:{url}"),
        _ => String::new(),
    };
    format!(
        "已进入知乎创作者中心,但当前未找到“写文章”入口。请确认页面已加载完成,且当前账号具备文章发布入口后再重试。{suffix}"
    )
}
fn build_publish_confirmation_message(article: &ArticleDraft) -> String {
format!(
"我已收到这篇知乎文章的内容,但在当前会话里还没有拿到明确发布确认。\n\n标题:{}\n正文:{}\n\n如果你确定现在要发布,请直接回复“确认发布”。在收到明确确认之前,我不会执行任何发布动作。",
article.title,
article.body
)
}
/// Reports whether the instruction contains one of the accepted publish-confirmation phrases.
fn has_explicit_publish_confirmation(instruction: &str) -> bool {
    const CONFIRMATION_PHRASES: [&str; 5] =
        ["确认发布", "确认发表", "现在发布", "立即发布", "可以发布"];

    let candidate = instruction.trim();
    CONFIRMATION_PHRASES
        .iter()
        .any(|phrase| candidate.contains(*phrase))
}
/// Reports whether the task only asks to open the Zhihu article entry/editor.
///
/// All of the following must hold:
/// - the task is a Zhihu write task,
/// - the instruction contains an "open / go to" verb,
/// - the instruction mentions the page, entry, creator center, or writing an article,
/// - the instruction does not already carry a parsable title/body pair.
fn task_requests_zhihu_article_entry(
    instruction: &str,
    page_url: Option<&str>,
    page_title: Option<&str>,
) -> bool {
    if !crate::runtime::is_zhihu_write_task(instruction, page_url, page_title) {
        return false;
    }
    let normalized = instruction.to_ascii_lowercase();
    // The verb list previously ended with `instruction.contains("")`. An empty pattern
    // matches every string, which made this whole condition always true; that clause is
    // dropped so the verb check is meaningful again.
    // NOTE(review): the keyword that was meant to be in that literal is unknown — add it
    // back here once confirmed.
    let asks_to_open = normalized.contains("open")
        || normalized.contains("goto")
        || normalized.contains("go to")
        || instruction.contains("打开")
        || instruction.contains("进入");
    let mentions_entry = instruction.contains("页面")
        || instruction.contains("入口")
        || instruction.contains("创作中心")
        || instruction.contains("写文章")
        || instruction.contains("发文章");
    let has_article_inputs = parse_article_draft(instruction).is_some();
    asks_to_open && mentions_entry && !has_article_inputs
}
/// Finds an article draft in the instruction, or failing that in the most recent user
/// message that contains one.
fn extract_article_draft(
    instruction: &str,
    messages: &[ConversationMessage],
) -> Option<ArticleDraft> {
    if let Some(draft) = parse_article_draft(instruction) {
        return Some(draft);
    }

    // Walk the history newest-first so the latest user-provided draft wins.
    for message in messages.iter().rev() {
        if message.role != "user" {
            continue;
        }
        if let Some(draft) = parse_article_draft(&message.content) {
            return Some(draft);
        }
    }
    None
}
fn parse_article_draft(text: &str) -> Option<ArticleDraft> {
let normalized = normalize_article_draft_input(text);
let title_re = Regex::new(r"(?m)^标题[:]\s*(.+?)\s*$").expect("valid zhihu title regex");
let body_re = Regex::new(r"(?s)正文[:]\s*(.+)$").expect("valid zhihu body regex");
let inline_title_re =
Regex::new(r"标题(?:是|为)\s*([^,\n]+)").expect("valid inline zhihu title regex");
let inline_body_re =
Regex::new(r"(?s)正文(?:是|为)\s*(.+)$").expect("valid inline zhihu body regex");
let title = title_re
.captures(&normalized)
.and_then(|capture| capture.get(1))
.map(|value| value.as_str().trim().to_string())
.or_else(|| {
inline_title_re
.captures(&normalized)
.and_then(|capture| capture.get(1))
.map(|value| value.as_str().trim().to_string())
})?;
let body = body_re
.captures(&normalized)
.and_then(|capture| capture.get(1))
.map(|value| value.as_str().trim().to_string())
.or_else(|| {
inline_body_re
.captures(&normalized)
.and_then(|capture| capture.get(1))
.map(|value| value.as_str().trim().trim_end_matches('。').to_string())
})?;
if title.is_empty() || body.is_empty() {
return None;
}
Some(ArticleDraft { title, body })
}
/// Prepares raw user text for draft parsing.
///
/// Trims surrounding whitespace, removes one pair of matching surrounding quotes (`"…"` or
/// `'…'`), and turns literal backslash-n sequences into real newlines.
fn normalize_article_draft_input(text: &str) -> String {
    let candidate = text.trim();
    let inner = ['"', '\'']
        .iter()
        .find_map(|&quote| candidate.strip_prefix(quote)?.strip_suffix(quote))
        .unwrap_or(candidate);
    inner.replace("\\n", "\n")
}