AI Series OCT 22, 2025

RAG Large File Uploads: Chunked Upload, Resumable Transfer, Progress Tracking

#RAG #Frontend #FastAPI #FileUpload

This is post 03 in the AI Deep Dive series. Previous: RAG Document Chunking Strategies: Fixed-Length vs. Recursive vs. Semantic | Next: Embedding Model Selection: OpenAI / BGE / Jina, Quality and Cost Compared


What This Article Gives You

The previous two posts covered how to parse files and how to chunk text. But when users actually work with a knowledge base, an earlier question comes first: how do files get uploaded in the first place?

You might think: what's so hard about file uploads? <input type="file"> plus fetch and you're done, right?

For small files, sure. But when a user uploads an 80 MB technical manual as a PDF, or a 120 MB Excel data export, the problems start:

  • The network drops halfway through. Start over? The user just closes the page and leaves.
  • Three minutes of uploading with no progress feedback, and the user assumes the page has frozen.
  • The server receives a 100 MB file in a single request and memory blows up.
  • After the upload, parsing and ingestion take another 2 minutes of blank screen.

Today's goal: a production-grade large-file upload pipeline: frontend chunking, resumable uploads, real-time progress tracking, and asynchronous backend processing, with complete code for both ends.


Overall Architecture

User selects a file
    ↓
Frontend splits it into chunks (File.slice)
    ↓
Chunks upload concurrently → backend receives and stores them
    ↓ (progress updates after each successful chunk)
All chunks uploaded
    ↓
Frontend tells the backend to merge
    ↓
Backend merges → parses → chunks → embeds → ingests
    ↓ (SSE pushes processing progress)
Done

The core idea: upload and processing are two separate phases. Chunking makes the upload phase reliable, and SSE pushes progress during the processing phase.


Frontend: Core Chunked Upload Logic

Splitting the File

The browser's native File.slice() can cut a file into small pieces:

function createChunks(file, chunkSize = 5 * 1024 * 1024) {
  const chunks = [];
  let start = 0;
  let index = 0;

  while (start < file.size) {
    const end = Math.min(start + chunkSize, file.size);
    chunks.push({
      index,
      blob: file.slice(start, end),
      start,
      end,
    });
    start = end;
    index++;
  }

  return chunks;
}

How should you pick chunkSize?

Chunk size | Characteristics
1 MB       | Many chunks and frequent requests, but fine-grained resume
5 MB       | Balanced choice; recommended default
10 MB      | Fewer chunks, but retransmitting a failed chunk costs more

For example, an 80 MB file at the 5 MB default yields 16 chunks.

Computing a File Fingerprint

Resuming needs a unique identifier that says "this is the same file". Use SparkMD5 to compute the file's MD5 hash:

import SparkMD5 from 'spark-md5';

async function calculateFileHash(file) {
  return new Promise((resolve, reject) => {
    const spark = new SparkMD5.ArrayBuffer();
    const reader = new FileReader();
    const chunkSize = 2 * 1024 * 1024; // hash in 2MB reads
    let offset = 0;

    reader.onerror = () => reject(reader.error);

    reader.onload = (e) => {
      spark.append(e.target.result);
      offset += chunkSize;

      if (offset < file.size) {
        readNext();
      } else {
        resolve(spark.end());
      }
    };

    function readNext() {
      const blob = file.slice(offset, offset + chunkSize);
      reader.readAsArrayBuffer(blob);
    }

    readNext();
  });
}

Hashing a large file also has to read in chunks; calling readAsArrayBuffer on the whole file at once would freeze the browser.

Concurrent Upload + Resume

The core logic: first ask the server which chunks it already has, skip those, and upload only the rest.

async function uploadFile(file, {
  chunkSize = 5 * 1024 * 1024,
  concurrency = 3,
  onProgress,
} = {}) {
  const fileHash = await calculateFileHash(file);
  const chunks = createChunks(file, chunkSize);
  const totalChunks = chunks.length;

  // 1. Ask the server which chunks are already uploaded (the heart of resuming)
  const { uploadedChunks } = await fetch('/api/upload/status', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      fileHash,
      fileName: file.name,
      totalChunks,
    }),
  }).then(r => r.json());

  // 2. Filter out the chunks that are already uploaded
  const pendingChunks = chunks.filter(
    c => !uploadedChunks.includes(c.index)
  );

  let completedCount = uploadedChunks.length;

  // 3. Upload with a concurrency limit
  async function uploadChunk(chunk) {
    const formData = new FormData();
    formData.append('file', chunk.blob);
    formData.append('fileHash', fileHash);
    formData.append('chunkIndex', chunk.index);
    formData.append('totalChunks', totalChunks);

    const response = await fetch('/api/upload/chunk', {
      method: 'POST',
      body: formData,
    });

    if (!response.ok) {
      throw new Error(`Chunk ${chunk.index} failed to upload`);
    }

    completedCount++;
    onProgress?.({
      phase: 'uploading',
      progress: Math.round((completedCount / totalChunks) * 100),
      completedChunks: completedCount,
      totalChunks,
    });
  }

  // In production, pass uploadChunkWithRetry (defined below) here so a
  // flaky chunk retries instead of failing the whole upload
  await runWithConcurrency(pendingChunks, uploadChunk, concurrency);

  // 4. Tell the server to merge
  const mergeResponse = await fetch('/api/upload/merge', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      fileHash,
      fileName: file.name,
      totalChunks,
    }),
  });

  return mergeResponse.json();
}

Concurrency Controller

Limit how many chunks upload at once so you don't saturate the browser's connection pool:

async function runWithConcurrency(items, fn, concurrency) {
  const results = [];
  let index = 0;

  async function worker() {
    while (index < items.length) {
      const currentIndex = index++;
      results[currentIndex] = await fn(items[currentIndex]);
    }
  }

  const workers = Array.from(
    { length: Math.min(concurrency, items.length) },
    () => worker()
  );

  await Promise.all(workers);
  return results;
}

Retrying Failures

On a flaky network, a single failed chunk should retry automatically:

async function uploadChunkWithRetry(chunk, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await uploadChunk(chunk);
    } catch (error) {
      if (attempt === maxRetries) throw error;
      // Exponential backoff: 1s, 2s, 4s, ... capped at 10s
      const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
      await new Promise(r => setTimeout(r, delay));
    }
  }
}

Backend: Receiving Chunks with FastAPI

API Design

Endpoint                        | Method | Purpose
/api/upload/status              | POST   | Query the list of uploaded chunks
/api/upload/chunk               | POST   | Receive a single chunk
/api/upload/merge               | POST   | Merge chunks + trigger processing
/api/upload/progress/{fileHash} | GET    | Push processing progress over SSE

Querying Upload Status

from fastapi import FastAPI, UploadFile, Form, HTTPException
from pathlib import Path
import json

app = FastAPI()

UPLOAD_DIR = Path("./uploads/chunks")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

MERGED_DIR = Path("./uploads/merged")
MERGED_DIR.mkdir(parents=True, exist_ok=True)


@app.post("/api/upload/status")
async def upload_status(request: dict):
    file_hash = request["fileHash"]
    total_chunks = request["totalChunks"]

    chunk_dir = UPLOAD_DIR / file_hash
    uploaded = []

    if chunk_dir.exists():
        uploaded = [
            int(f.stem)
            for f in chunk_dir.iterdir()
            if f.stem.isdigit()
        ]

    return {
        "uploadedChunks": sorted(uploaded),
        "isComplete": len(uploaded) == total_chunks,
    }

Receiving a Chunk

@app.post("/api/upload/chunk")
async def upload_chunk(
    file: UploadFile,
    fileHash: str = Form(...),
    chunkIndex: int = Form(...),
    totalChunks: int = Form(...),
):
    chunk_dir = UPLOAD_DIR / fileHash
    chunk_dir.mkdir(parents=True, exist_ok=True)

    chunk_path = chunk_dir / str(chunkIndex)

    content = await file.read()
    chunk_path.write_bytes(content)

    return {"status": "ok", "chunkIndex": chunkIndex}

Merging Chunks

@app.post("/api/upload/merge")
async def merge_chunks(request: dict):
    file_hash = request["fileHash"]
    file_name = request["fileName"]
    total_chunks = request["totalChunks"]

    chunk_dir = UPLOAD_DIR / file_hash

    # Verify that every chunk has arrived
    existing = sorted(
        [int(f.stem) for f in chunk_dir.iterdir() if f.stem.isdigit()]
    )
    expected = list(range(total_chunks))

    if existing != expected:
        missing = set(expected) - set(existing)
        raise HTTPException(400, f"Missing chunks: {missing}")

    # Merge
    merged_path = MERGED_DIR / f"{file_hash}_{file_name}"
    with open(merged_path, "wb") as outfile:
        for i in range(total_chunks):
            chunk_path = chunk_dir / str(i)
            outfile.write(chunk_path.read_bytes())

    # Clean up the chunk directory
    import shutil
    shutil.rmtree(chunk_dir)

    # Kick off async processing (parse → chunk → ingest).
    # NOTE: create_task is fine for a single-process demo; a real task
    # queue would be more robust in production (out of scope here).
    import asyncio
    asyncio.create_task(process_file(file_hash, str(merged_path)))

    return {
        "status": "merged",
        "fileHash": file_hash,
        "filePath": str(merged_path),
    }

Backend: Async Processing + SSE Progress Push

After the merge, the file still has to be parsed → chunked → embedded → ingested, which can take tens of seconds. Use SSE to push progress to the frontend in real time.

Progress Manager

import asyncio

# In-memory progress store. Fine for a single process; multi-worker
# deployments would need shared storage such as Redis (out of scope here).
progress_store: dict[str, dict] = {}

def update_progress(file_hash: str, phase: str, progress: int, detail: str = ""):
    progress_store[file_hash] = {
        "phase": phase,
        "progress": progress,
        "detail": detail,
    }


async def process_file(file_hash: str, file_path: str):
    """Process the file asynchronously: parse → chunk → embed → ingest."""
    try:
        # Phase 1: parse
        update_progress(file_hash, "parsing", 0, "Parsing file...")
        parser = FileParser()
        doc = parser.parse(file_path)
        update_progress(file_hash, "parsing", 100, f"Parsed, {doc.total_chars} characters")

        await asyncio.sleep(0.1)

        # Phase 2: chunk
        update_progress(file_hash, "chunking", 0, "Chunking...")
        chunks = split_with_metadata(
            text=doc.content,
            filename=doc.filename,
        )
        update_progress(file_hash, "chunking", 100, f"Chunked into {len(chunks)} pieces")

        await asyncio.sleep(0.1)

        # Phase 3: embedding
        total = len(chunks)
        batch_size = 20
        for i in range(0, total, batch_size):
            batch = chunks[i:i + batch_size]
            # embeddings = get_embeddings([c.content for c in batch])
            # ... write to the vector store ...
            progress = min(int((i + batch_size) / total * 100), 100)
            update_progress(
                file_hash, "embedding", progress,
                f"Processed {min(i + batch_size, total)}/{total} chunks",
            )
            await asyncio.sleep(0.05)

        # Done
        update_progress(file_hash, "done", 100, "Knowledge base updated")

    except Exception as e:
        update_progress(file_hash, "error", 0, str(e))

SSE Progress Endpoint

from fastapi.responses import StreamingResponse
import json
import asyncio

@app.get("/api/upload/progress/{file_hash}")
async def stream_progress(file_hash: str):
    async def event_stream():
        last_state = None
        while True:
            state = progress_store.get(file_hash)

            if state and state != last_state:
                data = json.dumps(state, ensure_ascii=False)
                yield f"data: {data}\n\n"
                last_state = state

                if state["phase"] in ("done", "error"):
                    break

            await asyncio.sleep(0.3)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
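To sanity-check the stream without a browser, here is a small consumption sketch; httpx is an assumption (it isn't used elsewhere in this post), and the host and port are placeholders:

import asyncio
import httpx

async def follow_progress(file_hash: str):
    url = f"http://localhost:8000/api/upload/progress/{file_hash}"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as resp:
            # Each SSE event arrives as a "data: {...}" line
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):])

# asyncio.run(follow_progress("<file-hash>"))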

Frontend: A Complete Upload Component

React Implementation

import { useState, useCallback } from 'react';

function useFileUpload() {
  const [status, setStatus] = useState('idle');
  const [progress, setProgress] = useState(null);

  const upload = useCallback(async (file) => {
    setStatus('uploading');

    try {
      // Phase 1: chunked upload
      const result = await uploadFile(file, {
        onProgress: (p) => setProgress(p),
      });

      // Phase 2: watch the processing progress
      setStatus('processing');
      await watchProgress(result.fileHash, (p) => {
        setProgress({
          phase: p.phase,
          progress: p.progress,
          detail: p.detail,
        });
      });

      setStatus('done');
    } catch (error) {
      setStatus('error');
      setProgress({ phase: 'error', detail: error.message });
    }
  }, []);

  return { status, progress, upload };
}


async function watchProgress(fileHash, onProgress) {
  return new Promise((resolve, reject) => {
    const source = new EventSource(`/api/upload/progress/${fileHash}`);

    source.onmessage = (event) => {
      const data = JSON.parse(event.data);
      onProgress(data);

      if (data.phase === 'done') {
        source.close();
        resolve();
      }
      if (data.phase === 'error') {
        source.close();
        reject(new Error(data.detail));
      }
    };

    source.onerror = () => {
      // EventSource would normally auto-reconnect; closing on the first
      // error keeps this example simple
      source.close();
      reject(new Error('Progress connection lost'));
    };
  });
}

Upload UI Component

function FileUploader() {
  const { status, progress, upload } = useFileUpload();
  const [dragOver, setDragOver] = useState(false);

  const handleFile = (file) => {
    const maxSize = 200 * 1024 * 1024; // 200MB
    if (file.size > maxSize) {
      alert('Files must be 200 MB or smaller');
      return;
    }
    upload(file);
  };

  const handleDrop = (e) => {
    e.preventDefault();
    setDragOver(false);
    const file = e.dataTransfer.files[0];
    if (file) handleFile(file);
  };

  return (
    <div className="upload-container">
      <div
        className={`drop-zone ${dragOver ? 'active' : ''}`}
        onDragOver={(e) => { e.preventDefault(); setDragOver(true); }}
        onDragLeave={() => setDragOver(false)}
        onDrop={handleDrop}
      >
        {status === 'idle' && (
          <>
            <p>Drag a file here, or click to choose one</p>
            <p className="hint">Supports PDF / Word / Excel / HTML, up to 200 MB</p>
            <input
              type="file"
              accept=".pdf,.docx,.xlsx,.html,.htm"
              onChange={(e) => e.target.files[0] && handleFile(e.target.files[0])}
            />
          </>
        )}

        {(status === 'uploading' || status === 'processing') && progress && (
          <ProgressDisplay progress={progress} />
        )}

        {status === 'done' && (
          <div className="success">
            <span>✓</span> File uploaded and processed
          </div>
        )}

        {status === 'error' && (
          <div className="error">
            Upload failed: {progress?.detail}
            <button onClick={() => window.location.reload()}>Retry</button>
          </div>
        )}
      </div>
    </div>
  );
}


function ProgressDisplay({ progress }) {
  const phaseLabels = {
    uploading: 'Uploading',
    parsing: 'Parsing file',
    chunking: 'Chunking document',
    embedding: 'Generating embeddings',
    done: 'Done',
  };

  return (
    <div className="progress-display">
      <div className="phase">{phaseLabels[progress.phase] || progress.phase}</div>
      <div className="bar-container">
        <div
          className="bar-fill"
          style={{ width: `${progress.progress}%` }}
        />
      </div>
      <div className="detail">
        {progress.detail || `${progress.progress}%`}
      </div>
    </div>
  );
}

Vue Implementation

<script setup>
import { ref } from 'vue';

const status = ref('idle');
const progress = ref(null);

async function handleUpload(event) {
  const file = event.target.files[0];
  if (!file) return;

  if (file.size > 200 * 1024 * 1024) {
    alert('Files must be 200 MB or smaller');
    return;
  }

  status.value = 'uploading';

  try {
    const result = await uploadFile(file, {
      onProgress: (p) => { progress.value = p; },
    });

    status.value = 'processing';

    await watchProgress(result.fileHash, (p) => {
      progress.value = p;
    });

    status.value = 'done';
  } catch (error) {
    status.value = 'error';
    progress.value = { phase: 'error', detail: error.message };
  }
}

const phaseLabels = {
  uploading: 'Uploading',
  parsing: 'Parsing file',
  chunking: 'Chunking document',
  embedding: 'Generating embeddings',
  done: 'Done',
};
</script>

<template>
  <div class="upload-container">
    <template v-if="status === 'idle'">
      <p>Drag a file here, or click to choose one</p>
      <input
        type="file"
        accept=".pdf,.docx,.xlsx,.html,.htm"
        @change="handleUpload"
      />
    </template>

    <template v-if="status === 'uploading' || status === 'processing'">
      <div class="phase">{{ phaseLabels[progress?.phase] }}</div>
      <div class="bar-container">
        <div class="bar-fill" :style="{ width: progress?.progress + '%' }" />
      </div>
      <div class="detail">{{ progress?.detail || progress?.progress + '%' }}</div>
    </template>

    <div v-if="status === 'done'" class="success">
      ✓ File uploaded and processed
    </div>
  </div>
</template>

Progress Bar Styles

.drop-zone {
  border: 2px dashed #334155;
  border-radius: 12px;
  padding: 40px;
  text-align: center;
  transition: all 0.2s;
  cursor: pointer;
}

.drop-zone.active {
  border-color: #f59e0b;
  background: rgba(245, 158, 11, 0.05);
}

.bar-container {
  width: 100%;
  height: 8px;
  background: #1e293b;
  border-radius: 4px;
  overflow: hidden;
  margin: 12px 0;
}

.bar-fill {
  height: 100%;
  background: linear-gradient(90deg, #f59e0b, #d97706);
  border-radius: 4px;
  transition: width 0.3s ease;
}

.phase {
  font-size: 14px;
  font-weight: 600;
  color: #f1f5f9;
}

.detail {
  font-size: 12px;
  color: #94a3b8;
}

.success {
  color: #10b981;
  font-weight: 600;
}

Production Considerations

1. Stale Chunk Cleanup

When a user abandons an upload halfway, the orphaned chunks on the server need periodic cleanup:

import shutil
import time

async def cleanup_stale_chunks(max_age_hours: int = 24):
    """Remove chunk directories of unfinished uploads older than 24 hours."""
    now = time.time()
    for chunk_dir in UPLOAD_DIR.iterdir():
        if not chunk_dir.is_dir():
            continue
        age = now - chunk_dir.stat().st_mtime
        if age > max_age_hours * 3600:
            shutil.rmtree(chunk_dir)
            print(f"Removed stale chunks: {chunk_dir.name}")

Run it on a schedule with FastAPI's lifespan hook or BackgroundTasks.
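A minimal lifespan-based scheduling sketch, assuming an hourly interval; the interval and the bare create_task pattern are illustrative choices, not requirements:

import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    async def scheduler():
        # Sweep stale chunk directories once an hour
        while True:
            await cleanup_stale_chunks()
            await asyncio.sleep(3600)

    task = asyncio.create_task(scheduler())
    yield
    task.cancel()  # stop the sweeper on shutdown

app = FastAPI(lifespan=lifespan)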

2. Storage Backend

The local filesystem only suits single-machine development. For production chunk storage, the usual options:

Option            | Suitable for
Local disk        | Development / single machine, small scale
MinIO             | Self-hosted, S3-compatible
Alibaba Cloud OSS | Cloud production (China)
AWS S3            | Production outside China
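For a rough idea of what the object-storage variant looks like, here is a sketch against an S3-compatible API via boto3; the endpoint, credentials, and bucket name are placeholders for a local MinIO instance, and error handling is omitted:

import boto3

# Placeholder endpoint/credentials for a local MinIO instance (assumptions)
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

def save_chunk(file_hash: str, chunk_index: int, content: bytes):
    # Mirror the local layout: one object per chunk under a per-file prefix
    s3.put_object(
        Bucket="rag-uploads",
        Key=f"chunks/{file_hash}/{chunk_index}",
        Body=content,
    )

def list_uploaded_chunks(file_hash: str) -> list[int]:
    # Back the /api/upload/status endpoint with a prefix listing
    resp = s3.list_objects_v2(Bucket="rag-uploads", Prefix=f"chunks/{file_hash}/")
    return sorted(
        int(obj["Key"].rsplit("/", 1)[-1])
        for obj in resp.get("Contents", [])
    )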

3. Security Validation

ALLOWED_EXTENSIONS = {".pdf", ".docx", ".xlsx", ".html", ".htm"}
MAX_FILE_SIZE = 200 * 1024 * 1024   # 200 MB cap for the whole file
MAX_CHUNK_SIZE = 10 * 1024 * 1024   # 10 MB per chunk

@app.post("/api/upload/chunk")
async def upload_chunk(
    file: UploadFile,
    fileName: str = Form(...),  # the frontend must also append fileName
    fileHash: str = Form(...),
    chunkIndex: int = Form(...),
):
    # Validate the extension from the fileName field;
    # don't trust the chunk's content_type
    ext = Path(fileName).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, f"Unsupported file type: {ext}")

    # Cap the size of each individual chunk
    content = await file.read()
    if len(content) > MAX_CHUNK_SIZE:
        raise HTTPException(400, "Chunk exceeds the size limit")

4. Instant Upload

If someone else has already uploaded the same file (same hash), return "already exists" and skip the upload entirely:

@app.post("/api/upload/status")
async def upload_status(request: dict):
    file_hash = request["fileHash"]

    # Check whether a merged copy of this file already exists
    for f in MERGED_DIR.iterdir():
        if f.name.startswith(file_hash):
            return {
                "uploadedChunks": list(range(request["totalChunks"])),
                "isComplete": True,
                "message": "秒传:文件已存在",
            }

    # ... normal logic follows ...

Tying in the Previous Two Posts: The Complete Knowledge Base Pipeline

async def process_file(file_hash: str, file_path: str):
    """
    The full processing pipeline after upload completes:
    FileParser from post 01 → chunking from post 02 → embedding → ingestion
    """
    # 1. Parse (post 01)
    update_progress(file_hash, "parsing", 0)
    parser = FileParser()
    doc = parser.parse(file_path)
    update_progress(file_hash, "parsing", 100)

    # 2. Chunk (post 02)
    update_progress(file_hash, "chunking", 0)
    chunks = split_with_metadata(
        text=doc.content,
        filename=doc.filename,
    )
    update_progress(file_hash, "chunking", 100)

    # 3. Embedding + ingestion (posts 04 and 05 cover this in detail)
    update_progress(file_hash, "embedding", 0)
    for i in range(0, len(chunks), 20):
        batch = chunks[i:i+20]
        # embeddings = get_embeddings([c.content for c in batch])
        # vector_db.upsert(batch, embeddings)
        progress = min(int((i + 20) / len(chunks) * 100), 100)
        update_progress(file_hash, "embedding", progress)

    update_progress(file_hash, "done", 100, "Knowledge base updated")

String the three posts together and you have the complete pipeline from "user picks a file" to "knowledge base is queryable".


Summary

  1. Large files must be chunked: File.slice() cuts the file into 5 MB pieces that upload one at a time.
  2. Resumability rests on the file hash: MD5 is the unique ID; the server records which chunks arrived, and a restart skips them.
  3. Concurrency control is essential: 3 parallel uploads is enough; more will exhaust the browser's connection limit.
  4. Retry failures automatically: exponential backoff means one bad chunk doesn't sink the whole upload.
  5. Upload and processing are separate phases: track chunk progress while uploading, then push parse/chunk/ingest progress over SSE.
  6. For production, mind: stale chunk cleanup, the storage backend (OSS/S3), security validation, and instant-upload deduplication.

Next, we get to the heart of the RAG pipeline: choosing an embedding model. OpenAI, BGE, or Jina: which handles Chinese best? Which is cheapest? Which can be deployed locally? Real benchmarks in the next post.


Next up: 04 | Embedding Model Selection: OpenAI / BGE / Jina, Quality and Cost Compared


Discussion: Have you built large-file uploads? What approach did you use? Ever seen an upload die at 99%? Share your war stories in the comments.
