02|如何使用流式传输减少等待时间

你好,我是月影。

前一节课我们使用最简单的HTTP协议来处理请求响应,作为前端工程师的你,应该对这块内容并不陌生吧。

接下来呢,我们来了解一下对于初级前端工程师来说稍微复杂一点的内容,那就是通过流式(streaming)的传输方式来使用大模型API。

为什么要使用流式传输

在具体实践之前,先来说说为什么要使用流式传输。

由于大模型通常是需要实时推理的,Web应用调用大模型时,它的标准模式是浏览器提交数据,服务端完成推理,然后将结果以JSON数据格式通过标准的HTTP协议返回给前端,这个我们在上一小节里已经通过例子体会过。

但是这么做有一个问题,主要是推理所花费的时间和问题复杂度、以及生成的token数量有关。比如像第一节课里那样,只是简单问候一句,可能Deepseek推理所花费的时间很少,但是如果我们提出稍微复杂一点的要求,比如编写一本小说的章节目录,或者撰写一篇千字的作文,那么AI推理的时间会大大增加,这在具体应用中就带来一个显而易见的问题,那就是用户等待的时间很长。

而你应该已经发现,我们在使用线上大模型服务时,不管是哪一家大模型,通常前端的响应速度并没有太慢,这正是因为它们默认采用了流式(streaming)传输,不必等到整个推理完成再将内容返回,而是可以将逐个token实时返回给前端,这样就大大减少了响应时间。

如果你是熟悉比较传统的Web业务的前端工程师,可能会比较疑惑这种模式具体怎么实现,不要着急,我们接下来通过一个稍微复杂一点的例子,来学习和体会这项技术。

使用流式(streaming)传输减少等待时间

大多数文本模型,都支持使用流式传输来返回内容。在流式传输下,在模型推理过程中,生成的token会及时返回,而不用等待推理过程完全结束。在这一小节,我们先看一下Deepseek Platform下如何使用流式传输。

首先我们从Trae创建一个新项目,这次我们选择创建Vue+Vite+TypeScript项目,在后续的课程中,我们基本上以Vue+Vite+TypeScript为标配。

图片

创建的项目目录结构如下:

图片

别忘了配置我们的.env.local文件:

VITE_DEEPSEEK_API_KEY=sk-xxxxxxxxx

接着我们修改一下 App.vue

<script setup lang="ts">
import { ref } from 'vue';

const question = ref('讲一个关于中国龙的故事');
const content = ref('');
const stream = ref(true);

const update = async () => {
  if(!question) return;
  content.value = "思考中...";

  const endpoint = 'https://api.deepseek.com/chat/completions';
  const headers = {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${import.meta.env.VITE_DEEPSEEK_API_KEY}`
  };

  const response = await fetch(endpoint, {
    method: 'POST',
    headers: headers,
    body: JSON.stringify({
      model: 'deepseek-chat',
      messages: [{ role: 'user', content: question.value }],
      stream: stream.value,
    })
  });

  if(stream.value) {
    content.value = '';

    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    let done = false;
    let buffer = '';

    while (!done) {
      const { value, done: doneReading } = await (reader?.read() as Promise<{ value: any; done: boolean }>);
      done = doneReading;
      const chunkValue = buffer + decoder.decode(value);
      buffer = '';

      const lines = chunkValue.split('\n').filter((line) => line.startsWith('data: '));

      for (const line of lines) {
        const incoming = line.slice(6);
        if(incoming === '[DONE]') {
          done = true;
          break;
        }
        try {
          const data = JSON.parse(incoming);
          const delta = data.choices[0].delta.content;
          if(delta) content.value += delta;
        } catch(ex) {
          buffer += `data: ${incoming}`;
        }
      }
    }
  } else {
    const data = await response.json();
    content.value = data.choices[0].message.content;
  }
}
</script>

<template>
  <div class="container">
    <div>
      <label>输入:</label><input class="input" v-model="question" />
      <button @click="update">提交</button>
    </div>
    <div class="output">
      <div><label>Streaming</label><input type="checkbox" v-model="stream"/></div>
      <div>{{ content }}</div>
    </div>
  </div>
</template>

<style scoped>
.container {
  display: flex;
  flex-direction: column;
  align-items: start;
  justify-content: start;
  height: 100vh;
  font-size: .85rem;
}
.input {
  width: 200px;
}
.output {
  margin-top: 10px;
  min-height: 300px;
  width: 100%;
  text-align: left;
}
button {
  padding: 0 10px;
  margin-left: 6px;
}
</style>

运行项目,点击提交按钮,你会看到AI正以流式传输的方式输出内容,这样就能减少用户的等待时间。

图片

好,我们来一起看一下代码的关键部分。

首先,流式输出的API调用机制,和普通的HTTPS输出没有什么区别,都是通过POST请求,只不过提交的数据中,将stream参数设置为true。

  const endpoint = 'https://api.deepseek.com/chat/completions';
  const headers = {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${import.meta.env.VITE_DEEPSEEK_API_KEY}`
  };

  const response = await fetch(endpoint, {
    method: 'POST',
    headers: headers,
    body: JSON.stringify({
      model: 'deepseek-chat',
      messages: [{ role: 'user', content: question.value }],
      stream: stream.value, // 这里 stream.value 值如果是 true,采用流式传输
    })
  });

在浏览器处理请求的时候,会通过HTML5标准的 Streams API 来处理数据,具体处理逻辑如下:

const reader = response.body?.getReader();
const decoder = new TextDecoder();
let done = false;
let buffer = '';

while (!done) {
  const { value, done: doneReading } = await (reader?.read() as Promise<{ value: any; done: boolean }>);
  done = doneReading;
  const chunkValue = buffer + decoder.decode(value);
  buffer = '';

  const lines = chunkValue.split('\n').filter((line) => line.startsWith('data: '));

  for (const line of lines) {
    const incoming = line.slice(6);
    if(incoming === '[DONE]') {
      done = true;
      break;
    }
    try {
      const data = JSON.parse(incoming);
      const delta = data.choices[0].delta.content;
      if(delta) content.value += delta;
    } catch(ex) {
      buffer += `data: ${incoming}`;
    }
  }
}

首先,我们利用ReadableStream API 通过getReader()获取一个读取器,并创建TextDecoder准备对二进制数据进行解码。

然后我们设置控制流标志done,以及一个buffer变量来缓存数据,因为某些情况下,Stream数据返回给前端时,不一定传输完整。

接着我们开始循环读取数据,通过TextDecoder解析数据,将数据转换成文本并按行拆分。

因为API返回流式数据的协议是每一条数据以 “data:” 开头,后续是一个有效的JSON或者[DONE]表示传输结束,所以我们要对每一行以"data:"开头的数据进行处理。

for (const line of lines) {
  const incoming = line.slice(6);
  if(incoming === '[DONE]') {
    done = true;
    break;
  }
  try {
    const data = JSON.parse(incoming);
    const delta = data.choices[0].delta.content;
    if(delta) content.value += delta;
  } catch(ex) {
    buffer += `data: ${incoming}`;
  }
}

如果数据传输完整,且不是[DONE],那么它就是合法JSON,我们从中读取data.choices[0].delta.content,就是需要增量更新的内容,否则说明数据不完整,将它存入缓存,以便后续继续处理。

这样我们就实现了数据的流式传输和浏览器的动态接收。

使用 Server-Sent Events

刚才的做法虽然可以直接使用流式数据,但是处理起来还是略为繁琐。

实际上Deepseek API和其他大部分兼容OpenAI的平台,AI返回的流式输出数据都是符合标准的Server-Sent Events(SSE)规范的,现代浏览器几乎都支持更简单的SSE API,只不过我们目前暂时无法在前端直接使用它。

主要原因是,根据标准,SSE的底层只支持HTTP GET,并且不能发送自定义的Header,而我们的授权却需要将API Key通过Authorization Header发送,而且必须使用POST请求。

尽管如此,并不意味着我们前端就不能使用SSE来处理流式输出,而是我们需要创建一个BFF层,通过Node Server来做中转。

首先我们还是用Trae创建一个新的Vue项目Deepseek API SSE。

接着在IDE终端安装依赖包dotenv和express。

图片

然后在项目根目录下添加如下server.js文件:

import * as dotenv from 'dotenv'
import express from 'express';

dotenv.config({
  path: ['.env.local', '.env']
})

const openaiApiKey = process.env.VITE_DEEPSEEK_API_KEY;
const app = express();
const port = 3000;
const endpoint = 'https://api.deepseek.com/v1/chat/completions';

// SSE 端点
app.get('/stream', async (req, res) => {
    // 设置响应头部
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // 发送初始响应头
  
    try {
      // 发送 OpenAI 请求
      const response = await fetch(
        endpoint,
        {
            method: 'POST',
            headers: {
                'Authorization': `Bearer ${openaiApiKey}`,
            },
            body: JSON.stringify({
                model:'deepseek-chat', // 选择你使用的模型
                messages: [{ role: 'user', content: req.query.question }],
                stream: true, // 开启流式响应
            })
        }
      );
  
      if (!response.ok) {
        throw new Error('Failed to fetch from OpenAI');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let done = false;
      let buffer = '';

          // 读取流数据并转发到客户端
      while (!done) {
        const { value, done: doneReading } = await reader.read();
        done = doneReading;
        const chunkValue = buffer + decoder.decode(value, { stream: true });
        buffer = '';
  
        // 按行分割数据,每行以 "data: " 开头,并传递给客户端
        const lines = chunkValue.split('\n').filter(line => line.trim() && line.startsWith('data: '));
        for (const line of lines) {
            const incoming = line.slice(6);
            if(incoming === '[DONE]') {
              done = true;
              break;
            }
            try {
              const data = JSON.parse(incoming);
              const delta = data.choices[0].delta.content;
              if(delta) res.write(`data: ${delta}\n\n`); // 发送数据到客户端
            } catch(ex) {
              buffer += `data: ${incoming}`;
            }
        }
      }
  
      res.write('event: end\n'); // 发送结束事件
      res.write('data: [DONE]\n\n'); // 通知客户端数据流结束
      res.end(); // 关闭连接
  
    } catch (error) {
      console.error('Error fetching from OpenAI:', error);
      res.write('data: Error fetching from OpenAI\n\n');
      res.end();
    }
  });
  
  // 启动服务器
  app.listen(port, () => {
    console.log(`Server running on http://localhost:${port}`);
  });

完成后,我们在终端启动服务:

node server.js

这个server.js的主要作用是在server端处理大模型API的流式响应,并将数据仍以兼容SSE(以"data: "开头)的形式逐步发送给浏览器端。

现在我们在IDE中可以访问 http://localhost:3000/stream?question=hello 进行测试。为了在前端页面上访问,我们可以通过配置vite的server来进行请求转发。

此时需要修改项目中的vite.config.js文件:

import { defineConfig } from 'vite';
import vue from '@vitejs/plugin-vue';
import vueDevTools from 'vite-plugin-vue-devtools';

// https://vitejs.dev/config/
export default defineConfig({
  server: {
    allowedHosts: true,
    port: 4399,
    proxy: {
      '/api': {
        target: 'http://localhost:3000',
        secure: false,
        rewrite: path => path.replace(/^\/api/, ''),
      },
    },
  },
  plugins: [
    vue(),
    vueDevTools(),
  ],
});

这样server请求就被转发到了 /api/stream

最后我们这样实现App.vue:

<script setup lang="ts">
import { ref } from 'vue';

const question = ref('讲一个关于中国龙的故事');
const content = ref('');
const stream = ref(true);

const update = async () => {
  if(!question) return;
  content.value = "思考中...";

  const endpoint = '/api/stream';
  const headers = {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${import.meta.env.VITE_MOONSHOT_API_KEY}`
  };

  if(stream.value) {
    content.value = '';
    const eventSource = new EventSource(`${endpoint}?question=${question.value}`);
    eventSource.addEventListener("message", function(e: any) {
      content.value += e.data;
    });
  } else {
    const response = await fetch(endpoint, {
      method: 'POST',
      headers: headers,
      body: JSON.stringify({
        model: 'moonshot-v1-8k',
        messages: [{ role: 'user', content: question.value }],
        stream: stream.value,
      })
    });
    const data = await response.json();
    content.value = data.choices[0].message.content;
  }
}
</script>

<template>
  <div class="container">
    <div>
      <label>输入:</label><input class="input" v-model="question" />
      <button @click="update">提交</button>
    </div>
    <div class="output">
      <div><label>Streaming</label><input type="checkbox" v-model="stream"/></div>
      <div>{{ content }}</div>
    </div>
  </div>
</template>

<style scoped>
.container {
  display: flex;
  flex-direction: column;
  align-items: start;
  justify-content: start;
  height: 100vh;
  font-size: .85rem;
}
.input {
  width: 200px;
}
.output {
  margin-top: 10px;
  min-height: 300px;
  width: 100%;
  text-align: left;
}
button {
  padding: 0 10px;
  margin-left: 6px;
}
</style>

注意和前面直接通过Streams API处理数据相比,有了server端处理转发后,浏览器只需使用SSE,代码如下:

const eventSource = new EventSource(`${endpoint}?question=${question.value}`);
eventSource.addEventListener("message", function(e: any) {
  content.value += e.data;
});
eventSource.addEventListener('end', () => {
  eventSource.close();
});

这不仅仅让前端代码实现变得简洁很多,而且SSE在浏览器内置了自动重连机制。这意味着当网络、服务器或者客户端连接出现问题,恢复后将自动完成重新连接,不需要用户主动刷新页面,这让SSE特别适合长时间保持连接的应用场景。此外,SSE还支持通过lastEventId来支持数据的续传,这样在错误恢复时,能大大节省数据传输的带宽和接收数据的响应时间。

关于SSE的问题,在后续课程中,我们还会有机会继续深入探讨。

要点总结

在大模型的API调用方式上,除了传统的HTTP调用方式外,还支持流式传输,由于这么做不用等待推理完成就可以实时响应内容,因此能够大大减少用户等待时间,是非常有意义的。

这节课,我们以Deepseek Platform为例,探讨了文本大模型使用Streams API的流式传输和Server-Sent Events的方法。这两种方式,提高了响应实效性,从而能够大大减少用户的等待时间,带来较好的用户体验。这也是我在实际的AI应用产品中推崇并最常使用的两种调用方式。我也希望同为前端的你,能够掌握这些调用方式并将它们运用到实际产品项目中去,从而改进用户的体验。

课后练习

Server-Sent Events 是一种允许服务器主动推送实时更新给浏览器的技术,属于Web标准,它除了实时推送数据外,还可以支持自定义事件(Custom Events)和内容的续传。你可以通过MDN文档和询问AI进一步学习这部分内容,尝试给我们的例子增加事件通知和连接断开恢复后的数据续传能力,这些能力在实际AI应用产品的业务中都是可以用到的。

精选留言

  • 轩爷

    2025-05-07 19:08:03

    MCP 已经改成 stdio 和 streamable HTTP ,废弃了使用 SSE 进行通讯的方式,SSE 有仅支持单向传输等的缺点,那么对于我们而言,掌握 SSE 应用的价值在哪儿?
    作者回复

    SSE对于前端来说还是很好用的,因为用JS很容易处理内容。MCP主要不是针对前端,还是大模型调用工具的协议。

    2025-05-14 08:39:07

  • Geek_22eecd

    2025-04-09 11:05:41

    首先,流式输出的 API 调用机制,和普通的 HTTPS 输出没有什么区别,都是通过 POST 请求,只不过提交的数据中,将 stream 参数设置为 false。

    这里应该是true, (还有下面的代码部分也是true)
    作者回复

    是的,我今天更正一下,感谢反馈

    2025-04-11 08:48:23

  • 李某某

    2025-04-19 17:27:43

    在BFF层调用deepseek的时候,需要加上"Content-Type": "application/json"的header,不然deepseek会返回415的错误
    作者回复

    如果是流式,Content-Type应该是text/event-stream

    2025-05-18 17:28:00

  • Geek_9ddac9

    2025-06-04 13:45:41

    这节内容仓库里有对应的代码吗?请求转发不成功,我找不到问题所在,谢谢
    作者回复

    https://github.com/akira-cn/frontend-dev-large-model-era/tree/main/ai_streaming

    2025-06-07 08:42:15

  • Geek_85d4c3

    2025-04-15 14:45:15

    server.js中,headers中设置Content-Type可以成功,不设置就一直失败
    作者回复

    'text/event-stream' 是必须要设置的

    2025-04-22 11:25:52

  • 有点 cool

    2025-07-06 22:47:25

    这个案列为什么不直接引入 openAI sdk , 在 BFF 层就完成与大模型的对话,仅将大模型回复的内容进行转发,而且将 APKkey 放在请求头中,抓包不就直接暴露了吗?
    作者回复

    只是示例,真正用的时候是放在nodejs里的,不会直接在前端调用

    2025-07-10 13:18:01

  • Geek7243

    2025-06-11 15:41:11

    之前对这种生成式的响应感到好奇,特意去看了openai的网站上打开了控制台,发现用到了 SSE。
  • 佳成

    2025-06-02 19:29:22

    第二个例子中 为啥我这边直接cv server.js代码运行的时候 会报415 response打印Unsupported Media Type
    作者回复

    你这个cv是个什么命令?你是不是用了什么反向代理之类的改变了http的content-type导致流式传输失效了

    2025-06-07 08:44:51

  • Geek_6dceb1

    2025-05-31 00:54:39

    第一例子用的是deepseek的key,第二个例子的秘钥设置的是VITE_MOONSHOT_API_KEY,这是月之暗面的key?
    作者回复

    是的,其实API一样,都可以切换

    2025-06-02 09:44:58

  • 茶贝

    2025-05-14 10:58:12

    server.js 中使用.end()结束响应流方法还是被重复调用,检查不是客户端问题
    作者回复

    因为ling.close中还是会调用一次,如果你要自己强制结束,可能需要取消 ling.close,这个 case 我记下来了,想一想怎么做

    2025-05-18 16:52:42

  • 安安安

    2025-05-07 15:16:47

    HTML5 标准的 Streams API, 这块是不是写错了?Streams API和HTML5没关系吧
    作者回复

    严格来说属于Web API,是大部分浏览器支持的Web标准的一部分,但通俗上来说成H5 API,也没太大问题

    2025-05-14 08:43:40

  • St.

    2025-04-25 16:21:43

    复制文中的代码,运行后,server.js中调用deepseek返回是个空对象,有人和我碰到一样的问题吗?
    作者回复

    是不是.env.local没有配置正确啊

    2025-05-18 17:25:57

  • Yully

    2025-04-24 15:57:11

    在实际获取流的时候,会有[data:数据\n\n]流信息被莫名奇妙的切割了,比如data: 和后面切割,还有可能\n和\n被切割,为什么会出现这种问题?以及怎么接收这种数据呢
    作者回复

    就是有可能刚好分块传输的时候被切割所以用buffer来解决

    2025-04-25 12:10:47

  • 时间如你

    2025-04-22 16:11:58

    VITE_MOONSHOT_API_KEY这变量存在的意义是什么
    作者回复

    因为不想泄漏我自己的key…代码要放在github上开源的,这样配置之后可以在gitignore里把它禁止提交github

    2025-04-25 12:11:55

  • 十一aa

    2025-04-18 17:24:26

    server.js设置了process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; 才能成功
    作者回复

    这个我没设置也能成功,是不是和node版本有关系啊

    2025-04-20 08:00:33

  • 2025-04-18 16:09:47

    之后就把 responseType 设置为 "text",利用 axios 的 onDownloadProgress 函数在相应过程中不断获取 responseText ,就可以处理流式响应数据,不断更新页面。
    ```typescript
    const fetchDeepseekAnswer = async () => {
    try {
    let answerMessageContent = ''
    let joinIndex = 0

    const response = await axios({
    method: 'post',
    url: "/api/chat/completions",
    headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${import.meta.env.VITE_DEEPSEEK_API_KEY}`
    },
    responseType: 'text',
    data: {
    model: "deepseek-chat",
    stream: true,
    messages: messages.value
    },
    onDownloadProgress: (progressEvent) => {
    const data = progressEvent.event?.target?.responseText;
    if(!data) return;

    // 处理SSE格式数据
    const lines = data.split('\n').filter(line => line.trim());

    for (let i = joinIndex; i < lines.length - 1; i++) {
    const jsonStr = lines[i].replace('data: ', '');

    if (jsonStr !== '[DONE]') {
    try {
    const parsed = JSON.parse(jsonStr);
    const lineContent = parsed.choices?.[0]?.delta?.content
    if (lineContent !== undefined && parsed !== null) {
    answerMessageContent += parsed.choices[0].delta.content;
    // 触发响应式更新
    messages.value[messages.value.length - 1].content = answerMessageContent;
    joinIndex++;
    }
    } catch (e) {
    console.error('解析错误:', e);
    }
    }
    }
    }
    })
    } catch(error) {
    console.error('Error fetching answer:', error)
    }
    }
    ```
    作者回复

    2025-05-18 17:29:06

  • 2025-04-18 16:08:38

    尝试使用 axios 去发起请求,并处理返回数据。原本想设置 responseType: "stream":
    ```javascript
    await axios({
    method: 'post',
    url: "/api/chat/completions",
    headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${import.meta.env.VITE_DEEPSEEK_API_KEY}`
    },
    responseType: 'stream',
    data: {
    model: "deepseek-chat",
    stream: true,
    messages: messages.value
    }
    })
    ```
    但是由于 axios 是基于 XHR,会报错。The provided value 'stream' is not a valid enum value of type XMLHttpRequestResponseType.
    作者回复

    其实现在用原生fetch就好,感觉没啥必要用axios

    2025-05-18 17:28:41

  • Geek_16ca32

    2025-04-11 10:55:01

    server.js 发送请求时,headers是不是也需要 'Content-Type': 'application/json'
    作者回复

    流式传输响应content-type不是json

    2025-04-11 12:42:37

  • Geek_16ca32

    2025-04-10 16:47:42

    现在我们在 IDE 中可以访问 http://localhost:3000/steam?question=hello 进行测试。steam 更改为 stream
    作者回复

    感谢反馈,我们已经修正了,刷新可见~

    2025-04-10 17:49:34