Files
claw/tests/service_task_flow_test.rs
木炎 883647dffc feat: add config-owned direct submit runtime
Keep browser-attached workflows on the configured direct-skill path and align the Zhihu export/browser regression contracts with the current ws merge state.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 15:45:42 +08:00

931 lines
34 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::io::{BufRead, BufReader, Read as _, Write};
use std::net::TcpListener;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use reqwest::blocking::Client;
use serde_json::{json, Value};
use sgclaw::service::{ClientMessage, ServiceMessage};
use tungstenite::{accept, Message};
const RUNTIME_DROP_PANIC_TEXT: &str =
"Cannot drop a runtime in a context where blocking is not allowed";
const TEST_ZHIHU_SKILLS_DIR: &str = "D:/data/ideaSpace/rust/sgClaw/claw/claw/skills";
fn read_ws_text(stream: &mut tungstenite::WebSocket<std::net::TcpStream>) -> String {
match stream.read().unwrap() {
Message::Text(text) => text.to_string(),
other => panic!("expected text frame, got {other:?}"),
}
}
fn start_fake_deepseek_server(
responses: Vec<Value>,
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
listener.set_nonblocking(true).unwrap();
let address = format!("http://{}", listener.local_addr().unwrap());
let requests = Arc::new(Mutex::new(Vec::new()));
let request_log = requests.clone();
let handle = thread::spawn(move || {
for response in responses {
let deadline = std::time::Instant::now() + Duration::from_secs(5);
let (mut stream, _) = loop {
match listener.accept() {
Ok(pair) => break pair,
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
assert!(
std::time::Instant::now() < deadline,
"timed out waiting for provider request"
);
thread::sleep(Duration::from_millis(10));
}
Err(err) => panic!("failed to accept provider request: {err}"),
}
};
stream.set_nonblocking(false).unwrap();
let body = match read_http_json_body(&mut stream) {
Ok(body) => body,
Err(_) => continue,
};
request_log.lock().unwrap().push(body);
let payload = response.to_string();
let reply = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
payload.as_bytes().len(),
payload
);
stream.write_all(reply.as_bytes()).unwrap();
stream.flush().unwrap();
}
});
(address, requests, handle)
}
fn read_http_json_body(stream: &mut impl std::io::Read) -> Result<Value, &'static str> {
let mut buffer = Vec::new();
let mut headers_end = None;
while headers_end.is_none() {
let mut chunk = [0_u8; 1024];
let bytes = stream.read(&mut chunk).unwrap();
if bytes == 0 {
return Err("unexpected EOF while reading headers");
}
buffer.extend_from_slice(&chunk[..bytes]);
headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n");
}
let headers_end = headers_end.unwrap() + 4;
let headers = String::from_utf8(buffer[..headers_end].to_vec()).unwrap();
let Some(content_length) = headers.lines().find_map(|line| {
let (name, value) = line.split_once(':')?;
name.eq_ignore_ascii_case("content-length")
.then(|| value.trim().parse::<usize>().unwrap())
}) else {
return Err("missing content-length header");
};
while buffer.len() < headers_end + content_length {
let mut chunk = vec![0_u8; content_length];
let bytes = stream.read(&mut chunk).unwrap();
if bytes == 0 {
return Err("unexpected EOF while reading body");
}
buffer.extend_from_slice(&chunk[..bytes]);
}
Ok(serde_json::from_slice(&buffer[headers_end..headers_end + content_length]).unwrap())
}
#[derive(Debug)]
enum CallbackHostBrowserEvent {
BrowserFrame(Value),
CommandEnvelope(Value),
}
fn start_callback_host_hotlist_browser_server(
event_tx: mpsc::Sender<CallbackHostBrowserEvent>,
) -> (String, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
stream.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
stream.set_write_timeout(Some(Duration::from_secs(2))).unwrap();
let mut websocket = accept(stream).unwrap();
let register = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected register frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(register))
.unwrap();
websocket
.send(Message::Text(
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
.to_string()
.into(),
))
.unwrap();
let first_action = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected browser action frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(first_action.clone()))
.unwrap();
let Some(values) = first_action.as_array() else {
websocket.close(None).ok();
return;
};
let is_helper_open = values.len() >= 3
&& values[1] == json!("sgBrowerserOpenPage")
&& values[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
if !is_helper_open {
websocket.close(None).ok();
return;
}
let helper_url = values[2].as_str().unwrap().to_string();
let helper_origin = helper_url
.trim_end_matches("/sgclaw/browser-helper.html")
.to_string();
let helper_client = Client::builder()
.timeout(Duration::from_secs(2))
.pool_max_idle_per_host(0)
.build()
.unwrap();
let helper_html = helper_client
.get(&helper_url)
.send()
.unwrap()
.error_for_status()
.unwrap()
.text()
.unwrap();
assert!(helper_html.contains("sgclawReady"));
assert!(helper_html.contains("sgclawOnLoaded"));
assert!(helper_html.contains("sgclawOnGetText"));
assert!(helper_html.contains("sgclawOnEval"));
let pre_ready_command: Value = helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
.unwrap()
.error_for_status()
.unwrap()
.json()
.unwrap();
event_tx
.send(CallbackHostBrowserEvent::CommandEnvelope(pre_ready_command))
.unwrap();
helper_client
.post(format!("{helper_origin}/sgclaw/callback/ready"))
.json(&json!({
"type": "ready",
"helper_url": helper_url,
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
let hotlist_text = "知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度";
let hotlist_payload = json!({
"source": "https://www.zhihu.com/hot",
"sheet_name": "知乎热榜",
"columns": ["rank", "title", "heat"],
"rows": [[1, "问题一", "344万"], [2, "问题二", "266万"]]
})
.to_string();
let deadline = Instant::now() + Duration::from_secs(10);
let mut saw_get_text = false;
let mut saw_eval = false;
while Instant::now() < deadline {
let envelope: Value = match helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
.and_then(|response| response.error_for_status())
.and_then(|response| response.json())
{
Ok(envelope) => envelope,
Err(_) => {
thread::sleep(Duration::from_millis(20));
continue;
}
};
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
thread::sleep(Duration::from_millis(20));
continue;
};
event_tx
.send(CallbackHostBrowserEvent::CommandEnvelope(envelope.clone()))
.unwrap();
let action_name = command
.get("action")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
helper_client
.post(format!("{helper_origin}/sgclaw/callback/commands/ack"))
.json(&json!({ "type": "command_ack" }))
.send()
.unwrap()
.error_for_status()
.unwrap();
let args = command
.get("args")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
match action_name.as_str() {
"sgBrowerserOpenPage" => {}
"sgBrowserExcuteJsCodeByDomain" => {
let script = args.get(1).and_then(Value::as_str).unwrap_or_default();
if script.contains("sgclawOnGetText") {
saw_get_text = true;
helper_client
.post(format!("{helper_origin}/sgclaw/callback/events"))
.json(&json!({
"callback": "sgclawOnGetText",
"request_url": helper_url,
"target_url": "https://www.zhihu.com/hot",
"action": action_name,
"payload": { "text": hotlist_text }
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
} else if script.contains("sgclawOnEval") {
saw_eval = true;
helper_client
.post(format!("{helper_origin}/sgclaw/callback/events"))
.json(&json!({
"callback": "sgclawOnEval",
"request_url": helper_url,
"target_url": "https://www.zhihu.com/hot",
"action": action_name,
"payload": { "value": hotlist_payload }
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
break;
} else {
panic!("unexpected callback-host domain command: {script}");
}
}
other => panic!("unexpected callback-host command action {other}"),
}
}
assert!(saw_get_text, "expected callback-host getText command");
assert!(saw_eval, "expected callback-host eval command");
websocket.close(None).ok();
});
(format!("ws://{address}"), handle)
}
fn start_direct_zhihu_browser_ws_server() -> (String, Arc<Mutex<Vec<String>>>, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let frames = Arc::new(Mutex::new(Vec::new()));
let frames_for_thread = Arc::clone(&frames);
let handle = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
stream.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
stream.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut socket = accept(stream).unwrap();
let mut action_count = 0_u64;
loop {
let message = match socket.read() {
Ok(message) => message,
Err(tungstenite::Error::ConnectionClosed)
| Err(tungstenite::Error::AlreadyClosed) => break,
Err(err) => panic!("browser ws test server read failed: {err}"),
};
let payload = match message {
Message::Text(text) => text.to_string(),
Message::Ping(payload) => {
socket.send(Message::Pong(payload)).unwrap();
continue;
}
Message::Close(_) => break,
other => panic!("expected text frame, got {other:?}"),
};
frames_for_thread.lock().unwrap().push(payload.clone());
let parsed: Value = serde_json::from_str(&payload).unwrap();
if parsed.get("type").and_then(Value::as_str) == Some("register") {
continue;
}
let values = parsed.as_array().expect("browser action frame should be an array");
let request_url = values[0].as_str().expect("request_url should be a string");
let action = values[1].as_str().expect("action should be a string");
action_count += 1;
socket
.send(Message::Text(
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
.to_string()
.into(),
))
.unwrap();
socket.send(Message::Text("0".into())).unwrap();
let callback_frame = match action {
"sgHideBrowserCallAfterLoaded" => {
let target_url = values[2].as_str().expect("navigate target_url should be a string");
json!([
request_url,
"callBackJsToCpp",
format!(
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgHideBrowserCallAfterLoaded@_@"
)
])
}
"sgBrowserExcuteJsCodeByArea" => {
let target_url = values[2].as_str().expect("script target_url should be a string");
let response_text = if action_count == 2 {
"知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度".to_string()
} else {
r#"{"source":"https://www.zhihu.com/hot","sheet_name":"知乎热榜","columns":["rank","title","heat"],"rows":[[1,"问题一","344万"],[2,"问题二","266万"]]}"#.to_string()
};
json!([
request_url,
"callBackJsToCpp",
format!(
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgBrowserExcuteJsCodeByArea@_@{response_text}"
)
])
}
other => panic!("unexpected browser action {other}"),
};
socket
.send(Message::Text(callback_frame.to_string().into()))
.unwrap();
if action_count >= 3 {
break;
}
}
socket.close(None).ok();
});
(format!("ws://{address}"), frames, handle)
}
#[test]
fn client_submits_first_user_line_to_service() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::TaskComplete {
success: true,
summary: "done".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket.close(None).unwrap();
request
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("打开百度搜索天气\n".as_bytes())
.unwrap();
let status = child.wait().unwrap();
assert!(status.success());
let request = server.join().unwrap();
assert_eq!(
request,
ClientMessage::SubmitTask {
instruction: "打开百度搜索天气".to_string(),
conversation_id: "".to_string(),
messages: vec![],
page_url: "".to_string(),
page_title: "".to_string(),
}
);
}
#[test]
fn client_sends_connect_request_and_exits_after_status() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::StatusChanged {
state: "connected".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::StatusChanged {
state: "connected again".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket.close(None).unwrap();
request
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("/connect\n".as_bytes())
.unwrap();
let output = child.wait_with_output().unwrap();
let request = server.join().unwrap();
assert!(output.status.success());
assert_eq!(request, ClientMessage::Connect);
let stdout = String::from_utf8(output.stdout).unwrap();
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec!["status: connected"]);
}
#[test]
fn client_sends_start_and_stop_requests_with_explicit_commands() {
for (input, expected_request, expected_status) in [
("/start\n", ClientMessage::Start, "status: started"),
("/stop\n", ClientMessage::Stop, "status: stopped"),
] {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let expected_state = expected_status.trim_start_matches("status: ").to_string();
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::StatusChanged {
state: expected_state,
})
.unwrap()
.into(),
))
.unwrap();
websocket.close(None).unwrap();
request
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all(input.as_bytes())
.unwrap();
let output = child.wait_with_output().unwrap();
let request = server.join().unwrap();
assert!(output.status.success());
assert_eq!(request, expected_request);
let stdout = String::from_utf8(output.stdout).unwrap();
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec![expected_status]);
}
}
#[test]
fn client_prints_completion_only_once() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::TaskComplete {
success: true,
summary: "done".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket
.send(Message::Text(
serde_json::to_string(&ServiceMessage::TaskComplete {
success: true,
summary: "done again".to_string(),
})
.unwrap()
.into(),
))
.unwrap();
websocket.close(None).unwrap();
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("打开百度搜索天气\n".as_bytes())
.unwrap();
let output = child.wait_with_output().unwrap();
server.join().unwrap();
assert!(output.status.success());
let stdout = String::from_utf8(output.stdout).unwrap();
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec!["done"]);
}
#[test]
fn client_prints_log_entries_in_order_before_completion() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
for message in [
ServiceMessage::LogEntry {
level: "info".to_string(),
message: "step 1".to_string(),
},
ServiceMessage::LogEntry {
level: "info".to_string(),
message: "step 2".to_string(),
},
ServiceMessage::TaskComplete {
success: true,
summary: "done".to_string(),
},
] {
websocket
.send(Message::Text(serde_json::to_string(&message).unwrap().into()))
.unwrap();
}
websocket.close(None).unwrap();
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("打开百度搜索天气\n".as_bytes())
.unwrap();
let stdout = child.stdout.take().unwrap();
let (tx, rx) = mpsc::channel();
let reader = thread::spawn(move || {
let reader = BufReader::new(stdout);
for line in reader.lines() {
tx.send(line.unwrap()).unwrap();
}
});
let first = rx.recv_timeout(Duration::from_secs(1)).unwrap();
let second = rx.recv_timeout(Duration::from_secs(1)).unwrap();
let third = rx.recv_timeout(Duration::from_secs(1)).unwrap();
let status = child.wait().unwrap();
reader.join().unwrap();
server.join().unwrap();
assert!(status.success());
assert_eq!(vec![first, second, third], vec!["step 1", "step 2", "done"]);
}
#[test]
fn client_exits_with_failure_when_service_disconnects_before_completion() {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let ws_url = format!("ws://{address}");
let server = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut websocket = accept(stream).unwrap();
let payload = read_ws_text(&mut websocket);
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
websocket.close(None).unwrap();
request
});
let mut child = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
child
.stdin
.as_mut()
.unwrap()
.write_all("打开百度搜索天气\n".as_bytes())
.unwrap();
let status = child.wait().unwrap();
assert!(!status.success());
let request = server.join().unwrap();
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
}
#[test]
fn client_to_service_regression_routes_zhihu_through_callback_host_without_invalid_hmac_seed_output() {
let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
let service_addr = service_listener.local_addr().unwrap();
drop(service_listener);
let (event_tx, event_rx) = mpsc::channel();
let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);
let root = std::env::temp_dir().join(format!("sgclaw-service-task-flow-{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&root).unwrap();
let config_path = root.join("sgclaw_config.json");
std::fs::write(
&config_path,
format!(
r#"{{
"apiKey": "sk-runtime",
"baseUrl": "http://127.0.0.1:9",
"model": "deepseek-chat",
"skillsDir": "{TEST_ZHIHU_SKILLS_DIR}",
"browserWsUrl": "{browser_ws_url}",
"serviceWsListenAddr": "{service_addr}"
}}"#
),
)
.unwrap();
let mut service = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
)
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
.arg("--config-path")
.arg(&config_path)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let ws_url = format!("ws://{service_addr}");
let ready_deadline = Instant::now() + Duration::from_secs(2);
let mut service_stderr_boot = String::new();
while Instant::now() < ready_deadline {
if let Some(stream) = service.stderr.as_mut() {
let mut buf = [0_u8; 1024];
match stream.read(&mut buf) {
Ok(0) => {}
Ok(n) => {
service_stderr_boot.push_str(&String::from_utf8_lossy(&buf[..n]));
if service_stderr_boot.contains("sg_claw ready:") {
break;
}
}
Err(_) => {}
}
}
if service.try_wait().unwrap().is_some() {
break;
}
thread::sleep(Duration::from_millis(20));
}
assert!(
service_stderr_boot.contains("sg_claw ready:"),
"service did not report readiness; stderr={service_stderr_boot}"
);
let mut client = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
client
.stdin
.as_mut()
.unwrap()
.write_all("打开知乎热榜获取前10条数据并导出 Excel\n".as_bytes())
.unwrap();
let client_output = client.wait_with_output().unwrap();
browser_server.join().unwrap();
let register = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let bootstrap = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let pre_ready = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let open_page = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let get_text = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let eval = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let exit_deadline = Instant::now() + Duration::from_secs(1);
let mut service_status = None;
while Instant::now() < exit_deadline {
if let Some(status) = service.try_wait().unwrap() {
service_status = Some(status);
break;
}
thread::sleep(Duration::from_millis(20));
}
if service_status.is_none() {
service.kill().unwrap();
let _ = service.wait();
}
let service_stdout = service
.stdout
.take()
.map(|mut stream| {
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf);
String::from_utf8_lossy(&buf).into_owned()
})
.unwrap_or_default();
let service_stderr = service
.stderr
.take()
.map(|mut stream| {
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf);
String::from_utf8_lossy(&buf).into_owned()
})
.unwrap_or_default();
let client_stdout = String::from_utf8_lossy(&client_output.stdout).into_owned();
let client_stderr = String::from_utf8_lossy(&client_output.stderr).into_owned();
let combined_output = format!("{client_stdout}\n{client_stderr}\n{service_stdout}\n{service_stderr}");
let register = match register {
CallbackHostBrowserEvent::BrowserFrame(value) => value,
other => panic!("expected register browser frame, got {other:?}"),
};
assert_eq!(register, json!({ "type": "register", "role": "web" }));
let bootstrap = match bootstrap {
CallbackHostBrowserEvent::BrowserFrame(value) => value,
other => panic!("expected helper bootstrap frame, got {other:?}"),
};
assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
assert_eq!(bootstrap[1], json!("sgBrowerserOpenPage"));
assert!(bootstrap[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));
let pre_ready = match pre_ready {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected pre-ready command envelope, got {other:?}"),
};
assert_eq!(pre_ready, json!({ "ok": false, "command": null }));
let open_page = match open_page {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected open-page command envelope, got {other:?}"),
};
assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
assert_eq!(open_page["command"]["args"][0], json!("https://www.zhihu.com/hot"));
let get_text = match get_text {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected getText command envelope, got {other:?}"),
};
assert_eq!(get_text["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
assert!(get_text["command"]["args"][1]
.as_str()
.is_some_and(|script| script.contains("sgclawOnGetText")));
let eval = match eval {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected eval command envelope, got {other:?}"),
};
assert_eq!(eval["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
assert!(eval["command"]["args"][1]
.as_str()
.is_some_and(|script| script.contains("sgclawOnEval")));
assert!(client_output.status.success());
assert!(client_stdout.contains("已导出并打开知乎热榜 Excel"), "client stdout={client_stdout}");
assert!(client_stdout.contains(".xlsx"), "client stdout={client_stdout}");
assert!(
!combined_output.contains("invalid hmac seed: session key must not be empty"),
"target behavior must avoid the invalid hmac seed failure; combined_output={combined_output}"
);
assert!(
!combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
"target behavior must avoid the runtime-drop panic; combined_output={combined_output}"
);
}