fix: harden service websocket reconnect flows
Stabilize the service console and callback-host websocket paths so idle disconnects and mid-task client drops no longer wedge task execution or spam repeated commands. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -12,6 +12,12 @@ fn service_console_html_stays_on_service_ws_boundary() {
|
||||
|
||||
assert!(source.contains("ws://127.0.0.1:42321"));
|
||||
assert!(source.contains("submit_task"));
|
||||
assert!(source.contains("addEventListener(\"close\""));
|
||||
assert!(source.contains("setTimeout(() => connectOrDisconnectService(true)"));
|
||||
assert!(source.contains("connectTimeoutTimer"));
|
||||
assert!(source.contains("lastHeartbeatAt"));
|
||||
assert!(source.contains("heartbeat missed, forcing reconnect"));
|
||||
assert!(source.contains("service websocket connect timed out"));
|
||||
assert!(!source.contains("/sgclaw/browser-helper.html"));
|
||||
assert!(!source.contains("/sgclaw/callback/ready"));
|
||||
assert!(!source.contains("/sgclaw/callback/events"));
|
||||
|
||||
@@ -162,6 +162,7 @@ fn start_callback_host_hotlist_browser_server(
|
||||
.to_string();
|
||||
let helper_client = Client::builder()
|
||||
.timeout(Duration::from_secs(2))
|
||||
.pool_max_idle_per_host(0)
|
||||
.build()
|
||||
.unwrap();
|
||||
let helper_html = helper_client
|
||||
@@ -213,14 +214,18 @@ fn start_callback_host_hotlist_browser_server(
|
||||
let mut saw_eval = false;
|
||||
|
||||
while Instant::now() < deadline {
|
||||
let envelope: Value = helper_client
|
||||
let envelope: Value = match helper_client
|
||||
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
|
||||
.send()
|
||||
.unwrap()
|
||||
.error_for_status()
|
||||
.unwrap()
|
||||
.json()
|
||||
.unwrap();
|
||||
.and_then(|response| response.error_for_status())
|
||||
.and_then(|response| response.json())
|
||||
{
|
||||
Ok(envelope) => envelope,
|
||||
Err(_) => {
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
continue;
|
||||
|
||||
@@ -213,6 +213,7 @@ fn start_callback_host_hotlist_browser_server(
|
||||
.to_string();
|
||||
let helper_client = Client::builder()
|
||||
.timeout(Duration::from_secs(2))
|
||||
.pool_max_idle_per_host(0)
|
||||
.build()
|
||||
.unwrap();
|
||||
let helper_html = helper_client
|
||||
@@ -264,14 +265,18 @@ fn start_callback_host_hotlist_browser_server(
|
||||
let mut saw_eval = false;
|
||||
|
||||
while Instant::now() < deadline {
|
||||
let envelope: Value = helper_client
|
||||
let envelope: Value = match helper_client
|
||||
.get(format!("{helper_origin}/sgclaw/callback/commands/next"))
|
||||
.send()
|
||||
.unwrap()
|
||||
.error_for_status()
|
||||
.unwrap()
|
||||
.json()
|
||||
.unwrap();
|
||||
.and_then(|response| response.error_for_status())
|
||||
.and_then(|response| response.json())
|
||||
{
|
||||
Ok(envelope) => envelope,
|
||||
Err(_) => {
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let Some(command) = envelope.get("command").and_then(Value::as_object) else {
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
continue;
|
||||
@@ -737,7 +742,7 @@ fn service_binary_survives_real_client_disconnect_after_task_complete() {
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.unwrap();
|
||||
client.stdin.as_mut().unwrap().write_all(" \n".as_bytes()).unwrap();
|
||||
client.stdin.as_mut().unwrap().write_all("你好\n".as_bytes()).unwrap();
|
||||
let client_output = client.wait_with_output().unwrap();
|
||||
|
||||
assert!(
|
||||
@@ -747,7 +752,7 @@ fn service_binary_survives_real_client_disconnect_after_task_complete() {
|
||||
String::from_utf8_lossy(&client_output.stderr)
|
||||
);
|
||||
assert!(
|
||||
String::from_utf8_lossy(&client_output.stdout).contains("请输入任务内容。"),
|
||||
String::from_utf8_lossy(&client_output.stdout).contains("任务执行失败:"),
|
||||
"client did not receive TaskComplete summary: stdout={} stderr={}",
|
||||
String::from_utf8_lossy(&client_output.stdout),
|
||||
String::from_utf8_lossy(&client_output.stderr)
|
||||
@@ -1282,6 +1287,105 @@ fn service_binary_accepts_connect_request_without_starting_browser_task() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn service_binary_survives_client_disconnect_during_task_completion_send() {
|
||||
let service_listener = TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
let service_addr = service_listener.local_addr().unwrap();
|
||||
drop(service_listener);
|
||||
|
||||
let root = std::env::temp_dir().join(format!("sgclaw-service-disconnect-{}", uuid::Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&root).unwrap();
|
||||
let config_path = root.join("sgclaw_config.json");
|
||||
std::fs::write(
|
||||
&config_path,
|
||||
format!(
|
||||
r#"{{
|
||||
"apiKey": "sk-runtime",
|
||||
"baseUrl": "https://api.deepseek.com",
|
||||
"model": "deepseek-chat",
|
||||
"browserWsUrl": "ws://127.0.0.1:12345",
|
||||
"serviceWsListenAddr": "{service_addr}"
|
||||
}}"#
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut service = std::process::Command::new(
|
||||
std::env::var("CARGO_BIN_EXE_sg_claw").expect("sg_claw test binary path"),
|
||||
)
|
||||
.env("SGCLAW_DISABLE_POST_EXPORT_OPEN", "1")
|
||||
.arg("--config-path")
|
||||
.arg(&config_path)
|
||||
.stdout(std::process::Stdio::null())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.unwrap();
|
||||
|
||||
let ws_url = format!("ws://{service_addr}");
|
||||
let connect_deadline = Instant::now() + Duration::from_secs(2);
|
||||
let mut websocket = None;
|
||||
while Instant::now() < connect_deadline {
|
||||
match connect(ws_url.as_str()) {
|
||||
Ok((socket, _)) => {
|
||||
websocket = Some(socket);
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
if service.try_wait().unwrap().is_some() {
|
||||
break;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(50));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut websocket = websocket.expect("service ws listener never became available");
|
||||
websocket
|
||||
.send(Message::Text(
|
||||
serde_json::to_string(&ClientMessage::SubmitTask {
|
||||
instruction: "你好".to_string(),
|
||||
conversation_id: String::new(),
|
||||
messages: vec![],
|
||||
page_url: String::new(),
|
||||
page_title: String::new(),
|
||||
})
|
||||
.unwrap()
|
||||
.into(),
|
||||
))
|
||||
.unwrap();
|
||||
drop(websocket);
|
||||
|
||||
let exit_deadline = Instant::now() + Duration::from_secs(1);
|
||||
let mut service_status = None;
|
||||
while Instant::now() < exit_deadline {
|
||||
if let Some(status) = service.try_wait().unwrap() {
|
||||
service_status = Some(status);
|
||||
break;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(20));
|
||||
}
|
||||
if service_status.is_none() {
|
||||
service.kill().unwrap();
|
||||
let _ = service.wait();
|
||||
}
|
||||
|
||||
let stderr = service
|
||||
.stderr
|
||||
.take()
|
||||
.map(|mut stream| {
|
||||
let mut buf = Vec::new();
|
||||
use std::io::Read;
|
||||
let _ = stream.read_to_end(&mut buf);
|
||||
String::from_utf8_lossy(&buf).into_owned()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
assert!(
|
||||
service_status.is_none(),
|
||||
"sg_claw exited after client disconnected mid-task; stderr={stderr}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn submit_task_client_message_converts_into_shared_runner_request() {
|
||||
let message = ClientMessage::SubmitTask {
|
||||
@@ -1333,6 +1437,28 @@ fn lifecycle_client_messages_round_trip_with_stable_tags() {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn service_messages_round_trip_with_stable_tags() {
|
||||
let cases = [
|
||||
(
|
||||
ServiceMessage::StatusChanged {
|
||||
state: "started".to_string(),
|
||||
},
|
||||
r#"{"type":"status_changed","state":"started"}"#,
|
||||
),
|
||||
(
|
||||
ServiceMessage::Pong,
|
||||
r#"{"type":"pong"}"#,
|
||||
),
|
||||
];
|
||||
|
||||
for (message, raw) in cases {
|
||||
assert_eq!(serde_json::to_string(&message).unwrap(), raw);
|
||||
let decoded: ServiceMessage = serde_json::from_str(raw).unwrap();
|
||||
assert_eq!(decoded, message);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn service_event_sink_maps_log_completion_and_status_messages() {
|
||||
let sink = ServiceEventSink::default();
|
||||
|
||||
Reference in New Issue
Block a user