wip: checkpoint 2026-03-29 runtime work

This commit is contained in:
zyl
2026-03-29 22:44:30 +08:00
parent 7d9036b2d4
commit e294fbb9b1
30 changed files with 6759 additions and 161 deletions

View File

@@ -801,6 +801,7 @@ impl Agent {
} else {
text
};
let final_text = sanitize_final_text(&final_text);
// Store in response cache (text-only, no tool calls)
if let (Some(ref cache), Some(ref key)) = (&self.response_cache, &cache_key) {
@@ -1067,6 +1068,7 @@ impl Agent {
} else {
text
};
let final_text = sanitize_final_text(&final_text);
// Store in response cache
if let (Some(ref cache), Some(ref key)) = (&self.response_cache, &cache_key) {
@@ -1175,6 +1177,31 @@ impl Agent {
}
}
/// Collapse consecutive duplicate paragraphs in a model's final answer.
///
/// Paragraphs are the `\n\n`-separated chunks of the trimmed input. Two
/// adjacent paragraphs count as duplicates when they compare equal after
/// whitespace normalization (runs of whitespace folded to single spaces).
/// The first occurrence is kept verbatim, empty paragraphs are dropped,
/// and survivors are re-joined with `\n\n`. Non-adjacent repeats are
/// intentionally preserved.
fn sanitize_final_text(text: &str) -> String {
    let trimmed = text.trim();
    if trimmed.is_empty() {
        return String::new();
    }
    let mut kept: Vec<&str> = Vec::new();
    let mut previous_key = String::new();
    for paragraph in trimmed.split("\n\n").map(str::trim) {
        if paragraph.is_empty() {
            continue;
        }
        // Whitespace-insensitive comparison key for duplicate detection.
        let key = paragraph.split_whitespace().collect::<Vec<_>>().join(" ");
        if !previous_key.is_empty() && key == previous_key {
            continue;
        }
        kept.push(paragraph);
        previous_key = key;
    }
    kept.join("\n\n")
}
pub async fn run(
config: Config,
message: Option<String>,
@@ -1333,6 +1360,67 @@ mod tests {
}
}
/// Test double that streams the same paragraph twice (separated by blank
/// lines) before a distinct closing sentence, so the streaming turn path
/// can be exercised against duplicate-paragraph sanitization.
struct StreamingDuplicateParagraphProvider;

#[async_trait]
impl Provider for StreamingDuplicateParagraphProvider {
    async fn chat_with_system(
        &self,
        _system_prompt: Option<&str>,
        _message: &str,
        _model: &str,
        _temperature: f64,
    ) -> Result<String> {
        Ok("ok".to_string())
    }

    async fn chat(
        &self,
        _request: ChatRequest<'_>,
        _model: &str,
        _temperature: f64,
    ) -> Result<crate::providers::ChatResponse> {
        // Non-streaming fallback; not expected to be reached when
        // `supports_streaming` returns true.
        Ok(crate::providers::ChatResponse {
            text: Some("fallback".to_string()),
            tool_calls: Vec::new(),
            usage: None,
            reasoning_content: None,
        })
    }

    fn supports_streaming(&self) -> bool {
        true
    }

    fn stream_chat(
        &self,
        _request: ChatRequest<'_>,
        _model: &str,
        _temperature: f64,
        _options: crate::providers::traits::StreamOptions,
    ) -> futures_util::stream::BoxStream<
        'static,
        crate::providers::traits::StreamResult<crate::providers::traits::StreamEvent>,
    > {
        use crate::providers::traits::{StreamChunk, StreamEvent};
        use futures_util::{stream, StreamExt};
        // Emit the identical paragraph twice so the sanitizer has a
        // consecutive duplicate to collapse, then a unique final sentence.
        let paragraph =
            "由于浏览器和网络工具都遇到问题我将采用一个替代方案创建一个模拟的知乎热榜数据并导出Excel文件。";
        let events = vec![
            Ok(StreamEvent::TextDelta(StreamChunk::delta(paragraph))),
            Ok(StreamEvent::TextDelta(StreamChunk::delta("\n\n"))),
            Ok(StreamEvent::TextDelta(StreamChunk::delta(paragraph))),
            Ok(StreamEvent::TextDelta(StreamChunk::delta("\n\n"))),
            Ok(StreamEvent::TextDelta(StreamChunk::delta("文件已生成。"))),
            Ok(StreamEvent::Final),
        ];
        stream::iter(events).boxed()
    }
}
#[tokio::test]
async fn turn_without_tools_returns_text() {
let provider = Box::new(MockProvider {
@@ -1419,6 +1507,42 @@ mod tests {
.any(|msg| matches!(msg, ConversationMessage::ToolResults(_))));
}
#[tokio::test]
// End-to-end check that `turn_streamed` removes a consecutive duplicate
// paragraph from the streamed final answer: the mock provider emits the
// same paragraph twice, and the returned text must contain it only once.
async fn turn_streamed_sanitizes_duplicate_final_paragraphs() {
    let provider = Box::new(StreamingDuplicateParagraphProvider);
    // Memory backend "none" keeps the test free of persistence side effects.
    let memory_cfg = crate::config::MemoryConfig {
        backend: "none".into(),
        ..crate::config::MemoryConfig::default()
    };
    let mem: Arc<dyn Memory> = Arc::from(
        crate::memory::create_memory(&memory_cfg, std::path::Path::new("/tmp"), None)
            .expect("memory creation should succeed with valid config"),
    );
    let observer: Arc<dyn Observer> = Arc::from(crate::observability::NoopObserver {});
    let mut agent = Agent::builder()
        .provider(provider)
        .tools(vec![Box::new(MockTool)])
        .memory(mem)
        .observer(observer)
        .tool_dispatcher(Box::new(NativeToolDispatcher))
        .workspace_dir(std::path::PathBuf::from("/tmp"))
        .build()
        .expect("agent builder should succeed with valid config");
    // Events emitted during the turn are not inspected here; the receiver
    // is kept alive only so sends on `event_tx` do not fail.
    let (event_tx, _event_rx) = tokio::sync::mpsc::channel(8);
    let response = agent.turn_streamed("读取知乎热榜前10并导出 excel 文件", event_tx).await.unwrap();
    // The duplicated paragraph must appear once; the closing sentence follows.
    assert_eq!(
        response,
        concat!(
            "由于浏览器和网络工具都遇到问题我将采用一个替代方案创建一个模拟的知乎热榜数据并导出Excel文件。\n\n",
            "文件已生成。"
        )
    );
}
#[tokio::test]
async fn turn_routes_with_hint_when_query_classification_matches() {
let seen_models = Arc::new(Mutex::new(Vec::new()));
@@ -1670,4 +1794,25 @@ mod tests {
);
assert_eq!(history.len(), 3);
}
#[test]
fn sanitize_final_text_collapses_consecutive_duplicate_paragraphs() {
    // Two identical paragraphs back-to-back must collapse to one, while
    // the heading and closing sentence are left untouched.
    let duplicated =
        "由于浏览器和网络工具都遇到问题我将采用一个替代方案创建一个模拟的知乎热榜数据并导出Excel文件。";
    let input = format!("{duplicated}\n\n{duplicated}\n\n## 结果\n\n文件已生成。");
    let expected = format!("{duplicated}\n\n## 结果\n\n文件已生成。");
    assert_eq!(sanitize_final_text(&input), expected);
}
}

View File

@@ -4753,6 +4753,15 @@ pub async fn process_message(
config: Config,
message: &str,
session_id: Option<&str>,
) -> Result<String> {
process_message_with_extra_tools(config, message, session_id, Vec::new()).await
}
pub async fn process_message_with_extra_tools(
config: Config,
message: &str,
session_id: Option<&str>,
mut extra_tools: Vec<Box<dyn Tool>>,
) -> Result<String> {
let observer: Arc<dyn Observer> =
Arc::from(observability::create_observer(&config.observability));
@@ -4805,6 +4814,7 @@ pub async fn process_message(
let peripheral_tools: Vec<Box<dyn Tool>> =
crate::peripherals::create_peripheral_tools(&config.peripherals).await?;
tools_registry.extend(peripheral_tools);
tools_registry.append(&mut extra_tools);
// ── Wire MCP tools (non-fatal) — process_message path ────────
// NOTE: Same ordering contract as the CLI path above — MCP tools must be
@@ -4919,62 +4929,12 @@ pub async fn process_message(
// Register skill-defined tools as callable tool specs (process_message path).
tools::register_skill_tools(&mut tools_registry, &skills, security.clone());
let mut tool_descs: Vec<(&str, &str)> = vec![
("shell", "Execute terminal commands."),
("file_read", "Read file contents."),
("file_write", "Write file contents."),
("memory_store", "Save to memory."),
("memory_recall", "Search memory."),
("memory_forget", "Delete a memory entry."),
(
"model_routing_config",
"Configure default model, scenario routing, and delegate agents.",
),
("screenshot", "Capture a screenshot."),
("image_info", "Read image metadata."),
];
if matches!(
config.skills.prompt_injection_mode,
crate::config::SkillsPromptInjectionMode::Compact
) {
tool_descs.push((
"read_skill",
"Load the full source for an available skill by name.",
));
}
if config.browser.enabled {
tool_descs.push(("browser_open", "Open approved URLs in browser."));
}
if config.composio.enabled {
tool_descs.push(("composio", "Execute actions on 1000+ apps via Composio."));
}
if config.peripherals.enabled && !config.peripherals.boards.is_empty() {
tool_descs.push(("gpio_read", "Read GPIO pin value on connected hardware."));
tool_descs.push((
"gpio_write",
"Set GPIO pin high or low on connected hardware.",
));
tool_descs.push((
"arduino_upload",
"Upload Arduino sketch. Use for 'make a heart', custom patterns. You write full .ino code; ZeroClaw uploads it.",
));
tool_descs.push((
"hardware_memory_map",
"Return flash and RAM address ranges. Use when user asks for memory addresses or memory map.",
));
tool_descs.push((
"hardware_board_info",
"Return full board info (chip, architecture, memory map). Use when user asks for board info, what board, connected hardware, or chip info.",
));
tool_descs.push((
"hardware_memory_read",
"Read actual memory/register values from Nucleo. Use when user asks to read registers, read memory, dump lower memory 0-126, or give address and value.",
));
tool_descs.push((
"hardware_capabilities",
"Query connected hardware for reported GPIO pins and LED pin. Use when user asks what pins are available.",
));
}
let mut tool_descs: Vec<(String, String)> = tools_registry
.iter()
.map(|tool| (tool.name().to_string(), tool.description().to_string()))
.collect();
tool_descs.sort_by(|left, right| left.0.cmp(&right.0));
tool_descs.dedup_by(|left, right| left.0 == right.0);
// Filter out tools excluded for non-CLI channels (gateway counts as non-CLI).
// Skip when autonomy is `Full` — full-autonomy agents keep all tools.
@@ -4984,6 +4944,10 @@ pub async fn process_message(
tool_descs.retain(|(name, _)| !excluded.iter().any(|ex| ex == name));
}
}
let tool_desc_refs: Vec<(&str, &str)> = tool_descs
.iter()
.map(|(name, description)| (name.as_str(), description.as_str()))
.collect();
let bootstrap_max_chars = if config.agent.compact_context {
Some(6000)
@@ -4994,7 +4958,7 @@ pub async fn process_message(
let mut system_prompt = crate::channels::build_system_prompt_with_mode_and_autonomy(
&config.workspace_dir,
&model_name,
&tool_descs,
&tool_desc_refs,
&skills,
Some(&config.identity),
bootstrap_max_chars,

View File

@@ -19,4 +19,4 @@ mod tests;
#[allow(unused_imports)]
pub use agent::{Agent, AgentBuilder, TurnEvent};
#[allow(unused_imports)]
pub use loop_::{process_message, run};
pub use loop_::{process_message, process_message_with_extra_tools, run};

View File

@@ -229,6 +229,18 @@ impl OpenAiCompatibleProvider {
self
}
/// Resolve the `tool_choice` value to send with a request.
///
/// Returns `None` when no tools are attached. Otherwise prefers a
/// task-local override installed by the agent loop
/// (`TOOL_CHOICE_OVERRIDE`), falling back to `"auto"` when the
/// task-local is unset or not in scope for the current task.
fn tool_choice_for_tools(&self, has_tools: bool) -> Option<String> {
    if !has_tools {
        return None;
    }
    let override_choice = crate::agent::loop_::TOOL_CHOICE_OVERRIDE
        .try_with(Clone::clone)
        .ok()
        .flatten();
    Some(override_choice.unwrap_or_else(|| "auto".to_string()))
}
/// Collect all `system` role messages, concatenate their content,
/// and prepend to the first `user` message. Drop all system messages.
/// Used for providers (e.g. MiniMax) that reject `role: system`.
@@ -1829,11 +1841,7 @@ impl Provider for OpenAiCompatibleProvider {
} else {
Some(tools.to_vec())
},
tool_choice: if tools.is_empty() {
None
} else {
Some("auto".to_string())
},
tool_choice: self.tool_choice_for_tools(!tools.is_empty()),
max_tokens: self.max_tokens,
};
@@ -1933,7 +1941,9 @@ impl Provider for OpenAiCompatibleProvider {
reasoning_effort: self.reasoning_effort_for_model(model),
tool_stream: self
.tool_stream_for_tools(tools.as_ref().is_some_and(|tools| !tools.is_empty())),
tool_choice: tools.as_ref().map(|_| "auto".to_string()),
tool_choice: self.tool_choice_for_tools(
tools.as_ref().is_some_and(|tools| !tools.is_empty()),
),
tools,
max_tokens: self.max_tokens,
};
@@ -2087,7 +2097,9 @@ impl Provider for OpenAiCompatibleProvider {
tool_stream: if options.enabled { Some(true) } else { None },
stream: Some(options.enabled),
tools: tools.clone(),
tool_choice: tools.as_ref().map(|_| "auto".to_string()),
tool_choice: self.tool_choice_for_tools(
tools.as_ref().is_some_and(|tools| !tools.is_empty()),
),
max_tokens: self.max_tokens,
})
} else {

View File

@@ -1,11 +1,14 @@
use super::traits::{Tool, ToolResult};
use async_trait::async_trait;
use serde_json::json;
use std::path::PathBuf;
use std::collections::{BTreeSet, VecDeque};
use std::path::{Path, PathBuf};
/// Compact-mode helper for loading a skill's source file on demand.
pub struct ReadSkillTool {
    // Agent workspace root; the default skills location is `<workspace>/skills`.
    workspace_dir: PathBuf,
    // Optional alternate directory of runtime-installed skills. When set and
    // distinct from the default skills dir, skills from the default dir are
    // filtered out and this directory is loaded instead.
    runtime_skills_dir: Option<PathBuf>,
    // Passed through to `load_skills_from_directory` when loading from
    // `runtime_skills_dir` — presumably gates script-bearing skills; confirm
    // against that function's contract.
    allow_scripts: bool,
    // Open-skills feature toggle, forwarded to
    // `load_skills_with_open_skills_settings`.
    open_skills_enabled: bool,
    // Optional open-skills directory override (config-provided string path).
    open_skills_dir: Option<String>,
}
@@ -18,6 +21,24 @@ impl ReadSkillTool {
) -> Self {
Self {
workspace_dir,
runtime_skills_dir: None,
allow_scripts: false,
open_skills_enabled,
open_skills_dir,
}
}
pub fn with_runtime_skills_dir(
workspace_dir: PathBuf,
runtime_skills_dir: Option<PathBuf>,
allow_scripts: bool,
open_skills_enabled: bool,
open_skills_dir: Option<String>,
) -> Self {
Self {
workspace_dir,
runtime_skills_dir,
allow_scripts,
open_skills_enabled,
open_skills_dir,
}
@@ -55,11 +76,27 @@ impl Tool for ReadSkillTool {
.filter(|value| !value.is_empty())
.ok_or_else(|| anyhow::anyhow!("Missing 'name' parameter"))?;
let skills = crate::skills::load_skills_with_open_skills_settings(
let mut skills = crate::skills::load_skills_with_open_skills_settings(
&self.workspace_dir,
self.open_skills_enabled,
self.open_skills_dir.as_deref(),
);
let default_skills_dir = self.workspace_dir.join("skills");
if let Some(runtime_skills_dir) = &self.runtime_skills_dir {
if runtime_skills_dir != &default_skills_dir {
skills.retain(|skill| {
skill
.location
.as_ref()
.map(|location| !location.starts_with(&default_skills_dir))
.unwrap_or(true)
});
skills.extend(crate::skills::load_skills_from_directory(
runtime_skills_dir,
self.allow_scripts,
));
}
}
let Some(skill) = skills
.iter()
@@ -93,7 +130,7 @@ impl Tool for ReadSkillTool {
});
};
match tokio::fs::read_to_string(location).await {
match read_skill_bundle(location).await {
Ok(output) => Ok(ToolResult {
success: true,
output,
@@ -112,6 +149,152 @@ impl Tool for ReadSkillTool {
}
}
/// Read a skill's primary file and inline every reachable referenced file.
///
/// Starting from `location` (typically `SKILL.md`), follows relative
/// references found in the content (markdown links and inline-code paths —
/// see `extract_reference_paths`) breadth-first, appending each referenced
/// file's content under a `## Referenced File: <relative path>` heading.
/// Only files inside the skill's root directory (the parent of `location`)
/// are inlined; unreadable or out-of-root files are silently skipped.
///
/// Returns an I/O error only if the primary file itself cannot be read.
pub async fn read_skill_bundle(location: &Path) -> std::io::Result<String> {
    let primary = tokio::fs::read_to_string(location).await?;
    // No parent directory means nothing relative can be resolved.
    let Some(skill_root) = location.parent() else {
        return Ok(primary);
    };
    // Canonicalize the root so `starts_with` containment checks below compare
    // like with like; fall back to the raw path if canonicalization fails.
    let skill_root = skill_root.canonicalize().unwrap_or_else(|_| skill_root.to_path_buf());
    let mut output = primary.clone();
    // Paths whose content has already been appended (dedupe on append).
    let mut appended = BTreeSet::new();
    // Paths already enqueued (dedupe on enqueue).
    let mut queued = BTreeSet::new();
    let mut pending = VecDeque::new();
    // Seed the queue from the primary file, resolving relative to its
    // (non-canonicalized) parent; the `unwrap_or` arm cannot fire since
    // `parent()` was already checked above.
    enqueue_reference_paths(
        &primary,
        location.parent().unwrap_or(skill_root.as_path()),
        &skill_root,
        &mut queued,
        &mut pending,
    );
    // BFS over referenced files; referenced files may themselves reference
    // further files, which are enqueued as they are discovered.
    while let Some(path) = pending.pop_front() {
        let canonical = path.canonicalize().unwrap_or(path.clone());
        // Skip anything that escapes the skill root or was already appended.
        if !canonical.starts_with(&skill_root) || !appended.insert(canonical.clone()) {
            continue;
        }
        // Unreadable references are best-effort: skip, don't fail the bundle.
        let Ok(content) = tokio::fs::read_to_string(&canonical).await else {
            continue;
        };
        // Heading shows the path relative to the skill root when possible.
        let relative = canonical
            .strip_prefix(&skill_root)
            .unwrap_or(canonical.as_path())
            .display()
            .to_string();
        output.push_str("\n\n## Referenced File: ");
        output.push_str(&relative);
        output.push_str("\n\n");
        output.push_str(&content);
        // Recurse (iteratively) into references made by this file, resolved
        // relative to its own directory.
        enqueue_reference_paths(
            &content,
            canonical.parent().unwrap_or(skill_root.as_path()),
            &skill_root,
            &mut queued,
            &mut pending,
        );
    }
    Ok(output)
}
/// Extract relative reference paths from `content` and queue those that
/// resolve to supported files inside `skill_root`.
///
/// Each raw reference is tried against its candidate resolutions (relative
/// to `base_dir`, then to `skill_root`); candidates that canonicalize
/// outside the root or lack a supported extension are dropped. `queued`
/// deduplicates across calls so each path enters `pending` at most once.
fn enqueue_reference_paths(
    content: &str,
    base_dir: &Path,
    skill_root: &Path,
    queued: &mut BTreeSet<PathBuf>,
    pending: &mut VecDeque<PathBuf>,
) {
    for raw in extract_reference_paths(content) {
        for resolved in resolve_reference_candidates(&raw, base_dir, skill_root) {
            // Prefer the canonical form; keep the joined path when the file
            // does not (yet) exist or canonicalization otherwise fails.
            let canonical = match resolved.canonicalize() {
                Ok(path) => path,
                Err(_) => resolved,
            };
            let within_root = canonical.starts_with(skill_root);
            if !within_root || !is_supported_reference_file(&canonical) {
                continue;
            }
            if queued.insert(canonical.clone()) {
                pending.push_back(canonical);
            }
        }
    }
}
/// Collect candidate relative file references from markdown-ish content.
///
/// Two sources are scanned: markdown link targets (`[label](target)`) and
/// inline code spans (`` `path` ``). Each candidate is trimmed and kept
/// only if `looks_like_relative_reference_path` accepts it. Duplicates may
/// appear; callers deduplicate.
fn extract_reference_paths(content: &str) -> Vec<String> {
    let mut paths = Vec::new();

    // Pass 1: markdown link targets — scan for "](" … ")".
    let mut rest = content;
    while let Some(open) = rest.find("](") {
        rest = &rest[open + 2..];
        match rest.find(')') {
            Some(close) => {
                let candidate = rest[..close].trim();
                if looks_like_relative_reference_path(candidate) {
                    paths.push(candidate.to_string());
                }
                rest = &rest[close + 1..];
            }
            // Unterminated link target: stop scanning.
            None => break,
        }
    }

    // Pass 2: inline code spans delimited by single backticks.
    let mut inside_span = false;
    let mut span = String::new();
    for ch in content.chars() {
        match (ch, inside_span) {
            ('`', true) => {
                {
                    let candidate = span.trim();
                    if looks_like_relative_reference_path(candidate) {
                        paths.push(candidate.to_string());
                    }
                }
                span.clear();
                inside_span = false;
            }
            ('`', false) => inside_span = true,
            (c, true) => span.push(c),
            (_, false) => {}
        }
    }
    paths
}
/// Heuristic filter for raw reference strings worth resolving.
///
/// Rejects empty strings, absolute paths, `http(s)` URLs, and pure
/// fragments. After stripping a `#fragment` and then a `?query` suffix,
/// the remainder must contain a `/` (i.e. point into a subdirectory),
/// must not traverse upward via `..`, and must carry a supported
/// extension.
fn looks_like_relative_reference_path(raw: &str) -> bool {
    let obviously_not_a_path = raw.is_empty()
        || raw.starts_with('/')
        || raw.starts_with("http://")
        || raw.starts_with("https://")
        || raw.starts_with('#');
    if obviously_not_a_path {
        return false;
    }
    // Strip URL-ish suffixes before inspecting the path portion.
    let stripped = raw
        .split('#')
        .next()
        .unwrap_or(raw)
        .split('?')
        .next()
        .unwrap_or(raw);
    let path = Path::new(stripped);
    let escapes_upward = path
        .components()
        .any(|component| matches!(component, std::path::Component::ParentDir));
    !escapes_upward && stripped.contains('/') && is_supported_reference_file(path)
}
/// Whether `path` has one of the text-like extensions we are willing to
/// inline into a skill bundle. Extension matching is exact (lowercase).
fn is_supported_reference_file(path: &Path) -> bool {
    const SUPPORTED: [&str; 8] = ["md", "txt", "json", "html", "toml", "yaml", "yml", "csv"];
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| SUPPORTED.contains(&ext))
}
/// Candidate absolute-ish paths a raw relative reference may resolve to.
///
/// The reference is first tried relative to the referencing file's own
/// directory (`base_dir`), then relative to the skill root as a fallback;
/// the fallback is omitted when both joins produce the same path.
fn resolve_reference_candidates(raw: &str, base_dir: &Path, skill_root: &Path) -> Vec<PathBuf> {
    let primary = base_dir.join(raw);
    let fallback = skill_root.join(raw);
    if fallback == primary {
        vec![primary]
    } else {
        vec![primary, fallback]
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -184,4 +367,43 @@ description = "Ship safely"
Some("Unknown skill 'calendar'. Available skills: weather")
);
}
#[tokio::test]
async fn inlines_markdown_reference_files_for_skill_context() {
    // Lay out a skill with two markdown references under `references/`.
    let tmp = TempDir::new().unwrap();
    let skill_dir = tmp.path().join("workspace/skills/zhihu-hotlist");
    let refs_dir = skill_dir.join("references");
    std::fs::create_dir_all(&refs_dir).unwrap();

    // Primary skill file linking to both reference documents.
    let primary = concat!(
        "# Zhihu Hotlist\n\n",
        "Follow [collection-flow.md](references/collection-flow.md).\n",
        "Apply [data-quality.md](references/data-quality.md).\n",
    );
    std::fs::write(skill_dir.join("SKILL.md"), primary).unwrap();
    std::fs::write(
        refs_dir.join("collection-flow.md"),
        "# Collection Flow\n\nCollect rows from the hotlist first.\n",
    )
    .unwrap();
    std::fs::write(
        refs_dir.join("data-quality.md"),
        "# Data Quality\n\nMark partial metrics explicitly.\n",
    )
    .unwrap();

    let result = make_tool(&tmp)
        .execute(json!({ "name": "zhihu-hotlist" }))
        .await
        .unwrap();
    assert!(result.success);

    // Primary content plus both referenced files, each under its heading.
    let output = &result.output;
    assert!(output.contains("# Zhihu Hotlist"));
    assert!(output.contains("## Referenced File: references/collection-flow.md"));
    assert!(output.contains("Collect rows from the hotlist first."));
    assert!(output.contains("## Referenced File: references/data-quality.md"));
    assert!(output.contains("Mark partial metrics explicitly."));
}
}