Files
claw/tests/service_ws_session_test.rs

2154 lines
72 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::io::{Read as _, Write};
use std::net::TcpListener;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use reqwest::blocking::Client;
use serde_json::{json, Value};
use tungstenite::{accept, connect, Message};
use sgclaw::agent::AgentRuntimeContext;
use sgclaw::pipe::AgentMessage;
use sgclaw::service::{ClientMessage, ServiceEventSink, ServiceMessage, ServiceSession};
// Panic message text about dropping a runtime where blocking is not allowed.
// NOTE(review): not referenced in the visible part of this file; presumably tests further
// down check process output against it — confirm.
const RUNTIME_DROP_PANIC_TEXT: &str =
"Cannot drop a runtime in a context where blocking is not allowed";
// Skills directory written into the `skillsDir` config field by the Zhihu submit-flow test.
// NOTE(review): absolute, machine-specific Windows path — tests that rely on it can only
// pass where this directory exists.
const TEST_ZHIHU_SKILLS_DIR: &str = "D:/data/ideaSpace/rust/sgClaw/claw/claw/skills";
/// Reads the next websocket message and returns its text payload.
///
/// # Panics
///
/// Panics if the read fails or if the message is anything other than a text frame.
fn read_ws_text<S>(stream: &mut tungstenite::WebSocket<S>) -> String
where
    S: std::io::Read + std::io::Write,
{
    let other = stream.read().unwrap();
    if let Message::Text(text) = other {
        return text.to_string();
    }
    panic!("expected text frame, got {other:?}")
}
/// Spawns a scripted HTTP server that stands in for the LLM provider.
///
/// The worker thread accepts one connection per entry in `responses`. The JSON body of each
/// request is appended to the shared request log, and the matching entry is sent back as a
/// `200 OK` JSON reply. A connection whose request cannot be read is dropped without a
/// reply and still consumes its scripted response.
///
/// Returns the server base URL, the shared request log, and the worker's join handle.
///
/// # Panics
///
/// The worker thread panics if no connection arrives within five seconds, or if accepting
/// fails for a reason other than `WouldBlock`.
fn start_fake_deepseek_server(
    responses: Vec<Value>,
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    // Non-blocking accepts let the worker enforce its own per-request deadline.
    listener.set_nonblocking(true).unwrap();
    let address = format!("http://{}", listener.local_addr().unwrap());
    let requests = Arc::new(Mutex::new(Vec::new()));
    let request_log = Arc::clone(&requests);

    let handle = thread::spawn(move || {
        for response in responses {
            let deadline = Instant::now() + Duration::from_secs(5);
            let mut stream = loop {
                let err = match listener.accept() {
                    Ok((stream, _peer)) => break stream,
                    Err(err) => err,
                };
                if err.kind() != std::io::ErrorKind::WouldBlock {
                    panic!("failed to accept provider request: {err}");
                }
                if Instant::now() >= deadline {
                    panic!("timed out waiting for provider request");
                }
                thread::sleep(Duration::from_millis(10));
            };

            // The accepted socket is switched to blocking mode for the request/response exchange.
            stream.set_nonblocking(false).unwrap();
            let Ok(body) = read_http_json_body(&mut stream) else {
                continue;
            };
            request_log.lock().unwrap().push(body);

            let payload = response.to_string();
            let reply = format!(
                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                payload.len(),
                payload
            );
            stream.write_all(reply.as_bytes()).unwrap();
            stream.flush().unwrap();
        }
    });

    (address, requests, handle)
}
/// Spawns a scripted HTTP provider server that tolerates missing requests.
///
/// It serves `responses` in order, one connection each, logging every request body it
/// manages to read. If no connection arrives within 750 ms while waiting for the next
/// request, the worker stops serving and exits quietly instead of panicking. A connection
/// whose request cannot be read is dropped without a reply and still consumes its scripted
/// response.
///
/// Returns the server base URL, the shared request log, and the worker's join handle.
///
/// # Panics
///
/// The worker thread panics if accepting fails for a reason other than `WouldBlock`.
fn start_lenient_deepseek_server(
    responses: Vec<Value>,
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    // Non-blocking accepts let the worker give up once its deadline passes.
    listener.set_nonblocking(true).unwrap();
    let address = format!("http://{}", listener.local_addr().unwrap());
    let requests = Arc::new(Mutex::new(Vec::new()));
    let request_log = Arc::clone(&requests);

    let handle = thread::spawn(move || {
        'serve: for response in responses {
            let deadline = Instant::now() + Duration::from_millis(750);
            let mut stream = loop {
                let err = match listener.accept() {
                    Ok((stream, _peer)) => break stream,
                    Err(err) => err,
                };
                if err.kind() != std::io::ErrorKind::WouldBlock {
                    panic!("failed to accept provider request: {err}");
                }
                if Instant::now() >= deadline {
                    // Nobody called: stop serving the remaining scripted responses.
                    break 'serve;
                }
                thread::sleep(Duration::from_millis(10));
            };

            // The accepted socket is switched to blocking mode for the request/response exchange.
            stream.set_nonblocking(false).unwrap();
            let Ok(body) = read_http_json_body(&mut stream) else {
                continue;
            };
            request_log.lock().unwrap().push(body);

            let payload = response.to_string();
            let reply = format!(
                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                payload.len(),
                payload
            );
            stream.write_all(reply.as_bytes()).unwrap();
            stream.flush().unwrap();
        }
    });

    (address, requests, handle)
}
/// Reads one HTTP request from `stream` and parses its body as JSON.
///
/// Bytes are read until the blank line that ends the headers, the `Content-Length` header
/// (matched case-insensitively) is looked up, and reading continues until that many body
/// bytes are buffered.
///
/// # Errors
///
/// Returns a static message when the stream hits EOF while reading the headers or the
/// body, or when no `Content-Length` header is present.
///
/// # Panics
///
/// Panics on I/O read errors, non-UTF-8 headers, an unparsable `Content-Length` value, or
/// a body that is not valid JSON.
fn read_http_json_body(stream: &mut impl std::io::Read) -> Result<Value, &'static str> {
    const HEADER_TERMINATOR: &[u8] = b"\r\n\r\n";

    let mut received: Vec<u8> = Vec::new();

    // Phase 1: accumulate bytes until the header terminator shows up.
    let body_start = loop {
        let mut scratch = [0_u8; 1024];
        let count = stream.read(&mut scratch).unwrap();
        if count == 0 {
            return Err("unexpected EOF while reading headers");
        }
        received.extend_from_slice(&scratch[..count]);
        let terminator_at = received
            .windows(HEADER_TERMINATOR.len())
            .position(|window| window == HEADER_TERMINATOR);
        if let Some(offset) = terminator_at {
            break offset + HEADER_TERMINATOR.len();
        }
    };

    // Phase 2: find the declared body length among the header lines.
    let header_text = String::from_utf8(received[..body_start].to_vec()).unwrap();
    let mut declared_length = None;
    for line in header_text.lines() {
        let Some((name, value)) = line.split_once(':') else {
            continue;
        };
        if name.eq_ignore_ascii_case("content-length") {
            declared_length = Some(value.trim().parse::<usize>().unwrap());
            break;
        }
    }
    let content_length = match declared_length {
        Some(length) => length,
        None => return Err("missing content-length header"),
    };

    // Phase 3: keep reading until the whole body is buffered.
    let body_end = body_start + content_length;
    while received.len() < body_end {
        let mut scratch = vec![0_u8; content_length];
        let count = stream.read(&mut scratch).unwrap();
        if count == 0 {
            return Err("unexpected EOF while reading body");
        }
        received.extend_from_slice(&scratch[..count]);
    }

    Ok(serde_json::from_slice(&received[body_start..body_end]).unwrap())
}
/// Events that the callback-host fake browser servers report to the test over an mpsc channel.
#[derive(Debug)]
enum CallbackHostBrowserEvent {
/// A JSON frame the fake browser received on its websocket connection.
BrowserFrame(Value),
/// A JSON envelope returned by polling the helper's `/sgclaw/callback/commands/next` endpoint.
CommandEnvelope(Value),
}
/// Starts a fake browser websocket server that plays the browser side of the Zhihu hotlist
/// callback-host flow.
///
/// The worker thread accepts exactly one websocket connection, records the register frame
/// and the first two action frames on `event_tx`, and then acts as the helper page over
/// HTTP: it fetches the helper HTML, polls for commands, acknowledges each one, and posts
/// callback events for the get-text and eval scripts.
///
/// Returns the `ws://` URL of the server and the worker's join handle.
///
/// # Panics
///
/// The worker thread panics on any I/O or HTTP failure outside the polling loop, on an
/// unexpected command, or if the get-text or eval command was not seen within ten seconds.
fn start_callback_host_hotlist_browser_server(
event_tx: mpsc::Sender<CallbackHostBrowserEvent>,
) -> (String, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
// Only a single client connection is ever served.
let (stream, _) = listener.accept().unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.unwrap();
stream
.set_write_timeout(Some(Duration::from_secs(2)))
.unwrap();
let mut websocket = accept(stream).unwrap();
// Frame 1: the client's register message, forwarded to the test as-is.
let register = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected register frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(register))
.unwrap();
// Reply with a welcome message before reading any action frames.
websocket
.send(Message::Text(
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
.to_string()
.into(),
))
.unwrap();
// Frames 2 and 3: both are forwarded to the test before being validated below.
let first_action = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected first browser action frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(first_action.clone()))
.unwrap();
let second_action = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected second browser action frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(
second_action.clone(),
))
.unwrap();
// The first action must be a helper-page close; otherwise close the socket and stop
// quietly so the test can inspect the frames already forwarded.
let Some(close_values) = first_action.as_array() else {
websocket.close(None).ok();
return;
};
let is_helper_close = close_values.len() >= 3
&& close_values[1] == json!("sgHideBrowerserClosePage")
&& close_values[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
if !is_helper_close {
websocket.close(None).ok();
return;
}
// The second action must open the helper page; same quiet bail-out otherwise.
let Some(values) = second_action.as_array() else {
websocket.close(None).ok();
return;
};
let is_helper_open = values.len() >= 3
&& values[1] == json!("sgHideBrowerserOpenPage")
&& values[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
if !is_helper_open {
websocket.close(None).ok();
return;
}
// Derive the callback host origin by stripping the helper page path from its URL.
let helper_url = values[2].as_str().unwrap().to_string();
let helper_origin = helper_url
.trim_end_matches("/sgclaw/browser-helper.html")
.to_string();
let helper_client = Client::builder()
.timeout(Duration::from_secs(2))
.pool_max_idle_per_host(0)
.build()
.unwrap();
// The helper page must be served and must mention every callback name checked below.
let helper_html = helper_client
.get(&helper_url)
.send()
.unwrap()
.error_for_status()
.unwrap()
.text()
.unwrap();
assert!(helper_html.contains("sgclawReady"));
assert!(helper_html.contains("sgclawOnLoaded"));
assert!(helper_html.contains("sgclawOnGetText"));
assert!(helper_html.contains("sgclawOnEval"));
// Poll once before signalling readiness and forward whatever envelope comes back.
let pre_ready_command: Value = helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
.unwrap()
.error_for_status()
.unwrap()
.json()
.unwrap();
event_tx
.send(CallbackHostBrowserEvent::CommandEnvelope(pre_ready_command))
.unwrap();
// Announce that the helper page is ready.
helper_client
.post(format!("{helper_origin}/sgclaw/callback/ready"))
.json(&json!({
"type": "ready",
"helper_url": helper_url,
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
// Canned results: plain text for the get-text script, a JSON string for the eval script.
let hotlist_text = "知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度";
let hotlist_payload = json!({
"source": "https://www.zhihu.com/hot",
"sheet_name": "知乎热榜",
"columns": ["rank", "title", "heat"],
"rows": [[1, "问题一", "344万"], [2, "问题二", "266万"]]
})
.to_string();
// Command loop: poll for up to ten seconds, stopping once the eval command is answered.
let deadline = Instant::now() + Duration::from_secs(10);
let mut saw_get_text = false;
let mut saw_eval = false;
while Instant::now() < deadline {
// Failed polls are retried after a short pause.
let envelope: Value = match helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
.and_then(|response| response.error_for_status())
.and_then(|response| response.json())
{
Ok(envelope) => envelope,
Err(_) => {
thread::sleep(Duration::from_millis(20));
continue;
}
};
// Envelopes without a command object mean nothing is queued yet; retry.
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
thread::sleep(Duration::from_millis(20));
continue;
};
event_tx
.send(CallbackHostBrowserEvent::CommandEnvelope(envelope.clone()))
.unwrap();
let action_name = command
.get("action")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
// Every command is acknowledged before it is dispatched.
helper_client
.post(format!("{helper_origin}/sgclaw/callback/commands/ack"))
.json(&json!({ "type": "command_ack" }))
.send()
.unwrap()
.error_for_status()
.unwrap();
let args = command
.get("args")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
match action_name.as_str() {
// Opening the page is acknowledged only; no callback event is posted for it.
"sgBrowerserOpenPage" => {}
"sgBrowserExcuteJsCodeByDomain" => {
// The script text (second argument) determines which callback to answer.
let script = args.get(1).and_then(Value::as_str).unwrap_or_default();
if script.contains("sgclawOnGetText") {
saw_get_text = true;
helper_client
.post(format!("{helper_origin}/sgclaw/callback/events"))
.json(&json!({
"callback": "sgclawOnGetText",
"request_url": helper_url,
"target_url": "https://www.zhihu.com/hot",
"action": action_name,
"payload": { "text": hotlist_text }
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
} else if script.contains("sgclawOnEval") {
saw_eval = true;
helper_client
.post(format!("{helper_origin}/sgclaw/callback/events"))
.json(&json!({
"callback": "sgclawOnEval",
"request_url": helper_url,
"target_url": "https://www.zhihu.com/hot",
"action": action_name,
"payload": { "value": hotlist_payload }
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
// The eval result is the last step this fake browser handles.
break;
} else {
panic!("unexpected callback-host domain command: {script}");
}
}
other => panic!("unexpected callback-host command action {other}"),
}
}
assert!(saw_get_text, "expected callback-host getText command");
assert!(saw_eval, "expected callback-host eval command");
websocket.close(None).ok();
});
(format!("ws://{address}"), handle)
}
/// Starts a fake browser websocket server that plays the browser side of the
/// manifest-scene callback-host flow.
///
/// The worker thread accepts exactly one websocket connection, records the register frame
/// and the first two action frames on `event_tx`, and then acts as the helper page over
/// HTTP: it fetches the helper HTML, polls for commands, acknowledges each one, answers
/// page opens with an `sgclawOnLoaded` event, and answers the script command with an
/// `sgclawOnEval` event carrying a canned report artifact.
///
/// Returns the `ws://` URL of the server and the worker's join handle.
///
/// # Panics
///
/// The worker thread panics on any I/O or HTTP failure outside the polling loop, on an
/// unexpected command or script, or if no eval command was seen within ten seconds.
fn start_callback_host_manifest_scene_browser_server(
event_tx: mpsc::Sender<CallbackHostBrowserEvent>,
) -> (String, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
// Only a single client connection is ever served.
let (stream, _) = listener.accept().unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.unwrap();
stream
.set_write_timeout(Some(Duration::from_secs(2)))
.unwrap();
let mut websocket = accept(stream).unwrap();
// Frame 1: the client's register message, forwarded to the test as-is.
let register = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected register frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(register))
.unwrap();
// Reply with a welcome message before reading any action frames.
websocket
.send(Message::Text(
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
.to_string()
.into(),
))
.unwrap();
// Frames 2 and 3: both are forwarded to the test before being validated below.
let first_action = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected first browser action frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(first_action.clone()))
.unwrap();
let second_action = match websocket.read().unwrap() {
Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
other => panic!("expected second browser action frame, got {other:?}"),
};
event_tx
.send(CallbackHostBrowserEvent::BrowserFrame(
second_action.clone(),
))
.unwrap();
// The first action must be a helper-page close; otherwise close the socket and stop
// quietly so the test can inspect the frames already forwarded.
let Some(close_values) = first_action.as_array() else {
websocket.close(None).ok();
return;
};
let is_helper_close = close_values.len() >= 3
&& close_values[1] == json!("sgHideBrowerserClosePage")
&& close_values[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
if !is_helper_close {
websocket.close(None).ok();
return;
}
// The second action must open the helper page; same quiet bail-out otherwise.
let Some(values) = second_action.as_array() else {
websocket.close(None).ok();
return;
};
let is_helper_open = values.len() >= 3
&& values[1] == json!("sgHideBrowerserOpenPage")
&& values[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
if !is_helper_open {
websocket.close(None).ok();
return;
}
// Derive the callback host origin by stripping the helper page path from its URL.
let helper_url = values[2].as_str().unwrap().to_string();
let helper_origin = helper_url
.trim_end_matches("/sgclaw/browser-helper.html")
.to_string();
let helper_client = Client::builder()
.timeout(Duration::from_secs(2))
.pool_max_idle_per_host(0)
.build()
.unwrap();
// The helper page must be served and must mention every callback name checked below.
let helper_html = helper_client
.get(&helper_url)
.send()
.unwrap()
.error_for_status()
.unwrap()
.text()
.unwrap();
assert!(helper_html.contains("sgclawReady"));
assert!(helper_html.contains("sgclawOnLoaded"));
assert!(helper_html.contains("sgclawOnGetText"));
assert!(helper_html.contains("sgclawOnEval"));
// Poll once before signalling readiness and forward whatever envelope comes back.
let pre_ready_command: Value = helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
.unwrap()
.error_for_status()
.unwrap()
.json()
.unwrap();
event_tx
.send(CallbackHostBrowserEvent::CommandEnvelope(pre_ready_command))
.unwrap();
// Announce that the helper page is ready.
helper_client
.post(format!("{helper_origin}/sgclaw/callback/ready"))
.json(&json!({
"type": "ready",
"helper_url": helper_url,
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
// Canned eval result, serialized to a JSON string.
let manifest_payload = json!({
"type": "report-artifact",
"report_name": "manifest-scene-report",
"status": "ok",
"columns": ["ORG_NAME"],
"rows": [{"ORG_NAME": "国网兰州供电公司"}],
"counts": {"rows": 1}
})
.to_string();
// Command loop: poll for up to ten seconds, stopping once the eval command is answered.
let deadline = Instant::now() + Duration::from_secs(10);
let mut saw_eval = false;
while Instant::now() < deadline {
// Failed polls are retried after a short pause.
let envelope: Value = match helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
.and_then(|response| response.error_for_status())
.and_then(|response| response.json())
{
Ok(envelope) => envelope,
Err(_) => {
thread::sleep(Duration::from_millis(20));
continue;
}
};
// Envelopes without a command object mean nothing is queued yet; retry.
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
thread::sleep(Duration::from_millis(20));
continue;
};
event_tx
.send(CallbackHostBrowserEvent::CommandEnvelope(envelope.clone()))
.unwrap();
let action_name = command
.get("action")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
// Every command is acknowledged before it is dispatched.
helper_client
.post(format!("{helper_origin}/sgclaw/callback/commands/ack"))
.json(&json!({ "type": "command_ack" }))
.send()
.unwrap()
.error_for_status()
.unwrap();
let args = command
.get("args")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
match action_name.as_str() {
// A page open is answered with a "loaded" event for the manifest target URL.
"sgBrowerserOpenPage" => {
helper_client
.post(format!("{helper_origin}/sgclaw/callback/events"))
.json(&json!({
"callback": "sgclawOnLoaded",
"request_url": helper_url,
"target_url": "https://manifest.example.test/report",
"action": "navigate",
"payload": { "loaded": true }
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
}
"sgBrowserExcuteJsCodeByDomain" => {
// The script text (second argument) must reference the manifest-scene report.
let script = args.get(1).and_then(Value::as_str).unwrap_or_default();
assert!(
script.contains("manifest-scene-report"),
"expected manifest-scene eval script, got {script}"
);
saw_eval = true;
helper_client
.post(format!("{helper_origin}/sgclaw/callback/events"))
.json(&json!({
"callback": "sgclawOnEval",
"request_url": helper_url,
"target_url": "https://manifest.example.test/report",
"action": action_name,
"payload": { "value": manifest_payload }
}))
.send()
.unwrap()
.error_for_status()
.unwrap();
// The eval result is the last step this fake browser handles.
break;
}
other => panic!("unexpected callback-host command action {other}"),
}
}
assert!(saw_eval, "expected callback-host eval command");
websocket.close(None).ok();
});
(format!("ws://{address}"), handle)
}
/// Builds a throwaway skills root under the system temp directory that contains a single
/// `manifest-scene-report` skill: its `SKILL.toml`, its `scene.toml`, and the browser script
/// `scripts/collect_manifest_scene.js`.
///
/// Returns the path of the new root. The directory name carries a random UUID, and the
/// directory is not removed by this function.
///
/// # Panics
///
/// Panics if a directory or file cannot be created.
fn temp_manifest_scene_skill_root() -> std::path::PathBuf {
    const SKILL_TOML: &str = r#"
[skill]
name = "manifest-scene-report"
description = "Collect manifest scene report data."
version = "0.1.0"
[[tools]]
name = "collect_manifest_scene"
description = "Collect manifest scene report rows."
kind = "browser_script"
command = "scripts/collect_manifest_scene.js"
"#;
    const SCENE_TOML: &str = r#"
[scene]
id = "manifest-scene-report"
skill = "manifest-scene-report"
tool = "collect_manifest_scene"
kind = "browser_script"
version = "0.1.0"
category = "report_collection"
[manifest]
schema_version = "1"
[bootstrap]
expected_domain = "manifest.example.test"
target_url = "https://manifest.example.test/report"
page_title_keywords = []
requires_target_page = true
[deterministic]
suffix = "。。。"
include_keywords = ["自定义场景报表"]
exclude_keywords = []
[[params]]
name = "period"
resolver = "literal_passthrough"
required = false
prompt_missing = "missing"
prompt_ambiguous = "ambiguous"
[params.resolver_config]
output_field = "period_value"
value = "2026-03"
[artifact]
type = "report-artifact"
success_status = ["ok", "partial", "empty"]
failure_status = ["blocked", "error"]
"#;
    const COLLECT_SCRIPT: &str = r#"
return {
type: "report-artifact",
report_name: "manifest-scene-report",
status: "ok",
columns: ["ORG_NAME"],
rows: [{ ORG_NAME: "国网兰州供电公司" }],
counts: { rows: 1 }
};
"#;

    let unique_name = format!(
        "sgclaw-service-manifest-scene-skill-root-{}",
        uuid::Uuid::new_v4()
    );
    let root = std::env::temp_dir().join(unique_name);
    let skill_dir = root.join("manifest-scene-report");
    let script_dir = skill_dir.join("scripts");
    // Creating the deepest directory also creates the root and the skill directory.
    std::fs::create_dir_all(&script_dir).unwrap();

    let fixtures = [
        (skill_dir.join("SKILL.toml"), SKILL_TOML),
        (skill_dir.join("scene.toml"), SCENE_TOML),
        (script_dir.join("collect_manifest_scene.js"), COLLECT_SCRIPT),
    ];
    for (path, contents) in fixtures {
        std::fs::write(path, contents).unwrap();
    }

    root
}
/// Starts a fake browser websocket server that answers browser actions directly over the
/// websocket for the Zhihu hotlist flow.
///
/// The worker thread accepts one connection and records every text frame it receives in
/// the returned frame log. Register frames are only recorded. Every other frame is read as
/// a JSON array of the form `[request_url, action, target_url, ...]` and is answered with
/// a welcome message, the text `0`, and a `callBackJsToCpp` callback frame. The server
/// stops after the third action, or earlier if the peer closes the connection.
///
/// Returns the `ws://` URL of the server, the shared frame log, and the worker's join handle.
///
/// # Panics
///
/// The worker thread panics on read or send failures, on non-text data frames, on
/// malformed action frames, or on an unknown action name.
fn start_direct_zhihu_browser_ws_server(
) -> (String, Arc<Mutex<Vec<String>>>, thread::JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let frames = Arc::new(Mutex::new(Vec::new()));
let frames_for_thread = Arc::clone(&frames);
let handle = thread::spawn(move || {
// Only a single client connection is ever served.
let (stream, _) = listener.accept().unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
stream
.set_write_timeout(Some(Duration::from_secs(5)))
.unwrap();
let mut socket = accept(stream).unwrap();
// Counts action frames only; register frames do not advance it.
let mut action_count = 0_u64;
loop {
// A closed connection ends the loop; any other read error fails the server thread.
let message = match socket.read() {
Ok(message) => message,
Err(tungstenite::Error::ConnectionClosed)
| Err(tungstenite::Error::AlreadyClosed) => break,
Err(err) => panic!("browser ws test server read failed: {err}"),
};
let payload = match message {
Message::Text(text) => text.to_string(),
Message::Ping(payload) => {
socket.send(Message::Pong(payload)).unwrap();
continue;
}
Message::Close(_) => break,
other => panic!("expected text frame, got {other:?}"),
};
// Log the raw frame before interpreting it.
frames_for_thread.lock().unwrap().push(payload.clone());
let parsed: Value = serde_json::from_str(&payload).unwrap();
// Register frames are logged but get no reply.
if parsed.get("type").and_then(Value::as_str) == Some("register") {
continue;
}
let values = parsed
.as_array()
.expect("browser action frame should be an array");
let request_url = values[0].as_str().expect("request_url should be a string");
let action = values[1].as_str().expect("action should be a string");
action_count += 1;
// Each action is answered with a welcome message and the text "0" before its callback.
socket
.send(Message::Text(
r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
.to_string()
.into(),
))
.unwrap();
socket.send(Message::Text("0".into())).unwrap();
// The callback's third element joins request URL, target URL, a per-action callback
// id, the action name and the response text with "@_@" separators.
let callback_frame = match action {
"sgHideBrowserCallAfterLoaded" => {
let target_url = values[2]
.as_str()
.expect("navigate target_url should be a string");
json!([
request_url,
"callBackJsToCpp",
format!(
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgHideBrowserCallAfterLoaded@_@"
)
])
}
"sgBrowserExcuteJsCodeByArea" => {
let target_url = values[2]
.as_str()
.expect("script target_url should be a string");
// The second action overall gets the plain hotlist text; any other script action
// gets the structured JSON payload.
let response_text = if action_count == 2 {
"知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度".to_string()
} else {
r#"{"source":"https://www.zhihu.com/hot","sheet_name":"知乎热榜","columns":["rank","title","heat"],"rows":[[1,"问题一","344万"],[2,"问题二","266万"]]}"#.to_string()
};
json!([
request_url,
"callBackJsToCpp",
format!(
"{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgBrowserExcuteJsCodeByArea@_@{response_text}"
)
])
}
other => panic!("unexpected browser action {other}"),
};
socket
.send(Message::Text(callback_frame.to_string().into()))
.unwrap();
// The scripted exchange ends after the third action.
if action_count >= 3 {
break;
}
}
socket.close(None).ok();
});
(format!("ws://{address}"), frames, handle)
}
/// Compile-time check that `sgclaw::service::run` exists with the expected signature.
#[test]
fn service_entrypoint_function_is_exported() {
    // Coercing to a function pointer fails to compile if the signature drifts.
    let _: fn() -> Result<(), sgclaw::pipe::PipeError> = sgclaw::service::run;
}
/// With the DeepSeek environment variables removed, `service::run` must fail with a
/// protocol error that names the missing `DEEPSEEK_API_KEY` variable.
#[test]
fn service_run_requires_llm_config_for_startup() {
    for name in ["DEEPSEEK_API_KEY", "DEEPSEEK_BASE_URL", "DEEPSEEK_MODEL"] {
        std::env::remove_var(name);
    }

    let result = sgclaw::service::run();
    let reports_missing_key = match result {
        Err(sgclaw::pipe::PipeError::Protocol(message)) => {
            message.contains("missing environment variable: DEEPSEEK_API_KEY")
        }
        _ => false,
    };
    assert!(reports_missing_key);
}
/// A config file that sets `browserWsUrl` and `serviceWsListenAddr` must surface both
/// values in the loaded startup config.
#[test]
fn service_startup_config_loads_ws_endpoints_from_browser_config() {
    let dir_name = format!("sgclaw-service-startup-{}", uuid::Uuid::new_v4());
    let root = std::env::temp_dir().join(dir_name);
    std::fs::create_dir_all(&root).unwrap();

    let config_path = root.join("sgclaw_config.json");
    let config_json = r#"{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "127.0.0.1:42321"
}"#;
    std::fs::write(&config_path, config_json).unwrap();

    let context = AgentRuntimeContext::new(Some(config_path), root);
    let startup = sgclaw::service::load_startup_config(&context).unwrap();

    assert_eq!(
        startup.browser_ws_url.as_deref(),
        Some("ws://127.0.0.1:12345")
    );
    assert_eq!(
        startup.service_ws_listen_addr.as_deref(),
        Some("127.0.0.1:42321")
    );
}
/// A config file without websocket settings must still yield the endpoint values asserted
/// below for both the browser URL and the service listen address.
#[test]
fn service_startup_config_uses_default_ws_endpoints_when_not_configured() {
    let dir_name = format!("sgclaw-service-defaults-{}", uuid::Uuid::new_v4());
    let root = std::env::temp_dir().join(dir_name);
    std::fs::create_dir_all(&root).unwrap();

    let config_path = root.join("sgclaw_config.json");
    let config_json = r#"{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat"
}"#;
    std::fs::write(&config_path, config_json).unwrap();

    let context = AgentRuntimeContext::new(Some(config_path), root);
    let startup = sgclaw::service::load_startup_config(&context).unwrap();

    assert_eq!(
        startup.browser_ws_url.as_deref(),
        Some("ws://127.0.0.1:12345")
    );
    assert_eq!(
        startup.service_ws_listen_addr.as_deref(),
        Some("127.0.0.1:42321")
    );
}
/// Launches the `sg_claw` binary with a config file and checks that its stderr contains a
/// `sg_claw ready:` line together with the configured listen address and browser URL.
#[test]
fn service_binary_reports_resolved_startup_endpoints() {
    use std::io::Read;

    // Reserve an ephemeral port, then release it so the service can bind it.
    let service_addr = {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        listener.local_addr().unwrap()
    };

    let root = std::env::temp_dir().join(format!("sgclaw-service-report-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    let config_json = format!(
        r#"{{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "{service_addr}"
}}"#
    );
    std::fs::write(&config_path, config_json).unwrap();

    let binary = std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path");
    let mut command = std::process::Command::new(binary);
    command
        .arg("--config-path")
        .arg(&config_path)
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::piped());
    let mut child = command.spawn().unwrap();

    // Collect stderr until the readiness line appears, the child exits, or two seconds pass.
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    loop {
        if Instant::now() >= deadline {
            break;
        }
        if let Some(stream) = child.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            if let Ok(n) = stream.read(&mut buf) {
                if n > 0 {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
            }
        }
        if child.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }

    // Stop the service if it is still running, then drain whatever stderr is left.
    if child.try_wait().unwrap().is_none() {
        child.kill().unwrap();
        let _ = child.wait();
    }
    if let Some(mut stream) = child.stderr.take() {
        let mut rest = Vec::new();
        let _ = stream.read_to_end(&mut rest);
        stderr.push_str(&String::from_utf8_lossy(&rest));
    }

    assert!(stderr.contains("sg_claw ready:"));
    assert!(stderr.contains(&service_addr.to_string()));
    assert!(stderr.contains("ws://127.0.0.1:12345"));
}
/// Launches the `sg_claw` binary and checks that a websocket client can connect to the
/// configured listen address within two seconds while the service keeps running.
#[test]
fn service_binary_keeps_service_ws_listener_available_for_client_connections() {
    use std::io::Read;

    // Reserve an ephemeral port, then release it so the service can bind it.
    let service_addr = {
        let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
        service_listener.local_addr().unwrap()
    };

    let root = std::env::temp_dir().join(format!("sgclaw-service-live-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    let config_json = format!(
        r#"{{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "{service_addr}"
}}"#
    );
    std::fs::write(&config_path, config_json).unwrap();

    let binary = std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path");
    let mut command = std::process::Command::new(binary);
    command
        .arg("--config-path")
        .arg(&config_path)
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::piped());
    let mut child = command.spawn().unwrap();

    // Retry the connection until it succeeds, the child exits, or two seconds pass.
    let ws_url = format!("ws://{service_addr}");
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut connected = false;
    while Instant::now() < deadline {
        if let Ok((socket, _response)) = connect(ws_url.as_str()) {
            connected = true;
            drop(socket);
            break;
        }
        if child.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(50));
    }

    // Record whether the service was still alive, then stop it and collect its stderr.
    let status = child.try_wait().unwrap();
    if status.is_none() {
        child.kill().unwrap();
        let _ = child.wait();
    }
    let stderr = match child.stderr.take() {
        Some(mut stream) => {
            let mut buf = Vec::new();
            let _ = stream.read_to_end(&mut buf);
            String::from_utf8_lossy(&buf).into_owned()
        }
        None => String::new(),
    };

    assert!(
        connected,
        "service ws listener never became available; stderr={stderr}"
    );
    assert!(
        status.is_none(),
        "sg_claw exited before client could connect; stderr={stderr}"
    );
}
/// Checks that the `sg_claw` service process stays alive after a real `sg_claw_client`
/// process has received its task summary and disconnected.
///
/// Flow: reserve a free port, write a config that points the service at it, start the
/// service and wait (up to two seconds) for its `sg_claw ready:` line on stderr, run the
/// client with one line of input, then give the service one second in which it must not exit.
///
/// The service child is wrapped in a kill-on-drop guard. `std::process::Child` does not
/// terminate the process when dropped, so without the guard any failed assertion between
/// the spawn and the explicit kill would leave an orphaned service holding the port.
#[test]
fn service_binary_survives_real_client_disconnect_after_task_complete() {
    /// Kills and reaps the wrapped child process when dropped, including during unwinding.
    struct KillOnDrop(std::process::Child);

    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Best effort: the child may already have exited or been reaped.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    // Reserve an ephemeral port, then release it so the service can bind it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let root = std::env::temp_dir().join(format!(
        "sgclaw-service-disconnect-{}",
        uuid::Uuid::new_v4()
    ));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();

    let mut service = KillOnDrop(
        std::process::Command::new(
            std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
        )
        .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
        .arg("--config-path")
        .arg(&config_path)
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .unwrap(),
    );

    // Wait for the readiness line. The pipe read blocks, so the deadline is only re-checked
    // between reads.
    let ws_url = format!("ws://{service_addr}");
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.0.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.0.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    assert!(
        stderr.contains("sg_claw ready:"),
        "service did not report readiness; stderr={stderr}"
    );

    // Run the real client: one line of input on stdin, then wait for it to finish.
    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client
        .stdin
        .as_mut()
        .unwrap()
        .write_all("你好\n".as_bytes())
        .unwrap();
    let client_output = client.wait_with_output().unwrap();
    assert!(
        client_output.status.success(),
        "client failed: stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );
    // The summary the client prints is expected to be a failure report.
    // NOTE(review): presumably because no real provider/browser backend is reachable in
    // this test — confirm.
    assert!(
        String::from_utf8_lossy(&client_output.stdout).contains("任务执行失败:"),
        "client did not receive TaskComplete summary: stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );

    // Give the service one second to exit; any exit in that window fails the test below.
    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while Instant::now() < exit_deadline {
        if let Some(status) = service.0.try_wait().unwrap() {
            service_status = Some(status);
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }

    // Stop the still-running service so its stderr pipe reaches EOF and can be drained.
    if service_status.is_none() {
        service.0.kill().unwrap();
        let _ = service.0.wait();
    }
    let stderr = service
        .0
        .stderr
        .take()
        .map(|mut stream| {
            let mut buf = Vec::new();
            use std::io::Read;
            let _ = stream.read_to_end(&mut buf);
            String::from_utf8_lossy(&buf).into_owned()
        })
        .unwrap_or_default();
    assert!(
        service_status.is_none(),
        "sg_claw exited after client disconnect; stderr={stderr}"
    );
}
#[test]
fn service_binary_submit_flow_routes_zhihu_through_callback_host() {
let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
let service_addr = service_listener.local_addr().unwrap();
drop(service_listener);
let (event_tx, event_rx) = mpsc::channel();
let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);
let root = std::env::temp_dir().join(format!(
"sgclaw-service-zhihu-submit-{}",
uuid::Uuid::new_v4()
));
std::fs::create_dir_all(&root).unwrap();
let config_path = root.join("sgclaw_config.json");
std::fs::write(
&config_path,
format!(
r#"{{
"apiKey": "sk-runtime",
"baseUrl": "http://127.0.0.1:9",
"model": "deepseek-chat",
"skillsDir": "{TEST_ZHIHU_SKILLS_DIR}",
"browserWsUrl": "{browser_ws_url}",
"serviceWsListenAddr": "{service_addr}"
}}"#
),
)
.unwrap();
let mut service = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
)
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
.arg("--config-path")
.arg(&config_path)
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let ws_url = format!("ws://{service_addr}");
let ready_deadline = Instant::now() + Duration::from_secs(2);
let mut stderr = String::new();
while Instant::now() < ready_deadline {
if let Some(stream) = service.stderr.as_mut() {
let mut buf = [0_u8; 1024];
match stream.read(&mut buf) {
Ok(0) => {}
Ok(n) => {
stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
if stderr.contains("sg_claw ready:") {
break;
}
}
Err(_) => {}
}
}
if service.try_wait().unwrap().is_some() {
break;
}
thread::sleep(Duration::from_millis(20));
}
assert!(
stderr.contains("sg_claw ready:"),
"service did not report readiness; stderr={stderr}"
);
let mut client = std::process::Command::new(
std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
)
.env("SG_CLAW_SERVICE_WS_URL", &ws_url)
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
client
.stdin
.as_mut()
.unwrap()
.write_all("打开知乎热榜获取前10条数据并导出 Excel\n".as_bytes())
.unwrap();
let client_output = client.wait_with_output().unwrap();
browser_server.join().unwrap();
let register = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let bootstrap_close = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let bootstrap = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let pre_ready = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
let open_page = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let get_text = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let eval = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
let service_status = service.try_wait().unwrap();
if service_status.is_none() {
service.kill().unwrap();
let _ = service.wait();
}
if let Some(mut stream) = service.stderr.take() {
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf);
stderr.push_str(&String::from_utf8_lossy(&buf));
}
let combined_output = format!(
"{}\n{}\n{}",
String::from_utf8_lossy(&client_output.stdout),
String::from_utf8_lossy(&client_output.stderr),
stderr
);
let register = match register {
CallbackHostBrowserEvent::BrowserFrame(value) => value,
other => panic!("expected register browser frame, got {other:?}"),
};
assert_eq!(register, json!({ "type": "register", "role": "web" }));
let bootstrap_close = match bootstrap_close {
CallbackHostBrowserEvent::BrowserFrame(value) => value,
other => panic!("expected helper close frame, got {other:?}"),
};
assert_eq!(bootstrap_close[0], json!("https://www.zhihu.com"));
assert_eq!(bootstrap_close[1], json!("sgHideBrowerserClosePage"));
assert!(bootstrap_close[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));
let bootstrap = match bootstrap {
CallbackHostBrowserEvent::BrowserFrame(value) => value,
other => panic!("expected helper bootstrap frame, got {other:?}"),
};
assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
assert_eq!(bootstrap[1], json!("sgHideBrowerserOpenPage"));
assert!(bootstrap[2]
.as_str()
.is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));
let pre_ready = match pre_ready {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected pre-ready command envelope, got {other:?}"),
};
assert_eq!(pre_ready, json!({ "ok": false, "command": null }));
let open_page = match open_page {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected open-page command envelope, got {other:?}"),
};
assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
assert_eq!(
open_page["command"]["args"][0],
json!("https://www.zhihu.com/hot")
);
let get_text = match get_text {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected getText command envelope, got {other:?}"),
};
assert_eq!(
get_text["command"]["action"],
json!("sgBrowserExcuteJsCodeByDomain")
);
assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
assert!(get_text["command"]["args"][1]
.as_str()
.is_some_and(|script| script.contains("sgclawOnGetText")));
let eval = match eval {
CallbackHostBrowserEvent::CommandEnvelope(value) => value,
other => panic!("expected eval command envelope, got {other:?}"),
};
assert_eq!(
eval["command"]["action"],
json!("sgBrowserExcuteJsCodeByDomain")
);
assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
assert!(eval["command"]["args"][1]
.as_str()
.is_some_and(|script| script.contains("sgclawOnEval")));
assert!(client_output.status.success());
assert!(
String::from_utf8_lossy(&client_output.stdout).contains("已导出并打开知乎热榜 Excel"),
"client stdout={} stderr={}",
String::from_utf8_lossy(&client_output.stdout),
String::from_utf8_lossy(&client_output.stderr)
);
assert!(
String::from_utf8_lossy(&client_output.stdout).contains(".xlsx"),
"client stdout={} stderr={}",
String::from_utf8_lossy(&client_output.stdout),
String::from_utf8_lossy(&client_output.stderr)
);
assert!(
!combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
"service submit flow still contains runtime-drop panic: {combined_output}"
);
}
/// End-to-end check of the manifest-scene submit flow across the real `sg_claw` and
/// `sg_claw_client` binaries.
///
/// The service is started with a config whose `skillsDir` points at a temporary manifest
/// scene skill root and whose `browserWsUrl` points at a fake callback-host browser server.
/// After the client submits an instruction, the test asserts on the browser-side events
/// (register frame, helper close/open frames, pre-ready envelope, then an optional
/// open-page command followed by the eval command), on the client exit status, and on the
/// combined process output.
#[test]
fn service_binary_submit_flow_routes_configured_manifest_scene_through_callback_host() {
    // Bind an ephemeral loopback port to learn a free address, then release it so the
    // service process can listen on it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    // The fake browser server (helper defined elsewhere in this file) is handed `event_tx`;
    // the events asserted on below are read back from `event_rx`.
    let (event_tx, event_rx) = mpsc::channel();
    let (browser_ws_url, browser_server) =
        start_callback_host_manifest_scene_browser_server(event_tx);
    let skills_dir = temp_manifest_scene_skill_root();

    // Per-run scratch directory holding the config and the rules file; it is also used as
    // the service's working directory.
    let root = std::env::temp_dir().join(format!(
        "sgclaw-service-manifest-scene-submit-{}",
        uuid::Uuid::new_v4()
    ));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    let resources_dir = root.join("resources");
    std::fs::create_dir_all(&resources_dir).unwrap();
    std::fs::write(
        resources_dir.join("rules.json"),
        r#"{
"version": "1.0",
"domains": {
"allowed": ["manifest.example.test"]
},
"pipe_actions": {
"allowed": ["click", "type", "navigate", "getText", "eval"],
"blocked": ["executeJsInPage"]
}
}"#,
    )
    .unwrap();

    // Backslashes would need escaping inside the JSON config, so use forward slashes.
    let skills_dir_json = skills_dir.to_string_lossy().replace("\\", "/");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "http://127.0.0.1:9",
"model": "deepseek-chat",
"skillsDir": "{skills_dir_json}",
"browserWsUrl": "{browser_ws_url}",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();

    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .current_dir(&root)
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    let ws_url = format!("ws://{service_addr}");

    // Wait for the readiness marker on the service's stderr.
    // NOTE(review): `read` on the piped stderr blocks until the child writes or closes the
    // pipe, so the deadline is only re-checked between reads.
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if !stderr.contains("sg_claw ready:") {
        // `Child` is not killed on drop; stop the service before the assert below panics
        // so a failed start does not leave an orphaned process behind.
        let _ = service.kill();
        let _ = service.wait();
    }
    assert!(
        stderr.contains("sg_claw ready:"),
        "service did not report readiness; stderr={stderr}"
    );

    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client
        .stdin
        .as_mut()
        .unwrap()
        .write_all("请执行自定义场景报表。。。\n".as_bytes())
        .unwrap();
    // `wait_with_output` closes the client's stdin before waiting for it to exit.
    let client_output = client.wait_with_output().unwrap();

    // Collect every outcome as a `Result` first; they are only unwrapped after the service
    // has been stopped and its output captured, so failures carry full diagnostics.
    let browser_server_result = browser_server.join();
    let register_result = event_rx.recv_timeout(Duration::from_secs(2));
    let bootstrap_close_result = event_rx.recv_timeout(Duration::from_secs(2));
    let bootstrap_result = event_rx.recv_timeout(Duration::from_secs(2));
    let pre_ready_result = event_rx.recv_timeout(Duration::from_secs(2));
    let first_command_result = event_rx.recv_timeout(Duration::from_secs(4));
    let second_command_result = event_rx.recv_timeout(Duration::from_secs(4));

    let service_status = service.try_wait().unwrap();
    if service_status.is_none() {
        service.kill().unwrap();
        let _ = service.wait();
    }
    if let Some(mut stream) = service.stderr.take() {
        let mut buf = Vec::new();
        let _ = stream.read_to_end(&mut buf);
        stderr.push_str(&String::from_utf8_lossy(&buf));
    }
    let combined_output = format!(
        "{}\n{}\n{}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr),
        stderr
    );
    assert!(
        browser_server_result.is_ok(),
        "manifest callback-host helper panicked; browser_server_result={browser_server_result:?} output={combined_output} register={register_result:?} bootstrap_close={bootstrap_close_result:?} bootstrap={bootstrap_result:?} pre_ready={pre_ready_result:?} first_command={first_command_result:?} second_command={second_command_result:?}"
    );
    let register = register_result.expect("missing register event");
    let bootstrap_close = bootstrap_close_result.expect("missing bootstrap close event");
    let bootstrap = bootstrap_result.expect("missing bootstrap open event");
    let pre_ready = pre_ready_result.expect("missing pre-ready event");
    let first_command = first_command_result.expect("missing first command event");

    let register = match register {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected register browser frame, got {other:?}"),
    };
    assert_eq!(register, json!({ "type": "register", "role": "web" }));

    let bootstrap_close = match bootstrap_close {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected helper close frame, got {other:?}"),
    };
    assert_eq!(
        bootstrap_close[0],
        json!("https://manifest.example.test/report")
    );
    assert_eq!(bootstrap_close[1], json!("sgHideBrowerserClosePage"));

    let bootstrap = match bootstrap {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected helper bootstrap frame, got {other:?}"),
    };
    assert_eq!(bootstrap[0], json!("https://manifest.example.test/report"));
    assert_eq!(bootstrap[1], json!("sgHideBrowerserOpenPage"));

    let pre_ready = match pre_ready {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected pre-ready command envelope, got {other:?}"),
    };
    assert_eq!(pre_ready, json!({ "ok": false, "command": null }));

    let first_command = match first_command {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected first command envelope, got {other:?}"),
    };
    // The eval command may or may not be preceded by an explicit open-page command; accept
    // both shapes, but require that nothing follows when eval came first.
    let eval = if first_command["command"]["action"] == json!("sgBrowerserOpenPage") {
        assert_eq!(
            first_command["command"]["args"][0],
            json!("https://manifest.example.test/report")
        );
        let second_command =
            second_command_result.expect("missing eval command event after open-page");
        match second_command {
            CallbackHostBrowserEvent::CommandEnvelope(value) => value,
            other => panic!("expected eval command envelope, got {other:?}"),
        }
    } else {
        assert!(
            second_command_result.is_err(),
            "did not expect a second command when the first command was already eval: {second_command_result:?}"
        );
        first_command
    };
    assert_eq!(
        eval["command"]["action"],
        json!("sgBrowserExcuteJsCodeByDomain")
    );
    assert_eq!(eval["command"]["args"][0], json!("manifest.example.test"));
    assert!(eval["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("manifest-scene-report")));

    assert!(client_output.status.success());
    assert!(
        !combined_output.contains("compat_llm_primary"),
        "manifest scene should not fall through to compat LLM: {combined_output}"
    );
    assert!(
        !combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
        "service submit flow still contains runtime-drop panic: {combined_output}"
    );

    // Best-effort cleanup of both scratch directories created by this test.
    let _ = std::fs::remove_dir_all(skills_dir);
    let _ = std::fs::remove_dir_all(&root);
}
/// End-to-end check that the Zhihu hot-list instruction, submitted through the real
/// `sg_claw_client` binary, makes the `sg_claw` service drive the fake callback-host
/// browser with the expected frames and command envelopes (register, helper close/open,
/// pre-ready, open-page, getText, eval).
///
/// NOTE(review): the config uses `TEST_ZHIHU_SKILLS_DIR`, a hard-coded absolute path, so
/// this test depends on that directory existing on the machine running it.
#[test]
fn service_binary_submit_flow_uses_callback_host_command_semantics_for_zhihu() {
    // Bind an ephemeral loopback port to learn a free address, then release it so the
    // service process can listen on it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    // The fake browser server (helper defined elsewhere in this file) is handed `event_tx`;
    // the events asserted on below are read back from `event_rx`.
    let (event_tx, event_rx) = mpsc::channel();
    let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);

    let root =
        std::env::temp_dir().join(format!("sgclaw-service-session-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "http://127.0.0.1:9",
"model": "deepseek-chat",
"skillsDir": "{TEST_ZHIHU_SKILLS_DIR}",
"browserWsUrl": "{browser_ws_url}",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();

    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    let ws_url = format!("ws://{service_addr}");

    // Wait for the readiness marker on the service's stderr.
    // NOTE(review): `read` on the piped stderr blocks until the child writes or closes the
    // pipe, so the deadline is only re-checked between reads.
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if !stderr.contains("sg_claw ready:") {
        // `Child` is not killed on drop; stop the service before the assert below panics
        // so a failed start does not leave an orphaned process behind.
        let _ = service.kill();
        let _ = service.wait();
    }
    assert!(
        stderr.contains("sg_claw ready:"),
        "service did not report readiness; stderr={stderr}"
    );

    // The client gets the same post-export-open switch as the service, matching the other
    // submit-flow tests in this file.
    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client
        .stdin
        .as_mut()
        .unwrap()
        .write_all("打开知乎热榜获取前10条数据并导出 Excel\n".as_bytes())
        .unwrap();
    // `wait_with_output` closes the client's stdin before waiting for it to exit.
    let client_output = client.wait_with_output().unwrap();

    // Collect every outcome as a `Result` first and unwrap only after the service has been
    // stopped, so a missing event cannot leave the service process running.
    let browser_server_result = browser_server.join();
    let register = event_rx.recv_timeout(Duration::from_secs(2));
    let bootstrap_close = event_rx.recv_timeout(Duration::from_secs(2));
    let bootstrap = event_rx.recv_timeout(Duration::from_secs(2));
    let pre_ready = event_rx.recv_timeout(Duration::from_secs(2));
    let open_page = event_rx.recv_timeout(Duration::from_secs(4));
    let get_text = event_rx.recv_timeout(Duration::from_secs(4));
    let eval = event_rx.recv_timeout(Duration::from_secs(4));

    let service_status = service.try_wait().unwrap();
    if service_status.is_none() {
        service.kill().unwrap();
        let _ = service.wait();
    }
    if let Some(mut stream) = service.stderr.take() {
        let mut buf = Vec::new();
        let _ = stream.read_to_end(&mut buf);
        stderr.push_str(&String::from_utf8_lossy(&buf));
    }
    let combined_output = format!(
        "{}\n{}\n{}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr),
        stderr
    );
    assert!(
        browser_server_result.is_ok(),
        "callback-host browser server panicked; output={combined_output}"
    );

    let register = match register.expect("missing register event") {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected register browser frame, got {other:?}"),
    };
    assert_eq!(register, json!({ "type": "register", "role": "web" }));

    let bootstrap_close = match bootstrap_close.expect("missing bootstrap close event") {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected helper close frame, got {other:?}"),
    };
    assert_eq!(bootstrap_close[0], json!("https://www.zhihu.com"));
    assert_eq!(bootstrap_close[1], json!("sgHideBrowerserClosePage"));
    assert!(bootstrap_close[2]
        .as_str()
        .is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));

    let bootstrap = match bootstrap.expect("missing bootstrap open event") {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected helper bootstrap frame, got {other:?}"),
    };
    assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
    assert_eq!(bootstrap[1], json!("sgHideBrowerserOpenPage"));
    assert!(bootstrap[2]
        .as_str()
        .is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));

    let pre_ready = match pre_ready.expect("missing pre-ready event") {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected pre-ready command envelope, got {other:?}"),
    };
    assert_eq!(pre_ready, json!({ "ok": false, "command": null }));

    let open_page = match open_page.expect("missing open-page command event") {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected open-page command envelope, got {other:?}"),
    };
    assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
    assert_eq!(
        open_page["command"]["args"][0],
        json!("https://www.zhihu.com/hot")
    );

    let get_text = match get_text.expect("missing getText command event") {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected getText command envelope, got {other:?}"),
    };
    assert_eq!(
        get_text["command"]["action"],
        json!("sgBrowserExcuteJsCodeByDomain")
    );
    assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
    assert!(get_text["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnGetText")));

    let eval = match eval.expect("missing eval command event") {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected eval command envelope, got {other:?}"),
    };
    assert_eq!(
        eval["command"]["action"],
        json!("sgBrowserExcuteJsCodeByDomain")
    );
    assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
    assert!(eval["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnEval")));

    assert!(client_output.status.success());
    assert!(
        !combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
        "service submit flow still contains runtime-drop panic: {combined_output}"
    );

    // Best-effort cleanup of the scratch directory created by this test.
    let _ = std::fs::remove_dir_all(&root);
}
/// A freshly created session accepts its first client attach.
#[test]
fn first_client_attaches_without_busy_error() {
    let fresh_session = ServiceSession::new();
    assert_eq!(fresh_session.try_attach_client(), Ok(()));
}
/// While one client is attached, a second attach attempt is answered with `Busy`.
#[test]
fn second_client_gets_busy_message() {
    let session = ServiceSession::new();
    let first_attach = session.try_attach_client();
    assert_eq!(first_attach, Ok(()));

    let second_attach = session.try_attach_client();
    let expected_rejection = ServiceMessage::Busy {
        message: "service already has an attached client".to_string(),
    };
    assert_eq!(second_attach, Err(expected_rejection));
}
/// Detaching the current client lets a subsequent attach succeed again.
#[test]
fn disconnect_releases_the_session_for_a_new_client() {
    let session = ServiceSession::new();

    let attach_before_detach = session.try_attach_client();
    assert_eq!(attach_before_detach, Ok(()));

    session.detach_client();

    let attach_after_detach = session.try_attach_client();
    assert_eq!(attach_after_detach, Ok(()));
}
/// Starting a task on a session with no attached client is rejected with `Busy`.
#[test]
fn task_cannot_start_without_an_attached_client() {
    let unattached_session = ServiceSession::new();
    let start_attempt = unattached_session.try_start_task();

    let expected_rejection = ServiceMessage::Busy {
        message: "service has no attached client".to_string(),
    };
    assert_eq!(start_attempt, Err(expected_rejection));
}
/// Once a task has been started, a second start attempt is rejected with `Busy`.
#[test]
fn overlapping_task_submission_is_rejected() {
    let session = ServiceSession::new();

    let attach = session.try_attach_client();
    assert_eq!(attach, Ok(()));
    let first_start = session.try_start_task();
    assert_eq!(first_start, Ok(()));

    let overlapping_start = session.try_start_task();
    let expected_rejection = ServiceMessage::Busy {
        message: "service already has a running task".to_string(),
    };
    assert_eq!(overlapping_start, Err(expected_rejection));
}
/// Spawns the real `sg_claw` binary, sends a `Connect` lifecycle message over its service
/// WebSocket and expects a `StatusChanged { state: "connected" }` reply. After the client
/// closes the socket, the service process must still be running one second later.
#[test]
fn service_binary_accepts_connect_request_without_starting_browser_task() {
    // Bind an ephemeral loopback port to learn a free address, then release it so the
    // service process can listen on it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let root =
        std::env::temp_dir().join(format!("sgclaw-service-connect-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();

    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    let ws_url = format!("ws://{service_addr}");

    // Retry the WebSocket handshake until the listener is up, the service exits, or the
    // deadline passes.
    let connect_deadline = Instant::now() + Duration::from_secs(2);
    let mut websocket = None;
    while Instant::now() < connect_deadline {
        match connect(ws_url.as_str()) {
            Ok((socket, _)) => {
                websocket = Some(socket);
                break;
            }
            Err(_) => {
                if service.try_wait().unwrap().is_some() {
                    break;
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
    let Some(mut websocket) = websocket else {
        // `Child` is not killed on drop; stop the service and clean up before failing so
        // the test does not leave an orphaned process or scratch directory behind.
        let _ = service.kill();
        let _ = service.wait();
        let _ = std::fs::remove_dir_all(&root);
        panic!("service ws listener never became available");
    };

    websocket
        .send(Message::Text(
            serde_json::to_string(&ClientMessage::Connect)
                .unwrap()
                .into(),
        ))
        .unwrap();
    let frame = read_ws_text(&mut websocket);
    let message: ServiceMessage = serde_json::from_str(&frame).unwrap();
    assert_eq!(
        message,
        ServiceMessage::StatusChanged {
            state: "connected".to_string(),
        }
    );
    websocket.close(None).unwrap();

    // Give the service a second to (wrongly) exit; `service_status` stays `None` when it
    // is still alive at the deadline.
    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while Instant::now() < exit_deadline {
        if let Some(status) = service.try_wait().unwrap() {
            service_status = Some(status);
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if service_status.is_none() {
        service.kill().unwrap();
        let _ = service.wait();
    }

    // Capture stderr for the failure message; the child has terminated by this point.
    let mut stderr_bytes = Vec::new();
    if let Some(mut stream) = service.stderr.take() {
        let _ = stream.read_to_end(&mut stderr_bytes);
    }
    let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();

    // Best-effort cleanup of the scratch directory, done before the final assert so it
    // also happens when the assert fails.
    let _ = std::fs::remove_dir_all(&root);

    assert!(
        service_status.is_none(),
        "sg_claw exited after connect lifecycle request; stderr={stderr}"
    );
}
/// Spawns the real `sg_claw` binary, submits a task over its service WebSocket and drops
/// the connection immediately without reading any reply. The service process must still
/// be running one second later.
#[test]
fn service_binary_survives_client_disconnect_during_task_completion_send() {
    // Bind an ephemeral loopback port to learn a free address, then release it so the
    // service process can listen on it.
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let root = std::env::temp_dir().join(format!(
        "sgclaw-service-disconnect-{}",
        uuid::Uuid::new_v4()
    ));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{
"apiKey": "sk-runtime",
"baseUrl": "https://api.deepseek.com",
"model": "deepseek-chat",
"browserWsUrl": "ws://127.0.0.1:12345",
"serviceWsListenAddr": "{service_addr}"
}}"#
        ),
    )
    .unwrap();

    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    let ws_url = format!("ws://{service_addr}");

    // Retry the WebSocket handshake until the listener is up, the service exits, or the
    // deadline passes.
    let connect_deadline = Instant::now() + Duration::from_secs(2);
    let mut websocket = None;
    while Instant::now() < connect_deadline {
        match connect(ws_url.as_str()) {
            Ok((socket, _)) => {
                websocket = Some(socket);
                break;
            }
            Err(_) => {
                if service.try_wait().unwrap().is_some() {
                    break;
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
    let Some(mut websocket) = websocket else {
        // `Child` is not killed on drop; stop the service and clean up before failing so
        // the test does not leave an orphaned process or scratch directory behind.
        let _ = service.kill();
        let _ = service.wait();
        let _ = std::fs::remove_dir_all(&root);
        panic!("service ws listener never became available");
    };

    websocket
        .send(Message::Text(
            serde_json::to_string(&ClientMessage::SubmitTask {
                instruction: "你好".to_string(),
                conversation_id: String::new(),
                messages: vec![],
                page_url: String::new(),
                page_title: String::new(),
            })
            .unwrap()
            .into(),
        ))
        .unwrap();
    // Drop the socket right after submitting, without reading any reply.
    drop(websocket);

    // Give the service a second to (wrongly) exit; `service_status` stays `None` when it
    // is still alive at the deadline.
    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while Instant::now() < exit_deadline {
        if let Some(status) = service.try_wait().unwrap() {
            service_status = Some(status);
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if service_status.is_none() {
        service.kill().unwrap();
        let _ = service.wait();
    }

    // Capture stderr for the failure message; the child has terminated by this point.
    let mut stderr_bytes = Vec::new();
    if let Some(mut stream) = service.stderr.take() {
        let _ = stream.read_to_end(&mut stderr_bytes);
    }
    let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();

    // Best-effort cleanup of the scratch directory, done before the final assert so it
    // also happens when the assert fails.
    let _ = std::fs::remove_dir_all(&root);

    assert!(
        service_status.is_none(),
        "sg_claw exited after client disconnected mid-task; stderr={stderr}"
    );
}
/// A `SubmitTask` client message converts into a submit-task request that carries the
/// instruction, conversation id, message history and page metadata.
#[test]
fn submit_task_client_message_converts_into_shared_runner_request() {
    let prior_turn = sgclaw::pipe::ConversationMessage {
        role: "user".to_string(),
        content: "prior turn".to_string(),
    };
    let submit = ClientMessage::SubmitTask {
        instruction: "continue task".to_string(),
        conversation_id: "conv-1".to_string(),
        messages: vec![prior_turn],
        page_url: "https://example.com".to_string(),
        page_title: "Example".to_string(),
    };

    let converted = submit.into_submit_task_request();
    let request = converted.expect("submit task request");

    assert_eq!(request.instruction, "continue task");
    assert_eq!(request.conversation_id.as_deref(), Some("conv-1"));
    assert_eq!(request.messages.len(), 1);
    assert_eq!(request.page_url.as_deref(), Some("https://example.com"));
    assert_eq!(request.page_title.as_deref(), Some("Example"));
}
/// `Ping` does not convert into a submit-task request.
#[test]
fn ping_client_message_does_not_convert_into_submit_task_request() {
    let converted = ClientMessage::Ping.into_submit_task_request();
    assert!(converted.is_none());
}
/// None of the lifecycle messages (`Connect`, `Start`, `Stop`) converts into a submit-task
/// request.
#[test]
fn lifecycle_client_messages_do_not_convert_into_submit_task_request() {
    let lifecycle_messages = [
        ClientMessage::Connect,
        ClientMessage::Start,
        ClientMessage::Stop,
    ];
    for lifecycle in lifecycle_messages {
        let converted = lifecycle.into_submit_task_request();
        assert!(converted.is_none());
    }
}
/// Each lifecycle client message serializes to its fixed `type` tag and deserializes back
/// to an equal value.
#[test]
fn lifecycle_client_messages_round_trip_with_stable_tags() {
    let expectations = [
        (ClientMessage::Connect, r#"{"type":"connect"}"#),
        (ClientMessage::Start, r#"{"type":"start"}"#),
        (ClientMessage::Stop, r#"{"type":"stop"}"#),
    ];
    for (original, wire) in expectations {
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, wire);

        let decoded = serde_json::from_str::<ClientMessage>(wire).unwrap();
        assert_eq!(decoded, original);
    }
}
/// `StatusChanged` and `Pong` service messages serialize to their fixed JSON form and
/// deserialize back to an equal value.
#[test]
fn service_messages_round_trip_with_stable_tags() {
    let status_changed = ServiceMessage::StatusChanged {
        state: "started".to_string(),
    };
    let expectations = [
        (
            status_changed,
            r#"{"type":"status_changed","state":"started"}"#,
        ),
        (ServiceMessage::Pong, r#"{"type":"pong"}"#),
    ];
    for (original, wire) in expectations {
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, wire);

        let decoded = serde_json::from_str::<ServiceMessage>(wire).unwrap();
        assert_eq!(decoded, original);
    }
}
/// Agent status, log and completion messages sent through a `ServiceEventSink` are
/// recorded, in order, as the matching `ServiceMessage` variants.
#[test]
fn service_event_sink_maps_log_completion_and_status_messages() {
    let sink = ServiceEventSink::default();

    let agent_events = [
        AgentMessage::StatusChanged {
            state: "started".to_string(),
        },
        AgentMessage::LogEntry {
            level: "info".to_string(),
            message: "hello".to_string(),
        },
        AgentMessage::TaskComplete {
            success: true,
            summary: "done".to_string(),
        },
    ];
    for event in &agent_events {
        sgclaw::agent::AgentEventSink::send(&sink, event).unwrap();
    }

    let expected = vec![
        ServiceMessage::StatusChanged {
            state: "started".to_string(),
        },
        ServiceMessage::LogEntry {
            level: "info".to_string(),
            message: "hello".to_string(),
        },
        ServiceMessage::TaskComplete {
            success: true,
            summary: "done".to_string(),
        },
    ];
    assert_eq!(sink.sent_messages(), expected);
}