diff --git a/frontend/service-console/sg_claw_service_console.html b/frontend/service-console/sg_claw_service_console.html
index 1dc2491..7f55f80 100644
--- a/frontend/service-console/sg_claw_service_console.html
+++ b/frontend/service-console/sg_claw_service_console.html
@@ -386,6 +386,17 @@
};
let socket = null;
+ // Handle of the pending setTimeout that retries the connection; null when no retry is queued.
+ let reconnectTimer = null;
+ // Handle of the setTimeout that aborts a connection attempt still CONNECTING after connectTimeoutMs.
+ let connectTimeoutTimer = null;
+ // Handle of the setInterval that drives the ping/heartbeat check.
+ let heartbeatTimer = null;
+ // Set to true when a connection attempt starts and to false on a manual disconnect;
+ // scheduleReconnect() does not queue a retry while it is false.
+ let shouldReconnect = false;
+ // Date.now() timestamp of the last sign of life (socket open or any inbound message).
+ let lastHeartbeatAt = 0;
+ // Delay before a retry after the connection is lost.
+ const reconnectDelayMs = 1500;
+ // Close code and reason sent on a user-initiated disconnect; the close listener
+ // compares against them to tell a manual disconnect from a lost connection.
+ const reconnectCloseCode = 4000;
+ const reconnectCloseReason = "manual_disconnect";
+ // How often a ping is sent while the socket is open.
+ const heartbeatIntervalMs = 15000;
+ // Longest tolerated silence from the service before a reconnect is forced.
+ const heartbeatTimeoutMs = 30000;
+ // Longest time a socket may stay in CONNECTING before the attempt is abandoned.
+ const connectTimeoutMs = 5000;
function appendRow(kind, text) {
if (elements.emptyState) {
@@ -410,6 +421,59 @@
elements.messageStream.scrollTop = elements.messageStream.scrollHeight;
}
+ // Cancels the queued reconnect attempt, if there is one, and forgets its handle.
+ function clearReconnectTimer() {
+   if (!reconnectTimer) {
+     return;
+   }
+   clearTimeout(reconnectTimer);
+   reconnectTimer = null;
+ }
+
+ // Cancels the connect-timeout watchdog, if it is armed, and forgets its handle.
+ function clearConnectTimeoutTimer() {
+   if (!connectTimeoutTimer) {
+     return;
+   }
+   clearTimeout(connectTimeoutTimer);
+   connectTimeoutTimer = null;
+ }
+
+ // Stops the heartbeat interval, if it is running, and forgets its handle.
+ function stopHeartbeat() {
+   if (!heartbeatTimer) {
+     return;
+   }
+   clearInterval(heartbeatTimer);
+   heartbeatTimer = null;
+ }
+
+ // (Re)starts the heartbeat interval. Every heartbeatIntervalMs the tick either
+ // sends a "ping" frame or, when more than heartbeatTimeoutMs have passed since
+ // lastHeartbeatAt, drops the socket and schedules a reconnect.
+ function startHeartbeat() {
+ stopHeartbeat();
+ lastHeartbeatAt = Date.now();
+ heartbeatTimer = setInterval(() => {
+ // Nothing to probe unless the current socket is open.
+ if (!socket || socket.readyState !== WebSocket.OPEN) {
+ return;
+ }
+ if (Date.now() - lastHeartbeatAt > heartbeatTimeoutMs) {
+ appendRow("error", "heartbeat missed, forcing reconnect");
+ // Detach the socket before closing it: the close listener ignores sockets
+ // that are no longer current, so the reconnect is scheduled exactly once, here.
+ const activeSocket = socket;
+ socket = null;
+ stopHeartbeat();
+ clearConnectTimeoutTimer();
+ activeSocket.close();
+ scheduleReconnect();
+ return;
+ }
+ socket.send(JSON.stringify({ type: "ping" }));
+ }, heartbeatIntervalMs);
+ }
+
+ // Queues one reconnect attempt after reconnectDelayMs, replacing any attempt
+ // that is already queued and disarming the connect-timeout watchdog.
+ // When reconnecting is disabled (shouldReconnect is false) no retry is queued,
+ // but the final "disconnected" state is still reported and rendered.
+ function scheduleReconnect() {
+   clearReconnectTimer();
+   clearConnectTimeoutTimer();
+   if (!shouldReconnect) {
+     // Reached when the user disconnected but the close event did not carry the
+     // manual close code: a socket closed while still CONNECTING, or one that
+     // closed uncleanly, reports code 1006 with an empty reason. Without this
+     // repaint the UI would keep showing the previous connection state.
+     appendRow("status", "service websocket disconnected");
+     updateUiState();
+     return;
+   }
+   appendRow("status", "service websocket disconnected, retrying");
+   reconnectTimer = setTimeout(() => connectOrDisconnectService(true), reconnectDelayMs);
+   // reconnectTimer is now set, so the UI renders the "connecting" state.
+   updateUiState();
+ }
+
function setValidation(message) {
elements.validationText.textContent = message;
}
@@ -417,7 +481,7 @@
function updateUiState() {
const readyState = socket ? socket.readyState : WebSocket.CLOSED;
const connected = readyState === WebSocket.OPEN;
- const connecting = readyState === WebSocket.CONNECTING;
+ const connecting = readyState === WebSocket.CONNECTING || Boolean(reconnectTimer);
let stateText = "未连接";
let stateValue = "disconnected";
@@ -435,35 +499,68 @@
elements.connectionState.dataset.state = stateValue;
}
- function connectOrDisconnectService() {
- if (socket && (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING)) {
- socket.close();
+ function connectOrDisconnectService(forceConnect = false) {
+ if (!forceConnect && socket && (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING)) {
+ shouldReconnect = false;
+ clearReconnectTimer();
+ clearConnectTimeoutTimer();
+ stopHeartbeat();
+ socket.close(reconnectCloseCode, reconnectCloseReason);
return;
}
+ clearReconnectTimer();
+ clearConnectTimeoutTimer();
const url = elements.wsUrl.value.trim() || defaultWsUrl;
elements.wsUrl.value = url;
+ shouldReconnect = true;
const nextSocket = new WebSocket(url);
socket = nextSocket;
updateUiState();
+ connectTimeoutTimer = setTimeout(() => {
+ if (socket !== nextSocket || nextSocket.readyState !== WebSocket.CONNECTING) {
+ return;
+ }
+ appendRow("error", "service websocket connect timed out");
+ socket = null;
+ nextSocket.close();
+ scheduleReconnect();
+ }, connectTimeoutMs);
+
nextSocket.addEventListener("open", () => {
if (socket !== nextSocket) {
return;
}
+ clearReconnectTimer();
+ clearConnectTimeoutTimer();
+ lastHeartbeatAt = Date.now();
+ startHeartbeat();
appendRow("status", "service websocket connected");
updateUiState();
});
- nextSocket.addEventListener("close", () => {
- if (socket === nextSocket) {
- socket = null;
+ nextSocket.addEventListener("close", (event) => {
+ if (socket !== nextSocket) {
+ return;
}
- appendRow("status", "service websocket disconnected");
- updateUiState();
+ socket = null;
+ clearConnectTimeoutTimer();
+ stopHeartbeat();
+ const manualClose = event.code === reconnectCloseCode || event.reason === reconnectCloseReason;
+ if (manualClose) {
+ shouldReconnect = false;
+ appendRow("status", "service websocket disconnected");
+ updateUiState();
+ return;
+ }
+ scheduleReconnect();
});
nextSocket.addEventListener("error", () => {
+ if (socket !== nextSocket) {
+ return;
+ }
appendRow("error", "service websocket error");
});
@@ -471,6 +568,7 @@
}
function handleMessage(event) {
+ lastHeartbeatAt = Date.now();
let message;
try {
message = JSON.parse(event.data);
@@ -492,6 +590,8 @@
case "busy":
appendRow("error", message.message);
break;
+ case "pong":
+ break;
default:
appendRow("error", "unknown service message: " + event.data);
}
diff --git a/src/bin/sg_claw_client.rs b/src/bin/sg_claw_client.rs
index f6259b2..e1eda68 100644
--- a/src/bin/sg_claw_client.rs
+++ b/src/bin/sg_claw_client.rs
@@ -83,6 +83,7 @@ fn run() -> Result<(), String> {
eprintln!("busy: {message}");
break;
}
+ ServiceMessage::Pong => {}
}
}
Message::Close(_) => {
diff --git a/src/browser/callback_host.rs b/src/browser/callback_host.rs
index a51c060..5d60af0 100644
--- a/src/browser/callback_host.rs
+++ b/src/browser/callback_host.rs
@@ -896,36 +896,66 @@ window.sgclawOnEval = sgclawOnEval;
window.callBackJsToCpp = callBackJsToCpp;
document.getElementById('wi').textContent = SGCLAW_BROWSER_WS_URL;
-_log('Connecting to browser WebSocket\u2026');
-const sgclawSocket = new WebSocket(SGCLAW_BROWSER_WS_URL);
-sgclawSocket.addEventListener('open', async () => {{
- document.getElementById('sd').classList.add('on');
- document.getElementById('stx').textContent = 'Connected';
- _log('\u2713 WebSocket connected');
- _task('Connected to browser');
- sgclawSocket.send(JSON.stringify({{ type: 'register', role: 'web' }}));
- await sgclawReady();
- _log('\u2713 Ready signal sent');
- _task('Ready \u2014 waiting for commands');
-}});
+let sgclawSocket = null;
+let sgclawReconnectTimer = null;
+let sgclawDeferredCommandLogged = false;
-sgclawSocket.addEventListener('close', () => {{
- document.getElementById('sd').classList.remove('on');
- document.getElementById('stx').textContent = 'Disconnected';
- _log('\u2717 WebSocket disconnected');
- _task('Disconnected');
-}});
-
-sgclawSocket.addEventListener('message', (event) => {{
- console.debug('sgclaw helper received browser frame', event.data);
- try {{
- var data = String(event.data || '');
- if (data.indexOf('@_@') !== -1) {{
- sgclawEmitCallback('callBackJsToCpp', {{ raw: data }});
+// Opens the browser WebSocket unless one is already open or connecting, and
+// installs the open/close/message handlers. A closed socket schedules another
+// call to this function after one second, so the helper keeps retrying until
+// the browser endpoint is reachable again.
+function connectSocket() {{
+  // Forget the reconnect timer before every attempt. When this function runs
+  // from the timer, the id has already fired but is still truthy; leaving it
+  // set would make the close handler below skip scheduling the next retry,
+  // so reconnection would stop after a single failed attempt.
+  if (sgclawReconnectTimer) {{
+    clearTimeout(sgclawReconnectTimer);
+    sgclawReconnectTimer = null;
+  }}
+  if (sgclawSocket && (sgclawSocket.readyState === WebSocket.OPEN || sgclawSocket.readyState === WebSocket.CONNECTING)) {{
+    return;
+  }}
+  _log('Connecting to browser WebSocket\u2026');
+  document.getElementById('stx').textContent = 'Connecting…';
+  _task('Connecting to browser');
+  const socket = new WebSocket(SGCLAW_BROWSER_WS_URL);
+  sgclawSocket = socket;
+  socket.addEventListener('open', async () => {{
+    // Ignore events from a socket that is no longer the current one.
+    if (sgclawSocket !== socket) {{
+      return;
}}
- }} catch (_e) {{}}
-}});
+    if (sgclawReconnectTimer) {{
+      clearTimeout(sgclawReconnectTimer);
+      sgclawReconnectTimer = null;
+    }}
+    sgclawDeferredCommandLogged = false;
+    document.getElementById('sd').classList.add('on');
+    document.getElementById('stx').textContent = 'Connected';
+    _log('\u2713 WebSocket connected');
+    _task('Connected to browser');
+    socket.send(JSON.stringify({{ type: 'register', role: 'web' }}));
+    await sgclawReady();
+    _log('\u2713 Ready signal sent');
+    _task('Ready \u2014 waiting for commands');
+  }});
+
+  socket.addEventListener('close', () => {{
+    if (sgclawSocket !== socket) {{
+      return;
+    }}
+    sgclawSocket = null;
+    document.getElementById('sd').classList.remove('on');
+    document.getElementById('stx').textContent = 'Disconnected';
+    _log('\u2717 WebSocket disconnected');
+    _task('Disconnected — reconnecting');
+    // Queue the next attempt unless one is already pending.
+    if (!sgclawReconnectTimer) {{
+      sgclawReconnectTimer = setTimeout(connectSocket, 1000);
+    }}
+  }});
+
+  socket.addEventListener('message', (event) => {{
+    if (sgclawSocket !== socket) {{
+      return;
+    }}
+    console.debug('sgclaw helper received browser frame', event.data);
+    try {{
+      var data = String(event.data || '');
+      // Frames containing the '@_@' marker are forwarded as callBackJsToCpp callbacks.
+      if (data.indexOf('@_@') !== -1) {{
+        sgclawEmitCallback('callBackJsToCpp', {{ raw: data }});
+      }}
+    }} catch (_e) {{}}
+  }});
+}}
async function sgclawPollCommands() {{
try {{
@@ -936,22 +966,29 @@ async function sgclawPollCommands() {{
const envelope = await response.json();
const command = envelope && envelope.command;
if (!command || !command.action) {{
+ sgclawDeferredCommandLogged = false;
return;
}}
+ if (!sgclawSocket || sgclawSocket.readyState !== WebSocket.OPEN) {{
+ if (!sgclawDeferredCommandLogged) {{
+ _log('! Browser connection lost — command deferred');
+ sgclawDeferredCommandLogged = true;
+ }}
+ return;
+ }}
+ sgclawDeferredCommandLogged = false;
_nc++;
const args = Array.isArray(command.args) ? command.args : [];
_lastCmd=Date.now();_setIdle(false);
_log('\u2192 execute '+command.action+''+(args.length>1?' '+String(args[1]||'').substring(0,50)+'':''));
_task('Executing: '+command.action);
- if (sgclawSocket.readyState !== WebSocket.OPEN) {{
- return;
- }}
sgclawSocket.send(JSON.stringify([window.location.href || SGCLAW_HELPER_URL, command.action, ...args]));
await sgclawPostJson(SGCLAW_COMMAND_ACK_ENDPOINT, {{ type: 'command_ack' }});
}} catch (_error) {{
}}
}}
+connectSocket();
setInterval(sgclawPollCommands, 250);
_log('sgClaw Runtime Console initialized');
@@ -1110,6 +1147,11 @@ mod tests {
assert!(html.contains("ws://127.0.0.1:12345"));
assert!(html.contains(r#"JSON.stringify({ type: 'register', role: 'web' })"#));
assert!(html.contains("sgclawReady"));
+ assert!(html.contains("connectSocket()"));
+ assert!(html.contains("setTimeout(connectSocket, 1000)"));
+ assert!(html.contains("if (!sgclawSocket || sgclawSocket.readyState !== WebSocket.OPEN)"));
+ assert!(html.contains("Browser connection lost — command deferred"));
+ assert!(html.contains("sgclawSocket = null;"));
assert!(html.contains("sgclawOnLoaded"));
assert!(html.contains("sgclawOnClickProbe"));
assert!(html.contains("sgclawOnClick"));
diff --git a/src/service/protocol.rs b/src/service/protocol.rs
index 315e77b..f7d9e8b 100644
--- a/src/service/protocol.rs
+++ b/src/service/protocol.rs
@@ -51,6 +51,7 @@ pub enum ServiceMessage {
LogEntry { level: String, message: String },
TaskComplete { success: bool, summary: String },
Busy { message: String },
+ Pong,
}
fn normalize_optional_field(value: String) -> Option {
diff --git a/src/service/server.rs b/src/service/server.rs
index 0e95217..1a66536 100644
--- a/src/service/server.rs
+++ b/src/service/server.rs
@@ -126,7 +126,7 @@ impl ServiceEventSink {
.lock()
.map_err(|_| PipeError::Protocol("service websocket writer lock poisoned".to_string()))?
.send(Message::Text(payload.into()))
- .map_err(|err| PipeError::Protocol(format!("service websocket send failed: {err}")))?;
+ .map_err(|err| map_service_websocket_error(err, "send"))?;
}
Ok(())
}
@@ -249,6 +249,7 @@ pub fn serve_client(
ClientMessage::Connect => send_status_changed(sink.as_ref(), "connected")?,
ClientMessage::Start => send_status_changed(sink.as_ref(), "started")?,
ClientMessage::Stop => send_status_changed(sink.as_ref(), "stopped")?,
+ ClientMessage::Ping => sink.send_service_message(ServiceMessage::Pong)?,
ClientMessage::SubmitTask {
instruction,
conversation_id,
@@ -335,7 +336,6 @@ pub fn serve_client(
}
}
}
- ClientMessage::Ping => {}
}
}
}
@@ -471,6 +471,23 @@ impl Transport for NoopTransport {
}
}
+#[cfg(test)]
+mod pipe_closed_mapping_tests {
+    use super::*;
+
+    /// A send that fails with an aborted-connection I/O error is reported as `PipeError::PipeClosed`.
+    #[test]
+    fn map_service_websocket_error_treats_connection_aborted_send_as_pipe_closed() {
+        let io_error: std::io::Error = std::io::ErrorKind::ConnectionAborted.into();
+        let mapped = map_service_websocket_error(tungstenite::Error::Io(io_error), "send");
+        assert!(matches!(mapped, PipeError::PipeClosed));
+    }
+
+    /// A send attempted after closing is reported as `PipeError::PipeClosed`.
+    #[test]
+    fn map_service_websocket_error_treats_send_after_closing_as_pipe_closed() {
+        let protocol_error = tungstenite::error::ProtocolError::SendAfterClosing;
+        let mapped = map_service_websocket_error(tungstenite::Error::Protocol(protocol_error), "send");
+        assert!(matches!(mapped, PipeError::PipeClosed));
+    }
+}
+
#[cfg(test)]
struct ServiceBridgeTransport {
bridge_base_url: String,
diff --git a/tests/service_console_html_test.rs b/tests/service_console_html_test.rs
index dff93e0..4f2945b 100644
--- a/tests/service_console_html_test.rs
+++ b/tests/service_console_html_test.rs
@@ -12,6 +12,12 @@ fn service_console_html_stays_on_service_ws_boundary() {
assert!(source.contains("ws://127.0.0.1:42321"));
assert!(source.contains("submit_task"));
+ assert!(source.contains("addEventListener(\"close\""));
+ assert!(source.contains("setTimeout(() => connectOrDisconnectService(true)"));
+ assert!(source.contains("connectTimeoutTimer"));
+ assert!(source.contains("lastHeartbeatAt"));
+ assert!(source.contains("heartbeat missed, forcing reconnect"));
+ assert!(source.contains("service websocket connect timed out"));
assert!(!source.contains("/sgclaw/browser-helper.html"));
assert!(!source.contains("/sgclaw/callback/ready"));
assert!(!source.contains("/sgclaw/callback/events"));
diff --git a/tests/service_task_flow_test.rs b/tests/service_task_flow_test.rs
index 297f016..a093966 100644
--- a/tests/service_task_flow_test.rs
+++ b/tests/service_task_flow_test.rs
@@ -162,6 +162,7 @@ fn start_callback_host_hotlist_browser_server(
.to_string();
let helper_client = Client::builder()
.timeout(Duration::from_secs(2))
+ .pool_max_idle_per_host(0)
.build()
.unwrap();
let helper_html = helper_client
@@ -213,14 +214,18 @@ fn start_callback_host_hotlist_browser_server(
let mut saw_eval = false;
while Instant::now() < deadline {
- let envelope: Value = helper_client
+ let envelope: Value = match helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
- .unwrap()
- .error_for_status()
- .unwrap()
- .json()
- .unwrap();
+ .and_then(|response| response.error_for_status())
+ .and_then(|response| response.json())
+ {
+ Ok(envelope) => envelope,
+ Err(_) => {
+ thread::sleep(Duration::from_millis(20));
+ continue;
+ }
+ };
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
thread::sleep(Duration::from_millis(20));
continue;
diff --git a/tests/service_ws_session_test.rs b/tests/service_ws_session_test.rs
index 96a9c9a..a273f2e 100644
--- a/tests/service_ws_session_test.rs
+++ b/tests/service_ws_session_test.rs
@@ -213,6 +213,7 @@ fn start_callback_host_hotlist_browser_server(
.to_string();
let helper_client = Client::builder()
.timeout(Duration::from_secs(2))
+ .pool_max_idle_per_host(0)
.build()
.unwrap();
let helper_html = helper_client
@@ -264,14 +265,18 @@ fn start_callback_host_hotlist_browser_server(
let mut saw_eval = false;
while Instant::now() < deadline {
- let envelope: Value = helper_client
+ let envelope: Value = match helper_client
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
.send()
- .unwrap()
- .error_for_status()
- .unwrap()
- .json()
- .unwrap();
+ .and_then(|response| response.error_for_status())
+ .and_then(|response| response.json())
+ {
+ Ok(envelope) => envelope,
+ Err(_) => {
+ thread::sleep(Duration::from_millis(20));
+ continue;
+ }
+ };
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
thread::sleep(Duration::from_millis(20));
continue;
@@ -737,7 +742,7 @@ fn service_binary_survives_real_client_disconnect_after_task_complete() {
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
- client.stdin.as_mut().unwrap().write_all(" \n".as_bytes()).unwrap();
+ client.stdin.as_mut().unwrap().write_all("你好\n".as_bytes()).unwrap();
let client_output = client.wait_with_output().unwrap();
assert!(
@@ -747,7 +752,7 @@ fn service_binary_survives_real_client_disconnect_after_task_complete() {
String::from_utf8_lossy(&client_output.stderr)
);
assert!(
- String::from_utf8_lossy(&client_output.stdout).contains("请输入任务内容。"),
+ String::from_utf8_lossy(&client_output.stdout).contains("任务执行失败:"),
"client did not receive TaskComplete summary: stdout={} stderr={}",
String::from_utf8_lossy(&client_output.stdout),
String::from_utf8_lossy(&client_output.stderr)
@@ -1282,6 +1287,105 @@ fn service_binary_accepts_connect_request_without_starting_browser_task() {
);
}
+/// Starts the `sg_claw` binary on a free local port, submits a task over the
+/// service WebSocket and drops the connection immediately, then watches the
+/// process for one second. The test fails if the service exits in that window.
+/// The service is killed afterwards and the temporary config directory removed.
+#[test]
+fn service_binary_survives_client_disconnect_during_task_completion_send() {
+    // Reserve an ephemeral port, then release it so the service can bind it.
+    let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
+    let service_addr = service_listener.local_addr().unwrap();
+    drop(service_listener);
+
+    let root = std::env::temp_dir().join(format!("sgclaw-service-disconnect-{}", uuid::Uuid::new_v4()));
+    std::fs::create_dir_all(&root).unwrap();
+    let config_path = root.join("sgclaw_config.json");
+    std::fs::write(
+        &config_path,
+        format!(
+            r#"{{
+ "apiKey": "sk-runtime",
+ "baseUrl": "https://api.deepseek.com",
+ "model": "deepseek-chat",
+ "browserWsUrl": "ws://127.0.0.1:12345",
+ "serviceWsListenAddr": "{service_addr}"
+}}"#
+        ),
+    )
+    .unwrap();
+
+    let mut service = std::process::Command::new(
+        std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
+    )
+    .env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
+    .arg("--config-path")
+    .arg(&config_path)
+    .stdout(std::process::Stdio::null())
+    .stderr(std::process::Stdio::piped())
+    .spawn()
+    .unwrap();
+
+    // Retry the connection until the listener is up, the service dies, or the deadline passes.
+    let ws_url = format!("ws://{service_addr}");
+    let connect_deadline = Instant::now() + Duration::from_secs(2);
+    let mut websocket = None;
+    while Instant::now() < connect_deadline {
+        match connect(ws_url.as_str()) {
+            Ok((socket, _)) => {
+                websocket = Some(socket);
+                break;
+            }
+            Err(_) => {
+                if service.try_wait().unwrap().is_some() {
+                    break;
+                }
+                thread::sleep(Duration::from_millis(50));
+            }
+        }
+    }
+
+    let mut websocket = websocket.expect("service ws listener never became available");
+    websocket
+        .send(Message::Text(
+            serde_json::to_string(&ClientMessage::SubmitTask {
+                instruction: "你好".to_string(),
+                conversation_id: String::new(),
+                messages: vec![],
+                page_url: String::new(),
+                page_title: String::new(),
+            })
+            .unwrap()
+            .into(),
+        ))
+        .unwrap();
+    // Disconnect right after submitting, without waiting for any reply.
+    drop(websocket);
+
+    // Give the service one second to exit; a healthy service keeps running.
+    let exit_deadline = Instant::now() + Duration::from_secs(1);
+    let mut service_status = None;
+    while Instant::now() < exit_deadline {
+        if let Some(status) = service.try_wait().unwrap() {
+            service_status = Some(status);
+            break;
+        }
+        thread::sleep(Duration::from_millis(20));
+    }
+    if service_status.is_none() {
+        service.kill().unwrap();
+        let _ = service.wait();
+    }
+
+    let stderr = service
+        .stderr
+        .take()
+        .map(|mut stream| {
+            let mut buf = Vec::new();
+            use std::io::Read;
+            let _ = stream.read_to_end(&mut buf);
+            String::from_utf8_lossy(&buf).into_owned()
+        })
+        .unwrap_or_default();
+
+    // Best-effort cleanup; the service has exited or been killed by now. Done
+    // before the assertion so a failing run does not leave the directory behind.
+    let _ = std::fs::remove_dir_all(&root);
+
+    assert!(
+        service_status.is_none(),
+        "sg_claw exited after client disconnected mid-task; stderr={stderr}"
+    );
+}
+
#[test]
fn submit_task_client_message_converts_into_shared_runner_request() {
let message = ClientMessage::SubmitTask {
@@ -1333,6 +1437,28 @@ fn lifecycle_client_messages_round_trip_with_stable_tags() {
}
}
+/// Pins the JSON wire form of `ServiceMessage` variants: each value must
+/// serialize to the exact expected text and deserialize back to an equal value.
+#[test]
+fn service_messages_round_trip_with_stable_tags() {
+    let status_changed = ServiceMessage::StatusChanged {
+        state: "started".to_string(),
+    };
+    let expectations = [
+        (status_changed, r#"{"type":"status_changed","state":"started"}"#),
+        (ServiceMessage::Pong, r#"{"type":"pong"}"#),
+    ];
+
+    for (expected, wire) in expectations {
+        let encoded = serde_json::to_string(&expected).unwrap();
+        assert_eq!(encoded, wire);
+        let decoded = serde_json::from_str::<ServiceMessage>(wire).unwrap();
+        assert_eq!(decoded, expected);
+    }
+}
+
#[test]
fn service_event_sink_maps_log_completion_and_status_messages() {
let sink = ServiceEventSink::default();