From 57b9be733de6fdaf78e241eb0e275e7c5926529c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E7=82=8E?= <635735027@qq.com> Date: Thu, 9 Apr 2026 10:34:34 +0800 Subject: [PATCH] fix: harden service websocket reconnect flows Stabilize the service console and callback-host websocket paths so idle disconnects and mid-task client drops no longer wedge task execution or spam repeated commands. Co-Authored-By: Claude Sonnet 4.6 --- .../sg_claw_service_console.html | 118 +++++++++++++-- src/bin/sg_claw_client.rs | 1 + src/browser/callback_host.rs | 102 +++++++++---- src/service/protocol.rs | 1 + src/service/server.rs | 21 ++- tests/service_console_html_test.rs | 6 + tests/service_task_flow_test.rs | 17 ++- tests/service_ws_session_test.rs | 142 +++++++++++++++++- 8 files changed, 353 insertions(+), 55 deletions(-) diff --git a/frontend/service-console/sg_claw_service_console.html b/frontend/service-console/sg_claw_service_console.html index 1dc2491..7f55f80 100644 --- a/frontend/service-console/sg_claw_service_console.html +++ b/frontend/service-console/sg_claw_service_console.html @@ -386,6 +386,17 @@ }; let socket = null; + let reconnectTimer = null; + let connectTimeoutTimer = null; + let heartbeatTimer = null; + let shouldReconnect = false; + let lastHeartbeatAt = 0; + const reconnectDelayMs = 1500; + const reconnectCloseCode = 4000; + const reconnectCloseReason = "manual_disconnect"; + const heartbeatIntervalMs = 15000; + const heartbeatTimeoutMs = 30000; + const connectTimeoutMs = 5000; function appendRow(kind, text) { if (elements.emptyState) { @@ -410,6 +421,59 @@ elements.messageStream.scrollTop = elements.messageStream.scrollHeight; } + function clearReconnectTimer() { + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + } + + function clearConnectTimeoutTimer() { + if (connectTimeoutTimer) { + clearTimeout(connectTimeoutTimer); + connectTimeoutTimer = null; + } + } + + function stopHeartbeat() { + if (heartbeatTimer) { + clearInterval(heartbeatTimer); + heartbeatTimer = null; + } + } + + function startHeartbeat() { + stopHeartbeat(); + lastHeartbeatAt = Date.now(); + heartbeatTimer = setInterval(() => { + if (!socket || socket.readyState !== WebSocket.OPEN) { + return; + } + if (Date.now() - lastHeartbeatAt > heartbeatTimeoutMs) { + appendRow("error", "heartbeat missed, forcing reconnect"); + const activeSocket = socket; + socket = null; + stopHeartbeat(); + clearConnectTimeoutTimer(); + activeSocket.close(); + scheduleReconnect(); + return; + } + socket.send(JSON.stringify({ type: "ping" })); + }, heartbeatIntervalMs); + } + + function scheduleReconnect() { + clearReconnectTimer(); + clearConnectTimeoutTimer(); + if (!shouldReconnect) { + return; + } + appendRow("status", "service websocket disconnected, retrying"); + reconnectTimer = setTimeout(() => connectOrDisconnectService(true), reconnectDelayMs); + updateUiState(); + } + function setValidation(message) { elements.validationText.textContent = message; } @@ -417,7 +481,7 @@ function updateUiState() { const readyState = socket ? socket.readyState : WebSocket.CLOSED; const connected = readyState === WebSocket.OPEN; - const connecting = readyState === WebSocket.CONNECTING; + const connecting = readyState === WebSocket.CONNECTING || Boolean(reconnectTimer); let stateText = "未连接"; let stateValue = "disconnected"; @@ -435,35 +499,68 @@ elements.connectionState.dataset.state = stateValue; } - function connectOrDisconnectService() { - if (socket && (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING)) { - socket.close(); + function connectOrDisconnectService(forceConnect = false) { + if (!forceConnect && socket && (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING)) { + shouldReconnect = false; + clearReconnectTimer(); + clearConnectTimeoutTimer(); + stopHeartbeat(); + socket.close(reconnectCloseCode, reconnectCloseReason); return; } + clearReconnectTimer(); + clearConnectTimeoutTimer(); const url = elements.wsUrl.value.trim() || defaultWsUrl; elements.wsUrl.value = url; + shouldReconnect = true; const nextSocket = new WebSocket(url); socket = nextSocket; updateUiState(); + connectTimeoutTimer = setTimeout(() => { + if (socket !== nextSocket || nextSocket.readyState !== WebSocket.CONNECTING) { + return; + } + appendRow("error", "service websocket connect timed out"); + socket = null; + nextSocket.close(); + scheduleReconnect(); + }, connectTimeoutMs); + nextSocket.addEventListener("open", () => { if (socket !== nextSocket) { return; } + clearReconnectTimer(); + clearConnectTimeoutTimer(); + lastHeartbeatAt = Date.now(); + startHeartbeat(); appendRow("status", "service websocket connected"); updateUiState(); }); - nextSocket.addEventListener("close", () => { - if (socket === nextSocket) { - socket = null; + nextSocket.addEventListener("close", (event) => { + if (socket !== nextSocket) { + return; } - appendRow("status", "service websocket disconnected"); - updateUiState(); + socket = null; + clearConnectTimeoutTimer(); + stopHeartbeat(); + const manualClose = event.code === reconnectCloseCode || event.reason === reconnectCloseReason; + if (manualClose) { + shouldReconnect = false; + appendRow("status", "service websocket disconnected"); + updateUiState(); + return; + } + scheduleReconnect(); }); nextSocket.addEventListener("error", () => { + if (socket !== nextSocket) { + return; + } appendRow("error", "service websocket error"); }); @@ -471,6 +568,7 @@ } function handleMessage(event) { + lastHeartbeatAt = Date.now(); let message; try { message = JSON.parse(event.data); @@ -492,6 +590,8 @@ case "busy": appendRow("error", message.message); break; + case "pong": + break; default: appendRow("error", "unknown service message: " + event.data); } diff --git a/src/bin/sg_claw_client.rs b/src/bin/sg_claw_client.rs index f6259b2..e1eda68 100644 --- a/src/bin/sg_claw_client.rs +++ b/src/bin/sg_claw_client.rs @@ -83,6 +83,7 @@ fn run() -> Result<(), String> { eprintln!("busy: {message}"); break; } + ServiceMessage::Pong => {} } } Message::Close(_) => { diff --git a/src/browser/callback_host.rs b/src/browser/callback_host.rs index a51c060..5d60af0 100644 --- a/src/browser/callback_host.rs +++ b/src/browser/callback_host.rs @@ -896,36 +896,66 @@ window.sgclawOnEval = sgclawOnEval; window.callBackJsToCpp = callBackJsToCpp; document.getElementById('wi').textContent = SGCLAW_BROWSER_WS_URL; -_log('Connecting to browser WebSocket\u2026'); -const sgclawSocket = new WebSocket(SGCLAW_BROWSER_WS_URL); -sgclawSocket.addEventListener('open', async () => {{ - document.getElementById('sd').classList.add('on'); - document.getElementById('stx').textContent = 'Connected'; - _log('\u2713 WebSocket connected'); - _task('Connected to browser'); - sgclawSocket.send(JSON.stringify({{ type: 'register', role: 'web' }})); - await sgclawReady(); - _log('\u2713 Ready signal sent'); - _task('Ready \u2014 waiting for commands'); -}}); +let sgclawSocket = null; +let sgclawReconnectTimer = null; +let sgclawDeferredCommandLogged = false; -sgclawSocket.addEventListener('close', () => {{ - document.getElementById('sd').classList.remove('on'); - document.getElementById('stx').textContent = 'Disconnected'; - _log('\u2717 WebSocket disconnected'); - _task('Disconnected'); -}}); - -sgclawSocket.addEventListener('message', (event) => {{ - console.debug('sgclaw helper received browser frame', event.data); - try {{ - var data = String(event.data || ''); - if (data.indexOf('@_@') !== -1) {{ - sgclawEmitCallback('callBackJsToCpp', {{ raw: data }}); +function connectSocket() {{ + if (sgclawSocket && (sgclawSocket.readyState === WebSocket.OPEN || sgclawSocket.readyState === WebSocket.CONNECTING)) {{ + return; + }} + _log('Connecting to browser WebSocket\u2026'); + document.getElementById('stx').textContent = 'Connecting…'; + _task('Connecting to browser'); + const socket = new WebSocket(SGCLAW_BROWSER_WS_URL); + sgclawSocket = socket; + socket.addEventListener('open', async () => {{ + if (sgclawSocket !== socket) {{ + return; }} - }} catch (_e) {{}} -}}); + if (sgclawReconnectTimer) {{ + clearTimeout(sgclawReconnectTimer); + sgclawReconnectTimer = null; + }} + sgclawDeferredCommandLogged = false; + document.getElementById('sd').classList.add('on'); + document.getElementById('stx').textContent = 'Connected'; + _log('\u2713 WebSocket connected'); + _task('Connected to browser'); + socket.send(JSON.stringify({{ type: 'register', role: 'web' }})); + await sgclawReady(); + _log('\u2713 Ready signal sent'); + _task('Ready \u2014 waiting for commands'); + }}); + + socket.addEventListener('close', () => {{ + if (sgclawSocket !== socket) {{ + return; + }} + sgclawSocket = null; + document.getElementById('sd').classList.remove('on'); + document.getElementById('stx').textContent = 'Disconnected'; + _log('\u2717 WebSocket disconnected'); + _task('Disconnected — reconnecting'); + if (!sgclawReconnectTimer) {{ + sgclawReconnectTimer = setTimeout(connectSocket, 1000); + }} + }}); + + socket.addEventListener('message', (event) => {{ + if (sgclawSocket !== socket) {{ + return; + }} + console.debug('sgclaw helper received browser frame', event.data); + try {{ + var data = String(event.data || ''); + if (data.indexOf('@_@') !== -1) {{ + sgclawEmitCallback('callBackJsToCpp', {{ raw: data }}); + }} + }} catch (_e) {{}} + }}); +}} async function sgclawPollCommands() {{ try {{ @@ -936,22 +966,29 @@ async function sgclawPollCommands() {{ const envelope = await response.json(); const command = envelope && envelope.command; if (!command || !command.action) {{ + sgclawDeferredCommandLogged = false; return; }} + if (!sgclawSocket || sgclawSocket.readyState !== WebSocket.OPEN) {{ + if (!sgclawDeferredCommandLogged) {{ + _log('! Browser connection lost — command deferred'); + sgclawDeferredCommandLogged = true; + }} + return; + }} + sgclawDeferredCommandLogged = false; _nc++; const args = Array.isArray(command.args) ? command.args : []; _lastCmd=Date.now();_setIdle(false); _log('\u2192 execute '+command.action+''+(args.length>1?' '+String(args[1]||'').substring(0,50)+'':'')); _task('Executing: '+command.action); - if (sgclawSocket.readyState !== WebSocket.OPEN) {{ - return; - }} sgclawSocket.send(JSON.stringify([window.location.href || SGCLAW_HELPER_URL, command.action, ...args])); await sgclawPostJson(SGCLAW_COMMAND_ACK_ENDPOINT, {{ type: 'command_ack' }}); }} catch (_error) {{ }} }} +connectSocket(); setInterval(sgclawPollCommands, 250); _log('sgClaw Runtime Console initialized'); @@ -1110,6 +1147,11 @@ mod tests { assert!(html.contains("ws://127.0.0.1:12345")); assert!(html.contains(r#"JSON.stringify({ type: 'register', role: 'web' })"#)); assert!(html.contains("sgclawReady")); + assert!(html.contains("connectSocket()")); + assert!(html.contains("setTimeout(connectSocket, 1000)")); + assert!(html.contains("if (!sgclawSocket || sgclawSocket.readyState !== WebSocket.OPEN)")); + assert!(html.contains("Browser connection lost — command deferred")); + assert!(html.contains("sgclawSocket = null;")); assert!(html.contains("sgclawOnLoaded")); assert!(html.contains("sgclawOnClickProbe")); assert!(html.contains("sgclawOnClick")); diff --git a/src/service/protocol.rs b/src/service/protocol.rs index 315e77b..f7d9e8b 100644 --- a/src/service/protocol.rs +++ b/src/service/protocol.rs @@ -51,6 +51,7 @@ pub enum ServiceMessage { LogEntry { level: String, message: String }, TaskComplete { success: bool, summary: String }, Busy { message: String }, + Pong, } fn normalize_optional_field(value: String) -> Option { diff --git a/src/service/server.rs b/src/service/server.rs index 0e95217..1a66536 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -126,7 +126,7 @@ impl ServiceEventSink { .lock() .map_err(|_| PipeError::Protocol("service websocket writer lock poisoned".to_string()))? .send(Message::Text(payload.into())) - .map_err(|err| PipeError::Protocol(format!("service websocket send failed: {err}")))?; + .map_err(|err| map_service_websocket_error(err, "send"))?; } Ok(()) } @@ -249,6 +249,7 @@ pub fn serve_client( ClientMessage::Connect => send_status_changed(sink.as_ref(), "connected")?, ClientMessage::Start => send_status_changed(sink.as_ref(), "started")?, ClientMessage::Stop => send_status_changed(sink.as_ref(), "stopped")?, + ClientMessage::Ping => sink.send_service_message(ServiceMessage::Pong)?, ClientMessage::SubmitTask { instruction, conversation_id, @@ -335,7 +336,6 @@ pub fn serve_client( } } } - ClientMessage::Ping => {} } } } @@ -471,6 +471,23 @@ impl Transport for NoopTransport { } } +#[cfg(test)] +mod pipe_closed_mapping_tests { + use super::*; + + #[test] + fn map_service_websocket_error_treats_connection_aborted_send_as_pipe_closed() { + let err = tungstenite::Error::Io(std::io::Error::from(std::io::ErrorKind::ConnectionAborted)); + assert!(matches!(map_service_websocket_error(err, "send"), PipeError::PipeClosed)); + } + + #[test] + fn map_service_websocket_error_treats_send_after_closing_as_pipe_closed() { + let err = tungstenite::Error::Protocol(tungstenite::error::ProtocolError::SendAfterClosing); + assert!(matches!(map_service_websocket_error(err, "send"), PipeError::PipeClosed)); + } +} + #[cfg(test)] struct ServiceBridgeTransport { bridge_base_url: String, diff --git a/tests/service_console_html_test.rs b/tests/service_console_html_test.rs index dff93e0..4f2945b 100644 --- a/tests/service_console_html_test.rs +++ b/tests/service_console_html_test.rs @@ -12,6 +12,12 @@ fn service_console_html_stays_on_service_ws_boundary() { assert!(source.contains("ws://127.0.0.1:42321")); assert!(source.contains("submit_task")); + assert!(source.contains("addEventListener(\"close\"")); + assert!(source.contains("setTimeout(() => connectOrDisconnectService(true)")); + assert!(source.contains("connectTimeoutTimer")); + assert!(source.contains("lastHeartbeatAt")); + assert!(source.contains("heartbeat missed, forcing reconnect")); + assert!(source.contains("service websocket connect timed out")); assert!(!source.contains("/sgclaw/browser-helper.html")); assert!(!source.contains("/sgclaw/callback/ready")); assert!(!source.contains("/sgclaw/callback/events")); diff --git a/tests/service_task_flow_test.rs b/tests/service_task_flow_test.rs index 297f016..a093966 100644 --- a/tests/service_task_flow_test.rs +++ b/tests/service_task_flow_test.rs @@ -162,6 +162,7 @@ fn start_callback_host_hotlist_browser_server( .to_string(); let helper_client = Client::builder() .timeout(Duration::from_secs(2)) + .pool_max_idle_per_host(0) .build() .unwrap(); let helper_html = helper_client @@ -213,14 +214,18 @@ fn start_callback_host_hotlist_browser_server( let mut saw_eval = false; while Instant::now() < deadline { - let envelope: Value = helper_client + let envelope: Value = match helper_client .get(format!("{helper_origin}/sgclaw/callback/commands/next")) .send() - .unwrap() - .error_for_status() - .unwrap() - .json() - .unwrap(); + .and_then(|response| response.error_for_status()) + .and_then(|response| response.json()) + { + Ok(envelope) => envelope, + Err(_) => { + thread::sleep(Duration::from_millis(20)); + continue; + } + }; let Some(command) = envelope.get("command").and_then(Value::as_object) else { thread::sleep(Duration::from_millis(20)); continue; diff --git a/tests/service_ws_session_test.rs b/tests/service_ws_session_test.rs index 96a9c9a..a273f2e 100644 --- a/tests/service_ws_session_test.rs +++ b/tests/service_ws_session_test.rs @@ -213,6 +213,7 @@ fn start_callback_host_hotlist_browser_server( .to_string(); let helper_client = Client::builder() .timeout(Duration::from_secs(2)) + .pool_max_idle_per_host(0) .build() .unwrap(); let helper_html = helper_client @@ -264,14 +265,18 @@ fn start_callback_host_hotlist_browser_server( let mut saw_eval = false; while Instant::now() < deadline { - let envelope: Value = helper_client + let envelope: Value = match helper_client .get(format!("{helper_origin}/sgclaw/callback/commands/next")) .send() - .unwrap() - .error_for_status() - .unwrap() - .json() - .unwrap(); + .and_then(|response| response.error_for_status()) + .and_then(|response| response.json()) + { + Ok(envelope) => envelope, + Err(_) => { + thread::sleep(Duration::from_millis(20)); + continue; + } + }; let Some(command) = envelope.get("command").and_then(Value::as_object) else { thread::sleep(Duration::from_millis(20)); continue; @@ -737,7 +742,7 @@ fn service_binary_survives_real_client_disconnect_after_task_complete() { .stderr(std::process::Stdio::piped()) .spawn() .unwrap(); - client.stdin.as_mut().unwrap().write_all(" \n".as_bytes()).unwrap(); + client.stdin.as_mut().unwrap().write_all("你好\n".as_bytes()).unwrap(); let client_output = client.wait_with_output().unwrap(); assert!( @@ -747,7 +752,7 @@ fn service_binary_survives_real_client_disconnect_after_task_complete() { String::from_utf8_lossy(&client_output.stderr) ); assert!( - String::from_utf8_lossy(&client_output.stdout).contains("请输入任务内容。"), + String::from_utf8_lossy(&client_output.stdout).contains("任务执行失败:"), "client did not receive TaskComplete summary: stdout={} stderr={}", String::from_utf8_lossy(&client_output.stdout), String::from_utf8_lossy(&client_output.stderr) @@ -1282,6 +1287,105 @@ fn service_binary_accepts_connect_request_without_starting_browser_task() { ); } +#[test] +fn service_binary_survives_client_disconnect_during_task_completion_send() { + let service_listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let service_addr = service_listener.local_addr().unwrap(); + drop(service_listener); + + let root = std::env::temp_dir().join(format!("sgclaw-service-disconnect-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&root).unwrap(); + let config_path = root.join("sgclaw_config.json"); + std::fs::write( + &config_path, + format!( + r#"{{ + "apiKey": "sk-runtime", + "baseUrl": "https://api.deepseek.com", + "model": "deepseek-chat", + "browserWsUrl": "ws://127.0.0.1:12345", + "serviceWsListenAddr": "{service_addr}" +}}"# + ), + ) + .unwrap(); + + let mut service = std::process::Command::new( + std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"), + ) + .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1") + .arg("--config-path") + .arg(&config_path) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::piped()) + .spawn() + .unwrap(); + + let ws_url = format!("ws://{service_addr}"); + let connect_deadline = Instant::now() + Duration::from_secs(2); + let mut websocket = None; + while Instant::now() < connect_deadline { + match connect(ws_url.as_str()) { + Ok((socket, _)) => { + websocket = Some(socket); + break; + } + Err(_) => { + if service.try_wait().unwrap().is_some() { + break; + } + thread::sleep(Duration::from_millis(50)); + } + } + } + + let mut websocket = websocket.expect("service ws listener never became available"); + websocket + .send(Message::Text( + serde_json::to_string(&ClientMessage::SubmitTask { + instruction: "你好".to_string(), + conversation_id: String::new(), + messages: vec![], + page_url: String::new(), + page_title: String::new(), + }) + .unwrap() + .into(), + )) + .unwrap(); + drop(websocket); + + let exit_deadline = Instant::now() + Duration::from_secs(1); + let mut service_status = None; + while Instant::now() < exit_deadline { + if let Some(status) = service.try_wait().unwrap() { + service_status = Some(status); + break; + } + thread::sleep(Duration::from_millis(20)); + } + if service_status.is_none() { + service.kill().unwrap(); + let _ = service.wait(); + } + + let stderr = service + .stderr + .take() + .map(|mut stream| { + let mut buf = Vec::new(); + use std::io::Read; + let _ = stream.read_to_end(&mut buf); + String::from_utf8_lossy(&buf).into_owned() + }) + .unwrap_or_default(); + + assert!( + service_status.is_none(), + "sg_claw exited after client disconnected mid-task; stderr={stderr}" + ); +} + #[test] fn submit_task_client_message_converts_into_shared_runner_request() { let message = ClientMessage::SubmitTask { @@ -1333,6 +1437,28 @@ fn lifecycle_client_messages_round_trip_with_stable_tags() { } } +#[test] +fn service_messages_round_trip_with_stable_tags() { + let cases = [ + ( + ServiceMessage::StatusChanged { + state: "started".to_string(), + }, + r#"{"type":"status_changed","state":"started"}"#, + ), + ( + ServiceMessage::Pong, + r#"{"type":"pong"}"#, + ), + ]; + + for (message, raw) in cases { + assert_eq!(serde_json::to_string(&message).unwrap(), raw); + let decoded: ServiceMessage = serde_json::from_str(raw).unwrap(); + assert_eq!(decoded, message); + } +} + #[test] fn service_event_sink_maps_log_completion_and_status_messages() { let sink = ServiceEventSink::default();