use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use serde_json::Value; use crate::pipe::protocol::{Action, AgentMessage, BrowserMessage, SecurityFields, Timing}; use crate::pipe::{PipeError, Transport}; use crate::security::{sign_command, MacPolicy}; #[derive(Debug, Clone, PartialEq)] pub struct CommandOutput { pub seq: u64, pub success: bool, pub data: Value, pub aom_snapshot: Vec, pub timing: Timing, } pub struct BrowserPipeTool { transport: Arc, mac_policy: MacPolicy, session_key: Vec, next_seq: AtomicU64, response_timeout: Duration, } impl BrowserPipeTool { pub fn new(transport: Arc, mac_policy: MacPolicy, session_key: Vec) -> Self { Self { transport, mac_policy, session_key, next_seq: AtomicU64::new(1), response_timeout: Duration::from_secs(30), } } pub fn with_response_timeout(mut self, response_timeout: Duration) -> Self { self.response_timeout = response_timeout; self } pub fn invoke( &self, action: Action, params: Value, expected_domain: &str, ) -> Result { self.mac_policy.validate(&action, expected_domain)?; let seq = self.next_seq.fetch_add(1, Ordering::Relaxed); let hmac = sign_command(&self.session_key, seq, &action, ¶ms, expected_domain)?; let command = AgentMessage::Command { seq, action, params, security: SecurityFields { expected_domain: expected_domain.to_string(), hmac, }, }; self.transport.send(&command)?; let started = Instant::now(); loop { let Some(remaining) = self.response_timeout.checked_sub(started.elapsed()) else { return Err(PipeError::Timeout); }; match self.transport.recv_timeout(remaining)? { BrowserMessage::Response { seq: response_seq, success, data, aom_snapshot, timing, } if response_seq == seq => { return Ok(CommandOutput { seq: response_seq, success, data, aom_snapshot, timing, }); } BrowserMessage::Response { seq: response_seq, .. } => { return Err(PipeError::Protocol(format!( "received response seq {response_seq} while waiting for {seq}" ))); } BrowserMessage::Init { .. } => { return Err(PipeError::UnexpectedMessage( "received duplicate init after handshake".to_string(), )); } BrowserMessage::SubmitTask { .. } => { return Err(PipeError::UnexpectedMessage( "received submit_task while waiting for response".to_string(), )); } } } } }