能调通不等于能上线,真正麻烦的是异常分类、成本结构和兼容层
能跑通和能上线,是两件事
第一次调用大模型 API 通常很顺利。装好 SDK,填入 api_key,几行代码就能拿到模型的回复。问题在于,这几行代码直接放进生产环境会很快遇到一类它从没处理过的情况:网络超时、服务端限速、账户余额耗尽、不同供应商的接口差异。这些情况在 demo 阶段都不会出现,因为 demo 只调用一次、只用一家供应商、网络也恰好正常。
进阶系列的第一篇,就把"能跑通的调用"补全成"能上线的调用层"。涉及五件事:错误分类与重试、退避算法、Token 成本控制、并发限流、多模型适配。每一件单独看都不复杂,但少做一件,调用层就会在某个流量高峰或某次供应商抖动时出问题。
下文的代码统一用 DeepSeek 作为示例供应商,它的接口与 OpenAI 兼容,把 base_url 和模型名换掉即可用于其它供应商。
错误分类:不是所有失败都值得重试
重试是补救调用失败最直接的手段。但在写重试逻辑之前,要先回答一个问题:这次失败到底该不该重试。把所有异常一视同仁地重试,轻则浪费时间,重则把一个本该立刻失败的请求拖成一次缓慢的故障。
按 HTTP 状态码,调用失败可以分成两类。
第一类是瞬时性失败,重试有意义:
429 Too Many Requests:触发限速,对方只是要求放慢500 / 502 / 503 / 504:服务端或网关的瞬时故障408 Request Timeout:请求超时- 网络层错误:连接超时、读取超时、连接被重置
第二类是确定性失败,重试没有意义,因为再试结果也不会变:
401 / 403:密钥错误或没有权限400:请求参数非法,比如模型名拼错、消息结构不对402:账户余额不足404:模型或资源不存在- 内容审核拒绝:对方已经明确拒绝这段内容
一个常见的错误写法是用裸 except Exception 把所有异常吞下来重试。鉴权失败重试五次,结果还是鉴权失败,只是把"立刻报错"拖成了"十几秒后报错",日志里还多了五条噪音。所以重试逻辑的第一步不是决定睡多久,而是判断这个异常属于哪一类:
import httpx
from openai import APIStatusError, APIConnectionError, APITimeoutError
RETRYABLE_STATUS = {408, 409, 429, 500, 502, 503, 504}
def is_retryable(exc: Exception) -> bool:
"""判断一个异常是否值得重试"""
# 网络层错误:连不上、读超时,几乎都值得重试
if isinstance(exc, (APIConnectionError, APITimeoutError, httpx.TransportError)):
return True
# HTTP 错误:只重试瞬时类状态码
if isinstance(exc, APIStatusError):
return exc.status_code in RETRYABLE_STATUS
# 其它未知异常:保守起见不重试,让它尽快暴露
return False
退避算法:为什么指数退避必须加抖动
确定一个失败该重试之后,才轮到决定睡多久。几乎所有资料都会讲指数退避:第一次失败睡 1 秒,再失败睡 2 秒、4 秒、8 秒,间隔随重试次数指数增长。指数增长这部分没有问题,它给了上游越来越长的恢复窗口。
问题出在"间隔太整齐"。设想流量高峰时有 200 个客户端几乎同时撞上限速,它们会在几乎同一时刻收到 429,于是几乎同时开始睡 1 秒,再几乎同时醒来一起重试。结果只是把同一波请求洪峰整体平移了 1 秒,对面该过载还是过载。这种现象叫重试风暴(thundering herd),整齐的退避反而把分散的请求"对齐"成了周期性冲击。
解决办法是给退避时间加随机抖动(jitter),把重试时刻打散开。常见有两种做法。一种是 full jitter,在 [0, 指数退避值] 区间里随机取一个值,打散得最彻底。另一种是 equal jitter,保留一半固定退避、另一半随机,保证既被打散、又不会过于贴近立即重试。两种都可用,high 并发场景下 full jitter 表现更稳。
还有一个比本地计算的退避更准确的信息常被忽略:Retry-After 响应头。供应商返回 429 时经常带上它,值是秒数或 HTTP 日期,含义是"等这么久之后我这边就缓过来了"。这是对方给出的确定信息,应当优先于本地猜测的退避时间。
最后,退避还需要两个上限:单次退避有一个 cap(避免退避到几百秒),整个重试过程有一个总时间预算(避免一个请求悄悄卡住几分钟)。
把重试封装成一个装饰器
把上面的错误分类、抖动退避、Retry-After、上限合在一起,封装成一个可复用的异步装饰器。这样业务函数本身保持干净,重试是套在外面的一层:
import asyncio
import functools
import random
import time
import email.utils
def parse_retry_after(exc: Exception) -> float | None:
"""从异常的响应头里解析 Retry-After,单位换算成秒"""
headers = getattr(getattr(exc, "response", None), "headers", {}) or {}
raw = headers.get("retry-after")
if not raw:
return None
if raw.isdigit(): # 形如 "12"
return float(raw)
dt = email.utils.parsedate_to_datetime(raw) # 形如 HTTP 日期
return max(0.0, dt.timestamp() - time.time()) if dt else None
def with_retry(max_retries: int = 4, base: float = 1.0,
cap: float = 30.0, budget: float = 60.0):
"""异步重试装饰器:错误分类 + full jitter 退避 + Retry-After 优先 + 时间预算"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
started = time.monotonic()
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as exc:
last_attempt = attempt == max_retries - 1
over_budget = time.monotonic() - started > budget
if last_attempt or over_budget or not is_retryable(exc):
raise
# 优先采用服务端给的 Retry-After,否则用 full jitter 退避
hinted = parse_retry_after(exc)
backoff = hinted if hinted is not None \
else random.uniform(0, base * 2 ** attempt)
await asyncio.sleep(min(backoff, cap))
return wrapper
return decorator
Python 生态里也有成熟的重试库,比如 tenacity,能用声明式的方式配置停止条件和等待策略。如果不需要 Retry-After 这种定制逻辑,直接用 tenacity 更省事;需要把服务端提示纳进来,就像上面这样手写一层,控制力更强。
Token 成本:先看懂账单结构
第二件容易在生产环境失控的事是成本。很多团队的 Token 账单是一笔糊涂账,原因是对 Token 本身有几个误解。
第一,Token 不是字数。英文大约 4 个字符对应一个 Token,中文一个汉字大致是 1 到 2 个 Token,而且每个模型用的分词器不一样。OpenAI 用 tiktoken,Claude、Qwen、DeepSeek 各有各的分词器。同一段中文 prompt 在不同模型上算出来的 Token 数能差出一两成,所以拿一家的估算去套另一家往往不准。
第二,输入和输出不同价,而且输出通常比输入贵 2 到 4 倍。这意味着省钱的重点常被搞反:与其费力压缩输入的 prompt,不如先约束输出——让模型不要复述问题、不要长篇展开。一句"直接给结论"省下的钱,往往比精简 system prompt 更多。
第三,用好 prompt 缓存。OpenAI、DeepSeek、Anthropic 现在都支持缓存请求的固定前缀。system prompt、few-shot 示例这些每次都相同的部分,命中缓存后输入价会大幅打折。但缓存命中有个前提:前缀必须逐字节一致。所以不要把当前时间戳、随机 ID 这类动态内容拼在 system prompt 的开头,那会让缓存每次都失效。
最后,把每次请求的 usage 落到日志里,按请求维度真正算账,而不是月底看一个总额去猜:
# 美元 / 1K tokens,按供应商实际价目填写
PRICING = {
"deepseek-chat": {"in": 0.00027, "out": 0.0011, "cached_in": 0.00007},
"gpt-4.1-mini": {"in": 0.0004, "out": 0.0016, "cached_in": 0.0001},
}
def log_cost(model: str, usage) -> float:
"""根据 usage 计算单次请求成本并打印"""
p = PRICING[model]
details = getattr(usage, "prompt_tokens_details", None)
cached = getattr(details, "cached_tokens", 0) or 0
fresh_in = usage.prompt_tokens - cached
cost = (fresh_in * p["in"]
+ cached * p["cached_in"]
+ usage.completion_tokens * p["out"]) / 1000
print(f"model={model} in={usage.prompt_tokens}(cached={cached}) "
f"out={usage.completion_tokens} cost=${cost:.5f}")
return cost
并发限流:并发数不等于 QPS
接下来是并发。在本地加一道并发闸门是必要的——服务端的限流(RPM、TPM)是一条被动的红线,撞上去就吃 429;本地限流则是主动地不去撞它。常用工具是 asyncio.Semaphore,它限制同时进行的请求数。
但这里有一个普遍的误解需要澄清:用 Semaphore(5) 把并发控制在 5,并不等于把速率控制在了 5 QPS。信号量限制的是"同一时刻在飞的请求数",不是"每秒发出的请求数"。如果每个请求 100 毫秒返回,5 个并发实际跑出来是 50 QPS;如果每个请求要 2 秒,5 个并发只有 2.5 QPS。两者不是一回事。如果要严格控制 QPS,需要用令牌桶之类按时间发放额度的算法,而不是信号量。
还有两个容易忽略的点。一是 TPM(每分钟 Token 数)限制,它限的是 Token 而不是请求数,几个超大 prompt 的请求就能把 TPM 配额吃光,哪怕 RPM 还很空闲。二是连接池,AsyncOpenAI 底层是 httpx,默认连接数有上限,并发拉高时如果不调整 httpx 的 limits,请求会卡在等待连接,而不是等待模型。
import asyncio
# 全局并发闸门:同一时刻最多 20 个请求在飞
SEMAPHORE = asyncio.Semaphore(20)
async def call_limited(coro_func, *args, **kwargs):
"""在信号量保护下执行一次调用"""
async with SEMAPHORE:
return await coro_func(*args, **kwargs)
并发数怎么定:先查供应商文档里的 RPM 上限,留 70% 到 80% 的安全余量,再结合平均响应时间反推一个不会越过 RPM 的并发数。prompt 普遍较长时,要额外用 TPM 复核一遍。
多模型适配层
最后是多模型适配。需要适配层的理由很实际:供应商可能涨价、可能限流变严、可能某天接口不稳定,这时要能快速切换,而不是回去翻遍代码改调用。
很多人理解的"适配"是存一张 base_url 和模型名的表。但那只是配置,不是适配。真正的差异藏在更深的地方:多数供应商兼容 OpenAI 的 chat completions 结构,但 Anthropic 原生 API 的 system 是顶层参数而不是一条 message;OpenAI 新接口把 max_tokens 改成了 max_completion_tokens;function calling 的结构各家不一致;流式返回的分块结构不同;推理模型(OpenAI o 系列、deepseek-reasoner)会多返回一段推理内容,而且通常不支持 temperature。
务实的做法是分两层。大多数国产模型都提供 OpenAI 兼容接口,这部分用一个统一的 OpenAI 兼容适配器兜掉;差异确实大的(Anthropic 原生、Gemini)单独写各自的适配器。对上层只暴露一个 chat() 方法,业务代码不需要知道当前用的是哪一家:
from abc import ABC, abstractmethod
from openai import AsyncOpenAI
class ChatAdapter(ABC):
@abstractmethod
async def chat(self, messages: list[dict], **kwargs) -> str:
...
class OpenAICompatAdapter(ChatAdapter):
"""覆盖所有提供 OpenAI 兼容接口的供应商:OpenAI、DeepSeek、本地 vLLM 等"""
def __init__(self, base_url: str, api_key: str, model: str):
self.client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=30.0)
self.model = model
@with_retry()
async def chat(self, messages: list[dict], **kwargs) -> str:
resp = await self.client.chat.completions.create(
model=self.model, messages=messages, **kwargs)
if resp.usage:
log_cost(self.model, resp.usage)
return resp.choices[0].message.content or ""
业务代码只跟 ChatAdapter 这个抽象打交道。某家接口改了字段、或者稳定性变差需要切换时,改动集中在适配层一个文件里,而不是散落在整个项目的每一处 create() 调用。
完整示例:一个生产级的 LLM 网关
把前面的错误分类、重试装饰器、成本日志、并发限流、适配层整合到一起,就是一个可以直接用于生产的最小 LLM 网关。它对外暴露 chat() 和批量的 chat_many():
# llm_gateway.py
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv
# 上文定义的 is_retryable / with_retry / log_cost 假设已在同模块或导入
load_dotenv()
PROVIDERS = {
"deepseek": {
"base_url": "https://api.deepseek.com/v1",
"api_key": os.getenv("DEEPSEEK_API_KEY"),
"model": "deepseek-chat",
},
"openai": {
"base_url": "https://api.openai.com/v1",
"api_key": os.getenv("OPENAI_API_KEY"),
"model": "gpt-4.1-mini",
},
}
class LLMGateway:
def __init__(self, provider: str = "deepseek", concurrency: int = 20):
cfg = PROVIDERS[provider]
self.client = AsyncOpenAI(
base_url=cfg["base_url"], api_key=cfg["api_key"], timeout=30.0)
self.model = cfg["model"]
self.semaphore = asyncio.Semaphore(concurrency)
@with_retry(max_retries=4, budget=60.0)
async def _raw_chat(self, messages: list[dict], **kwargs) -> str:
resp = await self.client.chat.completions.create(
model=self.model, messages=messages, temperature=0.2, **kwargs)
if resp.usage:
log_cost(self.model, resp.usage)
return resp.choices[0].message.content or ""
async def chat(self, messages: list[dict], **kwargs) -> str:
"""单次调用:并发闸门 + 重试 + 成本日志"""
async with self.semaphore:
return await self._raw_chat(messages, **kwargs)
async def chat_many(self, batch: list[list[dict]]) -> list[str | None]:
"""批量调用:单条失败不影响整体"""
async def one(messages):
try:
return await self.chat(messages)
except Exception as exc:
print(f"调用失败:{exc}")
return None
return await asyncio.gather(*(one(m) for m in batch))
async def main():
gw = LLMGateway(provider="deepseek", concurrency=20)
questions = ["用一句话解释什么是指数退避", "用一句话解释什么是 TPM 限流"]
batch = [[{"role": "user", "content": q}] for q in questions]
for q, answer in zip(questions, await gw.chat_many(batch)):
print(f"Q: {q}\nA: {answer}\n")
if __name__ == "__main__":
asyncio.run(main())
这个网关不到一百行,但已经具备了上线所需的几项能力:错误分类后的指数退避重试、Retry-After 优先、并发限流、单次成本核算、批量调用时单条失败不拖垮整体,以及通过 PROVIDERS 表切换供应商。
几个常见踩坑
用裸 except 重试所有异常。鉴权失败、参数错误、余额不足都不该重试,重试只是延迟报错并污染日志。重试前一定先做错误分类。
指数退避不加抖动。整齐的退避会把分散的请求对齐成周期性冲击,并发越高这个问题越明显。退避一定要加 jitter。
只配重试次数,不配总时间预算。只有 max_retries 时,一个请求叠加退避可能卡住好几分钟。要同时设一个整体的时间预算。
把并发数当成 QPS。信号量控制的是同时在飞的请求数,实际 QPS 还取决于响应时间。要严格控速得用令牌桶。
忽略 TPM 只看 RPM。长 prompt 场景下,TPM 往往比 RPM 先触顶。限流要把 Token 维度也算进去。
适配层只存配置不抽象接口。只存 base_url 和模型名,遇到 system 参数位置、字段名、流式结构的真实差异时还是要改业务代码。适配层要暴露统一的方法签名。
本篇要点
- 重试前先做错误分类:瞬时性失败(429、5xx、网络错误)才重试,确定性失败(401、400、402、404)不重试
- 指数退避必须加随机抖动,避免重试风暴;有
Retry-After时优先采用服务端的提示 - 重试要同时设单次退避上限和整体时间预算
- Token 成本的关键是分词器差异、输入输出价格不对称、prompt 缓存,并按请求记录
usage - 并发数不等于 QPS,限流要同时考虑 RPM 和 TPM
- 多模型适配层要抽象出统一接口,而不只是存一张配置表
- 把以上整合成一个 LLM 网关,业务代码只跟它的
chat()打交道
下一篇
调用层兜住之后,下一个常见误区是把 Prompt 当成玄学,靠反复试碰运气。下一篇会把 Prompt 拆成 few-shot、思维链、结构化输出、模板管理、评估五件可以工程化的事,说明它为什么更像在写配置而不是抽签。
参考资料
- OpenAI Rate limits 指南
- AWS Architecture Blog: Exponential Backoff and Jitter
- DeepSeek API 文档
- tenacity 重试库文档
- httpx 文档 — AsyncOpenAI 底层的 HTTP 客户端
版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。
(采用 CC BY-NC-SA 4.0 许可协议进行授权)
本文标题:API 调用没你想的那么简单
本文链接:https://www.sshipanoo.com/blog/ai/llm-advanced/01-API调用没你想的那么简单/
本文最后一次更新为 天前,文章中的某些内容可能已过时!