diff --git a/Cargo.lock b/Cargo.lock index 7de854b..b5ae587 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures 0.2.17", +] + [[package]] name = "ahash" version = "0.8.12" @@ -340,6 +351,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "bzip2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a53fac24f34a81bc9954b5d6cfce0c21e18ec6959f44f56e8e90e4bb7c346c" +dependencies = [ + "libbz2-rs-sys", +] + [[package]] name = "cc" version = "1.2.57" @@ -543,6 +563,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "constant_time_eq" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d52eff69cd5e647efe296129160853a42795992097e8af39800e1060caeea9b" + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -610,6 +636,21 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" +[[package]] +name = "deflate64" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6b926516df9c60bfa16e107b21086399f8285a44ca9711344b9e553c5146e2" + +[[package]] +name = "deranged" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +dependencies = [ + "powerfmt", +] + [[package]] name = "dialoguer" version = "0.12.0" @@ -810,6 +851,7 @@ checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" dependencies = [ "crc32fast", "miniz_oxide", + "zlib-rs", ] [[package]] @@ -980,11 +1022,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi 6.0.0", "rand_core 0.10.0", "wasip2", "wasip3", + "wasm-bindgen", ] [[package]] @@ -1471,6 +1515,12 @@ dependencies = [ "webpki-roots 1.0.6", ] +[[package]] +name = "libbz2-rs-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" + [[package]] name = "libc" version = "0.2.183" @@ -1543,6 +1593,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lzma-rust2" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47bb1e988e6fb779cf720ad431242d3f03167c1b3f2b1aae7f1a94b2495b36ae" +dependencies = [ + "sha2", +] + [[package]] name = "mail-parser" version = "0.11.2" @@ -1660,6 +1719,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-conv" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" + [[package]] name = "num-traits" version = "0.2.19" @@ -1731,6 +1796,16 @@ dependencies = [ "windows-link", ] +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -1838,6 +1913,18 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "ppmd-rust" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efca4c95a19a79d1c98f791f10aebd5c1363b473244630bb7dbde1dc98455a24" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2435,8 +2522,10 @@ dependencies = [ "sha2", "thiserror 1.0.69", "tokio", + "tungstenite 0.29.0", "uuid", "zeroclawlabs", + "zip", ] [[package]] @@ -2683,6 +2772,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +dependencies = [ + "deranged", + "js-sys", + "num-conv", + "powerfmt", + "serde_core", + "time-core", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + [[package]] name = "tinystr" version = "0.8.2" @@ -3818,19 +3927,79 @@ version = "8.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7756d0206d058333667493c4014f545f4b9603c4330ccd6d9b3f86dcab59f7d9" dependencies = [ + "aes", + "bzip2", + "constant_time_eq", "crc32fast", + "deflate64", "flate2", + "getrandom 0.4.2", + "hmac", "indexmap", + "lzma-rust2", "memchr", + "pbkdf2", + "ppmd-rust", + "sha1", + "time", "typed-path", + "zeroize", + "zopfli", + "zstd", ] +[[package]] +name = "zlib-rs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be3d40e40a133f9c916ee3f9f4fa2d9d63435b5fbe1bfc6d9dae0aa0ada1513" + [[package]] name = "zmij" version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" +[[package]] +name = "zopfli" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f05cd8797d63865425ff89b5c4a48804f35ba0ce8d125800027ad6017d2b5249" +dependencies = [ + "bumpalo", + "crc32fast", + "log", + "simd-adler32", +] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "zune-core" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 0411672..9f4bf24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,5 +17,7 @@ serde_json = "1" sha2 = "0.10" thiserror = "1" tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "macros"] } +tungstenite = "0.29" uuid = { version = "1", features = ["v4"] } zeroclaw = { package = "zeroclawlabs", path = "third_party/zeroclaw", default-features = false } +zip = "8.4" diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 173e978..0a0defe 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -1,104 +1,17 @@ pub mod planner; pub mod runtime; +pub mod task_runner; -use std::ffi::OsString; -use std::path::PathBuf; +use std::sync::Arc; -use crate::compat::config_adapter::resolve_skills_dir_from_sgclaw_settings; -use crate::compat::runtime::CompatTaskContext; -use crate::config::SgClawSettings; +use crate::browser::ws_backend::WsBrowserBackend; +use crate::browser::{BrowserBackend, PipeBrowserBackend}; use crate::pipe::{AgentMessage, BrowserMessage, BrowserPipeTool, PipeError, Transport}; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct AgentRuntimeContext { - config_path: Option, - workspace_root: PathBuf, -} - -impl AgentRuntimeContext { - pub fn new(config_path: Option, workspace_root: PathBuf) -> Self { - Self { - config_path, - workspace_root, - } - } - - pub fn from_process_args(args: I) -> Result - where - I: IntoIterator, - S: Into, - { - let mut config_path = None; - let mut args = args.into_iter().map(Into::into); - let _ = args.next(); - - while let Some(arg) = args.next() { - if arg == OsString::from("--config-path") { - let Some(value) = args.next() else { - return Err(PipeError::Protocol( - "missing value for --config-path".to_string(), - )); - }; - config_path = Some(PathBuf::from(value)); - continue; - } - - let arg_string = arg.to_string_lossy(); - if let Some(value) = arg_string.strip_prefix("--config-path=") { - config_path = Some(PathBuf::from(value)); - } - } - - let workspace_root = config_path - .as_ref() - .and_then(|path| path.parent().map(|parent| parent.to_path_buf())) - .unwrap_or_else(default_workspace_root); - - Ok(Self::new(config_path, workspace_root)) - } - - fn load_sgclaw_settings(&self) -> Result, PipeError> { - SgClawSettings::load(self.config_path.as_deref()) - .map_err(|err| PipeError::Protocol(err.to_string())) - } - - fn settings_source_label(&self) -> String { - match &self.config_path { - Some(path) if path.exists() => path.display().to_string(), - _ => "environment".to_string(), - } - } -} - -impl Default for AgentRuntimeContext { - fn default() -> Self { - Self::new(None, default_workspace_root()) - } -} - -fn default_workspace_root() -> PathBuf { - std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")) -} - -fn send_mode_log(transport: &T, mode: &str) -> Result<(), PipeError> { - transport.send(&AgentMessage::LogEntry { - level: "mode".to_string(), - message: mode.to_string(), - }) -} - -fn missing_llm_configuration_summary() -> String { - "未配置大语言模型。请先在 sgclaw_config.json 或环境变量中配置 apiKey、baseUrl 与 model。" - .to_string() -} - -fn runtime_version_log_message() -> String { - format!( - "sgclaw runtime version={} protocol={}", - env!("CARGO_PKG_VERSION"), - crate::pipe::protocol::PROTOCOL_VERSION - ) -} +pub use task_runner::{ + run_submit_task, run_submit_task_with_browser_backend, AgentEventSink, AgentRuntimeContext, + SubmitTaskRequest, +}; fn execute_plan( transport: &T, @@ -127,6 +40,53 @@ fn execute_plan( Ok(plan.summary.clone()) } +fn normalize_optional_submit_field(value: String) -> Option { + let trimmed = value.trim(); + (!trimmed.is_empty()).then(|| trimmed.to_string()) +} + +fn browser_backend_for_submit( + browser_tool: &BrowserPipeTool, + context: &AgentRuntimeContext, + request: &SubmitTaskRequest, +) -> Result, PipeError> { + if let Some(browser_ws_url) = configured_browser_ws_url(context) { + return Ok(Arc::new( + WsBrowserBackend::new( + Arc::new(crate::service::browser_ws_client::ServiceWsClient::connect( + &browser_ws_url, + )?), + browser_tool.mac_policy().clone(), + crate::service::browser_ws_client::initial_request_url_for_submit_task(request), + ) + .with_response_timeout(browser_tool.response_timeout()), + )); + } + + Ok(Arc::new(PipeBrowserBackend::from_inner(browser_tool.clone()))) +} + +fn configured_browser_ws_url(context: &AgentRuntimeContext) -> Option { + std::env::var("SGCLAW_BROWSER_WS_URL") + .ok() + .filter(|value| !value.trim().is_empty()) + .or_else(|| { + context + .load_sgclaw_settings() + .ok() + .flatten() + .and_then(|settings| settings.browser_ws_url) + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + }) +} + +fn send_status_changed(transport: &T, state: &str) -> Result<(), PipeError> { + transport.send(&AgentMessage::StatusChanged { + state: state.to_string(), + }) +} + pub fn execute_task( transport: &T, browser_tool: &BrowserPipeTool, @@ -157,6 +117,9 @@ pub fn handle_browser_message_with_context( message: BrowserMessage, ) -> Result<(), PipeError> { match message { + BrowserMessage::Connect => send_status_changed(transport, "connected"), + BrowserMessage::Start => send_status_changed(transport, "started"), + BrowserMessage::Stop => send_status_changed(transport, "stopped"), BrowserMessage::SubmitTask { instruction, conversation_id, @@ -164,124 +127,15 @@ pub fn handle_browser_message_with_context( page_url, page_title, } => { - let instruction = instruction.trim().to_string(); - if instruction.is_empty() { - return transport.send(&AgentMessage::TaskComplete { - success: false, - summary: "请输入任务内容。".to_string(), - }); - } - - let task_context = CompatTaskContext { - conversation_id: (!conversation_id.trim().is_empty()) - .then_some(conversation_id.clone()), + let request = SubmitTaskRequest { + instruction, + conversation_id: normalize_optional_submit_field(conversation_id), messages, - page_url: (!page_url.trim().is_empty()).then_some(page_url), - page_title: (!page_title.trim().is_empty()).then_some(page_title), + page_url: normalize_optional_submit_field(page_url), + page_title: normalize_optional_submit_field(page_title), }; - let _ = transport.send(&AgentMessage::LogEntry { - level: "info".to_string(), - message: runtime_version_log_message(), - }); - if !task_context.messages.is_empty() { - let _ = transport.send(&AgentMessage::LogEntry { - level: "info".to_string(), - message: format!( - "continuing conversation with {} prior turns", - task_context.messages.len() - ), - }); - } - let completion = match context.load_sgclaw_settings() { - Ok(Some(settings)) => { - let resolved_skills_dir = - resolve_skills_dir_from_sgclaw_settings(&context.workspace_root, &settings); - let _ = transport.send(&AgentMessage::LogEntry { - level: "info".to_string(), - message: format!( - "DeepSeek config loaded from {} model={} base_url={}", - context.settings_source_label(), - settings.provider_model, - settings.provider_base_url - ), - }); - let _ = transport.send(&AgentMessage::LogEntry { - level: "info".to_string(), - message: format!( - "skills dir resolved to {}", - resolved_skills_dir.display() - ), - }); - let _ = transport.send(&AgentMessage::LogEntry { - level: "info".to_string(), - message: format!( - "runtime profile={:?} skills_prompt_mode={:?}", - settings.runtime_profile, settings.skills_prompt_mode - ), - }); - if crate::compat::orchestration::should_use_primary_orchestration( - &instruction, - task_context.page_url.as_deref(), - task_context.page_title.as_deref(), - ) { - let _ = send_mode_log(transport, "zeroclaw_process_message_primary"); - match crate::compat::orchestration::execute_task_with_sgclaw_settings( - transport, - browser_tool.clone(), - &instruction, - &task_context, - &context.workspace_root, - &settings, - ) { - Ok(summary) => { - return transport.send(&AgentMessage::TaskComplete { - success: true, - summary, - }) - } - Err(err) => { - return transport.send(&AgentMessage::TaskComplete { - success: false, - summary: err.to_string(), - }) - } - } - } - let _ = send_mode_log(transport, "compat_llm_primary"); - match crate::compat::runtime::execute_task_with_sgclaw_settings( - transport, - browser_tool.clone(), - &instruction, - &task_context, - &context.workspace_root, - &settings, - ) { - Ok(summary) => AgentMessage::TaskComplete { - success: true, - summary, - }, - Err(err) => AgentMessage::TaskComplete { - success: false, - summary: err.to_string(), - }, - } - } - Ok(None) => AgentMessage::TaskComplete { - success: false, - summary: missing_llm_configuration_summary(), - }, - Err(err) => { - let _ = transport.send(&AgentMessage::LogEntry { - level: "error".to_string(), - message: format!("failed to load DeepSeek config: {err}"), - }); - AgentMessage::TaskComplete { - success: false, - summary: err.to_string(), - } - } - }; - transport.send(&completion) + let browser_backend = browser_backend_for_submit(browser_tool, context, &request)?; + run_submit_task_with_browser_backend(transport, transport, browser_backend, context, request) } BrowserMessage::Init { .. } => { eprintln!("ignoring duplicate init after handshake"); @@ -293,3 +147,17 @@ pub fn handle_browser_message_with_context( } } } + +#[cfg(test)] +mod tests { + use super::normalize_optional_submit_field; + + #[test] + fn normalize_optional_submit_field_trims_and_drops_blank_values() { + assert_eq!(normalize_optional_submit_field(" \n\t ".to_string()), None); + assert_eq!( + normalize_optional_submit_field(" https://example.com/page ".to_string()), + Some("https://example.com/page".to_string()) + ); + } +} diff --git a/src/agent/task_runner.rs b/src/agent/task_runner.rs new file mode 100644 index 0000000..c1df853 --- /dev/null +++ b/src/agent/task_runner.rs @@ -0,0 +1,385 @@ +use std::ffi::OsString; +use std::path::PathBuf; +use std::sync::Arc; + +use crate::browser::BrowserBackend; +use crate::compat::config_adapter::resolve_skills_dir_from_sgclaw_settings; +use crate::compat::runtime::CompatTaskContext; +use crate::config::SgClawSettings; +use crate::pipe::{ + AgentMessage, BrowserPipeTool, ConversationMessage, PipeError, Transport, +}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AgentRuntimeContext { + config_path: Option, + workspace_root: PathBuf, +} + +impl AgentRuntimeContext { + pub fn new(config_path: Option, workspace_root: PathBuf) -> Self { + Self { + config_path, + workspace_root, + } + } + + pub fn from_process_args(args: I) -> Result + where + I: IntoIterator, + S: Into, + { + let mut config_path = None; + let mut args = args.into_iter().map(Into::into); + let _ = args.next(); + + while let Some(arg) = args.next() { + if arg == OsString::from("--config-path") { + let Some(value) = args.next() else { + return Err(PipeError::Protocol( + "missing value for --config-path".to_string(), + )); + }; + config_path = Some(PathBuf::from(value)); + continue; + } + + let arg_string = arg.to_string_lossy(); + if let Some(value) = arg_string.strip_prefix("--config-path=") { + config_path = Some(PathBuf::from(value)); + } + } + + let workspace_root = config_path + .as_ref() + .and_then(|path| path.parent().map(|parent| parent.to_path_buf())) + .unwrap_or_else(default_workspace_root); + + Ok(Self::new(config_path, workspace_root)) + } + + pub(crate) fn load_sgclaw_settings(&self) -> Result, PipeError> { + SgClawSettings::load(self.config_path.as_deref()) + .map_err(|err| PipeError::Protocol(err.to_string())) + } + + fn settings_source_label(&self) -> String { + match &self.config_path { + Some(path) if path.exists() => path.display().to_string(), + _ => "environment".to_string(), + } + } +} + +impl Default for AgentRuntimeContext { + fn default() -> Self { + Self::new(None, default_workspace_root()) + } +} + +fn default_workspace_root() -> PathBuf { + std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")) +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct SubmitTaskRequest { + pub instruction: String, + pub conversation_id: Option, + pub messages: Vec, + pub page_url: Option, + pub page_title: Option, +} + +pub trait AgentEventSink: Send + Sync { + fn send(&self, message: &AgentMessage) -> Result<(), PipeError>; +} + +impl AgentEventSink for T { + fn send(&self, message: &AgentMessage) -> Result<(), PipeError> { + Transport::send(self, message) + } +} + +pub fn run_submit_task( + transport: &T, + sink: &dyn AgentEventSink, + browser_tool: &BrowserPipeTool, + context: &AgentRuntimeContext, + request: SubmitTaskRequest, +) -> Result<(), PipeError> { + let SubmitTaskRequest { + instruction, + conversation_id, + messages, + page_url, + page_title, + } = request; + let instruction = instruction.trim().to_string(); + if instruction.is_empty() { + return sink.send(&AgentMessage::TaskComplete { + success: false, + summary: "请输入任务内容。".to_string(), + }); + } + + let task_context = CompatTaskContext { + conversation_id, + messages, + page_url, + page_title, + }; + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: runtime_version_log_message(), + }); + if !task_context.messages.is_empty() { + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: format!( + "continuing conversation with {} prior turns", + task_context.messages.len() + ), + }); + } + + let completion = match context.load_sgclaw_settings() { + Ok(Some(settings)) => { + let resolved_skills_dir = + resolve_skills_dir_from_sgclaw_settings(&context.workspace_root, &settings); + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: format!( + "DeepSeek config loaded from {} model={} base_url={}", + context.settings_source_label(), + settings.provider_model, + settings.provider_base_url + ), + }); + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: format!("skills dir resolved to {}", resolved_skills_dir.display()), + }); + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: format!( + "runtime profile={:?} skills_prompt_mode={:?}", + settings.runtime_profile, settings.skills_prompt_mode + ), + }); + if crate::compat::orchestration::should_use_primary_orchestration( + &instruction, + task_context.page_url.as_deref(), + task_context.page_title.as_deref(), + ) { + let _ = send_mode_log(sink, "zeroclaw_process_message_primary"); + match crate::compat::orchestration::execute_task_with_sgclaw_settings( + transport, + browser_tool.clone(), + &instruction, + &task_context, + &context.workspace_root, + &settings, + ) { + Ok(summary) => { + return sink.send(&AgentMessage::TaskComplete { + success: true, + summary, + }); + } + Err(err) => { + return sink.send(&AgentMessage::TaskComplete { + success: false, + summary: err.to_string(), + }); + } + } + } + let _ = send_mode_log(sink, "compat_llm_primary"); + match crate::compat::runtime::execute_task_with_sgclaw_settings( + transport, + browser_tool.clone(), + &instruction, + &task_context, + &context.workspace_root, + &settings, + ) { + Ok(summary) => AgentMessage::TaskComplete { + success: true, + summary, + }, + Err(err) => AgentMessage::TaskComplete { + success: false, + summary: err.to_string(), + }, + } + } + Ok(None) => AgentMessage::TaskComplete { + success: false, + summary: missing_llm_configuration_summary(), + }, + Err(err) => { + let _ = sink.send(&AgentMessage::LogEntry { + level: "error".to_string(), + message: format!("failed to load DeepSeek config: {err}"), + }); + AgentMessage::TaskComplete { + success: false, + summary: err.to_string(), + } + } + }; + + sink.send(&completion) +} + +pub fn run_submit_task_with_browser_backend( + _transport: &T, + sink: &dyn AgentEventSink, + browser_backend: Arc, + context: &AgentRuntimeContext, + request: SubmitTaskRequest, +) -> Result<(), PipeError> { + let SubmitTaskRequest { + instruction, + conversation_id, + messages, + page_url, + page_title, + } = request; + let instruction = instruction.trim().to_string(); + if instruction.is_empty() { + return sink.send(&AgentMessage::TaskComplete { + success: false, + summary: "请输入任务内容。".to_string(), + }); + } + + let task_context = CompatTaskContext { + conversation_id, + messages, + page_url, + page_title, + }; + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: runtime_version_log_message(), + }); + if !task_context.messages.is_empty() { + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: format!( + "continuing conversation with {} prior turns", + task_context.messages.len() + ), + }); + } + + let completion = match context.load_sgclaw_settings() { + Ok(Some(settings)) => { + let resolved_skills_dir = + resolve_skills_dir_from_sgclaw_settings(&context.workspace_root, &settings); + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: format!( + "DeepSeek config loaded from {} model={} base_url={}", + context.settings_source_label(), + settings.provider_model, + settings.provider_base_url + ), + }); + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: format!("skills dir resolved to {}", resolved_skills_dir.display()), + }); + let _ = sink.send(&AgentMessage::LogEntry { + level: "info".to_string(), + message: format!( + "runtime profile={:?} skills_prompt_mode={:?}", + settings.runtime_profile, settings.skills_prompt_mode + ), + }); + if crate::compat::orchestration::should_use_primary_orchestration( + &instruction, + task_context.page_url.as_deref(), + task_context.page_title.as_deref(), + ) { + let _ = send_mode_log(sink, "zeroclaw_process_message_primary"); + match crate::compat::orchestration::execute_task_with_browser_backend( + sink, + browser_backend.clone(), + &instruction, + &task_context, + &context.workspace_root, + &settings, + ) { + Ok(summary) => { + return sink.send(&AgentMessage::TaskComplete { + success: true, + summary, + }); + } + Err(err) => { + return sink.send(&AgentMessage::TaskComplete { + success: false, + summary: err.to_string(), + }); + } + } + } + let _ = send_mode_log(sink, "compat_llm_primary"); + match crate::compat::runtime::execute_task_with_browser_backend( + sink, + browser_backend, + &instruction, + &task_context, + &context.workspace_root, + &settings, + ) { + Ok(summary) => AgentMessage::TaskComplete { + success: true, + summary, + }, + Err(err) => AgentMessage::TaskComplete { + success: false, + summary: err.to_string(), + }, + } + } + Ok(None) => AgentMessage::TaskComplete { + success: false, + summary: missing_llm_configuration_summary(), + }, + Err(err) => { + let _ = sink.send(&AgentMessage::LogEntry { + level: "error".to_string(), + message: format!("failed to load DeepSeek config: {err}"), + }); + AgentMessage::TaskComplete { + success: false, + summary: err.to_string(), + } + } + }; + + sink.send(&completion) +} + +fn send_mode_log(sink: &dyn AgentEventSink, mode: &str) -> Result<(), PipeError> { + sink.send(&AgentMessage::LogEntry { + level: "mode".to_string(), + message: mode.to_string(), + }) +} + +fn missing_llm_configuration_summary() -> String { + "未配置大语言模型。请先在 sgclaw_config.json 或环境变量中配置 apiKey、baseUrl 与 model。" + .to_string() +} + +fn runtime_version_log_message() -> String { + format!( + "sgclaw runtime version={} protocol={}", + env!("CARGO_PKG_VERSION"), + crate::pipe::protocol::PROTOCOL_VERSION + ) +} diff --git a/src/bin/sg_claw.rs b/src/bin/sg_claw.rs new file mode 100644 index 0000000..3a86237 --- /dev/null +++ b/src/bin/sg_claw.rs @@ -0,0 +1,10 @@ +use std::process::ExitCode; + +fn main() -> ExitCode { + if let Err(err) = sgclaw::service::run() { + eprintln!("sg_claw failed: {err}"); + return ExitCode::FAILURE; + } + + ExitCode::SUCCESS +} diff --git a/src/bin/sg_claw_client.rs b/src/bin/sg_claw_client.rs new file mode 100644 index 0000000..80869b6 --- /dev/null +++ b/src/bin/sg_claw_client.rs @@ -0,0 +1,78 @@ +use std::io::{self, BufRead}; + +use sgclaw::service::{ClientMessage, ServiceMessage}; +use tungstenite::{connect, Message}; + +fn main() -> std::process::ExitCode { + match run() { + Ok(()) => std::process::ExitCode::SUCCESS, + Err(err) => { + eprintln!("sg_claw_client failed: {err}"); + std::process::ExitCode::FAILURE + } + } +} + +fn parse_request(input: &str) -> (ClientMessage, bool) { + match input.trim() { + "/connect" => (ClientMessage::Connect, true), + "/start" => (ClientMessage::Start, true), + "/stop" => (ClientMessage::Stop, true), + instruction => ( + ClientMessage::SubmitTask { + instruction: instruction.to_string(), + conversation_id: String::new(), + messages: vec![], + page_url: String::new(), + page_title: String::new(), + }, + false, + ), + } +} + +fn run() -> Result<(), String> { + let service_url = std::env::var("SG_CLAW_SERVICE_WS_URL") + .unwrap_or_else(|_| "ws://127.0.0.1:42321".to_string()); + let (mut socket, _) = connect(service_url.as_str()).map_err(|err| err.to_string())?; + + let mut input = String::new(); + io::stdin() + .lock() + .read_line(&mut input) + .map_err(|err| err.to_string())?; + + let (request, exit_on_status) = parse_request(&input); + + let payload = serde_json::to_string(&request).map_err(|err| err.to_string())?; + socket + .send(Message::Text(payload.into())) + .map_err(|err| err.to_string())?; + + loop { + match socket.read().map_err(|err| err.to_string())? { + Message::Text(text) => { + let message: ServiceMessage = + serde_json::from_str(&text).map_err(|err| err.to_string())?; + match message { + ServiceMessage::StatusChanged { state } => { + println!("status: {state}"); + if exit_on_status { + return Ok(()); + } + } + ServiceMessage::LogEntry { level: _, message } => { + println!("{message}"); + } + ServiceMessage::TaskComplete { success: _, summary } => { + println!("{summary}"); + return Ok(()); + } + ServiceMessage::Busy { message } => return Err(message), + } + } + Message::Close(_) => return Err("service disconnected before task completion".to_string()), + _ => {} + } + } +} diff --git a/src/bin/sgbrowser_ws_probe.rs b/src/bin/sgbrowser_ws_probe.rs new file mode 100644 index 0000000..35d64e4 --- /dev/null +++ b/src/bin/sgbrowser_ws_probe.rs @@ -0,0 +1,70 @@ +use std::env; +use std::process::ExitCode; +use std::time::Duration; + +use sgclaw::{parse_probe_args, run_probe_script, ProbeOutcome}; + +fn main() -> ExitCode { + match run() { + Ok(()) => ExitCode::SUCCESS, + Err(err) => { + eprintln!("sgbrowser_ws_probe failed: {err}"); + ExitCode::FAILURE + } + } +} + +fn run() -> Result<(), String> { + let args: Vec = env::args().skip(1).collect(); + let config = match parse_probe_args(&args) { + Ok(config) => config, + Err(err) => return Err(err.to_string()), + }; + let results = match run_probe_script( + &config.ws_url, + Duration::from_millis(config.timeout_ms), + config.steps, + ) { + Ok(results) => results, + Err(err) => return Err(err.to_string()), + }; + + for (index, result) in results.iter().enumerate() { + println!("STEP {} {}", index + 1, result.label); + println!("SEND: {}", result.sent); + match &result.outcome { + ProbeOutcome::Received(frames) => { + if frames.is_empty() { + println!("RECV: "); + } else { + for frame in frames { + println!("RECV: {}", frame); + } + } + println!("OUTCOME: received"); + } + ProbeOutcome::NoReplyExpected => { + println!("RECV: "); + println!("OUTCOME: no-reply-expected"); + } + ProbeOutcome::TimedOut => { + println!("RECV: "); + println!("OUTCOME: timeout"); + } + ProbeOutcome::Closed => { + println!("RECV: "); + println!("OUTCOME: closed"); + } + ProbeOutcome::ConnectFailed(message) => { + println!("RECV: "); + println!("OUTCOME: connect-failed"); + println!("DETAIL: {}", message); + } + } + if index + 1 < results.len() { + println!(); + } + } + + Ok(()) +} diff --git a/src/browser/backend.rs b/src/browser/backend.rs new file mode 100644 index 0000000..65afdf8 --- /dev/null +++ b/src/browser/backend.rs @@ -0,0 +1,39 @@ +use std::sync::Arc; + +use serde_json::Value; + +use crate::pipe::{Action, CommandOutput, ExecutionSurfaceMetadata, PipeError}; + +pub trait BrowserBackend: Send + Sync { + fn invoke( + &self, + action: Action, + params: Value, + expected_domain: &str, + ) -> Result; + + fn surface_metadata(&self) -> ExecutionSurfaceMetadata; + + fn supports_eval(&self) -> bool { + true + } +} + +impl BrowserBackend for Arc { + fn invoke( + &self, + action: Action, + params: Value, + expected_domain: &str, + ) -> Result { + self.as_ref().invoke(action, params, expected_domain) + } + + fn surface_metadata(&self) -> ExecutionSurfaceMetadata { + self.as_ref().surface_metadata() + } + + fn supports_eval(&self) -> bool { + self.as_ref().supports_eval() + } +} diff --git a/src/browser/bridge_backend.rs b/src/browser/bridge_backend.rs new file mode 100644 index 0000000..fe4366b --- /dev/null +++ b/src/browser/bridge_backend.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use serde_json::Value; + +use crate::browser::backend::BrowserBackend; +use crate::browser::bridge_contract::{BridgeBrowserActionReply, BridgeBrowserActionRequest}; +use crate::browser::bridge_transport::BridgeActionTransport; +use crate::pipe::{Action, CommandOutput, ExecutionSurfaceMetadata, PipeError}; +use crate::security::MacPolicy; + +pub struct BridgeBrowserBackend { + transport: Arc, + mac_policy: MacPolicy, + next_seq: AtomicU64, +} + +impl BridgeBrowserBackend { + pub fn new(transport: Arc, mac_policy: MacPolicy) -> Self { + Self { + transport, + mac_policy, + next_seq: AtomicU64::new(1), + } + } +} + +impl BrowserBackend for BridgeBrowserBackend { + fn invoke( + &self, + action: Action, + params: Value, + expected_domain: &str, + ) -> Result { + self.mac_policy.validate(&action, expected_domain)?; + + let seq = self.next_seq.fetch_add(1, Ordering::Relaxed); + let reply = self.transport.execute(BridgeBrowserActionRequest::new( + action.as_str(), + params, + expected_domain, + ))?; + + match reply { + BridgeBrowserActionReply::Success(success) => Ok(CommandOutput { + seq, + success: true, + data: success.data, + aom_snapshot: success.aom_snapshot, + timing: success.timing, + }), + BridgeBrowserActionReply::Error(error) => Err(PipeError::Protocol(format!( + "bridge action failed: {}", + error.message + ))), + } + } + + fn surface_metadata(&self) -> ExecutionSurfaceMetadata { + self.mac_policy.privileged_surface_metadata() + } + + fn supports_eval(&self) -> bool { + self.mac_policy.supports_pipe_action(&Action::Eval) + } +} diff --git a/src/browser/bridge_contract.rs b/src/browser/bridge_contract.rs new file mode 100644 index 0000000..564d9be --- /dev/null +++ b/src/browser/bridge_contract.rs @@ -0,0 +1,63 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::pipe::Timing; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BridgeLifecycleCall { + Connect, + Start, + Stop, + SubmitTask, +} + +impl BridgeLifecycleCall { + pub fn bridge_name(self) -> &'static str { + match self { + Self::Connect => "sgclawConnect", + Self::Start => "sgclawStart", + Self::Stop => "sgclawStop", + Self::SubmitTask => "sgclawSubmitTask", + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BridgeBrowserActionRequest { + pub action: String, + pub params: Value, + pub expected_domain: String, +} + +impl BridgeBrowserActionRequest { + pub fn new( + action: impl Into, + params: Value, + expected_domain: impl Into, + ) -> Self { + Self { + action: action.into(), + params, + expected_domain: expected_domain.into(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum BridgeBrowserActionReply { + Success(BridgeBrowserActionSuccess), + Error(BridgeBrowserActionError), +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BridgeBrowserActionSuccess { + pub data: Value, + pub aom_snapshot: Vec, + pub timing: Timing, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BridgeBrowserActionError { + pub message: String, + pub details: Value, +} diff --git a/src/browser/bridge_transport.rs b/src/browser/bridge_transport.rs new file mode 100644 index 0000000..b5e4a9f --- /dev/null +++ b/src/browser/bridge_transport.rs @@ -0,0 +1,9 @@ +use crate::browser::bridge_contract::{BridgeBrowserActionReply, BridgeBrowserActionRequest}; +use crate::pipe::PipeError; + +pub trait BridgeActionTransport: Send + Sync { + fn execute( + &self, + request: BridgeBrowserActionRequest, + ) -> Result; +} diff --git a/src/browser/callback_backend.rs b/src/browser/callback_backend.rs new file mode 100644 index 0000000..a7d5229 --- /dev/null +++ b/src/browser/callback_backend.rs @@ -0,0 +1,301 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; + +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use crate::browser::backend::BrowserBackend; +use crate::pipe::{Action, CommandOutput, ExecutionSurfaceMetadata, PipeError, Timing}; +use crate::security::MacPolicy; + +const NAVIGATE_CALLBACK_NAME: &str = "sgclawOnLoaded"; +const GET_TEXT_CALLBACK_NAME: &str = "sgclawOnGetText"; +const EVAL_CALLBACK_NAME: &str = "sgclawOnEval"; +const SHOW_AREA: &str = "show"; + +pub trait BrowserCallbackHost: Send + Sync { + fn execute(&self, request: BrowserCallbackRequest) -> Result; +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BrowserCallbackRequest { + pub seq: u64, + pub request_url: String, + pub expected_domain: String, + pub action: String, + pub command: Value, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum BrowserCallbackResponse { + Success(BrowserCallbackSuccess), + Error(BrowserCallbackError), +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BrowserCallbackSuccess { + pub success: bool, + pub data: Value, + pub aom_snapshot: Vec, + pub timing: Timing, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BrowserCallbackError { + pub message: String, + pub details: Value, +} + +pub struct BrowserCallbackBackend { + host: Arc, + mac_policy: MacPolicy, + helper_page_url: String, + current_target_url: Mutex>, + next_seq: AtomicU64, +} + +impl BrowserCallbackBackend { + pub fn new( + host: Arc, + mac_policy: MacPolicy, + helper_page_url: impl Into, + ) -> Self { + Self { + host, + mac_policy, + helper_page_url: helper_page_url.into(), + current_target_url: Mutex::new(None), + next_seq: AtomicU64::new(1), + } + } + + fn build_command(&self, action: &Action, params: &Value) -> Result { + match action { + Action::Navigate => { + let target_url = required_string(params, "url")?; + // Use sgBrowerserOpenPage to open the target URL in a **new** + // visible browser tab. This keeps the helper page alive so its + // WebSocket connection, command polling, and callback functions + // remain functional for subsequent GetText / Eval commands. + // + // sgBrowserCallAfterLoaded would navigate the helper page tab + // itself to the target URL, destroying all helper-page JS + // context and making further communication impossible. + // + // sgBrowerserOpenPage does not fire a JS callback; the callback + // host will treat the navigate action as fire-and-forget and + // return success once the command has been forwarded. + Ok(json!([ + self.helper_page_url, + "sgBrowerserOpenPage", + target_url, + ])) + } + Action::GetText => { + let target_url = self.target_url(action, params)?; + let domain = extract_domain(&target_url)?; + let selector = required_string(params, "selector")?; + let js_code = build_get_text_js(&self.helper_page_url, &selector); + // Use sgBrowserExcuteJsCodeByDomain (API #25) which matches + // pages by domain rather than exact URL. This is far more + // robust than sgBrowserExcuteJsCodeByArea because the actual + // page URL may differ from what we navigated to (redirects, + // query parameters, etc.). + Ok(json!([ + self.helper_page_url, + "sgBrowserExcuteJsCodeByDomain", + domain, + js_code, + SHOW_AREA, + ])) + } + Action::Eval => { + let target_url = self.target_url(action, params)?; + let domain = extract_domain(&target_url)?; + let script = required_string(params, "script")?; + let js_code = build_eval_js(&self.helper_page_url, &script); + Ok(json!([ + self.helper_page_url, + "sgBrowserExcuteJsCodeByDomain", + domain, + js_code, + SHOW_AREA, + ])) + } + _ => Err(PipeError::Protocol(format!( + "unsupported callback-host browser action: {}", + action.as_str() + ))), + } + } + + fn target_url(&self, action: &Action, params: &Value) -> Result { + if let Some(target_url) = params + .get("target_url") + .and_then(Value::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToString::to_string) + { + return Ok(target_url); + } + + self.current_target_url + .lock() + .map_err(|_| PipeError::Protocol("callback backend target url lock poisoned".to_string()))? + .clone() + .ok_or_else(|| PipeError::Protocol(format!("target_url is required for {}", action.as_str()))) + } +} + +impl BrowserBackend for BrowserCallbackBackend { + fn invoke( + &self, + action: Action, + params: Value, + expected_domain: &str, + ) -> Result { + self.mac_policy.validate(&action, expected_domain)?; + + let seq = self.next_seq.fetch_add(1, Ordering::Relaxed); + let reply = self.host.execute(BrowserCallbackRequest { + seq, + request_url: self.helper_page_url.clone(), + expected_domain: expected_domain.to_string(), + action: action.as_str().to_string(), + command: self.build_command(&action, ¶ms)?, + })?; + + match reply { + BrowserCallbackResponse::Success(success) => { + if matches!(action, Action::Navigate) { + if let Some(url) = params + .get("url") + .and_then(Value::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + { + *self.current_target_url.lock().map_err(|_| { + PipeError::Protocol("callback backend target url lock poisoned".to_string()) + })? = Some(url.to_string()); + } + } + Ok(CommandOutput { + seq, + success: success.success, + data: success.data, + aom_snapshot: success.aom_snapshot, + timing: success.timing, + }) + } + BrowserCallbackResponse::Error(error) => Err(PipeError::Protocol(format!( + "callback host browser action failed: {} ({})", + error.message, error.details + ))), + } + } + + fn surface_metadata(&self) -> ExecutionSurfaceMetadata { + self.mac_policy.privileged_surface_metadata() + } + + fn supports_eval(&self) -> bool { + self.mac_policy.supports_pipe_action(&Action::Eval) + } +} + +fn required_string(params: &Value, key: &str) -> Result { + params + .get(key) + .and_then(Value::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToString::to_string) + .ok_or_else(|| PipeError::Protocol(format!("{key} is required"))) +} + +fn build_get_text_js(source_url: &str, selector: &str) -> String { + let escaped_source_url = escape_js_single_quoted(source_url); + let escaped_selector = escape_js_single_quoted(selector); + let callback = GET_TEXT_CALLBACK_NAME; + let events_url = escape_js_single_quoted(&events_endpoint_url(source_url)); + + // Three delivery paths for getting the result back to the callback host: + // + // 1. callBackJsToCpp (API #40) — browser-native IPC that routes the + // callback function to the helper page. + // 2. XMLHttpRequest POST to callback host — localhost (127.0.0.1) is + // exempt from mixed-content restrictions in Chromium. + // 3. navigator.sendBeacon fallback — same localhost exemption. + // + // The XHR / sendBeacon paths POST the event DIRECTLY in the format the + // callback host expects (callback="sgclawOnGetText", payload={text:...}) + // so normalize_callback_result can process it via Path A. + format!( + "(function(){{try{{\ + var el=document.querySelector('{escaped_selector}');\ + var t=el?((el.innerText||el.textContent||'').trim()):'';\ + try{{callBackJsToCpp('{escaped_source_url}@_@'+window.location.href+'@_@{callback}@_@sgBrowserExcuteJsCodeByDomain@_@'+t)}}catch(_){{}}\ + var j=JSON.stringify({{type:'callback',callback:'{callback}',request_url:'{escaped_source_url}',payload:{{text:t}}}});\ + try{{var r=new XMLHttpRequest();r.open('POST','{events_url}',true);r.setRequestHeader('Content-Type','application/json');r.send(j)}}catch(_){{}}\ + try{{navigator.sendBeacon('{events_url}',new Blob([j],{{type:'application/json'}}))}}catch(_){{}}\ + }}catch(e){{}}}})()" + ) +} + +fn build_eval_js(source_url: &str, script: &str) -> String { + let escaped_source_url = escape_js_single_quoted(source_url); + let callback = EVAL_CALLBACK_NAME; + let events_url = escape_js_single_quoted(&events_endpoint_url(source_url)); + + format!( + "(function(){{try{{var v=(function(){{return {script}}})();\ + var t=(typeof v==='string')?v:JSON.stringify(v);\ + try{{callBackJsToCpp('{escaped_source_url}@_@'+window.location.href+'@_@{callback}@_@sgBrowserExcuteJsCodeByDomain@_@'+(t??''))}}catch(_){{}}\ + var j=JSON.stringify({{type:'callback',callback:'{callback}',request_url:'{escaped_source_url}',payload:{{value:(t??'')}}}});\ + try{{var r=new XMLHttpRequest();r.open('POST','{events_url}',true);r.setRequestHeader('Content-Type','application/json');r.send(j)}}catch(_){{}}\ + try{{navigator.sendBeacon('{events_url}',new Blob([j],{{type:'application/json'}}))}}catch(_){{}}\ + }}catch(e){{}}}})()" + ) +} + +/// Derive the callback host events endpoint URL from the helper page URL. +/// e.g. "http://127.0.0.1:62819/sgclaw/browser-helper.html" +/// → "http://127.0.0.1:62819/sgclaw/callback/events" +fn events_endpoint_url(helper_page_url: &str) -> String { + let origin = helper_page_url + .find("://") + .and_then(|scheme_end| { + helper_page_url[scheme_end + 3..] + .find('/') + .map(|path_start| &helper_page_url[..scheme_end + 3 + path_start]) + }) + .unwrap_or(helper_page_url); + format!("{origin}/sgclaw/callback/events") +} + +/// Extract the domain from a URL. +/// e.g. "https://www.zhihu.com/hot" → "www.zhihu.com" +fn extract_domain(url: &str) -> Result { + let after_scheme = url + .find("://") + .map(|i| &url[i + 3..]) + .unwrap_or(url); + let domain = after_scheme + .split('/') + .next() + .unwrap_or(after_scheme) + .split(':') + .next() + .unwrap_or(after_scheme); + if domain.is_empty() { + return Err(PipeError::Protocol(format!( + "failed to extract domain from URL: {url}" + ))); + } + Ok(domain.to_string()) +} + +fn escape_js_single_quoted(raw: &str) -> String { + raw.replace('\\', "\\\\").replace('\'', "\\'") +} diff --git a/src/browser/callback_host.rs b/src/browser/callback_host.rs new file mode 100644 index 0000000..10ce627 --- /dev/null +++ b/src/browser/callback_host.rs @@ -0,0 +1,1105 @@ +use std::collections::VecDeque; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tungstenite::{connect, Message}; + +use crate::browser::callback_backend::{ + BrowserCallbackError, BrowserCallbackHost as BrowserCallbackExecutor, BrowserCallbackRequest, + BrowserCallbackResponse, BrowserCallbackSuccess, +}; +use crate::pipe::{PipeError, Timing}; + +const DEFAULT_LOOPBACK_ORIGIN: &str = "http://127.0.0.1:17888"; +const DEFAULT_BROWSER_WS_URL: &str = "ws://127.0.0.1:12345"; +const HELPER_PAGE_PATH: &str = "/sgclaw/browser-helper.html"; +const READY_ENDPOINT_PATH: &str = "/sgclaw/callback/ready"; +const EVENTS_ENDPOINT_PATH: &str = "/sgclaw/callback/events"; +const COMMANDS_ENDPOINT_PATH: &str = "/sgclaw/callback/commands/next"; +const COMMAND_ACK_ENDPOINT_PATH: &str = "/sgclaw/callback/commands/ack"; +const COMMAND_POLL_INTERVAL: Duration = Duration::from_millis(25); +const HELPER_POLL_INTERVAL: Duration = Duration::from_millis(50); +const HELPER_BOOTSTRAP_ACTION: &str = "sgBrowerserOpenPage"; +const NAVIGATE_CALLBACK_NAME: &str = "sgclawOnLoaded"; +const GET_TEXT_CALLBACK_NAME: &str = "sgclawOnGetText"; +const EVAL_CALLBACK_NAME: &str = "sgclawOnEval"; + +#[derive(Debug)] +pub(crate) struct BrowserCallbackHost { + helper_url: String, + helper_page_html: String, + state: Mutex, +} + +#[derive(Debug)] +pub(crate) struct LiveBrowserCallbackHost { + host: Arc, + shutdown: Arc, + server_thread: Mutex>>, + command_lock: Mutex<()>, + result_timeout: Duration, +} + +#[derive(Debug, Default)] +struct CallbackHostState { + ready: bool, + pending_ready_event: Option, + pending_results: VecDeque, + pending_commands: VecDeque, + in_flight_command: Option, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub(crate) enum CallbackEvent { + Ready { helper_url: Option }, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) struct CallbackCommand { + pub action: String, + #[serde(default)] + pub args: Vec, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) struct CallbackCommandEnvelope { + pub ok: bool, + pub command: Option, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) struct CallbackResult { + pub callback: String, + pub request_url: String, + #[serde(default)] + pub target_url: Option, + #[serde(default)] + pub action: Option, + pub payload: Value, +} + +#[derive(Debug, Deserialize)] +struct IncomingReadyEvent { + #[allow(dead_code)] + #[serde(default)] + r#type: Option, + #[serde(default)] + helper_url: Option, +} + +#[derive(Debug, Deserialize)] +struct IncomingCallbackEvent { + callback: String, + request_url: String, + #[serde(default)] + target_url: Option, + #[serde(default)] + action: Option, + payload: Value, +} + +#[derive(Debug)] +struct HttpRequest { + method: String, + path: String, + body: Vec, +} + +#[derive(Debug)] +struct ParsedCallbackJsPayload { + callback: String, + response_text: String, +} + +impl BrowserCallbackHost { + pub(crate) fn new() -> Self { + Self::with_urls(DEFAULT_LOOPBACK_ORIGIN, DEFAULT_BROWSER_WS_URL) + } + + pub(crate) fn with_urls( + loopback_origin: impl AsRef, + browser_ws_url: impl AsRef, + ) -> Self { + let origin = normalize_loopback_origin(loopback_origin.as_ref()); + let browser_ws_url = browser_ws_url.as_ref().to_string(); + let helper_url = format!("{origin}{HELPER_PAGE_PATH}"); + let helper_page_html = build_helper_page_html(&origin, &helper_url, &browser_ws_url); + + Self { + helper_url, + helper_page_html, + state: Mutex::new(CallbackHostState::default()), + } + } + + pub(crate) fn helper_url(&self) -> &str { + &self.helper_url + } + + pub(crate) fn helper_page_html(&self) -> &str { + &self.helper_page_html + } + + pub(crate) fn is_ready(&self) -> bool { + self.state.lock().unwrap().ready + } + + pub(crate) fn mark_ready(&self, helper_url: Option) { + let mut state = self.state.lock().unwrap(); + if state.ready { + return; + } + + state.ready = true; + state.pending_ready_event = Some(CallbackEvent::Ready { helper_url }); + } + + pub(crate) fn take_ready_event(&self) -> Option { + self.state.lock().unwrap().pending_ready_event.take() + } + + pub(crate) fn push_result(&self, result: CallbackResult) { + self.state.lock().unwrap().pending_results.push_back(result); + } + + pub(crate) fn take_result(&self) -> Option { + self.state.lock().unwrap().pending_results.pop_front() + } + + pub(crate) fn clear_results(&self) { + self.state.lock().unwrap().pending_results.clear(); + } + + pub(crate) fn enqueue_command(&self, command: CallbackCommand) { + self.state.lock().unwrap().pending_commands.push_back(command); + } + + pub(crate) fn current_command_envelope(&self) -> CallbackCommandEnvelope { + let mut state = self.state.lock().unwrap(); + if state.in_flight_command.is_none() { + state.in_flight_command = state.pending_commands.pop_front(); + } + + CallbackCommandEnvelope { + ok: state.in_flight_command.is_some(), + command: state.in_flight_command.clone(), + } + } + + pub(crate) fn acknowledge_in_flight_command(&self) -> Option { + self.state.lock().unwrap().in_flight_command.take() + } +} + +impl LiveBrowserCallbackHost { + pub(crate) fn start_with_browser_ws_url( + browser_ws_url: &str, + bootstrap_request_url: &str, + ready_timeout: Duration, + result_timeout: Duration, + ) -> Result { + let listener = TcpListener::bind("127.0.0.1:0").map_err(|err| { + PipeError::Protocol(format!("failed to bind callback host listener: {err}")) + })?; + listener.set_nonblocking(true).map_err(|err| { + PipeError::Protocol(format!("failed to configure callback host listener: {err}")) + })?; + let origin = format!( + "http://{}", + listener.local_addr().map_err(|err| { + PipeError::Protocol(format!( + "failed to resolve callback host listener address: {err}" + )) + })? + ); + let host = Arc::new(BrowserCallbackHost::with_urls(&origin, browser_ws_url)); + let shutdown = Arc::new(AtomicBool::new(false)); + let thread_host = host.clone(); + let thread_shutdown = shutdown.clone(); + let server_thread = thread::spawn(move || serve_loop(listener, thread_host, thread_shutdown)); + + bootstrap_helper_page(browser_ws_url, bootstrap_request_url, host.helper_url())?; + wait_for_helper_ready(host.as_ref(), ready_timeout)?; + + let live_host = Self { + host, + shutdown, + server_thread: Mutex::new(Some(server_thread)), + command_lock: Mutex::new(()), + result_timeout, + }; + Ok(live_host) + } + + pub(crate) fn helper_url(&self) -> &str { + self.host.helper_url() + } +} + +impl BrowserCallbackExecutor for LiveBrowserCallbackHost { + fn execute(&self, request: BrowserCallbackRequest) -> Result { + let _command_guard = self.command_lock.lock().unwrap(); + self.host.clear_results(); + self.host.enqueue_command(command_from_request(&request.command)?); + + // Navigate uses sgBrowerserOpenPage which opens a new tab without a JS + // callback. We only wait long enough for the helper page to pick up the + // command via its 250 ms poll interval and forward it over WebSocket. + // The caller (workflow executor) polls for page readiness separately. + let is_fire_and_forget = request.action == "navigate"; + let timeout = if is_fire_and_forget { + Duration::from_millis(1500) + } else { + self.result_timeout + }; + + eprintln!( + "callback_host: execute action={} fire_and_forget={} timeout={:?}", + request.action, is_fire_and_forget, timeout + ); + + let started = Instant::now(); + while started.elapsed() < timeout { + if let Some(result) = self.host.take_result() { + eprintln!( + "callback_host: received callback={} payload_keys={:?}", + result.callback, + result.payload.as_object().map(|m| m.keys().collect::>()) + ); + if let Some(response) = + normalize_callback_result(&request, result, started.elapsed()) + { + return Ok(response); + } + eprintln!("callback_host: callback did not match action={}, continuing to wait", request.action); + } + thread::sleep(COMMAND_POLL_INTERVAL); + } + + if is_fire_and_forget { + return Ok(BrowserCallbackResponse::Success(BrowserCallbackSuccess { + success: true, + data: json!({ "loaded": true }), + aom_snapshot: vec![], + timing: elapsed_timing(started.elapsed()), + })); + } + + eprintln!( + "callback_host: timeout waiting for callback on action={} after {:?}", + request.action, + started.elapsed() + ); + Err(PipeError::Timeout) + } +} + +impl Drop for LiveBrowserCallbackHost { + fn drop(&mut self) { + self.shutdown.store(true, Ordering::Relaxed); + if let Some(handle) = self.server_thread.lock().unwrap().take() { + let _ = handle.join(); + } + } +} + +impl Default for BrowserCallbackHost { + fn default() -> Self { + Self::new() + } +} + +fn normalize_loopback_origin(origin: &str) -> String { + origin.trim_end_matches('/').to_string() +} + +fn bootstrap_helper_page(browser_ws_url: &str, request_url: &str, helper_url: &str) -> Result<(), PipeError> { + eprintln!("callback_host: connecting to browser ws {browser_ws_url}"); + let (mut websocket, _) = connect(browser_ws_url) + .map_err(|err| PipeError::Protocol(format!("browser websocket connect failed: {err}")))?; + configure_bootstrap_socket(&mut websocket)?; + websocket + .send(Message::Text( + r#"{"type":"register","role":"web"}"#.to_string().into(), + )) + .map_err(|err| PipeError::Protocol(format!("browser websocket register failed: {err}")))?; + let _ = recv_bootstrap_prelude(&mut websocket); + let payload = json!([ + request_url, + HELPER_BOOTSTRAP_ACTION, + helper_url, + ]) + .to_string(); + eprintln!("callback_host: sending bootstrap command: {payload}"); + websocket + .send(Message::Text(payload.into())) + .map_err(|err| PipeError::Protocol(format!("helper bootstrap send failed: {err}")))?; + eprintln!("callback_host: bootstrap command sent, waiting for helper page at {helper_url}"); + Ok(()) +} + +fn recv_bootstrap_prelude( + websocket: &mut tungstenite::WebSocket>, +) -> Result<(), PipeError> { + loop { + match websocket.read() { + Ok(Message::Text(_)) | Ok(Message::Binary(_)) | Ok(Message::Frame(_)) => return Ok(()), + Ok(Message::Ping(payload)) => websocket + .send(Message::Pong(payload)) + .map_err(|err| PipeError::Protocol(format!("browser websocket pong failed: {err}")))?, + Ok(Message::Pong(_)) => {} + Ok(Message::Close(_)) => return Err(PipeError::PipeClosed), + Err(tungstenite::Error::ConnectionClosed) | Err(tungstenite::Error::AlreadyClosed) => { + return Err(PipeError::PipeClosed) + } + Err(tungstenite::Error::Io(err)) + if matches!( + err.kind(), + std::io::ErrorKind::TimedOut | std::io::ErrorKind::WouldBlock + ) => + { + return Ok(()); + } + Err(err) => { + return Err(PipeError::Protocol(format!( + "browser websocket bootstrap read failed: {err}" + ))); + } + } + } +} + +fn configure_bootstrap_socket( + websocket: &mut tungstenite::WebSocket>, +) -> Result<(), PipeError> { + match websocket.get_mut() { + tungstenite::stream::MaybeTlsStream::Plain(stream) => { + stream.set_read_timeout(Some(Duration::from_secs(1)))?; + stream.set_write_timeout(Some(Duration::from_secs(1)))?; + Ok(()) + } + _ => Ok(()), + } +} + +fn wait_for_helper_ready(host: &BrowserCallbackHost, ready_timeout: Duration) -> Result<(), PipeError> { + let started = Instant::now(); + while started.elapsed() < ready_timeout { + if host.is_ready() { + eprintln!("callback_host: helper page ready after {:?}", started.elapsed()); + return Ok(()); + } + thread::sleep(HELPER_POLL_INTERVAL); + } + + eprintln!( + "callback_host: helper page did NOT become ready within {:?} — the browser may have \ + ignored the sgBrowerserOpenPage command or could not reach the helper URL", + ready_timeout, + ); + Err(PipeError::Timeout) +} + +fn serve_loop(listener: TcpListener, host: Arc, shutdown: Arc) { + while !shutdown.load(Ordering::Relaxed) { + match listener.accept() { + Ok((mut stream, _)) => { + let _ = handle_request(&mut stream, host.as_ref()); + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(COMMAND_POLL_INTERVAL); + } + Err(_) => { + thread::sleep(COMMAND_POLL_INTERVAL); + } + } + } +} + +fn handle_request(stream: &mut TcpStream, host: &BrowserCallbackHost) -> Result<(), PipeError> { + let request = read_http_request(stream)?; + + // Handle CORS preflight requests from cross-origin pages (e.g. JS injected + // into zhihu.com that POSTs results back to this loopback server). + if request.method == "OPTIONS" { + return write_cors_preflight(stream); + } + + match (request.method.as_str(), request.path.as_str()) { + ("GET", HELPER_PAGE_PATH) => write_http_response( + stream, + 200, + "text/html; charset=utf-8", + host.helper_page_html().as_bytes(), + ), + ("POST", READY_ENDPOINT_PATH) => { + let payload: IncomingReadyEvent = serde_json::from_slice(&request.body).map_err(|err| { + PipeError::Protocol(format!("invalid callback host ready payload: {err}")) + })?; + host.mark_ready(payload.helper_url); + write_json_response(stream, &json!({ "ok": true })) + } + ("POST", EVENTS_ENDPOINT_PATH) => { + let payload: IncomingCallbackEvent = serde_json::from_slice(&request.body).map_err(|err| { + PipeError::Protocol(format!("invalid callback host event payload: {err}")) + })?; + eprintln!( + "callback_host: received event callback={} request_url={}", + payload.callback, + payload.request_url + ); + host.push_result(CallbackResult { + callback: payload.callback, + request_url: payload.request_url, + target_url: payload.target_url, + action: payload.action, + payload: payload.payload, + }); + write_json_response(stream, &json!({ "ok": true })) + } + ("GET", COMMANDS_ENDPOINT_PATH) => { + let envelope = host.current_command_envelope(); + if envelope.ok { + if let Some(ref cmd) = envelope.command { + eprintln!( + "callback_host: delivering command to helper action={} args_count={}", + cmd.action, + cmd.args.len() + ); + } + } + write_json_response(stream, &envelope) + } + ("POST", COMMAND_ACK_ENDPOINT_PATH) => { + let acked = host.acknowledge_in_flight_command(); + if let Some(ref cmd) = acked { + eprintln!("callback_host: command ACKed by helper action={}", cmd.action); + } + write_json_response(stream, &json!({ "ok": true })) + } + _ => write_http_response(stream, 404, "text/plain; charset=utf-8", b"not found"), + } +} + +fn read_http_request(stream: &mut TcpStream) -> Result { + let mut buffer = Vec::new(); + let mut headers_end = None; + + while headers_end.is_none() { + let mut chunk = [0_u8; 1024]; + let bytes = stream.read(&mut chunk).map_err(|err| { + PipeError::Protocol(format!("failed to read callback host request headers: {err}")) + })?; + if bytes == 0 { + return Err(PipeError::PipeClosed); + } + buffer.extend_from_slice(&chunk[..bytes]); + headers_end = buffer.windows(4).position(|window| window == b"\r\n\r\n"); + } + + let headers_end = headers_end.expect("headers end must exist") + 4; + let headers = String::from_utf8(buffer[..headers_end].to_vec()).map_err(|err| { + PipeError::Protocol(format!("invalid callback host request headers: {err}")) + })?; + let mut lines = headers.lines(); + let request_line = lines + .next() + .ok_or_else(|| PipeError::Protocol("missing callback host request line".to_string()))?; + let mut request_parts = request_line.split_whitespace(); + let method = request_parts + .next() + .ok_or_else(|| PipeError::Protocol("missing callback host request method".to_string()))? + .to_string(); + let path = request_parts + .next() + .ok_or_else(|| PipeError::Protocol("missing callback host request path".to_string()))? + .to_string(); + let content_length = lines + .find_map(|line| { + let (name, value) = line.split_once(':')?; + name.eq_ignore_ascii_case("content-length") + .then(|| value.trim().parse::().ok()) + .flatten() + }) + .unwrap_or(0); + + while buffer.len() < headers_end + content_length { + let mut chunk = vec![0_u8; content_length.max(1024)]; + let bytes = stream.read(&mut chunk).map_err(|err| { + PipeError::Protocol(format!("failed to read callback host request body: {err}")) + })?; + if bytes == 0 { + return Err(PipeError::PipeClosed); + } + buffer.extend_from_slice(&chunk[..bytes]); + } + + Ok(HttpRequest { + method, + path, + body: buffer[headers_end..headers_end + content_length].to_vec(), + }) +} + +fn write_json_response(stream: &mut TcpStream, payload: &impl Serialize) -> Result<(), PipeError> { + let body = serde_json::to_vec(payload).map_err(|err| { + PipeError::Protocol(format!("failed to serialize callback host response: {err}")) + })?; + write_http_response(stream, 200, "application/json", &body) +} + +fn write_http_response( + stream: &mut TcpStream, + status_code: u16, + content_type: &str, + body: &[u8], +) -> Result<(), PipeError> { + let status_text = match status_code { + 200 => "OK", + 404 => "Not Found", + _ => "OK", + }; + let headers = format!( + "HTTP/1.1 {status_code} {status_text}\r\n\ + Content-Type: {content_type}\r\n\ + Content-Length: {}\r\n\ + Access-Control-Allow-Origin: *\r\n\ + Connection: close\r\n\r\n", + body.len() + ); + stream + .write_all(headers.as_bytes()) + .and_then(|_| stream.write_all(body)) + .and_then(|_| stream.flush()) + .map_err(|err| PipeError::Protocol(format!("failed to write callback host response: {err}"))) +} + +fn write_cors_preflight(stream: &mut TcpStream) -> Result<(), PipeError> { + let headers = "HTTP/1.1 204 No Content\r\n\ + Access-Control-Allow-Origin: *\r\n\ + Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n\ + Access-Control-Allow-Headers: content-type\r\n\ + Access-Control-Max-Age: 86400\r\n\ + Content-Length: 0\r\n\ + Connection: close\r\n\r\n"; + stream + .write_all(headers.as_bytes()) + .and_then(|_| stream.flush()) + .map_err(|err| PipeError::Protocol(format!("failed to write CORS preflight response: {err}"))) +} + +fn command_from_request(command: &Value) -> Result { + let values = command.as_array().ok_or_else(|| { + PipeError::Protocol(format!("callback host command must be an array, got {command}")) + })?; + if values.len() < 2 { + return Err(PipeError::Protocol(format!( + "callback host command must include request_url and action, got {command}" + ))); + } + let action = values[1] + .as_str() + .map(str::trim) + .filter(|value| !value.is_empty()) + .ok_or_else(|| { + PipeError::Protocol(format!("callback host command action is invalid: {command}")) + })? + .to_string(); + Ok(CallbackCommand { + action, + args: values[2..].to_vec(), + }) +} + +fn normalize_callback_result( + request: &BrowserCallbackRequest, + result: CallbackResult, + elapsed: Duration, +) -> Option { + match request.action.as_str() { + "navigate" if result.callback == NAVIGATE_CALLBACK_NAME => { + Some(BrowserCallbackResponse::Success(BrowserCallbackSuccess { + success: true, + data: json!({ + "loaded": true, + "target_url": result.target_url, + }), + aom_snapshot: vec![], + timing: elapsed_timing(elapsed), + })) + } + // Path A: The browser's native callBackJsToCpp routes the callback to + // the helper page and calls sgclawOnGetText / sgclawOnEval directly. + // The helper page POSTs to the events endpoint with the callback name + // and payload (e.g. { text: "..." } or { value: "..." }). + "getText" if result.callback == GET_TEXT_CALLBACK_NAME => { + let text = result.payload.get("text").and_then(Value::as_str)?; + Some(BrowserCallbackResponse::Success(BrowserCallbackSuccess { + success: true, + data: json!({ "text": text }), + aom_snapshot: vec![], + timing: elapsed_timing(elapsed), + })) + } + "eval" if result.callback == EVAL_CALLBACK_NAME => { + let value = result.payload.get("value").and_then(Value::as_str)?; + Some(BrowserCallbackResponse::Success(BrowserCallbackSuccess { + success: true, + data: json!({ "text": value }), + aom_snapshot: vec![], + timing: elapsed_timing(elapsed), + })) + } + // Path B: The browser's native callBackJsToCpp calls the helper page's + // callBackJsToCpp function with the @_@ delimited string. The helper + // page parses it and POSTs to the events endpoint with callback: + // "callBackJsToCpp" and payload: { raw: "..." }. + "getText" | "eval" if result.callback == "callBackJsToCpp" => { + let raw = result.payload.get("raw").and_then(Value::as_str)?; + let parsed = match parse_callback_js_payload(raw) { + Ok(parsed) => parsed, + Err(message) => { + return Some(BrowserCallbackResponse::Error(BrowserCallbackError { + message, + details: result.payload, + })) + } + }; + let expected_callback = expected_callback_name(&request.action).ok()?; + if parsed.callback != expected_callback { + return None; + } + Some(BrowserCallbackResponse::Success(BrowserCallbackSuccess { + success: true, + data: json!({ "text": parsed.response_text }), + aom_snapshot: vec![], + timing: elapsed_timing(elapsed), + })) + } + _ => None, + } +} + +fn parse_callback_js_payload(raw: &str) -> Result { + let mut parts = raw.splitn(5, "@_@"); + let _source_url = parts + .next() + .ok_or_else(|| "missing callback source_url segment".to_string())?; + let _target_url = parts + .next() + .ok_or_else(|| "missing callback target_url segment".to_string())?; + let callback = parts + .next() + .ok_or_else(|| "missing callback name segment".to_string())?; + let _action_url = parts + .next() + .ok_or_else(|| "missing callback action_url segment".to_string())?; + let response_text = parts + .next() + .ok_or_else(|| "missing callback response_text segment".to_string())?; + Ok(ParsedCallbackJsPayload { + callback: callback.to_string(), + response_text: response_text.to_string(), + }) +} + +fn expected_callback_name(action: &str) -> Result<&'static str, PipeError> { + match action { + "navigate" => Ok(NAVIGATE_CALLBACK_NAME), + "getText" => Ok(GET_TEXT_CALLBACK_NAME), + "eval" => Ok(EVAL_CALLBACK_NAME), + other => Err(PipeError::Protocol(format!( + "unsupported callback host action result normalization: {other}" + ))), + } +} + +fn elapsed_timing(elapsed: Duration) -> Timing { + Timing { + queue_ms: 0, + exec_ms: elapsed.as_millis() as u64, + } +} + +fn build_helper_page_html(loopback_origin: &str, helper_url: &str, browser_ws_url: &str) -> String { + format!( + r#" + + + + sgClaw Browser Helper + + + + + +"# + ) +} + +#[cfg(test)] +mod tests { + use super::{ + BrowserCallbackHost, CallbackCommand, CallbackCommandEnvelope, CallbackEvent, + CallbackResult, LiveBrowserCallbackHost, + }; + use serde_json::json; + use std::net::TcpListener; + use std::sync::{Arc, Mutex}; + use std::thread; + use std::time::Duration; + use tungstenite::{accept, Message}; + + fn start_fake_browser_status_server() -> (String, Arc>>, thread::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap(); + let frames = Arc::new(Mutex::new(Vec::new())); + let frames_for_thread = Arc::clone(&frames); + + // Use a blocking accept so the thread waits for a connection reliably. + // On Windows, non-blocking listeners can cause the accepted stream to + // inherit non-blocking mode, making tungstenite reads return WouldBlock + // immediately. + let handle = thread::spawn(move || { + let (stream, _) = listener.accept().expect("fake browser ws server accept"); + stream.set_nonblocking(false).unwrap(); + stream + .set_read_timeout(Some(Duration::from_secs(3))) + .unwrap(); + stream + .set_write_timeout(Some(Duration::from_secs(3))) + .unwrap(); + let mut socket = accept(stream).unwrap(); + // Send welcome banner proactively (like the real browser does). + let _ = socket.send(Message::Text( + r#"{"type":"welcome","client_id":1,"server_time":"2026-04-04T00:00:00"}"# + .to_string() + .into(), + )); + loop { + match socket.read() { + Ok(Message::Text(text)) => { + frames_for_thread.lock().unwrap().push(text.to_string()); + } + Ok(Message::Ping(payload)) => { + let _ = socket.send(Message::Pong(payload)); + } + Ok(Message::Close(_)) => break, + Ok(_) => {} + Err(tungstenite::Error::ConnectionClosed) + | Err(tungstenite::Error::AlreadyClosed) => break, + Err(tungstenite::Error::Io(err)) + if matches!( + err.kind(), + std::io::ErrorKind::WouldBlock + | std::io::ErrorKind::TimedOut + ) => + { + break; + } + Err(err) => { + eprintln!("fake browser ws server read: {err}"); + break; + } + } + } + }); + + (format!("ws://{address}"), frames, handle) + } + + #[test] + fn live_callback_host_sends_bootstrap_open_page_command() { + let (ws_url, frames, handle) = start_fake_browser_status_server(); + + // The helper page will not actually load in a test environment, so + // start_with_browser_ws_url returns Err(Timeout) from wait_for_helper_ready. + // We still verify that the bootstrap command was sent correctly. + let result = LiveBrowserCallbackHost::start_with_browser_ws_url( + &ws_url, + "https://www.zhihu.com", + Duration::from_millis(100), + Duration::from_millis(50), + ); + assert!(result.is_err(), "expected timeout because no real helper page loads"); + drop(result); + handle.join().unwrap(); + + let sent = frames.lock().unwrap().clone(); + assert!( + sent.iter().any(|frame| frame.contains("sgBrowerserOpenPage")), + "bootstrap should send sgBrowerserOpenPage to the browser WS; sent frames: {sent:?}" + ); + assert!( + sent.iter().any(|frame| frame.contains("/sgclaw/browser-helper.html")), + "bootstrap should include the helper page URL; sent frames: {sent:?}" + ); + assert!( + sent.iter().any(|frame| frame.contains("https://www.zhihu.com")), + "bootstrap requestUrl should be the provided page URL; sent frames: {sent:?}" + ); + } + + #[test] + fn callback_host_exposes_loopback_helper_url_and_release_helper_html() { + let host = BrowserCallbackHost::new(); + + assert_eq!( + host.helper_url(), + "http://127.0.0.1:17888/sgclaw/browser-helper.html" + ); + + let html = host.helper_page_html(); + assert!(html.contains("ws://127.0.0.1:12345")); + assert!(html.contains(r#"JSON.stringify({ type: 'register', role: 'web' })"#)); + assert!(html.contains("sgclawReady")); + assert!(html.contains("sgclawOnLoaded")); + assert!(html.contains("sgclawOnGetText")); + assert!(html.contains("sgclawOnEval")); + assert!(html.contains("/sgclaw/callback/ready")); + assert!(html.contains("/sgclaw/callback/events")); + assert!(html.contains("/sgclaw/callback/commands/next")); + assert!(html.contains("/sgclaw/callback/commands/ack")); + } + + #[test] + fn callback_host_tracks_ready_state_only_once() { + let host = BrowserCallbackHost::new(); + + assert!(!host.is_ready()); + assert!(host.take_ready_event().is_none()); + + host.mark_ready(Some("http://127.0.0.1/helper.html".to_string())); + assert!(host.is_ready()); + assert_eq!( + host.take_ready_event(), + Some(CallbackEvent::Ready { + helper_url: Some("http://127.0.0.1/helper.html".to_string()), + }) + ); + + host.mark_ready(Some("http://127.0.0.1/ignored.html".to_string())); + assert!(host.take_ready_event().is_none()); + } + + #[test] + fn callback_host_queues_structured_callback_results_for_later_consumption() { + let host = BrowserCallbackHost::new(); + + host.push_result(CallbackResult { + callback: "sgclawOnGetText".to_string(), + request_url: "http://127.0.0.1/helper.html".to_string(), + target_url: Some("https://example.com/page".to_string()), + action: Some("sgBrowserExcuteJsFun".to_string()), + payload: json!({ + "text": "hello", + "meta": { "source": "page" } + }), + }); + host.push_result(CallbackResult { + callback: "sgclawOnEval".to_string(), + request_url: "http://127.0.0.1/helper.html".to_string(), + target_url: None, + action: Some("callBackJsToCpp".to_string()), + payload: json!({ "value": 42 }), + }); + + assert_eq!( + host.take_result(), + Some(CallbackResult { + callback: "sgclawOnGetText".to_string(), + request_url: "http://127.0.0.1/helper.html".to_string(), + target_url: Some("https://example.com/page".to_string()), + action: Some("sgBrowserExcuteJsFun".to_string()), + payload: json!({ + "text": "hello", + "meta": { "source": "page" } + }), + }) + ); + assert_eq!( + host.take_result(), + Some(CallbackResult { + callback: "sgclawOnEval".to_string(), + request_url: "http://127.0.0.1/helper.html".to_string(), + target_url: None, + action: Some("callBackJsToCpp".to_string()), + payload: json!({ "value": 42 }), + }) + ); + assert!(host.take_result().is_none()); + } + + #[test] + fn callback_host_repeats_inflight_command_until_acknowledged() { + let host = BrowserCallbackHost::new(); + + host.enqueue_command(CallbackCommand { + action: "sgBrowserSetTheme".to_string(), + args: vec![json!("1")], + }); + host.enqueue_command(CallbackCommand { + action: "sgBrowerserGetUrls".to_string(), + args: vec![json!("showUrls")], + }); + + assert_eq!( + host.current_command_envelope(), + CallbackCommandEnvelope { + ok: true, + command: Some(CallbackCommand { + action: "sgBrowserSetTheme".to_string(), + args: vec![json!("1")], + }), + } + ); + assert_eq!( + host.current_command_envelope(), + CallbackCommandEnvelope { + ok: true, + command: Some(CallbackCommand { + action: "sgBrowserSetTheme".to_string(), + args: vec![json!("1")], + }), + } + ); + + assert_eq!( + host.acknowledge_in_flight_command(), + Some(CallbackCommand { + action: "sgBrowserSetTheme".to_string(), + args: vec![json!("1")], + }) + ); + assert_eq!( + host.current_command_envelope(), + CallbackCommandEnvelope { + ok: true, + command: Some(CallbackCommand { + action: "sgBrowerserGetUrls".to_string(), + args: vec![json!("showUrls")], + }), + } + ); + assert_eq!( + host.acknowledge_in_flight_command(), + Some(CallbackCommand { + action: "sgBrowerserGetUrls".to_string(), + args: vec![json!("showUrls")], + }) + ); + assert_eq!( + host.current_command_envelope(), + CallbackCommandEnvelope { + ok: false, + command: None, + } + ); + assert!(host.acknowledge_in_flight_command().is_none()); + } +} diff --git a/src/browser/mod.rs b/src/browser/mod.rs new file mode 100644 index 0000000..77bbeee --- /dev/null +++ b/src/browser/mod.rs @@ -0,0 +1,19 @@ +pub mod bridge_backend; +pub mod bridge_contract; +pub mod bridge_transport; +pub mod callback_backend; +mod backend; +pub(crate) mod callback_host; +mod pipe_backend; +pub mod ws_backend; +pub mod ws_probe; +pub mod ws_protocol; + +pub use backend::BrowserBackend; +pub use bridge_backend::BridgeBrowserBackend; +pub use callback_backend::{ + BrowserCallbackBackend, BrowserCallbackError, BrowserCallbackHost, + BrowserCallbackRequest, BrowserCallbackResponse, BrowserCallbackSuccess, +}; +pub use pipe_backend::PipeBrowserBackend; +pub use ws_backend::WsBrowserBackend; diff --git a/src/browser/pipe_backend.rs b/src/browser/pipe_backend.rs new file mode 100644 index 0000000..cfa04b7 --- /dev/null +++ b/src/browser/pipe_backend.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; + +use serde_json::Value; + +use crate::browser::BrowserBackend; +use crate::pipe::{Action, BrowserPipeTool, CommandOutput, ExecutionSurfaceMetadata, PipeError, Transport}; +use crate::security::MacPolicy; + +pub struct PipeBrowserBackend { + inner: BrowserPipeTool, +} + +impl PipeBrowserBackend { + pub fn new(transport: Arc, mac_policy: MacPolicy, session_key: Vec) -> Self { + Self { + inner: BrowserPipeTool::new(transport, mac_policy, session_key), + } + } + + pub fn from_inner(inner: BrowserPipeTool) -> Self { + Self { inner } + } + + pub fn with_response_timeout(mut self, response_timeout: std::time::Duration) -> Self { + self.inner = self.inner.with_response_timeout(response_timeout); + self + } +} + +impl Clone for PipeBrowserBackend { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl BrowserBackend for PipeBrowserBackend { + fn invoke( + &self, + action: Action, + params: Value, + expected_domain: &str, + ) -> Result { + self.inner.invoke(action, params, expected_domain) + } + + fn surface_metadata(&self) -> ExecutionSurfaceMetadata { + self.inner.surface_metadata() + } + + fn supports_eval(&self) -> bool { + self.inner.supports_eval() + } +} diff --git a/src/browser/ws_backend.rs b/src/browser/ws_backend.rs new file mode 100644 index 0000000..bd922ac --- /dev/null +++ b/src/browser/ws_backend.rs @@ -0,0 +1,158 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use serde_json::{json, Value}; + +use crate::browser::{ws_protocol, BrowserBackend}; +use crate::pipe::{Action, CommandOutput, ExecutionSurfaceMetadata, PipeError, Timing}; +use crate::security::MacPolicy; + +pub trait WsClient: Send + Sync { + fn send_text(&self, payload: &str) -> Result<(), PipeError>; + fn recv_text_timeout(&self, timeout: Duration) -> Result; +} + +pub struct WsBrowserBackend { + client: Arc, + mac_policy: MacPolicy, + request_url: Mutex, + next_seq: AtomicU64, + response_timeout: Duration, + in_flight: Mutex<()>, +} + +impl WsBrowserBackend { + pub fn new(client: Arc, mac_policy: MacPolicy, request_url: impl Into) -> Self { + Self { + client, + mac_policy, + request_url: Mutex::new(request_url.into()), + next_seq: AtomicU64::new(1), + response_timeout: Duration::from_secs(30), + in_flight: Mutex::new(()), + } + } + + pub fn with_response_timeout(mut self, response_timeout: Duration) -> Self { + self.response_timeout = response_timeout; + self + } +} + +impl BrowserBackend for WsBrowserBackend { + fn invoke( + &self, + action: Action, + params: Value, + expected_domain: &str, + ) -> Result { + let _guard = self + .in_flight + .lock() + .map_err(|_| PipeError::Protocol("browser ws request lock poisoned".to_string()))?; + + self.mac_policy.validate(&action, expected_domain)?; + + let seq = self.next_seq.fetch_add(1, Ordering::Relaxed); + let request_id = seq.to_string(); + let request_url = self + .request_url + .lock() + .map_err(|_| PipeError::Protocol("browser ws request url lock poisoned".to_string()))? + .clone(); + let encoded = ws_protocol::encode_v1_action( + &action, + ¶ms, + &request_url, + Some(request_id.as_str()), + )?; + + self.client.send_text(&encoded.payload)?; + + let status = Some(recv_status_frame(&*self.client, self.response_timeout)?); + if let Some(status) = status { + let status_code = parse_status_code(&status)?; + if status_code != 0 { + return Err(PipeError::Protocol(format!( + "browser returned non-zero status: {status_code}" + ))); + } + } + + if action == Action::Navigate { + if let Some(url) = params.get("url").and_then(Value::as_str) { + let mut request_url = self.request_url.lock().map_err(|_| { + PipeError::Protocol("browser ws request url lock poisoned".to_string()) + })?; + *request_url = url.to_string(); + } + } + + if let Some(callback) = encoded.callback { + loop { + let frame = self.client.recv_text_timeout(self.response_timeout)?; + let decoded = ws_protocol::decode_callback_frame(&frame)?; + if decoded.callback_name == callback.callback_name { + return Ok(CommandOutput { + seq, + success: true, + data: json!({ "text": decoded.response_text }), + aom_snapshot: vec![], + timing: Timing { + queue_ms: 0, + exec_ms: 0, + }, + }); + } + } + } + + Ok(CommandOutput { + seq, + success: true, + data: json!({}), + aom_snapshot: vec![], + timing: Timing { + queue_ms: 0, + exec_ms: 0, + }, + }) + } + + fn surface_metadata(&self) -> ExecutionSurfaceMetadata { + self.mac_policy.privileged_surface_metadata() + } + + fn supports_eval(&self) -> bool { + self.mac_policy.supports_pipe_action(&Action::Eval) + } +} + +fn parse_status_code(raw: &str) -> Result { + raw.trim() + .parse::() + .map_err(|_| PipeError::Protocol(format!("invalid browser status frame: {raw}"))) +} + +fn recv_status_frame(client: &dyn WsClient, timeout: Duration) -> Result { + loop { + let frame = client.recv_text_timeout(timeout)?; + if is_ignorable_status_prelude(&frame) { + continue; + } + return Ok(frame); + } +} + +fn is_ignorable_status_prelude(frame: &str) -> bool { + let trimmed = frame.trim(); + if trimmed.starts_with("Welcome!") || trimmed.starts_with("Welcome ") { + return true; + } + + serde_json::from_str::(trimmed) + .ok() + .and_then(|value| value.get("type").and_then(Value::as_str).map(str::to_string)) + .is_some_and(|kind| kind == "welcome") +} diff --git a/src/browser/ws_probe.rs b/src/browser/ws_probe.rs new file mode 100644 index 0000000..4b8d4ef --- /dev/null +++ b/src/browser/ws_probe.rs @@ -0,0 +1,307 @@ +use std::net::TcpStream; +use std::time::Duration; + +use thiserror::Error; +use tungstenite::stream::MaybeTlsStream; +use tungstenite::{connect, Message, WebSocket}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProbeStep { + pub label: String, + pub payload: String, + pub expect_reply: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProbeOutcome { + Received(Vec), + NoReplyExpected, + TimedOut, + Closed, + ConnectFailed(String), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProbeStepResult { + pub label: String, + pub sent: String, + pub outcome: ProbeOutcome, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProbeCliConfig { + pub ws_url: String, + pub timeout_ms: u64, + pub steps: Vec, +} + +const DEFAULT_TIMEOUT_MS: u64 = 1500; +const DEFAULT_REGISTER_STEP_LABEL: &str = "register"; +const DEFAULT_REGISTER_STEP_PAYLOAD: &str = r#"{"type":"register","role":"web"}"#; + +#[derive(Debug, Error)] +pub enum ProbeError { + #[error("io error: {0}")] + Io(#[from] std::io::Error), + #[error("probe timeout while waiting for websocket frame")] + Timeout, + #[error("probe websocket closed")] + Closed, + #[error("probe protocol error: {0}")] + Protocol(String), + #[error("probe argument error: {0}")] + Args(String), +} + +pub fn parse_probe_args(args: &[String]) -> Result { + let mut ws_url = None; + let mut timeout_ms = None; + let mut steps = Vec::new(); + let mut index = 0; + + while index < args.len() { + match args[index].as_str() { + "--ws-url" => { + index += 1; + let value = args + .get(index) + .ok_or_else(|| ProbeError::Args("missing value for --ws-url".to_string()))?; + ws_url = Some(value.clone()); + } + "--timeout-ms" => { + index += 1; + let value = args.get(index).ok_or_else(|| { + ProbeError::Args("missing value for --timeout-ms".to_string()) + })?; + let parsed = value.parse::().map_err(|_| { + ProbeError::Args(format!("invalid --timeout-ms value: {value}")) + })?; + timeout_ms = Some(parsed); + } + "--step" => { + index += 1; + let value = args + .get(index) + .ok_or_else(|| ProbeError::Args("missing value for --step".to_string()))?; + let (label, payload) = value.split_once("::").ok_or_else(|| { + ProbeError::Args(format!( + "invalid --step value (expected