开源项目 APR 02, 2026

47KB 的 AI 网关是怎么设计的——ai-gateway-lite 架构全拆解

#TypeScript#架构#AI Gateway#开源#原理

47KB 的 AI 网关是怎么设计的——ai-gateway-lite 架构全拆解

优先级路由引擎、条件降级链、固定窗口预算控制、统一 SSE 流式抽象、错误分类体系、指数退避重试——六个子系统,零运行时依赖,47KB,84 个测试。

这是 ai-gateway-lite 推广篇 的技术原理深度拆解。如果你只想试用这个库,看推广篇就够了。


整体架构

src/
├── gateway.ts              → Gateway 主类,编排所有子系统
├── index.ts                → Barrel exports
├── routing/
│   ├── matcher.ts          → 优先级规则匹配
│   ├── router.ts           → 路由解析 + Provider 查找
│   └── fallback.ts         → 多步条件降级执行
├── providers/
│   ├── base.ts             → Provider 接口 + 重试退避
│   ├── openai.ts           → OpenAI(/v1/chat/completions)
│   ├── anthropic.ts        → Anthropic(/v1/messages)
│   ├── openrouter.ts       → OpenRouter(OpenAI 兼容)
│   ├── registry.ts         → Provider 工厂 + 实例管理
│   └── stream-parser.ts    → SSE 行解析器(所有 Provider 共享)
├── budget/
│   ├── tracker.ts          → 按策略+窗口的用量跟踪
│   └── guard.ts            → 执行层(hard/soft/warn)
├── pricing/
│   └── pricing-table.ts    → 模型定价注册表 + 成本估算
├── errors/
│   └── gateway-error.ts    → 类型化错误 + HTTP 状态分类
├── logging/
│   └── usage-logger.ts     → 请求级用量事件
└── types/                  → 全量 TypeScript 类型定义

84 个测试,TypeScript strict 模式,构建产物 47KB(ESM)。下面逐个拆解六个子系统。


子系统一:优先级路由引擎

路由引擎刻意设计得简单——没有 JSONPath,没有正则匹配,没有表达式语言。只有字段相等比较:

function matchesRequest(match: RouteMatch, request: GatewayRequest): boolean {
  if (match.taskType && match.taskType !== request.taskType) return false;
  if (match.userTier && match.userTier !== request.userTier) return false;
  if (match.feature && match.feature !== request.feature) return false;
  return true;
}

规则按 priority 降序排列,第一个匹配的规则生效:

export function findMatchingRoute(
  rules: RouteRule[],
  request: GatewayRequest,
): RouteRule | undefined {
  const enabled = rules.filter((r) => r.enabled !== false);
  const sorted = [...enabled].sort((a, b) => b.priority - a.priority);
  return sorted.find((rule) => matchesRequest(rule.match, request));
}

为什么这样设计?

  1. 可预测:优先级最高的匹配规则总是获胜。多规则同时匹配时不存在歧义。
  2. 够快:排序后线性扫描。典型配置 5-20 条规则,基本等于 O(1)。
  3. 可调试:从上到下读规则,就能立刻知道任何请求会走哪条路由。

空的 match: {} 充当兜底默认规则。enabled: false 的规则会被过滤——方便临时禁用路由而不用删除配置。

路由解析

Router 类包装了 matcher,解析完整的 Provider 链:

resolve(request: GatewayRequest): ResolvedRoute {
  const rule = findMatchingRoute(this.rules, request);
  if (!rule) throw GatewayError.routeNotFound(request);

  const provider = this.registry.get(rule.target.provider);
  const model = request.model ?? rule.target.model ?? provider.defaultModel;
  const fallbackChain = rule.target.fallbackChain
    ? this.fallbackChains?.find(c => c.name === rule.target.fallbackChain)
    : undefined;

  return { provider, model, fallbackChain };
}

模型优先级:request.model > rule.target.model > provider.defaultModel。调用方可以按请求覆盖模型,路由提供合理默认值。


子系统二:条件降级链

降级系统不是简单的”换个 Provider 重试”。降级链的每一步都有可选的 when 条件,控制是否执行:

{
  "name": "complex-fallback",
  "steps": [
    { "provider": "anthropic-main" },
    { "provider": "openai-main", "model": "gpt-4o", "when": ["timeout", "provider_error"] },
    { "provider": "openrouter-fallback", "when": ["timeout", "rate_limit", "provider_error"] }
  ]
}

执行逻辑:

for (let i = 0; i < chain.steps.length; i++) {
  const step = chain.steps[i];
  try {
    return await execute(provider, model);
  } catch (err) {
    lastError = err;
    const trigger = classifyError(err);

    const nextStep = chain.steps[i + 1];
    if (!nextStep) break;  // 没有更多步骤

    if (nextStep.when && nextStep.when.length > 0) {
      if (!nextStep.when.includes(trigger)) break;  // 触发条件不匹配,停止
    }
    // when 为空或不存在 → 无条件继续
  }
}
throw lastError;

when 条件是检查下一步的,不是当前步的。这样读起来很自然:“如果上一步因为 timeout 失败了,就尝试 OpenAI”。

错误分类驱动降级

classifyError 把错误映射到触发类型:

function classifyError(err: unknown): FallbackTrigger {
  if (err instanceof GatewayError) {
    switch (err.kind) {
      case 'provider_timeout': return 'timeout';
      case 'provider_rate_limit': return 'rate_limit';
      case 'budget_exceeded': return 'budget_exceeded';
      default: return 'provider_error';
    }
  }
  return 'provider_error';
}

budget_exceeded 作为独立触发类型很重要。前面的配置示例中,降级步骤的 when 没有包含 budget_exceeded——如果预算超了,切换到更贵的 Provider 只会更糟。


子系统三:预算控制

固定窗口跟踪

预算跟踪使用内存中的计数器 + 固定时间窗口:

type WindowDuration = {
  request: 0,         // 单次请求
  hour: 3_600_000,    // 1 小时
  day: 86_400_000,    // 1 天
  month: 2_592_000_000 // ~30 天
};

每个预算策略有一个 bucket,按 策略名 + scope + 窗口 作为 key。当 bucket 的年龄超过窗口时长时,会重置:

check(policy, usage): BudgetCheckResult {
  const bucket = this.getOrCreateBucket(policy);

  if (Date.now() - bucket.startTime > windowDuration(policy.window)) {
    this.resetBucket(bucket);  // 窗口到期,重置
  }

  const projectedTotal = bucket.totalTokens + usage.totalTokens;
  const projectedCost = bucket.costUsd + usage.costUsd;

  if (limits.maxCostUsd && projectedCost > limits.maxCostUsd) {
    return {
      allowed: policy.enforcement === "soft",
      warn: true,
      reason: `cost $${projectedCost} > limit $${limits.maxCostUsd}`,
    };
  }
}

这是固定窗口,不是滑动窗口。取舍是:23:59 的请求和 00:01 的请求在不同的 bucket 里,所以在窗口边界可能短暂超限。对嵌入式库来说这是可接受的——真正的滑动窗口需要外部存储(Redis 等),与”零依赖”的设计目标冲突。

Hard vs Soft 执行

  • hard:超预算 → 抛出 GatewayError.budgetExceeded() → 请求被拒绝
  • soft:超预算 → 记录告警 → 请求继续放行

warnAt 阈值(如 0.8 = 80%)在达到限额之前就触发告警,给应用足够时间提醒用户或调整行为。

预检 vs 后记录

预算检查两次

  1. 预检(AI 调用前):用零 token 检查——捕获已经超限的策略
  2. 后记录(AI 调用后):记录实际用量,为后续请求的预算检查提供数据

预检用零 token,因为我们不知道响应会用多少 token。未来可以根据模型和 prompt 长度估算输出 token,但当前的保守策略是:“我们还能不能发起调用?“


子系统四:统一 SSE 流式抽象

每家 AI Provider 的流式响应格式不同,但都基于 Server-Sent Events(SSE)。SSE 行解析器是共享的:

async function* parseSSEStream(body: ReadableStream<Uint8Array>): AsyncGenerator<string> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop()!;  // 不完整的最后一行留在 buffer

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const payload = line.slice(6).trim();
        if (payload === '[DONE]') return;
        yield payload;
      }
    }
  }
}

每个 Provider 实现自己的 async generator,把特定格式的 JSON 映射到统一的 StreamChunk

interface StreamChunk {
  delta: string;          // 文本内容
  model?: string;         // 模型名
  finishReason?: string;  // "stop", "length" 等
}

OpenAI 的 chunk 是 choices[0].delta.contentAnthropic 的 chunk 是 content_block_delta 事件里的 delta.textOpenRouter 遵循 OpenAI 格式。每个 Provider 的流式方法解析自己的格式,yield 统一的 StreamChunk——消费方永远不需要知道背后是哪个 Provider。

Gateway.chatStream 包装 Provider 流,附加元数据:

interface GatewayStreamResult {
  stream: AsyncIterable<GatewayStreamChunk>;
  getUsageSummary: () => Promise<UsageSummary>;
}

getUsageSummary() 返回一个在流消费完毕后 resolve 的 promise,包含 token 计数和成本估算。预算记录在这里而不是流式过程中——我们需要最终的 token 总数。


子系统五:错误分类体系

所有错误统一为带结构化字段的 GatewayError

export class GatewayError extends Error {
  readonly kind: GatewayErrorKind;
  readonly httpStatus?: number;
  readonly retryable: boolean;
  readonly provider?: string;

  static fromHttpStatus(status: number, provider: string): GatewayError {
    if (status === 401 || status === 403)
      return new GatewayError('provider_auth', status, false, provider);
    if (status === 429)
      return new GatewayError('provider_rate_limit', status, true, provider);
    if (status === 408 || status === 504)
      return new GatewayError('provider_timeout', status, true, provider);
    if (status >= 500)
      return new GatewayError('provider_error', status, true, provider);
    return new GatewayError('provider_response', status, false, provider);
  }
}

错误类型一览:

Kind可重试含义
provider_auth401/403 — API Key 错误
provider_rate_limit429 — 被限流
provider_timeout408/504 — 请求超时
provider_error500+ — Provider 内部错误
provider_response400 — 请求参数错误
route_not_found没有匹配的路由规则
budget_exceeded超预算(hard 模式)
config_invalid配置错误

retryable 标志驱动重试系统(子系统六),同时也暴露给消费方用于自定义重试逻辑。


子系统六:指数退避 + 抖动重试

重试层包装 Provider 的 HTTP 调用:

export async function fetchProviderWithRetry(
  url: string,
  init: RequestInit,
  providerName: string,
  timeoutMs: number,
  retry?: ProviderRetry,
): Promise<Response> {
  const maxAttempts = retry?.maxAttempts ?? 1;
  const baseBackoffMs = retry?.backoffMs ?? 500;

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      const response = await fetchWithTimeout(url, init, timeoutMs);

      if (response.ok) return response;

      if ([429, 502, 503, 504].includes(response.status) && attempt < maxAttempts - 1) {
        const jitter = Math.random() * 0.3 + 0.85;  // 0.85 - 1.15
        await sleep(baseBackoffMs * Math.pow(2, attempt) * jitter);
        continue;
      }

      throw GatewayError.fromHttpStatus(response.status, providerName);
    } catch (err) {
      if (err instanceof GatewayError && !err.retryable) throw err;
      if (attempt >= maxAttempts - 1) throw err;

      const jitter = Math.random() * 0.3 + 0.85;
      await sleep(baseBackoffMs * Math.pow(2, attempt) * jitter);
    }
  }
}

四个关键设计:

  1. 只重试特定状态码:429(限流)、502/503/504(上游故障)。不重试 400(参数错误,你的锅)、401(鉴权失败,重试没用)、500(Provider bug,盲目重试可能加重问题)。

  2. 指数退避:500ms → 1s → 2s → 4s。防止在 Provider 恢复时产生雪崩。

  3. 抖动:每次延迟 ±15% 随机化。没有抖动的话,多个客户端在完全相同的时间间隔重试会产生周期性尖峰。[0.85, 1.15] 的范围足够打破同步,又不会让行为变得不可预测。

  4. 默认不重试maxAttempts 默认 1(不重试)。自动重试会掩盖问题、加倍成本。重试应该是显式选择。


Gateway 编排

Gateway 类把所有子系统串起来。简化的 chat 流程:

async chat(request: GatewayRequest): Promise<GatewayResponse> {
  // 1. 路由匹配
  const { provider, model, fallbackChain } = this.router.resolve(request);

  // 2. 预算预检
  this.budgetGuard.enforce(request, { totalTokens: 0, costUsd: 0 });

  // 3. 执行(带可选降级)
  const execute = async (p, m) => p.chat({ ...request, model: m });
  const result = fallbackChain
    ? await executeFallbackChain(fallbackChain, this.registry, request, execute)
    : await execute(provider, model);

  // 4. 成本估算
  const cost = estimateCostUsd(result.model, result.inputTokens, result.outputTokens);

  // 5. 记录用量 + 日志
  this.budgetGuard.record(request, { totalTokens: result.totalTokens, costUsd: cost });
  this.logger.log({ request, result, cost });

  return { content: result.content, estimatedCostUsd: cost, ...result };
}

流式路径(chatStream)遵循相同的 路由 → 预算 → 执行 → 日志 模式,但把预算记录和日志延迟到流消费完毕后通过 getUsageSummary() 触发。


为什么零运行时依赖

Node.js 18+ 自带了这个库需要的一切:

需求内置 API
HTTP 客户端globalThis.fetch
流式处理ReadableStreamTextDecoder
定时器setTimeout(退避 sleep)
中止控制AbortController(超时)

不依赖 openai SDK、@anthropic-ai/sdkaxios。直接对接 REST API 意味着:

  • 47KB minified——官方 OpenAI SDK 超过 1MB
  • 不被 SDK 更新 break——REST API 的变更频率远低于 SDK
  • 任何 OpenAI 兼容的 Provider 自动能用——DeepSeek、Groq、Together、本地模型

构建管线是 tsup(单入口,ESM 输出)+ tsc --emitDeclarationOnly(类型声明)。


测试策略

84 个测试按子系统组织:

测试文件覆盖
gateway.test.ts完整集成:chat、budget、fallback、usage logging(Mock Provider)
gateway-stream.test.ts流式 + getUsageSummary
matcher.test.ts路由匹配、优先级排序
fallback.test.ts降级链、when 条件
budget.test.ts窗口跟踪、hard/soft 执行
retry.test.ts状态码重试、退避时间
error-classify.test.tsHTTP 状态 → 错误类型映射
pricing.test.ts成本估算精度
validator.test.ts配置 Schema 校验

Provider 在 fetch 层 Mock——测试不打真实 API。测试套件跑完 <2s,完全确定性。


开源地址

TypeScript strict 模式,84 个测试,Node.js 18/20/22 全版本 CI 通过。src/ 目录组织清晰——从 gateway.ts 开始顺着 import 读就好。欢迎 PR 和 issue。

评论