Skip to content

MySQL — LLM 应用数据存储

连接与基础操作

python
import pymysql
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# SQLAlchemy engine (recommended over raw pymysql connections).
# The connection URL may be supplied through the DATABASE_URL environment
# variable so real credentials do not have to live in source code; the
# literal below is only a fallback placeholder.
import os

engine = create_engine(
    os.environ.get(
        "DATABASE_URL",
        "mysql+pymysql://user:password@localhost:3306/finance_db",
    ),
    pool_size=10,        # connections kept open in the pool
    max_overflow=20,     # extra connections allowed beyond pool_size under load
    pool_pre_ping=True,  # test each pooled connection on checkout; stale ones are replaced
)

# Run a parameterized query: :status is bound by the driver, never
# interpolated into the SQL string.
with Session(engine) as session:
    result = session.execute(
        text("SELECT * FROM loan_applications WHERE status = :status"),
        {"status": "pending"},
    )
    rows = result.fetchall()

ORM 模型(存储对话记录)

python
from sqlalchemy import Column, Integer, String, Text, DateTime, Float
from sqlalchemy.orm import DeclarativeBase
from datetime import datetime

class Base(DeclarativeBase):
    """Declarative base shared by every ORM model below; its ``metadata`` drives table creation."""

class ChatSession(Base):
    """ORM model for the ``chat_sessions`` table; ``session_id`` is unique per row.

    Timestamps are produced by ``datetime.utcnow`` on the Python side, so they
    are naive datetimes holding UTC wall-clock time.
    NOTE(review): ``datetime.utcnow`` is deprecated since Python 3.12.
    """
    __tablename__ = "chat_sessions"
    
    # Surrogate auto-increment primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # String session identifier; unique + indexed, so lookups by it are cheap.
    session_id = Column(String(64), unique=True, index=True)
    # Owning user; indexed for per-user filtering.
    user_id = Column(String(64), index=True)
    # Filled in by SQLAlchemy at INSERT time (not a MySQL server default).
    created_at = Column(DateTime, default=datetime.utcnow)
    # Also refreshed by SQLAlchemy whenever it emits an UPDATE for this row;
    # UPDATEs issued as raw SQL outside SQLAlchemy will not change it.
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class ChatMessage(Base):
    """ORM model for the ``chat_messages`` table: one row per stored message."""
    __tablename__ = "chat_messages"
    
    # Auto-increment primary key; also reflects insertion order.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Session this message belongs to. Indexed, but no ForeignKey constraint
    # to chat_sessions is declared, so referential integrity is not enforced.
    session_id = Column(String(64), index=True)
    role = Column(String(20))  # user/assistant/system
    # NOTE(review): an unsized Text maps to MySQL TEXT (max 65,535 bytes);
    # long LLM outputs may need MEDIUMTEXT/LONGTEXT — confirm expected sizes.
    content = Column(Text)
    # Token count for the message; defaults to 0 when not supplied.
    tokens = Column(Integer, default=0)
    # Model name that produced the message, if recorded.
    model = Column(String(50))
    # Naive UTC timestamp set by SQLAlchemy at INSERT time. MySQL DATETIME
    # has second precision by default, so messages can share a value.
    created_at = Column(DateTime, default=datetime.utcnow)

class LLMCallLog(Base):
    """ORM model for the ``llm_call_logs`` table: one row per logged LLM call."""
    __tablename__ = "llm_call_logs"
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Session the call belongs to. Not indexed and no ForeignKey declared.
    # NOTE(review): per-user cost queries join on this column; an index may
    # be worthwhile as the table grows.
    session_id = Column(String(64))
    model = Column(String(50))
    prompt_tokens = Column(Integer)      # nullable
    completion_tokens = Column(Integer)  # nullable
    # NOTE(review): binary floating point for monetary amounts accumulates
    # rounding error; Numeric/DECIMAL may be preferable. Currency/unit is not
    # specified here.
    cost = Column(Float)
    latency_ms = Column(Integer)  # presumably milliseconds, per the name
    # Naive UTC timestamp set by SQLAlchemy at INSERT time.
    created_at = Column(DateTime, default=datetime.utcnow)

# Create every table registered on Base's metadata. Tables that already exist
# are skipped (checkfirst); existing tables are never altered.
Base.metadata.create_all(bind=engine, checkfirst=True)

CRUD 操作

python
from sqlalchemy.orm import Session

def save_message(session_id: str, role: str, content: str, model: str = None):
    """Insert one chat message row and commit it.

    Args:
        session_id: Session the message belongs to.
        role: Message role (user/assistant/system).
        content: Message text.
        model: Optional model name; stored as NULL when omitted.
    """
    record = ChatMessage(session_id=session_id, role=role, content=content, model=model)
    # db.begin() commits when the block exits normally and rolls back if it
    # raises; the outer context then closes the session.
    with Session(engine) as db, db.begin():
        db.add(record)

def get_history(session_id: str, limit: int = 20) -> list:
    """Return the most recent messages of a session, oldest first.

    Args:
        session_id: Session whose messages are fetched.
        limit: Maximum number of most-recent messages to return.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts in chronological order.
    """
    with Session(engine) as db:
        # Newest first so LIMIT keeps the latest messages. created_at is a
        # MySQL DATETIME (second precision by default), so a message and its
        # reply frequently share a timestamp; the auto-increment id breaks
        # such ties deterministically in insertion order.
        messages = (
            db.query(ChatMessage)
            .filter(ChatMessage.session_id == session_id)
            .order_by(ChatMessage.created_at.desc(), ChatMessage.id.desc())
            .limit(limit)
            .all()
        )

        # Flip back to oldest-first.
        return [{"role": m.role, "content": m.content} for m in reversed(messages)]

def get_cost_stats(user_id: str, days: int = 30) -> dict:
    """Aggregate a user's LLM cost, token usage and call count over recent days.

    Args:
        user_id: User whose sessions' calls are aggregated.
        days: Size of the look-back window, in days, ending now.

    Returns:
        A dict with keys ``total_cost``, ``total_tokens`` and ``call_count``.
        The sums are 0 (not None) when no calls fall in the window.
    """
    with Session(engine) as db:
        # created_at is written as naive UTC (datetime.utcnow), so the window
        # must be anchored at UTC_TIMESTAMP(); NOW() returns the session's
        # local time and would shift the window on non-UTC servers.
        # COALESCE keeps NULL token columns from dropping a row out of the
        # token sum, and turns "no matching rows" into 0 instead of NULL.
        result = db.execute(text("""
            SELECT
                COALESCE(SUM(l.cost), 0) AS total_cost,
                COALESCE(SUM(COALESCE(l.prompt_tokens, 0)
                             + COALESCE(l.completion_tokens, 0)), 0) AS total_tokens,
                COUNT(*) AS call_count
            FROM llm_call_logs l
            JOIN chat_sessions s ON l.session_id = s.session_id
            WHERE s.user_id = :user_id
            AND l.created_at >= DATE_SUB(UTC_TIMESTAMP(), INTERVAL :days DAY)
        """), {"user_id": user_id, "days": days})

        # An aggregate without GROUP BY always yields exactly one row.
        return dict(result.fetchone()._mapping)

异步操作(FastAPI)

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

import os

# Async engine; URL overridable via ASYNC_DATABASE_URL (placeholder fallback).
# pool_pre_ping matches the sync engine: stale pooled connections (e.g. closed
# by MySQL after wait_timeout) are detected and replaced on checkout.
async_engine = create_async_engine(
    os.environ.get(
        "ASYNC_DATABASE_URL",
        "mysql+aiomysql://user:password@localhost:3306/finance_db",
    ),
    pool_pre_ping=True,
)
# expire_on_commit=False: with the default (True), touching an ORM attribute
# after commit() triggers a lazy refresh, i.e. implicit IO, which is not
# allowed under asyncio.
AsyncSessionLocal = sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)


async def get_db():
    """FastAPI dependency that yields one AsyncSession per request.

    The session is closed when the ``async with`` block exits, which happens
    when FastAPI resumes the generator during request teardown.
    """
    async with AsyncSessionLocal() as session:
        yield session

# FastAPI dependency injection
from fastapi import Depends, FastAPI

# The route decorator needs an application object; the original snippet used
# `app` without defining it. Drop this line if your project already creates one.
app = FastAPI()


@app.post("/chat")
async def chat(query: str, db: AsyncSession = Depends(get_db)):
    """Chat endpoint; ``db`` is a per-request AsyncSession supplied by ``get_db``."""
    # Use db for asynchronous database operations here.
    ...

本站内容由 褚成志 整理编写,仅供学习参考