Files
claw/src/browser/ws_probe.rs
木炎 3e18350320 feat: add websocket browser service runtime
Wire the service/browser runtime onto the websocket-driven execution path and add the new browser/service modules needed for the submit flow and runtime integration.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 23:42:27 +08:00

308 lines
9.4 KiB
Rust

use std::net::TcpStream;
use std::time::Duration;
use thiserror::Error;
use tungstenite::stream::MaybeTlsStream;
use tungstenite::{connect, Message, WebSocket};
/// One scripted exchange of the probe: a labelled text payload to send.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProbeStep {
    /// Name copied into the matching [`ProbeStepResult`] so results can be told apart.
    pub label: String,
    /// Text sent to the server as a single websocket text frame.
    pub payload: String,
    /// `true` to wait for reply frames after sending; `false` to record
    /// [`ProbeOutcome::NoReplyExpected`] as soon as the send succeeds.
    pub expect_reply: bool,
}
/// What happened to a single probe step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeOutcome {
    /// The payload was sent and these text frames were read back, in arrival order.
    Received(Vec<String>),
    /// The payload was sent and the step did not ask for a reply.
    NoReplyExpected,
    /// Sending, or waiting for the first reply frame, hit the socket timeout.
    TimedOut,
    /// The connection was closed while sending or while waiting for a reply.
    Closed,
    /// The websocket connection could not be established; carries the connect error text.
    ConnectFailed(String),
}
/// The recorded result of one [`ProbeStep`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProbeStepResult {
    /// Label of the step this result belongs to.
    pub label: String,
    /// The step's payload. It is recorded even when the connection attempt failed and
    /// nothing actually went over the wire.
    pub sent: String,
    /// How the step ended.
    pub outcome: ProbeOutcome,
}
/// Probe settings as produced by [`parse_probe_args`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProbeCliConfig {
    /// Websocket endpoint to probe (from `--ws-url`).
    pub ws_url: String,
    /// Timeout in milliseconds (from `--timeout-ms`, or the built-in default).
    pub timeout_ms: u64,
    /// Steps to play, in command-line order (from `--step`, or the default register step).
    pub steps: Vec<ProbeStep>,
}
/// Timeout in milliseconds used by `parse_probe_args` when `--timeout-ms` is not given.
const DEFAULT_TIMEOUT_MS: u64 = 1500;
/// Label of the single step `parse_probe_args` adds when no `--step` is given.
const DEFAULT_REGISTER_STEP_LABEL: &str = "register";
/// Payload of that default step: a JSON object with `type` "register" and `role` "web".
const DEFAULT_REGISTER_STEP_PAYLOAD: &str = r#"{"type":"register","role":"web"}"#;
/// Errors produced while parsing probe arguments or talking to the websocket.
#[derive(Debug, Error)]
pub enum ProbeError {
    /// An I/O failure that is neither a timeout nor a connection closure.
    /// Also produced via `?` from socket-option calls on the underlying TCP stream.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A socket read or write timed out (`TimedOut` / `WouldBlock`).
    #[error("probe timeout while waiting for websocket frame")]
    Timeout,
    /// The peer closed the connection, or the connection was already closed/reset.
    #[error("probe websocket closed")]
    Closed,
    /// Any other websocket-level failure; the text names the failed operation.
    #[error("probe protocol error: {0}")]
    Protocol(String),
    /// The command-line arguments were missing, malformed or unsupported.
    #[error("probe argument error: {0}")]
    Args(String),
}
/// Parses the probe's command-line arguments into a [`ProbeCliConfig`].
///
/// Recognised flags, each followed by exactly one value:
///
/// - `--ws-url <url>`: required; must start with `ws://`.
/// - `--timeout-ms <n>`: optional; a positive integer number of milliseconds.
///   Defaults to `DEFAULT_TIMEOUT_MS`.
/// - `--step <label>::<payload>`: repeatable. The value is split at the *first* `::`,
///   so the payload may itself contain `::`. Every parsed step expects a reply.
///
/// If `--ws-url` or `--timeout-ms` is repeated, the last occurrence wins. When no
/// `--step` is given, a single default "register" step is used.
///
/// # Errors
///
/// Returns [`ProbeError::Args`] when a flag is unknown, a flag's value is missing,
/// `--timeout-ms` is not a positive integer, a step is malformed or has an empty
/// label/payload, or `--ws-url` is absent or uses an unsupported scheme.
pub fn parse_probe_args(args: &[String]) -> Result<ProbeCliConfig, ProbeError> {
    let mut ws_url = None;
    let mut timeout_ms = None;
    let mut steps = Vec::new();

    // Flags consume their value from the same iterator, so no manual index bookkeeping.
    let mut remaining = args.iter();
    while let Some(flag) = remaining.next() {
        match flag.as_str() {
            "--ws-url" => {
                let value = remaining
                    .next()
                    .ok_or_else(|| ProbeError::Args("missing value for --ws-url".to_string()))?;
                ws_url = Some(value.clone());
            }
            "--timeout-ms" => {
                let value = remaining.next().ok_or_else(|| {
                    ProbeError::Args("missing value for --timeout-ms".to_string())
                })?;
                let parsed = value.parse::<u64>().map_err(|_| {
                    ProbeError::Args(format!("invalid --timeout-ms value: {value}"))
                })?;
                // std rejects a zero Duration for socket read/write timeouts, so a zero
                // here would only surface later as an opaque I/O error after connecting.
                if parsed == 0 {
                    return Err(ProbeError::Args(format!(
                        "invalid --timeout-ms value (must be greater than 0): {value}"
                    )));
                }
                timeout_ms = Some(parsed);
            }
            "--step" => {
                let value = remaining
                    .next()
                    .ok_or_else(|| ProbeError::Args("missing value for --step".to_string()))?;
                let (label, payload) = value.split_once("::").ok_or_else(|| {
                    ProbeError::Args(format!(
                        "invalid --step value (expected <label>::<payload>): {value}"
                    ))
                })?;
                if label.is_empty() {
                    return Err(ProbeError::Args("step label must not be empty".to_string()));
                }
                if payload.is_empty() {
                    return Err(ProbeError::Args("step payload must not be empty".to_string()));
                }
                steps.push(ProbeStep {
                    label: label.to_string(),
                    payload: payload.to_string(),
                    expect_reply: true,
                });
            }
            unknown => {
                return Err(ProbeError::Args(format!("unknown argument: {unknown}")));
            }
        }
    }

    let ws_url = ws_url.ok_or_else(|| ProbeError::Args("missing required --ws-url".to_string()))?;
    validate_ws_url(&ws_url)?;
    let timeout_ms = timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS);

    // With no explicit script, fall back to a single registration handshake.
    if steps.is_empty() {
        steps.push(ProbeStep {
            label: DEFAULT_REGISTER_STEP_LABEL.to_string(),
            payload: DEFAULT_REGISTER_STEP_PAYLOAD.to_string(),
            expect_reply: true,
        });
    }

    Ok(ProbeCliConfig {
        ws_url,
        timeout_ms,
        steps,
    })
}
/// Accepts only URLs that begin with the plain-text `ws://` scheme.
///
/// The check is a case-sensitive prefix test; nothing after the scheme is inspected.
///
/// # Errors
///
/// Returns [`ProbeError::Args`] quoting the offending URL for any other prefix.
fn validate_ws_url(ws_url: &str) -> Result<(), ProbeError> {
    match ws_url.strip_prefix("ws://") {
        Some(_) => Ok(()),
        None => Err(ProbeError::Args(format!(
            "unsupported --ws-url scheme (only ws:// is supported for this probe): {ws_url}"
        ))),
    }
}
/// Connects to `ws_url` and plays `steps` in order, recording one result per step.
///
/// Each step's payload is sent as a text frame. If the step expects a reply, the frames
/// gathered by `read_probe_frames` are recorded as [`ProbeOutcome::Received`]; otherwise
/// the step is recorded as [`ProbeOutcome::NoReplyExpected`].
///
/// A failed connection is *not* an error: every step is reported with
/// [`ProbeOutcome::ConnectFailed`] carrying the connect error text. Likewise, a timeout
/// or closed connection is recorded on the affected step and the script moves on to the
/// next step.
///
/// # Errors
///
/// Returns a [`ProbeError`] when the socket timeouts cannot be applied, or when a send
/// or read fails for a reason other than a timeout or connection closure. Results of
/// earlier steps are discarded in that case.
pub fn run_probe_script(
    ws_url: &str,
    timeout: Duration,
    steps: Vec<ProbeStep>,
) -> Result<Vec<ProbeStepResult>, ProbeError> {
    let (mut socket, _response) = match connect(ws_url) {
        Ok(pair) => pair,
        Err(connect_err) => {
            // Report the same connect failure against every step instead of erroring out.
            let reason = connect_err.to_string();
            let failed = steps
                .into_iter()
                .map(|step| ProbeStepResult {
                    label: step.label,
                    sent: step.payload,
                    outcome: ProbeOutcome::ConnectFailed(reason.clone()),
                })
                .collect();
            return Ok(failed);
        }
    };
    configure_socket_timeout(&mut socket, timeout)?;

    let mut results = Vec::with_capacity(steps.len());
    for ProbeStep {
        label,
        payload,
        expect_reply,
    } in steps
    {
        // Send first; only a successful send goes on to (optionally) wait for replies.
        let exchange = socket
            .send(Message::Text(payload.clone().into()))
            .map_err(|err| map_websocket_error(err, "browser websocket send"))
            .and_then(|()| {
                if expect_reply {
                    read_probe_frames(&mut socket).map(ProbeOutcome::Received)
                } else {
                    Ok(ProbeOutcome::NoReplyExpected)
                }
            });

        // Timeouts and closures are per-step outcomes; anything else aborts the script.
        let outcome = match exchange {
            Ok(outcome) => outcome,
            Err(ProbeError::Timeout) => ProbeOutcome::TimedOut,
            Err(ProbeError::Closed) => ProbeOutcome::Closed,
            Err(fatal) => return Err(fatal),
        };

        results.push(ProbeStepResult {
            label,
            sent: payload,
            outcome,
        });
    }
    Ok(results)
}
/// Applies `timeout` as both the read and the write timeout of the underlying TCP stream.
///
/// Only plain (non-TLS) streams are configured; any other stream kind is left untouched
/// and the call still succeeds.
///
/// # Errors
///
/// Returns [`ProbeError::Io`] if the OS rejects either timeout. Note that std's
/// `TcpStream::set_read_timeout` / `set_write_timeout` return an error for a zero
/// `Duration`.
fn configure_socket_timeout(
    websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
    timeout: Duration,
) -> Result<(), ProbeError> {
    if let MaybeTlsStream::Plain(stream) = websocket.get_mut() {
        stream.set_read_timeout(Some(timeout))?;
        stream.set_write_timeout(Some(timeout))?;
    }
    Ok(())
}
/// Reads the reply to a step: one text frame, plus any further frames that follow at once.
///
/// The first frame is awaited under the socket's current read timeout. On a plain TCP
/// stream the read timeout is then lowered to 1 ms and frames are collected until a read
/// times out or the connection closes. The previous timeout is restored before
/// returning. On non-plain streams only the first frame is returned.
///
/// # Errors
///
/// Propagates the error from the first read (including timeout and closure). During the
/// drain phase, timeout and closure simply end the collection; any other error is
/// returned after the original timeout has been restored. A failure to read or change
/// the socket timeout is returned as [`ProbeError::Io`].
fn read_probe_frames(
    websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
) -> Result<Vec<String>, ProbeError> {
    let mut frames = vec![read_probe_frame(websocket)?];

    let Some(saved_timeout) = get_plain_read_timeout(websocket)? else {
        // Not a plain stream: the timeout cannot be shortened, so skip draining.
        return Ok(frames);
    };

    // A very short timeout turns the blocking read into a quick "anything else queued?" poll.
    set_plain_read_timeout(websocket, Some(Duration::from_millis(1)))?;
    let drained = loop {
        match read_probe_frame(websocket) {
            Ok(frame) => frames.push(frame),
            Err(ProbeError::Timeout | ProbeError::Closed) => break Ok(()),
            Err(other) => break Err(other),
        }
    };

    // Restore the caller's timeout on both the success and the failure path.
    set_plain_read_timeout(websocket, saved_timeout)?;
    drained.map(|()| frames)
}
/// Returns the current read timeout of the underlying stream when it is plain TCP.
///
/// The outer `Option` is `None` for any non-plain stream. The inner `Option` is the
/// timeout reported by the socket (`None` meaning no read timeout is set).
///
/// # Errors
///
/// Returns [`ProbeError::Io`] if the timeout cannot be queried.
fn get_plain_read_timeout(
    websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
) -> Result<Option<Option<Duration>>, ProbeError> {
    let MaybeTlsStream::Plain(stream) = websocket.get_mut() else {
        return Ok(None);
    };
    let current = stream.read_timeout()?;
    Ok(Some(current))
}
/// Sets the read timeout of the underlying stream when it is plain TCP.
///
/// Non-plain streams are left untouched and the call still succeeds.
///
/// # Errors
///
/// Returns [`ProbeError::Io`] if the socket rejects the new timeout.
fn set_plain_read_timeout(
    websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
    timeout: Option<Duration>,
) -> Result<(), ProbeError> {
    if let MaybeTlsStream::Plain(stream) = websocket.get_mut() {
        stream.set_read_timeout(timeout)?;
    }
    Ok(())
}
/// Blocks until the next text frame arrives and returns its contents.
///
/// Ping frames are answered with a pong carrying the same payload. Every other
/// non-text, non-close message is skipped.
///
/// # Errors
///
/// Returns [`ProbeError::Closed`] when a close frame is received. A failed read or a
/// failed pong is translated by `map_websocket_error`.
fn read_probe_frame(
    websocket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
) -> Result<String, ProbeError> {
    loop {
        let message = websocket
            .read()
            .map_err(|err| map_websocket_error(err, "browser websocket read"))?;
        match message {
            Message::Text(text) => return Ok(text.to_string()),
            Message::Close(_) => return Err(ProbeError::Closed),
            Message::Ping(payload) => websocket
                .send(Message::Pong(payload))
                .map_err(|err| map_websocket_error(err, "browser websocket pong"))?,
            // Anything else is not a probe reply; keep waiting.
            _ => {}
        }
    }
}
fn map_websocket_error(err: tungstenite::Error, operation: &str) -> ProbeError {
match err {
tungstenite::Error::ConnectionClosed
| tungstenite::Error::AlreadyClosed
| tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)
| tungstenite::Error::Protocol(tungstenite::error::ProtocolError::SendAfterClosing) => {
ProbeError::Closed
}
tungstenite::Error::Io(io_err)
if matches!(
io_err.kind(),
std::io::ErrorKind::TimedOut | std::io::ErrorKind::WouldBlock
) =>
{
ProbeError::Timeout
}
tungstenite::Error::Io(io_err)
if matches!(
io_err.kind(),
std::io::ErrorKind::ConnectionAborted
| std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::UnexpectedEof
) =>
{
ProbeError::Closed
}
tungstenite::Error::Io(io_err) => ProbeError::Io(io_err),
other => ProbeError::Protocol(format!("{operation} failed: {other}")),
}
}