AI 缓存策略:精确缓存 + 语义缓存,省钱又提速
AI 缓存策略:精确缓存 + 语义缓存,省钱又提速
本文是【AI 专题精讲】系列第 09 篇。 上一篇:结构化输出:让 AI 稳定返回 JSON | 下一篇:Token 计算与上下文窗口管理实战
这篇文章你会得到什么
你的 AI 应用上线了,用户问的最多的问题是什么?
“退货流程是什么” “怎么申请年假” “报销需要什么材料”
这些高频问题,每次都调 AI API,每次都花钱、每次都等 1~3 秒。100 个用户问同一个问题,就是 100 次 API 调用。
缓存能帮你省 30%~70% 的 API 成本,同时把响应时间从秒级降到毫秒级。
但 AI 缓存不像传统缓存那么简单。用户不会每次都问一模一样的话——“退货流程” “怎么退货” “退货步骤是什么” 说的是同一件事,传统的 key-value 缓存命中不了。
今天给你两层缓存方案:
- 精确缓存:完全相同的输入,直接返回缓存结果
- 语义缓存:语义相似的输入,也能命中缓存
为什么 AI 应用特别需要缓存
| 问题 | 没有缓存 | 有缓存 |
|---|---|---|
| 响应时间 | 1~5 秒(API 调用) | < 50ms(缓存命中) |
| API 成本 | 每次都花钱 | 命中率 50% → 省一半 |
| 限流风险 | 高并发容易 429 | 缓存挡住大量重复请求 |
| 稳定性 | API 挂了就全挂 | 缓存兜底,降级可用 |
第一层:精确缓存
原理
把请求的完整输入(model + messages + 参数)做哈希,作为缓存 Key。完全相同的输入直接返回缓存。
内存缓存实现
最简单的方案,适合单进程、小规模:
import hashlib
import json
import time
from dataclasses import dataclass
@dataclass
class CacheEntry:
value: str
created_at: float
ttl: int
hit_count: int = 0
class MemoryCache:
def __init__(self, max_size: int = 1000):
self.cache: dict[str, CacheEntry] = {}
self.max_size = max_size
def _make_key(self, messages: list[dict], model: str, **kwargs) -> str:
content = json.dumps({
"model": model,
"messages": messages,
**kwargs,
}, ensure_ascii=False, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
def get(self, messages: list[dict], model: str, **kwargs) -> str | None:
key = self._make_key(messages, model, **kwargs)
entry = self.cache.get(key)
if entry is None:
return None
if time.time() - entry.created_at > entry.ttl:
del self.cache[key]
return None
entry.hit_count += 1
return entry.value
def set(self, messages: list[dict], model: str, value: str, ttl: int = 3600, **kwargs):
key = self._make_key(messages, model, **kwargs)
if len(self.cache) >= self.max_size:
self._evict()
self.cache[key] = CacheEntry(
value=value,
created_at=time.time(),
ttl=ttl,
)
def _evict(self):
oldest_key = min(self.cache, key=lambda k: self.cache[k].created_at)
del self.cache[oldest_key]
Redis 缓存实现
多进程、分布式场景用 Redis:
import redis
import hashlib
import json
class RedisCache:
def __init__(self, redis_url: str = "redis://localhost:6379", prefix: str = "ai_cache"):
self.client = redis.from_url(redis_url)
self.prefix = prefix
def _make_key(self, messages: list[dict], model: str, **kwargs) -> str:
content = json.dumps({
"model": model,
"messages": messages,
**kwargs,
}, ensure_ascii=False, sort_keys=True)
hash_val = hashlib.sha256(content.encode()).hexdigest()
return f"{self.prefix}:{hash_val}"
def get(self, messages: list[dict], model: str, **kwargs) -> str | None:
key = self._make_key(messages, model, **kwargs)
value = self.client.get(key)
if value:
self.client.hincrby(f"{key}:meta", "hits", 1)
return value.decode()
return None
def set(self, messages: list[dict], model: str, value: str, ttl: int = 3600, **kwargs):
key = self._make_key(messages, model, **kwargs)
self.client.setex(key, ttl, value)
self.client.hset(f"{key}:meta", mapping={
"model": model,
"created_at": str(time.time()),
"hits": "0",
})
精确缓存的局限
问题很明显:用户换个说法就 miss 了。
"退货流程是什么" → 缓存命中
"怎么退货" → 缓存未命中(不同的字符串)
"退货步骤" → 缓存未命中
三个问题问的是同一件事,但精确缓存只能命中第一个。实际项目中精确缓存的命中率通常只有 10%~20%。
第二层:语义缓存
原理
不比较字符串是否相同,而是比较语义是否相似。把查询转成 Embedding 向量,在缓存中找最近的向量,相似度超过阈值就命中。
"退货流程是什么" → [0.12, -0.34, ...]
"怎么退货" → [0.11, -0.33, ...] ← 余弦相似度 0.95 > 阈值 0.9 → 命中!
"今天天气" → [-0.45, 0.23, ...] ← 余弦相似度 0.15 < 阈值 0.9 → 未命中
实现
import numpy as np
class SemanticCache:
def __init__(
self,
embedding_provider,
similarity_threshold: float = 0.9,
max_size: int = 5000,
):
self.embedding_provider = embedding_provider
self.threshold = similarity_threshold
self.max_size = max_size
self.entries: list[dict] = []
self.vectors: list[list[float]] = []
def _get_query_text(self, messages: list[dict]) -> str:
user_messages = [m["content"] for m in messages if m["role"] == "user"]
return user_messages[-1] if user_messages else ""
def get(self, messages: list[dict], model: str) -> str | None:
query_text = self._get_query_text(messages)
if not query_text or not self.vectors:
return None
query_result = self.embedding_provider.embed([query_text])
query_vec = np.array(query_result.embeddings[0])
query_vec = query_vec / np.linalg.norm(query_vec)
cache_vecs = np.array(self.vectors)
norms = np.linalg.norm(cache_vecs, axis=1, keepdims=True)
cache_vecs_norm = cache_vecs / norms
similarities = cache_vecs_norm @ query_vec
best_idx = int(np.argmax(similarities))
best_score = float(similarities[best_idx])
if best_score >= self.threshold:
entry = self.entries[best_idx]
if time.time() - entry["created_at"] > entry["ttl"]:
self._remove(best_idx)
return None
entry["hit_count"] += 1
return entry["value"]
return None
def set(
self,
messages: list[dict],
model: str,
value: str,
ttl: int = 3600,
):
query_text = self._get_query_text(messages)
if not query_text:
return
result = self.embedding_provider.embed([query_text])
vector = result.embeddings[0]
if len(self.entries) >= self.max_size:
self._evict()
self.entries.append({
"query": query_text,
"model": model,
"value": value,
"created_at": time.time(),
"ttl": ttl,
"hit_count": 0,
})
self.vectors.append(vector)
def _remove(self, index: int):
self.entries.pop(index)
self.vectors.pop(index)
def _evict(self):
if not self.entries:
return
oldest_idx = min(range(len(self.entries)), key=lambda i: self.entries[i]["created_at"])
self._remove(oldest_idx)
阈值怎么定
阈值是语义缓存最关键的参数:
| 阈值 | 效果 |
|---|---|
| 0.95+ | 非常严格,几乎要一模一样才命中 |
| 0.90 | 推荐值,语义高度相似才命中 |
| 0.85 | 较松,可能会误命中不太相关的内容 |
| 0.80 | 太松,容易返回错误的缓存结果 |
宁可 miss 也不要误命中。 返回错误的缓存结果比多调一次 API 糟糕得多。
我的建议:先用 0.92 上线,观察一周的命中日志,再微调。
双层缓存:精确 + 语义
两层组合使用,覆盖更多场景:
class TwoLevelCache:
def __init__(
self,
exact_cache: RedisCache | MemoryCache,
semantic_cache: SemanticCache,
):
self.exact = exact_cache
self.semantic = semantic_cache
def get(self, messages: list[dict], model: str, **kwargs) -> tuple[str | None, str]:
# Layer 1: 精确缓存(0ms)
result = self.exact.get(messages, model, **kwargs)
if result is not None:
return result, "exact_hit"
# Layer 2: 语义缓存(~50ms)
result = self.semantic.get(messages, model)
if result is not None:
return result, "semantic_hit"
return None, "miss"
def set(self, messages: list[dict], model: str, value: str, ttl: int = 3600, **kwargs):
self.exact.set(messages, model, value, ttl, **kwargs)
self.semantic.set(messages, model, value, ttl)
集成到 AI 调用层
class CachedAIClient:
def __init__(self, client, cache: TwoLevelCache):
self.client = client
self.cache = cache
self.stats = {"exact_hit": 0, "semantic_hit": 0, "miss": 0}
async def chat(self, messages: list[dict], model: str = "gpt-4o-mini", **kwargs) -> str:
# 查缓存
cached, hit_type = self.cache.get(messages, model, **kwargs)
if cached is not None:
self.stats[hit_type] += 1
return cached
self.stats["miss"] += 1
# 缓存未命中,调 API
response = self.client.chat.completions.create(
model=model,
messages=messages,
**kwargs,
)
result = response.choices[0].message.content
# 存缓存
ttl = self._get_ttl(messages)
self.cache.set(messages, model, result, ttl, **kwargs)
return result
def _get_ttl(self, messages: list[dict]) -> int:
system_msg = next((m["content"] for m in messages if m["role"] == "system"), "")
if "实时" in system_msg or "最新" in system_msg:
return 300
if "知识库" in system_msg:
return 7200
return 3600
def get_stats(self) -> dict:
total = sum(self.stats.values())
if total == 0:
return self.stats
return {
**self.stats,
"hit_rate": (self.stats["exact_hit"] + self.stats["semantic_hit"]) / total,
}
TTL 策略
不同场景的缓存过期时间应该不同:
| 场景 | 推荐 TTL | 原因 |
|---|---|---|
| 知识库问答 | 2~24 小时 | 知识库更新频率低 |
| 数据查询 | 5~30 分钟 | 数据可能变化 |
| 闲聊 | 1 小时 | 无实效性要求 |
| 实时信息 | 不缓存 | 必须实时 |
| 翻译/摘要 | 24 小时+ | 结果稳定 |
知识库更新时主动清缓存
class CacheInvalidator:
def __init__(self, cache: TwoLevelCache):
self.cache = cache
def on_knowledge_base_updated(self, updated_sources: list[str]):
"""知识库更新时清除相关缓存"""
entries_to_remove = []
for i, entry in enumerate(self.cache.semantic.entries):
if any(source in entry.get("query", "") for source in updated_sources):
entries_to_remove.append(i)
for idx in reversed(entries_to_remove):
self.cache.semantic._remove(idx)
缓存 Key 设计
要不要把 system prompt 算进 Key
要。 同一个用户问题,不同的 system prompt 会产生完全不同的回答:
# 这两个请求的结果完全不同,缓存 Key 必须不同
messages_1 = [
{"role": "system", "content": "你是客服助手"},
{"role": "user", "content": "退货流程"},
]
messages_2 = [
{"role": "system", "content": "你是法律顾问"},
{"role": "user", "content": "退货流程"},
]
要不要把 temperature 算进 Key
看场景:
temperature=0(确定性输出):适合缓存,每次结果一样temperature>0(随机输出):缓存意义不大,但如果用户不在意微小差异也可以缓存
我的做法:temperature=0 的请求缓存,temperature>0.3 的不缓存。
语义缓存只看用户最后一条消息?
对于多轮对话,只看最后一条 user 消息不够:
def _get_query_text(self, messages: list[dict]) -> str:
# 取最后 2 轮对话拼接,提供更多上下文
recent = messages[-4:] # 最后 2 轮
return " ".join(m["content"] for m in recent if m["role"] == "user")
监控和调优
核心指标
class CacheMonitor:
def __init__(self):
self.metrics = {
"total_requests": 0,
"exact_hits": 0,
"semantic_hits": 0,
"misses": 0,
"avg_similarity_on_hit": [],
"avg_similarity_on_miss": [],
}
def record(self, hit_type: str, similarity: float = 0):
self.metrics["total_requests"] += 1
if hit_type == "exact_hit":
self.metrics["exact_hits"] += 1
elif hit_type == "semantic_hit":
self.metrics["semantic_hits"] += 1
self.metrics["avg_similarity_on_hit"].append(similarity)
else:
self.metrics["misses"] += 1
if similarity > 0:
self.metrics["avg_similarity_on_miss"].append(similarity)
def report(self) -> dict:
total = self.metrics["total_requests"]
if total == 0:
return {}
hits = self.metrics["exact_hits"] + self.metrics["semantic_hits"]
return {
"total": total,
"hit_rate": hits / total,
"exact_hit_rate": self.metrics["exact_hits"] / total,
"semantic_hit_rate": self.metrics["semantic_hits"] / total,
"avg_similarity_hit": (
np.mean(self.metrics["avg_similarity_on_hit"])
if self.metrics["avg_similarity_on_hit"] else 0
),
"avg_similarity_miss": (
np.mean(self.metrics["avg_similarity_on_miss"])
if self.metrics["avg_similarity_on_miss"] else 0
),
"estimated_savings": f"${hits * 0.002:.2f}",
}
该关注什么
| 指标 | 健康值 | 异常处理 |
|---|---|---|
| 总命中率 | > 30% | 太低→检查 TTL 和阈值 |
| 语义命中准确率 | > 95% | 太低→调高阈值 |
| 平均命中相似度 | > 0.92 | 太低→阈值可能太松 |
| Miss 的平均相似度 | < 0.85 | 如果接近阈值→考虑调低 |
开源方案:GPTCache
如果不想自己造轮子,GPTCache 是目前最成熟的 AI 缓存开源方案:
from gptcache import cache
from gptcache.adapter import openai
from gptcache.embedding import Onnx
from gptcache.manager import CacheBase, VectorBase, get_data_manager
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation
# 初始化
onnx = Onnx()
cache_base = CacheBase("sqlite")
vector_base = VectorBase("faiss", dimension=onnx.dimension)
data_manager = get_data_manager(cache_base, vector_base)
cache.init(
embedding_func=onnx.to_embeddings,
data_manager=data_manager,
similarity_evaluation=SearchDistanceEvaluation(),
)
cache.set_openai_key()
# 使用(和 OpenAI SDK 一样的接口)
response = openai.ChatCompletion.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "退货流程是什么"}],
)
GPTCache 的优势:内置 Embedding + 向量存储 + 相似度评估,开箱即用。缺点是依赖较重,定制灵活度不如自研。
什么时候不该缓存
- 实时性要求高:用户问”现在几点了”,缓存过期的答案更差
- 上下文高度个性化:每个用户的对话历史不同,缓存几乎不可能命中
- 创意类输出:用户要 AI 写诗、编故事,每次都要不同
- temperature 很高:本身就要求随机性
总结
- 精确缓存是基础——相同请求直接返回,零延迟零成本,但命中率低(10~20%)。
- 语义缓存是杀手锏——语义相似也能命中,命中率提升到 30~50%,是 AI 缓存的核心。
- 双层组合效果最好——精确缓存(0ms)→ 语义缓存(
50ms)→ API 调用(15s)。 - 阈值宁高勿低——误命中比 miss 更严重,推荐 0.90~0.92。
- TTL 因场景而异——知识库问答 2 小时,数据查询 5 分钟,实时信息不缓存。
- 监控是必须的——命中率、准确率、省了多少钱,用数据驱动调优。
下一篇聊 Token 计算与上下文窗口管理:超了上下文窗口就报错,Token 没估准就超预算。怎么精确计算、怎么管理、怎么优化?
下一篇预告:10 | Token 计算与上下文窗口管理实战
讨论话题:你的 AI 应用有做缓存吗?命中率多少?语义缓存的阈值定的多少?评论区聊聊。