AI 专题 JAN 25, 2026

AI 缓存策略:精确缓存 + 语义缓存,省钱又提速

#AI 缓存#语义缓存#Redis#成本优化

AI 缓存策略:精确缓存 + 语义缓存,省钱又提速

本文是【AI 专题精讲】系列第 09 篇。 上一篇:结构化输出:让 AI 稳定返回 JSON | 下一篇:Token 计算与上下文窗口管理实战


这篇文章你会得到什么

你的 AI 应用上线了,用户问的最多的问题是什么?

“退货流程是什么” “怎么申请年假” “报销需要什么材料”

这些高频问题,每次都调 AI API,每次都花钱、每次都等 1~3 秒。100 个用户问同一个问题,就是 100 次 API 调用。

缓存能帮你省 30%~70% 的 API 成本,同时把响应时间从秒级降到毫秒级。

但 AI 缓存不像传统缓存那么简单。用户不会每次都问一模一样的话——“退货流程” “怎么退货” “退货步骤是什么” 说的是同一件事,传统的 key-value 缓存命中不了。

今天给你两层缓存方案:

  1. 精确缓存:完全相同的输入,直接返回缓存结果
  2. 语义缓存:语义相似的输入,也能命中缓存

为什么 AI 应用特别需要缓存

问题没有缓存有缓存
响应时间1~5 秒(API 调用)< 50ms(缓存命中)
API 成本每次都花钱命中率 50% → 省一半
限流风险高并发容易 429缓存挡住大量重复请求
稳定性API 挂了就全挂缓存兜底,降级可用

第一层:精确缓存

原理

把请求的完整输入(model + messages + 参数)做哈希,作为缓存 Key。完全相同的输入直接返回缓存。

内存缓存实现

最简单的方案,适合单进程、小规模:

import hashlib
import json
import time
from dataclasses import dataclass


@dataclass
class CacheEntry:
    value: str
    created_at: float
    ttl: int
    hit_count: int = 0


class MemoryCache:
    def __init__(self, max_size: int = 1000):
        self.cache: dict[str, CacheEntry] = {}
        self.max_size = max_size

    def _make_key(self, messages: list[dict], model: str, **kwargs) -> str:
        content = json.dumps({
            "model": model,
            "messages": messages,
            **kwargs,
        }, ensure_ascii=False, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, messages: list[dict], model: str, **kwargs) -> str | None:
        key = self._make_key(messages, model, **kwargs)
        entry = self.cache.get(key)

        if entry is None:
            return None

        if time.time() - entry.created_at > entry.ttl:
            del self.cache[key]
            return None

        entry.hit_count += 1
        return entry.value

    def set(self, messages: list[dict], model: str, value: str, ttl: int = 3600, **kwargs):
        key = self._make_key(messages, model, **kwargs)

        if len(self.cache) >= self.max_size:
            self._evict()

        self.cache[key] = CacheEntry(
            value=value,
            created_at=time.time(),
            ttl=ttl,
        )

    def _evict(self):
        oldest_key = min(self.cache, key=lambda k: self.cache[k].created_at)
        del self.cache[oldest_key]

Redis 缓存实现

多进程、分布式场景用 Redis:

import redis
import hashlib
import json

class RedisCache:
    def __init__(self, redis_url: str = "redis://localhost:6379", prefix: str = "ai_cache"):
        self.client = redis.from_url(redis_url)
        self.prefix = prefix

    def _make_key(self, messages: list[dict], model: str, **kwargs) -> str:
        content = json.dumps({
            "model": model,
            "messages": messages,
            **kwargs,
        }, ensure_ascii=False, sort_keys=True)
        hash_val = hashlib.sha256(content.encode()).hexdigest()
        return f"{self.prefix}:{hash_val}"

    def get(self, messages: list[dict], model: str, **kwargs) -> str | None:
        key = self._make_key(messages, model, **kwargs)
        value = self.client.get(key)
        if value:
            self.client.hincrby(f"{key}:meta", "hits", 1)
            return value.decode()
        return None

    def set(self, messages: list[dict], model: str, value: str, ttl: int = 3600, **kwargs):
        key = self._make_key(messages, model, **kwargs)
        self.client.setex(key, ttl, value)
        self.client.hset(f"{key}:meta", mapping={
            "model": model,
            "created_at": str(time.time()),
            "hits": "0",
        })

精确缓存的局限

问题很明显:用户换个说法就 miss 了

"退货流程是什么"     → 缓存命中
"怎么退货"          → 缓存未命中(不同的字符串)
"退货步骤"          → 缓存未命中

三个问题问的是同一件事,但精确缓存只能命中第一个。实际项目中精确缓存的命中率通常只有 10%~20%。


第二层:语义缓存

原理

不比较字符串是否相同,而是比较语义是否相似。把查询转成 Embedding 向量,在缓存中找最近的向量,相似度超过阈值就命中。

"退货流程是什么"  → [0.12, -0.34, ...]
"怎么退货"       → [0.11, -0.33, ...]  ← 余弦相似度 0.95 > 阈值 0.9 → 命中!
"今天天气"       → [-0.45, 0.23, ...]  ← 余弦相似度 0.15 < 阈值 0.9 → 未命中

实现

import numpy as np

class SemanticCache:
    def __init__(
        self,
        embedding_provider,
        similarity_threshold: float = 0.9,
        max_size: int = 5000,
    ):
        self.embedding_provider = embedding_provider
        self.threshold = similarity_threshold
        self.max_size = max_size

        self.entries: list[dict] = []
        self.vectors: list[list[float]] = []

    def _get_query_text(self, messages: list[dict]) -> str:
        user_messages = [m["content"] for m in messages if m["role"] == "user"]
        return user_messages[-1] if user_messages else ""

    def get(self, messages: list[dict], model: str) -> str | None:
        query_text = self._get_query_text(messages)
        if not query_text or not self.vectors:
            return None

        query_result = self.embedding_provider.embed([query_text])
        query_vec = np.array(query_result.embeddings[0])
        query_vec = query_vec / np.linalg.norm(query_vec)

        cache_vecs = np.array(self.vectors)
        norms = np.linalg.norm(cache_vecs, axis=1, keepdims=True)
        cache_vecs_norm = cache_vecs / norms

        similarities = cache_vecs_norm @ query_vec
        best_idx = int(np.argmax(similarities))
        best_score = float(similarities[best_idx])

        if best_score >= self.threshold:
            entry = self.entries[best_idx]

            if time.time() - entry["created_at"] > entry["ttl"]:
                self._remove(best_idx)
                return None

            entry["hit_count"] += 1
            return entry["value"]

        return None

    def set(
        self,
        messages: list[dict],
        model: str,
        value: str,
        ttl: int = 3600,
    ):
        query_text = self._get_query_text(messages)
        if not query_text:
            return

        result = self.embedding_provider.embed([query_text])
        vector = result.embeddings[0]

        if len(self.entries) >= self.max_size:
            self._evict()

        self.entries.append({
            "query": query_text,
            "model": model,
            "value": value,
            "created_at": time.time(),
            "ttl": ttl,
            "hit_count": 0,
        })
        self.vectors.append(vector)

    def _remove(self, index: int):
        self.entries.pop(index)
        self.vectors.pop(index)

    def _evict(self):
        if not self.entries:
            return
        oldest_idx = min(range(len(self.entries)), key=lambda i: self.entries[i]["created_at"])
        self._remove(oldest_idx)

阈值怎么定

阈值是语义缓存最关键的参数:

阈值效果
0.95+非常严格,几乎要一模一样才命中
0.90推荐值,语义高度相似才命中
0.85较松,可能会误命中不太相关的内容
0.80太松,容易返回错误的缓存结果

宁可 miss 也不要误命中。 返回错误的缓存结果比多调一次 API 糟糕得多。

我的建议:先用 0.92 上线,观察一周的命中日志,再微调。


双层缓存:精确 + 语义

两层组合使用,覆盖更多场景:

class TwoLevelCache:
    def __init__(
        self,
        exact_cache: RedisCache | MemoryCache,
        semantic_cache: SemanticCache,
    ):
        self.exact = exact_cache
        self.semantic = semantic_cache

    def get(self, messages: list[dict], model: str, **kwargs) -> tuple[str | None, str]:
        # Layer 1: 精确缓存(0ms)
        result = self.exact.get(messages, model, **kwargs)
        if result is not None:
            return result, "exact_hit"

        # Layer 2: 语义缓存(~50ms)
        result = self.semantic.get(messages, model)
        if result is not None:
            return result, "semantic_hit"

        return None, "miss"

    def set(self, messages: list[dict], model: str, value: str, ttl: int = 3600, **kwargs):
        self.exact.set(messages, model, value, ttl, **kwargs)
        self.semantic.set(messages, model, value, ttl)

集成到 AI 调用层

class CachedAIClient:
    def __init__(self, client, cache: TwoLevelCache):
        self.client = client
        self.cache = cache
        self.stats = {"exact_hit": 0, "semantic_hit": 0, "miss": 0}

    async def chat(self, messages: list[dict], model: str = "gpt-4o-mini", **kwargs) -> str:
        # 查缓存
        cached, hit_type = self.cache.get(messages, model, **kwargs)
        if cached is not None:
            self.stats[hit_type] += 1
            return cached

        self.stats["miss"] += 1

        # 缓存未命中,调 API
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs,
        )
        result = response.choices[0].message.content

        # 存缓存
        ttl = self._get_ttl(messages)
        self.cache.set(messages, model, result, ttl, **kwargs)

        return result

    def _get_ttl(self, messages: list[dict]) -> int:
        system_msg = next((m["content"] for m in messages if m["role"] == "system"), "")
        if "实时" in system_msg or "最新" in system_msg:
            return 300
        if "知识库" in system_msg:
            return 7200
        return 3600

    def get_stats(self) -> dict:
        total = sum(self.stats.values())
        if total == 0:
            return self.stats
        return {
            **self.stats,
            "hit_rate": (self.stats["exact_hit"] + self.stats["semantic_hit"]) / total,
        }

TTL 策略

不同场景的缓存过期时间应该不同:

场景推荐 TTL原因
知识库问答2~24 小时知识库更新频率低
数据查询5~30 分钟数据可能变化
闲聊1 小时无实效性要求
实时信息不缓存必须实时
翻译/摘要24 小时+结果稳定

知识库更新时主动清缓存

class CacheInvalidator:
    def __init__(self, cache: TwoLevelCache):
        self.cache = cache

    def on_knowledge_base_updated(self, updated_sources: list[str]):
        """知识库更新时清除相关缓存"""
        entries_to_remove = []
        for i, entry in enumerate(self.cache.semantic.entries):
            if any(source in entry.get("query", "") for source in updated_sources):
                entries_to_remove.append(i)

        for idx in reversed(entries_to_remove):
            self.cache.semantic._remove(idx)

缓存 Key 设计

要不要把 system prompt 算进 Key

要。 同一个用户问题,不同的 system prompt 会产生完全不同的回答:

# 这两个请求的结果完全不同,缓存 Key 必须不同
messages_1 = [
    {"role": "system", "content": "你是客服助手"},
    {"role": "user", "content": "退货流程"},
]
messages_2 = [
    {"role": "system", "content": "你是法律顾问"},
    {"role": "user", "content": "退货流程"},
]

要不要把 temperature 算进 Key

看场景:

  • temperature=0(确定性输出):适合缓存,每次结果一样
  • temperature>0(随机输出):缓存意义不大,但如果用户不在意微小差异也可以缓存

我的做法:temperature=0 的请求缓存,temperature>0.3 的不缓存。

语义缓存只看用户最后一条消息?

对于多轮对话,只看最后一条 user 消息不够:

def _get_query_text(self, messages: list[dict]) -> str:
    # 取最后 2 轮对话拼接,提供更多上下文
    recent = messages[-4:]  # 最后 2 轮
    return " ".join(m["content"] for m in recent if m["role"] == "user")

监控和调优

核心指标

class CacheMonitor:
    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "exact_hits": 0,
            "semantic_hits": 0,
            "misses": 0,
            "avg_similarity_on_hit": [],
            "avg_similarity_on_miss": [],
        }

    def record(self, hit_type: str, similarity: float = 0):
        self.metrics["total_requests"] += 1
        if hit_type == "exact_hit":
            self.metrics["exact_hits"] += 1
        elif hit_type == "semantic_hit":
            self.metrics["semantic_hits"] += 1
            self.metrics["avg_similarity_on_hit"].append(similarity)
        else:
            self.metrics["misses"] += 1
            if similarity > 0:
                self.metrics["avg_similarity_on_miss"].append(similarity)

    def report(self) -> dict:
        total = self.metrics["total_requests"]
        if total == 0:
            return {}

        hits = self.metrics["exact_hits"] + self.metrics["semantic_hits"]
        return {
            "total": total,
            "hit_rate": hits / total,
            "exact_hit_rate": self.metrics["exact_hits"] / total,
            "semantic_hit_rate": self.metrics["semantic_hits"] / total,
            "avg_similarity_hit": (
                np.mean(self.metrics["avg_similarity_on_hit"])
                if self.metrics["avg_similarity_on_hit"] else 0
            ),
            "avg_similarity_miss": (
                np.mean(self.metrics["avg_similarity_on_miss"])
                if self.metrics["avg_similarity_on_miss"] else 0
            ),
            "estimated_savings": f"${hits * 0.002:.2f}",
        }

该关注什么

指标健康值异常处理
总命中率> 30%太低→检查 TTL 和阈值
语义命中准确率> 95%太低→调高阈值
平均命中相似度> 0.92太低→阈值可能太松
Miss 的平均相似度< 0.85如果接近阈值→考虑调低

开源方案:GPTCache

如果不想自己造轮子,GPTCache 是目前最成熟的 AI 缓存开源方案:

from gptcache import cache
from gptcache.adapter import openai
from gptcache.embedding import Onnx
from gptcache.manager import CacheBase, VectorBase, get_data_manager
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation

# 初始化
onnx = Onnx()
cache_base = CacheBase("sqlite")
vector_base = VectorBase("faiss", dimension=onnx.dimension)
data_manager = get_data_manager(cache_base, vector_base)

cache.init(
    embedding_func=onnx.to_embeddings,
    data_manager=data_manager,
    similarity_evaluation=SearchDistanceEvaluation(),
)
cache.set_openai_key()

# 使用(和 OpenAI SDK 一样的接口)
response = openai.ChatCompletion.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "退货流程是什么"}],
)

GPTCache 的优势:内置 Embedding + 向量存储 + 相似度评估,开箱即用。缺点是依赖较重,定制灵活度不如自研。


什么时候不该缓存

  1. 实时性要求高:用户问”现在几点了”,缓存过期的答案更差
  2. 上下文高度个性化:每个用户的对话历史不同,缓存几乎不可能命中
  3. 创意类输出:用户要 AI 写诗、编故事,每次都要不同
  4. temperature 很高:本身就要求随机性

总结

  1. 精确缓存是基础——相同请求直接返回,零延迟零成本,但命中率低(10~20%)。
  2. 语义缓存是杀手锏——语义相似也能命中,命中率提升到 30~50%,是 AI 缓存的核心。
  3. 双层组合效果最好——精确缓存(0ms)→ 语义缓存(50ms)→ API 调用(15s)。
  4. 阈值宁高勿低——误命中比 miss 更严重,推荐 0.90~0.92。
  5. TTL 因场景而异——知识库问答 2 小时,数据查询 5 分钟,实时信息不缓存。
  6. 监控是必须的——命中率、准确率、省了多少钱,用数据驱动调优。

下一篇聊 Token 计算与上下文窗口管理:超了上下文窗口就报错,Token 没估准就超预算。怎么精确计算、怎么管理、怎么优化?


下一篇预告10 | Token 计算与上下文窗口管理实战


讨论话题:你的 AI 应用有做缓存吗?命中率多少?语义缓存的阈值定的多少?评论区聊聊。

评论