Skip to Content
教程流式输出

流式输出

流式输出(Streaming)让模型在生成内容的同时实时返回结果,无需等待完整响应生成。这种方式能显著降低首字节时间(Time To First Byte, TTFB),为用户带来更流畅的交互体验。


为什么使用流式输出?

更低的延迟感知

  • 结构化模式下:用户需要等待模型生成完整回复后才能看到结果。对于长文本,这可能需要数秒甚至更长时间。

  • 流式模式下:用户几乎立即就能看到第一个字,随后内容逐步呈现,感知延迟大幅降低。

更好的用户体验

  • 即时反馈:用户知道模型正在工作
  • 渐进阅读:用户可以边生成边阅读
  • 提前终止:如果内容不符合预期,可以中途停止

适用场景

场景建议模式原因
对话应用流式即时反馈,体验自然
长文本生成流式减少等待焦虑
实时助手流式响应迅速,交互流畅
批量处理非流式代码简洁,易于存储
简单查询非流式响应快,无需流式

快速开始

只需在请求中添加 stream: true 即可启用流式输出。

from openai import OpenAI client = OpenAI( base_url="https://api.ant-ling.com/v1/", api_key="YOUR_API_KEY" ) response = client.chat.completions.create( model="Ling-2.6-flash", # 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T” messages=[ {"role": "user", "content": "请写一首关于春天的诗"} ], stream=True # 启用流式输出 ) # 逐块读取响应 for chunk in response: if chunk.choices[0].delta.content: print(chunk.choices[0].delta.content, end="")

流式响应格式

百灵大模型使用 Server-Sent Events (SSE) 协议进行流式传输。每个事件以 data: 开头,以两个换行符结束。

事件结构

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"Ling-2.6-flash","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]} data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"Ling-2.6-flash","choices":[{"index":0,"delta":{"content":"春"},"finish_reason":null}]} data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"Ling-2.6-flash","choices":[{"index":0,"delta":{"content":"天"},"finish_reason":null}]} data: [DONE]

字段说明

字段类型说明
idstring请求唯一标识
objectstring固定为 chat.completion.chunk
createdinteger响应创建时间戳
modelstring使用的模型名称
choicesarray生成结果数组
choices[].deltaobject增量内容
choices[].delta.rolestring角色(仅在第一个 chunk)
choices[].delta.contentstring生成的文本片段
choices[].finish_reasonstring结束原因:stoplengthcontent_filternull

结束标识

流式响应以 data: [DONE] 标识结束。


完整示例:构建聊天界面

以下是一个完整的 Web 聊天应用示例,展示如何在实际项目中使用流式输出。

后端(Node.js + Express)

import express from 'express'; import OpenAI from 'openai'; const app = express(); const client = new OpenAI({ baseURL: 'https://api.ant-ling.com/v1/', apiKey: process.env.ANT_LING_API_KEY, }); app.post('/chat', async (req, res) => { const { messages } = req.body; // 设置 SSE 响应头 res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); try { const stream = await client.chat.completions.create({ model: 'Ling-2.6-flash', // 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T” messages, stream: true, }); for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content; if (content) { // 发送 SSE 格式数据 res.write(`data: ${JSON.stringify({ content })}\n\n`); } } // 发送结束标识 res.write('data: [DONE]\n\n'); res.end(); } catch (error) { res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`); res.end(); } }); app.listen(3000);

前端(JavaScript)

async function sendMessage(messages) { const response = await fetch('/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages }), }); const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n\n'); buffer = lines.pop(); // 保留未完整的部分 for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); if (data === '[DONE]') { console.log('流式响应结束'); return; } try { const parsed = JSON.parse(data); if (parsed.content) { // 更新 UI 显示新内容 appendToChat(parsed.content); } } catch (e) { console.error('解析错误:', e); } } } } } function appendToChat(content) { // 将内容追加到聊天界面 const chatElement = document.getElementById('chat-output'); chatElement.textContent += content; }

高级用法

异步迭代处理

Python 中可以使用 async for 进行异步处理:

import asyncio from openai import AsyncOpenAI client = AsyncOpenAI( base_url="https://api.ant-ling.com/v1/", api_key="YOUR_API_KEY" ) async def stream_chat(): response = await client.chat.completions.create( model="Ling-2.6-flash", # 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T” messages=[{"role": "user", "content": "讲个故事"}], stream=True ) async for chunk in response: if chunk.choices[0].delta.content: print(chunk.choices[0].delta.content, end="") asyncio.run(stream_chat())

取消流式请求

当用户中途停止生成时,应及时取消请求以节省资源:

from openai import OpenAI client = OpenAI( base_url="https://api.ant-ling.com/v1/", api_key="YOUR_API_KEY" ) # 使用 with 语句确保资源正确释放 with client.chat.completions.create( model="Ling-2.6-flash", # 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T” messages=[{"role": "user", "content": "写长篇小说"}], stream=True ) as response: for chunk in response: content = chunk.choices[0].delta.content if content: print(content, end="") # 用户点击停止按钮时跳出循环 if user_clicked_stop(): break # 自动关闭连接

错误处理

流式响应中的错误可能以两种方式返回:

  1. 连接阶段错误:HTTP 状态码非 200,直接返回 JSON 错误信息
  2. 流式阶段错误:在 SSE 流中返回错误事件

错误处理示例

from openai import OpenAI, APIError client = OpenAI( base_url="https://api.ant-ling.com/v1/", api_key="YOUR_API_KEY" ) try: response = client.chat.completions.create( model="Ling-2.6-flash", # 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T” messages=[{"role": "user", "content": "你好"}], stream=True ) for chunk in response: if chunk.choices[0].delta.content: print(chunk.choices[0].delta.content, end="") except APIError as e: print(f"API 错误: {e.message}") except Exception as e: print(f"其他错误: {e}")

最佳实践

1. 始终处理空内容

流式响应中可能出现 content 为空的 chunk,务必进行判断:

for chunk in response: content = chunk.choices[0].delta.content if content: # 确保内容不为空 print(content, end="")

2. 合理设置超时

流式请求可能持续较长时间,建议设置合适的超时:

from openai import OpenAI client = OpenAI( base_url="https://api.ant-ling.com/v1/", api_key="YOUR_API_KEY", timeout=60.0 # 60 秒超时 )

3. 缓冲渲染优化

前端渲染时可以使用缓冲策略,避免过于频繁的 DOM 更新:

let buffer = ''; let renderTimer = null; function bufferContent(content) { buffer += content; // 每 50ms 批量更新一次 if (!renderTimer) { renderTimer = setTimeout(() => { appendToChat(buffer); buffer = ''; renderTimer = null; }, 50); } }

4. 监控 Token 使用量

流式响应的最后一个 chunk 通常包含 finish_reason 和用量信息:

for chunk in response: # 处理内容 if chunk.choices[0].delta.content: print(chunk.choices[0].delta.content, end="") # 检查结束原因 finish_reason = chunk.choices[0].finish_reason if finish_reason: print(f"\n生成结束,原因: {finish_reason}")

相关资源

  • 快速开始 - 5 分钟完成首次 API 调用
  • 模型选择 - 了解 Ling、Ring、Ming 各系列模型的适用场景
Was this page helpful?
Last updated on