nanobot源码学习笔记2

上篇回顾

上一篇把整体骨架和记忆系统讲清楚了。从CLI启动,到AgentLoop组装prompt,到Consolidator压缩历史,到Dream更新长记忆,一条主线跑下来。但有一个地方是跳过的——上层调用 provider.chat_with_retry(messages, tools, model) 这一句。messages怎么变成HTTP请求?大模型返回什么格式的JSON?工具调用的结果又怎么传回去?这篇专门来讲这些。

provider层是做什么的

先解释一下背景知识,不了解大模型API的同学可以看一下这一节。

大模型API长什么样

你在用ChatGPT、文心一言这些产品的时候,底下有一套HTTP接口供开发者直接调用。它的基本形态是这样的:你发一个POST请求,body里带一个messages数组——每条消息有role(用户/助手/系统)和content(内容)——大模型返回一段JSON,里面是它的回复。

OpenAI最早把这套接口定义成了”事实标准”,其他厂商基本都跟着抄,所以叫OpenAI-compatible API(OpenAI兼容接口)。国内的MiniMax、Deepseek、通义千问(DashScope)、月之暗面(Moonshot)、智谱(Zhipu)……基本上都兼容这个格式,区别只在于请求头、API密钥的传法、以及少数几个参数名的差异。

一次完整的调用长这样:

请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
POST https://api.openai.com/v1/chat/completions
{
"model": "gpt-4o",
"messages": [
{"role": "system", "content": "你是一个助手"},
{"role": "user", "content": "今天天气怎么样?"}
],
"tools": [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取某地天气",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string"}
},
"required": ["city"]
}
}
}
]
}

正常回复(没有工具调用)

1
2
3
4
5
6
7
{
"choices": [{
"message": {"role": "assistant", "content": "我需要先查询一下..."},
"finish_reason": "stop"
}],
"usage": {"prompt_tokens": 120, "completion_tokens": 30, "total_tokens": 150}
}

工具调用回复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"choices": [{
"message": {
"role": "assistant",
"content": null,
"tool_calls": [{
"id": "call_abc123",
"type": "function",
"function": {
"name": "get_weather",
"arguments": "{\"city\": \"北京\"}"
}
}]
},
"finish_reason": "tool_calls"
}]
}

注意这个工具调用回复:content是null,多了一个tool_calls数组,里面每个元素包含工具名字和参数。finish_reason变成了"tool_calls",意思是”我没结束,我需要你帮我执行这个工具然后把结果告诉我”。

上层拿到这个回复,执行get_weather工具,把结果装成role: tool的消息追加进messages数组,再发一次请求,模型才会返回最终的回答。这就是工具调用的完整循环,也是第一篇里AgentRunner做的事情。

为什么需要provider抽象层

既然大家都兼容OpenAI格式,为什么不直接用OpenAI的SDK就完事了?因为”兼容”只是大方向,细节有一堆差异:

  • 参数名不同:OpenAI的推理模型(o1/o3系列)用max_completion_tokens而不是max_tokens,某些推理模型不支持temperature参数
  • streaming格式有差异:各家返回流式chunk的字段名可能不一样
  • 工具调用ID长度限制:Mistral要求tool call ID只能是9位字母数字,OpenAI是一个长UUID字符串,如果照搬会报错
  • 认证方式不同:GitHub Copilot走OAuth,普通key-based的provider直接传Bearer token
  • 本地模型:Ollama、vLLM在本地跑,不需要真实的API key,但接口格式一致

provider层把这些差异全部抹平,对上层暴露统一的接口。上层的AgentRunner只需要调provider.chat_with_retry(messages, tools, model),完全不用关心底下是OpenAI还是Moonshot还是Ollama。

provider层的代码结构

provider整体架构

整个providers/目录:

1
2
3
4
5
6
7
8
9
providers/
├── base.py # 抽象基类 LLMProvider,以及数据结构定义
├── registry.py # ProviderSpec注册表,记录所有provider的元数据
├── openai_compat_provider.py # 核心实现,兼容几十种OpenAI-compatible API
├── anthropic_provider.py # 原生Anthropic SDK实现(Claude专用)
├── azure_openai_provider.py # Azure OpenAI(走Responses API)
├── github_copilot_provider.py # GitHub Copilot(OAuth认证,继承openai_compat)
├── openai_codex_provider.py # OpenAI Codex(OAuth认证,继承openai_compat)
└── openai_responses/ # OpenAI Responses API的格式转换工具

base.py — 三个核心数据结构

ToolCallRequest

1
2
3
4
5
6
7
8
9
@dataclass
class ToolCallRequest:
"""A tool call request from the LLM."""
id: str
name: str
arguments: dict[str, Any]
extra_content: dict[str, Any] | None = None
provider_specific_fields: dict[str, Any] | None = None
function_provider_specific_fields: dict[str, Any] | None = None

这是模型”决定要调用某个工具”的时候,nanobot在内部用来表示这个工具调用请求的对象。id是这次调用的唯一标识,name是工具名,arguments是已经解析好的Python字典(原始JSON是个字符串,需要解析)。

它有一个to_openai_tool_call()方法,把自己序列化回OpenAI格式的字典,供追加到messages数组里用。

LLMResponse

1
2
3
4
5
6
7
8
9
10
11
12
13
@dataclass
class LLMResponse:
"""Response from an LLM provider."""
content: str | None # 文本回复
tool_calls: list[ToolCallRequest] = ... # 工具调用列表(可能为空)
finish_reason: str = "stop" # 结束原因:"stop"/"tool_calls"/"length"/"error"
usage: dict[str, int] = ... # token用量
reasoning_content: str | None = None # 推理过程(DeepSeek-R1, Kimi等)
thinking_blocks: list[dict] | None = None # Anthropic扩展思考块
error_status_code: int | None = None # HTTP状态码(出错时)
error_kind: str | None = None # "timeout"/"connection"
error_should_retry: bool | None = None # 是否应该重试
# ... 其他错误字段

这是provider层向上层返回的统一格式。不管底下调的是哪家大模型,上层拿到的一定是这个结构。

finish_reason是个关键字段:

  • "stop" — 模型正常结束,有文本回复,没有工具调用
  • "tool_calls" — 模型要调工具,tool_calls数组非空
  • "length" — 超过了max_tokens限制,回复被截断
  • "error" — 调用出错,content里是错误描述

上层AgentRunner就是根据finish_reason来决定下一步的:如果是tool_calls就去执行工具,如果是stop就结束循环,如果是error就走重试或报错逻辑。

LLMProvider 抽象基类

1
2
3
4
5
6
7
8
9
10
11
12
13
class LLMProvider(ABC):
@abstractmethod
async def chat(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
model: str | None = None,
max_tokens: int = 4096,
temperature: float = 0.7,
reasoning_effort: str | None = None,
tool_choice: str | dict[str, Any] | None = None,
) -> LLMResponse:
...

chat是唯一的抽象方法,子类必须实现。基类还提供了几个公共方法:

  • chat_with_retry() — 带重试的chat调用
  • chat_stream() — 流式版本,默认回退到普通调用,子类可以重写实现真正的streaming
  • chat_stream_with_retry() — 带重试的流式调用

**调用方只用chat_with_retrychat_stream_with_retry,不直接调chat**,因为重试逻辑是统一在基类里实现的。

registry.py — ProviderSpec注册表

registry.py里定义了一个PROVIDERS列表,每个元素是一个ProviderSpec

1
2
3
4
5
6
7
8
9
10
11
12
13
@dataclass(frozen=True)
class ProviderSpec:
name: str # config里的字段名,如 "deepseek"
keywords: tuple # 用于从model名字匹配provider,如 ("deepseek",)
env_key: str # API key的环境变量名,如 "DEEPSEEK_API_KEY"
backend: str = "openai_compat" # 用哪个实现类
default_api_base: str = "" # 默认的API地址
is_gateway: bool = False # 是否是网关(OpenRouter等)
is_local: bool = False # 是否是本地部署(Ollama等)
strip_model_prefix: bool = False # 发送前是否去掉 "provider/" 前缀
supports_prompt_caching: bool = False # 是否支持prompt缓存标记
model_overrides: tuple = () # 针对特定模型的参数覆盖
# ...

配几个典型例子看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# DeepSeek — 标准OpenAI兼容,换个API地址就完事
ProviderSpec(
name="deepseek",
keywords=("deepseek",),
env_key="DEEPSEEK_API_KEY",
backend="openai_compat",
default_api_base="https://api.deepseek.com",
),

# Moonshot (Kimi) — 兼容但有个小特殊:kimi-k2.5这个模型要求temperature >= 1.0
ProviderSpec(
name="moonshot",
keywords=("moonshot", "kimi"),
env_key="MOONSHOT_API_KEY",
backend="openai_compat",
default_api_base="https://api.moonshot.ai/v1",
model_overrides=(("kimi-k2.5", {"temperature": 1.0}),), # ← 特殊处理
),

# OpenRouter — 一个"中间商"网关,可以路由任意模型
ProviderSpec(
name="openrouter",
keywords=("openrouter",),
env_key="OPENROUTER_API_KEY",
backend="openai_compat",
is_gateway=True,
detect_by_key_prefix="sk-or-", # API key以"sk-or-"开头就认定是OpenRouter
default_api_base="https://openrouter.ai/api/v1",
supports_prompt_caching=True,
),

# Anthropic — 完全独立的SDK,不走openai_compat
ProviderSpec(
name="anthropic",
keywords=("anthropic", "claude"),
env_key="ANTHROPIC_API_KEY",
backend="anthropic", # ← 注意这里
supports_prompt_caching=True,
),

registry.py就是纯数据,不做任何实例化。实例化逻辑在nanobot/cli/commands.py里,根据配置文件里的model名和provider名,找到对应的ProviderSpec,再根据backend字段创建对应的provider对象。

OpenAICompatProvider — 核心实现

终于到重头戏了。openai_compat_provider.py有1016行,是整个providers目录里最大的文件,支撑了除Anthropic和Azure之外的所有provider。

完整的调用路径是这样的:

chat主调用链路

下面逐段走读。

初始化:一个AsyncOpenAI搞定一切

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class OpenAICompatProvider(LLMProvider):
def __init__(
self,
api_key: str | None = None,
api_base: str | None = None,
default_model: str = "gpt-4o",
extra_headers: dict[str, str] | None = None,
spec: ProviderSpec | None = None,
):
super().__init__(api_key, api_base)
self.default_model = default_model
self._spec = spec

effective_base = api_base or (spec.default_api_base if spec else None) or None

self._client = AsyncOpenAI(
api_key=api_key or "no-key",
base_url=effective_base, # ← 换个base_url就对接了不同厂商
default_headers=default_headers,
max_retries=0, # 重试逻辑自己管,不用SDK的
)

精髓在这里:OpenAI的官方Python SDK支持自定义base_url。DeepSeek的API、MiniMax的API……只要兼容OpenAI格式,换个base_url就可以直接用同一个客户端对接。max_retries=0是因为重试逻辑由nanobot自己的_run_with_retry管理,不想SDK和nanobot之间出现重试叠加。

注意api_key or "no-key"这个处理——本地部署的Ollama、vLLM不需要真实的key,但SDK要求这个参数不能为空,所以传个占位字符串。

_build_kwargs — 把通用参数翻译成各厂商能接受的格式

这是adapter模式的核心:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def _build_kwargs(self, messages, tools, model, max_tokens, temperature,
reasoning_effort, tool_choice) -> dict:
model_name = model or self.default_model
spec = self._spec

# 1. 如果是通过OpenRouter访问Claude模型,加上cache_control标记(下文会解释)
if spec and spec.supports_prompt_caching:
if any(model_name.lower().startswith(k) for k in ("anthropic/", "claude")):
messages, tools = self._apply_cache_control(messages, tools)

# 2. 有些网关(如AiHubMix)不理解 "anthropic/claude-3" 这种带厂商前缀的名字,
# 需要把"anthropic/"剥掉,只保留"claude-3"
if spec and spec.strip_model_prefix:
model_name = model_name.split("/")[-1]

kwargs = {
"model": model_name,
"messages": self._sanitize_messages(self._sanitize_empty_content(messages)),
}

# 3. 推理模型(o1/o3/o4)和gpt-5不支持temperature,只能在特定条件下传
if self._supports_temperature(model_name, reasoning_effort):
kwargs["temperature"] = temperature

# 4. OpenAI的推理模型用max_completion_tokens,其他用max_tokens
if spec and getattr(spec, "supports_max_completion_tokens", False):
kwargs["max_completion_tokens"] = max(1, max_tokens)
else:
kwargs["max_tokens"] = max(1, max_tokens)

# 5. 针对特定模型的参数覆盖(如kimi-k2.5需要temperature=1.0)
if spec:
model_lower = model_name.lower()
for pattern, overrides in spec.model_overrides:
if pattern in model_lower:
kwargs.update(overrides)
break

# 6. 推理努力程度(Kimi/DashScope/VolcEngine等有自己的参数名)
if reasoning_effort:
kwargs["reasoning_effort"] = reasoning_effort
if spec and reasoning_effort is not None:
thinking_enabled = reasoning_effort.lower() != "minimal"
if spec.name == "dashscope":
kwargs.setdefault("extra_body", {}).update({"enable_thinking": thinking_enabled})
elif spec.name in ("volcengine", "byteplus", ...):
kwargs.setdefault("extra_body", {}).update(
{"thinking": {"type": "enabled" if thinking_enabled else "disabled"}}
)

if tools:
kwargs["tools"] = tools
kwargs["tool_choice"] = tool_choice or "auto"

return kwargs

这段代码就是在处理各种”虽然大家都说兼容OpenAI,但具体行为有出入”的情况。比如推理模型不支持temperature这件事,如果你不做判断直接传,某些模型会报400错误。

_sanitize_messages — 消息清洗

消息在进入HTTP请求前要做几件清洗的事:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _sanitize_messages(self, messages) -> list:
# 1. 过滤掉非法字段,只保留标准字段
sanitized = LLMProvider._sanitize_request_messages(messages, _ALLOWED_MSG_KEYS)

for clean in sanitized:
if isinstance(clean.get("tool_calls"), list):
# 2. 确保arguments是合法的JSON字符串
for tc in clean["tool_calls"]:
tc["function"]["arguments"] = self._normalize_tool_call_arguments(
tc["function"].get("arguments")
)
# 3. assistant消息同时有content和tool_calls时,content置为None
# (部分gateway不接受同时有这两个字段)
clean["content"] = None

# 4. 保证角色交替(user/assistant交替出现,不能连续两条user)
return self._enforce_role_alternation(sanitized)

_enforce_role_alternation值得单独说一下。OpenAI协议要求user和assistant交替出现,如果两条user消息连续,有些provider会拒绝。nanobot的做法是把相邻的同角色消息合并——两条user文本消息就拼在一起,结尾如果是assistant消息就丢掉(因为让模型接着自己上一条说话语义上不安全)。

chat() — 主调用,两条路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def chat(self, messages, tools=None, model=None, ...) -> LLMResponse:
try:
# 路径一:OpenAI的o系列/gpt-5,走Responses API
if self._should_use_responses_api(model, reasoning_effort):
try:
body = self._build_responses_body(...)
return parse_response_output(await self._client.responses.create(**body))
except Exception as responses_error:
# 如果Responses API返回了兼容性相关的400/404,则降级到Chat API
if not self._should_fallback_from_responses_error(responses_error):
raise

# 路径二:所有其他provider,走Chat Completions API(主路径)
kwargs = self._build_kwargs(...)
return self._parse(await self._client.chat.completions.create(**kwargs))
except Exception as e:
return self._handle_error(e, spec=self._spec, api_base=self.api_base)

有两条API路径:

Chat Completions API/v1/chat/completions)是传统路径,所有OpenAI-compatible provider都支持,这是主路径。

Responses API/v1/responses)是OpenAI 2025年新推出的接口,主要服务于o1/o3/o4这类推理模型和gpt-5。它的格式和Chat Completions不同,单独处理。如果Responses API返回了4xx错误(可能是部分proxy不支持),会自动降级到Chat Completions API。

_parse() — 把响应变成LLMResponse

这是从”大模型返回的原始数据”到”nanobot内部统一格式”的转换。代码很长,但逻辑分两段:

第一段:dict格式(原始JSON解析出来的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def _parse(self, response) -> LLMResponse:
# SDK有时会返回dict,有时返回Pydantic对象,分开处理
response_map = self._maybe_mapping(response)
if response_map is not None:
choices = response_map.get("choices") or []
choice0 = self._maybe_mapping(choices[0]) or {}
msg0 = self._maybe_mapping(choice0.get("message")) or {}

# 提取文本内容(content可能是字符串,也可能是content blocks列表)
content = self._extract_text_content(msg0.get("content"))
finish_reason = str(choice0.get("finish_reason") or "stop")

# 收集所有choices里的tool_calls
raw_tool_calls = []
for ch in choices:
ch_map = self._maybe_mapping(ch) or {}
m = self._maybe_mapping(ch_map.get("message")) or {}
tool_calls = m.get("tool_calls")
if isinstance(tool_calls, list) and tool_calls:
raw_tool_calls.extend(tool_calls)

# 把每个tool_call转成ToolCallRequest
parsed_tool_calls = []
for tc in raw_tool_calls:
tc_map = self._maybe_mapping(tc) or {}
fn = self._maybe_mapping(tc_map.get("function")) or {}
args = fn.get("arguments", {})
if isinstance(args, str):
args = json_repair.loads(args) # 有些模型返回非法JSON,json_repair能容错修复
parsed_tool_calls.append(ToolCallRequest(
id=..., # 工具调用ID
name=str(fn.get("name") or ""),
arguments=args if isinstance(args, dict) else {},
))

return LLMResponse(
content=content,
tool_calls=parsed_tool_calls,
finish_reason=finish_reason,
usage=self._extract_usage(response_map),
reasoning_content=...,
)

json_repair.loads用于容错——有些模型输出的arguments JSON格式有小问题(比如少了个引号),json_repair能自动修复后解析。

第二段:Pydantic对象格式(OpenAI SDK解析后的)

代码结构完全一样,只是访问方式从字典改成属性:

1
2
3
4
5
6
7
# 前面是 response_map.get("choices")
# 后面是 response.choices

choice = response.choices[0]
msg = choice.message
content = msg.content
# ...

_extract_usage() — token用量的归一化

不同provider返回的cached_tokens字段放在不同地方,_extract_usage做了统一处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@classmethod
def _extract_usage(cls, response) -> dict:
# ...解析 prompt_tokens / completion_tokens / total_tokens

# cached_tokens字段各家位置不同,按优先级找:
for path in (
("prompt_tokens_details", "cached_tokens"), # OpenAI/Zhipu/MiniMax/Qwen/Mistral/xAI
("cached_tokens",), # StepFun/Moonshot(顶层字段)
("prompt_cache_hit_tokens",), # DeepSeek/SiliconFlow
):
cached = cls._get_nested_int(usage_map, path)
if cached:
result["cached_tokens"] = cached
break

return result

Prompt缓存是各家大模型都支持的一种优化:如果你前后两次请求的system prompt完全一样,服务器端会缓存这部分,第二次不用重新计算,可以降低延迟和费用。cached_tokens就是这次有多少tokens命中了缓存。各家厂商在response里放这个字段的位置不同,nanobot这里做了统一归一化。

至于如何让大模型开启缓存——我们之前看到_apply_cache_control方法里,会在system消息和tools最后一个元素加上"cache_control": {"type": "ephemeral"}标记,这是告诉模型”请在这个边界之前的内容缓存起来”。

工具调用的完整循环

前面讲了provider如何解析出ToolCallRequest列表,但工具怎么被执行、结果怎么传回去,这部分在AgentRunner里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# runner.py 里的核心循环

if response.has_tool_calls:
# 1. 把模型的工具调用意图序列化成OpenAI格式,追加进messages
assistant_message = build_assistant_message(
response.content or "",
tool_calls=[tc.to_openai_tool_call() for tc in response.tool_calls],
)
messages.append(assistant_message)

# 2. 执行所有工具(可以并发)
results, _, _ = await self._execute_tools(spec, response.tool_calls, ...)

# 3. 把每个工具的执行结果追加进messages
for tool_call, result in zip(response.tool_calls, results):
tool_message = {
"role": "tool",
"tool_call_id": tool_call.id, # ← 必须和上面assistant_message里的id对应
"name": tool_call.name,
"content": str(result), # 工具返回值,字符串
}
messages.append(tool_message)

# 4. 继续下一次LLM调用(带上新追加的消息)
continue

经过这一轮后,messages数组就增加了两块:

  1. assistant消息(包含tool_calls字段,说明它想调哪些工具)
  2. 一条或多条tool消息(每个工具的执行结果)

然后带着这些消息再调一次LLM,模型看到了工具的执行结果,就能给出最终回答了。

tool_call_id是用来配对的:tool消息里的tool_call_id必须和assistant消息里tool_calls[].id一致,模型才能知道哪条tool结果对应哪个工具调用。

流式响应的处理

先从头说起:什么是流式

你用ChatGPT的时候,回复是一个字一个字蹦出来的,而不是等几秒钟突然刷出一大段。这就是流式(streaming)

实现原理是:服务器和客户端之间建立一个HTTP长连接,不等全部内容生成完,服务器边生成边往这个连接里写数据,客户端边读边处理。这个技术叫做SSE(Server-Sent Events,服务器推送事件)

具体格式很简单,每行格式是 data: {JSON内容}\n\n,最后用 data: [DONE]\n\n 表示结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
data: {"choices": [{"delta": {"content": "好"}}]}

data: {"choices": [{"delta": {"content": "的,"}}]}

data: {"choices": [{"delta": {"content": "我查一下"}}]}

data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "id": "abc123", "function": {"name": "get_weather"}}]}}]}

data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "function": {"arguments": "{\"city\""}}]}}]}

data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "function": {"arguments": ": \"北京\"}"}}]}}]}

data: {"choices": [{"finish_reason": "tool_calls"}], "usage": {"prompt_tokens": 200, "completion_tokens": 50}}

data: [DONE]

服务器不等你把话说完,而是把每个”增量”(delta)推给你。每条数据就叫一个chunk(块)。文本内容的chunk里只有新增的那几个字,工具调用的chunk里也只有新增的那一段参数片段。

对比非流式的一次性返回:

  • 非流式:客户端等待,服务器生成完后发一大段JSON,客户端解析一次
  • 流式:客户端持续接收,服务器每生成一点就推一个chunk,客户端要把所有chunk拼成完整结果

所以流式的处理分两个阶段:

  1. 收集阶段:一边接收chunk,一边把文本内容实时展示给用户
  2. 拼接阶段:等流结束后,把所有chunk拼成完整的LLMResponse(同非流式路径的返回格式)

流式响应拼接流程

chat_stream() — 流式调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
async def chat_stream(self, messages, tools=None, model=None,
on_content_delta=None, ...) -> LLMResponse:
idle_timeout_s = int(os.environ.get("NANOBOT_STREAM_IDLE_TIMEOUT_S", "90"))
try:
kwargs = self._build_kwargs(...)
kwargs["stream"] = True
kwargs["stream_options"] = {"include_usage": True} # 最后一个chunk包含token用量

stream = await self._client.chat.completions.create(**kwargs)
chunks = []
stream_iter = stream.__aiter__()

while True:
try:
chunk = await asyncio.wait_for(
stream_iter.__anext__(),
timeout=idle_timeout_s, # 如果90秒没有新chunk,认为连接挂了
)
except StopAsyncIteration:
break

chunks.append(chunk)
# 实时回调:每收到一个文本chunk,立刻传给终端渲染器
if on_content_delta and chunk.choices:
text = getattr(chunk.choices[0].delta, "content", None)
if text:
await on_content_delta(text)

return self._parse_chunks(chunks)
except asyncio.TimeoutError:
return LLMResponse(content="Error: stream stalled...", finish_reason="error", ...)

有一个90秒的空闲超时——如果90秒之内没有收到任何新chunk,就认为连接卡死了,返回一个error。这是为了防止进程永久阻塞在一个死掉的连接上。

_parse_chunks() — 把碎片化的chunks拼成完整响应

收集完所有chunk之后,需要把这些碎片还原成跟非流式路径一模一样的LLMResponse。这一步有三类内容需要拼接,各有各的处理方式。

文本内容的拼接

最简单。每个chunk的delta.content就是新增的那几个字,追加到content_parts列表里,最后"".join()合并:

1
2
3
4
5
6
7
8
content_parts = []

for chunk in chunks:
delta = chunk.choices[0].delta
if delta.content:
content_parts.append(delta.content) # 追加增量文本

final_content = "".join(content_parts) # "好" + "的," + "我查一下" = "好的,我查一下"

工具调用参数的拼接(难点)

这里要解决的问题是:工具调用的参数是一段JSON字符串,但它是被拆碎了分批推过来的。

比如调get_weather(city="北京"),参数{"city": "北京"}可能被拆成这样发过来:

1
2
3
chunk A: arguments 片段 = '{"city'
chunk B: arguments 片段 = '": "'
chunk C: arguments 片段 = '北京"}'

每一片都不是合法的JSON,只有全部拼起来才能解析。所以没法边收边解析,必须把字符串碎片都攒起来,等全部chunk处理完再统一json.loads

更复杂的是:一次LLM调用可能同时有多个工具调用。比如模型决定同时调get_weathersearch_news,这两个工具调用的参数片段会交错混在一起推过来:

1
2
3
4
5
6
chunk 1: tool_calls[index=0].name = "get_weather"
chunk 2: tool_calls[index=1].name = "search_news"
chunk 3: tool_calls[index=0].arguments = '{"city'
chunk 4: tool_calls[index=1].arguments = '{"query'
chunk 5: tool_calls[index=0].arguments = '": "北京"}'
chunk 6: tool_calls[index=1].arguments = '": "今日新闻"}'

注意每个chunk里有index字段(0、1、2……)来区分是第几个工具调用。_parse_chunkstc_bufs字典,以index为key,分别维护每个工具调用的累积缓冲区:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
tc_bufs = {}   # key=index, value={id, name, arguments字符串}

for chunk in chunks:
delta = chunk.choices[0].delta
for tc in (delta.tool_calls or []):
tc_index = tc.index
buf = tc_bufs.setdefault(tc_index, {"id": "", "name": "", "arguments": ""})

if tc.id:
buf["id"] = str(tc.id) # id只在第一个片段里出现
if tc.function.name:
buf["name"] += str(tc.function.name) # name也一般只出现一次
if tc.function.arguments:
buf["arguments"] += str(tc.function.arguments) # arguments要累积字符串拼接

等所有chunk处理完,tc_bufs里每个工具调用的arguments都是完整的JSON字符串,再一次性解析:

1
2
3
4
5
6
7
8
tool_calls = [
ToolCallRequest(
id=buf["id"] or _short_tool_id(), # 如果大模型给了id就保留,否则自己生成
name=buf["name"],
arguments=json_repair.loads(buf["arguments"]) if buf["arguments"] else {},
)
for buf in tc_bufs.values()
]

这里用json_repair.loads而不是标准的json.loads,是为了容错——有些模型偶尔输出的参数JSON格式有小问题(比如少了个引号),json_repair能尝试修复后解析,而不是直接报错。

推理内容的拼接

推理模型(DeepSeek-R1、Kimi、MiMo等)在给出最终答案之前会有一段”思考过程”。不同provider返回推理内容的方式不一样:

方式一:独立字段(OpenAI兼容格式的provider)

DeepSeek-R1、MiMo等provider在response的message.reasoning_content字段里返回推理内容。这是最干净的方式——推理内容和正式回复各走各的字段,nanobot提取后存入LLMResponse.reasoning_content

方式二:<think>...</think>标签(MiniMax等非标准provider)

MiniMax的模型把推理内容直接嵌在content文本里,用<think></think>标签包围。比如:

1
2
3
<think>The user wants to know the weather. I should call get_weather tool.</think>

好的,我来帮你查一下天气。

nanobot用strip_think函数把这些标签块从content里抠出来:

1
2
3
4
5
def strip_think(text: str) -> str:
text = re.sub(r"<think>[\s\S]*?</think>", "", text) # 去掉完整的<think>...</think>块
text = re.sub(r"^\s*<think>[\s\S]*$", "", text) # 去掉开头未闭合的<think>块
text = re.sub(r"<thought>[\s\S]*?</thought>", "", text) # Gemma等用<thought>标签
return text.strip()

推理内容不会写入messages

不管是reasoning_content字段还是<think></think>块,都不会出现在后续发给大模型的messages里。原因很简单:推理内容是模型”内部的思考过程”,没必要再喂回去。_sanitize_request_messages的白名单里就没有reasoning_content字段,strip_think也会在发消息前把content里的<think>...</think>块去掉。

在session文件里看到的<think>...</think>,是nanobot在保存历史记录时保留了原始内容(方便用户回溯查看),但下次发请求时会先清洗掉。

最终组装

1
2
3
4
5
6
7
return LLMResponse(
content="".join(content_parts) or None,
tool_calls=tool_calls,
finish_reason=finish_reason, # 来自最后一个有 finish_reason 的chunk
usage=usage, # 来自最后一个chunk(含usage的那个)
reasoning_content="".join(reasoning_parts) or None,
)

可以看到,流式路径的最终返回值和非流式路径的_parse()返回值是同一个LLMResponse结构。这样上层的AgentRunner完全不用关心底下用的是流式还是非流式,拿到LLMResponse后的处理逻辑是完全一样的。

重试机制

重试决策流程

chat_with_retry / _run_with_retry

基类提供了完整的重试逻辑。大模型API频繁报错,429(限流)、5xx(服务器错误)是家常便饭,所以重试是必须的。

1
2
3
4
5
6
7
8
9
10
async def chat_with_retry(self, messages, tools=None, model=None,
retry_mode="standard", ...) -> LLMResponse:
# 补全默认参数(temperature/max_tokens/reasoning_effort)
if max_tokens is self._SENTINEL or max_tokens is None:
max_tokens = self.generation.max_tokens
# ...
return await self._run_with_retry(
self._safe_chat, kw, messages,
retry_mode=retry_mode, on_retry_wait=on_retry_wait,
)

retry_mode有两种:

  • "standard" — 最多重试3次,间隔1→2→4秒,适合正常对话
  • "persistent" — 一直重试直到成功或错误相同次数超过10次,适合后台任务(如Dream的记忆更新,不能因为临时限流就放弃)

重试决策的核心是判断”这个错误能重试吗”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@classmethod
def _is_transient_response(cls, response: LLMResponse) -> bool:
# 优先看结构化的错误元数据
if response.error_should_retry is not None:
return bool(response.error_should_retry)

if response.error_status_code is not None:
status = int(response.error_status_code)
if status == 429:
return cls._is_retryable_429_response(response)
if status in {408, 409} or status >= 500:
return True

# 退化到文本匹配
return cls._is_transient_error(response.content)

429有单独处理,因为429有两种:

  1. rate limit(请求太频繁)— 可以重试,等一下就好
  2. quota exceeded / insufficient_balance(余额不足)— 不能重试,重试也没用

判断方法是看错误消息里是否包含insufficient_quotainsufficient_balance等关键词。

等待时间会尊重provider返回的Retry-After响应头(有些provider会说”你等X秒再来”),如果没有这个头就用默认的指数退避(1→2→4秒)。

有一个特殊情况:如果是非瞬态错误,但消息里有图片,会先尝试去掉图片重试一次——因为有些provider不支持图片,返回400,但这不是真正的”fatal error”,去掉图片后可能就能成功。

总结

这篇把providers层从上到下捋了一遍,有几个设计值得回味:

  1. 注册表驱动ProviderSpec是纯数据,不包含任何实例化逻辑。增加一个新provider只需要在PROVIDERS列表里加一个ProviderSpec,设置好backend字段。各provider的差异(API地址、参数名、认证方式)全靠元数据描述,代码不用改。

  2. 统一接口隔离差异LLMProvider.chat()这个抽象方法把所有provider的差异封装在内部。上层AgentRunner只用chat_with_retry,对底下是哪家大模型完全透明。

  3. 流式和非流式的统一出口chat_stream()最终也返回LLMResponse,和非流式路径的格式一样。上层不用关心用的是哪种方式,逻辑完全复用。

  4. 推理内容的两种格式:OpenAI兼容格式的provider用独立的reasoning_content字段,MiniMax等用<think>...</think>标签嵌在content里。不管是哪种,都不会写入后续发给大模型的messages。

  5. 容错细节json_repair修复JSON格式错误,_enforce_role_alternation修复消息角色顺序,图片失败时自动重试……这些小细节加起来让nanobot在对接多种复杂大模型时稳定得多。

  6. 重试分级:区分了”rate limit”(能等)和”quota exceeded”(无意义)两种429,避免余额用完了还在傻乎乎地无限重试,这个判断在工程上挺实用的。