RAG Large-File Uploads: Chunked Upload, Resumable Upload, Progress Tracking
This is part 03 of the AI Deep-Dive series. Previous: RAG Document Chunking Strategies: Fixed-Length vs Recursive vs Semantic | Next: Choosing an Embedding Model: OpenAI / BGE / Jina Compared on Quality and Cost
What you'll get from this article
The previous two posts covered how to parse files and how to chunk text. But before any of that happens in a real knowledge base, there's an earlier problem: how does the file get uploaded in the first place?
You might think file upload is trivial — <input type="file"> plus fetch, done?
For small files, sure. But when a user uploads an 80MB technical-manual PDF or a 120MB Excel data export, the problems start:
- The network drops halfway through. Start over? The user just closes the page and leaves.
- Three minutes of uploading with zero progress feedback, and the user assumes it has frozen.
- The server receives a 100MB file in one request and memory blows up.
- After the upload finishes, parsing and ingestion leave the user staring at a blank screen for another two minutes.
Today's goal: a production-grade large-file upload scheme — frontend chunking + resumable upload + real-time progress tracking + async backend processing, with the complete frontend and backend code.
Overall architecture
User selects a file
↓
Frontend slices it into chunks (File.slice)
↓
Chunks upload concurrently → backend receives and stores them
↓ (progress updated after each chunk succeeds)
All chunks uploaded
↓
Frontend asks the backend to merge
↓
Backend merge → parse → chunk → Embedding → ingest
↓ (processing progress pushed over SSE)
Done
The core idea: upload and processing are two separate phases. Chunking makes the upload phase reliable; SSE pushes progress through the processing phase.
Frontend: core chunked-upload logic
Slicing the file
The browser's native File.slice() is all you need to cut a file into pieces:
function createChunks(file, chunkSize = 5 * 1024 * 1024) {
  const chunks = [];
  let start = 0;
  let index = 0;
  while (start < file.size) {
    const end = Math.min(start + chunkSize, file.size);
    chunks.push({
      index,
      blob: file.slice(start, end),
      start,
      end,
    });
    start = end;
    index++;
  }
  return chunks;
}
How should you pick chunkSize?
| Chunk size | Trade-off |
|---|---|
| 1MB | Many chunks and frequent requests, but fine-grained resumption |
| 5MB | The balanced choice — recommended default |
| 10MB | Fewer chunks, but re-uploading a failed chunk costs more |
Computing a file fingerprint
Resumable upload needs a unique ID that says "this is the same file." Use SparkMD5 to compute the file's MD5 hash:
import SparkMD5 from 'spark-md5';

async function calculateFileHash(file) {
  return new Promise((resolve, reject) => {
    const spark = new SparkMD5.ArrayBuffer();
    const reader = new FileReader();
    const chunkSize = 2 * 1024 * 1024;
    let offset = 0;

    reader.onload = (e) => {
      spark.append(e.target.result);
      offset += chunkSize;
      if (offset < file.size) {
        readNext();
      } else {
        resolve(spark.end());
      }
    };
    reader.onerror = reject; // surface read failures instead of hanging

    function readNext() {
      const blob = file.slice(offset, offset + chunkSize);
      reader.readAsArrayBuffer(blob);
    }
    readNext();
  });
}
Hashing a large file also has to be done in chunks — calling readAsArrayBuffer on the entire file at once will freeze the browser.
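For files in the hundreds of megabytes you can go one step further and compute the hash inside a Web Worker, which keeps even the chunked reads off the main thread.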
Concurrent upload + resumption
The core logic: first ask the server which chunks it already has, skip those, and upload only what's left.
async function uploadFile(file, {
  chunkSize = 5 * 1024 * 1024,
  concurrency = 3,
  onProgress,
} = {}) {
  const fileHash = await calculateFileHash(file);
  const chunks = createChunks(file, chunkSize);
  const totalChunks = chunks.length;

  // 1. Ask the server which chunks are already uploaded (the heart of resumption)
  const { uploadedChunks } = await fetch('/api/upload/status', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      fileHash,
      fileName: file.name,
      totalChunks,
    }),
  }).then(r => r.json());

  // 2. Drop the chunks that are already on the server
  const pendingChunks = chunks.filter(
    c => !uploadedChunks.includes(c.index)
  );
  let completedCount = uploadedChunks.length;

  // 3. Upload with bounded concurrency
  async function uploadChunk(chunk) {
    const formData = new FormData();
    formData.append('file', chunk.blob);
    formData.append('fileHash', fileHash);
    formData.append('chunkIndex', chunk.index);
    formData.append('totalChunks', totalChunks);

    const response = await fetch('/api/upload/chunk', {
      method: 'POST',
      body: formData,
    });
    if (!response.ok) {
      throw new Error(`Chunk ${chunk.index} failed to upload`);
    }

    completedCount++;
    onProgress?.({
      phase: 'uploading',
      progress: Math.round((completedCount / totalChunks) * 100),
      completedChunks: completedCount,
      totalChunks,
    });
  }

  await runWithConcurrency(pendingChunks, uploadChunk, concurrency);

  // 4. Ask the server to merge
  const mergeResponse = await fetch('/api/upload/merge', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      fileHash,
      fileName: file.name,
      totalChunks,
    }),
  });
  return mergeResponse.json();
}
The concurrency controller
Limit how many chunks upload at once, so you don't exhaust the browser's connection pool:
async function runWithConcurrency(items, fn, concurrency) {
  const results = [];
  let index = 0;

  async function worker() {
    while (index < items.length) {
      const currentIndex = index++;
      results[currentIndex] = await fn(items[currentIndex]);
    }
  }

  const workers = Array.from(
    { length: Math.min(concurrency, items.length) },
    () => worker()
  );
  await Promise.all(workers);
  return results;
}
Retrying failures
On a flaky network, a failed chunk should retry automatically:
async function uploadChunkWithRetry(chunk, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await uploadChunk(chunk);
    } catch (error) {
      if (attempt === maxRetries) throw error;
      // Exponential backoff: 1s, 2s, 4s... capped at 10s
      const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
      await new Promise(r => setTimeout(r, delay));
    }
  }
}
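To wire this in, define uploadChunkWithRetry inside uploadFile (next to uploadChunk, so it can see it) and hand it to the concurrency controller instead: runWithConcurrency(pendingChunks, c => uploadChunkWithRetry(c), concurrency).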
Backend: receiving chunks with FastAPI
API design
| Endpoint | Method | Purpose |
|---|---|---|
| /api/upload/status | POST | Query the list of already-uploaded chunks |
| /api/upload/chunk | POST | Receive a single chunk |
| /api/upload/merge | POST | Merge chunks + trigger processing |
| /api/upload/progress/{fileHash} | GET | Push processing progress over SSE |
Checking upload status
from fastapi import FastAPI, Form, HTTPException, UploadFile
from pathlib import Path
import asyncio
import json
import shutil

app = FastAPI()

UPLOAD_DIR = Path("./uploads/chunks")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
MERGED_DIR = Path("./uploads/merged")
MERGED_DIR.mkdir(parents=True, exist_ok=True)

@app.post("/api/upload/status")
async def upload_status(request: dict):
    file_hash = request["fileHash"]
    total_chunks = request["totalChunks"]

    chunk_dir = UPLOAD_DIR / file_hash
    uploaded = []
    if chunk_dir.exists():
        uploaded = [
            int(f.stem)
            for f in chunk_dir.iterdir()
            if f.stem.isdigit()
        ]
    return {
        "uploadedChunks": sorted(uploaded),
        "isComplete": len(uploaded) == total_chunks,
    }
Receiving a chunk
@app.post("/api/upload/chunk")
async def upload_chunk(
file: UploadFile,
fileHash: str = Form(...),
chunkIndex: int = Form(...),
totalChunks: int = Form(...),
):
chunk_dir = UPLOAD_DIR / fileHash
chunk_dir.mkdir(parents=True, exist_ok=True)
chunk_path = chunk_dir / str(chunkIndex)
content = await file.read()
chunk_path.write_bytes(content)
return {"status": "ok", "chunkIndex": chunkIndex}
Merging chunks
@app.post("/api/upload/merge")
async def merge_chunks(request: dict):
file_hash = request["fileHash"]
file_name = request["fileName"]
total_chunks = request["totalChunks"]
chunk_dir = UPLOAD_DIR / file_hash
# 验证所有分片都已到齐
existing = sorted(
[int(f.stem) for f in chunk_dir.iterdir() if f.stem.isdigit()]
)
expected = list(range(total_chunks))
if existing != expected:
missing = set(expected) - set(existing)
raise HTTPException(400, f"缺少分片: {missing}")
# 合并
merged_path = MERGED_DIR / f"{file_hash}_{file_name}"
with open(merged_path, "wb") as outfile:
for i in range(total_chunks):
chunk_path = chunk_dir / str(i)
outfile.write(chunk_path.read_bytes())
# 清理分片目录
import shutil
shutil.rmtree(chunk_dir)
# 触发异步处理(解析 → 切片 → 入库)
import asyncio
asyncio.create_task(process_file(file_hash, str(merged_path)))
return {
"status": "merged",
"fileHash": file_hash,
"filePath": str(merged_path),
}
Backend: async processing + SSE progress push
After the merge, the file still has to go through parse → chunk → Embedding → ingest, which can take tens of seconds. Use SSE to push progress to the frontend in real time.
The progress manager
import asyncio

# In-memory progress store: file hash → latest progress state
progress_store: dict[str, dict] = {}

def update_progress(file_hash: str, phase: str, progress: int, detail: str = ""):
    progress_store[file_hash] = {
        "phase": phase,
        "progress": progress,
        "detail": detail,
    }

async def process_file(file_hash: str, file_path: str):
    """Process the file asynchronously: parse → chunk → embed → ingest."""
    try:
        # Phase 1: parse
        update_progress(file_hash, "parsing", 0, "Parsing file...")
        parser = FileParser()
        doc = parser.parse(file_path)
        update_progress(file_hash, "parsing", 100, f"Parsed, {doc.total_chars} chars")
        await asyncio.sleep(0.1)

        # Phase 2: chunk
        update_progress(file_hash, "chunking", 0, "Chunking...")
        chunks = split_with_metadata(
            text=doc.content,
            filename=doc.filename,
        )
        update_progress(file_hash, "chunking", 100, f"Chunked into {len(chunks)} pieces")
        await asyncio.sleep(0.1)

        # Phase 3: embedding
        total = len(chunks)
        batch_size = 20
        for i in range(0, total, batch_size):
            batch = chunks[i:i + batch_size]
            # embeddings = get_embeddings([c.content for c in batch])
            # ... write to the vector store ...
            progress = min(int((i + batch_size) / total * 100), 100)
            update_progress(
                file_hash, "embedding", progress,
                f"Processed {min(i + batch_size, total)}/{total} chunks",
            )
            await asyncio.sleep(0.05)

        # Done
        update_progress(file_hash, "done", 100, "Knowledge base updated")
    except Exception as e:
        update_progress(file_hash, "error", 0, str(e))
The SSE progress endpoint
from fastapi.responses import StreamingResponse
import json
import asyncio

@app.get("/api/upload/progress/{file_hash}")
async def stream_progress(file_hash: str):
    async def event_stream():
        last_state = None
        idle = 0.0
        while True:
            state = progress_store.get(file_hash)
            if state and state != last_state:
                data = json.dumps(state, ensure_ascii=False)
                yield f"data: {data}\n\n"
                last_state = state
                idle = 0.0
                if state["phase"] in ("done", "error"):
                    break
            else:
                # Guard against unknown hashes or stalled jobs:
                # give up after 10 minutes with no updates
                idle += 0.3
                if idle > 600:
                    break
            await asyncio.sleep(0.3)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
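If this sits behind nginx, also add "X-Accel-Buffering": "no" to the response headers (or disable proxy_buffering), otherwise the proxy may buffer events and deliver them in bursts instead of in real time.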
Frontend: the complete upload component
React implementation
import { useState, useCallback } from 'react';

function useFileUpload() {
  const [status, setStatus] = useState('idle');
  const [progress, setProgress] = useState(null);

  const upload = useCallback(async (file) => {
    setStatus('uploading');
    try {
      // Phase 1: chunked upload
      const result = await uploadFile(file, {
        onProgress: (p) => setProgress(p),
      });

      // Phase 2: watch server-side processing progress
      setStatus('processing');
      await watchProgress(result.fileHash, (p) => {
        setProgress({
          phase: p.phase,
          progress: p.progress,
          detail: p.detail,
        });
      });

      setStatus('done');
    } catch (error) {
      setStatus('error');
      setProgress({ phase: 'error', detail: error.message });
    }
  }, []);

  return { status, progress, upload };
}
async function watchProgress(fileHash, onProgress) {
  return new Promise((resolve, reject) => {
    const source = new EventSource(`/api/upload/progress/${fileHash}`);
    source.onmessage = (event) => {
      const data = JSON.parse(event.data);
      onProgress(data);
      if (data.phase === 'done') {
        source.close();
        resolve();
      }
      if (data.phase === 'error') {
        source.close();
        reject(new Error(data.detail));
      }
    };
    source.onerror = () => {
      source.close();
      reject(new Error('Progress connection lost'));
    };
  });
}
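One thing to know: EventSource reconnects automatically after transient network errors, so closing the source on the first onerror (as above) trades robustness for simplicity — in production you might only reject after several consecutive failures.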
The upload UI component
function FileUploader() {
  const { status, progress, upload } = useFileUpload();
  const [dragOver, setDragOver] = useState(false);

  const handleFile = (file) => {
    const maxSize = 200 * 1024 * 1024; // 200MB
    if (file.size > maxSize) {
      alert('Files must be 200MB or smaller');
      return;
    }
    upload(file);
  };

  const handleDrop = (e) => {
    e.preventDefault();
    setDragOver(false);
    const file = e.dataTransfer.files[0];
    if (file) handleFile(file);
  };

  return (
    <div className="upload-container">
      <div
        className={`drop-zone ${dragOver ? 'active' : ''}`}
        onDragOver={(e) => { e.preventDefault(); setDragOver(true); }}
        onDragLeave={() => setDragOver(false)}
        onDrop={handleDrop}
      >
        {status === 'idle' && (
          <>
            <p>Drag a file here, or click to choose one</p>
            <p className="hint">Supports PDF / Word / Excel / HTML, up to 200MB</p>
            <input
              type="file"
              accept=".pdf,.docx,.xlsx,.html,.htm"
              onChange={(e) => e.target.files[0] && handleFile(e.target.files[0])}
            />
          </>
        )}
        {(status === 'uploading' || status === 'processing') && progress && (
          <ProgressDisplay progress={progress} />
        )}
        {status === 'done' && (
          <div className="success">
            <span>✓</span> File uploaded and processed
          </div>
        )}
        {status === 'error' && (
          <div className="error">
            Upload failed: {progress?.detail}
            <button onClick={() => window.location.reload()}>Retry</button>
          </div>
        )}
      </div>
    </div>
  );
}
function ProgressDisplay({ progress }) {
  const phaseLabels = {
    uploading: 'Uploading',
    parsing: 'Parsing file',
    chunking: 'Chunking document',
    embedding: 'Generating embeddings',
    done: 'Done',
  };

  return (
    <div className="progress-display">
      <div className="phase">{phaseLabels[progress.phase] || progress.phase}</div>
      <div className="bar-container">
        <div
          className="bar-fill"
          style={{ width: `${progress.progress}%` }}
        />
      </div>
      <div className="detail">
        {progress.detail || `${progress.progress}%`}
      </div>
    </div>
  );
}
Vue implementation
<script setup>
import { ref } from 'vue';

const status = ref('idle');
const progress = ref(null);

async function handleUpload(event) {
  const file = event.target.files[0];
  if (!file) return;
  if (file.size > 200 * 1024 * 1024) {
    alert('Files must be 200MB or smaller');
    return;
  }

  status.value = 'uploading';
  try {
    const result = await uploadFile(file, {
      onProgress: (p) => { progress.value = p; },
    });
    status.value = 'processing';
    await watchProgress(result.fileHash, (p) => {
      progress.value = p;
    });
    status.value = 'done';
  } catch (error) {
    status.value = 'error';
    progress.value = { phase: 'error', detail: error.message };
  }
}

const phaseLabels = {
  uploading: 'Uploading',
  parsing: 'Parsing file',
  chunking: 'Chunking document',
  embedding: 'Generating embeddings',
  done: 'Done',
};
</script>

<template>
  <div class="upload-container">
    <template v-if="status === 'idle'">
      <p>Drag a file here, or click to choose one</p>
      <input
        type="file"
        accept=".pdf,.docx,.xlsx,.html,.htm"
        @change="handleUpload"
      />
    </template>
    <template v-if="status === 'uploading' || status === 'processing'">
      <div class="phase">{{ phaseLabels[progress?.phase] }}</div>
      <div class="bar-container">
        <div class="bar-fill" :style="{ width: progress?.progress + '%' }" />
      </div>
      <div class="detail">{{ progress?.detail || progress?.progress + '%' }}</div>
    </template>
    <div v-if="status === 'done'" class="success">
      ✓ File uploaded and processed
    </div>
  </div>
</template>
Progress bar styles
.drop-zone {
  border: 2px dashed #334155;
  border-radius: 12px;
  padding: 40px;
  text-align: center;
  transition: all 0.2s;
  cursor: pointer;
}
.drop-zone.active {
  border-color: #f59e0b;
  background: rgba(245, 158, 11, 0.05);
}
.bar-container {
  width: 100%;
  height: 8px;
  background: #1e293b;
  border-radius: 4px;
  overflow: hidden;
  margin: 12px 0;
}
.bar-fill {
  height: 100%;
  background: linear-gradient(90deg, #f59e0b, #d97706);
  border-radius: 4px;
  transition: width 0.3s ease;
}
.phase {
  font-size: 14px;
  font-weight: 600;
  color: #f1f5f9;
}
.detail {
  font-size: 12px;
  color: #94a3b8;
}
.success {
  color: #10b981;
  font-weight: 600;
}
Production notes
1. Cleaning up stale chunks
When a user abandons an upload halfway, the orphaned chunks on the server need periodic cleanup:
import shutil
import time

async def cleanup_stale_chunks(max_age_hours: int = 24):
    """Delete chunk directories that have sat unfinished for over max_age_hours."""
    now = time.time()
    for chunk_dir in UPLOAD_DIR.iterdir():
        if not chunk_dir.is_dir():
            continue
        age = now - chunk_dir.stat().st_mtime
        if age > max_age_hours * 3600:
            shutil.rmtree(chunk_dir)
            print(f"Removed stale chunks: {chunk_dir.name}")
Run it on a schedule via FastAPI's lifespan hook or BackgroundTasks.
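A minimal lifespan-based sketch — the hourly interval is an arbitrary choice, tune it to your traffic:

from contextlib import asynccontextmanager
import asyncio

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    async def cleanup_loop():
        while True:
            await cleanup_stale_chunks()
            await asyncio.sleep(3600)  # once an hour

    task = asyncio.create_task(cleanup_loop())
    yield
    task.cancel()  # stop the loop on shutdown

app = FastAPI(lifespan=lifespan)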
2. Storage options
The local filesystem is only suitable for single-machine development. For production chunk storage, consider:
| Option | Best for |
|---|---|
| Local disk | Development / small single-node setups |
| MinIO | Self-hosted deployments, S3-compatible |
| Alibaba Cloud OSS | Cloud production (China) |
| AWS S3 | Cloud production (international) |
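If chunks live in S3/MinIO, you can skip the server-side merge entirely and let the object store assemble the parts through its native multipart-upload API. A minimal boto3 sketch — the endpoint, credentials, and bucket name are placeholders (note that S3 requires every part except the last to be at least 5MB, which matches our default chunk size):

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:9000")  # e.g. a local MinIO
BUCKET = "rag-uploads"  # hypothetical bucket

def multipart_upload(key: str, chunk_paths: list[str]):
    upload = s3.create_multipart_upload(Bucket=BUCKET, Key=key)
    parts = []
    for i, path in enumerate(chunk_paths, start=1):  # part numbers are 1-based
        with open(path, "rb") as f:
            resp = s3.upload_part(
                Bucket=BUCKET, Key=key,
                UploadId=upload["UploadId"], PartNumber=i, Body=f,
            )
        parts.append({"PartNumber": i, "ETag": resp["ETag"]})
    # The object store concatenates the parts server-side — no local merge needed
    s3.complete_multipart_upload(
        Bucket=BUCKET, Key=key, UploadId=upload["UploadId"],
        MultipartUpload={"Parts": parts},
    )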
3. Security checks
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".xlsx", ".html", ".htm"}
MAX_FILE_SIZE = 200 * 1024 * 1024   # 200MB per file
MAX_CHUNK_SIZE = 10 * 1024 * 1024   # 10MB per chunk

@app.post("/api/upload/chunk")
async def upload_chunk(
    file: UploadFile,
    fileHash: str = Form(...),
    fileName: str = Form(...),
    chunkIndex: int = Form(...),
    totalChunks: int = Form(...),
):
    # Validate the extension from the fileName field — never trust a chunk's content_type
    ext = Path(fileName).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, f"Unsupported file type: {ext}")
    # Validate the size of a single chunk
    content = await file.read()
    if len(content) > MAX_CHUNK_SIZE:
        raise HTTPException(400, "Chunk exceeds the size limit")

(This version expects the frontend to append fileName to each chunk's FormData as well.)
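One more check worth adding at merge time: recompute the MD5 of the merged file and compare it against the client-supplied fileHash, rejecting corrupted or forged uploads. A sketch — SparkMD5's incremental hashing on the client produces the same standard MD5 that hashlib computes here:

import hashlib

def verify_merged_file(path: str, expected_hash: str) -> bool:
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        # Read in 2MB blocks so large files don't blow up memory
        for block in iter(lambda: f.read(2 * 1024 * 1024), b""):
            md5.update(block)
    return md5.hexdigest() == expected_hash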
4. Instant upload
If someone else has already uploaded the same file (same hash), return "already exists" and skip the upload entirely:
@app.post("/api/upload/status")
async def upload_status(request: dict):
file_hash = request["fileHash"]
# 检查是否已有合并后的文件
for f in MERGED_DIR.iterdir():
if f.name.startswith(file_hash):
return {
"uploadedChunks": list(range(request["totalChunks"])),
"isComplete": True,
"message": "秒传:文件已存在",
}
# ...正常逻辑
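A security note on instant upload: the hash comes from the client, so a malicious user could claim the hash of someone else's file and gain access to content they never possessed. If your knowledge bases are tenant-isolated, scope the dedupe check per tenant, or have the client prove possession (e.g. by hashing a server-chosen byte range).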
Tying it together with the previous two posts: the full knowledge-base upload pipeline
async def process_file(file_hash: str, file_path: str):
    """
    The full post-upload pipeline:
    part 01's FileParser → part 02's chunking → Embedding → ingestion
    """
    # 1. Parse (part 01)
    update_progress(file_hash, "parsing", 0)
    parser = FileParser()
    doc = parser.parse(file_path)
    update_progress(file_hash, "parsing", 100)

    # 2. Chunk (part 02)
    update_progress(file_hash, "chunking", 0)
    chunks = split_with_metadata(
        text=doc.content,
        filename=doc.filename,
    )
    update_progress(file_hash, "chunking", 100)

    # 3. Embedding + ingestion (parts 04 and 05 cover this in detail)
    update_progress(file_hash, "embedding", 0)
    for i in range(0, len(chunks), 20):
        batch = chunks[i:i+20]
        # embeddings = get_embeddings([c.content for c in batch])
        # vector_db.upsert(batch, embeddings)
        progress = min(int((i + 20) / len(chunks) * 100), 100)
        update_progress(file_hash, "embedding", progress)

    update_progress(file_hash, "done", 100, "Knowledge base updated")
String the three posts together and you have the complete path from "user picks a file" to "knowledge base is queryable."
Summary
- Large files must be uploaded in chunks — File.slice() cuts the file into 5MB pieces that upload one by one.
- Resumption hinges on the file hash — MD5 is the unique ID; the server records which chunks arrived, and after an interruption the client skips them.
- Concurrency control is non-negotiable — 3 parallel uploads is plenty; more just saturates the browser's connection pool.
- Retry failures automatically — exponential backoff, so one bad chunk doesn't sink the whole upload.
- Upload and processing are two phases — track chunk progress while uploading, then push parse/chunk/ingest progress over SSE.
- In production, mind stale-chunk cleanup, the storage backend (OSS/S3), security checks, and instant-upload dedupe.
Next up is the heart of the RAG pipeline: how do you choose an embedding model? OpenAI, BGE, Jina — which handles Chinese best? Which is cheapest? Which can you self-host? Benchmarks in the next post.
Discussion: have you built large-file upload? What approach did you use? Ever watched an upload die at 99%? Tell us in the comments.