上篇回顾 上一篇把整体骨架和记忆系统讲清楚了。从CLI启动,到AgentLoop组装prompt,到Consolidator压缩历史,到Dream更新长记忆,一条主线跑下来。但有一个地方是跳过的——上层调用 provider.chat_with_retry(messages, tools, model) 这一句。messages怎么变成HTTP请求?大模型返回什么格式的JSON?工具调用的结果又怎么传回去?这篇专门来讲这些。
provider层是做什么的 先解释一下背景知识,不了解大模型API的同学可以看一下这一节。
大模型API长什么样 你在用ChatGPT、文心一言这些产品的时候,底下有一套HTTP接口供开发者直接调用。它的基本形态是这样的:你发一个POST请求,body里带一个messages数组——每条消息有role(用户/助手/系统)和content(内容)——大模型返回一段JSON,里面是它的回复。
OpenAI最早把这套接口定义成了”事实标准”,其他厂商基本都跟着抄,所以叫OpenAI-compatible API (OpenAI兼容接口)。国内的MiniMax、Deepseek、通义千问(DashScope)、月之暗面(Moonshot)、智谱(Zhipu)……基本上都兼容这个格式,区别只在于请求头、API密钥的传法、以及少数几个参数名的差异。
一次完整的调用长这样:
请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 POST https: { "model" : "gpt-4o" , "messages" : [ { "role" : "system" , "content" : "你是一个助手" } , { "role" : "user" , "content" : "今天天气怎么样?" } ] , "tools" : [ { "type" : "function" , "function" : { "name" : "get_weather" , "description" : "获取某地天气" , "parameters" : { "type" : "object" , "properties" : { "city" : { "type" : "string" } } , "required" : [ "city" ] } } } ] }
正常回复(没有工具调用)
1 2 3 4 5 6 7 { "choices" : [ { "message" : { "role" : "assistant" , "content" : "我需要先查询一下..." } , "finish_reason" : "stop" } ] , "usage" : { "prompt_tokens" : 120 , "completion_tokens" : 30 , "total_tokens" : 150 } }
工具调用回复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { "choices" : [ { "message" : { "role" : "assistant" , "content" : null , "tool_calls" : [ { "id" : "call_abc123" , "type" : "function" , "function" : { "name" : "get_weather" , "arguments" : "{\"city\": \"北京\"}" } } ] } , "finish_reason" : "tool_calls" } ] }
注意这个工具调用回复:content是null,多了一个tool_calls数组,里面每个元素包含工具名字和参数。finish_reason变成了"tool_calls",意思是”我没结束,我需要你帮我执行这个工具然后把结果告诉我”。
上层拿到这个回复,执行get_weather工具,把结果装成role: tool的消息追加进messages数组,再发一次请求,模型才会返回最终的回答。这就是工具调用的完整循环 ,也是第一篇里AgentRunner做的事情。
为什么需要provider抽象层 既然大家都兼容OpenAI格式,为什么不直接用OpenAI的SDK就完事了?因为”兼容”只是大方向,细节有一堆差异:
参数名不同 :OpenAI的推理模型(o1/o3系列)用max_completion_tokens而不是max_tokens,某些推理模型不支持temperature参数
streaming格式有差异 :各家返回流式chunk的字段名可能不一样
工具调用ID长度限制 :Mistral要求tool call ID只能是9位字母数字,OpenAI是一个长UUID字符串,如果照搬会报错
认证方式不同 :GitHub Copilot走OAuth,普通key-based的provider直接传Bearer token
本地模型 :Ollama、vLLM在本地跑,不需要真实的API key,但接口格式一致
provider层把这些差异全部抹平,对上层暴露统一的接口。上层的AgentRunner只需要调provider.chat_with_retry(messages, tools, model),完全不用关心底下是OpenAI还是Moonshot还是Ollama。
provider层的代码结构
整个providers/目录:
1 2 3 4 5 6 7 8 9 providers/ ├── base.py # 抽象基类 LLMProvider,以及数据结构定义 ├── registry.py # ProviderSpec注册表,记录所有provider的元数据 ├── openai_compat_provider.py # 核心实现,兼容几十种OpenAI-compatible API ├── anthropic_provider.py # 原生Anthropic SDK实现(Claude专用) ├── azure_openai_provider.py # Azure OpenAI(走Responses API) ├── github_copilot_provider.py # GitHub Copilot(OAuth认证,继承openai_compat) ├── openai_codex_provider.py # OpenAI Codex(OAuth认证,继承openai_compat) └── openai_responses/ # OpenAI Responses API的格式转换工具
base.py — 三个核心数据结构1 2 3 4 5 6 7 8 9 @dataclass class ToolCallRequest : """A tool call request from the LLM.""" id : str name: str arguments: dict [str , Any ] extra_content: dict [str , Any ] | None = None provider_specific_fields: dict [str , Any ] | None = None function_provider_specific_fields: dict [str , Any ] | None = None
这是模型”决定要调用某个工具”的时候,nanobot在内部用来表示这个工具调用请求的对象。id是这次调用的唯一标识,name是工具名,arguments是已经解析好的Python字典(原始JSON是个字符串,需要解析)。
它有一个to_openai_tool_call()方法,把自己序列化回OpenAI格式的字典,供追加到messages数组里用。
LLMResponse1 2 3 4 5 6 7 8 9 10 11 12 13 @dataclass class LLMResponse : """Response from an LLM provider.""" content: str | None tool_calls: list [ToolCallRequest] = ... finish_reason: str = "stop" usage: dict [str , int ] = ... reasoning_content: str | None = None thinking_blocks: list [dict ] | None = None error_status_code: int | None = None error_kind: str | None = None error_should_retry: bool | None = None
这是provider层向上层返回的统一格式。不管底下调的是哪家大模型,上层拿到的一定是这个结构。
finish_reason是个关键字段:
"stop" — 模型正常结束,有文本回复,没有工具调用
"tool_calls" — 模型要调工具,tool_calls数组非空
"length" — 超过了max_tokens限制,回复被截断
"error" — 调用出错,content里是错误描述
上层AgentRunner就是根据finish_reason来决定下一步的:如果是tool_calls就去执行工具,如果是stop就结束循环,如果是error就走重试或报错逻辑。
LLMProvider 抽象基类1 2 3 4 5 6 7 8 9 10 11 12 13 class LLMProvider (ABC ): @abstractmethod async def chat ( self, messages: list [dict [str , Any ]], tools: list [dict [str , Any ]] | None = None , model: str | None = None , max_tokens: int = 4096 , temperature: float = 0.7 , reasoning_effort: str | None = None , tool_choice: str | dict [str , Any ] | None = None , ) -> LLMResponse: ...
chat是唯一的抽象方法,子类必须实现。基类还提供了几个公共方法:
chat_with_retry() — 带重试的chat调用
chat_stream() — 流式版本,默认回退到普通调用,子类可以重写实现真正的streaming
chat_stream_with_retry() — 带重试的流式调用
**调用方只用chat_with_retry和chat_stream_with_retry,不直接调chat**,因为重试逻辑是统一在基类里实现的。
registry.py — ProviderSpec注册表registry.py里定义了一个PROVIDERS列表,每个元素是一个ProviderSpec:
1 2 3 4 5 6 7 8 9 10 11 12 13 @dataclass(frozen=True ) class ProviderSpec : name: str keywords: tuple env_key: str backend: str = "openai_compat" default_api_base: str = "" is_gateway: bool = False is_local: bool = False strip_model_prefix: bool = False supports_prompt_caching: bool = False model_overrides: tuple = ()
配几个典型例子看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 ProviderSpec( name="deepseek" , keywords=("deepseek" ,), env_key="DEEPSEEK_API_KEY" , backend="openai_compat" , default_api_base="https://api.deepseek.com" , ), ProviderSpec( name="moonshot" , keywords=("moonshot" , "kimi" ), env_key="MOONSHOT_API_KEY" , backend="openai_compat" , default_api_base="https://api.moonshot.ai/v1" , model_overrides=(("kimi-k2.5" , {"temperature" : 1.0 }),), ), ProviderSpec( name="openrouter" , keywords=("openrouter" ,), env_key="OPENROUTER_API_KEY" , backend="openai_compat" , is_gateway=True , detect_by_key_prefix="sk-or-" , default_api_base="https://openrouter.ai/api/v1" , supports_prompt_caching=True , ), ProviderSpec( name="anthropic" , keywords=("anthropic" , "claude" ), env_key="ANTHROPIC_API_KEY" , backend="anthropic" , supports_prompt_caching=True , ),
registry.py就是纯数据,不做任何实例化。实例化逻辑在nanobot/cli/commands.py里,根据配置文件里的model名和provider名,找到对应的ProviderSpec,再根据backend字段创建对应的provider对象。
OpenAICompatProvider — 核心实现 终于到重头戏了。openai_compat_provider.py有1016行,是整个providers目录里最大的文件,支撑了除Anthropic和Azure之外的所有provider。
完整的调用路径是这样的:
下面逐段走读。
初始化:一个AsyncOpenAI搞定一切 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class OpenAICompatProvider (LLMProvider ): def __init__ ( self, api_key: str | None = None , api_base: str | None = None , default_model: str = "gpt-4o" , extra_headers: dict [str , str ] | None = None , spec: ProviderSpec | None = None , ): super ().__init__(api_key, api_base) self .default_model = default_model self ._spec = spec effective_base = api_base or (spec.default_api_base if spec else None ) or None self ._client = AsyncOpenAI( api_key=api_key or "no-key" , base_url=effective_base, default_headers=default_headers, max_retries=0 , )
精髓在这里:OpenAI的官方Python SDK支持自定义base_url。DeepSeek的API、MiniMax的API……只要兼容OpenAI格式,换个base_url就可以直接用同一个客户端对接。max_retries=0是因为重试逻辑由nanobot自己的_run_with_retry管理,不想SDK和nanobot之间出现重试叠加。
注意api_key or "no-key"这个处理——本地部署的Ollama、vLLM不需要真实的key,但SDK要求这个参数不能为空,所以传个占位字符串。
_build_kwargs — 把通用参数翻译成各厂商能接受的格式这是adapter模式的核心:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 def _build_kwargs (self, messages, tools, model, max_tokens, temperature, reasoning_effort, tool_choice ) -> dict : model_name = model or self .default_model spec = self ._spec if spec and spec.supports_prompt_caching: if any (model_name.lower().startswith(k) for k in ("anthropic/" , "claude" )): messages, tools = self ._apply_cache_control(messages, tools) if spec and spec.strip_model_prefix: model_name = model_name.split("/" )[-1 ] kwargs = { "model" : model_name, "messages" : self ._sanitize_messages(self ._sanitize_empty_content(messages)), } if self ._supports_temperature(model_name, reasoning_effort): kwargs["temperature" ] = temperature if spec and getattr (spec, "supports_max_completion_tokens" , False ): kwargs["max_completion_tokens" ] = max (1 , max_tokens) else : kwargs["max_tokens" ] = max (1 , max_tokens) if spec: model_lower = model_name.lower() for pattern, overrides in spec.model_overrides: if pattern in model_lower: kwargs.update(overrides) break if reasoning_effort: kwargs["reasoning_effort" ] = reasoning_effort if spec and reasoning_effort is not None : thinking_enabled = reasoning_effort.lower() != "minimal" if spec.name == "dashscope" : kwargs.setdefault("extra_body" , {}).update({"enable_thinking" : thinking_enabled}) elif spec.name in ("volcengine" , "byteplus" , ...): kwargs.setdefault("extra_body" , {}).update( {"thinking" : {"type" : "enabled" if thinking_enabled else "disabled" }} ) if tools: kwargs["tools" ] = tools kwargs["tool_choice" ] = tool_choice or "auto" return kwargs
这段代码就是在处理各种”虽然大家都说兼容OpenAI,但具体行为有出入”的情况。比如推理模型不支持temperature这件事,如果你不做判断直接传,某些模型会报400错误。
_sanitize_messages — 消息清洗消息在进入HTTP请求前要做几件清洗的事:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def _sanitize_messages (self, messages ) -> list : sanitized = LLMProvider._sanitize_request_messages(messages, _ALLOWED_MSG_KEYS) for clean in sanitized: if isinstance (clean.get("tool_calls" ), list ): for tc in clean["tool_calls" ]: tc["function" ]["arguments" ] = self ._normalize_tool_call_arguments( tc["function" ].get("arguments" ) ) clean["content" ] = None return self ._enforce_role_alternation(sanitized)
_enforce_role_alternation值得单独说一下。OpenAI协议要求user和assistant交替出现,如果两条user消息连续,有些provider会拒绝。nanobot的做法是把相邻的同角色消息合并——两条user文本消息就拼在一起,结尾如果是assistant消息就丢掉(因为让模型接着自己上一条说话语义上不安全)。
chat() — 主调用,两条路1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 async def chat (self, messages, tools=None , model=None , ... ) -> LLMResponse: try : if self ._should_use_responses_api(model, reasoning_effort): try : body = self ._build_responses_body(...) return parse_response_output(await self ._client.responses.create(**body)) except Exception as responses_error: if not self ._should_fallback_from_responses_error(responses_error): raise kwargs = self ._build_kwargs(...) return self ._parse(await self ._client.chat.completions.create(**kwargs)) except Exception as e: return self ._handle_error(e, spec=self ._spec, api_base=self .api_base)
有两条API路径:
Chat Completions API (/v1/chat/completions)是传统路径,所有OpenAI-compatible provider都支持,这是主路径。
Responses API (/v1/responses)是OpenAI 2025年新推出的接口,主要服务于o1/o3/o4这类推理模型和gpt-5。它的格式和Chat Completions不同,单独处理。如果Responses API返回了4xx错误(可能是部分proxy不支持),会自动降级到Chat Completions API。
_parse() — 把响应变成LLMResponse这是从”大模型返回的原始数据”到”nanobot内部统一格式”的转换。代码很长,但逻辑分两段:
第一段:dict格式(原始JSON解析出来的)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 def _parse (self, response ) -> LLMResponse: response_map = self ._maybe_mapping(response) if response_map is not None : choices = response_map.get("choices" ) or [] choice0 = self ._maybe_mapping(choices[0 ]) or {} msg0 = self ._maybe_mapping(choice0.get("message" )) or {} content = self ._extract_text_content(msg0.get("content" )) finish_reason = str (choice0.get("finish_reason" ) or "stop" ) raw_tool_calls = [] for ch in choices: ch_map = self ._maybe_mapping(ch) or {} m = self ._maybe_mapping(ch_map.get("message" )) or {} tool_calls = m.get("tool_calls" ) if isinstance (tool_calls, list ) and tool_calls: raw_tool_calls.extend(tool_calls) parsed_tool_calls = [] for tc in raw_tool_calls: tc_map = self ._maybe_mapping(tc) or {} fn = self ._maybe_mapping(tc_map.get("function" )) or {} args = fn.get("arguments" , {}) if isinstance (args, str ): args = json_repair.loads(args) parsed_tool_calls.append(ToolCallRequest( id =..., name=str (fn.get("name" ) or "" ), arguments=args if isinstance (args, dict ) else {}, )) return LLMResponse( content=content, tool_calls=parsed_tool_calls, finish_reason=finish_reason, usage=self ._extract_usage(response_map), reasoning_content=..., )
json_repair.loads用于容错——有些模型输出的arguments JSON格式有小问题(比如少了个引号),json_repair能自动修复后解析。
第二段:Pydantic对象格式(OpenAI SDK解析后的)
代码结构完全一样,只是访问方式从字典改成属性:
1 2 3 4 5 6 7 choice = response.choices[0 ] msg = choice.message content = msg.content
不同provider返回的cached_tokens字段放在不同地方,_extract_usage做了统一处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @classmethod def _extract_usage (cls, response ) -> dict : for path in ( ("prompt_tokens_details" , "cached_tokens" ), ("cached_tokens" ,), ("prompt_cache_hit_tokens" ,), ): cached = cls._get_nested_int(usage_map, path) if cached: result["cached_tokens" ] = cached break return result
Prompt缓存 是各家大模型都支持的一种优化:如果你前后两次请求的system prompt完全一样,服务器端会缓存这部分,第二次不用重新计算,可以降低延迟和费用。cached_tokens就是这次有多少tokens命中了缓存。各家厂商在response里放这个字段的位置不同,nanobot这里做了统一归一化。
至于如何让大模型开启缓存——我们之前看到_apply_cache_control方法里,会在system消息和tools最后一个元素加上"cache_control": {"type": "ephemeral"}标记,这是告诉模型”请在这个边界之前的内容缓存起来”。
工具调用的完整循环 前面讲了provider如何解析出ToolCallRequest列表,但工具怎么被执行、结果怎么传回去,这部分在AgentRunner里。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 if response.has_tool_calls: assistant_message = build_assistant_message( response.content or "" , tool_calls=[tc.to_openai_tool_call() for tc in response.tool_calls], ) messages.append(assistant_message) results, _, _ = await self ._execute_tools(spec, response.tool_calls, ...) for tool_call, result in zip (response.tool_calls, results): tool_message = { "role" : "tool" , "tool_call_id" : tool_call.id , "name" : tool_call.name, "content" : str (result), } messages.append(tool_message) continue
经过这一轮后,messages数组就增加了两块:
assistant消息(包含tool_calls字段,说明它想调哪些工具)
一条或多条tool消息(每个工具的执行结果)
然后带着这些消息再调一次LLM,模型看到了工具的执行结果,就能给出最终回答了。
tool_call_id是用来配对的:tool消息里的tool_call_id必须和assistant消息里tool_calls[].id一致,模型才能知道哪条tool结果对应哪个工具调用。
流式响应的处理 先从头说起:什么是流式 你用ChatGPT的时候,回复是一个字一个字蹦出来的,而不是等几秒钟突然刷出一大段。这就是流式(streaming) 。
实现原理是:服务器和客户端之间建立一个HTTP长连接 ,不等全部内容生成完,服务器边生成边往这个连接里写数据,客户端边读边处理。这个技术叫做SSE(Server-Sent Events,服务器推送事件) 。
具体格式很简单,每行格式是 data: {JSON内容}\n\n,最后用 data: [DONE]\n\n 表示结束:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 data: {"choices": [{"delta": {"content": "好"}}]} data: {"choices": [{"delta": {"content": "的,"}}]} data: {"choices": [{"delta": {"content": "我查一下"}}]} data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "id": "abc123", "function": {"name": "get_weather"}}]}}]} data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "function": {"arguments": "{\"city\""}}]}}]} data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "function": {"arguments": ": \"北京\"}"}}]}}]} data: {"choices": [{"finish_reason": "tool_calls"}], "usage": {"prompt_tokens": 200, "completion_tokens": 50}} data: [DONE]
服务器不等你把话说完,而是把每个”增量”(delta)推给你。每条数据就叫一个chunk(块) 。文本内容的chunk里只有新增的那几个字,工具调用的chunk里也只有新增的那一段参数片段。
对比非流式的一次性返回:
非流式 :客户端等待,服务器生成完后发一大段JSON,客户端解析一次
流式 :客户端持续接收,服务器每生成一点就推一个chunk,客户端要把所有chunk拼成完整结果
所以流式的处理分两个阶段:
收集阶段 :一边接收chunk,一边把文本内容实时展示给用户
拼接阶段 :等流结束后,把所有chunk拼成完整的LLMResponse(同非流式路径的返回格式)
chat_stream() — 流式调用1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 async def chat_stream (self, messages, tools=None , model=None , on_content_delta=None , ... ) -> LLMResponse: idle_timeout_s = int (os.environ.get("NANOBOT_STREAM_IDLE_TIMEOUT_S" , "90" )) try : kwargs = self ._build_kwargs(...) kwargs["stream" ] = True kwargs["stream_options" ] = {"include_usage" : True } stream = await self ._client.chat.completions.create(**kwargs) chunks = [] stream_iter = stream.__aiter__() while True : try : chunk = await asyncio.wait_for( stream_iter.__anext__(), timeout=idle_timeout_s, ) except StopAsyncIteration: break chunks.append(chunk) if on_content_delta and chunk.choices: text = getattr (chunk.choices[0 ].delta, "content" , None ) if text: await on_content_delta(text) return self ._parse_chunks(chunks) except asyncio.TimeoutError: return LLMResponse(content="Error: stream stalled..." , finish_reason="error" , ...)
有一个90秒的空闲超时——如果90秒之内没有收到任何新chunk,就认为连接卡死了,返回一个error。这是为了防止进程永久阻塞在一个死掉的连接上。
_parse_chunks() — 把碎片化的chunks拼成完整响应收集完所有chunk之后,需要把这些碎片还原成跟非流式路径一模一样的LLMResponse。这一步有三类内容需要拼接,各有各的处理方式。
文本内容的拼接 最简单。每个chunk的delta.content就是新增的那几个字,追加到content_parts列表里,最后"".join()合并:
1 2 3 4 5 6 7 8 content_parts = [] for chunk in chunks: delta = chunk.choices[0 ].delta if delta.content: content_parts.append(delta.content) final_content = "" .join(content_parts)
工具调用参数的拼接(难点) 这里要解决的问题是:工具调用的参数是一段JSON字符串,但它是被拆碎了分批推过来的。
比如调get_weather(city="北京"),参数{"city": "北京"}可能被拆成这样发过来:
1 2 3 chunk A: arguments 片段 = '{"city' chunk B: arguments 片段 = '": "' chunk C: arguments 片段 = '北京"}'
每一片都不是合法的JSON,只有全部拼起来才能解析。所以没法边收边解析,必须把字符串碎片都攒起来,等全部chunk处理完再统一json.loads。
更复杂的是:一次LLM调用可能同时有多个工具调用 。比如模型决定同时调get_weather和search_news,这两个工具调用的参数片段会交错混在一起 推过来:
1 2 3 4 5 6 chunk 1: tool_calls[index=0].name = "get_weather" chunk 2: tool_calls[index=1].name = "search_news" chunk 3: tool_calls[index=0].arguments = '{"city' chunk 4: tool_calls[index=1].arguments = '{"query' chunk 5: tool_calls[index=0].arguments = '": "北京"}' chunk 6: tool_calls[index=1].arguments = '": "今日新闻"}'
注意每个chunk里有index字段(0、1、2……)来区分是第几个工具调用。_parse_chunks用tc_bufs字典,以index为key,分别维护每个工具调用的累积缓冲区:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 tc_bufs = {} for chunk in chunks: delta = chunk.choices[0 ].delta for tc in (delta.tool_calls or []): tc_index = tc.index buf = tc_bufs.setdefault(tc_index, {"id" : "" , "name" : "" , "arguments" : "" }) if tc.id : buf["id" ] = str (tc.id ) if tc.function.name: buf["name" ] += str (tc.function.name) if tc.function.arguments: buf["arguments" ] += str (tc.function.arguments)
等所有chunk处理完,tc_bufs里每个工具调用的arguments都是完整的JSON字符串,再一次性解析:
1 2 3 4 5 6 7 8 tool_calls = [ ToolCallRequest( id =buf["id" ] or _short_tool_id(), name=buf["name" ], arguments=json_repair.loads(buf["arguments" ]) if buf["arguments" ] else {}, ) for buf in tc_bufs.values() ]
这里用json_repair.loads而不是标准的json.loads,是为了容错——有些模型偶尔输出的参数JSON格式有小问题(比如少了个引号),json_repair能尝试修复后解析,而不是直接报错。
推理内容的拼接 推理模型(DeepSeek-R1、Kimi、MiMo等)在给出最终答案之前会有一段”思考过程”。不同provider返回推理内容的方式不一样:
方式一:独立字段 (OpenAI兼容格式的provider)
DeepSeek-R1、MiMo等provider在response的message.reasoning_content字段里返回推理内容。这是最干净的方式——推理内容和正式回复各走各的字段,nanobot提取后存入LLMResponse.reasoning_content。
方式二:<think>...</think>标签 (MiniMax等非标准provider)
MiniMax的模型把推理内容直接嵌在content文本里,用<think>和</think>标签包围。比如:
1 2 3 <think>The user wants to know the weather. I should call get_weather tool.</think> 好的,我来帮你查一下天气。
nanobot用strip_think函数把这些标签块从content里抠出来:
1 2 3 4 5 def strip_think (text: str ) -> str : text = re.sub(r"<think>[\s\S]*?</think>" , "" , text) text = re.sub(r"^\s*<think>[\s\S]*$" , "" , text) text = re.sub(r"<thought>[\s\S]*?</thought>" , "" , text) return text.strip()
推理内容不会写入messages
不管是reasoning_content字段还是<think></think>块,都不会 出现在后续发给大模型的messages里。原因很简单:推理内容是模型”内部的思考过程”,没必要再喂回去。_sanitize_request_messages的白名单里就没有reasoning_content字段,strip_think也会在发消息前把content里的<think>...</think>块去掉。
在session文件里看到的<think>...</think>,是nanobot在保存历史记录 时保留了原始内容(方便用户回溯查看),但下次发请求时会先清洗掉。
最终组装 1 2 3 4 5 6 7 return LLMResponse( content="" .join(content_parts) or None , tool_calls=tool_calls, finish_reason=finish_reason, usage=usage, reasoning_content="" .join(reasoning_parts) or None , )
可以看到,流式路径的最终返回值和非流式路径的_parse()返回值是同一个LLMResponse结构 。这样上层的AgentRunner完全不用关心底下用的是流式还是非流式,拿到LLMResponse后的处理逻辑是完全一样的。
重试机制
chat_with_retry / _run_with_retry基类提供了完整的重试逻辑。大模型API频繁报错,429(限流)、5xx(服务器错误)是家常便饭,所以重试是必须的。
1 2 3 4 5 6 7 8 9 10 async def chat_with_retry (self, messages, tools=None , model=None , retry_mode="standard" , ... ) -> LLMResponse: if max_tokens is self ._SENTINEL or max_tokens is None : max_tokens = self .generation.max_tokens return await self ._run_with_retry( self ._safe_chat, kw, messages, retry_mode=retry_mode, on_retry_wait=on_retry_wait, )
retry_mode有两种:
"standard" — 最多重试3次,间隔1→2→4秒,适合正常对话
"persistent" — 一直重试直到成功或错误相同次数超过10次,适合后台任务(如Dream的记忆更新,不能因为临时限流就放弃)
重试决策的核心是判断”这个错误能重试吗”:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @classmethod def _is_transient_response (cls, response: LLMResponse ) -> bool : if response.error_should_retry is not None : return bool (response.error_should_retry) if response.error_status_code is not None : status = int (response.error_status_code) if status == 429 : return cls._is_retryable_429_response(response) if status in {408 , 409 } or status >= 500 : return True return cls._is_transient_error(response.content)
429有单独处理,因为429有两种:
rate limit (请求太频繁)— 可以重试,等一下就好
quota exceeded / insufficient_balance (余额不足)— 不能重试,重试也没用
判断方法是看错误消息里是否包含insufficient_quota、insufficient_balance等关键词。
等待时间会尊重provider返回的Retry-After响应头(有些provider会说”你等X秒再来”),如果没有这个头就用默认的指数退避(1→2→4秒)。
有一个特殊情况:如果是非瞬态错误,但消息里有图片,会先尝试去掉图片重试一次——因为有些provider不支持图片,返回400,但这不是真正的”fatal error”,去掉图片后可能就能成功。
总结 这篇把providers层从上到下捋了一遍,有几个设计值得回味:
注册表驱动 :ProviderSpec是纯数据,不包含任何实例化逻辑。增加一个新provider只需要在PROVIDERS列表里加一个ProviderSpec,设置好backend字段。各provider的差异(API地址、参数名、认证方式)全靠元数据描述,代码不用改。
统一接口隔离差异 :LLMProvider.chat()这个抽象方法把所有provider的差异封装在内部。上层AgentRunner只用chat_with_retry,对底下是哪家大模型完全透明。
流式和非流式的统一出口 :chat_stream()最终也返回LLMResponse,和非流式路径的格式一样。上层不用关心用的是哪种方式,逻辑完全复用。
推理内容的两种格式 :OpenAI兼容格式的provider用独立的reasoning_content字段,MiniMax等用<think>...</think>标签嵌在content里。不管是哪种,都不会写入后续发给大模型的messages。
容错细节 :json_repair修复JSON格式错误,_enforce_role_alternation修复消息角色顺序,图片失败时自动重试……这些小细节加起来让nanobot在对接多种复杂大模型时稳定得多。
重试分级 :区分了”rate limit”(能等)和”quota exceeded”(无意义)两种429,避免余额用完了还在傻乎乎地无限重试,这个判断在工程上挺实用的。