AI 应用的状态管理:比 Redux 复杂 10 倍的挑战
本文是【高级前端的 AI 架构升级之路】系列第 04 篇。 上一篇:AI 网关层设计:多模型路由、降级、限流、成本控制 | 下一篇:AI Streaming 架构:从浏览器到服务端的全链路流式设计
为什么 AI 状态管理特别难
你做过前端状态管理——Redux、Pinia、Zustand、Jotai、TanStack Query……工具链已经很成熟了。但到了 AI 应用场景,你会发现这些方案应对不了新的挑战。
传统前端状态管理假设:
- 请求是短暂的:发出去几百毫秒就回来了
- 响应是完整的:要么成功拿到数据,要么失败报错
- 状态是确定的:相同操作产生相同结果
- 更新是离散的:每次 setState 是一个明确的时间点
AI 场景全部打破:
- 请求是长时间的:一次流式请求可能持续 10-30 秒
- 响应是增量的:一个字一个字到达,状态在持续变化
- 状态是不确定的:AI 可能回答到一半改变方向
- 更新是连续的:每秒可能触发几十次状态更新
这篇文章帮你从架构层面设计一套适合 AI 应用的状态管理方案。
AI 应用的状态全景
一个典型的 AI 聊天应用,需要管理的状态远比你想象的多:
interface AIAppState {
// 对话层
sessions: ChatSession[];
activeSessionId: string;
// 消息层
messages: Message[];
streamingMessage: StreamingMessage | null; // 正在生成的消息
// 请求层
isStreaming: boolean;
abortController: AbortController | null;
streamProgress: {
tokensGenerated: number;
estimatedTotal: number;
elapsedMs: number;
};
// 上下文层
contextWindow: Message[]; // 实际发给 AI 的消息(经过裁剪)
contextTokenCount: number; // 当前上下文 Token 数
// UI 层
inputDraft: string;
isComposing: boolean; // 输入法组合中
scrollPosition: number;
showThinking: boolean; // 是否展示 AI 思考过程
// 错误层
lastError: AIError | null;
retryCount: number;
fallbackProvider: string | null;
}
这还只是单聊天窗口。如果涉及多 Agent、RAG、Tool Calling,状态会再翻几倍。
挑战一:流式消息的状态更新
流式输出意味着一条 AI 消息的 content 在 5-30 秒内持续增长。如果你用最朴素的方式处理:
// 每收到一个 token 就更新整个 messages 数组——性能灾难
setMessages(prev => {
const updated = [...prev];
updated[updated.length - 1].content += newToken;
return updated;
});
问题:每秒触发 20-50 次状态更新,每次都创建新数组、触发 diff、重渲染整个消息列表。
解决方案:分离流式状态
把”正在流式生成的消息”从消息列表中分离出来,用独立状态管理:
// 方案:流式消息独立管理
interface ChatStore {
messages: Message[]; // 已完成的消息
streamingContent: string; // 正在流式生成的文本
streamingRole: 'assistant' | null;
}
// 流式进行中:只更新 streamingContent(轻量)
function onStreamToken(token: string) {
store.streamingContent += token;
// 不触发 messages 数组的更新
}
// 流式结束:一次性写入 messages
function onStreamEnd() {
store.messages.push({
role: store.streamingRole,
content: store.streamingContent,
});
store.streamingContent = '';
store.streamingRole = null;
}
渲染时把两者合并:
function MessageList() {
const messages = useStore(s => s.messages);
const streamingContent = useStore(s => s.streamingContent);
return (
<>
{messages.map(msg => <MessageBubble key={msg.id} message={msg} />)}
{streamingContent && (
<StreamingBubble content={streamingContent} />
)}
</>
);
}
StreamingBubble 组件可以用 ref + requestAnimationFrame 来更新 DOM,完全绕过 React 的状态管理和 diff 机制,实现极致性能:
function StreamingBubble({ content }: { content: string }) {
const ref = useRef<HTMLDivElement>(null);
const prevLength = useRef(0);
useEffect(() => {
if (!ref.current) return;
// 只追加新内容,不重新渲染整个文本
const newContent = content.slice(prevLength.current);
if (newContent) {
ref.current.textContent += newContent;
prevLength.current = content.length;
}
}, [content]);
return <div ref={ref} className="message assistant" />;
}
挑战二:对话上下文管理
AI 没有记忆,每次请求都需要把对话历史带上。但历史太长会超出模型的上下文窗口,也会导致成本暴涨。
上下文窗口策略
interface ContextStrategy {
maxTokens: number; // 上下文窗口上限
reserveForResponse: number; // 给回复预留的 token
method: 'sliding-window' | 'summary' | 'hybrid';
}
const STRATEGIES: Record<string, ContextStrategy> = {
'deepseek-chat': {
maxTokens: 64000,
reserveForResponse: 4000,
method: 'sliding-window',
},
'gpt-4o': {
maxTokens: 128000,
reserveForResponse: 4096,
method: 'hybrid',
},
'gpt-4o-mini': {
maxTokens: 128000,
reserveForResponse: 16384,
method: 'sliding-window',
},
};
滑动窗口实现
最简单实用的策略——保留最近的消息,超出时丢弃最旧的:
function buildContextWindow(
messages: Message[],
systemPrompt: string,
strategy: ContextStrategy,
): Message[] {
const maxAvailable = strategy.maxTokens - strategy.reserveForResponse;
let tokenCount = estimateTokens(systemPrompt);
const context: Message[] = [];
// 从最新的消息往前遍历
for (let i = messages.length - 1; i >= 0; i--) {
const msgTokens = estimateTokens(messages[i].content);
if (tokenCount + msgTokens > maxAvailable) break;
tokenCount += msgTokens;
context.unshift(messages[i]);
}
return [{ role: 'system', content: systemPrompt }, ...context];
}
function estimateTokens(text: string): number {
// 粗略估算:中文 1 字 ≈ 2 tokens,英文 1 词 ≈ 1.3 tokens
const chineseChars = (text.match(/[\u4e00-\u9fff]/g) || []).length;
const otherChars = text.length - chineseChars;
return Math.ceil(chineseChars * 2 + otherChars * 0.4);
}
摘要压缩策略
当对话特别长时,可以用 AI 本身来压缩历史:
async function compressHistory(
messages: Message[],
keepRecent: number = 4,
): Promise<Message[]> {
if (messages.length <= keepRecent + 2) return messages;
const toCompress = messages.slice(0, -keepRecent);
const recent = messages.slice(-keepRecent);
const summary = await callAI({
provider: 'deepseek', // 用便宜模型来做摘要
messages: [
{
role: 'system',
content: '将以下对话历史压缩为一段简洁的摘要,保留关键信息和结论。用第三人称描述。不超过 300 字。',
},
{
role: 'user',
content: toCompress.map(m => `${m.role}: ${m.content}`).join('\n\n'),
},
],
});
return [
{ role: 'system', content: `[对话历史摘要] ${summary}` },
...recent,
];
}
状态中的上下文管理
interface ContextState {
fullHistory: Message[]; // 完整历史(持久化存储)
contextWindow: Message[]; // 实际发给 AI 的(裁剪后的)
tokenCount: number;
compressed: boolean; // 是否已经过压缩
compressionRatio: number; // 压缩比
}
前端需要展示上下文状态——让用户知道 AI “能看到”多少历史:
function ContextIndicator({ state }: { state: ContextState }) {
const percentage = (state.tokenCount / maxTokens) * 100;
return (
<div className="context-bar">
<div className="progress" style={{ width: `${percentage}%` }} />
<span>
上下文 {state.contextWindow.length}/{state.fullHistory.length} 条消息
{state.compressed && ' (已压缩)'}
</span>
</div>
);
}
挑战三:分支对话
用户在多轮对话中,可能想”回到第 3 轮重新问”。这就产生了对话分支——和 Git 的分支概念很像。
数据结构
interface MessageNode {
id: string;
parentId: string | null;
role: 'user' | 'assistant' | 'system';
content: string;
children: string[]; // 子节点 ID 列表
activeChild: string | null; // 当前选中的子节点
timestamp: number;
}
interface BranchState {
nodes: Map<string, MessageNode>;
rootId: string;
activeLeafId: string; // 当前对话路径的最末节点
}
获取当前活跃的对话路径:
function getActivePath(state: BranchState): MessageNode[] {
const path: MessageNode[] = [];
let current = state.nodes.get(state.rootId);
while (current) {
path.push(current);
if (!current.activeChild) break;
current = state.nodes.get(current.activeChild);
}
return path;
}
从某个节点创建分支:
function createBranch(
state: BranchState,
parentId: string,
newMessage: { role: string; content: string },
): BranchState {
const newNode: MessageNode = {
id: generateId(),
parentId,
role: newMessage.role,
content: newMessage.content,
children: [],
activeChild: null,
timestamp: Date.now(),
};
const parent = state.nodes.get(parentId);
parent.children.push(newNode.id);
parent.activeChild = newNode.id;
state.nodes.set(newNode.id, newNode);
state.activeLeafId = newNode.id;
return state;
}
前端需要提供”切换分支”的 UI——类似 ChatGPT 的 < 1/3 > 箭头切换:
function BranchSwitcher({ node }: { node: MessageNode }) {
const parent = useNode(node.parentId);
if (!parent || parent.children.length <= 1) return null;
const currentIndex = parent.children.indexOf(node.id);
const total = parent.children.length;
return (
<div className="branch-switcher">
<button onClick={() => switchBranch(parent.id, currentIndex - 1)}
disabled={currentIndex === 0}>‹</button>
<span>{currentIndex + 1}/{total}</span>
<button onClick={() => switchBranch(parent.id, currentIndex + 1)}
disabled={currentIndex === total - 1}>›</button>
</div>
);
}
挑战四:多 Agent 协作状态
当你的系统有多个 AI Agent 同时工作(比如一个负责搜索、一个负责分析、一个负责总结),状态管理又上了一个台阶。
interface AgentState {
id: string;
name: string;
status: 'idle' | 'thinking' | 'executing' | 'done' | 'error';
currentTask: string;
progress: number; // 0-100
output: string;
dependencies: string[]; // 依赖的其他 Agent ID
startTime: number;
elapsedMs: number;
}
interface MultiAgentState {
agents: Map<string, AgentState>;
orchestrationMode: 'sequential' | 'parallel' | 'router';
overallProgress: number;
finalOutput: string | null;
}
并行 Agent 的状态同步
function updateAgentProgress(
state: MultiAgentState,
agentId: string,
update: Partial<AgentState>,
): MultiAgentState {
const agent = state.agents.get(agentId);
if (!agent) return state;
Object.assign(agent, update, { elapsedMs: Date.now() - agent.startTime });
// 重新计算整体进度
const agents = Array.from(state.agents.values());
state.overallProgress = agents.reduce((sum, a) => sum + a.progress, 0) / agents.length;
// 检查是否所有 Agent 都完成了
const allDone = agents.every(a => a.status === 'done' || a.status === 'error');
if (allDone) {
state.finalOutput = aggregateOutputs(agents);
}
return { ...state };
}
前端需要可视化展示多 Agent 的工作状态:
function AgentPanel({ agents }: { agents: AgentState[] }) {
return (
<div className="agent-panel">
{agents.map(agent => (
<div key={agent.id} className={`agent-card ${agent.status}`}>
<div className="agent-name">{agent.name}</div>
<div className="agent-task">{agent.currentTask}</div>
<div className="progress-bar">
<div style={{ width: `${agent.progress}%` }} />
</div>
<div className="agent-status">
{agent.status === 'thinking' && '🤔 思考中...'}
{agent.status === 'executing' && `⚡ 执行中 ${agent.progress}%`}
{agent.status === 'done' && '✅ 完成'}
{agent.status === 'error' && '❌ 出错'}
</div>
</div>
))}
</div>
);
}
挑战五:乐观更新 + AI 回滚
某些场景下,用户操作先展示结果,AI 在后台校验。如果 AI 发现问题,需要回滚。
典型场景:AI 辅助表单填写。用户选了一个选项 → 前端立即更新 → AI 后台检查合理性 → 不合理则回滚并提示。
interface OptimisticState<T> {
confirmed: T; // AI 确认后的值
optimistic: T; // 乐观更新的值(展示给用户的)
pending: boolean; // 是否在等 AI 确认
rollbackReason: string | null;
}
function applyOptimistic<T>(
state: OptimisticState<T>,
newValue: T,
): OptimisticState<T> {
return {
confirmed: state.confirmed,
optimistic: newValue,
pending: true,
rollbackReason: null,
};
}
function confirmOptimistic<T>(state: OptimisticState<T>): OptimisticState<T> {
return {
confirmed: state.optimistic,
optimistic: state.optimistic,
pending: false,
rollbackReason: null,
};
}
function rollbackOptimistic<T>(
state: OptimisticState<T>,
reason: string,
): OptimisticState<T> {
return {
confirmed: state.confirmed,
optimistic: state.confirmed, // 回滚到上次确认的值
pending: false,
rollbackReason: reason,
};
}
与 TanStack Query 的结合
TanStack Query(React Query)本身不是为 AI 设计的,但它的一些理念可以适配:
import { useQuery, useMutation } from '@tanstack/react-query';
function useChatMutation() {
return useMutation({
mutationFn: async (messages: Message[]) => {
// 流式调用不适合直接用 mutation(因为它期望一次性返回结果)
// 但非流式场景可以用
return await callAI({ messages });
},
onMutate: (messages) => {
// 乐观更新:立刻在列表里加一条空的 assistant 消息
},
retry: 2,
retryDelay: (attempt) => Math.min(1000 * 2 ** attempt, 10000),
});
}
// 会话列表适合用 Query
function useSessions() {
return useQuery({
queryKey: ['chat-sessions'],
queryFn: () => fetchSessions(),
staleTime: 30_000,
});
}
核心认知:TanStack Query 适合管理”请求-响应”型的状态(会话列表、历史记录加载),不适合管理流式状态(逐字生成的消息内容)。两者结合使用才是正解。
推荐的状态架构
综合以上挑战,推荐的分层架构:
┌─────────────────────────────────────┐
│ UI 组件层 │
│ (React / Vue 组件,响应式绑定) │
├─────────────────────────────────────┤
│ 状态管理层 │
│ ┌──────────┐ ┌──────────────────┐ │
│ │ 全局状态 │ │ 流式状态 │ │
│ │ Zustand / │ │ Ref + RAF │ │
│ │ Pinia │ │ (绕过框架) │ │
│ └──────────┘ └──────────────────┘ │
├─────────────────────────────────────┤
│ 数据层 │
│ ┌──────────┐ ┌──────────────────┐ │
│ │ TanStack │ │ IndexedDB │ │
│ │ Query │ │ (本地持久化) │ │
│ └──────────┘ └──────────────────┘ │
├─────────────────────────────────────┤
│ 服务层 │
│ ┌──────────┐ ┌──────────────────┐ │
│ │ AI Client │ │ Stream Parser │ │
│ │ (HTTP) │ │ (SSE 解析) │ │
│ └──────────┘ └──────────────────┘ │
└─────────────────────────────────────┘
关键设计决策:
- 流式状态脱离框架管理——用 Ref + requestAnimationFrame 直接操作 DOM,避免高频 re-render
- 全局状态用轻量库——Zustand(React)/ Pinia(Vue),不需要 Redux 的重量
- 请求状态用 TanStack Query——管理会话列表、历史加载等标准请求
- 长对话本地持久化——IndexedDB 存完整历史,避免每次刷页面重新加载
总结
- AI 状态管理的四大挑战:流式更新、上下文窗口、分支对话、多 Agent 协作。
- 流式消息独立管理——分离
streamingContent和messages,流式阶段用 Ref + RAF 绕过框架渲染。 - 上下文窗口管理——滑动窗口 + 摘要压缩,前端展示上下文使用状态。
- 分支对话用树结构——类似 Git 的节点 + 分支模型,支持”回到某一轮重新对话”。
- 多 Agent 状态需要独立追踪——每个 Agent 独立进度、状态,前端可视化展示。
- TanStack Query 管请求状态,Zustand/Pinia 管全局状态,Ref 管流式状态——分层配合。
下一篇,我们从前端状态继续深入到全链路——AI Streaming 架构设计。
讨论话题:你在做 AI 应用时,状态管理最头疼的问题是什么?用的什么方案?评论区聊聊。