mod common; use std::fs; use std::io::{Read, Write}; use std::net::TcpListener; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, OnceLock}; use std::thread; use std::time::Duration; use common::MockTransport; use serde_json::{json, Value}; use sgclaw::agent::{ handle_browser_message, handle_browser_message_with_context, AgentRuntimeContext, }; use sgclaw::compat::runtime::{execute_task, CompatTaskContext}; use sgclaw::config::DeepSeekSettings; use sgclaw::pipe::{ Action, AgentMessage, BrowserMessage, BrowserPipeTool, ConversationMessage, Timing, }; use sgclaw::security::MacPolicy; use uuid::Uuid; fn env_lock() -> &'static Mutex<()> { static LOCK: OnceLock> = OnceLock::new(); LOCK.get_or_init(|| Mutex::new(())) } fn test_policy() -> MacPolicy { MacPolicy::from_json_str( r#"{ "version": "1.0", "domains": { "allowed": ["www.baidu.com"] }, "pipe_actions": { "allowed": ["click", "type", "navigate", "getText"], "blocked": [] } }"#, ) .unwrap() } fn temp_workspace_root() -> PathBuf { let root = std::env::temp_dir().join(format!("sgclaw-compat-runtime-{}", Uuid::new_v4())); std::fs::create_dir_all(&root).unwrap(); root } fn write_deepseek_config(root: &Path, api_key: &str, base_url: &str, model: &str) -> PathBuf { let config_path = root.join("sgclaw_config.json"); fs::write( &config_path, serde_json::to_string_pretty(&json!({ "apiKey": api_key, "baseUrl": base_url, "model": model, })) .unwrap(), ) .unwrap(); config_path } fn start_fake_deepseek_server( responses: Vec, ) -> (String, Arc>>, thread::JoinHandle<()>) { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); listener.set_nonblocking(true).unwrap(); let address = format!("http://{}", listener.local_addr().unwrap()); let requests = Arc::new(Mutex::new(Vec::new())); let request_log = requests.clone(); let handle = thread::spawn(move || { for response in responses { let deadline = std::time::Instant::now() + Duration::from_secs(5); let (mut stream, _) = loop { match listener.accept() { Ok(pair) => break pair, Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { assert!( std::time::Instant::now() < deadline, "timed out waiting for provider request" ); thread::sleep(Duration::from_millis(10)); } Err(err) => panic!("failed to accept provider request: {err}"), } }; let body = read_http_json_body(&mut stream); request_log.lock().unwrap().push(body); let payload = response.to_string(); let reply = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", payload.len(), payload ); stream.write_all(reply.as_bytes()).unwrap(); stream.flush().unwrap(); } }); (address, requests, handle) } fn read_http_json_body(stream: &mut impl Read) -> Value { let mut buffer = Vec::new(); let mut headers_end = None; while headers_end.is_none() { let mut chunk = [0_u8; 1024]; let bytes = stream.read(&mut chunk).unwrap(); assert!(bytes > 0, "unexpected EOF while reading headers"); buffer.extend_from_slice(&chunk[..bytes]); headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n"); } let headers_end = headers_end.unwrap() + 4; let headers = String::from_utf8(buffer[..headers_end].to_vec()).unwrap(); let content_length = headers .lines() .find_map(|line| { let (name, value) = line.split_once(':')?; name.eq_ignore_ascii_case("content-length") .then(|| value.trim().parse::().unwrap()) }) .unwrap(); while buffer.len() < headers_end + content_length { let mut chunk = vec![0_u8; content_length]; let bytes = stream.read(&mut chunk).unwrap(); assert!(bytes > 0, "unexpected EOF while reading body"); buffer.extend_from_slice(&chunk[..bytes]); } serde_json::from_slice(&buffer[headers_end..headers_end + content_length]).unwrap() } #[test] fn compat_runtime_uses_zeroclaw_provider_path_and_executes_browser_actions() { let _guard = env_lock().lock().unwrap_or_else(|err| err.into_inner()); let first_response = json!({ "choices": [{ "message": { "content": "", "tool_calls": [ { "id": "call_1", "type": "function", "function": { "name": "browser_action", "arguments": serde_json::to_string(&json!({ "action": "navigate", "expected_domain": "www.baidu.com", "url": "https://www.baidu.com" })).unwrap() } }, { "id": "call_2", "type": "function", "function": { "name": "browser_action", "arguments": serde_json::to_string(&json!({ "action": "type", "expected_domain": "www.baidu.com", "selector": "#kw", "text": "天气", "clear_first": true })).unwrap() } } ] } }], "usage": { "prompt_tokens": 12, "completion_tokens": 7 } }); let second_response = json!({ "choices": [{ "message": { "content": "已通过 ZeroClaw 执行任务: 打开百度搜索天气" } }], "usage": { "prompt_tokens": 15, "completion_tokens": 8 } }); let (base_url, requests, server_handle) = start_fake_deepseek_server(vec![first_response, second_response]); std::env::set_var("DEEPSEEK_API_KEY", "deepseek-test-key"); std::env::set_var("DEEPSEEK_BASE_URL", base_url); std::env::set_var("DEEPSEEK_MODEL", "deepseek-chat"); let workspace_root = temp_workspace_root(); let settings = DeepSeekSettings::from_env().unwrap(); let transport = Arc::new(MockTransport::new(vec![ BrowserMessage::Response { seq: 1, success: true, data: json!({ "navigated": true }), aom_snapshot: vec![], timing: Timing { queue_ms: 1, exec_ms: 10, }, }, BrowserMessage::Response { seq: 2, success: true, data: json!({ "typed": true }), aom_snapshot: vec![], timing: Timing { queue_ms: 1, exec_ms: 11, }, }, ])); let browser_tool = BrowserPipeTool::new( transport.clone(), test_policy(), vec![1, 2, 3, 4, 5, 6, 7, 8], ) .with_response_timeout(Duration::from_secs(1)); let summary = execute_task( transport.as_ref(), browser_tool, "打开百度搜索天气", &CompatTaskContext::default(), &workspace_root, &settings, ) .unwrap(); server_handle.join().unwrap(); let request_bodies = requests.lock().unwrap().clone(); let sent = transport.sent_messages(); assert_eq!(summary, "已通过 ZeroClaw 执行任务: 打开百度搜索天气"); assert_eq!(request_bodies.len(), 2); assert_eq!(request_bodies[0]["model"], json!("deepseek-chat")); assert_eq!( request_bodies[0]["tools"][0]["function"]["name"], json!("browser_action") ); assert!(request_bodies[1].to_string().contains("tool_call_id")); assert!(sent.iter().any(|message| { matches!( message, AgentMessage::LogEntry { level, message } if level == "info" && message == "navigate https://www.baidu.com" ) })); assert!(sent.iter().any(|message| { matches!( message, AgentMessage::LogEntry { level, message } if level == "info" && message == "type 天气 into #kw" ) })); assert!(sent.iter().any(|message| { matches!( message, AgentMessage::Command { action, .. } if action == &Action::Navigate ) })); assert!(sent.iter().any(|message| { matches!( message, AgentMessage::Command { action, .. } if action == &Action::Type ) })); } #[test] fn handle_browser_message_prefers_compat_runtime_for_supported_instruction_when_deepseek_is_configured( ) { let _guard = env_lock().lock().unwrap_or_else(|err| err.into_inner()); let first_response = json!({ "choices": [{ "message": { "content": "", "tool_calls": [ { "id": "call_1", "type": "function", "function": { "name": "browser_action", "arguments": serde_json::to_string(&json!({ "action": "navigate", "expected_domain": "www.baidu.com", "url": "https://www.baidu.com" })).unwrap() } }, { "id": "call_2", "type": "function", "function": { "name": "browser_action", "arguments": serde_json::to_string(&json!({ "action": "type", "expected_domain": "www.baidu.com", "selector": "#kw", "text": "天气", "clear_first": true })).unwrap() } }, { "id": "call_3", "type": "function", "function": { "name": "browser_action", "arguments": serde_json::to_string(&json!({ "action": "click", "expected_domain": "www.baidu.com", "selector": "#su" })).unwrap() } } ] } }] }); let second_response = json!({ "choices": [{ "message": { "content": "已通过 DeepSeek 执行任务: 打开百度搜索天气" } }] }); let (base_url, requests, server_handle) = start_fake_deepseek_server(vec![first_response, second_response]); std::env::remove_var("DEEPSEEK_API_KEY"); std::env::remove_var("DEEPSEEK_BASE_URL"); std::env::remove_var("DEEPSEEK_MODEL"); let workspace_root = temp_workspace_root(); let config_path = write_deepseek_config( &workspace_root, "deepseek-test-key", &base_url, "deepseek-chat", ); let runtime_context = AgentRuntimeContext::new(Some(config_path), workspace_root.clone()); let transport = Arc::new(MockTransport::new(vec![ BrowserMessage::Response { seq: 1, success: true, data: json!({ "navigated": true }), aom_snapshot: vec![], timing: Timing { queue_ms: 1, exec_ms: 10, }, }, BrowserMessage::Response { seq: 2, success: true, data: json!({ "typed": true }), aom_snapshot: vec![], timing: Timing { queue_ms: 1, exec_ms: 10, }, }, BrowserMessage::Response { seq: 3, success: true, data: json!({ "clicked": true }), aom_snapshot: vec![], timing: Timing { queue_ms: 1, exec_ms: 10, }, }, ])); let browser_tool = BrowserPipeTool::new( transport.clone(), test_policy(), vec![1, 2, 3, 4, 5, 6, 7, 8], ) .with_response_timeout(Duration::from_secs(1)); handle_browser_message_with_context( transport.as_ref(), &browser_tool, &runtime_context, BrowserMessage::SubmitTask { instruction: "打开百度搜索天气".to_string(), conversation_id: String::new(), messages: vec![], page_url: String::new(), page_title: String::new(), }, ) .unwrap(); server_handle.join().unwrap(); let sent = transport.sent_messages(); let request_bodies = requests.lock().unwrap().clone(); assert!(sent.iter().any(|message| { matches!( message, AgentMessage::TaskComplete { success, summary } if *success && summary == "已通过 DeepSeek 执行任务: 打开百度搜索天气" ) })); assert!(sent.iter().any(|message| { matches!( message, AgentMessage::LogEntry { level, message } if level == "mode" && message == "compat_llm_primary" ) })); assert_eq!(request_bodies.len(), 2); } #[test] fn handle_browser_message_falls_back_to_compat_runtime_for_unsupported_instruction() { let _guard = env_lock().lock().unwrap_or_else(|err| err.into_inner()); let first_response = json!({ "choices": [{ "message": { "content": "", "tool_calls": [{ "id": "call_1", "type": "function", "function": { "name": "browser_action", "arguments": serde_json::to_string(&json!({ "action": "navigate", "expected_domain": "www.baidu.com", "url": "https://www.baidu.com" })).unwrap() } }] } }] }); let second_response = json!({ "choices": [{ "message": { "content": "来自 ZeroClaw runtime" } }] }); let (base_url, requests, server_handle) = start_fake_deepseek_server(vec![first_response, second_response]); std::env::set_var("DEEPSEEK_API_KEY", "deepseek-test-key"); std::env::set_var("DEEPSEEK_BASE_URL", base_url); std::env::set_var("DEEPSEEK_MODEL", "deepseek-chat"); let workspace_root = temp_workspace_root(); let original_dir = std::env::current_dir().unwrap(); std::env::set_current_dir(&workspace_root).unwrap(); let transport = Arc::new(MockTransport::new(vec![BrowserMessage::Response { seq: 1, success: true, data: json!({ "navigated": true }), aom_snapshot: vec![], timing: Timing { queue_ms: 1, exec_ms: 10, }, }])); let browser_tool = BrowserPipeTool::new( transport.clone(), test_policy(), vec![1, 2, 3, 4, 5, 6, 7, 8], ) .with_response_timeout(Duration::from_secs(1)); handle_browser_message( transport.as_ref(), &browser_tool, BrowserMessage::SubmitTask { instruction: "帮我打开百度首页".to_string(), conversation_id: String::new(), messages: vec![], page_url: String::new(), page_title: String::new(), }, ) .unwrap(); server_handle.join().unwrap(); std::env::set_current_dir(original_dir).unwrap(); let sent = transport.sent_messages(); let request_bodies = requests.lock().unwrap().clone(); assert!(sent.iter().any(|message| { matches!( message, AgentMessage::TaskComplete { success, summary } if *success && summary == "来自 ZeroClaw runtime" ) })); assert!(sent.iter().any(|message| { matches!( message, AgentMessage::LogEntry { level, message } if level == "mode" && message == "compat_llm_primary" ) })); assert_eq!(request_bodies.len(), 2); } #[test] fn handle_browser_message_rejects_non_task_greeting_explicitly() { let transport = Arc::new(MockTransport::new(vec![])); let browser_tool = BrowserPipeTool::new( transport.clone(), test_policy(), vec![1, 2, 3, 4, 5, 6, 7, 8], ) .with_response_timeout(Duration::from_secs(1)); handle_browser_message( transport.as_ref(), &browser_tool, BrowserMessage::SubmitTask { instruction: "你好".to_string(), conversation_id: String::new(), messages: vec![], page_url: String::new(), page_title: String::new(), }, ) .unwrap(); let sent = transport.sent_messages(); assert!(matches!( sent.last(), Some(AgentMessage::TaskComplete { success, summary }) if !success && summary.contains("浏览器任务入口") )); } #[test] fn compat_runtime_includes_prior_turns_in_follow_up_provider_request() { let _guard = env_lock().lock().unwrap_or_else(|err| err.into_inner()); let first_response = json!({ "choices": [{ "message": { "content": "", "tool_calls": [{ "id": "call_1", "type": "function", "function": { "name": "browser_action", "arguments": serde_json::to_string(&json!({ "action": "navigate", "expected_domain": "www.zhihu.com", "url": "https://www.zhihu.com/search?q=天气&type=content" })).unwrap() } }] } }] }); let second_response = json!({ "choices": [{ "message": { "content": "已在知乎搜索天气" } }] }); let (base_url, requests, server_handle) = start_fake_deepseek_server(vec![first_response, second_response]); let workspace_root = temp_workspace_root(); let settings = DeepSeekSettings { api_key: "deepseek-test-key".to_string(), base_url, model: "deepseek-chat".to_string(), }; let transport = Arc::new(MockTransport::new(vec![BrowserMessage::Response { seq: 1, success: true, data: json!({ "navigated": true }), aom_snapshot: vec![], timing: Timing { queue_ms: 1, exec_ms: 10, }, }])); let browser_tool = BrowserPipeTool::new( transport.clone(), test_policy(), vec![1, 2, 3, 4, 5, 6, 7, 8], ) .with_response_timeout(Duration::from_secs(1)); let task_context = CompatTaskContext { conversation_id: Some("conversation-1".to_string()), messages: vec![ ConversationMessage { role: "user".to_string(), content: "打开百度搜索天气".to_string(), }, ConversationMessage { role: "assistant".to_string(), content: "已在百度搜索天气".to_string(), }, ], page_url: Some("https://www.zhihu.com/".to_string()), page_title: Some("知乎".to_string()), }; let summary = execute_task( transport.as_ref(), browser_tool, "打开知乎搜索天气", &task_context, &workspace_root, &settings, ) .unwrap(); server_handle.join().unwrap(); let request_bodies = requests.lock().unwrap().clone(); let first_request_messages = request_bodies[0]["messages"] .as_array() .cloned() .unwrap_or_default(); assert_eq!(summary, "已在知乎搜索天气"); assert!(first_request_messages.iter().any(|message| { message["role"] == json!("user") && message["content"] == json!("打开百度搜索天气") })); assert!(first_request_messages.iter().any(|message| { message["role"] == json!("assistant") && message["content"] == json!("已在百度搜索天气") })); }