47KB 的 AI 网关是怎么设计的——ai-gateway-lite 架构全拆解
47KB 的 AI 网关是怎么设计的——ai-gateway-lite 架构全拆解
优先级路由引擎、条件降级链、固定窗口预算控制、统一 SSE 流式抽象、错误分类体系、指数退避重试——六个子系统,零运行时依赖,47KB,84 个测试。
这是 ai-gateway-lite 推广篇 的技术原理深度拆解。如果你只想试用这个库,看推广篇就够了。
整体架构
src/
├── gateway.ts → Gateway 主类,编排所有子系统
├── index.ts → Barrel exports
├── routing/
│ ├── matcher.ts → 优先级规则匹配
│ ├── router.ts → 路由解析 + Provider 查找
│ └── fallback.ts → 多步条件降级执行
├── providers/
│ ├── base.ts → Provider 接口 + 重试退避
│ ├── openai.ts → OpenAI(/v1/chat/completions)
│ ├── anthropic.ts → Anthropic(/v1/messages)
│ ├── openrouter.ts → OpenRouter(OpenAI 兼容)
│ ├── registry.ts → Provider 工厂 + 实例管理
│ └── stream-parser.ts → SSE 行解析器(所有 Provider 共享)
├── budget/
│ ├── tracker.ts → 按策略+窗口的用量跟踪
│ └── guard.ts → 执行层(hard/soft/warn)
├── pricing/
│ └── pricing-table.ts → 模型定价注册表 + 成本估算
├── errors/
│ └── gateway-error.ts → 类型化错误 + HTTP 状态分类
├── logging/
│ └── usage-logger.ts → 请求级用量事件
└── types/ → 全量 TypeScript 类型定义
84 个测试,TypeScript strict 模式,构建产物 47KB(ESM)。下面逐个拆解六个子系统。
子系统一:优先级路由引擎
路由引擎刻意设计得简单——没有 JSONPath,没有正则匹配,没有表达式语言。只有字段相等比较:
function matchesRequest(match: RouteMatch, request: GatewayRequest): boolean {
if (match.taskType && match.taskType !== request.taskType) return false;
if (match.userTier && match.userTier !== request.userTier) return false;
if (match.feature && match.feature !== request.feature) return false;
return true;
}
规则按 priority 降序排列,第一个匹配的规则生效:
export function findMatchingRoute(
rules: RouteRule[],
request: GatewayRequest,
): RouteRule | undefined {
const enabled = rules.filter((r) => r.enabled !== false);
const sorted = [...enabled].sort((a, b) => b.priority - a.priority);
return sorted.find((rule) => matchesRequest(rule.match, request));
}
为什么这样设计?
- 可预测:优先级最高的匹配规则总是获胜。多规则同时匹配时不存在歧义。
- 够快:排序后线性扫描。典型配置 5-20 条规则,基本等于 O(1)。
- 可调试:从上到下读规则,就能立刻知道任何请求会走哪条路由。
空的 match: {} 充当兜底默认规则。enabled: false 的规则会被过滤——方便临时禁用路由而不用删除配置。
路由解析
Router 类包装了 matcher,解析完整的 Provider 链:
resolve(request: GatewayRequest): ResolvedRoute {
const rule = findMatchingRoute(this.rules, request);
if (!rule) throw GatewayError.routeNotFound(request);
const provider = this.registry.get(rule.target.provider);
const model = request.model ?? rule.target.model ?? provider.defaultModel;
const fallbackChain = rule.target.fallbackChain
? this.fallbackChains?.find(c => c.name === rule.target.fallbackChain)
: undefined;
return { provider, model, fallbackChain };
}
模型优先级:request.model > rule.target.model > provider.defaultModel。调用方可以按请求覆盖模型,路由提供合理默认值。
子系统二:条件降级链
降级系统不是简单的”换个 Provider 重试”。降级链的每一步都有可选的 when 条件,控制是否执行:
{
"name": "complex-fallback",
"steps": [
{ "provider": "anthropic-main" },
{ "provider": "openai-main", "model": "gpt-4o", "when": ["timeout", "provider_error"] },
{ "provider": "openrouter-fallback", "when": ["timeout", "rate_limit", "provider_error"] }
]
}
执行逻辑:
for (let i = 0; i < chain.steps.length; i++) {
const step = chain.steps[i];
try {
return await execute(provider, model);
} catch (err) {
lastError = err;
const trigger = classifyError(err);
const nextStep = chain.steps[i + 1];
if (!nextStep) break; // 没有更多步骤
if (nextStep.when && nextStep.when.length > 0) {
if (!nextStep.when.includes(trigger)) break; // 触发条件不匹配,停止
}
// when 为空或不存在 → 无条件继续
}
}
throw lastError;
when 条件是检查下一步的,不是当前步的。这样读起来很自然:“如果上一步因为 timeout 失败了,就尝试 OpenAI”。
错误分类驱动降级
classifyError 把错误映射到触发类型:
function classifyError(err: unknown): FallbackTrigger {
if (err instanceof GatewayError) {
switch (err.kind) {
case 'provider_timeout': return 'timeout';
case 'provider_rate_limit': return 'rate_limit';
case 'budget_exceeded': return 'budget_exceeded';
default: return 'provider_error';
}
}
return 'provider_error';
}
budget_exceeded 作为独立触发类型很重要。前面的配置示例中,降级步骤的 when 没有包含 budget_exceeded——如果预算超了,切换到更贵的 Provider 只会更糟。
子系统三:预算控制
固定窗口跟踪
预算跟踪使用内存中的计数器 + 固定时间窗口:
type WindowDuration = {
request: 0, // 单次请求
hour: 3_600_000, // 1 小时
day: 86_400_000, // 1 天
month: 2_592_000_000 // ~30 天
};
每个预算策略有一个 bucket,按 策略名 + scope + 窗口 作为 key。当 bucket 的年龄超过窗口时长时,会重置:
check(policy, usage): BudgetCheckResult {
const bucket = this.getOrCreateBucket(policy);
if (Date.now() - bucket.startTime > windowDuration(policy.window)) {
this.resetBucket(bucket); // 窗口到期,重置
}
const projectedTotal = bucket.totalTokens + usage.totalTokens;
const projectedCost = bucket.costUsd + usage.costUsd;
if (limits.maxCostUsd && projectedCost > limits.maxCostUsd) {
return {
allowed: policy.enforcement === "soft",
warn: true,
reason: `cost $${projectedCost} > limit $${limits.maxCostUsd}`,
};
}
}
这是固定窗口,不是滑动窗口。取舍是:23:59 的请求和 00:01 的请求在不同的 bucket 里,所以在窗口边界可能短暂超限。对嵌入式库来说这是可接受的——真正的滑动窗口需要外部存储(Redis 等),与”零依赖”的设计目标冲突。
Hard vs Soft 执行
hard:超预算 → 抛出GatewayError.budgetExceeded()→ 请求被拒绝soft:超预算 → 记录告警 → 请求继续放行
warnAt 阈值(如 0.8 = 80%)在达到限额之前就触发告警,给应用足够时间提醒用户或调整行为。
预检 vs 后记录
预算检查两次:
- 预检(AI 调用前):用零 token 检查——捕获已经超限的策略
- 后记录(AI 调用后):记录实际用量,为后续请求的预算检查提供数据
预检用零 token,因为我们不知道响应会用多少 token。未来可以根据模型和 prompt 长度估算输出 token,但当前的保守策略是:“我们还能不能发起调用?“
子系统四:统一 SSE 流式抽象
每家 AI Provider 的流式响应格式不同,但都基于 Server-Sent Events(SSE)。SSE 行解析器是共享的:
async function* parseSSEStream(body: ReadableStream<Uint8Array>): AsyncGenerator<string> {
const reader = body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop()!; // 不完整的最后一行留在 buffer
for (const line of lines) {
if (line.startsWith('data: ')) {
const payload = line.slice(6).trim();
if (payload === '[DONE]') return;
yield payload;
}
}
}
}
每个 Provider 实现自己的 async generator,把特定格式的 JSON 映射到统一的 StreamChunk:
interface StreamChunk {
delta: string; // 文本内容
model?: string; // 模型名
finishReason?: string; // "stop", "length" 等
}
OpenAI 的 chunk 是 choices[0].delta.content。Anthropic 的 chunk 是 content_block_delta 事件里的 delta.text。OpenRouter 遵循 OpenAI 格式。每个 Provider 的流式方法解析自己的格式,yield 统一的 StreamChunk——消费方永远不需要知道背后是哪个 Provider。
Gateway.chatStream 包装 Provider 流,附加元数据:
interface GatewayStreamResult {
stream: AsyncIterable<GatewayStreamChunk>;
getUsageSummary: () => Promise<UsageSummary>;
}
getUsageSummary() 返回一个在流消费完毕后 resolve 的 promise,包含 token 计数和成本估算。预算记录在这里而不是流式过程中——我们需要最终的 token 总数。
子系统五:错误分类体系
所有错误统一为带结构化字段的 GatewayError:
export class GatewayError extends Error {
readonly kind: GatewayErrorKind;
readonly httpStatus?: number;
readonly retryable: boolean;
readonly provider?: string;
static fromHttpStatus(status: number, provider: string): GatewayError {
if (status === 401 || status === 403)
return new GatewayError('provider_auth', status, false, provider);
if (status === 429)
return new GatewayError('provider_rate_limit', status, true, provider);
if (status === 408 || status === 504)
return new GatewayError('provider_timeout', status, true, provider);
if (status >= 500)
return new GatewayError('provider_error', status, true, provider);
return new GatewayError('provider_response', status, false, provider);
}
}
错误类型一览:
| Kind | 可重试 | 含义 |
|---|---|---|
provider_auth | ❌ | 401/403 — API Key 错误 |
provider_rate_limit | ✅ | 429 — 被限流 |
provider_timeout | ✅ | 408/504 — 请求超时 |
provider_error | ✅ | 500+ — Provider 内部错误 |
provider_response | ❌ | 400 — 请求参数错误 |
route_not_found | ❌ | 没有匹配的路由规则 |
budget_exceeded | ❌ | 超预算(hard 模式) |
config_invalid | ❌ | 配置错误 |
retryable 标志驱动重试系统(子系统六),同时也暴露给消费方用于自定义重试逻辑。
子系统六:指数退避 + 抖动重试
重试层包装 Provider 的 HTTP 调用:
export async function fetchProviderWithRetry(
url: string,
init: RequestInit,
providerName: string,
timeoutMs: number,
retry?: ProviderRetry,
): Promise<Response> {
const maxAttempts = retry?.maxAttempts ?? 1;
const baseBackoffMs = retry?.backoffMs ?? 500;
for (let attempt = 0; attempt < maxAttempts; attempt++) {
try {
const response = await fetchWithTimeout(url, init, timeoutMs);
if (response.ok) return response;
if ([429, 502, 503, 504].includes(response.status) && attempt < maxAttempts - 1) {
const jitter = Math.random() * 0.3 + 0.85; // 0.85 - 1.15
await sleep(baseBackoffMs * Math.pow(2, attempt) * jitter);
continue;
}
throw GatewayError.fromHttpStatus(response.status, providerName);
} catch (err) {
if (err instanceof GatewayError && !err.retryable) throw err;
if (attempt >= maxAttempts - 1) throw err;
const jitter = Math.random() * 0.3 + 0.85;
await sleep(baseBackoffMs * Math.pow(2, attempt) * jitter);
}
}
}
四个关键设计:
-
只重试特定状态码:429(限流)、502/503/504(上游故障)。不重试 400(参数错误,你的锅)、401(鉴权失败,重试没用)、500(Provider bug,盲目重试可能加重问题)。
-
指数退避:500ms → 1s → 2s → 4s。防止在 Provider 恢复时产生雪崩。
-
抖动:每次延迟 ±15% 随机化。没有抖动的话,多个客户端在完全相同的时间间隔重试会产生周期性尖峰。
[0.85, 1.15]的范围足够打破同步,又不会让行为变得不可预测。 -
默认不重试:
maxAttempts默认 1(不重试)。自动重试会掩盖问题、加倍成本。重试应该是显式选择。
Gateway 编排
Gateway 类把所有子系统串起来。简化的 chat 流程:
async chat(request: GatewayRequest): Promise<GatewayResponse> {
// 1. 路由匹配
const { provider, model, fallbackChain } = this.router.resolve(request);
// 2. 预算预检
this.budgetGuard.enforce(request, { totalTokens: 0, costUsd: 0 });
// 3. 执行(带可选降级)
const execute = async (p, m) => p.chat({ ...request, model: m });
const result = fallbackChain
? await executeFallbackChain(fallbackChain, this.registry, request, execute)
: await execute(provider, model);
// 4. 成本估算
const cost = estimateCostUsd(result.model, result.inputTokens, result.outputTokens);
// 5. 记录用量 + 日志
this.budgetGuard.record(request, { totalTokens: result.totalTokens, costUsd: cost });
this.logger.log({ request, result, cost });
return { content: result.content, estimatedCostUsd: cost, ...result };
}
流式路径(chatStream)遵循相同的 路由 → 预算 → 执行 → 日志 模式,但把预算记录和日志延迟到流消费完毕后通过 getUsageSummary() 触发。
为什么零运行时依赖
Node.js 18+ 自带了这个库需要的一切:
| 需求 | 内置 API |
|---|---|
| HTTP 客户端 | globalThis.fetch |
| 流式处理 | ReadableStream、TextDecoder |
| 定时器 | setTimeout(退避 sleep) |
| 中止控制 | AbortController(超时) |
不依赖 openai SDK、@anthropic-ai/sdk、axios。直接对接 REST API 意味着:
- 47KB minified——官方 OpenAI SDK 超过 1MB
- 不被 SDK 更新 break——REST API 的变更频率远低于 SDK
- 任何 OpenAI 兼容的 Provider 自动能用——DeepSeek、Groq、Together、本地模型
构建管线是 tsup(单入口,ESM 输出)+ tsc --emitDeclarationOnly(类型声明)。
测试策略
84 个测试按子系统组织:
| 测试文件 | 覆盖 |
|---|---|
gateway.test.ts | 完整集成:chat、budget、fallback、usage logging(Mock Provider) |
gateway-stream.test.ts | 流式 + getUsageSummary |
matcher.test.ts | 路由匹配、优先级排序 |
fallback.test.ts | 降级链、when 条件 |
budget.test.ts | 窗口跟踪、hard/soft 执行 |
retry.test.ts | 状态码重试、退避时间 |
error-classify.test.ts | HTTP 状态 → 错误类型映射 |
pricing.test.ts | 成本估算精度 |
validator.test.ts | 配置 Schema 校验 |
Provider 在 fetch 层 Mock——测试不打真实 API。测试套件跑完 <2s,完全确定性。
开源地址
- GitHub: hyxnj666-creator/ai-gateway-lite
- npm: ai-gateway-lite
TypeScript strict 模式,84 个测试,Node.js 18/20/22 全版本 CI 通过。src/ 目录组织清晰——从 gateway.ts 开始顺着 import 读就好。欢迎 PR 和 issue。