feat: align browser callback runtime and export flows

Consolidate the browser task runtime around the callback path, add safer artifact opening for Zhihu exports, and cover the new service/browser flows with focused tests and supporting docs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
木炎
2026-04-06 21:44:53 +08:00
parent 0dd655712c
commit bdf8e12246
55 changed files with 14440 additions and 1053 deletions

View File

@@ -0,0 +1,922 @@
use std::io::{BufRead, BufReader, Read as _, Write};
use std::net::TcpListener;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use reqwest::blocking::Client;
use serde_json::{json, Value};
use sgclaw::service::{ClientMessage, ServiceMessage};
use tungstenite::{accept, Message};
const RUNTIME_DROP_PANIC_TEXT: &str =
"Cannot drop a runtime in a context where blocking is not allowed";
fn read_ws_text(stream: &mut tungstenite::WebSocket<std::net::TcpStream>) -> String {
match stream.read().unwrap() {
Message::Text(text) => text.to_string(),
other => panic!("expected text frame, got {other:?}"),
}
}
fn start_fake_deepseek_server(
responses: Vec<Value>,
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
listener.set_nonblocking(true).unwrap();
let address = format!("http://{}", listener.local_addr().unwrap());
let requests = Arc::new(Mutex::new(Vec::new()));
let request_log = requests.clone();
let handle = thread::spawn(move || {
for response in responses {
let deadline = std::time::Instant::now() + Duration::from_secs(5);
let (mut stream, _) = loop {
match listener.accept() {
Ok(pair) => break pair,
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
assert!(
std::time::Instant::now() < deadline,
"timed out waiting for provider request"
);
thread::sleep(Duration::from_millis(10));
}
Err(err) => panic!("failed to accept provider request: {err}"),
}
};
stream.set_nonblocking(false).unwrap();
let body = match read_http_json_body(&mut stream) {
Ok(body) => body,
Err(_) => continue,
};
request_log.lock().unwrap().push(body);
let payload = response.to_string();
let reply = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
payload.as_bytes().len(),
payload
);
stream.write_all(reply.as_bytes()).unwrap();
stream.flush().unwrap();
}
});
(address, requests, handle)
}
fn read_http_json_body(stream: &mut impl std::io::Read) -> Result<Value, &'static str> {
let mut buffer = Vec::new();
let mut headers_end = None;
while headers_end.is_none() {
let mut chunk = [0_u8; 1024];
let bytes = stream.read(&mut chunk).unwrap();
if bytes == 0 {
return Err("unexpected EOF while reading headers");
}
buffer.extend_from_slice(&chunk[..bytes]);
headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n");
}
let headers_end = headers_end.unwrap() + 4;
let headers = String::from_utf8(buffer[..headers_end].to_vec()).unwrap();
let Some(content_length) = headers.lines().find_map(|line| {
let (name, value) = line.split_once(':')?;
name.eq_ignore_ascii_case("content-length")
.then(|| value.trim().parse::<usize>().unwrap())
}) else {
return Err("missing content-length header");
};
while buffer.len() < headers_end + content_length {
let mut chunk = vec![0_u8; content_length];
let bytes = stream.read(&mut chunk).unwrap();
if bytes == 0 {
return Err("unexpected EOF while reading body");
}
buffer.extend_from_slice(&chunk[..bytes]);
}
Ok(serde_json::from_slice(&buffer[headers_end..headers_end + content_length]).unwrap())
}
#[derive(Debug)]
enum CallbackHostBrowserEvent {
BrowserFrame(Value),
CommandEnvelope(Value),
}
fn start_callback_host_hotlist_browser_server(
event_tx: mpsc::Sender<CallbackHostBrowserEvent>,
) -> (String, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
stream.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
stream.set_write_timeout(Some(Duration::from_secs(2))).unwrap();
let mut websocket = accept(stream).unwrap();
let register = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected register frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(register))
.unwrap();
websocket
.send(Message::Text(
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
.to_string()
.into(),
))
.unwrap();
let first_action = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected browser action frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(first_action.clone()))
.unwrap();
let Some(values) = first_action.as_array() else {
websocket.close(None).ok();
return;
};
let is_helper_open = values.len() >= 3
&& values[1] == json!("sgBrowerserOpenPage")
&& values[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
if !is_helper_open {
websocket.close(None).ok();
return;
}
let helper_url = values[2].as_str().unwrap().to_string();
let helper_origin = helper_url
.trim_end_matches("/sgclaw/browser-helper.html")
.to_string();
let helper_client = Client::builder()
.timeout(Duration::from_secs(2))
.build()
.unwrap();
let helper_html = helper_client
.get(&helper_url)
.send()
.unwrap()
.error_for_status()
.unwrap()
.text()
.unwrap();
assert!(helper_html.contains("sgclawReady"));
assert!(helper_html.contains("sgclawOnLoaded"));
assert!(helper_html.contains("sgclawOnGetText"));
assert!(helper_html.contains("sgclawOnEval"));
let pre_ready_command: Value = helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
.unwrap()
.error_for_status()
.unwrap()
.json()
.unwrap();
event_tx
.send(CallbackHostBrowserEvent::CommandEnvelope(pre_ready_command))
.unwrap();
helper_client
.post(format!("{helper_origin}/sgclaw/callback/ready"))
.json(&json!({
"type": "ready",
"helper_url": helper_url,
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
let hotlist_text = "知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度";
let hotlist_payload = json!({
"source": "https://www.zhihu.com/hot",
"sheet_name": "知乎热榜",
"columns": ["rank", "title", "heat"],
"rows": [[1, "问题一", "344万"], [2, "问题二", "266万"]]
})
.to_string();
let deadline = Instant::now() + Duration::from_secs(10);
let mut saw_get_text = false;
let mut saw_eval = false;
while Instant::now() < deadline {
let envelope: Value = helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
.unwrap()
.error_for_status()
.unwrap()
.json()
.unwrap();
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
thread::sleep(Duration::from_millis(20));
continue;
};
event_tx
.send(CallbackHostBrowserEvent::CommandEnvelope(envelope.clone()))
.unwrap();
let action_name = command
.get("action")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
helper_client
.post(format!("{helper_origin}/sgclaw/callback/commands/ack"))
.json(&json!({ "type": "command_ack" }))
.send()
.unwrap()
.error_for_status()
.unwrap();
let args = command
.get("args")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
match action_name.as_str() {
"sgBrowerserOpenPage" => {}
"sgBrowserExcuteJsCodeByDomain" => {
let script = args.get(1).and_then(Value::as_str).unwrap_or_default();
if script.contains("sgclawOnGetText") {
saw_get_text = true;
helper_client
.post(format!("{helper_origin}/sgclaw/callback/events"))
.json(&json!({
"callback": "sgclawOnGetText",
"request_url": helper_url,
"target_url": "https://www.zhihu.com/hot",
"action": action_name,
"payload": { "text": hotlist_text }
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
} else if script.contains("sgclawOnEval") {
saw_eval = true;
helper_client
.post(format!("{helper_origin}/sgclaw/callback/events"))
.json(&json!({
"callback": "sgclawOnEval",
"request_url": helper_url,
"target_url": "https://www.zhihu.com/hot",
"action": action_name,
"payload": { "value": hotlist_payload }
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
break;
} else {
panic!("unexpected callback-host domain command: {script}");
}
}
other => panic!("unexpected callback-host command action {other}"),
}
}
assert!(saw_get_text, "expected callback-host getText command");
assert!(saw_eval, "expected callback-host eval command");
websocket.close(None).ok();
});
(format!("ws://{address}"), handle)
}
fn start_direct_zhihu_browser_ws_server() -> (String, Arc<Mutex<Vec<String>>>, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let frames = Arc::new(Mutex::new(Vec::new()));
let frames_for_thread = Arc::clone(&frames);
let handle = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
stream.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
stream.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut socket = accept(stream).unwrap();
let mut action_count = 0_u64;
loop {
let message = match socket.read() {
Ok(message) => message,
Err(tungstenite::Error::ConnectionClosed)
| Err(tungstenite::Error::AlreadyClosed) => break,
Err(err) => panic!("browser ws test server read failed: {err}"),
};
let payload = match message {
Message::Text(text) => text.to_string(),
Message::Ping(payload) => {
socket.send(Message::Pong(payload)).unwrap();
continue;
}
Message::Close(_) => break,
other => panic!("expected text frame, got {other:?}"),
};
frames_for_thread.lock().unwrap().push(payload.clone());
let parsed: Value = serde_json::from_str(&payload).unwrap();
if parsed.get("type").and_then(Value::as_str) == Some("register") {
continue;
}
let values = parsed.as_array().expect("browser action frame should be an array");
let request_url = values[0].as_str().expect("request_url should be a string");
let action = values[1].as_str().expect("action should be a string");
action_count += 1;
socket
.send(Message::Text(
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
.to_string()
.into(),
))
.unwrap();
socket.send(Message::Text("0".into())).unwrap();
let callback_frame = match action {
"sgHideBrowserCallAfterLoaded" => {
let target_url = values[2].as_str().expect("navigate target_url should be a string");
json!([
request_url,
"callBackJsToCpp",
format!(
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgHideBrowserCallAfterLoaded@_@"
)
])
}
"sgBrowserExcuteJsCodeByArea" => {
let target_url = values[2].as_str().expect("script target_url should be a string");
let response_text = if action_count == 2 {
"知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度".to_string()
} else {
r#"{"source":"https://www.zhihu.com/hot","sheet_name":"知乎热榜","columns":["rank","title","heat"],"rows":[[1,"问题一","344万"],[2,"问题二","266万"]]}"#.to_string()
};
json!([
request_url,
"callBackJsToCpp",
format!(
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgBrowserExcuteJsCodeByArea@_@{response_text}"
)
])
}
other => panic!("unexpected browser action {other}"),
};
socket
.send(Message::Text(callback_frame.to_string().into()))
.unwrap();
if action_count >= 3 {
break;
}
}
socket.close(None).ok();
});
(format!("ws://{address}"), frames, handle)
}
#[test]
fn client_submits_first_user_line_to_service() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::TaskComplete {
success: true,
summary: "done".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket.close(None).unwrap();
request
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("打开百度搜索天气\n".as_bytes())
.unwrap();
let status = child.wait().unwrap();
assert!(status.success());
let request = server.join().unwrap();
assert_eq!(
request,
ClientMessage::SubmitTask {
instruction: "打开百度搜索天气".to_string(),
conversation_id: "".to_string(),
messages: vec![],
page_url: "".to_string(),
page_title: "".to_string(),
}
);
}
#[test]
fn client_sends_connect_request_and_exits_after_status() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::StatusChanged {
state: "connected".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::StatusChanged {
state: "connected again".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket.close(None).unwrap();
request
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("/connect\n".as_bytes())
.unwrap();
let output = child.wait_with_output().unwrap();
let request = server.join().unwrap();
assert!(output.status.success());
assert_eq!(request, ClientMessage::Connect);
let stdout = String::from_utf8(output.stdout).unwrap();
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec!["status: connected"]);
}
#[test]
fn client_sends_start_and_stop_requests_with_explicit_commands() {
for (input, expected_request, expected_status) in [
("/start\n", ClientMessage::Start, "status: started"),
("/stop\n", ClientMessage::Stop, "status: stopped"),
] {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let expected_state = expected_status.trim_start_matches("status: ").to_string();
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::StatusChanged {
state: expected_state,
})
.unwrap()
.into(),
))
.unwrap();
websocket.close(None).unwrap();
request
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all(input.as_bytes())
.unwrap();
let output = child.wait_with_output().unwrap();
let request = server.join().unwrap();
assert!(output.status.success());
assert_eq!(request, expected_request);
let stdout = String::from_utf8(output.stdout).unwrap();
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec![expected_status]);
}
}
#[test]
fn client_prints_completion_only_once() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::TaskComplete {
success: true,
summary: "done".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::TaskComplete {
success: true,
summary: "done again".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket.close(None).unwrap();
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("打开百度搜索天气\n".as_bytes())
.unwrap();
let output = child.wait_with_output().unwrap();
server.join().unwrap();
assert!(output.status.success());
let stdout = String::from_utf8(output.stdout).unwrap();
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec!["done"]);
}
#[test]
fn client_prints_log_entries_in_order_before_completion() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
for message in [
ServiceMessage::LogEntry {
level: "info".to_string(),
message: "step 1".to_string(),
},
ServiceMessage::LogEntry {
level: "info".to_string(),
message: "step 2".to_string(),
},
ServiceMessage::TaskComplete {
success: true,
summary: "done".to_string(),
},
] {
websocket
.send(Message::Text(serde_json::to_string(&message).unwrap().into()))
.unwrap();
}
websocket.close(None).unwrap();
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("打开百度搜索天气\n".as_bytes())
.unwrap();
let stdout = child.stdout.take().unwrap();
let (tx, rx) = mpsc::channel();
let reader = thread::spawn(move || {
let reader = BufReader::new(stdout);
for line in reader.lines() {
tx.send(line.unwrap()).unwrap();
}
});
let first = rx.recv_timeout(Duration::from_secs(1)).unwrap();
let second = rx.recv_timeout(Duration::from_secs(1)).unwrap();
let third = rx.recv_timeout(Duration::from_secs(1)).unwrap();
let status = child.wait().unwrap();
reader.join().unwrap();
server.join().unwrap();
assert!(status.success());
assert_eq!(vec![first, second, third], vec!["step 1", "step 2", "done"]);
}
#[test]
fn client_exits_with_failure_when_service_disconnects_before_completion() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
websocket.close(None).unwrap();
request
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("打开百度搜索天气\n".as_bytes())
.unwrap();
let status = child.wait().unwrap();
assert!(!status.success());
let request = server.join().unwrap();
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
}
#[test]
fn client_to_service_regression_routes_zhihu_through_callback_host_without_invalid_hmac_seed_output() {
let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
let service_addr = service_listener.local_addr().unwrap();
drop(service_listener);
let (event_tx, event_rx) = mpsc::channel();
let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);
let root = std::env::temp_dir().join(format!("sgclaw-service-task-flow-{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&root).unwrap();
let config_path = root.join("sgclaw_config.json");
std::fs::write(
&config_path,
format!(
r#"{{
"apiKey": "sk-runtime",
"baseUrl": "http://127.0.0.1:9",
"model": "deepseek-chat",
"browserWsUrl": "{browser_ws_url}",
"serviceWsListenAddr": "{service_addr}"
}}"#
),
)
.unwrap();
let mut service = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
)
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
.arg("--config-path")
.arg(&config_path)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let ws_url = format!("ws://{service_addr}");
let ready_deadline = Instant::now() + Duration::from_secs(2);
let mut service_stderr_boot = String::new();
while Instant::now() < ready_deadline {
if let Some(stream) = service.stderr.as_mut() {
let mut buf = [0_u8; 1024];
match stream.read(&mut buf) {
Ok(0) => {}
Ok(n) => {
service_stderr_boot.push_str(&String::from_utf8_lossy(&buf[..n]));
if service_stderr_boot.contains("sg_claw ready:") {
break;
}
}
Err(_) => {}
}
}
if service.try_wait().unwrap().is_some() {
break;
}
thread::sleep(Duration::from_millis(20));
}
assert!(
service_stderr_boot.contains("sg_claw ready:"),
"service did not report readiness; stderr={service_stderr_boot}"
);
let mut client = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
client
.stdin
.as_mut()
.unwrap()
.write_all("打开知乎热榜获取前10条数据并导出 Excel\n".as_bytes())
.unwrap();
let client_output = client.wait_with_output().unwrap();
browser_server.join().unwrap();
let register = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let bootstrap = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let pre_ready = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let open_page = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let get_text = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let eval = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let exit_deadline = Instant::now() + Duration::from_secs(1);
let mut service_status = None;
while Instant::now() < exit_deadline {
if let Some(status) = service.try_wait().unwrap() {
service_status = Some(status);
break;
}
thread::sleep(Duration::from_millis(20));
}
if service_status.is_none() {
service.kill().unwrap();
let _ = service.wait();
}
let service_stdout = service
.stdout
.take()
.map(|mut stream| {
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf);
String::from_utf8_lossy(&buf).into_owned()
})
.unwrap_or_default();
let service_stderr = service
.stderr
.take()
.map(|mut stream| {
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf);
String::from_utf8_lossy(&buf).into_owned()
})
.unwrap_or_default();
let client_stdout = String::from_utf8_lossy(&client_output.stdout).into_owned();
let client_stderr = String::from_utf8_lossy(&client_output.stderr).into_owned();
let combined_output = format!("{client_stdout}\n{client_stderr}\n{service_stdout}\n{service_stderr}");
let register = match register {
CallbackHostBrowserEvent::BrowserFrame(value) => value,
other => panic!("expected register browser frame, got {other:?}"),
};
assert_eq!(register, json!({ "type": "register", "role": "web" }));
let bootstrap = match bootstrap {
CallbackHostBrowserEvent::BrowserFrame(value) => value,
other => panic!("expected helper bootstrap frame, got {other:?}"),
};
assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
assert_eq!(bootstrap[1], json!("sgBrowerserOpenPage"));
assert!(bootstrap[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));
let pre_ready = match pre_ready {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected pre-ready command envelope, got {other:?}"),
};
assert_eq!(pre_ready, json!({ "ok": false, "command": null }));
let open_page = match open_page {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected open-page command envelope, got {other:?}"),
};
assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
assert_eq!(open_page["command"]["args"][0], json!("https://www.zhihu.com/hot"));
let get_text = match get_text {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected getText command envelope, got {other:?}"),
};
assert_eq!(get_text["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
assert!(get_text["command"]["args"][1]
.as_str()
.is_some_and(|script| script.contains("sgclawOnGetText")));
let eval = match eval {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected eval command envelope, got {other:?}"),
};
assert_eq!(eval["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
assert!(eval["command"]["args"][1]
.as_str()
.is_some_and(|script| script.contains("sgclawOnEval")));
assert!(client_output.status.success());
assert!(client_stdout.contains("已导出并打开知乎热榜 Excel"), "client stdout={client_stdout}");
assert!(client_stdout.contains(".xlsx"), "client stdout={client_stdout}");
assert!(
!combined_output.contains("invalid hmac seed: session key must not be empty"),
"target behavior must avoid the invalid hmac seed failure; combined_output={combined_output}"
);
assert!(
!combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
"target behavior must avoid the runtime-drop panic; combined_output={combined_output}"
);
}