// Files
// claw/tools/browser_smoke/fake_deepseek_server.mjs
//
// 230 lines
// 5.8 KiB
// JavaScript

import http from 'node:http'
// Model name reported in every fake completion and exported by buildSmokeEnv.
const DEFAULT_DEEPSEEK_MODEL = 'deepseek-chat'
// Placeholder API key exported by buildSmokeEnv; the fake server's handler
// only looks at the request method, URL and body, so it never checks this.
const DEFAULT_DEEPSEEK_API_KEY = 'sgclaw-smoke-test-key'
// Baidu plan: page to navigate to, and the domain sent as `expected_domain`.
const BAIDU_URL = 'https://www.baidu.com'
const BAIDU_DOMAIN = 'www.baidu.com'
// Baidu plan: selectors used by the `type` and `click` tool calls.
const BAIDU_INPUT_SELECTOR = '#kw'
const BAIDU_BUTTON_SELECTOR = '#su'
// Zhihu plan: base search URL (the `type` and `q` query parameters are added
// per instruction) and the domain sent as `expected_domain`.
const ZHIHU_URL = 'https://www.zhihu.com/search'
const ZHIHU_DOMAIN = 'www.zhihu.com'
/**
 * Produces an environment object that points a DeepSeek client at the fake server.
 *
 * The three DEEPSEEK_* variables always win over same-named entries in
 * `baseEnv`; every other entry of `baseEnv` is copied through unchanged.
 *
 * @param {string} baseUrl - Base URL to publish as DEEPSEEK_BASE_URL.
 * @param {object} [baseEnv=process.env] - Environment to extend (not mutated).
 * @returns {object} A new environment object.
 */
export function buildSmokeEnv(baseUrl, baseEnv = process.env) {
  const deepSeekOverrides = {
    DEEPSEEK_API_KEY: DEFAULT_DEEPSEEK_API_KEY,
    DEEPSEEK_BASE_URL: baseUrl,
    DEEPSEEK_MODEL: DEFAULT_DEEPSEEK_MODEL,
  }
  return {...baseEnv, ...deepSeekOverrides}
}
/**
 * Builds the canned chat-completion payload for one request to the fake server.
 *
 * The latest user instruction selects a plan. While no tool result has been
 * reported for that instruction, the reply asks for the plan's tool calls;
 * once a `tool` message follows the instruction, the reply is the plan's
 * final summary.
 *
 * @param {{messages?: Array<object>}} [requestBody] - Parsed request body.
 * @returns {object} Chat-completion response object.
 * @throws {Error} Propagated from latestUserInstruction / instructionPlan when
 *   the request has no usable user instruction or the instruction is unsupported.
 */
export function buildChatCompletionResponse(requestBody) {
  const messages = Array.isArray(requestBody?.messages) ? requestBody.messages : []
  const instruction = latestUserInstruction(messages)
  const plan = instructionPlan(instruction)

  // Only tool results that come AFTER the instruction being answered count.
  // Looking at the whole history would make a follow-up instruction in the
  // same conversation skip its tool-call round entirely.
  const instructionIndex = indexOfLatestUserInstruction(messages)
  const isFinalRound = messages
    .slice(instructionIndex + 1)
    .some((message) => message?.role === 'tool')

  if (isFinalRound) {
    return smokeChatCompletion(
      `chatcmpl-smoke-final-${plan.key}`,
      {role: 'assistant', content: plan.summary},
      'stop',
      {prompt_tokens: 32, completion_tokens: 8, total_tokens: 40},
    )
  }
  return smokeChatCompletion(
    `chatcmpl-smoke-tools-${plan.key}`,
    {role: 'assistant', content: '', tool_calls: plan.toolCalls},
    'tool_calls',
    {prompt_tokens: 24, completion_tokens: 12, total_tokens: 36},
  )
}

// Index of the message latestUserInstruction selects (last user message with
// non-blank string content), or -1 when there is none.
function indexOfLatestUserInstruction(messages) {
  for (let index = messages.length - 1; index >= 0; index -= 1) {
    const message = messages[index]
    if (message?.role === 'user' && typeof message.content === 'string' && message.content.trim()) {
      return index
    }
  }
  return -1
}

// Wraps a single assistant message in the fixed chat-completion envelope.
function smokeChatCompletion(id, message, finishReason, usage) {
  return {
    id,
    object: 'chat.completion',
    created: unixTimeSeconds(),
    model: DEFAULT_DEEPSEEK_MODEL,
    choices: [{
      index: 0,
      message,
      finish_reason: finishReason,
    }],
    usage,
  }
}
/**
 * Starts the fake DeepSeek HTTP server on an ephemeral port of 127.0.0.1.
 *
 * Every POST body is parsed as JSON, recorded in `requests`, and answered with
 * buildChatCompletionResponse. Other methods get a 405; any failure while
 * handling a request is reported as a 500 JSON error.
 *
 * @returns {Promise<{baseUrl: string, requests: Array<object>, close: () => Promise<void>}>}
 *   The server's base URL, the live log of recorded POST requests, and a
 *   function that shuts the server down.
 * @throws {Error} When listening fails or no TCP address can be read back.
 */
export async function startFakeDeepSeekServer() {
  const requests = []

  // Sends `payload` as a JSON response with the given status code.
  const sendJson = (res, statusCode, payload) => {
    res.writeHead(statusCode, {'content-type': 'application/json'})
    res.end(JSON.stringify(payload))
  }

  const handleRequest = async (req, res) => {
    try {
      if (req.method !== 'POST') {
        sendJson(res, 405, {error: 'method not allowed'})
        return
      }
      const body = await readJsonBody(req)
      requests.push({method: req.method, url: req.url || '/', body})
      sendJson(res, 200, buildChatCompletionResponse(body))
    } catch (error) {
      sendJson(res, 500, {
        error: error instanceof Error ? error.message : String(error),
      })
    }
  }

  const server = http.createServer(handleRequest)

  // Port 0 lets the OS pick a free port; reject if binding fails.
  await new Promise((resolve, reject) => {
    server.once('error', reject)
    server.listen(0, '127.0.0.1', resolve)
  })

  const address = server.address()
  if (address === null || typeof address !== 'object') {
    server.close()
    throw new Error('failed to bind fake DeepSeek server')
  }

  const close = () => new Promise((resolve, reject) => {
    server.close((error) => {
      if (error) {
        reject(error)
        return
      }
      resolve()
    })
  })

  return {
    baseUrl: `http://127.0.0.1:${address.port}`,
    requests,
    close,
  }
}
/**
 * Finds the most recent user message with non-blank string content and
 * returns it after normalizeSmokeInstruction.
 *
 * @param {Array<object>} messages - Chat messages, oldest first.
 * @returns {string} The normalized instruction text.
 * @throws {Error} When no user message with non-blank string content exists.
 */
function latestUserInstruction(messages) {
  let position = messages.length
  while (position > 0) {
    position -= 1
    const candidate = messages[position]
    if (candidate?.role !== 'user') {
      continue
    }
    const {content} = candidate
    if (typeof content !== 'string' || content.trim() === '') {
      continue
    }
    return normalizeSmokeInstruction(content)
  }
  throw new Error('missing user instruction in DeepSeek smoke request')
}
/**
 * Trims an instruction and removes any run of leading `[...]` tags (each
 * optionally followed by whitespace) from its start.
 *
 * @param {string} rawInstruction - Instruction text as received.
 * @returns {string} The instruction without surrounding whitespace or leading tags.
 */
export function normalizeSmokeInstruction(rawInstruction) {
  const leadingBracketTags = /^(?:\[[^\]]+\]\s*)+/
  const trimmed = rawInstruction.trim()
  return trimmed.replace(leadingBracketTags, '')
}
/**
 * Maps a normalized instruction onto one of the scripted smoke plans.
 *
 * Baidu instructions produce navigate / type / click tool calls; Zhihu
 * instructions produce a single navigate to the search URL for the query.
 *
 * @param {string} instruction - Normalized instruction text.
 * @returns {{key: string, summary: string, toolCalls: Array<object>}} The plan.
 * @throws {Error} When the instruction matches no plan, or (from extractQuery)
 *   when a known prefix is not followed by a query.
 */
function instructionPlan(instruction) {
  const baiduQuery = extractQuery(instruction, ['打开百度搜索', '打开百度并搜索'])
  if (baiduQuery) {
    const baiduSteps = [
      {
        action: 'navigate',
        expected_domain: BAIDU_DOMAIN,
        url: BAIDU_URL,
      },
      {
        action: 'type',
        expected_domain: BAIDU_DOMAIN,
        selector: BAIDU_INPUT_SELECTOR,
        text: baiduQuery,
        clear_first: true,
      },
      {
        action: 'click',
        expected_domain: BAIDU_DOMAIN,
        selector: BAIDU_BUTTON_SELECTOR,
      },
    ]
    return {
      key: 'baidu',
      summary: `已在百度搜索${baiduQuery}`,
      // Call ids are numbered from 1 in step order.
      toolCalls: baiduSteps.map((step, position) => browserToolCall(`call_baidu_${position + 1}`, step)),
    }
  }

  const zhihuQuery = extractQuery(instruction, ['打开知乎搜索', '打开知乎并搜索'])
  if (zhihuQuery) {
    const searchUrl = new URL(ZHIHU_URL)
    const searchParams = [['type', 'content'], ['q', zhihuQuery]]
    for (const [name, value] of searchParams) {
      searchUrl.searchParams.set(name, value)
    }
    const navigateStep = {
      action: 'navigate',
      expected_domain: ZHIHU_DOMAIN,
      url: searchUrl.toString(),
    }
    return {
      key: 'zhihu',
      summary: `已在知乎搜索${zhihuQuery}`,
      toolCalls: [browserToolCall('call_zhihu_1', navigateStep)],
    }
  }

  throw new Error(`unsupported smoke instruction: ${instruction}`)
}
/**
 * Returns the text following the first prefix that `instruction` starts with.
 *
 * @param {string} instruction - Normalized instruction text.
 * @param {string[]} prefixes - Candidate prefixes, checked in order.
 * @returns {string|null} The trimmed query, or null when no prefix matches.
 * @throws {Error} When a prefix matches but nothing except whitespace follows it.
 */
function extractQuery(instruction, prefixes) {
  const matchedPrefix = prefixes.find((candidate) => instruction.startsWith(candidate))
  if (matchedPrefix === undefined) {
    return null
  }
  const remainder = instruction.slice(matchedPrefix.length).trim()
  if (remainder === '') {
    throw new Error(`missing search query in instruction: ${instruction}`)
  }
  return remainder
}
/**
 * Wraps browser-action arguments in a `function`-type tool-call record.
 *
 * @param {string} id - Tool-call id.
 * @param {object} args - Arguments; stored JSON-encoded under `function.arguments`.
 * @returns {{id: string, type: string, function: {name: string, arguments: string}}}
 */
function browserToolCall(id, args) {
  const serializedArgs = JSON.stringify(args)
  const browserAction = {
    name: 'browser_action',
    arguments: serializedArgs,
  }
  return {id, type: 'function', function: browserAction}
}
/**
 * Reads a request stream to the end as UTF-8 text and parses it as JSON.
 *
 * @param {object} req - Readable request stream (e.g. http.IncomingMessage).
 * @returns {Promise<*>} The parsed value; an empty body yields `{}`.
 *   Rejects with the parse error for invalid JSON, or with the stream's error.
 */
function readJsonBody(req) {
  return new Promise((resolve, reject) => {
    const pieces = []
    req.setEncoding('utf8')
    req.on('data', (piece) => {
      pieces.push(piece)
    })
    req.on('end', () => {
      const text = pieces.join('')
      let parsed
      try {
        // An empty body is treated as an empty JSON object.
        parsed = JSON.parse(text === '' ? '{}' : text)
      } catch (error) {
        reject(error)
        return
      }
      resolve(parsed)
    })
    req.on('error', reject)
  })
}
/**
 * Current wall-clock time as whole seconds since the Unix epoch.
 *
 * @returns {number} Date.now() in seconds, rounded down.
 */
function unixTimeSeconds() {
  const millisecondsPerSecond = 1000
  const nowMs = Date.now()
  return Math.floor(nowMs / millisecondsPerSecond)
}