use std::path::Path; use std::sync::Arc; use async_trait::async_trait; use futures_util::{stream, StreamExt}; use zeroclaw::agent::dispatcher::NativeToolDispatcher; use zeroclaw::agent::{Agent, TurnEvent}; use zeroclaw::config::Config as ZeroClawConfig; use zeroclaw::observability::{NoopObserver, Observer}; use zeroclaw::providers::{ self, ChatMessage, ChatRequest, ChatResponse, Provider, }; use zeroclaw::providers::traits::{ ProviderCapabilities, StreamEvent, StreamOptions, StreamResult, }; use crate::compat::browser_tool_adapter::{ZeroClawBrowserTool, BROWSER_ACTION_TOOL_NAME}; use crate::compat::config_adapter::build_zeroclaw_config_from_settings; use crate::config::DeepSeekSettings; use crate::compat::event_bridge::log_entry_for_turn_event; use crate::compat::memory_adapter::build_memory; use crate::pipe::{BrowserPipeTool, ConversationMessage, PipeError, Transport}; #[derive(Debug, Clone, Default)] pub struct CompatTaskContext { pub conversation_id: Option, pub messages: Vec, pub page_url: Option, pub page_title: Option, } pub fn execute_task( transport: &T, browser_tool: BrowserPipeTool, instruction: &str, task_context: &CompatTaskContext, workspace_root: &Path, settings: &DeepSeekSettings, ) -> Result { let config = build_zeroclaw_config_from_settings(workspace_root, settings); let provider = build_provider(&config)?; let runtime = tokio::runtime::Runtime::new() .map_err(|err| PipeError::Protocol(format!("failed to create tokio runtime: {err}")))?; runtime.block_on(execute_task_with_provider( transport, browser_tool, provider, instruction, task_context, config, )) } pub async fn execute_task_with_provider( transport: &T, browser_tool: BrowserPipeTool, provider: Box, instruction: &str, task_context: &CompatTaskContext, config: ZeroClawConfig, ) -> Result { let mut agent = build_agent(browser_tool, provider, &config)?; if let Some(conversation_id) = task_context .conversation_id .as_deref() .map(str::trim) .filter(|value| !value.is_empty()) { agent.set_memory_session_id(Some(conversation_id.to_string())); } let seed_messages = build_seed_history(task_context); if !seed_messages.is_empty() { agent.seed_history(&seed_messages); } let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::(32); let instruction = instruction.to_string(); let task = tokio::spawn(async move { agent.turn_streamed(&instruction, event_tx).await }); while let Some(event) = event_rx.recv().await { if let Some(log_entry) = log_entry_for_turn_event(&event) { transport.send(&log_entry)?; } } task.await .map_err(|err| PipeError::Protocol(format!("zeroclaw task join failed: {err}")))? .map_err(|err| PipeError::Protocol(err.to_string())) } fn build_agent( browser_tool: BrowserPipeTool, provider: Box, config: &ZeroClawConfig, ) -> Result { let memory = build_memory(config).map_err(map_anyhow_to_pipe_error)?; let observer: Arc = Arc::new(NoopObserver); let tools: Vec> = vec![Box::new(ZeroClawBrowserTool::new(browser_tool))]; Agent::builder() .provider(provider) .tools(tools) .memory(Arc::from(memory)) .observer(observer) .tool_dispatcher(Box::new(NativeToolDispatcher)) .config(config.agent.clone()) .model_name( config .default_model .clone() .unwrap_or_else(|| "deepseek-chat".to_string()), ) .temperature(config.default_temperature) .workspace_dir(config.workspace_dir.clone()) .allowed_tools(Some(vec![BROWSER_ACTION_TOOL_NAME.to_string()])) .build() .map_err(map_anyhow_to_pipe_error) } fn build_provider(config: &ZeroClawConfig) -> Result, PipeError> { let provider_name = config.default_provider.as_deref().unwrap_or("deepseek"); let model_name = config .default_model .as_deref() .unwrap_or("deepseek-chat"); let runtime_options = providers::provider_runtime_options_from_config(config); let resolved_provider_name = if provider_name == "deepseek" { config .api_url .as_deref() .map(str::trim) .filter(|url| !url.is_empty()) .map(|url| format!("custom:{url}")) .unwrap_or_else(|| provider_name.to_string()) } else { provider_name.to_string() }; let provider = providers::create_routed_provider_with_options( &resolved_provider_name, config.api_key.as_deref(), config.api_url.as_deref(), &config.reliability, &config.model_routes, model_name, &runtime_options, ) .map_err(map_anyhow_to_pipe_error)?; Ok(Box::new(NonStreamingProvider::new(provider))) } fn map_anyhow_to_pipe_error(err: anyhow::Error) -> PipeError { PipeError::Protocol(err.to_string()) } struct NonStreamingProvider { inner: Box, } impl NonStreamingProvider { fn new(inner: Box) -> Self { Self { inner } } } #[async_trait] impl Provider for NonStreamingProvider { fn capabilities(&self) -> ProviderCapabilities { self.inner.capabilities() } async fn chat_with_system( &self, system_prompt: Option<&str>, message: &str, model: &str, temperature: f64, ) -> anyhow::Result { self.inner .chat_with_system(system_prompt, message, model, temperature) .await } async fn chat_with_history( &self, messages: &[ChatMessage], model: &str, temperature: f64, ) -> anyhow::Result { self.inner.chat_with_history(messages, model, temperature).await } async fn chat( &self, request: ChatRequest<'_>, model: &str, temperature: f64, ) -> anyhow::Result { self.inner.chat(request, model, temperature).await } fn supports_streaming(&self) -> bool { false } fn supports_streaming_tool_events(&self) -> bool { false } fn stream_chat( &self, _request: ChatRequest<'_>, _model: &str, _temperature: f64, _options: StreamOptions, ) -> stream::BoxStream<'static, StreamResult> { stream::empty().boxed() } } fn build_seed_history(task_context: &CompatTaskContext) -> Vec { task_context .messages .iter() .filter_map(to_chat_message) .collect() } fn to_chat_message(message: &ConversationMessage) -> Option { let content = message.content.trim(); if content.is_empty() { return None; } match message.role.as_str() { "user" => Some(ChatMessage::user(content)), "assistant" => Some(ChatMessage::assistant(content)), "system" => Some(ChatMessage::system(content)), _ => None, } }