use std::net::TcpListener; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; use tungstenite::{accept, Message}; #[path = "../src/browser/ws_probe.rs"] mod ws_probe; use ws_probe::{ parse_probe_args, run_probe_script, ProbeCliConfig, ProbeOutcome, ProbeStep, ProbeStepResult, }; #[derive(Clone)] enum ServerStep { ReceiveThenReply { expected: String, reply: String }, ReceiveThenReplyFrames { expected: String, replies: Vec }, ReceiveThenStaySilent { expected: String }, ReceiveThenClose { expected: String }, CloseBeforeReceive, } fn spawn_fake_server(script: Vec) -> (String, Arc>>, thread::JoinHandle<()>) { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = listener.local_addr().unwrap(); let received = Arc::new(Mutex::new(Vec::new())); let received_for_thread = received.clone(); let handle = thread::spawn(move || { let (stream, _) = listener.accept().unwrap(); let mut socket = accept(stream).unwrap(); for step in script { match step { ServerStep::CloseBeforeReceive => { socket.close(None).unwrap(); return; } ServerStep::ReceiveThenReply { expected, reply } => { let message = socket.read().unwrap(); let payload = match message { Message::Text(text) => text.to_string(), other => panic!("expected text frame, got {other:?}"), }; received_for_thread.lock().unwrap().push(payload.clone()); assert_eq!(payload, expected); socket.send(Message::Text(reply.into())).unwrap(); } ServerStep::ReceiveThenReplyFrames { expected, replies } => { let message = socket.read().unwrap(); let payload = match message { Message::Text(text) => text.to_string(), other => panic!("expected text frame, got {other:?}"), }; received_for_thread.lock().unwrap().push(payload.clone()); assert_eq!(payload, expected); for reply in replies { socket.send(Message::Text(reply.into())).unwrap(); } } ServerStep::ReceiveThenStaySilent { expected } => { let message = socket.read().unwrap(); let payload = match message { Message::Text(text) => text.to_string(), other => panic!("expected text frame, got {other:?}"), }; received_for_thread.lock().unwrap().push(payload.clone()); assert_eq!(payload, expected); thread::sleep(Duration::from_millis(120)); } ServerStep::ReceiveThenClose { expected } => { let message = socket.read().unwrap(); let payload = match message { Message::Text(text) => text.to_string(), other => panic!("expected text frame, got {other:?}"), }; received_for_thread.lock().unwrap().push(payload.clone()); assert_eq!(payload, expected); socket.close(None).unwrap(); return; } } } }); (format!("ws://{addr}"), received, handle) } #[test] fn parse_probe_args_rejects_non_ws_schemes() { let cases = [ "wss://127.0.0.1:12345", "http://127.0.0.1:12345", "127.0.0.1:12345", ]; for ws_url in cases { let args = vec![ "--ws-url".to_string(), ws_url.to_string(), "--timeout-ms".to_string(), "1500".to_string(), "--step".to_string(), "open-agent::[\"about:blank\",\"sgOpenAgent\"]".to_string(), ]; let err = parse_probe_args(&args).unwrap_err(); assert_eq!( err.to_string(), format!( "probe argument error: unsupported --ws-url scheme (only ws:// is supported for this probe): {ws_url}" ) ); } } #[test] fn parse_probe_args_accepts_ws_url_timeout_and_ordered_steps() { let args = vec![ "--ws-url".to_string(), "ws://127.0.0.1:12345".to_string(), "--timeout-ms".to_string(), "1500".to_string(), "--step".to_string(), "open-agent::[\"about:blank\",\"sgOpenAgent\"]".to_string(), "--step".to_string(), "open-hot::[\"about:blank\",\"sgBrowerserOpenPage\",\"https://www.zhihu.com/hot\"]" .to_string(), ]; let parsed = parse_probe_args(&args).unwrap(); assert_eq!( parsed, ProbeCliConfig { ws_url: "ws://127.0.0.1:12345".to_string(), timeout_ms: 1500, steps: vec![ ProbeStep { label: "open-agent".to_string(), payload: "[\"about:blank\",\"sgOpenAgent\"]".to_string(), expect_reply: true, }, ProbeStep { label: "open-hot".to_string(), payload: "[\"about:blank\",\"sgBrowerserOpenPage\",\"https://www.zhihu.com/hot\"]" .to_string(), expect_reply: true, }, ], } ); } #[test] fn parse_probe_args_defaults_register_step_when_step_is_omitted() { let args = vec![ "--ws-url".to_string(), "ws://127.0.0.1:12345".to_string(), ]; let parsed = parse_probe_args(&args).unwrap(); assert_eq!(parsed.ws_url, "ws://127.0.0.1:12345"); assert_eq!(parsed.timeout_ms, 1500); assert_eq!( parsed.steps, vec![ProbeStep { label: "register".to_string(), payload: r#"{"type":"register","role":"web"}"#.to_string(), expect_reply: true, }] ); } #[test] fn parse_probe_args_defaults_timeout_when_flag_is_omitted() { let args = vec![ "--ws-url".to_string(), "ws://127.0.0.1:12345".to_string(), "--step".to_string(), "open-agent::[\"about:blank\",\"sgOpenAgent\"]".to_string(), ]; let parsed = parse_probe_args(&args).unwrap(); assert_eq!(parsed.ws_url, "ws://127.0.0.1:12345"); assert_eq!(parsed.timeout_ms, 1500); assert_eq!( parsed.steps, vec![ProbeStep { label: "open-agent".to_string(), payload: "[\"about:blank\",\"sgOpenAgent\"]".to_string(), expect_reply: true, }] ); } #[test] fn probe_records_welcome_then_silence_transcript() { let steps = vec![ ProbeStep { label: "open-agent".to_string(), payload: r#"["about:blank","sgOpenAgent"]"#.to_string(), expect_reply: true, }, ProbeStep { label: "await-followup".to_string(), payload: r#"["about:blank","sgNoop"]"#.to_string(), expect_reply: true, }, ]; let (ws_url, received, handle) = spawn_fake_server(vec![ ServerStep::ReceiveThenReply { expected: steps[0].payload.clone(), reply: "Welcome! You are client #1".to_string(), }, ServerStep::ReceiveThenStaySilent { expected: steps[1].payload.clone(), }, ]); let results = run_probe_script(&ws_url, Duration::from_millis(40), steps.clone()).unwrap(); assert_eq!( received.lock().unwrap().clone(), steps.iter().map(|step| step.payload.clone()).collect::>() ); assert_eq!( results, vec![ ProbeStepResult { label: "open-agent".to_string(), sent: r#"["about:blank","sgOpenAgent"]"#.to_string(), outcome: ProbeOutcome::Received(vec!["Welcome! You are client #1".to_string()]), }, ProbeStepResult { label: "await-followup".to_string(), sent: r#"["about:blank","sgNoop"]"#.to_string(), outcome: ProbeOutcome::TimedOut, }, ] ); handle.join().unwrap(); } #[test] fn probe_runs_ordered_frame_script_and_records_per_step_results() { let steps = vec![ ProbeStep { label: "bootstrap-1".to_string(), payload: r#"["about:blank","sgOpenAgent"]"#.to_string(), expect_reply: true, }, ProbeStep { label: "bootstrap-2".to_string(), payload: r#"["about:blank","sgSetAuthInfo","probe-user","probe-token"]"#.to_string(), expect_reply: true, }, ProbeStep { label: "action".to_string(), payload: r#"["about:blank","sgBrowerserOpenPage","https://www.zhihu.com/hot"]"#.to_string(), expect_reply: true, }, ]; let (ws_url, received, handle) = spawn_fake_server(vec![ ServerStep::ReceiveThenReply { expected: steps[0].payload.clone(), reply: "welcome".to_string(), }, ServerStep::ReceiveThenReply { expected: steps[1].payload.clone(), reply: "0".to_string(), }, ServerStep::ReceiveThenStaySilent { expected: steps[2].payload.clone(), }, ]); let results = run_probe_script(&ws_url, Duration::from_millis(40), steps.clone()).unwrap(); assert_eq!( received.lock().unwrap().clone(), steps.iter().map(|step| step.payload.clone()).collect::>() ); assert_eq!(results.len(), 3); assert_eq!(results[0].label, "bootstrap-1"); assert_eq!(results[0].outcome, ProbeOutcome::Received(vec!["welcome".to_string()])); assert_eq!(results[1].label, "bootstrap-2"); assert_eq!(results[1].outcome, ProbeOutcome::Received(vec!["0".to_string()])); assert_eq!(results[2].label, "action"); assert_eq!(results[2].sent, r#"["about:blank","sgBrowerserOpenPage","https://www.zhihu.com/hot"]"#); assert_eq!(results[2].outcome, ProbeOutcome::TimedOut); handle.join().unwrap(); } #[test] fn probe_records_multiple_frames_for_one_step_within_timeout_window() { let steps = vec![ProbeStep { label: "bootstrap".to_string(), payload: r#"["about:blank","sgOpenAgent"]"#.to_string(), expect_reply: true, }]; let (ws_url, received, handle) = spawn_fake_server(vec![ServerStep::ReceiveThenReplyFrames { expected: steps[0].payload.clone(), replies: vec!["welcome".to_string(), "status:ready".to_string()], }]); let results = run_probe_script(&ws_url, Duration::from_millis(40), steps.clone()).unwrap(); assert_eq!(received.lock().unwrap().as_slice(), [steps[0].payload.as_str()]); assert_eq!( results, vec![ProbeStepResult { label: "bootstrap".to_string(), sent: r#"["about:blank","sgOpenAgent"]"#.to_string(), outcome: ProbeOutcome::Received(vec![ "welcome".to_string(), "status:ready".to_string(), ]), }] ); handle.join().unwrap(); } #[test] fn probe_records_steps_that_do_not_wait_for_reply_without_ambiguity() { let steps = vec![ProbeStep { label: "fire-and-forget".to_string(), payload: r#"["about:blank","sgNoop"]"#.to_string(), expect_reply: false, }]; let (ws_url, received, handle) = spawn_fake_server(vec![ServerStep::ReceiveThenStaySilent { expected: steps[0].payload.clone(), }]); let results = run_probe_script(&ws_url, Duration::from_millis(40), steps.clone()).unwrap(); handle.join().unwrap(); assert_eq!(received.lock().unwrap().as_slice(), [steps[0].payload.as_str()]); assert_eq!( results, vec![ProbeStepResult { label: "fire-and-forget".to_string(), sent: r#"["about:blank","sgNoop"]"#.to_string(), outcome: ProbeOutcome::NoReplyExpected, }] ); } #[test] fn probe_records_close_when_server_closes_before_next_send() { let steps = vec![ ProbeStep { label: "open-agent".to_string(), payload: r#"["about:blank","sgOpenAgent"]"#.to_string(), expect_reply: true, }, ProbeStep { label: "follow-up".to_string(), payload: r#"["about:blank","sgNoop"]"#.to_string(), expect_reply: true, }, ]; let (ws_url, received, handle) = spawn_fake_server(vec![ ServerStep::ReceiveThenReply { expected: steps[0].payload.clone(), reply: "welcome".to_string(), }, ServerStep::CloseBeforeReceive, ]); let results = run_probe_script(&ws_url, Duration::from_millis(40), steps.clone()).unwrap(); assert_eq!(received.lock().unwrap().as_slice(), [steps[0].payload.as_str()]); assert_eq!( results, vec![ ProbeStepResult { label: "open-agent".to_string(), sent: r#"["about:blank","sgOpenAgent"]"#.to_string(), outcome: ProbeOutcome::Received(vec!["welcome".to_string()]), }, ProbeStepResult { label: "follow-up".to_string(), sent: r#"["about:blank","sgNoop"]"#.to_string(), outcome: ProbeOutcome::Closed, }, ] ); handle.join().unwrap(); } #[test] fn probe_reports_socket_close_separately_from_timeout() { let step = ProbeStep { label: "close-case".to_string(), payload: r#"["about:blank","sgOpenAgent"]"#.to_string(), expect_reply: true, }; let (ws_url, received, handle) = spawn_fake_server(vec![ServerStep::ReceiveThenClose { expected: step.payload.clone(), }]); let results = run_probe_script(&ws_url, Duration::from_millis(40), vec![step]).unwrap(); assert_eq!(received.lock().unwrap().as_slice(), [r#"["about:blank","sgOpenAgent"]"#]); assert_eq!(results.len(), 1); assert_eq!(results[0].label, "close-case"); assert_eq!(results[0].outcome, ProbeOutcome::Closed); handle.join().unwrap(); }