流式输出
流式输出(Streaming)让模型在生成内容的同时实时返回结果,无需等待完整响应生成。这种方式能显著降低首字节时间(Time To First Byte, TTFB),为用户带来更流畅的交互体验。
为什么使用流式输出?
更低的延迟感知
-
结构化模式下:用户需要等待模型生成完整回复后才能看到结果。对于长文本,这可能需要数秒甚至更长时间。
-
流式模式下:用户几乎立即就能看到第一个字,随后内容逐步呈现,感知延迟大幅降低。
更好的用户体验
- 即时反馈:用户知道模型正在工作
- 渐进阅读:用户可以边生成边阅读
- 提前终止:如果内容不符合预期,可以中途停止
适用场景
| 场景 | 建议模式 | 原因 |
|---|---|---|
| 对话应用 | 流式 | 即时反馈,体验自然 |
| 长文本生成 | 流式 | 减少等待焦虑 |
| 实时助手 | 流式 | 响应迅速,交互流畅 |
| 批量处理 | 非流式 | 代码简洁,易于存储 |
| 简单查询 | 非流式 | 响应快,无需流式 |
快速开始
只需在请求中添加 stream: true 即可启用流式输出。
Python
from openai import OpenAI
client = OpenAI(
base_url="https://api.ant-ling.com/v1/",
api_key="YOUR_API_KEY"
)
response = client.chat.completions.create(
model="Ling-2.6-flash",
# 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T”
messages=[
{"role": "user", "content": "请写一首关于春天的诗"}
],
stream=True # 启用流式输出
)
# 逐块读取响应
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")流式响应格式
百灵大模型使用 Server-Sent Events (SSE) 协议进行流式传输。每个事件以 data: 开头,以两个换行符结束。
事件结构
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"Ling-2.6-flash","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"Ling-2.6-flash","choices":[{"index":0,"delta":{"content":"春"},"finish_reason":null}]}
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"Ling-2.6-flash","choices":[{"index":0,"delta":{"content":"天"},"finish_reason":null}]}
data: [DONE]字段说明
| 字段 | 类型 | 说明 |
|---|---|---|
id | string | 请求唯一标识 |
object | string | 固定为 chat.completion.chunk |
created | integer | 响应创建时间戳 |
model | string | 使用的模型名称 |
choices | array | 生成结果数组 |
choices[].delta | object | 增量内容 |
choices[].delta.role | string | 角色(仅在第一个 chunk) |
choices[].delta.content | string | 生成的文本片段 |
choices[].finish_reason | string | 结束原因:stop、length、content_filter 或 null |
结束标识
流式响应以 data: [DONE] 标识结束。
完整示例:构建聊天界面
以下是一个完整的 Web 聊天应用示例,展示如何在实际项目中使用流式输出。
后端(Node.js + Express)
import express from 'express';
import OpenAI from 'openai';
const app = express();
const client = new OpenAI({
baseURL: 'https://api.ant-ling.com/v1/',
apiKey: process.env.ANT_LING_API_KEY,
});
app.post('/chat', async (req, res) => {
const { messages } = req.body;
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
try {
const stream = await client.chat.completions.create({
model: 'Ling-2.6-flash',
// 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T”
messages,
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
// 发送 SSE 格式数据
res.write(`data: ${JSON.stringify({ content })}\n\n`);
}
}
// 发送结束标识
res.write('data: [DONE]\n\n');
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
res.end();
}
});
app.listen(3000);前端(JavaScript)
async function sendMessage(messages) {
const response = await fetch('/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n\n');
buffer = lines.pop(); // 保留未完整的部分
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
console.log('流式响应结束');
return;
}
try {
const parsed = JSON.parse(data);
if (parsed.content) {
// 更新 UI 显示新内容
appendToChat(parsed.content);
}
} catch (e) {
console.error('解析错误:', e);
}
}
}
}
}
function appendToChat(content) {
// 将内容追加到聊天界面
const chatElement = document.getElementById('chat-output');
chatElement.textContent += content;
}高级用法
异步迭代处理
Python 中可以使用 async for 进行异步处理:
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI(
base_url="https://api.ant-ling.com/v1/",
api_key="YOUR_API_KEY"
)
async def stream_chat():
response = await client.chat.completions.create(
model="Ling-2.6-flash",
# 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T”
messages=[{"role": "user", "content": "讲个故事"}],
stream=True
)
async for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
asyncio.run(stream_chat())取消流式请求
当用户中途停止生成时,应及时取消请求以节省资源:
Python
from openai import OpenAI
client = OpenAI(
base_url="https://api.ant-ling.com/v1/",
api_key="YOUR_API_KEY"
)
# 使用 with 语句确保资源正确释放
with client.chat.completions.create(
model="Ling-2.6-flash",
# 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T”
messages=[{"role": "user", "content": "写长篇小说"}],
stream=True
) as response:
for chunk in response:
content = chunk.choices[0].delta.content
if content:
print(content, end="")
# 用户点击停止按钮时跳出循环
if user_clicked_stop():
break # 自动关闭连接错误处理
流式响应中的错误可能以两种方式返回:
- 连接阶段错误:HTTP 状态码非 200,直接返回 JSON 错误信息
- 流式阶段错误:在 SSE 流中返回错误事件
错误处理示例
from openai import OpenAI, APIError
client = OpenAI(
base_url="https://api.ant-ling.com/v1/",
api_key="YOUR_API_KEY"
)
try:
response = client.chat.completions.create(
model="Ling-2.6-flash",
# 此处以“Ling-2.6-flash”调用为例,可按需调整为“Ling-2.6-1T”
messages=[{"role": "user", "content": "你好"}],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
except APIError as e:
print(f"API 错误: {e.message}")
except Exception as e:
print(f"其他错误: {e}")最佳实践
1. 始终处理空内容
流式响应中可能出现 content 为空的 chunk,务必进行判断:
for chunk in response:
content = chunk.choices[0].delta.content
if content: # 确保内容不为空
print(content, end="")2. 合理设置超时
流式请求可能持续较长时间,建议设置合适的超时:
from openai import OpenAI
client = OpenAI(
base_url="https://api.ant-ling.com/v1/",
api_key="YOUR_API_KEY",
timeout=60.0 # 60 秒超时
)3. 缓冲渲染优化
前端渲染时可以使用缓冲策略,避免过于频繁的 DOM 更新:
let buffer = '';
let renderTimer = null;
function bufferContent(content) {
buffer += content;
// 每 50ms 批量更新一次
if (!renderTimer) {
renderTimer = setTimeout(() => {
appendToChat(buffer);
buffer = '';
renderTimer = null;
}, 50);
}
}4. 监控 Token 使用量
流式响应的最后一个 chunk 通常包含 finish_reason 和用量信息:
for chunk in response:
# 处理内容
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
# 检查结束原因
finish_reason = chunk.choices[0].finish_reason
if finish_reason:
print(f"\n生成结束,原因: {finish_reason}")相关资源
Was this page helpful?
Last updated on