feat: add websocket browser service runtime
Wire the service/browser runtime onto the websocket-driven execution path and add the new browser/service modules needed for the submit flow and runtime integration. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
307
src/browser/ws_probe.rs
Normal file
307
src/browser/ws_probe.rs
Normal file
@@ -0,0 +1,307 @@
|
||||
use std::net::TcpStream;
|
||||
use std::time::Duration;
|
||||
|
||||
use thiserror::Error;
|
||||
use tungstenite::stream::MaybeTlsStream;
|
||||
use tungstenite::{connect, Message, WebSocket};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ProbeStep {
|
||||
pub label: String,
|
||||
pub payload: String,
|
||||
pub expect_reply: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ProbeOutcome {
|
||||
Received(Vec<String>),
|
||||
NoReplyExpected,
|
||||
TimedOut,
|
||||
Closed,
|
||||
ConnectFailed(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ProbeStepResult {
|
||||
pub label: String,
|
||||
pub sent: String,
|
||||
pub outcome: ProbeOutcome,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ProbeCliConfig {
|
||||
pub ws_url: String,
|
||||
pub timeout_ms: u64,
|
||||
pub steps: Vec<ProbeStep>,
|
||||
}
|
||||
|
||||
const DEFAULT_TIMEOUT_MS: u64 = 1500;
|
||||
const DEFAULT_REGISTER_STEP_LABEL: &str = "register";
|
||||
const DEFAULT_REGISTER_STEP_PAYLOAD: &str = r#"{"type":"register","role":"web"}"#;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ProbeError {
|
||||
#[error("io error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("probe timeout while waiting for websocket frame")]
|
||||
Timeout,
|
||||
#[error("probe websocket closed")]
|
||||
Closed,
|
||||
#[error("probe protocol error: {0}")]
|
||||
Protocol(String),
|
||||
#[error("probe argument error: {0}")]
|
||||
Args(String),
|
||||
}
|
||||
|
||||
pub fn parse_probe_args(args: &[String]) -> Result<ProbeCliConfig, ProbeError> {
|
||||
let mut ws_url = None;
|
||||
let mut timeout_ms = None;
|
||||
let mut steps = Vec::new();
|
||||
let mut index = 0;
|
||||
|
||||
while index < args.len() {
|
||||
match args[index].as_str() {
|
||||
"--ws-url" => {
|
||||
index += 1;
|
||||
let value = args
|
||||
.get(index)
|
||||
.ok_or_else(|| ProbeError::Args("missing value for --ws-url".to_string()))?;
|
||||
ws_url = Some(value.clone());
|
||||
}
|
||||
"--timeout-ms" => {
|
||||
index += 1;
|
||||
let value = args.get(index).ok_or_else(|| {
|
||||
ProbeError::Args("missing value for --timeout-ms".to_string())
|
||||
})?;
|
||||
let parsed = value.parse::<u64>().map_err(|_| {
|
||||
ProbeError::Args(format!("invalid --timeout-ms value: {value}"))
|
||||
})?;
|
||||
timeout_ms = Some(parsed);
|
||||
}
|
||||
"--step" => {
|
||||
index += 1;
|
||||
let value = args
|
||||
.get(index)
|
||||
.ok_or_else(|| ProbeError::Args("missing value for --step".to_string()))?;
|
||||
let (label, payload) = value.split_once("::").ok_or_else(|| {
|
||||
ProbeError::Args(format!(
|
||||
"invalid --step value (expected <label>::<payload>): {value}"
|
||||
))
|
||||
})?;
|
||||
if label.is_empty() {
|
||||
return Err(ProbeError::Args("step label must not be empty".to_string()));
|
||||
}
|
||||
if payload.is_empty() {
|
||||
return Err(ProbeError::Args("step payload must not be empty".to_string()));
|
||||
}
|
||||
steps.push(ProbeStep {
|
||||
label: label.to_string(),
|
||||
payload: payload.to_string(),
|
||||
expect_reply: true,
|
||||
});
|
||||
}
|
||||
flag => {
|
||||
return Err(ProbeError::Args(format!("unknown argument: {flag}")));
|
||||
}
|
||||
}
|
||||
index += 1;
|
||||
}
|
||||
|
||||
let ws_url = ws_url.ok_or_else(|| ProbeError::Args("missing required --ws-url".to_string()))?;
|
||||
validate_ws_url(&ws_url)?;
|
||||
let timeout_ms = timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS);
|
||||
if steps.is_empty() {
|
||||
steps.push(ProbeStep {
|
||||
label: DEFAULT_REGISTER_STEP_LABEL.to_string(),
|
||||
payload: DEFAULT_REGISTER_STEP_PAYLOAD.to_string(),
|
||||
expect_reply: true,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(ProbeCliConfig {
|
||||
ws_url,
|
||||
timeout_ms,
|
||||
steps,
|
||||
})
|
||||
}
|
||||
|
||||
fn validate_ws_url(ws_url: &str) -> Result<(), ProbeError> {
|
||||
if ws_url.starts_with("ws://") {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(ProbeError::Args(format!(
|
||||
"unsupported --ws-url scheme (only ws:// is supported for this probe): {ws_url}"
|
||||
)))
|
||||
}
|
||||
|
||||
pub fn run_probe_script(
|
||||
ws_url: &str,
|
||||
timeout: Duration,
|
||||
steps: Vec<ProbeStep>,
|
||||
) -> Result<Vec<ProbeStepResult>, ProbeError> {
|
||||
let mut socket = match connect(ws_url) {
|
||||
Ok((socket, _)) => socket,
|
||||
Err(err) => {
|
||||
let message = err.to_string();
|
||||
return Ok(steps
|
||||
.into_iter()
|
||||
.map(|step| ProbeStepResult {
|
||||
label: step.label,
|
||||
sent: step.payload,
|
||||
outcome: ProbeOutcome::ConnectFailed(message.clone()),
|
||||
})
|
||||
.collect());
|
||||
}
|
||||
};
|
||||
|
||||
configure_socket_timeout(&mut socket, timeout)?;
|
||||
|
||||
let mut results = Vec::with_capacity(steps.len());
|
||||
for step in steps {
|
||||
let ProbeStep {
|
||||
label,
|
||||
payload,
|
||||
expect_reply,
|
||||
} = step;
|
||||
|
||||
let send_outcome = match socket.send(Message::Text(payload.clone().into())) {
|
||||
Ok(()) => None,
|
||||
Err(err) => Some(map_websocket_error(err, "browser websocket send")),
|
||||
};
|
||||
|
||||
let outcome = match send_outcome {
|
||||
Some(ProbeError::Timeout) => ProbeOutcome::TimedOut,
|
||||
Some(ProbeError::Closed) => ProbeOutcome::Closed,
|
||||
Some(err) => return Err(err),
|
||||
None if expect_reply => match read_probe_frames(&mut socket) {
|
||||
Ok(frames) => ProbeOutcome::Received(frames),
|
||||
Err(ProbeError::Timeout) => ProbeOutcome::TimedOut,
|
||||
Err(ProbeError::Closed) => ProbeOutcome::Closed,
|
||||
Err(err) => return Err(err),
|
||||
},
|
||||
None => ProbeOutcome::NoReplyExpected,
|
||||
};
|
||||
|
||||
results.push(ProbeStepResult {
|
||||
label,
|
||||
sent: payload,
|
||||
outcome,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
fn configure_socket_timeout(
|
||||
websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
|
||||
timeout: Duration,
|
||||
) -> Result<(), ProbeError> {
|
||||
match websocket.get_mut() {
|
||||
MaybeTlsStream::Plain(stream) => {
|
||||
stream.set_read_timeout(Some(timeout))?;
|
||||
stream.set_write_timeout(Some(timeout))?;
|
||||
Ok(())
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
fn read_probe_frames(
|
||||
websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
|
||||
) -> Result<Vec<String>, ProbeError> {
|
||||
let first_frame = read_probe_frame(websocket)?;
|
||||
let mut frames = vec![first_frame];
|
||||
|
||||
let Some(original_timeout) = get_plain_read_timeout(websocket)? else {
|
||||
return Ok(frames);
|
||||
};
|
||||
|
||||
set_plain_read_timeout(websocket, Some(Duration::from_millis(1)))?;
|
||||
|
||||
loop {
|
||||
match read_probe_frame(websocket) {
|
||||
Ok(frame) => frames.push(frame),
|
||||
Err(ProbeError::Timeout) | Err(ProbeError::Closed) => break,
|
||||
Err(err) => {
|
||||
set_plain_read_timeout(websocket, original_timeout)?;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
set_plain_read_timeout(websocket, original_timeout)?;
|
||||
Ok(frames)
|
||||
}
|
||||
|
||||
fn get_plain_read_timeout(
|
||||
websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
|
||||
) -> Result<Option<Option<Duration>>, ProbeError> {
|
||||
match websocket.get_mut() {
|
||||
MaybeTlsStream::Plain(stream) => Ok(Some(stream.read_timeout()?)),
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn set_plain_read_timeout(
|
||||
websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<(), ProbeError> {
|
||||
match websocket.get_mut() {
|
||||
MaybeTlsStream::Plain(stream) => {
|
||||
stream.set_read_timeout(timeout)?;
|
||||
Ok(())
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
fn read_probe_frame(
|
||||
websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
|
||||
) -> Result<String, ProbeError> {
|
||||
loop {
|
||||
match websocket.read() {
|
||||
Ok(Message::Text(text)) => return Ok(text.to_string()),
|
||||
Ok(Message::Close(_)) => return Err(ProbeError::Closed),
|
||||
Ok(Message::Ping(payload)) => {
|
||||
websocket
|
||||
.send(Message::Pong(payload))
|
||||
.map_err(|err| map_websocket_error(err, "browser websocket pong"))?;
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(err) => return Err(map_websocket_error(err, "browser websocket read")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn map_websocket_error(err: tungstenite::Error, operation: &str) -> ProbeError {
|
||||
match err {
|
||||
tungstenite::Error::ConnectionClosed
|
||||
| tungstenite::Error::AlreadyClosed
|
||||
| tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)
|
||||
| tungstenite::Error::Protocol(tungstenite::error::ProtocolError::SendAfterClosing) => {
|
||||
ProbeError::Closed
|
||||
}
|
||||
tungstenite::Error::Io(io_err)
|
||||
if matches!(
|
||||
io_err.kind(),
|
||||
std::io::ErrorKind::TimedOut | std::io::ErrorKind::WouldBlock
|
||||
) =>
|
||||
{
|
||||
ProbeError::Timeout
|
||||
}
|
||||
tungstenite::Error::Io(io_err)
|
||||
if matches!(
|
||||
io_err.kind(),
|
||||
std::io::ErrorKind::ConnectionAborted
|
||||
| std::io::ErrorKind::ConnectionReset
|
||||
| std::io::ErrorKind::BrokenPipe
|
||||
| std::io::ErrorKind::UnexpectedEof
|
||||
) =>
|
||||
{
|
||||
ProbeError::Closed
|
||||
}
|
||||
tungstenite::Error::Io(io_err) => ProbeError::Io(io_err),
|
||||
other => ProbeError::Protocol(format!("{operation} failed: {other}")),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user