Align the service task flow callback-host regression with the hidden helper close/open bootstrap sequence uncovered during final request-url verification. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
964 lines
35 KiB
Rust
964 lines
35 KiB
Rust
use std::io::{BufRead, BufReader, Read as _, Write};
|
||
use std::net::TcpListener;
|
||
use std::sync::{mpsc, Arc, Mutex};
|
||
use std::thread;
|
||
use std::time::{Duration, Instant};
|
||
|
||
use reqwest::blocking::Client;
|
||
use serde_json::{json, Value};
|
||
use sgclaw::service::{ClientMessage, ServiceMessage};
|
||
use tungstenite::{accept, Message};
|
||
|
||
const RUNTIME_DROP_PANIC_TEXT: &str =
|
||
"Cannot drop a runtime in a context where blocking is not allowed";
|
||
|
||
const TEST_ZHIHU_SKILLS_DIR: &str = "D:/data/ideaSpace/rust/sgClaw/claw/claw/skills";
|
||
|
||
fn read_ws_text(stream: &mut tungstenite::WebSocket<std::net::TcpStream>) -> String {
|
||
match stream.read().unwrap() {
|
||
Message::Text(text) => text.to_string(),
|
||
other => panic!("expected text frame, got {other:?}"),
|
||
}
|
||
}
|
||
|
||
fn start_fake_deepseek_server(
|
||
responses: Vec<Value>,
|
||
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
listener.set_nonblocking(true).unwrap();
|
||
let address = format!("http://{}", listener.local_addr().unwrap());
|
||
let requests = Arc::new(Mutex::new(Vec::new()));
|
||
let request_log = requests.clone();
|
||
|
||
let handle = thread::spawn(move || {
|
||
for response in responses {
|
||
let deadline = std::time::Instant::now() + Duration::from_secs(5);
|
||
let (mut stream, _) = loop {
|
||
match listener.accept() {
|
||
Ok(pair) => break pair,
|
||
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
||
assert!(
|
||
std::time::Instant::now() < deadline,
|
||
"timed out waiting for provider request"
|
||
);
|
||
thread::sleep(Duration::from_millis(10));
|
||
}
|
||
Err(err) => panic!("failed to accept provider request: {err}"),
|
||
}
|
||
};
|
||
stream.set_nonblocking(false).unwrap();
|
||
let body = match read_http_json_body(&mut stream) {
|
||
Ok(body) => body,
|
||
Err(_) => continue,
|
||
};
|
||
request_log.lock().unwrap().push(body);
|
||
|
||
let payload = response.to_string();
|
||
let reply = format!(
|
||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
|
||
payload.as_bytes().len(),
|
||
payload
|
||
);
|
||
stream.write_all(reply.as_bytes()).unwrap();
|
||
stream.flush().unwrap();
|
||
}
|
||
});
|
||
|
||
(address, requests, handle)
|
||
}
|
||
|
||
fn read_http_json_body(stream: &mut impl std::io::Read) -> Result<Value, &'static str> {
|
||
let mut buffer = Vec::new();
|
||
let mut headers_end = None;
|
||
|
||
while headers_end.is_none() {
|
||
let mut chunk = [0_u8; 1024];
|
||
let bytes = stream.read(&mut chunk).unwrap();
|
||
if bytes == 0 {
|
||
return Err("unexpected EOF while reading headers");
|
||
}
|
||
buffer.extend_from_slice(&chunk[..bytes]);
|
||
headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n");
|
||
}
|
||
|
||
let headers_end = headers_end.unwrap() + 4;
|
||
let headers = String::from_utf8(buffer[..headers_end].to_vec()).unwrap();
|
||
let Some(content_length) = headers.lines().find_map(|line| {
|
||
let (name, value) = line.split_once(':')?;
|
||
name.eq_ignore_ascii_case("content-length")
|
||
.then(|| value.trim().parse::<usize>().unwrap())
|
||
}) else {
|
||
return Err("missing content-length header");
|
||
};
|
||
|
||
while buffer.len() < headers_end + content_length {
|
||
let mut chunk = vec![0_u8; content_length];
|
||
let bytes = stream.read(&mut chunk).unwrap();
|
||
if bytes == 0 {
|
||
return Err("unexpected EOF while reading body");
|
||
}
|
||
buffer.extend_from_slice(&chunk[..bytes]);
|
||
}
|
||
|
||
Ok(serde_json::from_slice(&buffer[headers_end..headers_end + content_length]).unwrap())
|
||
}
|
||
|
||
#[derive(Debug)]
|
||
enum CallbackHostBrowserEvent {
|
||
BrowserFrame(Value),
|
||
CommandEnvelope(Value),
|
||
}
|
||
|
||
fn start_callback_host_hotlist_browser_server(
|
||
event_tx: mpsc::Sender<CallbackHostBrowserEvent>,
|
||
) -> (String, thread::JoinHandle<()>) {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let address = listener.local_addr().unwrap();
|
||
|
||
let handle = thread::spawn(move || {
|
||
let (stream, _) = listener.accept().unwrap();
|
||
stream.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
|
||
stream.set_write_timeout(Some(Duration::from_secs(2))).unwrap();
|
||
let mut websocket = accept(stream).unwrap();
|
||
|
||
let register = match websocket.read().unwrap() {
|
||
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
|
||
other => panic!("expected register frame, got {other:?}"),
|
||
};
|
||
event_tx
|
||
.send(CallbackHostBrowserEvent::BrowserFrame(register))
|
||
.unwrap();
|
||
websocket
|
||
.send(Message::Text(
|
||
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
|
||
.to_string()
|
||
.into(),
|
||
))
|
||
.unwrap();
|
||
|
||
let first_action = match websocket.read().unwrap() {
|
||
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
|
||
other => panic!("expected browser action frame, got {other:?}"),
|
||
};
|
||
event_tx
|
||
.send(CallbackHostBrowserEvent::BrowserFrame(first_action.clone()))
|
||
.unwrap();
|
||
|
||
let second_action = match websocket.read().unwrap() {
|
||
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
|
||
other => panic!("expected second browser action frame, got {other:?}"),
|
||
};
|
||
event_tx
|
||
.send(CallbackHostBrowserEvent::BrowserFrame(second_action.clone()))
|
||
.unwrap();
|
||
|
||
let Some(close_values) = first_action.as_array() else {
|
||
websocket.close(None).ok();
|
||
return;
|
||
};
|
||
let is_helper_close = close_values.len() >= 3
|
||
&& close_values[1] == json!("sgHideBrowerserClosePage")
|
||
&& close_values[2]
|
||
.as_str()
|
||
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
|
||
if !is_helper_close {
|
||
websocket.close(None).ok();
|
||
return;
|
||
}
|
||
|
||
let Some(values) = second_action.as_array() else {
|
||
websocket.close(None).ok();
|
||
return;
|
||
};
|
||
let is_helper_open = values.len() >= 3
|
||
&& values[1] == json!("sgHideBrowerserOpenPage")
|
||
&& values[2]
|
||
.as_str()
|
||
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
|
||
if !is_helper_open {
|
||
websocket.close(None).ok();
|
||
return;
|
||
}
|
||
|
||
let helper_url = values[2].as_str().unwrap().to_string();
|
||
let helper_origin = helper_url
|
||
.trim_end_matches("/sgclaw/browser-helper.html")
|
||
.to_string();
|
||
let helper_client = Client::builder()
|
||
.timeout(Duration::from_secs(2))
|
||
.pool_max_idle_per_host(0)
|
||
.build()
|
||
.unwrap();
|
||
let helper_html = helper_client
|
||
.get(&helper_url)
|
||
.send()
|
||
.unwrap()
|
||
.error_for_status()
|
||
.unwrap()
|
||
.text()
|
||
.unwrap();
|
||
assert!(helper_html.contains("sgclawReady"));
|
||
assert!(helper_html.contains("sgclawOnLoaded"));
|
||
assert!(helper_html.contains("sgclawOnGetText"));
|
||
assert!(helper_html.contains("sgclawOnEval"));
|
||
|
||
let pre_ready_command: Value = helper_client
|
||
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
|
||
.send()
|
||
.unwrap()
|
||
.error_for_status()
|
||
.unwrap()
|
||
.json()
|
||
.unwrap();
|
||
event_tx
|
||
.send(CallbackHostBrowserEvent::CommandEnvelope(pre_ready_command))
|
||
.unwrap();
|
||
|
||
helper_client
|
||
.post(format!("{helper_origin}/sgclaw/callback/ready"))
|
||
.json(&json!({
|
||
"type": "ready",
|
||
"helper_url": helper_url,
|
||
}))
|
||
.send()
|
||
.unwrap()
|
||
.error_for_status()
|
||
.unwrap();
|
||
|
||
let hotlist_text = "知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度";
|
||
let hotlist_payload = json!({
|
||
"source": "https://www.zhihu.com/hot",
|
||
"sheet_name": "知乎热榜",
|
||
"columns": ["rank", "title", "heat"],
|
||
"rows": [[1, "问题一", "344万"], [2, "问题二", "266万"]]
|
||
})
|
||
.to_string();
|
||
let deadline = Instant::now() + Duration::from_secs(10);
|
||
let mut saw_get_text = false;
|
||
let mut saw_eval = false;
|
||
|
||
while Instant::now() < deadline {
|
||
let envelope: Value = match helper_client
|
||
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
|
||
.send()
|
||
.and_then(|response| response.error_for_status())
|
||
.and_then(|response| response.json())
|
||
{
|
||
Ok(envelope) => envelope,
|
||
Err(_) => {
|
||
thread::sleep(Duration::from_millis(20));
|
||
continue;
|
||
}
|
||
};
|
||
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
|
||
thread::sleep(Duration::from_millis(20));
|
||
continue;
|
||
};
|
||
event_tx
|
||
.send(CallbackHostBrowserEvent::CommandEnvelope(envelope.clone()))
|
||
.unwrap();
|
||
|
||
let action_name = command
|
||
.get("action")
|
||
.and_then(Value::as_str)
|
||
.unwrap_or_default()
|
||
.to_string();
|
||
|
||
helper_client
|
||
.post(format!("{helper_origin}/sgclaw/callback/commands/ack"))
|
||
.json(&json!({ "type": "command_ack" }))
|
||
.send()
|
||
.unwrap()
|
||
.error_for_status()
|
||
.unwrap();
|
||
|
||
let args = command
|
||
.get("args")
|
||
.and_then(Value::as_array)
|
||
.cloned()
|
||
.unwrap_or_default();
|
||
match action_name.as_str() {
|
||
"sgBrowerserOpenPage" => {}
|
||
"sgBrowserExcuteJsCodeByDomain" => {
|
||
let script = args.get(1).and_then(Value::as_str).unwrap_or_default();
|
||
if script.contains("sgclawOnGetText") {
|
||
saw_get_text = true;
|
||
helper_client
|
||
.post(format!("{helper_origin}/sgclaw/callback/events"))
|
||
.json(&json!({
|
||
"callback": "sgclawOnGetText",
|
||
"request_url": helper_url,
|
||
"target_url": "https://www.zhihu.com/hot",
|
||
"action": action_name,
|
||
"payload": { "text": hotlist_text }
|
||
}))
|
||
.send()
|
||
.unwrap()
|
||
.error_for_status()
|
||
.unwrap();
|
||
} else if script.contains("sgclawOnEval") {
|
||
saw_eval = true;
|
||
helper_client
|
||
.post(format!("{helper_origin}/sgclaw/callback/events"))
|
||
.json(&json!({
|
||
"callback": "sgclawOnEval",
|
||
"request_url": helper_url,
|
||
"target_url": "https://www.zhihu.com/hot",
|
||
"action": action_name,
|
||
"payload": { "value": hotlist_payload }
|
||
}))
|
||
.send()
|
||
.unwrap()
|
||
.error_for_status()
|
||
.unwrap();
|
||
break;
|
||
} else {
|
||
panic!("unexpected callback-host domain command: {script}");
|
||
}
|
||
}
|
||
other => panic!("unexpected callback-host command action {other}"),
|
||
}
|
||
}
|
||
|
||
assert!(saw_get_text, "expected callback-host getText command");
|
||
assert!(saw_eval, "expected callback-host eval command");
|
||
websocket.close(None).ok();
|
||
});
|
||
|
||
(format!("ws://{address}"), handle)
|
||
}
|
||
|
||
fn start_direct_zhihu_browser_ws_server() -> (String, Arc<Mutex<Vec<String>>>, thread::JoinHandle<()>) {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let address = listener.local_addr().unwrap();
|
||
let frames = Arc::new(Mutex::new(Vec::new()));
|
||
let frames_for_thread = Arc::clone(&frames);
|
||
|
||
let handle = thread::spawn(move || {
|
||
let (stream, _) = listener.accept().unwrap();
|
||
stream.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
|
||
stream.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
|
||
let mut socket = accept(stream).unwrap();
|
||
let mut action_count = 0_u64;
|
||
|
||
loop {
|
||
let message = match socket.read() {
|
||
Ok(message) => message,
|
||
Err(tungstenite::Error::ConnectionClosed)
|
||
| Err(tungstenite::Error::AlreadyClosed) => break,
|
||
Err(err) => panic!("browser ws test server read failed: {err}"),
|
||
};
|
||
let payload = match message {
|
||
Message::Text(text) => text.to_string(),
|
||
Message::Ping(payload) => {
|
||
socket.send(Message::Pong(payload)).unwrap();
|
||
continue;
|
||
}
|
||
Message::Close(_) => break,
|
||
other => panic!("expected text frame, got {other:?}"),
|
||
};
|
||
frames_for_thread.lock().unwrap().push(payload.clone());
|
||
|
||
let parsed: Value = serde_json::from_str(&payload).unwrap();
|
||
if parsed.get("type").and_then(Value::as_str) == Some("register") {
|
||
continue;
|
||
}
|
||
|
||
let values = parsed.as_array().expect("browser action frame should be an array");
|
||
let request_url = values[0].as_str().expect("request_url should be a string");
|
||
let action = values[1].as_str().expect("action should be a string");
|
||
action_count += 1;
|
||
|
||
socket
|
||
.send(Message::Text(
|
||
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
|
||
.to_string()
|
||
.into(),
|
||
))
|
||
.unwrap();
|
||
socket.send(Message::Text("0".into())).unwrap();
|
||
|
||
let callback_frame = match action {
|
||
"sgHideBrowserCallAfterLoaded" => {
|
||
let target_url = values[2].as_str().expect("navigate target_url should be a string");
|
||
json!([
|
||
request_url,
|
||
"callBackJsToCpp",
|
||
format!(
|
||
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgHideBrowserCallAfterLoaded@_@"
|
||
)
|
||
])
|
||
}
|
||
"sgBrowserExcuteJsCodeByArea" => {
|
||
let target_url = values[2].as_str().expect("script target_url should be a string");
|
||
let response_text = if action_count == 2 {
|
||
"知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度".to_string()
|
||
} else {
|
||
r#"{"source":"https://www.zhihu.com/hot","sheet_name":"知乎热榜","columns":["rank","title","heat"],"rows":[[1,"问题一","344万"],[2,"问题二","266万"]]}"#.to_string()
|
||
};
|
||
json!([
|
||
request_url,
|
||
"callBackJsToCpp",
|
||
format!(
|
||
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgBrowserExcuteJsCodeByArea@_@{response_text}"
|
||
)
|
||
])
|
||
}
|
||
other => panic!("unexpected browser action {other}"),
|
||
};
|
||
|
||
socket
|
||
.send(Message::Text(callback_frame.to_string().into()))
|
||
.unwrap();
|
||
|
||
if action_count >= 3 {
|
||
break;
|
||
}
|
||
}
|
||
|
||
socket.close(None).ok();
|
||
});
|
||
|
||
(format!("ws://{address}"), frames, handle)
|
||
}
|
||
|
||
#[test]
|
||
fn client_submits_first_user_line_to_service() {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let address = listener.local_addr().unwrap();
|
||
let ws_url = format!("ws://{address}");
|
||
|
||
let server = thread::spawn(move || {
|
||
let (stream, _) = listener.accept().unwrap();
|
||
let mut websocket = accept(stream).unwrap();
|
||
let payload = read_ws_text(&mut websocket);
|
||
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
|
||
websocket
|
||
.send(Message::Text(
|
||
serde_json::to_string(&ServiceMessage::TaskComplete {
|
||
success: true,
|
||
summary: "done".to_string(),
|
||
})
|
||
.unwrap()
|
||
.into(),
|
||
))
|
||
.unwrap();
|
||
websocket.close(None).unwrap();
|
||
request
|
||
});
|
||
|
||
let mut child = std::process::Command::new(
|
||
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
|
||
)
|
||
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
|
||
.stdin(std::process::Stdio::piped())
|
||
.stdout(std::process::Stdio::piped())
|
||
.spawn()
|
||
.unwrap();
|
||
|
||
child
|
||
.stdin
|
||
.as_mut()
|
||
.unwrap()
|
||
.write_all("打开百度搜索天气\n".as_bytes())
|
||
.unwrap();
|
||
|
||
let status = child.wait().unwrap();
|
||
assert!(status.success());
|
||
|
||
let request = server.join().unwrap();
|
||
assert_eq!(
|
||
request,
|
||
ClientMessage::SubmitTask {
|
||
instruction: "打开百度搜索天气".to_string(),
|
||
conversation_id: "".to_string(),
|
||
messages: vec![],
|
||
page_url: "".to_string(),
|
||
page_title: "".to_string(),
|
||
}
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn client_sends_connect_request_and_exits_after_status() {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let address = listener.local_addr().unwrap();
|
||
let ws_url = format!("ws://{address}");
|
||
|
||
let server = thread::spawn(move || {
|
||
let (stream, _) = listener.accept().unwrap();
|
||
let mut websocket = accept(stream).unwrap();
|
||
let payload = read_ws_text(&mut websocket);
|
||
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
|
||
websocket
|
||
.send(Message::Text(
|
||
serde_json::to_string(&ServiceMessage::StatusChanged {
|
||
state: "connected".to_string(),
|
||
})
|
||
.unwrap()
|
||
.into(),
|
||
))
|
||
.unwrap();
|
||
websocket
|
||
.send(Message::Text(
|
||
serde_json::to_string(&ServiceMessage::StatusChanged {
|
||
state: "connected again".to_string(),
|
||
})
|
||
.unwrap()
|
||
.into(),
|
||
))
|
||
.unwrap();
|
||
websocket.close(None).unwrap();
|
||
request
|
||
});
|
||
|
||
let mut child = std::process::Command::new(
|
||
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
|
||
)
|
||
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
|
||
.stdin(std::process::Stdio::piped())
|
||
.stdout(std::process::Stdio::piped())
|
||
.spawn()
|
||
.unwrap();
|
||
|
||
child
|
||
.stdin
|
||
.as_mut()
|
||
.unwrap()
|
||
.write_all("/connect\n".as_bytes())
|
||
.unwrap();
|
||
|
||
let output = child.wait_with_output().unwrap();
|
||
let request = server.join().unwrap();
|
||
|
||
assert!(output.status.success());
|
||
assert_eq!(request, ClientMessage::Connect);
|
||
let stdout = String::from_utf8(output.stdout).unwrap();
|
||
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec!["status: connected"]);
|
||
}
|
||
|
||
#[test]
|
||
fn client_sends_start_and_stop_requests_with_explicit_commands() {
|
||
for (input, expected_request, expected_status) in [
|
||
("/start\n", ClientMessage::Start, "status: started"),
|
||
("/stop\n", ClientMessage::Stop, "status: stopped"),
|
||
] {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let address = listener.local_addr().unwrap();
|
||
let ws_url = format!("ws://{address}");
|
||
let expected_state = expected_status.trim_start_matches("status: ").to_string();
|
||
|
||
let server = thread::spawn(move || {
|
||
let (stream, _) = listener.accept().unwrap();
|
||
let mut websocket = accept(stream).unwrap();
|
||
let payload = read_ws_text(&mut websocket);
|
||
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
|
||
websocket
|
||
.send(Message::Text(
|
||
serde_json::to_string(&ServiceMessage::StatusChanged {
|
||
state: expected_state,
|
||
})
|
||
.unwrap()
|
||
.into(),
|
||
))
|
||
.unwrap();
|
||
websocket.close(None).unwrap();
|
||
request
|
||
});
|
||
|
||
let mut child = std::process::Command::new(
|
||
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
|
||
)
|
||
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
|
||
.stdin(std::process::Stdio::piped())
|
||
.stdout(std::process::Stdio::piped())
|
||
.spawn()
|
||
.unwrap();
|
||
|
||
child
|
||
.stdin
|
||
.as_mut()
|
||
.unwrap()
|
||
.write_all(input.as_bytes())
|
||
.unwrap();
|
||
|
||
let output = child.wait_with_output().unwrap();
|
||
let request = server.join().unwrap();
|
||
|
||
assert!(output.status.success());
|
||
assert_eq!(request, expected_request);
|
||
let stdout = String::from_utf8(output.stdout).unwrap();
|
||
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec![expected_status]);
|
||
}
|
||
}
|
||
|
||
#[test]
|
||
fn client_prints_completion_only_once() {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let address = listener.local_addr().unwrap();
|
||
let ws_url = format!("ws://{address}");
|
||
|
||
let server = thread::spawn(move || {
|
||
let (stream, _) = listener.accept().unwrap();
|
||
let mut websocket = accept(stream).unwrap();
|
||
let payload = read_ws_text(&mut websocket);
|
||
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
|
||
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
|
||
|
||
websocket
|
||
.send(Message::Text(
|
||
serde_json::to_string(&ServiceMessage::TaskComplete {
|
||
success: true,
|
||
summary: "done".to_string(),
|
||
})
|
||
.unwrap()
|
||
.into(),
|
||
))
|
||
.unwrap();
|
||
websocket
|
||
.send(Message::Text(
|
||
serde_json::to_string(&ServiceMessage::TaskComplete {
|
||
success: true,
|
||
summary: "done again".to_string(),
|
||
})
|
||
.unwrap()
|
||
.into(),
|
||
))
|
||
.unwrap();
|
||
websocket.close(None).unwrap();
|
||
});
|
||
|
||
let mut child = std::process::Command::new(
|
||
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
|
||
)
|
||
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
|
||
.stdin(std::process::Stdio::piped())
|
||
.stdout(std::process::Stdio::piped())
|
||
.spawn()
|
||
.unwrap();
|
||
|
||
child
|
||
.stdin
|
||
.as_mut()
|
||
.unwrap()
|
||
.write_all("打开百度搜索天气\n".as_bytes())
|
||
.unwrap();
|
||
|
||
let output = child.wait_with_output().unwrap();
|
||
server.join().unwrap();
|
||
|
||
assert!(output.status.success());
|
||
let stdout = String::from_utf8(output.stdout).unwrap();
|
||
assert_eq!(stdout.lines().collect::<Vec<_>>(), vec!["done"]);
|
||
}
|
||
|
||
#[test]
|
||
fn client_prints_log_entries_in_order_before_completion() {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let address = listener.local_addr().unwrap();
|
||
let ws_url = format!("ws://{address}");
|
||
|
||
let server = thread::spawn(move || {
|
||
let (stream, _) = listener.accept().unwrap();
|
||
let mut websocket = accept(stream).unwrap();
|
||
let payload = read_ws_text(&mut websocket);
|
||
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
|
||
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
|
||
|
||
for message in [
|
||
ServiceMessage::LogEntry {
|
||
level: "info".to_string(),
|
||
message: "step 1".to_string(),
|
||
},
|
||
ServiceMessage::LogEntry {
|
||
level: "info".to_string(),
|
||
message: "step 2".to_string(),
|
||
},
|
||
ServiceMessage::TaskComplete {
|
||
success: true,
|
||
summary: "done".to_string(),
|
||
},
|
||
] {
|
||
websocket
|
||
.send(Message::Text(serde_json::to_string(&message).unwrap().into()))
|
||
.unwrap();
|
||
}
|
||
websocket.close(None).unwrap();
|
||
});
|
||
|
||
let mut child = std::process::Command::new(
|
||
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
|
||
)
|
||
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
|
||
.stdin(std::process::Stdio::piped())
|
||
.stdout(std::process::Stdio::piped())
|
||
.spawn()
|
||
.unwrap();
|
||
|
||
child
|
||
.stdin
|
||
.as_mut()
|
||
.unwrap()
|
||
.write_all("打开百度搜索天气\n".as_bytes())
|
||
.unwrap();
|
||
|
||
let stdout = child.stdout.take().unwrap();
|
||
let (tx, rx) = mpsc::channel();
|
||
let reader = thread::spawn(move || {
|
||
let reader = BufReader::new(stdout);
|
||
for line in reader.lines() {
|
||
tx.send(line.unwrap()).unwrap();
|
||
}
|
||
});
|
||
|
||
let first = rx.recv_timeout(Duration::from_secs(1)).unwrap();
|
||
let second = rx.recv_timeout(Duration::from_secs(1)).unwrap();
|
||
let third = rx.recv_timeout(Duration::from_secs(1)).unwrap();
|
||
|
||
let status = child.wait().unwrap();
|
||
reader.join().unwrap();
|
||
server.join().unwrap();
|
||
|
||
assert!(status.success());
|
||
assert_eq!(vec![first, second, third], vec!["step 1", "step 2", "done"]);
|
||
}
|
||
|
||
#[test]
|
||
fn client_exits_with_failure_when_service_disconnects_before_completion() {
|
||
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let address = listener.local_addr().unwrap();
|
||
let ws_url = format!("ws://{address}");
|
||
|
||
let server = thread::spawn(move || {
|
||
let (stream, _) = listener.accept().unwrap();
|
||
let mut websocket = accept(stream).unwrap();
|
||
let payload = read_ws_text(&mut websocket);
|
||
let request: ClientMessage = serde_json::from_str(&payload).unwrap();
|
||
websocket.close(None).unwrap();
|
||
request
|
||
});
|
||
|
||
let mut child = std::process::Command::new(
|
||
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
|
||
)
|
||
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
|
||
.stdin(std::process::Stdio::piped())
|
||
.stdout(std::process::Stdio::piped())
|
||
.spawn()
|
||
.unwrap();
|
||
|
||
child
|
||
.stdin
|
||
.as_mut()
|
||
.unwrap()
|
||
.write_all("打开百度搜索天气\n".as_bytes())
|
||
.unwrap();
|
||
|
||
let status = child.wait().unwrap();
|
||
assert!(!status.success());
|
||
|
||
let request = server.join().unwrap();
|
||
assert_eq!(request.into_submit_task_request().unwrap().instruction, "打开百度搜索天气");
|
||
}
|
||
|
||
#[test]
|
||
fn client_to_service_regression_routes_zhihu_through_callback_host_without_invalid_hmac_seed_output() {
|
||
let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||
let service_addr = service_listener.local_addr().unwrap();
|
||
drop(service_listener);
|
||
|
||
let (event_tx, event_rx) = mpsc::channel();
|
||
let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);
|
||
|
||
let root = std::env::temp_dir().join(format!("sgclaw-service-task-flow-{}", uuid::Uuid::new_v4()));
|
||
std::fs::create_dir_all(&root).unwrap();
|
||
let config_path = root.join("sgclaw_config.json");
|
||
std::fs::write(
|
||
&config_path,
|
||
format!(
|
||
r#"{{
|
||
"apiKey": "sk-runtime",
|
||
"baseUrl": "http://127.0.0.1:9",
|
||
"model": "deepseek-chat",
|
||
"skillsDir": "{TEST_ZHIHU_SKILLS_DIR}",
|
||
"browserWsUrl": "{browser_ws_url}",
|
||
"serviceWsListenAddr": "{service_addr}"
|
||
}}"#
|
||
),
|
||
)
|
||
.unwrap();
|
||
|
||
let mut service = std::process::Command::new(
|
||
std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
|
||
)
|
||
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
|
||
.arg("--config-path")
|
||
.arg(&config_path)
|
||
.stdout(std::process::Stdio::piped())
|
||
.stderr(std::process::Stdio::piped())
|
||
.spawn()
|
||
.unwrap();
|
||
|
||
let ws_url = format!("ws://{service_addr}");
|
||
let ready_deadline = Instant::now() + Duration::from_secs(2);
|
||
let mut service_stderr_boot = String::new();
|
||
while Instant::now() < ready_deadline {
|
||
if let Some(stream) = service.stderr.as_mut() {
|
||
let mut buf = [0_u8; 1024];
|
||
match stream.read(&mut buf) {
|
||
Ok(0) => {}
|
||
Ok(n) => {
|
||
service_stderr_boot.push_str(&String::from_utf8_lossy(&buf[..n]));
|
||
if service_stderr_boot.contains("sg_claw ready:") {
|
||
break;
|
||
}
|
||
}
|
||
Err(_) => {}
|
||
}
|
||
}
|
||
if service.try_wait().unwrap().is_some() {
|
||
break;
|
||
}
|
||
thread::sleep(Duration::from_millis(20));
|
||
}
|
||
assert!(
|
||
service_stderr_boot.contains("sg_claw ready:"),
|
||
"service did not report readiness; stderr={service_stderr_boot}"
|
||
);
|
||
|
||
let mut client = std::process::Command::new(
|
||
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
|
||
)
|
||
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
|
||
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
|
||
.stdin(std::process::Stdio::piped())
|
||
.stdout(std::process::Stdio::piped())
|
||
.stderr(std::process::Stdio::piped())
|
||
.spawn()
|
||
.unwrap();
|
||
client
|
||
.stdin
|
||
.as_mut()
|
||
.unwrap()
|
||
.write_all("打开知乎热榜,获取前10条数据,并导出 Excel\n".as_bytes())
|
||
.unwrap();
|
||
|
||
let client_output = client.wait_with_output().unwrap();
|
||
browser_server.join().unwrap();
|
||
|
||
let register = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
|
||
let bootstrap_close = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
|
||
let bootstrap = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
|
||
let pre_ready = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
|
||
let open_page = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
|
||
let get_text = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
|
||
let eval = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
|
||
|
||
let exit_deadline = Instant::now() + Duration::from_secs(1);
|
||
let mut service_status = None;
|
||
while Instant::now() < exit_deadline {
|
||
if let Some(status) = service.try_wait().unwrap() {
|
||
service_status = Some(status);
|
||
break;
|
||
}
|
||
thread::sleep(Duration::from_millis(20));
|
||
}
|
||
if service_status.is_none() {
|
||
service.kill().unwrap();
|
||
let _ = service.wait();
|
||
}
|
||
let service_stdout = service
|
||
.stdout
|
||
.take()
|
||
.map(|mut stream| {
|
||
let mut buf = Vec::new();
|
||
let _ = stream.read_to_end(&mut buf);
|
||
String::from_utf8_lossy(&buf).into_owned()
|
||
})
|
||
.unwrap_or_default();
|
||
let service_stderr = service
|
||
.stderr
|
||
.take()
|
||
.map(|mut stream| {
|
||
let mut buf = Vec::new();
|
||
let _ = stream.read_to_end(&mut buf);
|
||
String::from_utf8_lossy(&buf).into_owned()
|
||
})
|
||
.unwrap_or_default();
|
||
|
||
let client_stdout = String::from_utf8_lossy(&client_output.stdout).into_owned();
|
||
let client_stderr = String::from_utf8_lossy(&client_output.stderr).into_owned();
|
||
let combined_output = format!("{client_stdout}\n{client_stderr}\n{service_stdout}\n{service_stderr}");
|
||
|
||
let register = match register {
|
||
CallbackHostBrowserEvent::BrowserFrame(value) => value,
|
||
other => panic!("expected register browser frame, got {other:?}"),
|
||
};
|
||
assert_eq!(register, json!({ "type": "register", "role": "web" }));
|
||
|
||
let bootstrap_close = match bootstrap_close {
|
||
CallbackHostBrowserEvent::BrowserFrame(value) => value,
|
||
other => panic!("expected helper close frame, got {other:?}"),
|
||
};
|
||
assert_eq!(bootstrap_close[0], json!("https://www.zhihu.com"));
|
||
assert_eq!(bootstrap_close[1], json!("sgHideBrowerserClosePage"));
|
||
assert!(bootstrap_close[2]
|
||
.as_str()
|
||
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));
|
||
|
||
let bootstrap = match bootstrap {
|
||
CallbackHostBrowserEvent::BrowserFrame(value) => value,
|
||
other => panic!("expected helper bootstrap frame, got {other:?}"),
|
||
};
|
||
assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
|
||
assert_eq!(bootstrap[1], json!("sgHideBrowerserOpenPage"));
|
||
assert!(bootstrap[2]
|
||
.as_str()
|
||
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));
|
||
|
||
let pre_ready = match pre_ready {
|
||
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
|
||
other => panic!("expected pre-ready command envelope, got {other:?}"),
|
||
};
|
||
assert_eq!(pre_ready, json!({ "ok": false, "command": null }));
|
||
|
||
let open_page = match open_page {
|
||
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
|
||
other => panic!("expected open-page command envelope, got {other:?}"),
|
||
};
|
||
assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
|
||
assert_eq!(open_page["command"]["args"][0], json!("https://www.zhihu.com/hot"));
|
||
|
||
let get_text = match get_text {
|
||
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
|
||
other => panic!("expected getText command envelope, got {other:?}"),
|
||
};
|
||
assert_eq!(get_text["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
|
||
assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
|
||
assert!(get_text["command"]["args"][1]
|
||
.as_str()
|
||
.is_some_and(|script| script.contains("sgclawOnGetText")));
|
||
|
||
let eval = match eval {
|
||
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
|
||
other => panic!("expected eval command envelope, got {other:?}"),
|
||
};
|
||
assert_eq!(eval["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
|
||
assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
|
||
assert!(eval["command"]["args"][1]
|
||
.as_str()
|
||
.is_some_and(|script| script.contains("sgclawOnEval")));
|
||
|
||
assert!(client_output.status.success());
|
||
assert!(client_stdout.contains("已导出并打开知乎热榜 Excel"), "client stdout={client_stdout}");
|
||
assert!(client_stdout.contains(".xlsx"), "client stdout={client_stdout}");
|
||
assert!(
|
||
!combined_output.contains("invalid hmac seed: session key must not be empty"),
|
||
"target behavior must avoid the invalid hmac seed failure; combined_output={combined_output}"
|
||
);
|
||
assert!(
|
||
!combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
|
||
"target behavior must avoid the runtime-drop panic; combined_output={combined_output}"
|
||
);
|
||
}
|