流式输出 SSE
服务端实现(FastAPI)
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json
# Single application instance; routes below attach to it.
app = FastAPI()
# SECURITY NOTE(review): "sk-xxx" and "..." are placeholders — load the real
# key/base URL from environment variables or a secret store; never commit
# credentials to source control.
client = AsyncOpenAI(api_key="sk-xxx", base_url="...")
@app.post("/chat/stream")
async def stream_chat(query: str):
    """Proxy an LLM completion to the caller as Server-Sent Events.

    Each content delta is emitted as ``data: {"content": ...}\\n\\n`` and the
    stream is terminated with a ``data: [DONE]`` sentinel.

    NOTE(review): ``query`` is a plain ``str`` parameter, so FastAPI reads it
    from the URL query string (``?query=...``), not the JSON body — the JS
    example client posts a JSON body instead; confirm which contract is
    intended.
    """
    async def generate():
        stream = await client.chat.completions.create(
            model="qwen-turbo",
            messages=[{"role": "user", "content": query}],
            stream=True,
        )
        async for chunk in stream:
            # Some chunks (e.g. a trailing usage chunk) carry an empty
            # `choices` list; indexing [0] unguarded raises IndexError
            # mid-stream. Skip those chunks.
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {json.dumps({'content': content}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        # X-Accel-Buffering: no — stops nginx-style proxies from buffering
        # the stream; Cache-Control: no-cache — SSE must never be cached.
        headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"},
    )

客户端消费(JavaScript)
javascript
const response = await fetch('/chat/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({query: '分析银行股投资价值'})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const {done, value} = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ') && line !== 'data: [DONE]') {
const data = JSON.parse(line.slice(6));
process.stdout.write(data.content);
}
}
}Python 客户端消费
python
import httpx
import json
def stream_from_api(query: str):
    """Consume the /chat/stream SSE endpoint and print content as it arrives.

    Args:
        query: The user question, sent as a URL query parameter (matching
            the server's ``query: str`` signature).

    Raises:
        httpx.HTTPStatusError: If the server responds with a non-2xx status.
    """
    # timeout=None: httpx's default 5-second read timeout would abort a
    # long-running LLM stream during pauses between tokens.
    with httpx.stream("POST", "http://localhost:8000/chat/stream",
                      params={"query": query}, timeout=None) as response:
        # Fail fast on HTTP errors instead of parsing an error body as SSE.
        response.raise_for_status()
        for line in response.iter_lines():
            if line.startswith("data: ") and line != "data: [DONE]":
                data = json.loads(line[6:])
                print(data["content"], end="", flush=True)