Server-Sent Events (SSE) 实现流式输出
在 ChatGPT 出现之前,大部分 Web 接口都是“请求-响应”模式:用户发一个请求,服务器处理完(可能需要几秒),然后一次性把结果扔回来。但在大模型时代,生成一段长文本可能需要 10 秒甚至更久。如果让用户盯着空白屏幕等 10 秒,体验会非常糟糕。于是,SSE (Server-Sent Events) 再次回到了聚光灯下。它允许服务器一边生成内容,一边通过长连接把数据“推”给前端,也就是我们看到的“打字机效果”。
1.什么是 SSE?
SSE(Server-Sent Events)是一种基于 HTTP 的单向通信机制。
- HTTP 标准:它不需要像 WebSocket 那样复杂的握手协议,它本质上就是一个长连接的 HTTP 请求。
- 单向通信:服务端 -> 客户端。这非常适合 AI 生成内容的场景(用户问一次,AI 持续吐字)。
2.代码展示
前端实现:
方案一:原生 EventSource ,为了支持 Header 传参(如 Token),可以使用 event-source-polyfill。
- 建立连接
// 引入 polyfill
// <script src="eventsource.min.js"></script>
const url = new URL('http://127.0.0.1:9000/chat_stream');
url.searchParams.set('text', "你好");
url.searchParams.set('session_id', "user_123");
// 建立连接
const es = new EventSourcePolyfill(url, {
headers: { Authorization: 'Bearer my-token' }
});
- 接收数据与渲染
const aiMessageDiv = document.createElement('div'); // 创建气泡
let currentText = "";
// 接收默认 message 事件或服务端未指定 event 时触发
es.onmessage = (e) => {
// 1. 判断是否结束
if (e.data === '[[END]]') {
es.close(); // 关闭连接
return;
}
// 2. 拼接数据(增量渲染)
currentText += e.data;
aiMessageDiv.innerText = currentText;
// 3. 自动滚动到底部
scrollToBottom();
};
es.onerror = () => {
console.log("连接异常");
es.close();
};
如果服务端返回的不是默认事件如event: xxx,可以用addEventListener方法监听
// 监听自定义事件(服务端返回 event: xxx 时触发)
sse.addEventListener('progress', (event) => {
const data = JSON.parse(event.data)
console.log('进度更新:', data.percent)
})
方案二:fetch + ReadableStream
EventSource只能发送 GET 请求,大模型的对话往往需要发送很长的 messages 历史记录,GET 请求的 URL 长度限制根本装不下。
而 fetch 完美解决了这些问题:
- 支持 POST 请求(可以发送复杂的 JSON 参数)。
- 可以自由设置 Headers。
- 配合
response.body(ReadableStream),可以直接在浏览器端像“剥洋葱”一样逐行读取 AI 返回的数据。
async function fetchSSE (url, text, session_id, token) {
const res = await fetch(url, {
method: "POST",
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
text: text,
session_id: session_id
})
}
)
const reader = res.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const parts = buffer.split(/\r?\n\r?\n/)
buffer = parts.pop()
for (const part of parts) {
if (!part.trim()) continue
const lines = part.split(/\r?\n/)
let id = ''
let event = 'message' // 默认值
let data = ''
for (const line of lines) {
if (line.startsWith('id:')) {
id = line.replace('id:', '').trim()
}
if (line.startsWith('event:')) {
event = line.replace('event:', '').trim()
}
if (line.startsWith('data:')) {
data += line.slice(5).replace(/^ /, '')
}
}
if (data === '[[END]]') return
}
}
}
后端实现:FastAPI + sse_starlette
from sse_starlette.sse import EventSourceResponse
from langchain_core.messages import HumanMessage, AIMessage
# 定义生成器函数
def stream_generator(session_id: str, text: str):
# 1. 获取上下文(实现记忆功能)
history = get_history(session_id)
current_messages = history + [HumanMessage(content=text)]
# 2. 调用大模型流式接口
for chunk in model.stream(current_messages):
content = chunk.content
if content:
# 3. 按照 SSE 格式推送数据
# 格式通常是: data: <内容>\n\n
yield {
"id": session_id,
"event": "message",
"data": content
}
# 4. 结束标记
yield {"data": "[[END]]"}
# API 接口,前端用EventSource只能是get请求
@app.post("/chat_stream")
async def chat_stream(req: Request):
return EventSourceResponse(stream_generator(session_id=req.session_id, text=req.text))
关键点解析:
yield: 它的作用是“产出”数据但不结束函数,函数会暂停在这里,直到下一次循环。这就实现了“生成一点,发送一点”。EventSourceResponse: 自动处理 HTTP Header(如Content-Type: text/event-stream),告诉浏览器这是一个流。