向量数据库选型:ChromaDB / pgvector / Pinecone / Milvus
向量数据库选型:ChromaDB / pgvector / Pinecone / Milvus
本文是【AI 专题精讲】系列第 05 篇。 上一篇:Embedding 模型选型:OpenAI / BGE / Jina 效果与成本对比 | 下一篇:RAG 检索优化:混合搜索、Reranking、多路召回
这篇文章你会得到什么
上一篇我们把文本变成了向量,现在问题来了:这些向量存哪?
你可能会说,存个 JSON 文件不就行了?100 条可以,10 万条试试?检索一次遍历全部向量,光算余弦相似度就要好几秒,用户等不了。
向量数据库就是干这个的——高效存储和检索高维向量。但市面上向量库少说十几种,从嵌入式的 ChromaDB 到分布式的 Milvus,从 PostgreSQL 扩展 pgvector 到全托管 SaaS Pinecone,怎么选?
今天的目标:四大主流向量库逐个拆解,给你 CRUD 代码 + 性能对比 + 选型决策树,看完直接能做技术决策。
向量数据库的核心原理
为什么不能用普通数据库
普通数据库(MySQL、PostgreSQL)擅长精确匹配:WHERE name = '张三'。但向量检索是近似匹配——找和查询向量最相近的 K 个向量。
暴力搜索(逐个比较)的时间复杂度是 O(n),10 万条还能忍,百万级就不行了。
ANN:近似最近邻
向量数据库用 ANN(Approximate Nearest Neighbor) 算法加速检索,核心思路是建索引:
| 算法 | 原理 | 特点 |
|---|---|---|
| HNSW | 多层图结构,逐层缩小搜索范围 | 速度快,内存占用大,最常用 |
| IVF | 先聚类再搜索,只在最近的几个簇里找 | 内存省,需要训练 |
| PQ | 向量压缩,用量化降低存储 | 极省内存,精度有损 |
| Flat | 暴力搜索,不建索引 | 100% 精确,但最慢 |
大部分场景用 HNSW 就够了,这也是四个向量库的默认选项。
ChromaDB:轻量嵌入式,5 分钟跑起来
ChromaDB 的定位类似 SQLite 之于 MySQL——嵌入式、零配置、开箱即用。
安装和基本使用
pip install chromadb
import chromadb
client = chromadb.Client()
# 创建集合(类似表)
collection = client.create_collection(
name="knowledge_base",
metadata={"hnsw:space": "cosine"},
)
# 插入数据
collection.add(
ids=["chunk_1", "chunk_2", "chunk_3"],
documents=["退货需在7天内申请", "年假按工龄计算", "报销需提交发票"],
embeddings=[
[0.12, -0.34, 0.56], # 实际维度会更高
[-0.45, 0.23, -0.12],
[0.33, 0.11, -0.28],
],
metadatas=[
{"source": "售后手册", "page": 12},
{"source": "员工手册", "page": 45},
{"source": "财务制度", "page": 8},
],
)
# 检索
results = collection.query(
query_embeddings=[[0.11, -0.33, 0.55]],
n_results=2,
)
print(results["documents"])
# [['退货需在7天内申请', '报销需提交发票']]
持久化存储
默认数据在内存里,重启就没了。加一个路径就能持久化:
client = chromadb.PersistentClient(path="./chroma_data")
带 Embedding 函数
ChromaDB 可以内置 Embedding 函数,插入时自动 Embed:
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
embedding_fn = OpenAIEmbeddingFunction(
api_key="sk-xxx",
model_name="text-embedding-3-small",
)
collection = client.create_collection(
name="auto_embed",
embedding_function=embedding_fn,
)
# 只传文本,自动生成向量
collection.add(
ids=["chunk_1"],
documents=["退货需在7天内申请"],
)
# 查询也只传文本
results = collection.query(
query_texts=["怎么退货"],
n_results=3,
)
元数据过滤
results = collection.query(
query_embeddings=[query_vec],
n_results=5,
where={"source": "售后手册"},
where_document={"$contains": "退货"},
)
ChromaDB 小结
| 维度 | 评价 |
|---|---|
| 上手难度 | 极低,pip install 即可 |
| 数据规模 | 10 万级以内 |
| 部署方式 | 嵌入式(进程内)或 client/server |
| 持久化 | 本地文件 |
| 适合场景 | 原型、POC、个人项目、小团队 |
| 不适合 | 百万级数据、高并发生产环境 |
pgvector:已有 PostgreSQL 就零成本接入
如果你的项目已经用了 PostgreSQL,pgvector 可能是最省事的选择——不用引入新服务,加个扩展就行。
安装
CREATE EXTENSION IF NOT EXISTS vector;
建表和插入
CREATE TABLE knowledge_chunks (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL,
source VARCHAR(255),
embedding vector(1024),
created_at TIMESTAMP DEFAULT NOW()
);
-- 创建 HNSW 索引
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
Python 操作(asyncpg)
import asyncpg
import json
class PgVectorStore:
def __init__(self, dsn: str):
self.dsn = dsn
self.pool = None
async def connect(self):
self.pool = await asyncpg.create_pool(self.dsn)
async with self.pool.acquire() as conn:
await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
async def insert(self, chunks: list[dict]):
async with self.pool.acquire() as conn:
await conn.executemany(
"""
INSERT INTO knowledge_chunks (content, source, embedding)
VALUES ($1, $2, $3::vector)
""",
[
(c["content"], c["source"], str(c["embedding"]))
for c in chunks
],
)
async def search(
self,
query_embedding: list[float],
top_k: int = 5,
source_filter: str | None = None,
) -> list[dict]:
query = """
SELECT id, content, source,
1 - (embedding <=> $1::vector) AS similarity
FROM knowledge_chunks
"""
params = [str(query_embedding)]
if source_filter:
query += " WHERE source = $2"
params.append(source_filter)
query += " ORDER BY embedding <=> $1::vector LIMIT $" + str(len(params) + 1)
params.append(top_k)
async with self.pool.acquire() as conn:
rows = await conn.fetch(query, *params)
return [
{
"id": row["id"],
"content": row["content"],
"source": row["source"],
"similarity": float(row["similarity"]),
}
for row in rows
]
pgvector 操作符
| 操作符 | 含义 | 适用 |
|---|---|---|
<-> | L2 距离(欧几里得) | 一般场景 |
<=> | 余弦距离 | 文本检索(推荐) |
<#> | 内积距离(负数) | 已归一化向量 |
全文检索 + 向量混合搜索
pgvector 的杀手锏:可以和 PostgreSQL 原生的全文检索结合,一条 SQL 搞定混合搜索:
SELECT id, content,
1 - (embedding <=> $1::vector) AS vector_score,
ts_rank(to_tsvector('chinese', content), plainto_tsquery('chinese', $2)) AS text_score
FROM knowledge_chunks
WHERE to_tsvector('chinese', content) @@ plainto_tsquery('chinese', $2)
ORDER BY vector_score * 0.7 + text_score * 0.3 DESC
LIMIT 10;
pgvector 小结
| 维度 | 评价 |
|---|---|
| 上手难度 | 低(会 SQL 就行) |
| 数据规模 | 百万级(单机) |
| 部署方式 | PostgreSQL 扩展 |
| 持久化 | 数据库自带 |
| 适合场景 | 已有 PG、中小规模、需要混合搜索 |
| 不适合 | 千万级数据、需要极致检索速度 |
Pinecone:全托管 SaaS,免运维
Pinecone 是向量数据库领域最知名的 SaaS 服务——不用管服务器,API 调用即可。
安装和初始化
pip install pinecone
from pinecone import Pinecone, ServerlessSpec
pc = Pinecone(api_key="your-api-key")
# 创建索引
pc.create_index(
name="knowledge-base",
dimension=1024,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)
index = pc.Index("knowledge-base")
CRUD 操作
# 插入(Upsert)
index.upsert(
vectors=[
{
"id": "chunk_1",
"values": [0.12, -0.34, ...], # 1024 维向量
"metadata": {"source": "售后手册", "page": 12, "content": "退货需在7天内申请"},
},
{
"id": "chunk_2",
"values": [-0.45, 0.23, ...],
"metadata": {"source": "员工手册", "page": 45, "content": "年假按工龄计算"},
},
],
namespace="production",
)
# 检索
results = index.query(
vector=[0.11, -0.33, ...],
top_k=5,
include_metadata=True,
namespace="production",
filter={"source": {"$eq": "售后手册"}},
)
for match in results["matches"]:
print(f"[{match['score']:.3f}] {match['metadata']['content']}")
# 删除
index.delete(ids=["chunk_1"], namespace="production")
Namespace 隔离
Pinecone 的 Namespace 可以在同一个索引内做数据隔离,很适合多租户场景:
# 公司 A 的知识库
index.upsert(vectors=[...], namespace="company_a")
# 公司 B 的知识库
index.upsert(vectors=[...], namespace="company_b")
# 查询只在某个 namespace 内
results = index.query(vector=[...], namespace="company_a", top_k=5)
Pinecone 小结
| 维度 | 评价 |
|---|---|
| 上手难度 | 极低(纯 API) |
| 数据规模 | 百万到亿级 |
| 部署方式 | 全托管 SaaS |
| 定价 | 免费层 10 万向量,之后按用量计费 |
| 适合场景 | 不想运维、快速上线、海外部署 |
| 不适合 | 数据不能出境、需要私有化部署、预算敏感 |
Milvus:分布式,大规模生产级
Milvus 是开源向量数据库里功能最全面的——分布式架构、亿级数据、多种索引、混合搜索。
Docker 快速启动
# 单机版(开发环境)
docker run -d --name milvus \
-p 19530:19530 \
-p 9091:9091 \
milvusdb/milvus:latest \
milvus run standalone
Python 操作
pip install pymilvus
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
# 连接
connections.connect("default", host="localhost", port="19530")
# 定义 Schema
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=64),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=4096),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),
]
schema = CollectionSchema(fields, description="知识库")
collection = Collection("knowledge_base", schema)
# 创建索引
collection.create_index(
field_name="embedding",
index_params={
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 16, "efConstruction": 200},
},
)
collection.load()
插入和检索
# 插入
collection.insert([
["chunk_1", "chunk_2", "chunk_3"],
["退货需在7天内申请", "年假按工龄计算", "报销需提交发票"],
["售后手册", "员工手册", "财务制度"],
[[0.12, -0.34, ...], [-0.45, 0.23, ...], [0.33, 0.11, ...]],
])
# 检索
results = collection.search(
data=[[0.11, -0.33, ...]],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 128}},
limit=5,
output_fields=["content", "source"],
expr='source == "售后手册"',
)
for hits in results:
for hit in hits:
print(f"[{hit.distance:.3f}] {hit.entity.get('content')}")
Milvus 的独特能力
1. 混合搜索(Dense + Sparse)
from pymilvus import AnnSearchRequest, RRFRanker
# Dense 向量检索
dense_req = AnnSearchRequest(
data=[dense_vector],
anns_field="dense_embedding",
param={"metric_type": "COSINE", "params": {"ef": 128}},
limit=10,
)
# Sparse 向量检索
sparse_req = AnnSearchRequest(
data=[sparse_vector],
anns_field="sparse_embedding",
param={"metric_type": "IP"},
limit=10,
)
# RRF 融合排序
results = collection.hybrid_search(
[dense_req, sparse_req],
ranker=RRFRanker(k=60),
limit=5,
output_fields=["content"],
)
2. 分区(Partition)
collection.create_partition("department_hr")
collection.create_partition("department_finance")
# 只搜索某个分区
results = collection.search(
data=[query_vec],
anns_field="embedding",
param={...},
limit=5,
partition_names=["department_hr"],
)
Milvus 小结
| 维度 | 评价 |
|---|---|
| 上手难度 | 中等(需要理解 Schema 和索引) |
| 数据规模 | 亿级(分布式集群) |
| 部署方式 | Docker / K8s / Zilliz Cloud(托管) |
| 持久化 | MinIO / S3 |
| 适合场景 | 大规模生产、需要混合搜索、团队有运维能力 |
| 不适合 | 小项目(太重了)、没有运维人员 |
四库横向对比
| 维度 | ChromaDB | pgvector | Pinecone | Milvus |
|---|---|---|---|---|
| 类型 | 嵌入式 | PG 扩展 | SaaS | 分布式 |
| 数据上限 | ~10 万 | ~百万 | 亿级 | 亿级 |
| 索引类型 | HNSW | HNSW/IVF | 自研 | HNSW/IVF/PQ 等 |
| 混合搜索 | 不支持 | SQL + 全文 | 元数据过滤 | Dense + Sparse |
| 部署难度 | 零 | 低 | 零 | 中高 |
| 运维成本 | 无 | 低(跟着 PG) | 无 | 高 |
| 价格 | 免费 | 免费 | 按用量 | 免费/托管版付费 |
| 多租户 | 不支持 | Schema 隔离 | Namespace | Partition |
| 生态 | Python 优先 | SQL 生态 | REST API | 多语言 SDK |
统一封装:VectorStore 抽象层
和上一篇的 EmbeddingProvider 思路一样,向量库也要做一层抽象,方便切换:
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class SearchResult:
id: str
content: str
score: float
metadata: dict
class VectorStore(ABC):
@abstractmethod
async def insert(self, items: list[dict]) -> None:
"""插入向量。每个 item 包含 id, content, embedding, metadata"""
...
@abstractmethod
async def search(
self,
query_embedding: list[float],
top_k: int = 5,
filter: dict | None = None,
) -> list[SearchResult]:
...
@abstractmethod
async def delete(self, ids: list[str]) -> None:
...
class ChromaVectorStore(VectorStore):
def __init__(self, collection_name: str, persist_dir: str = "./chroma_data"):
import chromadb
self.client = chromadb.PersistentClient(path=persist_dir)
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"},
)
async def insert(self, items: list[dict]) -> None:
self.collection.upsert(
ids=[item["id"] for item in items],
embeddings=[item["embedding"] for item in items],
documents=[item["content"] for item in items],
metadatas=[item.get("metadata", {}) for item in items],
)
async def search(self, query_embedding, top_k=5, filter=None):
kwargs = {"query_embeddings": [query_embedding], "n_results": top_k}
if filter:
kwargs["where"] = filter
results = self.collection.query(**kwargs)
return [
SearchResult(
id=results["ids"][0][i],
content=results["documents"][0][i],
score=1 - results["distances"][0][i],
metadata=results["metadatas"][0][i] if results["metadatas"] else {},
)
for i in range(len(results["ids"][0]))
]
async def delete(self, ids: list[str]) -> None:
self.collection.delete(ids=ids)
class PgVectorStore(VectorStore):
def __init__(self, dsn: str, table: str = "knowledge_chunks"):
self.dsn = dsn
self.table = table
self.pool = None
async def connect(self):
import asyncpg
self.pool = await asyncpg.create_pool(self.dsn)
async def insert(self, items: list[dict]) -> None:
async with self.pool.acquire() as conn:
await conn.executemany(
f"""
INSERT INTO {self.table} (id, content, source, embedding)
VALUES ($1, $2, $3, $4::vector)
ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding
""",
[
(item["id"], item["content"],
item.get("metadata", {}).get("source", ""),
str(item["embedding"]))
for item in items
],
)
async def search(self, query_embedding, top_k=5, filter=None):
query = f"""
SELECT id, content, source,
1 - (embedding <=> $1::vector) AS score
FROM {self.table}
"""
params = [str(query_embedding)]
if filter and "source" in filter:
query += " WHERE source = $2"
params.append(filter["source"])
query += f" ORDER BY embedding <=> $1::vector LIMIT ${len(params) + 1}"
params.append(top_k)
async with self.pool.acquire() as conn:
rows = await conn.fetch(query, *params)
return [
SearchResult(
id=str(row["id"]),
content=row["content"],
score=float(row["score"]),
metadata={"source": row["source"]},
)
for row in rows
]
async def delete(self, ids: list[str]) -> None:
async with self.pool.acquire() as conn:
await conn.execute(
f"DELETE FROM {self.table} WHERE id = ANY($1)", ids
)
def create_vector_store(backend: str, **kwargs) -> VectorStore:
stores = {
"chroma": ChromaVectorStore,
"pgvector": PgVectorStore,
}
if backend not in stores:
raise ValueError(f"不支持: {backend},可选: {list(stores.keys())}")
return stores[backend](**kwargs)
使用方式完全一致:
# 开发环境用 ChromaDB
store = create_vector_store("chroma", collection_name="kb")
# 生产环境切 pgvector
store = create_vector_store("pgvector", dsn="postgresql://...")
await store.connect()
# 接口完全一致
await store.insert([
{"id": "1", "content": "退货流程...", "embedding": [...], "metadata": {"source": "售后"}},
])
results = await store.search(query_embedding=[...], top_k=5)
选型决策树
数据量多大?
├── < 10 万条 → 有 PostgreSQL 吗?
│ ├── 有 → pgvector(零成本,够用)
│ └── 没有 → ChromaDB(最简单)
├── 10 万 ~ 百万 → 有运维团队吗?
│ ├── 有 → pgvector 或 Milvus(按需选)
│ └── 没有 → Pinecone(免运维)
└── > 百万 → 有 K8s 集群吗?
├── 有 → Milvus(分布式首选)
└── 没有 → Pinecone 或 Zilliz Cloud(托管 Milvus)
我的实际项目选型
以下是我在不同项目中的真实选择:
| 项目 | 数据量 | 选择 | 原因 |
|---|---|---|---|
| 个人 RAG 助手 | 500 条 | ChromaDB | 零配置,5 分钟跑起来 |
| 公司知识库 v1 | 3 万条 | pgvector | 已有 PG,不想加服务 |
| 多部门知识平台 | 50 万条 | Milvus | 需要分区隔离 + 混合搜索 |
如果你刚入门 RAG,直接用 ChromaDB 跑起来,等数据量上来了再迁移。向量库的切换成本其实不高——前提是你用了上面的抽象层。
常见坑和注意事项
1. 索引不是万能的
HNSW 索引有召回率和速度的权衡。参数 ef_search 越大,召回越准但越慢:
# Milvus:调整搜索时的 ef
param = {"metric_type": "COSINE", "params": {"ef": 64}} # 快但可能漏
param = {"metric_type": "COSINE", "params": {"ef": 256}} # 慢但更准
建议:开发时 ef=128,上线后根据实际效果微调。
2. 向量维度要统一
一个集合里的向量维度必须一致。如果你切换了 Embedding 模型(维度从 1536 变成 1024),需要重建索引并重新 Embed 所有数据。
3. 元数据索引
如果你经常按元数据过滤(where source = 'xxx'),记得给元数据字段建索引。pgvector 的优势就在这——SQL 索引轻车熟路。
4. 备份策略
| 向量库 | 备份方式 |
|---|---|
| ChromaDB | 复制 chroma_data/ 目录 |
| pgvector | pg_dump(跟着 PG 走) |
| Pinecone | API 导出 + 重建 |
| Milvus | Milvus Backup 工具 / S3 快照 |
总结
- ChromaDB 适合快速起步——pip install 就能用,10 万条以内的项目首选。
- pgvector 适合已有 PG 的团队——零额外服务,SQL 生态加持,混合搜索开箱即用。
- Pinecone 适合不想运维——全托管 SaaS,付钱省心,但数据在海外。
- Milvus 适合大规模生产——亿级数据、分布式架构、混合搜索,但运维成本高。
- 务必做抽象层——
VectorStore接口统一封装,后期切换向量库零成本。 - 选型核心三要素:数据量 × 运维能力 × 预算。
下一篇进入 RAG 的检索优化环节:基础的向量检索不够准怎么办?混合搜索、Reranking、多路召回三板斧教你把召回率从 80% 提到 95%。
讨论话题:你的向量数据库用的哪个?数据量多大?有踩过什么坑吗?评论区聊聊。