AI 专题 NOV 20, 2025

向量数据库选型:ChromaDB / pgvector / Pinecone / Milvus

#RAG#向量数据库#ChromaDB#pgvector

向量数据库选型:ChromaDB / pgvector / Pinecone / Milvus

本文是【AI 专题精讲】系列第 05 篇。 上一篇:Embedding 模型选型:OpenAI / BGE / Jina 效果与成本对比 | 下一篇:RAG 检索优化:混合搜索、Reranking、多路召回


这篇文章你会得到什么

上一篇我们把文本变成了向量,现在问题来了:这些向量存哪?

你可能会说,存个 JSON 文件不就行了?100 条可以,10 万条试试?检索一次遍历全部向量,光算余弦相似度就要好几秒,用户等不了。

向量数据库就是干这个的——高效存储和检索高维向量。但市面上向量库少说十几种,从嵌入式的 ChromaDB 到分布式的 Milvus,从 PostgreSQL 扩展 pgvector 到全托管 SaaS Pinecone,怎么选?

今天的目标:四大主流向量库逐个拆解,给你 CRUD 代码 + 性能对比 + 选型决策树,看完直接能做技术决策。


向量数据库的核心原理

为什么不能用普通数据库

普通数据库(MySQL、PostgreSQL)擅长精确匹配:WHERE name = '张三'。但向量检索是近似匹配——找和查询向量最相近的 K 个向量。

暴力搜索(逐个比较)的时间复杂度是 O(n),10 万条还能忍,百万级就不行了。

ANN:近似最近邻

向量数据库用 ANN(Approximate Nearest Neighbor) 算法加速检索,核心思路是建索引:

算法原理特点
HNSW多层图结构,逐层缩小搜索范围速度快,内存占用大,最常用
IVF先聚类再搜索,只在最近的几个簇里找内存省,需要训练
PQ向量压缩,用量化降低存储极省内存,精度有损
Flat暴力搜索,不建索引100% 精确,但最慢

大部分场景用 HNSW 就够了,这也是四个向量库的默认选项。


ChromaDB:轻量嵌入式,5 分钟跑起来

ChromaDB 的定位类似 SQLite 之于 MySQL——嵌入式、零配置、开箱即用

安装和基本使用

pip install chromadb
import chromadb

client = chromadb.Client()

# 创建集合(类似表)
collection = client.create_collection(
    name="knowledge_base",
    metadata={"hnsw:space": "cosine"},
)

# 插入数据
collection.add(
    ids=["chunk_1", "chunk_2", "chunk_3"],
    documents=["退货需在7天内申请", "年假按工龄计算", "报销需提交发票"],
    embeddings=[
        [0.12, -0.34, 0.56],  # 实际维度会更高
        [-0.45, 0.23, -0.12],
        [0.33, 0.11, -0.28],
    ],
    metadatas=[
        {"source": "售后手册", "page": 12},
        {"source": "员工手册", "page": 45},
        {"source": "财务制度", "page": 8},
    ],
)

# 检索
results = collection.query(
    query_embeddings=[[0.11, -0.33, 0.55]],
    n_results=2,
)
print(results["documents"])
# [['退货需在7天内申请', '报销需提交发票']]

持久化存储

默认数据在内存里,重启就没了。加一个路径就能持久化:

client = chromadb.PersistentClient(path="./chroma_data")

带 Embedding 函数

ChromaDB 可以内置 Embedding 函数,插入时自动 Embed:

from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction

embedding_fn = OpenAIEmbeddingFunction(
    api_key="sk-xxx",
    model_name="text-embedding-3-small",
)

collection = client.create_collection(
    name="auto_embed",
    embedding_function=embedding_fn,
)

# 只传文本,自动生成向量
collection.add(
    ids=["chunk_1"],
    documents=["退货需在7天内申请"],
)

# 查询也只传文本
results = collection.query(
    query_texts=["怎么退货"],
    n_results=3,
)

元数据过滤

results = collection.query(
    query_embeddings=[query_vec],
    n_results=5,
    where={"source": "售后手册"},
    where_document={"$contains": "退货"},
)

ChromaDB 小结

维度评价
上手难度极低,pip install 即可
数据规模10 万级以内
部署方式嵌入式(进程内)或 client/server
持久化本地文件
适合场景原型、POC、个人项目、小团队
不适合百万级数据、高并发生产环境

pgvector:已有 PostgreSQL 就零成本接入

如果你的项目已经用了 PostgreSQL,pgvector 可能是最省事的选择——不用引入新服务,加个扩展就行

安装

CREATE EXTENSION IF NOT EXISTS vector;

建表和插入

CREATE TABLE knowledge_chunks (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    source VARCHAR(255),
    embedding vector(1024),
    created_at TIMESTAMP DEFAULT NOW()
);

-- 创建 HNSW 索引
CREATE INDEX ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);

Python 操作(asyncpg)

import asyncpg
import json

class PgVectorStore:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(self.dsn)
        async with self.pool.acquire() as conn:
            await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")

    async def insert(self, chunks: list[dict]):
        async with self.pool.acquire() as conn:
            await conn.executemany(
                """
                INSERT INTO knowledge_chunks (content, source, embedding)
                VALUES ($1, $2, $3::vector)
                """,
                [
                    (c["content"], c["source"], str(c["embedding"]))
                    for c in chunks
                ],
            )

    async def search(
        self,
        query_embedding: list[float],
        top_k: int = 5,
        source_filter: str | None = None,
    ) -> list[dict]:
        query = """
            SELECT id, content, source,
                   1 - (embedding <=> $1::vector) AS similarity
            FROM knowledge_chunks
        """
        params = [str(query_embedding)]

        if source_filter:
            query += " WHERE source = $2"
            params.append(source_filter)

        query += " ORDER BY embedding <=> $1::vector LIMIT $" + str(len(params) + 1)
        params.append(top_k)

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
            return [
                {
                    "id": row["id"],
                    "content": row["content"],
                    "source": row["source"],
                    "similarity": float(row["similarity"]),
                }
                for row in rows
            ]

pgvector 操作符

操作符含义适用
<->L2 距离(欧几里得)一般场景
<=>余弦距离文本检索(推荐)
<#>内积距离(负数)已归一化向量

全文检索 + 向量混合搜索

pgvector 的杀手锏:可以和 PostgreSQL 原生的全文检索结合,一条 SQL 搞定混合搜索:

SELECT id, content,
       1 - (embedding <=> $1::vector) AS vector_score,
       ts_rank(to_tsvector('chinese', content), plainto_tsquery('chinese', $2)) AS text_score
FROM knowledge_chunks
WHERE to_tsvector('chinese', content) @@ plainto_tsquery('chinese', $2)
ORDER BY vector_score * 0.7 + text_score * 0.3 DESC
LIMIT 10;

pgvector 小结

维度评价
上手难度低(会 SQL 就行)
数据规模百万级(单机)
部署方式PostgreSQL 扩展
持久化数据库自带
适合场景已有 PG、中小规模、需要混合搜索
不适合千万级数据、需要极致检索速度

Pinecone:全托管 SaaS,免运维

Pinecone 是向量数据库领域最知名的 SaaS 服务——不用管服务器,API 调用即可

安装和初始化

pip install pinecone
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="your-api-key")

# 创建索引
pc.create_index(
    name="knowledge-base",
    dimension=1024,
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)

index = pc.Index("knowledge-base")

CRUD 操作

# 插入(Upsert)
index.upsert(
    vectors=[
        {
            "id": "chunk_1",
            "values": [0.12, -0.34, ...],  # 1024 维向量
            "metadata": {"source": "售后手册", "page": 12, "content": "退货需在7天内申请"},
        },
        {
            "id": "chunk_2",
            "values": [-0.45, 0.23, ...],
            "metadata": {"source": "员工手册", "page": 45, "content": "年假按工龄计算"},
        },
    ],
    namespace="production",
)

# 检索
results = index.query(
    vector=[0.11, -0.33, ...],
    top_k=5,
    include_metadata=True,
    namespace="production",
    filter={"source": {"$eq": "售后手册"}},
)

for match in results["matches"]:
    print(f"[{match['score']:.3f}] {match['metadata']['content']}")

# 删除
index.delete(ids=["chunk_1"], namespace="production")

Namespace 隔离

Pinecone 的 Namespace 可以在同一个索引内做数据隔离,很适合多租户场景:

# 公司 A 的知识库
index.upsert(vectors=[...], namespace="company_a")

# 公司 B 的知识库
index.upsert(vectors=[...], namespace="company_b")

# 查询只在某个 namespace 内
results = index.query(vector=[...], namespace="company_a", top_k=5)

Pinecone 小结

维度评价
上手难度极低(纯 API)
数据规模百万到亿级
部署方式全托管 SaaS
定价免费层 10 万向量,之后按用量计费
适合场景不想运维、快速上线、海外部署
不适合数据不能出境、需要私有化部署、预算敏感

Milvus:分布式,大规模生产级

Milvus 是开源向量数据库里功能最全面的——分布式架构、亿级数据、多种索引、混合搜索

Docker 快速启动

# 单机版(开发环境)
docker run -d --name milvus \
  -p 19530:19530 \
  -p 9091:9091 \
  milvusdb/milvus:latest \
  milvus run standalone

Python 操作

pip install pymilvus
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility

# 连接
connections.connect("default", host="localhost", port="19530")

# 定义 Schema
fields = [
    FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=64),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=4096),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),
]

schema = CollectionSchema(fields, description="知识库")
collection = Collection("knowledge_base", schema)

# 创建索引
collection.create_index(
    field_name="embedding",
    index_params={
        "index_type": "HNSW",
        "metric_type": "COSINE",
        "params": {"M": 16, "efConstruction": 200},
    },
)
collection.load()

插入和检索

# 插入
collection.insert([
    ["chunk_1", "chunk_2", "chunk_3"],
    ["退货需在7天内申请", "年假按工龄计算", "报销需提交发票"],
    ["售后手册", "员工手册", "财务制度"],
    [[0.12, -0.34, ...], [-0.45, 0.23, ...], [0.33, 0.11, ...]],
])

# 检索
results = collection.search(
    data=[[0.11, -0.33, ...]],
    anns_field="embedding",
    param={"metric_type": "COSINE", "params": {"ef": 128}},
    limit=5,
    output_fields=["content", "source"],
    expr='source == "售后手册"',
)

for hits in results:
    for hit in hits:
        print(f"[{hit.distance:.3f}] {hit.entity.get('content')}")

Milvus 的独特能力

1. 混合搜索(Dense + Sparse)

from pymilvus import AnnSearchRequest, RRFRanker

# Dense 向量检索
dense_req = AnnSearchRequest(
    data=[dense_vector],
    anns_field="dense_embedding",
    param={"metric_type": "COSINE", "params": {"ef": 128}},
    limit=10,
)

# Sparse 向量检索
sparse_req = AnnSearchRequest(
    data=[sparse_vector],
    anns_field="sparse_embedding",
    param={"metric_type": "IP"},
    limit=10,
)

# RRF 融合排序
results = collection.hybrid_search(
    [dense_req, sparse_req],
    ranker=RRFRanker(k=60),
    limit=5,
    output_fields=["content"],
)

2. 分区(Partition)

collection.create_partition("department_hr")
collection.create_partition("department_finance")

# 只搜索某个分区
results = collection.search(
    data=[query_vec],
    anns_field="embedding",
    param={...},
    limit=5,
    partition_names=["department_hr"],
)

Milvus 小结

维度评价
上手难度中等(需要理解 Schema 和索引)
数据规模亿级(分布式集群)
部署方式Docker / K8s / Zilliz Cloud(托管)
持久化MinIO / S3
适合场景大规模生产、需要混合搜索、团队有运维能力
不适合小项目(太重了)、没有运维人员

四库横向对比

维度ChromaDBpgvectorPineconeMilvus
类型嵌入式PG 扩展SaaS分布式
数据上限~10 万~百万亿级亿级
索引类型HNSWHNSW/IVF自研HNSW/IVF/PQ 等
混合搜索不支持SQL + 全文元数据过滤Dense + Sparse
部署难度中高
运维成本低(跟着 PG)
价格免费免费按用量免费/托管版付费
多租户不支持Schema 隔离NamespacePartition
生态Python 优先SQL 生态REST API多语言 SDK

统一封装:VectorStore 抽象层

和上一篇的 EmbeddingProvider 思路一样,向量库也要做一层抽象,方便切换:

from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class SearchResult:
    id: str
    content: str
    score: float
    metadata: dict


class VectorStore(ABC):
    @abstractmethod
    async def insert(self, items: list[dict]) -> None:
        """插入向量。每个 item 包含 id, content, embedding, metadata"""
        ...

    @abstractmethod
    async def search(
        self,
        query_embedding: list[float],
        top_k: int = 5,
        filter: dict | None = None,
    ) -> list[SearchResult]:
        ...

    @abstractmethod
    async def delete(self, ids: list[str]) -> None:
        ...


class ChromaVectorStore(VectorStore):
    def __init__(self, collection_name: str, persist_dir: str = "./chroma_data"):
        import chromadb
        self.client = chromadb.PersistentClient(path=persist_dir)
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    async def insert(self, items: list[dict]) -> None:
        self.collection.upsert(
            ids=[item["id"] for item in items],
            embeddings=[item["embedding"] for item in items],
            documents=[item["content"] for item in items],
            metadatas=[item.get("metadata", {}) for item in items],
        )

    async def search(self, query_embedding, top_k=5, filter=None):
        kwargs = {"query_embeddings": [query_embedding], "n_results": top_k}
        if filter:
            kwargs["where"] = filter

        results = self.collection.query(**kwargs)
        return [
            SearchResult(
                id=results["ids"][0][i],
                content=results["documents"][0][i],
                score=1 - results["distances"][0][i],
                metadata=results["metadatas"][0][i] if results["metadatas"] else {},
            )
            for i in range(len(results["ids"][0]))
        ]

    async def delete(self, ids: list[str]) -> None:
        self.collection.delete(ids=ids)


class PgVectorStore(VectorStore):
    def __init__(self, dsn: str, table: str = "knowledge_chunks"):
        self.dsn = dsn
        self.table = table
        self.pool = None

    async def connect(self):
        import asyncpg
        self.pool = await asyncpg.create_pool(self.dsn)

    async def insert(self, items: list[dict]) -> None:
        async with self.pool.acquire() as conn:
            await conn.executemany(
                f"""
                INSERT INTO {self.table} (id, content, source, embedding)
                VALUES ($1, $2, $3, $4::vector)
                ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding
                """,
                [
                    (item["id"], item["content"],
                     item.get("metadata", {}).get("source", ""),
                     str(item["embedding"]))
                    for item in items
                ],
            )

    async def search(self, query_embedding, top_k=5, filter=None):
        query = f"""
            SELECT id, content, source,
                   1 - (embedding <=> $1::vector) AS score
            FROM {self.table}
        """
        params = [str(query_embedding)]

        if filter and "source" in filter:
            query += " WHERE source = $2"
            params.append(filter["source"])

        query += f" ORDER BY embedding <=> $1::vector LIMIT ${len(params) + 1}"
        params.append(top_k)

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
            return [
                SearchResult(
                    id=str(row["id"]),
                    content=row["content"],
                    score=float(row["score"]),
                    metadata={"source": row["source"]},
                )
                for row in rows
            ]

    async def delete(self, ids: list[str]) -> None:
        async with self.pool.acquire() as conn:
            await conn.execute(
                f"DELETE FROM {self.table} WHERE id = ANY($1)", ids
            )


def create_vector_store(backend: str, **kwargs) -> VectorStore:
    stores = {
        "chroma": ChromaVectorStore,
        "pgvector": PgVectorStore,
    }
    if backend not in stores:
        raise ValueError(f"不支持: {backend},可选: {list(stores.keys())}")
    return stores[backend](**kwargs)

使用方式完全一致:

# 开发环境用 ChromaDB
store = create_vector_store("chroma", collection_name="kb")

# 生产环境切 pgvector
store = create_vector_store("pgvector", dsn="postgresql://...")
await store.connect()

# 接口完全一致
await store.insert([
    {"id": "1", "content": "退货流程...", "embedding": [...], "metadata": {"source": "售后"}},
])
results = await store.search(query_embedding=[...], top_k=5)

选型决策树

数据量多大?
├── < 10 万条 → 有 PostgreSQL 吗?
│               ├── 有 → pgvector(零成本,够用)
│               └── 没有 → ChromaDB(最简单)
├── 10 万 ~ 百万 → 有运维团队吗?
│                   ├── 有 → pgvector 或 Milvus(按需选)
│                   └── 没有 → Pinecone(免运维)
└── > 百万 → 有 K8s 集群吗?
             ├── 有 → Milvus(分布式首选)
             └── 没有 → Pinecone 或 Zilliz Cloud(托管 Milvus)

我的实际项目选型

以下是我在不同项目中的真实选择:

项目数据量选择原因
个人 RAG 助手500 条ChromaDB零配置,5 分钟跑起来
公司知识库 v13 万条pgvector已有 PG,不想加服务
多部门知识平台50 万条Milvus需要分区隔离 + 混合搜索

如果你刚入门 RAG,直接用 ChromaDB 跑起来,等数据量上来了再迁移。向量库的切换成本其实不高——前提是你用了上面的抽象层。


常见坑和注意事项

1. 索引不是万能的

HNSW 索引有召回率和速度的权衡。参数 ef_search 越大,召回越准但越慢:

# Milvus:调整搜索时的 ef
param = {"metric_type": "COSINE", "params": {"ef": 64}}   # 快但可能漏
param = {"metric_type": "COSINE", "params": {"ef": 256}}  # 慢但更准

建议:开发时 ef=128,上线后根据实际效果微调。

2. 向量维度要统一

一个集合里的向量维度必须一致。如果你切换了 Embedding 模型(维度从 1536 变成 1024),需要重建索引并重新 Embed 所有数据

3. 元数据索引

如果你经常按元数据过滤(where source = 'xxx'),记得给元数据字段建索引。pgvector 的优势就在这——SQL 索引轻车熟路。

4. 备份策略

向量库备份方式
ChromaDB复制 chroma_data/ 目录
pgvectorpg_dump(跟着 PG 走)
PineconeAPI 导出 + 重建
MilvusMilvus Backup 工具 / S3 快照

总结

  1. ChromaDB 适合快速起步——pip install 就能用,10 万条以内的项目首选。
  2. pgvector 适合已有 PG 的团队——零额外服务,SQL 生态加持,混合搜索开箱即用。
  3. Pinecone 适合不想运维——全托管 SaaS,付钱省心,但数据在海外。
  4. Milvus 适合大规模生产——亿级数据、分布式架构、混合搜索,但运维成本高。
  5. 务必做抽象层——VectorStore 接口统一封装,后期切换向量库零成本。
  6. 选型核心三要素:数据量 × 运维能力 × 预算。

下一篇进入 RAG 的检索优化环节:基础的向量检索不够准怎么办?混合搜索、Reranking、多路召回三板斧教你把召回率从 80% 提到 95%。


下一篇预告06 | RAG 检索优化:混合搜索、Reranking、多路召回


讨论话题:你的向量数据库用的哪个?数据量多大?有踩过什么坑吗?评论区聊聊。

评论