Skip to content

流式输出 SSE

服务端实现(FastAPI)

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json

app = FastAPI()
# NOTE(review): "sk-xxx" is a hardcoded placeholder — load the real key from an
# environment variable in any real deployment.  base_url="..." presumably points
# at an OpenAI-compatible endpoint serving qwen models — TODO confirm.
client = AsyncOpenAI(api_key="sk-xxx", base_url="...")

@app.post("/chat/stream")
async def stream_chat(query: str):
    """Stream a chat completion back to the caller as Server-Sent Events.

    ``query`` is read from the URL query string (FastAPI's default for a
    plain ``str`` parameter, even on POST).  Each non-empty content delta
    is emitted as one ``data:`` event carrying a JSON object
    ``{"content": ...}``; a final ``data: [DONE]`` event marks the end of
    the stream.
    """
    async def generate():
        stream = await client.chat.completions.create(
            model="qwen-turbo",
            messages=[{"role": "user", "content": query}],
            stream=True
        )
        async for chunk in stream:
            # Some providers emit chunks with an empty `choices` list
            # (e.g. a trailing usage chunk); skip them to avoid IndexError.
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.content
            if content:
                # ensure_ascii=False keeps CJK text readable on the wire.
                yield f"data: {json.dumps({'content': content}, ensure_ascii=False)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        # X-Accel-Buffering: no  -> tell nginx not to buffer the stream;
        # Cache-Control: no-cache -> intermediaries must not cache events.
        headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"}
    )

客户端消费(JavaScript)

javascript
// The FastAPI endpoint declares `query: str`, which FastAPI reads from the
// URL query string even on POST — a JSON body would yield a 422.  Send the
// query in the URL instead.
const query = '分析银行股投资价值';
const response = await fetch(`/chat/stream?query=${encodeURIComponent(query)}`, {
  method: 'POST'
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';  // carries a trailing partial SSE line between chunks

while (true) {
  const {done, value} = await reader.read();
  if (done) break;

  // {stream: true} keeps multi-byte UTF-8 sequences (e.g. the Chinese
  // payload here) intact when they are split across network chunks.
  buffer += decoder.decode(value, {stream: true});
  const lines = buffer.split('\n');
  buffer = lines.pop();  // last element may be an incomplete line — keep it

  for (const line of lines) {
    if (line.startsWith('data: ') && line !== 'data: [DONE]') {
      const data = JSON.parse(line.slice(6));
      process.stdout.write(data.content);
    }
  }
}

Python 客户端消费

python
import httpx
import json

def stream_from_api(query: str):
    """Consume the SSE chat stream and print content deltas as they arrive.

    Sends ``query`` as a URL query parameter (matching the FastAPI
    endpoint's plain ``str`` parameter) and prints each JSON ``content``
    field without a trailing newline, flushing so output appears
    incrementally.

    Raises:
        httpx.HTTPStatusError: if the server responds with an error status.
    """
    with httpx.stream("POST", "http://localhost:8000/chat/stream",
                      params={"query": query}) as response:
        # Fail fast on HTTP errors instead of silently iterating an
        # error body that contains no "data: " lines.
        response.raise_for_status()
        for line in response.iter_lines():
            # Blank keep-alive lines and the terminal sentinel are skipped.
            if line.startswith("data: ") and line != "data: [DONE]":
                payload = json.loads(line[6:])
                print(payload["content"], end="", flush=True)

本站内容由 褚成志 整理编写,仅供学习参考