Files
claw/tests/service_ws_session_test.rs
木炎 883647dffc feat: add config-owned direct submit runtime
Keep browser-attached workflows on the configured direct-skill path and align the Zhihu export/browser regression contracts with the current ws merge state.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 15:45:42 +08:00

1510 lines
51 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::io::{Read as _, Write};
use std::net::TcpListener;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use reqwest::blocking::Client;
use serde_json::{json, Value};
use tungstenite::{accept, connect, Message};
use sgclaw::agent::AgentRuntimeContext;
use sgclaw::pipe::AgentMessage;
use sgclaw::service::{ClientMessage, ServiceEventSink, ServiceMessage, ServiceSession};
/// Panic text that the submit-flow tests assert is absent from the combined
/// client/service output.
// NOTE(review): this looks like the Tokio "runtime dropped in a non-blocking
// context" panic message — confirm against the runtime actually in use.
const RUNTIME_DROP_PANIC_TEXT: &str =
    "Cannot drop a runtime in a context where blocking is not allowed";
/// Skills directory written into the `skillsDir` field of the generated
/// service config in the Zhihu submit-flow tests.
// NOTE(review): absolute, machine-specific Windows path; the tests that use it
// presumably only pass on a machine with this exact checkout location —
// consider deriving it from the crate root instead.
const TEST_ZHIHU_SKILLS_DIR: &str = "D:/data/ideaSpace/rust/sgClaw/claw/claw/skills";
/// Reads the next websocket frame and returns its text payload.
///
/// # Panics
/// Panics if the read fails or if the frame is anything other than a text frame.
fn read_ws_text<S: std::io::Read + std::io::Write>(
    stream: &mut tungstenite::WebSocket<S>,
) -> String {
    let frame = stream.read().unwrap();
    match &frame {
        Message::Text(text) => text.to_string(),
        _ => panic!("expected text frame, got {frame:?}"),
    }
}
/// Starts a strict fake LLM provider on an ephemeral localhost port.
///
/// The worker thread serves one HTTP connection per entry in `responses`, in
/// order: it records the request's JSON body and answers with the canned JSON
/// value. Waiting more than five seconds for any single connection panics the
/// worker thread. A request whose body cannot be read is skipped, and its
/// canned response is consumed without being sent.
///
/// Returns the `http://` base URL, the shared log of captured request bodies,
/// and the worker's join handle.
fn start_fake_deepseek_server(
    responses: Vec<Value>,
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    // Non-blocking accept lets the worker enforce its own deadline.
    listener.set_nonblocking(true).unwrap();
    let base_url = format!("http://{}", listener.local_addr().unwrap());
    let captured = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&captured);
    let worker = thread::spawn(move || {
        for canned in responses {
            let give_up_at = Instant::now() + Duration::from_secs(5);
            let mut conn = loop {
                match listener.accept() {
                    Ok((conn, _peer)) => break conn,
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        assert!(Instant::now() < give_up_at, "timed out waiting for provider request");
                        thread::sleep(Duration::from_millis(10));
                    }
                    Err(err) => panic!("failed to accept provider request: {err}"),
                }
            };
            // The accepted socket is switched back to blocking mode for the exchange.
            conn.set_nonblocking(false).unwrap();
            let Ok(request_body) = read_http_json_body(&mut conn) else {
                continue;
            };
            sink.lock().unwrap().push(request_body);
            let payload = canned.to_string();
            let reply = format!(
                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                payload.len(),
                payload
            );
            conn.write_all(reply.as_bytes()).unwrap();
            conn.flush().unwrap();
        }
    });
    (base_url, captured, worker)
}
/// Starts a forgiving fake LLM provider on an ephemeral localhost port.
///
/// Works like the strict fake provider, except that when no connection arrives
/// within 750 ms the worker simply stops serving instead of panicking, so
/// callers may supply more canned responses than will actually be requested.
///
/// Returns the `http://` base URL, the shared log of captured request bodies,
/// and the worker's join handle.
fn start_lenient_deepseek_server(
    responses: Vec<Value>,
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    listener.set_nonblocking(true).unwrap();
    let base_url = format!("http://{}", listener.local_addr().unwrap());
    let captured = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&captured);
    let worker = thread::spawn(move || {
        for canned in responses {
            let give_up_at = Instant::now() + Duration::from_millis(750);
            let maybe_conn = loop {
                match listener.accept() {
                    Ok((conn, _peer)) => break Some(conn),
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        if Instant::now() >= give_up_at {
                            break None;
                        }
                        thread::sleep(Duration::from_millis(10));
                    }
                    Err(err) => panic!("failed to accept provider request: {err}"),
                }
            };
            // No client showed up in time: stop serving altogether.
            let mut conn = match maybe_conn {
                Some(conn) => conn,
                None => break,
            };
            conn.set_nonblocking(false).unwrap();
            let Ok(request_body) = read_http_json_body(&mut conn) else {
                continue;
            };
            sink.lock().unwrap().push(request_body);
            let payload = canned.to_string();
            let reply = format!(
                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                payload.len(),
                payload
            );
            conn.write_all(reply.as_bytes()).unwrap();
            conn.flush().unwrap();
        }
    });
    (base_url, captured, worker)
}
/// Reads one HTTP request from `stream` and returns its body parsed as JSON.
///
/// Reads until the blank line that terminates the headers, looks up the
/// `Content-Length` header (case-insensitively), then keeps reading until that
/// many body bytes are buffered.
///
/// # Errors
/// Returns a static message when the peer closes the connection before the
/// headers or body are complete, or when no `Content-Length` header is present.
///
/// # Panics
/// Panics on I/O errors, non-UTF-8 headers, an unparsable `Content-Length`
/// value, or a body that is not valid JSON.
fn read_http_json_body(stream: &mut impl std::io::Read) -> Result<Value, &'static str> {
    const HEADER_TERMINATOR: &[u8] = b"\r\n\r\n";

    let mut received = Vec::new();
    // Offset of the first body byte, i.e. just past the header terminator.
    let body_start = loop {
        let mut scratch = [0_u8; 1024];
        let count = stream.read(&mut scratch).unwrap();
        if count == 0 {
            return Err("unexpected EOF while reading headers");
        }
        received.extend_from_slice(&scratch[..count]);
        let terminator_at = received
            .windows(HEADER_TERMINATOR.len())
            .position(|window| window == HEADER_TERMINATOR);
        if let Some(at) = terminator_at {
            break at + HEADER_TERMINATOR.len();
        }
    };

    let header_text = String::from_utf8(received[..body_start].to_vec()).unwrap();
    let mut declared_len = None;
    for line in header_text.lines() {
        let Some((name, value)) = line.split_once(':') else {
            continue;
        };
        if name.eq_ignore_ascii_case("content-length") {
            declared_len = Some(value.trim().parse::<usize>().unwrap());
            break;
        }
    }
    let Some(body_len) = declared_len else {
        return Err("missing content-length header");
    };

    // Part of the body may already have arrived together with the headers.
    while received.len() < body_start + body_len {
        let mut scratch = vec![0_u8; body_len];
        let count = stream.read(&mut scratch).unwrap();
        if count == 0 {
            return Err("unexpected EOF while reading body");
        }
        received.extend_from_slice(&scratch[..count]);
    }
    let body = &received[body_start..body_start + body_len];
    Ok(serde_json::from_slice(body).unwrap())
}
/// Events that the callback-host browser test server reports back to the test
/// through an `mpsc` channel.
#[derive(Debug)]
enum CallbackHostBrowserEvent {
    /// A JSON frame received on the browser websocket (the register frame or
    /// the first browser action frame).
    BrowserFrame(Value),
    /// A JSON envelope returned by the helper's
    /// `/sgclaw/callback/commands/next` endpoint.
    CommandEnvelope(Value),
}
/// Starts a fake browser that plays the callback-host side of the Zhihu
/// hotlist flow.
///
/// Binds an ephemeral localhost port and spawns a thread that accepts exactly
/// one websocket connection. The thread then:
///
/// 1. reads the register frame and answers with a `welcome` frame;
/// 2. reads the first browser action and, if it opens the
///    `/sgclaw/browser-helper.html` page, fetches that page over HTTP;
/// 3. polls the helper's `commands/next` endpoint once before posting `ready`;
/// 4. keeps polling for commands (for up to ten seconds), acknowledging each
///    one and answering the getText / eval scripts with canned hotlist data.
///
/// Every observed websocket frame and every non-empty command envelope is
/// forwarded on `event_tx`. Returns the `ws://` URL to connect to and the
/// thread's join handle.
///
/// The thread panics (surfacing through `join`) on protocol violations,
/// failed HTTP calls outside the polling step, or if the getText / eval
/// commands were never seen.
fn start_callback_host_hotlist_browser_server(
    event_tx: mpsc::Sender<CallbackHostBrowserEvent>,
) -> (String, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let address = listener.local_addr().unwrap();
    let handle = thread::spawn(move || {
        // Only a single connection is ever accepted.
        let (stream, _) = listener.accept().unwrap();
        stream.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
        stream.set_write_timeout(Some(Duration::from_secs(2))).unwrap();
        let mut websocket = accept(stream).unwrap();
        // First frame: the client's registration, forwarded to the test as-is.
        let register = match websocket.read().unwrap() {
            Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
            other => panic!("expected register frame, got {other:?}"),
        };
        event_tx
            .send(CallbackHostBrowserEvent::BrowserFrame(register))
            .unwrap();
        websocket
            .send(Message::Text(
                r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
                    .to_string()
                    .into(),
            ))
            .unwrap();
        // Second frame: the first browser action, also forwarded to the test.
        let first_action = match websocket.read().unwrap() {
            Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
            other => panic!("expected browser action frame, got {other:?}"),
        };
        event_tx
            .send(CallbackHostBrowserEvent::BrowserFrame(first_action.clone()))
            .unwrap();
        // Anything other than "open the helper page" ends the session quietly;
        // the test then fails on the events it never receives.
        let Some(values) = first_action.as_array() else {
            websocket.close(None).ok();
            return;
        };
        let is_helper_open = values.len() >= 3
            && values[1] == json!("sgBrowerserOpenPage")
            && values[2]
                .as_str()
                .is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
        if !is_helper_open {
            websocket.close(None).ok();
            return;
        }
        let helper_url = values[2].as_str().unwrap().to_string();
        // The callback endpoints live on the same origin as the helper page.
        let helper_origin = helper_url
            .trim_end_matches("/sgclaw/browser-helper.html")
            .to_string();
        let helper_client = Client::builder()
            .timeout(Duration::from_secs(2))
            .pool_max_idle_per_host(0)
            .build()
            .unwrap();
        // Fetch the helper page and check that it mentions every callback name
        // this fake browser relies on.
        let helper_html = helper_client
            .get(&helper_url)
            .send()
            .unwrap()
            .error_for_status()
            .unwrap()
            .text()
            .unwrap();
        assert!(helper_html.contains("sgclawReady"));
        assert!(helper_html.contains("sgclawOnLoaded"));
        assert!(helper_html.contains("sgclawOnGetText"));
        assert!(helper_html.contains("sgclawOnEval"));
        // Poll once BEFORE announcing readiness and report whatever comes back.
        let pre_ready_command: Value = helper_client
            .get(format!("{helper_origin}/sgclaw/callback/commands/next"))
            .send()
            .unwrap()
            .error_for_status()
            .unwrap()
            .json()
            .unwrap();
        event_tx
            .send(CallbackHostBrowserEvent::CommandEnvelope(pre_ready_command))
            .unwrap();
        helper_client
            .post(format!("{helper_origin}/sgclaw/callback/ready"))
            .json(&json!({
                "type": "ready",
                "helper_url": helper_url,
            }))
            .send()
            .unwrap()
            .error_for_status()
            .unwrap();
        // Canned page text and structured payload returned for the hotlist page.
        let hotlist_text = "知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度";
        let hotlist_payload = json!({
            "source": "https://www.zhihu.com/hot",
            "sheet_name": "知乎热榜",
            "columns": ["rank", "title", "heat"],
            "rows": [[1, "问题一", "344万"], [2, "问题二", "266万"]]
        })
        .to_string();
        let deadline = Instant::now() + Duration::from_secs(10);
        let mut saw_get_text = false;
        let mut saw_eval = false;
        // Command pump: poll, forward, acknowledge, answer. Ends after the eval
        // command has been answered or when the deadline passes.
        while Instant::now() < deadline {
            // Transport or decode failures are treated as "nothing yet".
            let envelope: Value = match helper_client
                .get(format!("{helper_origin}/sgclaw/callback/commands/next"))
                .send()
                .and_then(|response| response.error_for_status())
                .and_then(|response| response.json())
            {
                Ok(envelope) => envelope,
                Err(_) => {
                    thread::sleep(Duration::from_millis(20));
                    continue;
                }
            };
            // Envelopes without a command object are not forwarded to the test.
            let Some(command) = envelope.get("command").and_then(Value::as_object) else {
                thread::sleep(Duration::from_millis(20));
                continue;
            };
            event_tx
                .send(CallbackHostBrowserEvent::CommandEnvelope(envelope.clone()))
                .unwrap();
            let action_name = command
                .get("action")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string();
            helper_client
                .post(format!("{helper_origin}/sgclaw/callback/commands/ack"))
                .json(&json!({ "type": "command_ack" }))
                .send()
                .unwrap()
                .error_for_status()
                .unwrap();
            let args = command
                .get("args")
                .and_then(Value::as_array)
                .cloned()
                .unwrap_or_default();
            match action_name.as_str() {
                // Opening a page needs no callback from this fake browser.
                "sgBrowerserOpenPage" => {}
                "sgBrowserExcuteJsCodeByDomain" => {
                    // args[1] carries the script; the callback name it mentions
                    // decides which canned answer is posted back.
                    let script = args.get(1).and_then(Value::as_str).unwrap_or_default();
                    if script.contains("sgclawOnGetText") {
                        saw_get_text = true;
                        helper_client
                            .post(format!("{helper_origin}/sgclaw/callback/events"))
                            .json(&json!({
                                "callback": "sgclawOnGetText",
                                "request_url": helper_url,
                                "target_url": "https://www.zhihu.com/hot",
                                "action": action_name,
                                "payload": { "text": hotlist_text }
                            }))
                            .send()
                            .unwrap()
                            .error_for_status()
                            .unwrap();
                    } else if script.contains("sgclawOnEval") {
                        saw_eval = true;
                        helper_client
                            .post(format!("{helper_origin}/sgclaw/callback/events"))
                            .json(&json!({
                                "callback": "sgclawOnEval",
                                "request_url": helper_url,
                                "target_url": "https://www.zhihu.com/hot",
                                "action": action_name,
                                "payload": { "value": hotlist_payload }
                            }))
                            .send()
                            .unwrap()
                            .error_for_status()
                            .unwrap();
                        // The eval answer is the last step of the scripted flow.
                        break;
                    } else {
                        panic!("unexpected callback-host domain command: {script}");
                    }
                }
                other => panic!("unexpected callback-host command action {other}"),
            }
        }
        assert!(saw_get_text, "expected callback-host getText command");
        assert!(saw_eval, "expected callback-host eval command");
        websocket.close(None).ok();
    });
    (format!("ws://{address}"), handle)
}
fn start_direct_zhihu_browser_ws_server() -> (String, Arc<Mutex<Vec<String>>>, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let frames = Arc::new(Mutex::new(Vec::new()));
let frames_for_thread = Arc::clone(&frames);
let handle = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
stream.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
stream.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut socket = accept(stream).unwrap();
let mut action_count = 0_u64;
loop {
let message = match socket.read() {
Ok(message) => message,
Err(tungstenite::Error::ConnectionClosed)
| Err(tungstenite::Error::AlreadyClosed) => break,
Err(err) => panic!("browser ws test server read failed: {err}"),
};
let payload = match message {
Message::Text(text) => text.to_string(),
Message::Ping(payload) => {
socket.send(Message::Pong(payload)).unwrap();
continue;
}
Message::Close(_) => break,
other => panic!("expected text frame, got {other:?}"),
};
frames_for_thread.lock().unwrap().push(payload.clone());
let parsed: Value = serde_json::from_str(&payload).unwrap();
if parsed.get("type").and_then(Value::as_str) == Some("register") {
continue;
}
let values = parsed.as_array().expect("browser action frame should be an array");
let request_url = values[0].as_str().expect("request_url should be a string");
let action = values[1].as_str().expect("action should be a string");
action_count += 1;
socket
.send(Message::Text(
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
.to_string()
.into(),
))
.unwrap();
socket.send(Message::Text("0".into())).unwrap();
let callback_frame = match action {
"sgHideBrowserCallAfterLoaded" => {
let target_url = values[2].as_str().expect("navigate target_url should be a string");
json!([
request_url,
"callBackJsToCpp",
format!(
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgHideBrowserCallAfterLoaded@_@"
)
])
}
"sgBrowserExcuteJsCodeByArea" => {
let target_url = values[2].as_str().expect("script target_url should be a string");
let response_text = if action_count == 2 {
"知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度".to_string()
} else {
r#"{"source":"https://www.zhihu.com/hot","sheet_name":"知乎热榜","columns":["rank","title","heat"],"rows":[[1,"问题一","344万"],[2,"问题二","266万"]]}"#.to_string()
};
json!([
request_url,
"callBackJsToCpp",
format!(
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgBrowserExcuteJsCodeByArea@_@{response_text}"
)
])
}
other => panic!("unexpected browser action {other}"),
};
socket
.send(Message::Text(callback_frame.to_string().into()))
.unwrap();
if action_count >= 3 {
break;
}
}
socket.close(None).ok();
});
(format!("ws://{address}"), frames, handle)
}
/// Compile-time check that `sgclaw::service::run` exists with the expected
/// signature; the coercion to a function pointer is the whole test.
#[test]
fn service_entrypoint_function_is_exported() {
    let _entry: fn() -> Result<(), sgclaw::pipe::PipeError> = sgclaw::service::run;
}
/// With the DeepSeek environment variables removed, `service::run` must fail
/// with a protocol error that names the missing API key.
#[test]
fn service_run_requires_llm_config_for_startup() {
    for name in ["DEEPSEEK_API_KEY", "DEEPSEEK_BASE_URL", "DEEPSEEK_MODEL"] {
        std::env::remove_var(name);
    }
    let outcome = sgclaw::service::run();
    assert!(matches!(
        outcome,
        Err(sgclaw::pipe::PipeError::Protocol(message))
            if message.contains("missing environment variable: DEEPSEEK_API_KEY")
    ));
}
/// A config file that sets `browserWsUrl` and `serviceWsListenAddr` must
/// surface both values in the loaded startup config.
#[test]
fn service_startup_config_loads_ws_endpoints_from_browser_config() {
    let root = std::env::temp_dir().join(format!("sgclaw-service-startup-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        r#"{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "127.0.0.1:42321"
}"#,
    )
    .unwrap();
    let loaded = sgclaw::service::load_startup_config(&AgentRuntimeContext::new(
        Some(config_path),
        root.clone(),
    ));
    // Best-effort cleanup before any assertion can fail, so each run does not
    // leave another uuid-named directory behind in the system temp dir.
    let _ = std::fs::remove_dir_all(&root);
    let startup = loaded.unwrap();
    assert_eq!(startup.browser_ws_url.as_deref(), Some("ws://127.0.0.1:12345"));
    assert_eq!(
        startup.service_ws_listen_addr.as_deref(),
        Some("127.0.0.1:42321")
    );
}
/// A config file without websocket settings must fall back to the default
/// browser URL and service listen address.
#[test]
fn service_startup_config_uses_default_ws_endpoints_when_not_configured() {
    let root = std::env::temp_dir().join(format!("sgclaw-service-defaults-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        r#"{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat"
}"#,
    )
    .unwrap();
    let loaded = sgclaw::service::load_startup_config(&AgentRuntimeContext::new(
        Some(config_path),
        root.clone(),
    ));
    // Best-effort cleanup before any assertion can fail, so each run does not
    // leave another uuid-named directory behind in the system temp dir.
    let _ = std::fs::remove_dir_all(&root);
    let startup = loaded.unwrap();
    assert_eq!(
        startup.browser_ws_url.as_deref(),
        Some("ws://127.0.0.1:12345")
    );
    assert_eq!(
        startup.service_ws_listen_addr.as_deref(),
        Some("127.0.0.1:42321")
    );
}
/// Launches the `sg_claw` binary with an explicit config and checks that its
/// stderr readiness line reports both the service listen address and the
/// browser websocket URL.
#[test]
fn service_binary_reports_resolved_startup_endpoints() {
    // Reserve a free port, then release it so the service can bind it.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = listener.local_addr().unwrap();
    drop(listener);
    let root = std::env::temp_dir().join(format!("sgclaw-service-report-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();
    // NOTE(review): the binary path is read from the environment at run time;
    // Cargo documents `CARGO_BIN_EXE_*` as a compile-time variable — confirm
    // the toolchain in use also exports it to the test process.
    let mut child = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    // Accumulate stderr until the readiness marker shows up, the process
    // exits, or the deadline passes.
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < deadline {
        if let Some(stream) = child.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if child.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    let status = child.try_wait().unwrap();
    if status.is_none() {
        child.kill().unwrap();
        let _ = child.wait();
    }
    // Drain whatever the process wrote after the last read above.
    if let Some(mut stream) = child.stderr.take() {
        let mut buf = Vec::new();
        let _ = stream.read_to_end(&mut buf);
        stderr.push_str(&String::from_utf8_lossy(&buf));
    }
    // The child has exited, so the per-run temp directory can go (best-effort).
    let _ = std::fs::remove_dir_all(&root);
    assert!(stderr.contains("sg_claw ready:"));
    assert!(stderr.contains(&service_addr.to_string()));
    assert!(stderr.contains("ws://127.0.0.1:12345"));
}
/// Launches the `sg_claw` binary and checks that a websocket client can
/// connect to the configured service address while the process stays alive.
#[test]
fn service_binary_keeps_service_ws_listener_available_for_client_connections() {
    // Reserve a free port, then release it so the service can bind it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);
    let root = std::env::temp_dir().join(format!("sgclaw-service-live-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();
    let mut child = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    let ws_url = format!("ws://{service_addr}");
    // Retry the connection until it succeeds, the process dies, or two
    // seconds have passed.
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut connected = false;
    while Instant::now() < deadline {
        match connect(ws_url.as_str()) {
            Ok((socket, _)) => {
                connected = true;
                drop(socket);
                break;
            }
            Err(_) => {
                if child.try_wait().unwrap().is_some() {
                    break;
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
    // `status` is captured BEFORE the kill: `None` means the service was
    // still running when the probe finished.
    let status = child.try_wait().unwrap();
    if status.is_none() {
        child.kill().unwrap();
        let _ = child.wait();
    }
    let stderr = child
        .stderr
        .take()
        .map(|mut stream| {
            let mut buf = Vec::new();
            let _ = stream.read_to_end(&mut buf);
            String::from_utf8_lossy(&buf).into_owned()
        })
        .unwrap_or_default();
    // The child has exited, so the per-run temp directory can go (best-effort).
    let _ = std::fs::remove_dir_all(&root);
    assert!(connected, "service ws listener never became available; stderr={stderr}");
    assert!(status.is_none(), "sg_claw exited before client could connect; stderr={stderr}");
}
/// Runs the real `sg_claw_client` binary against a real `sg_claw` service,
/// lets the client finish and disconnect, and checks that the service process
/// is still running one second later.
#[test]
fn service_binary_survives_real_client_disconnect_after_task_complete() {
    /// Kills and reaps the wrapped child when dropped, so a failing assertion
    /// cannot leave an orphaned service process holding the port.
    struct KillOnDrop(std::process::Child);
    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Best-effort: the child may already have been killed and reaped.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    // Reserve a free port, then release it so the service can bind it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);
    let root = std::env::temp_dir().join(format!("sgclaw-service-disconnect-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();
    let mut service = KillOnDrop(
        std::process::Command::new(
            std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
        )
        .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
        .arg("--config-path")
        .arg(&config_path)
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .unwrap(),
    );
    let ws_url = format!("ws://{service_addr}");
    // Wait for the readiness marker on the service's stderr.
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.0.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.0.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    assert!(stderr.contains("sg_claw ready:"), "service did not report readiness; stderr={stderr}");
    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client.stdin.as_mut().unwrap().write_all("你好\n".as_bytes()).unwrap();
    // `wait_with_output` closes the client's stdin before waiting for it.
    let client_output = client.wait_with_output().unwrap();
    assert!(
        client_output.status.success(),
        "client failed: stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );
    // The completion summary this test expects is a task-failure line.
    assert!(
        String::from_utf8_lossy(&client_output.stdout).contains("任务执行失败:"),
        "client did not receive TaskComplete summary: stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );
    // Give the service one second to (incorrectly) exit after the disconnect.
    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while Instant::now() < exit_deadline {
        if let Some(status) = service.0.try_wait().unwrap() {
            service_status = Some(status);
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if service_status.is_none() {
        service.0.kill().unwrap();
        let _ = service.0.wait();
    }
    // Only the stderr written after the readiness loop is reported below.
    let stderr = service
        .0
        .stderr
        .take()
        .map(|mut stream| {
            let mut buf = Vec::new();
            let _ = stream.read_to_end(&mut buf);
            String::from_utf8_lossy(&buf).into_owned()
        })
        .unwrap_or_default();
    // The service has exited, so the per-run temp directory can go (best-effort).
    let _ = std::fs::remove_dir_all(&root);
    assert!(
        service_status.is_none(),
        "sg_claw exited after client disconnect; stderr={stderr}"
    );
}
/// End-to-end submit flow: the real client submits the Zhihu hotlist export
/// instruction to the real service, which must drive the fake callback-host
/// browser (register, helper bootstrap, open page, getText, eval) and report a
/// successful Excel export without any runtime-drop panic in the output.
#[test]
fn service_binary_submit_flow_routes_zhihu_through_callback_host() {
    /// Kills and reaps the wrapped child when dropped, so a failing assertion
    /// (or a panicked browser-server thread) cannot leave an orphaned service
    /// process holding the port.
    struct KillOnDrop(std::process::Child);
    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Best-effort: the child may already have been killed and reaped.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    // Reserve a free port, then release it so the service can bind it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);
    let (event_tx, event_rx) = mpsc::channel();
    let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);
    let root = std::env::temp_dir().join(format!("sgclaw-service-zhihu-submit-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "http://127.0.0.1:9",
"model": "deepseek-chat",
"skillsDir": "{TEST_ZHIHU_SKILLS_DIR}",
"browserWsUrl": "{browser_ws_url}",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();
    let mut service = KillOnDrop(
        std::process::Command::new(
            std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
        )
        .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
        .arg("--config-path")
        .arg(&config_path)
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .unwrap(),
    );
    let ws_url = format!("ws://{service_addr}");
    // Wait for the readiness marker on the service's stderr.
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.0.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.0.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    assert!(stderr.contains("sg_claw ready:"), "service did not report readiness; stderr={stderr}");
    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client
        .stdin
        .as_mut()
        .unwrap()
        .write_all("打开知乎热榜获取前10条数据并导出 Excel\n".as_bytes())
        .unwrap();
    let client_output = client.wait_with_output().unwrap();
    // A panic inside the fake browser thread surfaces here.
    browser_server.join().unwrap();
    // The fake browser reports its observations in this fixed order.
    let register = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let bootstrap = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let pre_ready = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let open_page = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let get_text = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let eval = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let service_status = service.0.try_wait().unwrap();
    if service_status.is_none() {
        service.0.kill().unwrap();
        let _ = service.0.wait();
    }
    if let Some(mut stream) = service.0.stderr.take() {
        let mut buf = Vec::new();
        let _ = stream.read_to_end(&mut buf);
        stderr.push_str(&String::from_utf8_lossy(&buf));
    }
    // The service has exited, so the per-run temp directory can go (best-effort).
    let _ = std::fs::remove_dir_all(&root);
    let combined_output = format!(
        "{}\n{}\n{}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr),
        stderr
    );
    let register = match register {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected register browser frame, got {other:?}"),
    };
    assert_eq!(register, json!({ "type": "register", "role": "web" }));
    let bootstrap = match bootstrap {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected helper bootstrap frame, got {other:?}"),
    };
    assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
    assert_eq!(bootstrap[1], json!("sgBrowerserOpenPage"));
    assert!(bootstrap[2]
        .as_str()
        .is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));
    let pre_ready = match pre_ready {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected pre-ready command envelope, got {other:?}"),
    };
    // Before the helper reports ready, no command may be handed out.
    assert_eq!(pre_ready, json!({ "ok": false, "command": null }));
    let open_page = match open_page {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected open-page command envelope, got {other:?}"),
    };
    assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
    assert_eq!(open_page["command"]["args"][0], json!("https://www.zhihu.com/hot"));
    let get_text = match get_text {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected getText command envelope, got {other:?}"),
    };
    assert_eq!(get_text["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
    assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
    assert!(get_text["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnGetText")));
    let eval = match eval {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected eval command envelope, got {other:?}"),
    };
    assert_eq!(eval["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
    assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
    assert!(eval["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnEval")));
    assert!(client_output.status.success());
    assert!(
        String::from_utf8_lossy(&client_output.stdout).contains("已导出并打开知乎热榜 Excel"),
        "client stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );
    assert!(
        String::from_utf8_lossy(&client_output.stdout).contains(".xlsx"),
        "client stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );
    assert!(
        !combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
        "service submit flow still contains runtime-drop panic: {combined_output}"
    );
}
/// End-to-end submit flow focused on command semantics: the service must send
/// the fake callback-host browser the register frame, the helper bootstrap,
/// and the open-page / getText / eval commands in that order, the client must
/// exit successfully, and no runtime-drop panic may appear in the output.
#[test]
fn service_binary_submit_flow_uses_callback_host_command_semantics_for_zhihu() {
    /// Kills and reaps the wrapped child when dropped, so a failing assertion
    /// (or a panicked browser-server thread) cannot leave an orphaned service
    /// process holding the port.
    struct KillOnDrop(std::process::Child);
    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Best-effort: the child may already have been killed and reaped.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    // Reserve a free port, then release it so the service can bind it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);
    let (event_tx, event_rx) = mpsc::channel();
    let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);
    let root = std::env::temp_dir().join(format!("sgclaw-service-session-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "http://127.0.0.1:9",
"model": "deepseek-chat",
"skillsDir": "{TEST_ZHIHU_SKILLS_DIR}",
"browserWsUrl": "{browser_ws_url}",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();
    let mut service = KillOnDrop(
        std::process::Command::new(
            std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
        )
        .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
        .arg("--config-path")
        .arg(&config_path)
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .unwrap(),
    );
    let ws_url = format!("ws://{service_addr}");
    // Wait for the readiness marker on the service's stderr.
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.0.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.0.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    assert!(stderr.contains("sg_claw ready:"), "service did not report readiness; stderr={stderr}");
    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client
        .stdin
        .as_mut()
        .unwrap()
        .write_all("打开知乎热榜获取前10条数据并导出 Excel\n".as_bytes())
        .unwrap();
    let client_output = client.wait_with_output().unwrap();
    // A panic inside the fake browser thread surfaces here.
    browser_server.join().unwrap();
    // The fake browser reports its observations in this fixed order.
    let register = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let bootstrap = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let pre_ready = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let open_page = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let get_text = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let eval = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let service_status = service.0.try_wait().unwrap();
    if service_status.is_none() {
        service.0.kill().unwrap();
        let _ = service.0.wait();
    }
    if let Some(mut stream) = service.0.stderr.take() {
        let mut buf = Vec::new();
        let _ = stream.read_to_end(&mut buf);
        stderr.push_str(&String::from_utf8_lossy(&buf));
    }
    // The service has exited, so the per-run temp directory can go (best-effort).
    let _ = std::fs::remove_dir_all(&root);
    let combined_output = format!(
        "{}\n{}\n{}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr),
        stderr
    );
    let register = match register {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected register browser frame, got {other:?}"),
    };
    assert_eq!(register, json!({ "type": "register", "role": "web" }));
    let bootstrap = match bootstrap {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected helper bootstrap frame, got {other:?}"),
    };
    assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
    assert_eq!(bootstrap[1], json!("sgBrowerserOpenPage"));
    assert!(bootstrap[2]
        .as_str()
        .is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));
    let pre_ready = match pre_ready {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected pre-ready command envelope, got {other:?}"),
    };
    // Before the helper reports ready, no command may be handed out.
    assert_eq!(pre_ready, json!({ "ok": false, "command": null }));
    let open_page = match open_page {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected open-page command envelope, got {other:?}"),
    };
    assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
    assert_eq!(open_page["command"]["args"][0], json!("https://www.zhihu.com/hot"));
    let get_text = match get_text {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected getText command envelope, got {other:?}"),
    };
    assert_eq!(get_text["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
    assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
    assert!(get_text["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnGetText")));
    let eval = match eval {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected eval command envelope, got {other:?}"),
    };
    assert_eq!(eval["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
    assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
    assert!(eval["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnEval")));
    assert!(client_output.status.success());
    assert!(
        !combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
        "service submit flow still contains runtime-drop panic: {combined_output}"
    );
}
/// A freshly created session accepts its first client attach.
#[test]
fn first_client_attaches_without_busy_error() {
    let fresh_session = ServiceSession::new();

    let attach_outcome = fresh_session.try_attach_client();

    assert_eq!(attach_outcome, Ok(()));
}
/// While one client is attached, a second attach attempt is rejected with the
/// "already has an attached client" busy message.
#[test]
fn second_client_gets_busy_message() {
    let session = ServiceSession::new();

    // The first attach has to succeed so the second one collides with it.
    let first_attach = session.try_attach_client();
    assert_eq!(first_attach, Ok(()));

    let expected_rejection = ServiceMessage::Busy {
        message: String::from("service already has an attached client"),
    };
    assert_eq!(session.try_attach_client(), Err(expected_rejection));
}
/// Detaching the current client makes the session attachable again.
#[test]
fn disconnect_releases_the_session_for_a_new_client() {
    let session = ServiceSession::new();

    let first_attach = session.try_attach_client();
    assert_eq!(first_attach, Ok(()));

    session.detach_client();

    let second_attach = session.try_attach_client();
    assert_eq!(second_attach, Ok(()));
}
/// Starting a task on a session with no attached client yields the
/// "no attached client" busy message.
#[test]
fn task_cannot_start_without_an_attached_client() {
    let session = ServiceSession::new();

    let expected_rejection = ServiceMessage::Busy {
        message: String::from("service has no attached client"),
    };

    assert_eq!(session.try_start_task(), Err(expected_rejection));
}
/// With a client attached and one task already started, a second task start
/// is rejected with the "already has a running task" busy message.
#[test]
fn overlapping_task_submission_is_rejected() {
    let session = ServiceSession::new();

    // Set-up: attach a client and start the first task.
    let attach_outcome = session.try_attach_client();
    assert_eq!(attach_outcome, Ok(()));
    let first_start = session.try_start_task();
    assert_eq!(first_start, Ok(()));

    let expected_rejection = ServiceMessage::Busy {
        message: String::from("service already has a running task"),
    };
    assert_eq!(session.try_start_task(), Err(expected_rejection));
}
/// Launches the real `sg_claw` binary, sends a `connect` lifecycle request over
/// the service websocket, and checks that the service answers with a
/// `connected` status and has not exited within roughly one second after the
/// client closes the socket.
#[test]
fn service_binary_accepts_connect_request_without_starting_browser_task() {
    /// Owns the spawned service process and its scratch directory. Dropping it
    /// kills and reaps the process and removes the directory, so a failing
    /// assertion or unwrap cannot leave an orphaned `sg_claw` process or stale
    /// temp files behind.
    struct ServiceGuard {
        child: std::process::Child,
        root: std::path::PathBuf,
    }

    impl Drop for ServiceGuard {
        fn drop(&mut self) {
            // Best effort: the child may already have been killed and reaped.
            let _ = self.child.kill();
            let _ = self.child.wait();
            let _ = std::fs::remove_dir_all(&self.root);
        }
    }

    /// Reads whatever the child wrote to its piped stderr. Call only after the
    /// child has exited or been killed, otherwise the read blocks until EOF.
    fn drain_stderr(child: &mut std::process::Child) -> String {
        use std::io::Read as _;
        child
            .stderr
            .take()
            .map(|mut stream| {
                let mut buf = Vec::new();
                let _ = stream.read_to_end(&mut buf);
                String::from_utf8_lossy(&buf).into_owned()
            })
            .unwrap_or_default()
    }

    // Reserve a free local port, then release it so the service can bind it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let root =
        std::env::temp_dir().join(format!("sgclaw-service-connect-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
    "apiKey": "sk-runtime",
    "baseUrl": "https://api.deepseek.com",
    "model": "deepseek-chat",
    "browserWsUrl": "ws://127.0.0.1:12345",
    "serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();

    let child = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    // From here on the guard guarantees cleanup on every exit path.
    let mut service = ServiceGuard { child, root };

    // Poll until the service listener accepts a connection, the service exits,
    // or the deadline passes.
    let ws_url = format!("ws://{service_addr}");
    let connect_deadline = Instant::now() + Duration::from_secs(2);
    let mut websocket = None;
    while Instant::now() < connect_deadline {
        match connect(ws_url.as_str()) {
            Ok((socket, _)) => {
                websocket = Some(socket);
                break;
            }
            Err(_) => {
                if service.child.try_wait().unwrap().is_some() {
                    break;
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
    let mut websocket = match websocket {
        Some(socket) => socket,
        None => {
            // Stop the service first so reading its stderr cannot block.
            let _ = service.child.kill();
            let _ = service.child.wait();
            let stderr = drain_stderr(&mut service.child);
            panic!("service ws listener never became available; stderr={stderr}");
        }
    };

    websocket
        .send(Message::Text(
            serde_json::to_string(&ClientMessage::Connect).unwrap().into(),
        ))
        .unwrap();
    let frame = read_ws_text(&mut websocket);
    let message: ServiceMessage = serde_json::from_str(&frame).unwrap();
    assert_eq!(
        message,
        ServiceMessage::StatusChanged {
            state: "connected".to_string(),
        }
    );
    websocket.close(None).unwrap();

    // Give the service up to one second to (incorrectly) exit.
    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while Instant::now() < exit_deadline {
        if let Some(status) = service.child.try_wait().unwrap() {
            service_status = Some(status);
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if service_status.is_none() {
        service.child.kill().unwrap();
        let _ = service.child.wait();
    }
    let stderr = drain_stderr(&mut service.child);
    assert!(
        service_status.is_none(),
        "sg_claw exited after connect lifecycle request; stderr={stderr}"
    );
}
/// Launches the real `sg_claw` binary, submits a task over the service
/// websocket, drops the client socket right away without a close handshake,
/// and checks that the service process has not exited within roughly one
/// second afterwards.
#[test]
fn service_binary_survives_client_disconnect_during_task_completion_send() {
    /// Owns the spawned service process and its scratch directory. Dropping it
    /// kills and reaps the process and removes the directory, so a failing
    /// assertion or unwrap cannot leave an orphaned `sg_claw` process or stale
    /// temp files behind.
    struct ServiceGuard {
        child: std::process::Child,
        root: std::path::PathBuf,
    }

    impl Drop for ServiceGuard {
        fn drop(&mut self) {
            // Best effort: the child may already have been killed and reaped.
            let _ = self.child.kill();
            let _ = self.child.wait();
            let _ = std::fs::remove_dir_all(&self.root);
        }
    }

    /// Reads whatever the child wrote to its piped stderr. Call only after the
    /// child has exited or been killed, otherwise the read blocks until EOF.
    fn drain_stderr(child: &mut std::process::Child) -> String {
        use std::io::Read as _;
        child
            .stderr
            .take()
            .map(|mut stream| {
                let mut buf = Vec::new();
                let _ = stream.read_to_end(&mut buf);
                String::from_utf8_lossy(&buf).into_owned()
            })
            .unwrap_or_default()
    }

    // Reserve a free local port, then release it so the service can bind it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let root = std::env::temp_dir()
        .join(format!("sgclaw-service-disconnect-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
    "apiKey": "sk-runtime",
    "baseUrl": "https://api.deepseek.com",
    "model": "deepseek-chat",
    "browserWsUrl": "ws://127.0.0.1:12345",
    "serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();

    let child = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    // From here on the guard guarantees cleanup on every exit path.
    let mut service = ServiceGuard { child, root };

    // Poll until the service listener accepts a connection, the service exits,
    // or the deadline passes.
    let ws_url = format!("ws://{service_addr}");
    let connect_deadline = Instant::now() + Duration::from_secs(2);
    let mut websocket = None;
    while Instant::now() < connect_deadline {
        match connect(ws_url.as_str()) {
            Ok((socket, _)) => {
                websocket = Some(socket);
                break;
            }
            Err(_) => {
                if service.child.try_wait().unwrap().is_some() {
                    break;
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
    let mut websocket = match websocket {
        Some(socket) => socket,
        None => {
            // Stop the service first so reading its stderr cannot block.
            let _ = service.child.kill();
            let _ = service.child.wait();
            let stderr = drain_stderr(&mut service.child);
            panic!("service ws listener never became available; stderr={stderr}");
        }
    };

    websocket
        .send(Message::Text(
            serde_json::to_string(&ClientMessage::SubmitTask {
                instruction: "你好".to_string(),
                conversation_id: String::new(),
                messages: vec![],
                page_url: String::new(),
                page_title: String::new(),
            })
            .unwrap()
            .into(),
        ))
        .unwrap();
    // Disconnect abruptly: no close frame is sent before the socket goes away.
    drop(websocket);

    // Give the service up to one second to (incorrectly) exit.
    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while Instant::now() < exit_deadline {
        if let Some(status) = service.child.try_wait().unwrap() {
            service_status = Some(status);
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if service_status.is_none() {
        service.child.kill().unwrap();
        let _ = service.child.wait();
    }
    let stderr = drain_stderr(&mut service.child);
    assert!(
        service_status.is_none(),
        "sg_claw exited after client disconnected mid-task; stderr={stderr}"
    );
}
/// A `SubmitTask` client message converts into a submit-task request that
/// carries over the instruction, conversation id, history and page metadata.
#[test]
fn submit_task_client_message_converts_into_shared_runner_request() {
    let history = vec![sgclaw::pipe::ConversationMessage {
        role: String::from("user"),
        content: String::from("prior turn"),
    }];
    let submit = ClientMessage::SubmitTask {
        instruction: String::from("continue task"),
        conversation_id: String::from("conv-1"),
        messages: history,
        page_url: String::from("https://example.com"),
        page_title: String::from("Example"),
    };

    let request = submit
        .into_submit_task_request()
        .expect("submit task request");

    assert_eq!(request.instruction, "continue task");
    assert_eq!(request.conversation_id.as_deref(), Some("conv-1"));
    assert_eq!(request.messages.len(), 1);
    assert_eq!(request.page_url.as_deref(), Some("https://example.com"));
    assert_eq!(request.page_title.as_deref(), Some("Example"));
}
/// `Ping` is not a task submission, so the conversion yields `None`.
#[test]
fn ping_client_message_does_not_convert_into_submit_task_request() {
    let converted = ClientMessage::Ping.into_submit_task_request();
    assert!(converted.is_none());
}
/// None of the lifecycle messages (`Connect`, `Start`, `Stop`) converts into a
/// submit-task request.
#[test]
fn lifecycle_client_messages_do_not_convert_into_submit_task_request() {
    let lifecycle_messages = [
        ClientMessage::Connect,
        ClientMessage::Start,
        ClientMessage::Stop,
    ];
    for lifecycle in lifecycle_messages {
        let converted = lifecycle.into_submit_task_request();
        assert!(converted.is_none());
    }
}
/// Each lifecycle client message serializes to its exact tagged JSON form and
/// deserializes from that JSON back into an equal value.
#[test]
fn lifecycle_client_messages_round_trip_with_stable_tags() {
    let expectations = [
        (ClientMessage::Connect, r#"{"type":"connect"}"#),
        (ClientMessage::Start, r#"{"type":"start"}"#),
        (ClientMessage::Stop, r#"{"type":"stop"}"#),
    ];
    for (original, wire) in expectations {
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, wire);

        let parsed = serde_json::from_str::<ClientMessage>(wire).unwrap();
        assert_eq!(parsed, original);
    }
}
/// `StatusChanged` and `Pong` service messages serialize to their exact tagged
/// JSON form and deserialize from that JSON back into an equal value.
#[test]
fn service_messages_round_trip_with_stable_tags() {
    let status_changed = ServiceMessage::StatusChanged {
        state: String::from("started"),
    };
    let expectations = [
        (
            status_changed,
            r#"{"type":"status_changed","state":"started"}"#,
        ),
        (ServiceMessage::Pong, r#"{"type":"pong"}"#),
    ];
    for (original, wire) in expectations {
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, wire);

        let parsed = serde_json::from_str::<ServiceMessage>(wire).unwrap();
        assert_eq!(parsed, original);
    }
}
/// Sending status, log and task-completion agent messages through a
/// `ServiceEventSink` records the matching service messages in send order.
#[test]
fn service_event_sink_maps_log_completion_and_status_messages() {
    let sink = ServiceEventSink::default();

    let agent_events = [
        AgentMessage::StatusChanged {
            state: String::from("started"),
        },
        AgentMessage::LogEntry {
            level: String::from("info"),
            message: String::from("hello"),
        },
        AgentMessage::TaskComplete {
            success: true,
            summary: String::from("done"),
        },
    ];
    for event in &agent_events {
        sgclaw::agent::AgentEventSink::send(&sink, event).unwrap();
    }

    let expected = vec![
        ServiceMessage::StatusChanged {
            state: String::from("started"),
        },
        ServiceMessage::LogEntry {
            level: String::from("info"),
            message: String::from("hello"),
        },
        ServiceMessage::TaskComplete {
            success: true,
            summary: String::from("done"),
        },
    ];
    assert_eq!(sink.sent_messages(), expected);
}