FastAPI: Building LLM API Services

Introduction

FastAPI is a go-to framework for building LLM services: it offers native async support, automatically generated OpenAPI documentation, and Pydantic-based type safety, and it pairs well with LangChain and the OpenAI SDK.

bash
pip install fastapi uvicorn python-dotenv pydantic openai
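
The examples below read credentials from environment variables; since python-dotenv is installed, a common pattern is to load them from a local .env file before the app is created. A minimal sketch (the variable names match the ones used further down, the values are placeholders):

python
# Top of main.py: load environment variables from .env if present
from dotenv import load_dotenv

load_dotenv()

# .env (kept out of version control):
# DASHSCOPE_API_KEY=sk-...
# SERVICE_API_KEY=change-me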

A Basic LLM API

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from openai import AsyncOpenAI
from typing import Optional
import os

app = FastAPI(title="金融 AI API", version="1.0.0")

client = AsyncOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None
    model: str = "qwen-turbo"
    temperature: float = 0.7

class ChatResponse(BaseModel):
    answer: str
    session_id: str
    tokens_used: int

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        response = await client.chat.completions.create(
            model=request.model,
            messages=[
                {"role": "system", "content": "你是专业的金融 AI 助手"},
                {"role": "user", "content": request.message}
            ],
            temperature=request.temperature
        )
        
        return ChatResponse(
            answer=response.choices[0].message.content,
            session_id=request.session_id or "new",
            tokens_used=response.usage.total_tokens
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
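
Once the server is running (see the deployment commands below), the endpoint can be exercised from a small client script. A sketch using httpx, an extra dependency not listed in the install step; the localhost:8000 address assumes the default uvicorn invocation:

python
# Quick smoke test for POST /chat
import httpx

resp = httpx.post(
    "http://localhost:8000/chat",
    json={"message": "什么是市盈率?", "temperature": 0.3},
    timeout=60,
)
resp.raise_for_status()
print(resp.json())  # {"answer": "...", "session_id": "new", "tokens_used": ...}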

Streaming Output (SSE)

python
from fastapi.responses import StreamingResponse
import json

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        try:
            stream = await client.chat.completions.create(
                model=request.model,
                messages=[
                    {"role": "system", "content": "你是专业的金融 AI 助手"},
                    {"role": "user", "content": request.message}
                ],
                stream=True
            )
            
            async for chunk in stream:
                # Some providers send a trailing chunk with an empty choices list
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta
                if delta.content:
                    data = json.dumps({"content": delta.content}, ensure_ascii=False)
                    yield f"data: {data}\n\n"
            
            yield "data: [DONE]\n\n"
            
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"
        }
    )
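
On the client side the SSE stream is read line by line: each data: line carries a JSON fragment until the [DONE] sentinel. A minimal consumer sketch, again assuming httpx and localhost:8000:

python
# Print streamed tokens as they arrive from POST /chat/stream
import json
import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/chat/stream",
    json={"message": "简要介绍一下债券久期"},
    timeout=None,
) as resp:
    for line in resp.iter_lines():
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        print(json.loads(payload).get("content", ""), end="", flush=True)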

Dependency Injection and Middleware

python
from fastapi import Depends, Header
from functools import lru_cache

# Configuration management
class Settings(BaseModel):
    api_key: str
    model: str = "qwen-turbo"
    max_tokens: int = 2048

@lru_cache
def get_settings() -> Settings:
    return Settings(api_key=os.getenv("DASHSCOPE_API_KEY", ""))

# API key authentication
async def verify_api_key(x_api_key: str = Header(...)):
    if x_api_key != os.getenv("SERVICE_API_KEY"):
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return x_api_key

@app.post("/chat/secure", dependencies=[Depends(verify_api_key)])
async def secure_chat(request: ChatRequest, settings: Settings = Depends(get_settings)):
    ...

# Request logging middleware
from fastapi import Request
import time

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = time.time() - start
    print(f"{request.method} {request.url.path} - {response.status_code} - {duration:.3f}s")
    return response
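
FastAPI derives the header name from the x_api_key parameter, so the secured route expects an X-API-Key request header. A hedged client-side check (the key value is whatever SERVICE_API_KEY is set to):

python
# Calling the authenticated endpoint; a wrong or missing key returns 401
import httpx

resp = httpx.post(
    "http://localhost:8000/chat/secure",
    headers={"X-API-Key": "change-me"},  # must equal the SERVICE_API_KEY env var
    json={"message": "帮我总结一下本季度财报要点"},
)
print(resp.status_code)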

RAG Endpoint

python
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings

# Global vector store (loaded at startup)
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5")
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)

class RAGRequest(BaseModel):
    question: str
    top_k: int = 3

class RAGResponse(BaseModel):
    answer: str
    sources: list[str]

@app.post("/rag/query", response_model=RAGResponse)
async def rag_query(request: RAGRequest):
    # Retrieve relevant documents
    docs = vectorstore.similarity_search(request.question, k=request.top_k)
    context = "\n\n".join([doc.page_content for doc in docs])
    sources = [doc.metadata.get("source", "unknown") for doc in docs]
    
    # Build the RAG prompt
    prompt = f"""基于以下文档内容回答问题,如果文档中没有相关信息,请说明。

文档内容:
{context}

问题:{request.question}

回答:"""
    
    response = await client.chat.completions.create(
        model="qwen-plus",
        messages=[{"role": "user", "content": prompt}]
    )
    
    return RAGResponse(
        answer=response.choices[0].message.content,
        sources=list(set(sources))
    )
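
The endpoint above assumes ./chroma_db was populated beforehand. A one-off ingestion sketch under that assumption; the source file, chunk sizes, and the langchain_text_splitters import path depend on your data and LangChain version:

python
# Ingestion script: split source documents and persist them into ./chroma_db
from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter

embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5")

docs = TextLoader("data/financial_report.txt", encoding="utf-8").load()  # hypothetical file
chunks = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50).split_documents(docs)

Chroma.from_documents(
    chunks,
    embeddings,
    persist_directory="./chroma_db",  # the same directory the API loads at startup
)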

Running and Deployment

bash
# Development mode
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# Production mode (multiple workers)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

# Using gunicorn
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000

dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir

COPY . .
EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
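
The Dockerfile copies a requirements.txt that is not shown on this page; based on the imports used above, it would contain roughly the following (version pins omitted; sentence-transformers is needed by HuggingFaceEmbeddings):

text
fastapi
uvicorn
python-dotenv
pydantic
openai
langchain-community
langchain-text-splitters
chromadb
sentence-transformers
gunicorn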

Production Considerations

  1. Use AsyncOpenAI rather than the synchronous client so LLM calls do not block the event loop
  2. Set a reasonable timeout on the client to avoid requests hanging indefinitely
  3. Send X-Accel-Buffering: no on streaming endpoints so Nginx does not buffer the SSE response
  4. Keep conversation history in Redis so the service can scale horizontally (see the sketch below)
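
For the last point, a minimal sketch of per-session history in Redis using redis.asyncio; the host, key naming, and TTL are assumptions:

python
# Shared chat history in Redis, so any worker or replica can serve a session
import json
import redis.asyncio as redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

async def append_history(session_id: str, role: str, content: str) -> None:
    key = f"chat:{session_id}"
    await r.rpush(key, json.dumps({"role": role, "content": content}, ensure_ascii=False))
    await r.expire(key, 3600)  # expire idle sessions after an hour

async def load_history(session_id: str, limit: int = 20) -> list[dict]:
    return [json.loads(item) for item in await r.lrange(f"chat:{session_id}", -limit, -1)]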
