Server-Sent Events (SSE) 实现流式输出

在 ChatGPT 出现之前,大部分 Web 接口都是“请求-响应”模式:用户发一个请求,服务器处理完(可能需要几秒),然后一次性把结果扔回来。但在大模型时代,生成一段长文本可能需要 10 秒甚至更久。如果让用户盯着空白屏幕等 10 秒,体验会非常糟糕。于是,SSE (Server-Sent Events) 再次回到了聚光灯下。它允许服务器一边生成内容,一边通过长连接把数据“推”给前端,也就是我们看到的“打字机效果”。

1.什么是 SSE?

SSE(Server-Sent Events)是一种基于 HTTP 的单向通信机制。

  • HTTP 标准:它不需要像 WebSocket 那样复杂的握手协议,它本质上就是一个长连接的 HTTP 请求。
  • 单向通信:服务端 -> 客户端。这非常适合 AI 生成内容的场景(用户问一次,AI 持续吐字)。

2.代码展示

前端实现:
方案一:原生 EventSource ,为了支持 Header 传参(如 Token),可以使用 event-source-polyfill
  1. 建立连接
// 引入 polyfill
// <script src="eventsource.min.js"></script>

const url = new URL('http://127.0.0.1:9000/chat_stream');
url.searchParams.set('text', "你好");
url.searchParams.set('session_id', "user_123");

// 建立连接
const es = new EventSourcePolyfill(url, {
    headers: { Authorization: 'Bearer my-token' }
});
  1. 接收数据与渲染
const aiMessageDiv = document.createElement('div'); // 创建气泡
let currentText = "";
// 接收默认 message 事件或服务端未指定 event 时触发
es.onmessage = (e) => {
    // 1. 判断是否结束
    if (e.data === '[[END]]') {
        es.close(); // 关闭连接
        return;
    }
    // 2. 拼接数据(增量渲染)
    currentText += e.data;
    aiMessageDiv.innerText = currentText; 
    // 3. 自动滚动到底部
    scrollToBottom();
};
es.onerror = () => {
    console.log("连接异常");
    es.close();
};

如果服务端返回的不是默认事件如event: xxx,可以用addEventListener方法监听

// 监听自定义事件(服务端返回 event: xxx 时触发)
sse.addEventListener('progress', (event) => {
  const data = JSON.parse(event.data)
  console.log('进度更新:', data.percent)
})
方案二:fetch + ReadableStream

EventSource只能发送 GET 请求,大模型的对话往往需要发送很长的 messages 历史记录,GET 请求的 URL 长度限制根本装不下。

fetch 完美解决了这些问题:

  • 支持 POST 请求(可以发送复杂的 JSON 参数)。
  • 可以自由设置 Headers
  • 配合 response.body (ReadableStream),可以直接在浏览器端像“剥洋葱”一样逐行读取 AI 返回的数据。
async function fetchSSE (url, text, session_id, token) {
      const res = await fetch(url, {
        method: "POST",
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${token}`
        },
        body: JSON.stringify({
          text: text,
          session_id: session_id
        })
      }
      )
      const reader = res.body.getReader()
      const decoder = new TextDecoder()
      let buffer = ''

      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        buffer += decoder.decode(value, { stream: true })
        const parts = buffer.split(/\r?\n\r?\n/)
        buffer = parts.pop()
        for (const part of parts) {
          if (!part.trim()) continue

          const lines = part.split(/\r?\n/)

          let id = ''
          let event = 'message' // 默认值
          let data = ''
          for (const line of lines) {
            if (line.startsWith('id:')) {
              id = line.replace('id:', '').trim()
            }
            if (line.startsWith('event:')) {
              event = line.replace('event:', '').trim()
            }
            if (line.startsWith('data:')) {
              data += line.slice(5).replace(/^ /, '')
            }
          }

          if (data === '[[END]]') return
        }
      }
    }
后端实现:FastAPI + sse_starlette
from sse_starlette.sse import EventSourceResponse
from langchain_core.messages import HumanMessage, AIMessage

# 定义生成器函数
def stream_generator(session_id: str, text: str):
    # 1. 获取上下文(实现记忆功能)
    history = get_history(session_id)
    current_messages = history + [HumanMessage(content=text)]
    
    # 2. 调用大模型流式接口
    for chunk in model.stream(current_messages):
        content = chunk.content
        if content:
            # 3. 按照 SSE 格式推送数据
            # 格式通常是: data: <内容>\n\n
            yield {
                "id": session_id,
                "event": "message",
                "data": content
            }
    # 4. 结束标记
    yield {"data": "[[END]]"}

# API 接口,前端用EventSource只能是get请求
@app.post("/chat_stream")
async def chat_stream(req: Request):
    return EventSourceResponse(stream_generator(session_id=req.session_id, text=req.text))

关键点解析:

  • yield: 它的作用是“产出”数据但不结束函数,函数会暂停在这里,直到下一次循环。这就实现了“生成一点,发送一点”。
  • EventSourceResponse: 自动处理 HTTP Header(如 Content-Type: text/event-stream),告诉浏览器这是一个流。