use std::io::{Read as _, Write};
use std::net::TcpListener;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use reqwest::blocking::Client;
use serde_json::{json, Value};
use tungstenite::{accept, connect, Message};

use sgclaw::agent::AgentRuntimeContext;
use sgclaw::pipe::AgentMessage;
use sgclaw::service::{ClientMessage, ServiceEventSink, ServiceMessage, ServiceSession};

/// Panic text that must never show up in the combined client/service output.
const RUNTIME_DROP_PANIC_TEXT: &str =
    "Cannot drop a runtime in a context where blocking is not allowed";
// NOTE(review): absolute, machine-specific path; tests that write it into the
// config as `skillsDir` presumably only pass where this directory exists.
const TEST_ZHIHU_SKILLS_DIR: &str = "D:/data/ideaSpace/rust/sgClaw/claw/claw/skills";

/// Reads the next frame from `stream` and returns its text payload.
///
/// # Panics
/// Panics if the read fails or if the frame is not a text frame.
fn read_ws_text<S>(stream: &mut tungstenite::WebSocket<S>) -> String
where
    S: std::io::Read + std::io::Write,
{
    match stream.read().unwrap() {
        Message::Text(text) => text.to_string(),
        other => panic!("expected text frame, got {other:?}"),
    }
}

/// Starts a one-shot-per-response fake HTTP provider on an ephemeral local port.
///
/// For each entry of `responses` the worker thread accepts one connection,
/// parses the JSON request body, records it, and answers with that entry as a
/// `200 OK` JSON reply.
///
/// Returns the `http://` base address, the shared log of received request
/// bodies, and the worker's join handle.
///
/// # Panics
/// The worker panics if no connection arrives within 5 seconds for a pending
/// response, or on any accept/write failure.
fn start_fake_deepseek_server(
    responses: Vec<Value>,
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    listener.set_nonblocking(true).unwrap();
    let address = format!("http://{}", listener.local_addr().unwrap());
    let requests = Arc::new(Mutex::new(Vec::new()));
    let request_log = Arc::clone(&requests);
    let handle = thread::spawn(move || {
        for response in responses {
            let deadline = Instant::now() + Duration::from_secs(5);
            // Poll the non-blocking listener so the wait can be bounded by `deadline`.
            let (mut stream, _) = loop {
                match listener.accept() {
                    Ok(pair) => break pair,
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        assert!(
                            Instant::now() < deadline,
                            "timed out waiting for provider request"
                        );
                        thread::sleep(Duration::from_millis(10));
                    }
                    Err(err) => panic!("failed to accept provider request: {err}"),
                }
            };
            stream.set_nonblocking(false).unwrap();
            // NOTE(review): an unreadable request skips to the next iteration, so the
            // current `response` is consumed without being sent to anyone.
            let body = match read_http_json_body(&mut stream) {
                Ok(body) => body,
                Err(_) => continue,
            };
            request_log.lock().unwrap().push(body);
            let payload = response.to_string();
            let reply = format!(
                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                payload.len(),
                payload
            );
            stream.write_all(reply.as_bytes()).unwrap();
            stream.flush().unwrap();
        }
    });
    (address, requests, handle)
}

/// Same as [`start_fake_deepseek_server`], but tolerant of missing requests.
///
/// If no connection arrives within 750 ms for a pending response, the worker
/// stops serving instead of panicking.
fn start_lenient_deepseek_server(
    responses: Vec<Value>,
) -> (String, Arc<Mutex<Vec<Value>>>, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    listener.set_nonblocking(true).unwrap();
    let address = format!("http://{}", listener.local_addr().unwrap());
    let requests = Arc::new(Mutex::new(Vec::new()));
    let request_log = Arc::clone(&requests);
    let handle = thread::spawn(move || {
        for response in responses {
            let deadline = Instant::now() + Duration::from_millis(750);
            let accepted = loop {
                match listener.accept() {
                    Ok(pair) => break Some(pair),
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        if Instant::now() >= deadline {
                            break None;
                        }
                        thread::sleep(Duration::from_millis(10));
                    }
                    Err(err) => panic!("failed to accept provider request: {err}"),
                }
            };
            // Timing out is not an error here: simply stop serving.
            let Some((mut stream, _)) = accepted else {
                break;
            };
            stream.set_nonblocking(false).unwrap();
            let body = match read_http_json_body(&mut stream) {
                Ok(body) => body,
                Err(_) => continue,
            };
            request_log.lock().unwrap().push(body);
            let payload = response.to_string();
            let reply = format!(
                "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                payload.len(),
                payload
            );
            stream.write_all(reply.as_bytes()).unwrap();
            stream.flush().unwrap();
        }
    });
    (address, requests, handle)
}

/// Reads one HTTP request from `stream` and parses its body as JSON.
///
/// The header block is read up to the first `\r\n\r\n`; the body length comes
/// from the `Content-Length` header (matched case-insensitively).
///
/// # Errors
/// Returns a static message when the stream ends before the headers or the
/// body are complete, or when no `Content-Length` header is present.
///
/// # Panics
/// Panics on read errors, non-UTF-8 headers, an unparsable `Content-Length`
/// value, or a body that is not valid JSON.
fn read_http_json_body(stream: &mut impl std::io::Read) -> Result<Value, &'static str> {
    let mut buffer = Vec::new();
    let mut headers_end = None;
    while headers_end.is_none() {
        let mut chunk = [0_u8; 1024];
        let bytes = stream.read(&mut chunk).unwrap();
        if bytes == 0 {
            return Err("unexpected EOF while reading headers");
        }
        buffer.extend_from_slice(&chunk[..bytes]);
        headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n");
    }
    // Skip past the blank-line terminator itself.
    let headers_end = headers_end.unwrap() + 4;
    let headers = std::str::from_utf8(&buffer[..headers_end]).unwrap();
    let Some(content_length) = headers.lines().find_map(|line| {
        let (name, value) = line.split_once(':')?;
        name.eq_ignore_ascii_case("content-length")
            .then(|| value.trim().parse::<usize>().unwrap())
    }) else {
        return Err("missing content-length header");
    };

    // Part of the body may already have arrived together with the headers.
    let mut chunk = vec![0_u8; content_length];
    while buffer.len() < headers_end + content_length {
        let bytes = stream.read(&mut chunk).unwrap();
        if bytes == 0 {
            return Err("unexpected EOF while reading body");
        }
        buffer.extend_from_slice(&chunk[..bytes]);
    }
    Ok(serde_json::from_slice(&buffer[headers_end..headers_end + content_length]).unwrap())
}

/// Events reported by the fake callback-host browser server to the test body.
#[derive(Debug)]
enum CallbackHostBrowserEvent {
    /// A JSON frame received over the browser websocket.
    BrowserFrame(Value),
    /// A JSON envelope returned by the helper's `commands/next` HTTP endpoint.
    CommandEnvelope(Value),
}

/// Starts a fake browser websocket server that plays the callback-host helper role.
///
/// The worker accepts a single websocket connection and then:
/// 1. forwards the register frame and the first action frame to `event_tx`;
/// 2. if the first action opens `/sgclaw/browser-helper.html`, fetches that page,
///    polls `commands/next` once before posting `ready`, and forwards the result;
/// 3. polls `commands/next` for up to 10 seconds, forwarding and acknowledging each
///    command and answering the getText / eval scripts with canned hot-list data.
///
/// Returns the `ws://` address of the server and the worker's join handle.
///
/// # Panics
/// The worker panics on unexpected frames or commands, on HTTP failures outside
/// the polling request, and if the getText or eval command was never seen.
fn start_callback_host_hotlist_browser_server(
    event_tx: mpsc::Sender<CallbackHostBrowserEvent>,
) -> (String, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let address = listener.local_addr().unwrap();
    let handle = thread::spawn(move || {
        let (stream, _) = listener.accept().unwrap();
        stream.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
        stream.set_write_timeout(Some(Duration::from_secs(2))).unwrap();
        let mut websocket = accept(stream).unwrap();

        let register = match websocket.read().unwrap() {
            Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
            other => panic!("expected register frame, got {other:?}"),
        };
        event_tx
            .send(CallbackHostBrowserEvent::BrowserFrame(register))
            .unwrap();
        websocket
            .send(Message::Text(
                r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
                    .to_string()
                    .into(),
            ))
            .unwrap();

        let first_action = match websocket.read().unwrap() {
            Message::Text(text) => serde_json::from_str::<Value>(&text).unwrap(),
            other => panic!("expected browser action frame, got {other:?}"),
        };
        event_tx
            .send(CallbackHostBrowserEvent::BrowserFrame(first_action.clone()))
            .unwrap();

        // Anything other than "open the helper page" ends the session quietly.
        let Some(values) = first_action.as_array() else {
            websocket.close(None).ok();
            return;
        };
        let is_helper_open = values.len() >= 3
            && values[1] == json!("sgBrowerserOpenPage")
            && values[2]
                .as_str()
                .is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html"));
        if !is_helper_open {
            websocket.close(None).ok();
            return;
        }

        let helper_url = values[2].as_str().unwrap().to_string();
        let helper_origin = helper_url
            .trim_end_matches("/sgclaw/browser-helper.html")
            .to_string();
        let helper_client = Client::builder()
            .timeout(Duration::from_secs(2))
            .pool_max_idle_per_host(0)
            .build()
            .unwrap();

        let helper_html = helper_client
            .get(&helper_url)
            .send()
            .unwrap()
            .error_for_status()
            .unwrap()
            .text()
            .unwrap();
        assert!(helper_html.contains("sgclawReady"));
        assert!(helper_html.contains("sgclawOnLoaded"));
        assert!(helper_html.contains("sgclawOnGetText"));
        assert!(helper_html.contains("sgclawOnEval"));

        // Poll once before announcing readiness; the tests assert on this envelope.
        let pre_ready_command: Value = helper_client
            .get(format!("{helper_origin}/sgclaw/callback/commands/next"))
            .send()
            .unwrap()
            .error_for_status()
            .unwrap()
            .json()
            .unwrap();
        event_tx
            .send(CallbackHostBrowserEvent::CommandEnvelope(pre_ready_command))
            .unwrap();

        helper_client
            .post(format!("{helper_origin}/sgclaw/callback/ready"))
            .json(&json!({
                "type": "ready",
                "helper_url": helper_url,
            }))
            .send()
            .unwrap()
            .error_for_status()
            .unwrap();

        let hotlist_text = "知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度";
        let hotlist_payload = json!({
            "source": "https://www.zhihu.com/hot",
            "sheet_name": "知乎热榜",
            "columns": ["rank", "title", "heat"],
            "rows": [[1, "问题一", "344万"], [2, "问题二", "266万"]]
        })
        .to_string();

        let deadline = Instant::now() + Duration::from_secs(10);
        let mut saw_get_text = false;
        let mut saw_eval = false;
        while Instant::now() < deadline {
            // Transient HTTP failures and empty envelopes are retried after a short pause.
            let envelope: Value = match helper_client
                .get(format!("{helper_origin}/sgclaw/callback/commands/next"))
                .send()
                .and_then(|response| response.error_for_status())
                .and_then(|response| response.json())
            {
                Ok(envelope) => envelope,
                Err(_) => {
                    thread::sleep(Duration::from_millis(20));
                    continue;
                }
            };
            let Some(command) = envelope.get("command").and_then(Value::as_object) else {
                thread::sleep(Duration::from_millis(20));
                continue;
            };
            event_tx
                .send(CallbackHostBrowserEvent::CommandEnvelope(envelope.clone()))
                .unwrap();

            let action_name = command
                .get("action")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string();
            helper_client
                .post(format!("{helper_origin}/sgclaw/callback/commands/ack"))
                .json(&json!({ "type": "command_ack" }))
                .send()
                .unwrap()
                .error_for_status()
                .unwrap();

            let args = command
                .get("args")
                .and_then(Value::as_array)
                .cloned()
                .unwrap_or_default();
            match action_name.as_str() {
                "sgBrowerserOpenPage" => {}
                "sgBrowserExcuteJsCodeByDomain" => {
                    let script = args.get(1).and_then(Value::as_str).unwrap_or_default();
                    if script.contains("sgclawOnGetText") {
                        saw_get_text = true;
                        helper_client
                            .post(format!("{helper_origin}/sgclaw/callback/events"))
                            .json(&json!({
                                "callback": "sgclawOnGetText",
                                "request_url": helper_url,
                                "target_url": "https://www.zhihu.com/hot",
                                "action": action_name,
                                "payload": { "text": hotlist_text }
                            }))
                            .send()
                            .unwrap()
                            .error_for_status()
                            .unwrap();
                    } else if script.contains("sgclawOnEval") {
                        saw_eval = true;
                        helper_client
                            .post(format!("{helper_origin}/sgclaw/callback/events"))
                            .json(&json!({
                                "callback": "sgclawOnEval",
                                "request_url": helper_url,
                                "target_url": "https://www.zhihu.com/hot",
                                "action": action_name,
                                "payload": { "value": hotlist_payload }
                            }))
                            .send()
                            .unwrap()
                            .error_for_status()
                            .unwrap();
                        // The eval reply is the last step of the scripted flow.
                        break;
                    } else {
                        panic!("unexpected callback-host domain command: {script}");
                    }
                }
                other => panic!("unexpected callback-host command action {other}"),
            }
        }

        assert!(saw_get_text, "expected callback-host getText command");
        assert!(saw_eval, "expected callback-host eval command");
        websocket.close(None).ok();
    });
    (format!("ws://{address}"), handle)
}

/// Starts a fake browser websocket server that answers action frames directly.
///
/// The worker accepts a single connection, records every text frame it receives,
/// ignores `register` frames, and replies to each action frame with a welcome
/// frame, a `"0"` frame and a `callBackJsToCpp` callback frame. It stops after
/// the third action or when the peer closes the connection.
///
/// Returns the `ws://` address, the shared list of received text frames, and the
/// worker's join handle.
///
/// # Panics
/// The worker panics on read errors other than a closed connection, on non-text
/// frames (other than ping/close), and on unknown actions.
fn start_direct_zhihu_browser_ws_server(
) -> (String, Arc<Mutex<Vec<String>>>, thread::JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let address = listener.local_addr().unwrap();
    let frames = Arc::new(Mutex::new(Vec::new()));
    let frames_for_thread = Arc::clone(&frames);
    let handle = thread::spawn(move || {
        let (stream, _) = listener.accept().unwrap();
        stream.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
        stream.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
        let mut socket = accept(stream).unwrap();
        let mut action_count = 0_u64;
        loop {
            let message = match socket.read() {
                Ok(message) => message,
                Err(tungstenite::Error::ConnectionClosed)
                | Err(tungstenite::Error::AlreadyClosed) => break,
                Err(err) => panic!("browser ws test server read failed: {err}"),
            };
            let payload = match message {
                Message::Text(text) => text.to_string(),
                Message::Ping(payload) => {
                    socket.send(Message::Pong(payload)).unwrap();
                    continue;
                }
                Message::Close(_) => break,
                other => panic!("expected text frame, got {other:?}"),
            };
            frames_for_thread.lock().unwrap().push(payload.clone());

            let parsed: Value = serde_json::from_str(&payload).unwrap();
            if parsed.get("type").and_then(Value::as_str) == Some("register") {
                continue;
            }
            let values = parsed
                .as_array()
                .expect("browser action frame should be an array");
            let request_url = values[0].as_str().expect("request_url should be a string");
            let action = values[1].as_str().expect("action should be a string");
            action_count += 1;

            // NOTE(review): the welcome frame is sent before every action reply, not
            // once after registration — confirm this is intended.
            socket
                .send(Message::Text(
                    r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"#
                        .to_string()
                        .into(),
                ))
                .unwrap();
            socket.send(Message::Text("0".into())).unwrap();

            let callback_frame = match action {
                "sgHideBrowserCallAfterLoaded" => {
                    let target_url = values[2]
                        .as_str()
                        .expect("navigate target_url should be a string");
                    json!([
                        request_url,
                        "callBackJsToCpp",
                        format!(
                            "{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgHideBrowserCallAfterLoaded@_@"
                        )
                    ])
                }
                "sgBrowserExcuteJsCodeByArea" => {
                    let target_url = values[2]
                        .as_str()
                        .expect("script target_url should be a string");
                    // The second action gets the plain-text hot list; later ones get the
                    // structured JSON payload.
                    let response_text = if action_count == 2 {
                        "知乎热榜\n1 问题一 344万热度\n2 问题二 266万热度".to_string()
                    } else {
                        r#"{"source":"https://www.zhihu.com/hot","sheet_name":"知乎热榜","columns":["rank","title","heat"],"rows":[[1,"问题一","344万"],[2,"问题二","266万"]]}"#.to_string()
                    };
                    json!([
                        request_url,
                        "callBackJsToCpp",
                        format!(
                            "{request_url}@_@{target_url}@_@sgclaw_cb_{action_count}@_@sgBrowserExcuteJsCodeByArea@_@{response_text}"
                        )
                    ])
                }
                other => panic!("unexpected browser action {other}"),
            };
            socket
                .send(Message::Text(callback_frame.to_string().into()))
                .unwrap();
            if action_count >= 3 {
                break;
            }
        }
        socket.close(None).ok();
    });
    (format!("ws://{address}"), frames, handle)
}

/// Compile-time check that `sgclaw::service::run` exists with the expected signature.
#[test]
fn service_entrypoint_function_is_exported() {
    let entry: fn() -> Result<(), sgclaw::pipe::PipeError> = sgclaw::service::run;
    let _ = entry;
}

/// `run` must fail with a protocol error when `DEEPSEEK_API_KEY` is not set.
#[test]
fn service_run_requires_llm_config_for_startup() {
    // NOTE(review): mutates process-wide environment; other tests running in
    // parallel in this binary see the change.
    std::env::remove_var("DEEPSEEK_API_KEY");
    std::env::remove_var("DEEPSEEK_BASE_URL");
    std::env::remove_var("DEEPSEEK_MODEL");

    let result = sgclaw::service::run();

    assert!(matches!(
        result,
        Err(sgclaw::pipe::PipeError::Protocol(message))
            if message.contains("missing environment variable: DEEPSEEK_API_KEY")
    ));
}

/// Endpoints written in the config file are reported by `load_startup_config`.
#[test]
fn service_startup_config_loads_ws_endpoints_from_browser_config() {
    let root =
        std::env::temp_dir().join(format!("sgclaw-service-startup-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        r#"{ "apiKey": "sk-runtime", "baseUrl": "https://api.deepseek.com", "model": "deepseek-chat", "browserWsUrl": "ws://127.0.0.1:12345", "serviceWsListenAddr": "127.0.0.1:42321" }"#,
    )
    .unwrap();

    let startup = sgclaw::service::load_startup_config(&AgentRuntimeContext::new(
        Some(config_path),
        root,
    ))
    .unwrap();

    assert_eq!(startup.browser_ws_url.as_deref(), Some("ws://127.0.0.1:12345"));
    assert_eq!(
        startup.service_ws_listen_addr.as_deref(),
        Some("127.0.0.1:42321")
    );
}

/// Without endpoint keys in the config, `load_startup_config` reports the defaults.
#[test]
fn service_startup_config_uses_default_ws_endpoints_when_not_configured() {
    let root =
        std::env::temp_dir().join(format!("sgclaw-service-defaults-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        r#"{ "apiKey": "sk-runtime", "baseUrl": "https://api.deepseek.com", "model": "deepseek-chat" }"#,
    )
    .unwrap();

    let startup = sgclaw::service::load_startup_config(&AgentRuntimeContext::new(
        Some(config_path),
        root,
    ))
    .unwrap();

    assert_eq!(
        startup.browser_ws_url.as_deref(),
        Some("ws://127.0.0.1:12345")
    );
    assert_eq!(
        startup.service_ws_listen_addr.as_deref(),
        Some("127.0.0.1:42321")
    );
}

/// The `sg_claw` binary prints a ready line to stderr that names both endpoints.
#[test]
fn service_binary_reports_resolved_startup_endpoints() {
    // Reserve an ephemeral port, then release it for the service to bind.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = listener.local_addr().unwrap();
    drop(listener);

    let root =
        std::env::temp_dir().join(format!("sgclaw-service-report-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{ "apiKey": "sk-runtime", "baseUrl": "https://api.deepseek.com", "model": "deepseek-chat", "browserWsUrl": "ws://127.0.0.1:12345", "serviceWsListenAddr": "{service_addr}" }}"#
        ),
    )
    .unwrap();

    let mut child = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();

    let deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    // NOTE(review): `read` on the piped stderr blocks, so the deadline is only
    // re-checked after each read returns.
    while Instant::now() < deadline {
        if let Some(stream) = child.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if child.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }

    let status = child.try_wait().unwrap();
    if status.is_none() {
        child.kill().unwrap();
        let _ = child.wait();
    }
    // Drain whatever the process wrote after the ready line.
    if let Some(mut stream) = child.stderr.take() {
        let mut buf = Vec::new();
        let _ = stream.read_to_end(&mut buf);
        stderr.push_str(&String::from_utf8_lossy(&buf));
    }

    assert!(stderr.contains("sg_claw ready:"));
    assert!(stderr.contains(&service_addr.to_string()));
    assert!(stderr.contains("ws://127.0.0.1:12345"));
}

/// The `sg_claw` binary keeps running and accepts a websocket client on its listen address.
#[test]
fn service_binary_keeps_service_ws_listener_available_for_client_connections() {
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let root = std::env::temp_dir().join(format!("sgclaw-service-live-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{ "apiKey": "sk-runtime", "baseUrl": "https://api.deepseek.com", "model": "deepseek-chat", "browserWsUrl": "ws://127.0.0.1:12345", "serviceWsListenAddr": "{service_addr}" }}"#
        ),
    )
    .unwrap();

    let mut child = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();

    let ws_url = format!("ws://{service_addr}");
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut connected = false;
    while Instant::now() < deadline {
        match connect(ws_url.as_str()) {
            Ok((socket, _)) => {
                connected = true;
                drop(socket);
                break;
            }
            Err(_) => {
                if child.try_wait().unwrap().is_some() {
                    break;
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }

    let status = child.try_wait().unwrap();
    if status.is_none() {
        child.kill().unwrap();
        let _ = child.wait();
    }
    let stderr = child
        .stderr
        .take()
        .map(|mut stream| {
            let mut buf = Vec::new();
            let _ = stream.read_to_end(&mut buf);
            String::from_utf8_lossy(&buf).into_owned()
        })
        .unwrap_or_default();

    assert!(connected, "service ws listener never became available; stderr={stderr}");
    assert!(status.is_none(), "sg_claw exited before client could connect; stderr={stderr}");
}

/// The service must still be running after a real client finished a task and disconnected.
#[test]
fn service_binary_survives_real_client_disconnect_after_task_complete() {
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let root =
        std::env::temp_dir().join(format!("sgclaw-service-disconnect-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{ "apiKey": "sk-runtime", "baseUrl": "https://api.deepseek.com", "model": "deepseek-chat", "browserWsUrl": "ws://127.0.0.1:12345", "serviceWsListenAddr": "{service_addr}" }}"#
        ),
    )
    .unwrap();

    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();

    let ws_url = format!("ws://{service_addr}");
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    // NOTE(review): if this assertion fails the spawned service is not killed.
    assert!(stderr.contains("sg_claw ready:"), "service did not report readiness; stderr={stderr}");

    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client.stdin.as_mut().unwrap().write_all("你好\n".as_bytes()).unwrap();
    let client_output = client.wait_with_output().unwrap();

    assert!(
        client_output.status.success(),
        "client failed: stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );
    assert!(
        String::from_utf8_lossy(&client_output.stdout).contains("任务执行失败:"),
        "client did not receive TaskComplete summary: stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );

    // Give the service a second to (incorrectly) exit after the client left.
    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while Instant::now() < exit_deadline {
        if let Some(status) = service.try_wait().unwrap() {
            service_status = Some(status);
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if service_status.is_none() {
        service.kill().unwrap();
        let _ = service.wait();
    }
    let stderr = service
        .stderr
        .take()
        .map(|mut stream| {
            let mut buf = Vec::new();
            let _ = stream.read_to_end(&mut buf);
            String::from_utf8_lossy(&buf).into_owned()
        })
        .unwrap_or_default();

    assert!(
        service_status.is_none(),
        "sg_claw exited after client disconnect; stderr={stderr}"
    );
}

/// End-to-end submit flow: the zhihu hot-list task goes through the callback host
/// and the client reports the exported Excel file.
#[test]
fn service_binary_submit_flow_routes_zhihu_through_callback_host() {
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let (event_tx, event_rx) = mpsc::channel();
    let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);

    let root =
        std::env::temp_dir().join(format!("sgclaw-service-zhihu-submit-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    // The line break inside the literal after "skillsDir": is kept exactly as in
    // the original text; it is insignificant JSON whitespace.
    std::fs::write(
        &config_path,
        format!(
            r#"{{ "apiKey": "sk-runtime", "baseUrl": "http://127.0.0.1:9", "model": "deepseek-chat", "skillsDir": 
"{TEST_ZHIHU_SKILLS_DIR}", "browserWsUrl": "{browser_ws_url}", "serviceWsListenAddr": "{service_addr}" }}"#
        ),
    )
    .unwrap();

    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();

    let ws_url = format!("ws://{service_addr}");
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    assert!(stderr.contains("sg_claw ready:"), "service did not report readiness; stderr={stderr}");

    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client
        .stdin
        .as_mut()
        .unwrap()
        .write_all("打开知乎热榜,获取前10条数据,并导出 Excel\n".as_bytes())
        .unwrap();
    let client_output = client.wait_with_output().unwrap();
    browser_server.join().unwrap();

    // The fake browser reports its events in this fixed order.
    let register = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let bootstrap = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let pre_ready = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let open_page = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let get_text = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let eval = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();

    let service_status = service.try_wait().unwrap();
    if service_status.is_none() {
        service.kill().unwrap();
        let _ = service.wait();
    }
    if let Some(mut stream) = service.stderr.take() {
        let mut buf = Vec::new();
        let _ = stream.read_to_end(&mut buf);
        stderr.push_str(&String::from_utf8_lossy(&buf));
    }
    let combined_output = format!(
        "{}\n{}\n{}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr),
        stderr
    );

    let register = match register {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected register browser frame, got {other:?}"),
    };
    assert_eq!(register, json!({ "type": "register", "role": "web" }));

    let bootstrap = match bootstrap {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected helper bootstrap frame, got {other:?}"),
    };
    assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
    assert_eq!(bootstrap[1], json!("sgBrowerserOpenPage"));
    assert!(bootstrap[2]
        .as_str()
        .is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));

    let pre_ready = match pre_ready {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected pre-ready command envelope, got {other:?}"),
    };
    assert_eq!(pre_ready, json!({ "ok": false, "command": null }));

    let open_page = match open_page {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected open-page command envelope, got {other:?}"),
    };
    assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
    assert_eq!(open_page["command"]["args"][0], json!("https://www.zhihu.com/hot"));

    let get_text = match get_text {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected getText command envelope, got {other:?}"),
    };
    assert_eq!(get_text["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
    assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
    assert!(get_text["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnGetText")));

    let eval = match eval {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected eval command envelope, got {other:?}"),
    };
    assert_eq!(eval["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
    assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
    assert!(eval["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnEval")));

    assert!(client_output.status.success());
    assert!(
        String::from_utf8_lossy(&client_output.stdout).contains("已导出并打开知乎热榜 Excel"),
        "client stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );
    assert!(
        String::from_utf8_lossy(&client_output.stdout).contains(".xlsx"),
        "client stdout={} stderr={}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr)
    );
    assert!(
        !combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
        "service submit flow still contains runtime-drop panic: {combined_output}"
    );
}

/// Same flow as the test above, asserting only the callback-host command sequence,
/// client success and the absence of the runtime-drop panic.
#[test]
fn service_binary_submit_flow_uses_callback_host_command_semantics_for_zhihu() {
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let (event_tx, event_rx) = mpsc::channel();
    let (browser_ws_url, browser_server) = start_callback_host_hotlist_browser_server(event_tx);

    let root =
        std::env::temp_dir().join(format!("sgclaw-service-session-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{ "apiKey": "sk-runtime", "baseUrl": "http://127.0.0.1:9", "model": "deepseek-chat", "skillsDir": "{TEST_ZHIHU_SKILLS_DIR}", "browserWsUrl": "{browser_ws_url}", "serviceWsListenAddr": "{service_addr}" }}"#
        ),
    )
    .unwrap();

    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();

    let ws_url = format!("ws://{service_addr}");
    let ready_deadline = Instant::now() + Duration::from_secs(2);
    let mut stderr = String::new();
    while Instant::now() < ready_deadline {
        if let Some(stream) = service.stderr.as_mut() {
            let mut buf = [0_u8; 1024];
            match stream.read(&mut buf) {
                Ok(0) => {}
                Ok(n) => {
                    stderr.push_str(&String::from_utf8_lossy(&buf[..n]));
                    if stderr.contains("sg_claw ready:") {
                        break;
                    }
                }
                Err(_) => {}
            }
        }
        if service.try_wait().unwrap().is_some() {
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    assert!(stderr.contains("sg_claw ready:"), "service did not report readiness; stderr={stderr}");

    let mut client = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw_client").expect("sg_claw_client test binary path"),
    )
    .env("SG_CLAW_SERVICE_WS_URL", &ws_url)
    .stdin(std::process::Stdio::piped())
    .stdout(std::process::Stdio::piped())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    client
        .stdin
        .as_mut()
        .unwrap()
        .write_all("打开知乎热榜,获取前10条数据,并导出 Excel\n".as_bytes())
        .unwrap();
    let client_output = client.wait_with_output().unwrap();
    browser_server.join().unwrap();

    let register = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let bootstrap = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let pre_ready = event_rx.recv_timeout(Duration::from_secs(2)).unwrap();
    let open_page = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let get_text = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();
    let eval = event_rx.recv_timeout(Duration::from_secs(4)).unwrap();

    let service_status = service.try_wait().unwrap();
    if service_status.is_none() {
        service.kill().unwrap();
        let _ = service.wait();
    }
    if let Some(mut stream) = service.stderr.take() {
        let mut buf = Vec::new();
        let _ = stream.read_to_end(&mut buf);
        stderr.push_str(&String::from_utf8_lossy(&buf));
    }
    let combined_output = format!(
        "{}\n{}\n{}",
        String::from_utf8_lossy(&client_output.stdout),
        String::from_utf8_lossy(&client_output.stderr),
        stderr
    );

    let register = match register {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected register browser frame, got {other:?}"),
    };
    assert_eq!(register, json!({ "type": "register", "role": "web" }));

    let bootstrap = match bootstrap {
        CallbackHostBrowserEvent::BrowserFrame(value) => value,
        other => panic!("expected helper bootstrap frame, got {other:?}"),
    };
    assert_eq!(bootstrap[0], json!("https://www.zhihu.com"));
    assert_eq!(bootstrap[1], json!("sgBrowerserOpenPage"));
    assert!(bootstrap[2]
        .as_str()
        .is_some_and(|url| url.ends_with("/sgclaw/browser-helper.html")));

    let pre_ready = match pre_ready {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected pre-ready command envelope, got {other:?}"),
    };
    assert_eq!(pre_ready, json!({ "ok": false, "command": null }));

    let open_page = match open_page {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected open-page command envelope, got {other:?}"),
    };
    assert_eq!(open_page["command"]["action"], json!("sgBrowerserOpenPage"));
    assert_eq!(open_page["command"]["args"][0], json!("https://www.zhihu.com/hot"));

    let get_text = match get_text {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected getText command envelope, got {other:?}"),
    };
    assert_eq!(get_text["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
    assert_eq!(get_text["command"]["args"][0], json!("www.zhihu.com"));
    assert!(get_text["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnGetText")));

    let eval = match eval {
        CallbackHostBrowserEvent::CommandEnvelope(value) => value,
        other => panic!("expected eval command envelope, got {other:?}"),
    };
    assert_eq!(eval["command"]["action"], json!("sgBrowserExcuteJsCodeByDomain"));
    assert_eq!(eval["command"]["args"][0], json!("www.zhihu.com"));
    assert!(eval["command"]["args"][1]
        .as_str()
        .is_some_and(|script| script.contains("sgclawOnEval")));

    assert!(client_output.status.success());
    assert!(
        !combined_output.contains(RUNTIME_DROP_PANIC_TEXT),
        "service submit flow still contains runtime-drop panic: {combined_output}"
    );
}

/// A fresh session accepts its first client.
#[test]
fn first_client_attaches_without_busy_error() {
    let session = ServiceSession::new();

    let result = session.try_attach_client();

    assert_eq!(result, Ok(()));
}

/// A second attach while a client is attached yields a `Busy` message.
#[test]
fn second_client_gets_busy_message() {
    let session = ServiceSession::new();
    assert_eq!(session.try_attach_client(), Ok(()));

    let result = session.try_attach_client();

    assert_eq!(
        result,
        Err(ServiceMessage::Busy {
            message: "service already has an attached client".to_string(),
        })
    );
}

/// Detaching the client makes the session attachable again.
#[test]
fn disconnect_releases_the_session_for_a_new_client() {
    let session = ServiceSession::new();
    assert_eq!(session.try_attach_client(), Ok(()));

    session.detach_client();

    assert_eq!(session.try_attach_client(), Ok(()));
}

/// Starting a task without an attached client yields a `Busy` message.
#[test]
fn task_cannot_start_without_an_attached_client() {
    let session = ServiceSession::new();

    let result = session.try_start_task();

    assert_eq!(
        result,
        Err(ServiceMessage::Busy {
            message: "service has no attached client".to_string(),
        })
    );
}

/// Starting a second task while one is running yields a `Busy` message.
#[test]
fn overlapping_task_submission_is_rejected() {
    let session = ServiceSession::new();
    assert_eq!(session.try_attach_client(), Ok(()));
    assert_eq!(session.try_start_task(), Ok(()));

    let result = session.try_start_task();

    assert_eq!(
        result,
        Err(ServiceMessage::Busy {
            message: "service already has a running task".to_string(),
        })
    );
}

/// A `Connect` request is answered with a "connected" status and the service stays alive.
#[test]
fn service_binary_accepts_connect_request_without_starting_browser_task() {
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);

    let root =
        std::env::temp_dir().join(format!("sgclaw-service-connect-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{ "apiKey": "sk-runtime", "baseUrl": "https://api.deepseek.com", "model": "deepseek-chat", "browserWsUrl": "ws://127.0.0.1:12345", "serviceWsListenAddr": "{service_addr}" }}"#
        ),
    )
    .unwrap();

    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();

    let ws_url = format!("ws://{service_addr}");
    let connect_deadline = Instant::now() + Duration::from_secs(2);
    let mut websocket = None;
    while Instant::now() < connect_deadline {
        match connect(ws_url.as_str()) {
            Ok((socket, _)) => {
                websocket = Some(socket);
                break;
            }
            Err(_) => {
                if service.try_wait().unwrap().is_some() {
                    break;
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
    // NOTE(review): if this `expect` fails the spawned service is not killed.
    let mut websocket = websocket.expect("service ws listener never became available");

    websocket
        .send(Message::Text(
            serde_json::to_string(&ClientMessage::Connect).unwrap().into(),
        ))
        .unwrap();
    let frame = read_ws_text(&mut websocket);
    let message: ServiceMessage = serde_json::from_str(&frame).unwrap();
    assert_eq!(
        message,
        ServiceMessage::StatusChanged {
            state: "connected".to_string(),
        }
    );
    websocket.close(None).unwrap();

    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while Instant::now() < exit_deadline {
        if let Some(status) = service.try_wait().unwrap() {
            service_status = Some(status);
            break;
        }
        thread::sleep(Duration::from_millis(20));
    }
    if service_status.is_none() {
        service.kill().unwrap();
        let _ = service.wait();
    }
    let stderr = service
        .stderr
        .take()
        .map(|mut stream| {
            let mut buf = Vec::new();
            let _ = stream.read_to_end(&mut buf);
            String::from_utf8_lossy(&buf).into_owned()
        })
        .unwrap_or_default();

    assert!(
        service_status.is_none(),
        "sg_claw exited after connect lifecycle request; stderr={stderr}"
    );
}

#[test]
fn service_binary_survives_client_disconnect_during_task_completion_send() {
    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let service_addr = service_listener.local_addr().unwrap();
    drop(service_listener);
    let root =
        std::env::temp_dir().join(format!("sgclaw-service-disconnect-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).unwrap();
    let config_path = root.join("sgclaw_config.json");
    std::fs::write(
        &config_path,
        format!(
            r#"{{ "apiKey": "sk-runtime", "baseUrl": "https://api.deepseek.com", "model": "deepseek-chat", "browserWsUrl": "ws://127.0.0.1:12345", "serviceWsListenAddr": "{service_addr}" }}"#
        ),
    )
    .unwrap();
    let mut service = std::process::Command::new(
        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
    )
    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
    .arg("--config-path")
    .arg(&config_path)
    .stdout(std::process::Stdio::null())
    .stderr(std::process::Stdio::piped())
    .spawn()
    .unwrap();
    let ws_url = format!("ws://{service_addr}");
    let connect_deadline = Instant::now() + Duration::from_secs(2);
    let mut websocket = None;
    while Instant::now() < connect_deadline {
        match connect(ws_url.as_str()) {
            Ok((socket, _)) => {
                websocket = Some(socket);
                break;
            }
            Err(_) => {
                if service.try_wait().unwrap().is_some() {
                    break;
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
    let mut websocket = websocket.expect("service ws listener never became available");
    websocket
        .send(Message::Text(
            serde_json::to_string(&ClientMessage::SubmitTask {
                instruction: "你好".to_string(),
                conversation_id: String::new(),
                messages: vec![],
                page_url: String::new(),
                page_title: String::new(),
            })
            .unwrap()
            .into(),
        ))
        .unwrap();
    drop(websocket);
    let exit_deadline = Instant::now() + Duration::from_secs(1);
    let mut service_status = None;
    while
Instant::now() < exit_deadline { if let Some(status) = service.try_wait().unwrap() { service_status = Some(status); break; } thread::sleep(Duration::from_millis(20)); } if service_status.is_none() { service.kill().unwrap(); let _ = service.wait(); } let stderr = service .stderr .take() .map(|mut stream| { let mut buf = Vec::new(); use std::io::Read; let _ = stream.read_to_end(&mut buf); String::from_utf8_lossy(&buf).into_owned() }) .unwrap_or_default(); assert!( service_status.is_none(), "sg_claw exited after client disconnected mid-task; stderr={stderr}" ); } #[test] fn submit_task_client_message_converts_into_shared_runner_request() { let message = ClientMessage::SubmitTask { instruction: "continue task".to_string(), conversation_id: "conv-1".to_string(), messages: vec![sgclaw::pipe::ConversationMessage { role: "user".to_string(), content: "prior turn".to_string(), }], page_url: "https://example.com".to_string(), page_title: "Example".to_string(), }; let request = message.into_submit_task_request().expect("submit task request"); assert_eq!(request.instruction, "continue task"); assert_eq!(request.conversation_id.as_deref(), Some("conv-1")); assert_eq!(request.messages.len(), 1); assert_eq!(request.page_url.as_deref(), Some("https://example.com")); assert_eq!(request.page_title.as_deref(), Some("Example")); } #[test] fn ping_client_message_does_not_convert_into_submit_task_request() { let message = ClientMessage::Ping; assert!(message.into_submit_task_request().is_none()); } #[test] fn lifecycle_client_messages_do_not_convert_into_submit_task_request() { for message in [ClientMessage::Connect, ClientMessage::Start, ClientMessage::Stop] { assert!(message.into_submit_task_request().is_none()); } } #[test] fn lifecycle_client_messages_round_trip_with_stable_tags() { let cases = [ (ClientMessage::Connect, r#"{"type":"connect"}"#), (ClientMessage::Start, r#"{"type":"start"}"#), (ClientMessage::Stop, r#"{"type":"stop"}"#), ]; for (message, raw) in cases { 
assert_eq!(serde_json::to_string(&message).unwrap(), raw); let decoded: ClientMessage = serde_json::from_str(raw).unwrap(); assert_eq!(decoded, message); } } #[test] fn service_messages_round_trip_with_stable_tags() { let cases = [ ( ServiceMessage::StatusChanged { state: "started".to_string(), }, r#"{"type":"status_changed","state":"started"}"#, ), ( ServiceMessage::Pong, r#"{"type":"pong"}"#, ), ]; for (message, raw) in cases { assert_eq!(serde_json::to_string(&message).unwrap(), raw); let decoded: ServiceMessage = serde_json::from_str(raw).unwrap(); assert_eq!(decoded, message); } } #[test] fn service_event_sink_maps_log_completion_and_status_messages() { let sink = ServiceEventSink::default(); sgclaw::agent::AgentEventSink::send( &sink, &AgentMessage::StatusChanged { state: "started".to_string(), }, ) .unwrap(); sgclaw::agent::AgentEventSink::send( &sink, &AgentMessage::LogEntry { level: "info".to_string(), message: "hello".to_string(), }, ) .unwrap(); sgclaw::agent::AgentEventSink::send( &sink, &AgentMessage::TaskComplete { success: true, summary: "done".to_string(), }, ) .unwrap(); assert_eq!( sink.sent_messages(), vec![ ServiceMessage::StatusChanged { state: "started".to_string(), }, ServiceMessage::LogEntry { level: "info".to_string(), message: "hello".to_string(), }, ServiceMessage::TaskComplete { success: true, summary: "done".to_string(), }, ] ); }