feat: refactor sgclaw around zeroclaw compat runtime

This commit is contained in:
zyl
2026-03-26 16:23:31 +08:00
parent bca5b75801
commit ff0771a83f
1059 changed files with 409460 additions and 23 deletions

View File

@@ -1,7 +1,9 @@
pub mod planner;
pub mod runtime;
use crate::llm::DeepSeekProvider;
use std::path::PathBuf;
use crate::config::DeepSeekSettings;
use crate::pipe::{AgentMessage, BrowserMessage, BrowserPipeTool, PipeError, Transport};
pub fn execute_task<T: Transport>(
@@ -34,19 +36,19 @@ pub fn execute_task<T: Transport>(
Ok(plan.summary)
}
pub fn handle_browser_message<T: Transport>(
pub fn handle_browser_message<T: Transport + 'static>(
transport: &T,
browser_tool: &BrowserPipeTool<T>,
message: BrowserMessage,
) -> Result<(), PipeError> {
match message {
BrowserMessage::SubmitTask { instruction } => {
let completion = match DeepSeekProvider::from_env() {
Ok(provider) => match runtime::execute_task_with_provider(
let completion = match DeepSeekSettings::from_env() {
Ok(_) => match crate::compat::runtime::execute_task(
transport,
browser_tool,
&provider,
browser_tool.clone(),
&instruction,
&std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
) {
Ok(summary) => AgentMessage::TaskComplete {
success: true,

View File

@@ -1,3 +1,4 @@
use reqwest::Url;
use serde_json::{json, Value};
use thiserror::Error;
@@ -7,6 +8,8 @@ const BAIDU_URL: &str = "https://www.baidu.com";
const BAIDU_DOMAIN: &str = "www.baidu.com";
const BAIDU_INPUT_SELECTOR: &str = "#kw";
const BAIDU_SEARCH_BUTTON_SELECTOR: &str = "#su";
const ZHIHU_URL: &str = "https://www.zhihu.com/search";
const ZHIHU_DOMAIN: &str = "www.zhihu.com";
#[derive(Debug, Clone, PartialEq)]
pub struct PlannedStep {
@@ -32,17 +35,38 @@ pub enum PlannerError {
pub fn plan_instruction(instruction: &str) -> Result<TaskPlan, PlannerError> {
let trimmed = instruction.trim();
let query = trimmed
.strip_prefix("打开百度搜索")
.or_else(|| trimmed.strip_prefix("打开百度并搜索"))
.ok_or_else(|| PlannerError::UnsupportedInstruction(trimmed.to_string()))?
.trim();
if let Some(query) = extract_query(trimmed, &["打开百度搜索", "打开百度并搜索"])? {
return Ok(plan_baidu_search(query));
}
if let Some(query) = extract_query(trimmed, &["打开知乎搜索", "打开知乎并搜索"])? {
return Ok(plan_zhihu_search(query));
}
Err(PlannerError::UnsupportedInstruction(trimmed.to_string()))
}
fn extract_query<'a>(
instruction: &'a str,
prefixes: &[&str],
) -> Result<Option<&'a str>, PlannerError> {
let Some(query) = prefixes
.iter()
.find_map(|prefix| instruction.strip_prefix(prefix))
else {
return Ok(None);
};
let query = query.trim();
if query.is_empty() {
return Err(PlannerError::MissingQuery);
}
Ok(TaskPlan {
Ok(Some(query))
}
fn plan_baidu_search(query: &str) -> TaskPlan {
TaskPlan {
summary: format!("已在百度搜索{query}"),
steps: vec![
PlannedStep {
@@ -68,5 +92,21 @@ pub fn plan_instruction(instruction: &str) -> Result<TaskPlan, PlannerError> {
log_message: format!("click {BAIDU_SEARCH_BUTTON_SELECTOR}"),
},
],
})
}
}
/// Builds a single-step plan that navigates straight to Zhihu's content
/// search results for `query`.
fn plan_zhihu_search(query: &str) -> TaskPlan {
    // Build the search URL once so the step params and the log line share
    // the exact same string.
    let search_url: String =
        Url::parse_with_params(ZHIHU_URL, &[("type", "content"), ("q", query)])
            .expect("valid Zhihu search URL")
            .into();
    TaskPlan {
        summary: format!("已在知乎搜索{query}"),
        steps: vec![PlannedStep {
            action: Action::Navigate,
            params: json!({ "url": search_url }),
            expected_domain: ZHIHU_DOMAIN.to_string(),
            log_message: format!("navigate {search_url}"),
        }],
    }
}

View File

@@ -0,0 +1,156 @@
use async_trait::async_trait;
use serde_json::{json, Map, Value};
use zeroclaw::tools::{Tool, ToolResult};
use crate::pipe::{Action, BrowserPipeTool, Transport};
pub const BROWSER_ACTION_TOOL_NAME: &str = "browser_action";
/// Adapter exposing sgClaw's pipe-backed [`BrowserPipeTool`] to the zeroclaw
/// agent runtime as a zeroclaw [`Tool`].
pub struct ZeroClawBrowserTool<T: Transport> {
    // Underlying pipe tool that actually talks to the browser.
    browser_tool: BrowserPipeTool<T>,
}

impl<T: Transport> ZeroClawBrowserTool<T> {
    /// Wraps an existing pipe-backed browser tool.
    pub fn new(browser_tool: BrowserPipeTool<T>) -> Self {
        Self { browser_tool }
    }
}
#[async_trait]
impl<T: Transport + 'static> Tool for ZeroClawBrowserTool<T> {
    fn name(&self) -> &str {
        BROWSER_ACTION_TOOL_NAME
    }

    fn description(&self) -> &str {
        "Execute browser actions in SuperRPA through the existing sgClaw pipe protocol."
    }

    /// JSON schema advertised to the model: `action` and `expected_domain`
    /// are required; the other properties are per-action parameters.
    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "required": ["action", "expected_domain"],
            "properties": {
                "action": {
                    "type": "string",
                    "enum": ["click", "type", "navigate", "getText"]
                },
                "expected_domain": {
                    "type": "string"
                },
                "selector": {
                    "type": "string"
                },
                "text": {
                    "type": "string"
                },
                "url": {
                    "type": "string"
                },
                "clear_first": {
                    "type": "boolean"
                }
            }
        })
    }

    /// Validates the model-supplied arguments, forwards them over the pipe,
    /// and serializes the pipe response as the tool output.
    ///
    /// Argument/pipe failures are reported as unsuccessful `ToolResult`s
    /// rather than `Err`, so the agent loop can show them to the model.
    async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
        let request = match parse_browser_action_request(args) {
            Ok(request) => request,
            Err(err) => return Ok(failed_tool_result(err.to_string())),
        };
        let result = match self.browser_tool.invoke(
            request.action,
            request.params,
            &request.expected_domain,
        ) {
            Ok(result) => result,
            Err(err) => return Ok(failed_tool_result(err.to_string())),
        };
        // Echo the full pipe response so the model can inspect data/timing.
        let output = serde_json::to_string(&json!({
            "seq": result.seq,
            "success": result.success,
            "data": result.data,
            "aom_snapshot": result.aom_snapshot,
            "timing": result.timing
        }))?;
        Ok(ToolResult {
            success: result.success,
            output,
            // Only set an error string when the browser reported failure.
            error: (!result.success).then(|| "browser action returned success=false".to_string()),
        })
    }
}
/// Validated `browser_action` invocation extracted from the model's raw
/// JSON arguments.
struct BrowserActionRequest {
    // Parsed browser action (click / type / navigate / getText).
    action: Action,
    // Domain the pipe layer checks the browser against after the action.
    expected_domain: String,
    // Remaining arguments (selector, text, url, ...) forwarded verbatim.
    params: Value,
}
/// Splits the model's raw JSON arguments into a validated request.
///
/// `action` and `expected_domain` are consumed from the object; whatever
/// remains is passed through as per-action params.
fn parse_browser_action_request(
    args: Value,
) -> Result<BrowserActionRequest, BrowserActionAdapterError> {
    // The tool arguments must be a JSON object; anything else is rejected.
    let mut fields = match args {
        Value::Object(map) => map,
        other => {
            return Err(BrowserActionAdapterError::InvalidArguments(format!(
                "expected object arguments, got {other}"
            )))
        }
    };
    let raw_action = take_required_string(&mut fields, "action")?;
    let expected_domain = take_required_string(&mut fields, "expected_domain")?;
    let action = parse_action(&raw_action)?;
    Ok(BrowserActionRequest {
        action,
        expected_domain,
        params: Value::Object(fields),
    })
}
/// Maps an action name from the JSON schema's enum onto a pipe [`Action`].
fn parse_action(action_name: &str) -> Result<Action, BrowserActionAdapterError> {
    let action = match action_name {
        "click" => Action::Click,
        "type" => Action::Type,
        "navigate" => Action::Navigate,
        "getText" => Action::GetText,
        unknown => {
            return Err(BrowserActionAdapterError::UnsupportedAction(
                unknown.to_string(),
            ))
        }
    };
    Ok(action)
}
/// Removes `key` from `args` and requires it to be a non-blank string.
///
/// The field is removed (not just read) so it is not echoed back into the
/// pass-through params object.
fn take_required_string(
    args: &mut Map<String, Value>,
    key: &'static str,
) -> Result<String, BrowserActionAdapterError> {
    let Some(value) = args.remove(key) else {
        return Err(BrowserActionAdapterError::MissingField(key));
    };
    match value {
        Value::String(text) if !text.trim().is_empty() => Ok(text),
        other => Err(BrowserActionAdapterError::InvalidArguments(format!(
            "{key} must be a non-empty string, got {other}"
        ))),
    }
}
/// Packages an adapter-level failure as an unsuccessful tool result with an
/// empty output, so the agent loop reports it to the model instead of
/// aborting the turn.
fn failed_tool_result(error: String) -> ToolResult {
    ToolResult {
        success: false,
        output: String::new(),
        error: Some(error),
    }
}
/// Errors produced while adapting model tool-call arguments to pipe actions.
#[derive(Debug, thiserror::Error)]
enum BrowserActionAdapterError {
    /// The `action` value is not one of the schema's enum members.
    #[error("unsupported action: {0}")]
    UnsupportedAction(String),
    /// A required field (`action` or `expected_domain`) was absent.
    #[error("missing required field: {0}")]
    MissingField(&'static str),
    /// A field was present but had the wrong type or was blank.
    #[error("invalid tool arguments: {0}")]
    InvalidArguments(String),
}

View File

@@ -0,0 +1,38 @@
use std::path::{Path, PathBuf};
use zeroclaw::Config as ZeroClawConfig;
use crate::compat::cron_adapter::configure_embedded_cron;
use crate::compat::memory_adapter::configure_embedded_memory;
use crate::config::DeepSeekSettings;
const SGCLAW_ZEROCLAW_WORKSPACE_DIR: &str = ".sgclaw-zeroclaw-workspace";
/// Builds an embedded zeroclaw config for `workspace_root`, reading the
/// DeepSeek settings from the environment.
///
/// # Errors
/// Returns the settings-loading error when the environment is incomplete.
pub fn build_zeroclaw_config(workspace_root: &Path) -> Result<ZeroClawConfig, crate::config::ConfigError> {
    DeepSeekSettings::from_env()
        .map(|settings| build_zeroclaw_config_from_settings(workspace_root, &settings))
}
/// Derives an embedded zeroclaw configuration from explicit DeepSeek
/// settings, rooted under `workspace_root`.
pub fn build_zeroclaw_config_from_settings(
    workspace_root: &Path,
    settings: &DeepSeekSettings,
) -> ZeroClawConfig {
    let dir = zeroclaw_workspace_dir(workspace_root);
    let mut cfg = ZeroClawConfig::default();
    // Keep all zeroclaw state under a dedicated subdirectory so it never
    // collides with sgClaw's own workspace files.
    cfg.config_path = dir.join("config.toml");
    cfg.workspace_dir = dir;
    // Route all model traffic through DeepSeek with the caller's credentials.
    cfg.default_provider = Some("deepseek".to_string());
    cfg.default_model = Some(settings.model.clone());
    cfg.api_key = Some(settings.api_key.clone());
    cfg.api_url = Some(settings.base_url.clone());
    // Force local, self-contained memory and cron behavior.
    configure_embedded_memory(&mut cfg);
    configure_embedded_cron(&mut cfg);
    cfg
}
/// Returns the directory inside `workspace_root` that holds embedded
/// zeroclaw state.
pub fn zeroclaw_workspace_dir(workspace_root: &Path) -> PathBuf {
    let mut dir = workspace_root.to_path_buf();
    dir.push(SGCLAW_ZEROCLAW_WORKSPACE_DIR);
    dir
}

View File

@@ -0,0 +1,98 @@
use std::future::Future;
use chrono::{DateTime, Utc};
use zeroclaw::config::Config as ZeroClawConfig;
use zeroclaw::cron::{self, CronJob, CronRun, JobType, Schedule, SessionTarget};
/// Outcome of a single cron job execution as reported by `run_due_jobs`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CronExecutionResult {
    /// Identifier of the executed cron job.
    pub job_id: String,
    /// Whether the runner closure completed without error.
    pub success: bool,
    /// Runner output on success, or the error message on failure.
    pub output: String,
}
/// Tunes zeroclaw's cron/scheduler settings for embedded use inside sgClaw.
pub fn configure_embedded_cron(config: &mut ZeroClawConfig) {
    config.cron.enabled = true;
    // sgClaw drives execution explicitly via `run_due_jobs`, so missed runs
    // are not replayed on startup.
    config.cron.catch_up_on_startup = false;
    // zeroclaw's own background scheduler stays off; jobs run in-process,
    // one at a time.
    config.scheduler.enabled = false;
    config.scheduler.max_concurrent = 1;
    // Never lower a preconfigured max_tasks below 1.
    config.scheduler.max_tasks = config.scheduler.max_tasks.max(1);
}
/// Registers an agent-type cron job in zeroclaw's cron store.
///
/// Each run uses an isolated session ([`SessionTarget::Isolated`]); the other
/// positional arguments are fixed for the embedded runtime.
pub fn add_agent_job(
    config: &ZeroClawConfig,
    name: Option<String>,
    schedule: Schedule,
    prompt: &str,
    allowed_tools: Option<Vec<String>>,
) -> anyhow::Result<CronJob> {
    cron::add_agent_job(
        config,
        name,
        schedule,
        prompt,
        SessionTarget::Isolated,
        // NOTE(review): the two `None`s and `false` below are options this
        // wrapper does not expose — verify against zeroclaw's
        // `cron::add_agent_job` signature before changing.
        None,
        None,
        false,
        allowed_tools,
    )
}
/// Lists all cron jobs from zeroclaw's cron store.
pub fn list_jobs(config: &ZeroClawConfig) -> anyhow::Result<Vec<CronJob>> {
    cron::list_jobs(config)
}

/// Lists up to `limit` recorded runs for the job identified by `job_id`.
pub fn list_runs(
    config: &ZeroClawConfig,
    job_id: &str,
    limit: usize,
) -> anyhow::Result<Vec<CronRun>> {
    cron::list_runs(config, job_id, limit)
}
/// Executes every cron job due at `now` sequentially via `runner`, recording
/// each run and its reschedule in zeroclaw's cron store.
///
/// Runner failures are captured per-job (not propagated), so one failing job
/// does not stop the batch.
///
/// # Errors
/// Fails fast — aborting the remaining jobs — when a non-agent job is
/// encountered or when persisting a run/reschedule fails; runs already
/// recorded are kept.
pub async fn run_due_jobs<F, Fut>(
    config: &ZeroClawConfig,
    now: DateTime<Utc>,
    mut runner: F,
) -> anyhow::Result<Vec<CronExecutionResult>>
where
    F: FnMut(&CronJob) -> Fut,
    Fut: Future<Output = anyhow::Result<String>>,
{
    let jobs = cron::due_jobs(config, now)?;
    let mut results = Vec::with_capacity(jobs.len());
    for job in jobs {
        // Only agent jobs are supported by the sgClaw compat layer.
        if !matches!(job.job_type, JobType::Agent) {
            anyhow::bail!("unsupported cron job type in sgclaw compat: {:?}", job.job_type);
        }
        let started_at = Utc::now();
        // Capture the runner's error message as the run output on failure.
        let (success, output) = match runner(&job).await {
            Ok(output) => (true, output),
            Err(err) => (false, err.to_string()),
        };
        let finished_at = Utc::now();
        let duration_ms = (finished_at - started_at).num_milliseconds();
        // Persist the run record before rescheduling so history survives a
        // reschedule failure.
        cron::record_run(
            config,
            &job.id,
            started_at,
            finished_at,
            if success { "ok" } else { "error" },
            Some(&output),
            duration_ms,
        )?;
        cron::reschedule_after_run(config, &job, success, &output)?;
        results.push(CronExecutionResult {
            job_id: job.id,
            success,
            output,
        });
    }
    Ok(results)
}

View File

@@ -0,0 +1,63 @@
use serde_json::Value;
use zeroclaw::agent::TurnEvent;
use crate::pipe::AgentMessage;
/// Maps a zeroclaw [`TurnEvent`] to a pipe log entry, if it is loggable.
///
/// Tool calls become "info" entries; tool results carrying an `Error:`
/// payload become "error" entries. Everything else is dropped.
pub fn log_entry_for_turn_event(event: &TurnEvent) -> Option<AgentMessage> {
    match event {
        TurnEvent::ToolCall { name, args } => {
            let message = format_tool_call(name, args);
            Some(AgentMessage::LogEntry {
                level: String::from("info"),
                message,
            })
        }
        TurnEvent::ToolResult { output, .. } => {
            if !is_tool_error(output) {
                return None;
            }
            Some(AgentMessage::LogEntry {
                level: String::from("error"),
                message: output.trim_start_matches("Error: ").to_string(),
            })
        }
        _ => None,
    }
}
/// Renders a tool call as a short, human-readable log line.
///
/// `browser_action` calls are expanded per action; any other tool is logged
/// as a plain `call <name>`.
fn format_tool_call(name: &str, args: &Value) -> String {
    // Local helper: read an optional string field, falling back to a marker.
    fn str_field<'a>(args: &'a Value, key: &str, fallback: &'a str) -> &'a str {
        args.get(key).and_then(Value::as_str).unwrap_or(fallback)
    }

    if name != "browser_action" {
        return format!("call {name}");
    }
    match str_field(args, "action", "unknown") {
        "navigate" => {
            let url = str_field(args, "url", "<missing-url>");
            format!("navigate {url}")
        }
        "type" => {
            let text = str_field(args, "text", "");
            let selector = str_field(args, "selector", "<missing-selector>");
            format!("type {text} into {selector}")
        }
        "click" => {
            let selector = str_field(args, "selector", "<missing-selector>");
            format!("click {selector}")
        }
        "getText" => {
            let selector = str_field(args, "selector", "<missing-selector>");
            format!("getText {selector}")
        }
        other => format!("browser_action {other}"),
    }
}
/// True when a tool result string uses the `Error:` failure prefix.
fn is_tool_error(output: &str) -> bool {
    output.strip_prefix("Error:").is_some()
}

View File

@@ -0,0 +1,30 @@
use std::path::{Path, PathBuf};
use zeroclaw::config::Config as ZeroClawConfig;
use zeroclaw::memory::{self, Memory};
/// Forces zeroclaw's memory subsystem into a local, self-contained mode:
/// SQLite backend, no embedding provider, no caches or snapshots, and no
/// external storage provider.
pub fn configure_embedded_memory(config: &mut ZeroClawConfig) {
    config.memory.backend = "sqlite".to_string();
    // "none" disables the embedding provider entirely.
    config.memory.embedding_provider = "none".to_string();
    config.memory.response_cache_enabled = false;
    config.memory.snapshot_enabled = false;
    config.memory.snapshot_on_hygiene = false;
    // Clear any externally-configured storage provider so all memory state
    // stays inside the embedded workspace.
    config.storage.provider.config.provider.clear();
    config.storage.provider.config.db_url = None;
    config.storage.provider.config.connect_timeout_secs = None;
}
/// Instantiates the zeroclaw memory backend described by `config`.
///
/// # Errors
/// Propagates backend construction failures from zeroclaw (e.g. the store
/// under `config.workspace_dir` cannot be created or opened).
pub fn build_memory(config: &ZeroClawConfig) -> anyhow::Result<Box<dyn Memory>> {
    memory::create_memory_with_storage_and_routes(
        &config.memory,
        &config.embedding_routes,
        Some(&config.storage.provider.config),
        &config.workspace_dir,
        config.api_key.as_deref(),
    )
}
/// Path of the "brain" database file inside a zeroclaw workspace directory.
pub fn brain_db_path(workspace_dir: &Path) -> PathBuf {
    let mut path = workspace_dir.to_path_buf();
    path.push("memory");
    path.push("brain.db");
    path
}

6
src/compat/mod.rs Normal file
View File

@@ -0,0 +1,6 @@
pub mod browser_tool_adapter;
pub mod config_adapter;
pub mod cron_adapter;
pub mod event_bridge;
pub mod memory_adapter;
pub mod runtime;

197
src/compat/runtime.rs Normal file
View File

@@ -0,0 +1,197 @@
use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use futures_util::{stream, StreamExt};
use zeroclaw::agent::dispatcher::NativeToolDispatcher;
use zeroclaw::agent::{Agent, TurnEvent};
use zeroclaw::config::Config as ZeroClawConfig;
use zeroclaw::observability::{NoopObserver, Observer};
use zeroclaw::providers::{
self, ChatMessage, ChatRequest, ChatResponse, Provider,
};
use zeroclaw::providers::traits::{
ProviderCapabilities, StreamEvent, StreamOptions, StreamResult,
};
use crate::compat::browser_tool_adapter::{ZeroClawBrowserTool, BROWSER_ACTION_TOOL_NAME};
use crate::compat::config_adapter::build_zeroclaw_config;
use crate::compat::event_bridge::log_entry_for_turn_event;
use crate::compat::memory_adapter::build_memory;
use crate::pipe::{BrowserPipeTool, PipeError, Transport};
/// Synchronous entry point: runs `instruction` through a zeroclaw agent,
/// streaming log entries back over `transport`, and returns the agent's
/// final answer.
///
/// Builds the embedded zeroclaw config from the environment (rooted at
/// `workspace_root`), constructs the provider, then blocks on the async
/// turn inside a freshly created tokio runtime.
///
/// # Errors
/// Config, provider, runtime-creation, and agent failures are all surfaced
/// as [`PipeError::Protocol`].
pub fn execute_task<T: Transport + 'static>(
    transport: &T,
    browser_tool: BrowserPipeTool<T>,
    instruction: &str,
    workspace_root: &Path,
) -> Result<String, PipeError> {
    let config = build_zeroclaw_config(workspace_root)
        .map_err(|err| PipeError::Protocol(err.to_string()))?;
    let provider = build_provider(&config)?;
    // NOTE(review): a new tokio runtime is created per task — fine for
    // one-shot calls, but consider reusing one if this path becomes hot.
    let runtime = tokio::runtime::Runtime::new()
        .map_err(|err| PipeError::Protocol(format!("failed to create tokio runtime: {err}")))?;
    runtime.block_on(execute_task_with_provider(
        transport,
        browser_tool,
        provider,
        instruction,
        config,
    ))
}
/// Runs one agent turn for `instruction`, forwarding loggable [`TurnEvent`]s
/// to `transport` as they arrive, and returns the agent's final answer.
pub async fn execute_task_with_provider<T: Transport + 'static>(
    transport: &T,
    browser_tool: BrowserPipeTool<T>,
    provider: Box<dyn Provider>,
    instruction: &str,
    config: ZeroClawConfig,
) -> Result<String, PipeError> {
    let mut agent = build_agent(browser_tool, provider, &config)?;
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(32);
    // Own the instruction so the turn can run on its own spawned task while
    // we drain events concurrently here.
    let instruction = instruction.to_string();
    let task = tokio::spawn(async move { agent.turn_streamed(&instruction, event_tx).await });
    // Loop ends when the agent drops its sender, i.e. the turn finished.
    while let Some(event) = event_rx.recv().await {
        if let Some(log_entry) = log_entry_for_turn_event(&event) {
            // NOTE(review): a transport error here returns early while the
            // spawned turn keeps running detached — confirm that is intended.
            transport.send(&log_entry)?;
        }
    }
    task.await
        .map_err(|err| PipeError::Protocol(format!("zeroclaw task join failed: {err}")))?
        .map_err(|err| PipeError::Protocol(err.to_string()))
}
/// Assembles a zeroclaw [`Agent`] wired to the pipe-backed browser tool and
/// restricted to that single tool.
fn build_agent<T: Transport + 'static>(
    browser_tool: BrowserPipeTool<T>,
    provider: Box<dyn Provider>,
    config: &ZeroClawConfig,
) -> Result<Agent, PipeError> {
    let memory = build_memory(config).map_err(map_anyhow_to_pipe_error)?;
    // No telemetry sink is wired up in the embedded runtime.
    let observer: Arc<dyn Observer> = Arc::new(NoopObserver);
    let tools: Vec<Box<dyn zeroclaw::tools::Tool>> =
        vec![Box::new(ZeroClawBrowserTool::new(browser_tool))];
    Agent::builder()
        .provider(provider)
        .tools(tools)
        .memory(Arc::from(memory))
        .observer(observer)
        .tool_dispatcher(Box::new(NativeToolDispatcher))
        .config(config.agent.clone())
        // Fall back to "deepseek-chat" when no model is configured.
        .model_name(
            config
                .default_model
                .clone()
                .unwrap_or_else(|| "deepseek-chat".to_string()),
        )
        .temperature(config.default_temperature)
        .workspace_dir(config.workspace_dir.clone())
        // Whitelist only the browser action tool; other tools stay disabled.
        .allowed_tools(Some(vec![BROWSER_ACTION_TOOL_NAME.to_string()]))
        .build()
        .map_err(map_anyhow_to_pipe_error)
}
/// Creates the chat provider for the embedded agent.
///
/// When the provider is "deepseek" and a non-empty base URL is configured,
/// the provider name is rewritten to zeroclaw's `custom:<url>` form so
/// requests go to that endpoint. The result is wrapped in
/// [`NonStreamingProvider`] to force the non-streaming chat path.
fn build_provider(config: &ZeroClawConfig) -> Result<Box<dyn Provider>, PipeError> {
    let provider_name = config.default_provider.as_deref().unwrap_or("deepseek");
    let model_name = config
        .default_model
        .as_deref()
        .unwrap_or("deepseek-chat");
    let runtime_options = providers::provider_runtime_options_from_config(config);
    // Redirect "deepseek" to a custom endpoint only when api_url is set and
    // non-blank; otherwise keep the stock provider name.
    let resolved_provider_name = if provider_name == "deepseek" {
        config
            .api_url
            .as_deref()
            .map(str::trim)
            .filter(|url| !url.is_empty())
            .map(|url| format!("custom:{url}"))
            .unwrap_or_else(|| provider_name.to_string())
    } else {
        provider_name.to_string()
    };
    let provider = providers::create_routed_provider_with_options(
        &resolved_provider_name,
        config.api_key.as_deref(),
        config.api_url.as_deref(),
        &config.reliability,
        &config.model_routes,
        model_name,
        &runtime_options,
    )
    .map_err(map_anyhow_to_pipe_error)?;
    Ok(Box::new(NonStreamingProvider::new(provider)))
}
/// Converts any anyhow error into the pipe layer's protocol error variant.
fn map_anyhow_to_pipe_error(err: anyhow::Error) -> PipeError {
    PipeError::Protocol(err.to_string())
}
/// Wrapper that forwards all chat calls to an inner provider while reporting
/// no streaming support, forcing callers onto the non-streaming path.
struct NonStreamingProvider {
    inner: Box<dyn Provider>,
}

impl NonStreamingProvider {
    /// Wraps `inner`, disabling its streaming capabilities as seen by callers.
    fn new(inner: Box<dyn Provider>) -> Self {
        Self { inner }
    }
}
#[async_trait]
impl Provider for NonStreamingProvider {
    fn capabilities(&self) -> ProviderCapabilities {
        self.inner.capabilities()
    }

    // All chat methods delegate to the wrapped provider unchanged.
    async fn chat_with_system(
        &self,
        system_prompt: Option<&str>,
        message: &str,
        model: &str,
        temperature: f64,
    ) -> anyhow::Result<String> {
        self.inner
            .chat_with_system(system_prompt, message, model, temperature)
            .await
    }

    async fn chat_with_history(
        &self,
        messages: &[ChatMessage],
        model: &str,
        temperature: f64,
    ) -> anyhow::Result<String> {
        self.inner.chat_with_history(messages, model, temperature).await
    }

    async fn chat(
        &self,
        request: ChatRequest<'_>,
        model: &str,
        temperature: f64,
    ) -> anyhow::Result<ChatResponse> {
        self.inner.chat(request, model, temperature).await
    }

    /// Always `false`: this wrapper exists to disable streaming.
    fn supports_streaming(&self) -> bool {
        false
    }

    fn supports_streaming_tool_events(&self) -> bool {
        false
    }

    /// Should be unreachable given `supports_streaming` is `false`; returns
    /// an empty stream rather than panicking if called anyway.
    fn stream_chat(
        &self,
        _request: ChatRequest<'_>,
        _model: &str,
        _temperature: f64,
        _options: StreamOptions,
    ) -> stream::BoxStream<'static, StreamResult<StreamEvent>> {
        stream::empty().boxed()
    }
}

View File

@@ -1,4 +1,5 @@
pub mod agent;
pub mod compat;
pub mod config;
pub mod llm;
pub mod pipe;

View File

@@ -21,17 +21,29 @@ pub struct BrowserPipeTool<T: Transport> {
transport: Arc<T>,
mac_policy: MacPolicy,
session_key: Vec<u8>,
next_seq: AtomicU64,
next_seq: Arc<AtomicU64>,
response_timeout: Duration,
}
// Manual Clone impl: `#[derive(Clone)]` would add an unnecessary `T: Clone`
// bound, and the fields here clone without it (Arc handles + owned data).
impl<T: Transport> Clone for BrowserPipeTool<T> {
    fn clone(&self) -> Self {
        Self {
            transport: self.transport.clone(),
            mac_policy: self.mac_policy.clone(),
            session_key: self.session_key.clone(),
            // Arc-shared counter: clones must never reuse a sequence number.
            next_seq: self.next_seq.clone(),
            response_timeout: self.response_timeout,
        }
    }
}
impl<T: Transport> BrowserPipeTool<T> {
pub fn new(transport: Arc<T>, mac_policy: MacPolicy, session_key: Vec<u8>) -> Self {
Self {
transport,
mac_policy,
session_key,
next_seq: AtomicU64::new(1),
next_seq: Arc::new(AtomicU64::new(1)),
response_timeout: Duration::from_secs(30),
}
}