能调通不等于能上线,真正麻烦的是异常分类、成本结构和兼容层

能跑通和能上线,是两件事

第一次调用大模型 API 通常很顺利。装好 SDK,填入 api_key,几行代码就能拿到模型的回复。问题在于,这几行代码直接放进生产环境会很快遇到一类它从没处理过的情况:网络超时、服务端限速、账户余额耗尽、不同供应商的接口差异。这些情况在 demo 阶段都不会出现,因为 demo 只调用一次、只用一家供应商、网络也恰好正常。

进阶系列的第一篇,就把"能跑通的调用"补全成"能上线的调用层"。涉及五件事:错误分类与重试、退避算法、Token 成本控制、并发限流、多模型适配。每一件单独看都不复杂,但少做一件,调用层就会在某个流量高峰或某次供应商抖动时出问题。

下文的代码统一用 DeepSeek 作为示例供应商,它的接口与 OpenAI 兼容,把 base_url 和模型名换掉即可用于其它供应商。

错误分类:不是所有失败都值得重试

重试是补救调用失败最直接的手段。但在写重试逻辑之前,要先回答一个问题:这次失败到底该不该重试。把所有异常一视同仁地重试,轻则浪费时间,重则把一个本该立刻失败的请求拖成一次缓慢的故障。

按 HTTP 状态码,调用失败可以分成两类。

第一类是瞬时性失败,重试有意义:

  • 429 Too Many Requests:触发限速,对方只是要求放慢
  • 500 / 502 / 503 / 504:服务端或网关的瞬时故障
  • 408 Request Timeout:请求超时
  • 网络层错误:连接超时、读取超时、连接被重置

第二类是确定性失败,重试没有意义,因为再试结果也不会变:

  • 401 / 403:密钥错误或没有权限
  • 400:请求参数非法,比如模型名拼错、消息结构不对
  • 402:账户余额不足
  • 404:模型或资源不存在
  • 内容审核拒绝:对方已经明确拒绝这段内容

一个常见的错误写法是用裸 except Exception 把所有异常吞下来重试。鉴权失败重试五次,结果还是鉴权失败,只是把"立刻报错"拖成了"十几秒后报错",日志里还多了五条噪音。所以重试逻辑的第一步不是决定睡多久,而是判断这个异常属于哪一类:

import httpx
from openai import APIStatusError, APIConnectionError, APITimeoutError

RETRYABLE_STATUS = {408, 409, 429, 500, 502, 503, 504}


def is_retryable(exc: Exception) -> bool:
    """判断一个异常是否值得重试"""
    # 网络层错误:连不上、读超时,几乎都值得重试
    if isinstance(exc, (APIConnectionError, APITimeoutError, httpx.TransportError)):
        return True
    # HTTP 错误:只重试瞬时类状态码
    if isinstance(exc, APIStatusError):
        return exc.status_code in RETRYABLE_STATUS
    # 其它未知异常:保守起见不重试,让它尽快暴露
    return False

退避算法:为什么指数退避必须加抖动

确定一个失败该重试之后,才轮到决定睡多久。几乎所有资料都会讲指数退避:第一次失败睡 1 秒,再失败睡 2 秒、4 秒、8 秒,间隔随重试次数指数增长。指数增长这部分没有问题,它给了上游越来越长的恢复窗口。

问题出在"间隔太整齐"。设想流量高峰时有 200 个客户端几乎同时撞上限速,它们会在几乎同一时刻收到 429,于是几乎同时开始睡 1 秒,再几乎同时醒来一起重试。结果只是把同一波请求洪峰整体平移了 1 秒,对面该过载还是过载。这种现象叫重试风暴(thundering herd),整齐的退避反而把分散的请求"对齐"成了周期性冲击。

解决办法是给退避时间加随机抖动(jitter),把重试时刻打散开。常见有两种做法。一种是 full jitter,在 [0, 指数退避值] 区间里随机取一个值,打散得最彻底。另一种是 equal jitter,保留一半固定退避、另一半随机,保证既被打散、又不会过于贴近立即重试。两种都可用,high 并发场景下 full jitter 表现更稳。

还有一个比本地计算的退避更准确的信息常被忽略:Retry-After 响应头。供应商返回 429 时经常带上它,值是秒数或 HTTP 日期,含义是"等这么久之后我这边就缓过来了"。这是对方给出的确定信息,应当优先于本地猜测的退避时间。

最后,退避还需要两个上限:单次退避有一个 cap(避免退避到几百秒),整个重试过程有一个总时间预算(避免一个请求悄悄卡住几分钟)。

把重试封装成一个装饰器

把上面的错误分类、抖动退避、Retry-After、上限合在一起,封装成一个可复用的异步装饰器。这样业务函数本身保持干净,重试是套在外面的一层:

import asyncio
import functools
import random
import time
import email.utils


def parse_retry_after(exc: Exception) -> float | None:
    """从异常的响应头里解析 Retry-After,单位换算成秒"""
    headers = getattr(getattr(exc, "response", None), "headers", {}) or {}
    raw = headers.get("retry-after")
    if not raw:
        return None
    if raw.isdigit():                                  # 形如 "12"
        return float(raw)
    dt = email.utils.parsedate_to_datetime(raw)        # 形如 HTTP 日期
    return max(0.0, dt.timestamp() - time.time()) if dt else None


def with_retry(max_retries: int = 4, base: float = 1.0,
               cap: float = 30.0, budget: float = 60.0):
    """异步重试装饰器:错误分类 + full jitter 退避 + Retry-After 优先 + 时间预算"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            started = time.monotonic()
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    last_attempt = attempt == max_retries - 1
                    over_budget = time.monotonic() - started > budget
                    if last_attempt or over_budget or not is_retryable(exc):
                        raise
                    # 优先采用服务端给的 Retry-After,否则用 full jitter 退避
                    hinted = parse_retry_after(exc)
                    backoff = hinted if hinted is not None \
                        else random.uniform(0, base * 2 ** attempt)
                    await asyncio.sleep(min(backoff, cap))
        return wrapper
    return decorator

Python 生态里也有成熟的重试库,比如 tenacity,能用声明式的方式配置停止条件和等待策略。如果不需要 Retry-After 这种定制逻辑,直接用 tenacity 更省事;需要把服务端提示纳进来,就像上面这样手写一层,控制力更强。

Token 成本:先看懂账单结构

第二件容易在生产环境失控的事是成本。很多团队的 Token 账单是一笔糊涂账,原因是对 Token 本身有几个误解。

第一,Token 不是字数。英文大约 4 个字符对应一个 Token,中文一个汉字大致是 1 到 2 个 Token,而且每个模型用的分词器不一样。OpenAI 用 tiktoken,Claude、Qwen、DeepSeek 各有各的分词器。同一段中文 prompt 在不同模型上算出来的 Token 数能差出一两成,所以拿一家的估算去套另一家往往不准。

第二,输入和输出不同价,而且输出通常比输入贵 2 到 4 倍。这意味着省钱的重点常被搞反:与其费力压缩输入的 prompt,不如先约束输出——让模型不要复述问题、不要长篇展开。一句"直接给结论"省下的钱,往往比精简 system prompt 更多。

第三,用好 prompt 缓存。OpenAI、DeepSeek、Anthropic 现在都支持缓存请求的固定前缀。system prompt、few-shot 示例这些每次都相同的部分,命中缓存后输入价会大幅打折。但缓存命中有个前提:前缀必须逐字节一致。所以不要把当前时间戳、随机 ID 这类动态内容拼在 system prompt 的开头,那会让缓存每次都失效。

最后,把每次请求的 usage 落到日志里,按请求维度真正算账,而不是月底看一个总额去猜:

# 美元 / 1K tokens,按供应商实际价目填写
PRICING = {
    "deepseek-chat": {"in": 0.00027, "out": 0.0011, "cached_in": 0.00007},
    "gpt-4.1-mini": {"in": 0.0004, "out": 0.0016, "cached_in": 0.0001},
}


def log_cost(model: str, usage) -> float:
    """根据 usage 计算单次请求成本并打印"""
    p = PRICING[model]
    details = getattr(usage, "prompt_tokens_details", None)
    cached = getattr(details, "cached_tokens", 0) or 0
    fresh_in = usage.prompt_tokens - cached
    cost = (fresh_in * p["in"]
            + cached * p["cached_in"]
            + usage.completion_tokens * p["out"]) / 1000
    print(f"model={model} in={usage.prompt_tokens}(cached={cached}) "
          f"out={usage.completion_tokens} cost=${cost:.5f}")
    return cost

并发限流:并发数不等于 QPS

接下来是并发。在本地加一道并发闸门是必要的——服务端的限流(RPM、TPM)是一条被动的红线,撞上去就吃 429;本地限流则是主动地不去撞它。常用工具是 asyncio.Semaphore,它限制同时进行的请求数。

但这里有一个普遍的误解需要澄清:用 Semaphore(5) 把并发控制在 5,并不等于把速率控制在了 5 QPS。信号量限制的是"同一时刻在飞的请求数",不是"每秒发出的请求数"。如果每个请求 100 毫秒返回,5 个并发实际跑出来是 50 QPS;如果每个请求要 2 秒,5 个并发只有 2.5 QPS。两者不是一回事。如果要严格控制 QPS,需要用令牌桶之类按时间发放额度的算法,而不是信号量。

还有两个容易忽略的点。一是 TPM(每分钟 Token 数)限制,它限的是 Token 而不是请求数,几个超大 prompt 的请求就能把 TPM 配额吃光,哪怕 RPM 还很空闲。二是连接池,AsyncOpenAI 底层是 httpx,默认连接数有上限,并发拉高时如果不调整 httpx 的 limits,请求会卡在等待连接,而不是等待模型。

import asyncio

# 全局并发闸门:同一时刻最多 20 个请求在飞
SEMAPHORE = asyncio.Semaphore(20)


async def call_limited(coro_func, *args, **kwargs):
    """在信号量保护下执行一次调用"""
    async with SEMAPHORE:
        return await coro_func(*args, **kwargs)

并发数怎么定:先查供应商文档里的 RPM 上限,留 70% 到 80% 的安全余量,再结合平均响应时间反推一个不会越过 RPM 的并发数。prompt 普遍较长时,要额外用 TPM 复核一遍。

多模型适配层

最后是多模型适配。需要适配层的理由很实际:供应商可能涨价、可能限流变严、可能某天接口不稳定,这时要能快速切换,而不是回去翻遍代码改调用。

很多人理解的"适配"是存一张 base_url 和模型名的表。但那只是配置,不是适配。真正的差异藏在更深的地方:多数供应商兼容 OpenAI 的 chat completions 结构,但 Anthropic 原生 API 的 system 是顶层参数而不是一条 message;OpenAI 新接口把 max_tokens 改成了 max_completion_tokens;function calling 的结构各家不一致;流式返回的分块结构不同;推理模型(OpenAI o 系列、deepseek-reasoner)会多返回一段推理内容,而且通常不支持 temperature

务实的做法是分两层。大多数国产模型都提供 OpenAI 兼容接口,这部分用一个统一的 OpenAI 兼容适配器兜掉;差异确实大的(Anthropic 原生、Gemini)单独写各自的适配器。对上层只暴露一个 chat() 方法,业务代码不需要知道当前用的是哪一家:

from abc import ABC, abstractmethod
from openai import AsyncOpenAI


class ChatAdapter(ABC):
    @abstractmethod
    async def chat(self, messages: list[dict], **kwargs) -> str:
        ...


class OpenAICompatAdapter(ChatAdapter):
    """覆盖所有提供 OpenAI 兼容接口的供应商:OpenAI、DeepSeek、本地 vLLM 等"""

    def __init__(self, base_url: str, api_key: str, model: str):
        self.client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=30.0)
        self.model = model

    @with_retry()
    async def chat(self, messages: list[dict], **kwargs) -> str:
        resp = await self.client.chat.completions.create(
            model=self.model, messages=messages, **kwargs)
        if resp.usage:
            log_cost(self.model, resp.usage)
        return resp.choices[0].message.content or ""

业务代码只跟 ChatAdapter 这个抽象打交道。某家接口改了字段、或者稳定性变差需要切换时,改动集中在适配层一个文件里,而不是散落在整个项目的每一处 create() 调用。

完整示例:一个生产级的 LLM 网关

把前面的错误分类、重试装饰器、成本日志、并发限流、适配层整合到一起,就是一个可以直接用于生产的最小 LLM 网关。它对外暴露 chat() 和批量的 chat_many()

# llm_gateway.py
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv

# 上文定义的 is_retryable / with_retry / log_cost 假设已在同模块或导入

load_dotenv()

PROVIDERS = {
    "deepseek": {
        "base_url": "https://api.deepseek.com/v1",
        "api_key": os.getenv("DEEPSEEK_API_KEY"),
        "model": "deepseek-chat",
    },
    "openai": {
        "base_url": "https://api.openai.com/v1",
        "api_key": os.getenv("OPENAI_API_KEY"),
        "model": "gpt-4.1-mini",
    },
}


class LLMGateway:
    def __init__(self, provider: str = "deepseek", concurrency: int = 20):
        cfg = PROVIDERS[provider]
        self.client = AsyncOpenAI(
            base_url=cfg["base_url"], api_key=cfg["api_key"], timeout=30.0)
        self.model = cfg["model"]
        self.semaphore = asyncio.Semaphore(concurrency)

    @with_retry(max_retries=4, budget=60.0)
    async def _raw_chat(self, messages: list[dict], **kwargs) -> str:
        resp = await self.client.chat.completions.create(
            model=self.model, messages=messages, temperature=0.2, **kwargs)
        if resp.usage:
            log_cost(self.model, resp.usage)
        return resp.choices[0].message.content or ""

    async def chat(self, messages: list[dict], **kwargs) -> str:
        """单次调用:并发闸门 + 重试 + 成本日志"""
        async with self.semaphore:
            return await self._raw_chat(messages, **kwargs)

    async def chat_many(self, batch: list[list[dict]]) -> list[str | None]:
        """批量调用:单条失败不影响整体"""
        async def one(messages):
            try:
                return await self.chat(messages)
            except Exception as exc:
                print(f"调用失败:{exc}")
                return None
        return await asyncio.gather(*(one(m) for m in batch))


async def main():
    gw = LLMGateway(provider="deepseek", concurrency=20)
    questions = ["用一句话解释什么是指数退避", "用一句话解释什么是 TPM 限流"]
    batch = [[{"role": "user", "content": q}] for q in questions]
    for q, answer in zip(questions, await gw.chat_many(batch)):
        print(f"Q: {q}\nA: {answer}\n")


if __name__ == "__main__":
    asyncio.run(main())

这个网关不到一百行,但已经具备了上线所需的几项能力:错误分类后的指数退避重试、Retry-After 优先、并发限流、单次成本核算、批量调用时单条失败不拖垮整体,以及通过 PROVIDERS 表切换供应商。

几个常见踩坑

用裸 except 重试所有异常。鉴权失败、参数错误、余额不足都不该重试,重试只是延迟报错并污染日志。重试前一定先做错误分类。

指数退避不加抖动。整齐的退避会把分散的请求对齐成周期性冲击,并发越高这个问题越明显。退避一定要加 jitter。

只配重试次数,不配总时间预算。只有 max_retries 时,一个请求叠加退避可能卡住好几分钟。要同时设一个整体的时间预算。

把并发数当成 QPS。信号量控制的是同时在飞的请求数,实际 QPS 还取决于响应时间。要严格控速得用令牌桶。

忽略 TPM 只看 RPM。长 prompt 场景下,TPM 往往比 RPM 先触顶。限流要把 Token 维度也算进去。

适配层只存配置不抽象接口。只存 base_url 和模型名,遇到 system 参数位置、字段名、流式结构的真实差异时还是要改业务代码。适配层要暴露统一的方法签名。

本篇要点

  • 重试前先做错误分类:瞬时性失败(429、5xx、网络错误)才重试,确定性失败(401、400、402、404)不重试
  • 指数退避必须加随机抖动,避免重试风暴;有 Retry-After 时优先采用服务端的提示
  • 重试要同时设单次退避上限和整体时间预算
  • Token 成本的关键是分词器差异、输入输出价格不对称、prompt 缓存,并按请求记录 usage
  • 并发数不等于 QPS,限流要同时考虑 RPM 和 TPM
  • 多模型适配层要抽象出统一接口,而不只是存一张配置表
  • 把以上整合成一个 LLM 网关,业务代码只跟它的 chat() 打交道

下一篇

调用层兜住之后,下一个常见误区是把 Prompt 当成玄学,靠反复试碰运气。下一篇会把 Prompt 拆成 few-shot、思维链、结构化输出、模板管理、评估五件可以工程化的事,说明它为什么更像在写配置而不是抽签。

参考资料

版权声明: 如无特别声明,本文版权归 sshipanoo 所有,转载请注明本文链接。

(采用 CC BY-NC-SA 4.0 许可协议进行授权)

本文标题:API 调用没你想的那么简单

本文链接:https://www.sshipanoo.com/blog/ai/llm-advanced/01-API调用没你想的那么简单/

本文最后一次更新为 天前,文章中的某些内容可能已过时!