MySQL — LLM 应用数据存储
连接与基础操作
python
import pymysql
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
# SQLAlchemy connection (recommended)
# NOTE: the credentials are inlined here purely for illustration.
engine = create_engine(
    "mysql+pymysql://user:password@localhost:3306/finance_db",
    pool_size=10,  # connections kept open in the pool
    max_overflow=20,  # additional connections allowed beyond pool_size
    pool_pre_ping=True,  # test a connection on checkout and replace it if stale
)
# 执行查询
with Session(engine) as session:
result = session.execute(
text("SELECT * FROM loan_applications WHERE status = :status"),
{"status": "pending"}
)
rows = result.fetchall()ORM 模型(存储对话记录)
python
from sqlalchemy import Column, Integer, String, Text, DateTime, Float
from sqlalchemy.orm import DeclarativeBase
from datetime import datetime
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this file inherit from."""
class ChatSession(Base):
    """ORM model for the ``chat_sessions`` table (one row per unique session_id)."""

    __tablename__ = "chat_sessions"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # External session identifier; unique and indexed for lookups.
    session_id = Column(String(64), unique=True, index=True)
    # Owner of the session; indexed so sessions can be filtered by user.
    user_id = Column(String(64), index=True)
    # Naive UTC timestamps: set on insert, and refreshed on every update.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
class ChatMessage(Base):
    """ORM model for the ``chat_messages`` table (one row per message)."""

    __tablename__ = "chat_messages"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Session this message belongs to; indexed, no foreign key is declared.
    session_id = Column(String(64), index=True)
    # Speaker of the message: user / assistant / system.
    role = Column(String(20))
    # Message body.
    content = Column(Text)
    # Token count for the message; defaults to 0 when not supplied.
    tokens = Column(Integer, default=0)
    # Name of the model associated with the message.
    model = Column(String(50))
    # Naive UTC timestamp set on insert.
    created_at = Column(DateTime, default=datetime.utcnow)
class LLMCallLog(Base):
    """ORM model for the ``llm_call_logs`` table (one row per LLM call)."""

    __tablename__ = "llm_call_logs"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Session the call belongs to. Indexed because cost statistics join this
    # column against ``chat_sessions.session_id``; without an index every
    # such join scans the whole log table.
    session_id = Column(String(64), index=True)
    # Name of the model that served the call.
    model = Column(String(50))
    # Token usage, split into prompt and completion parts.
    prompt_tokens = Column(Integer)
    completion_tokens = Column(Integer)
    # Cost of the call, stored as a floating-point value.
    cost = Column(Float)
    # Latency of the call in milliseconds.
    latency_ms = Column(Integer)
    # Naive UTC timestamp set on insert.
    created_at = Column(DateTime, default=datetime.utcnow)
# 创建表
Base.metadata.create_all(engine)CRUD 操作
python
from sqlalchemy.orm import Session
def save_message(session_id: str, role: str, content: str, model: str = None):
    """Persist a single chat message.

    Args:
        session_id: Identifier of the session the message belongs to.
        role: Speaker of the message.
        content: Message text.
        model: Optional model name stored alongside the message.
    """
    new_message = ChatMessage(
        session_id=session_id,
        role=role,
        content=content,
        model=model,
    )
    with Session(engine) as db:
        db.add(new_message)
        db.commit()
def get_history(session_id: str, limit: int = 20) -> list:
    """Return the most recent messages of a session in chronological order.

    Args:
        session_id: Session whose messages are loaded.
        limit: Maximum number of messages returned (the newest ones).

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts, oldest first.
    """
    with Session(engine) as db:
        newest_first = (
            db.query(ChatMessage)
            .filter(ChatMessage.session_id == session_id)
            # ``created_at`` alone is not a total order: messages written
            # within the same timestamp resolution compare equal, so the
            # auto-increment ``id`` is used as a deterministic tie-breaker.
            .order_by(ChatMessage.created_at.desc(), ChatMessage.id.desc())
            .limit(limit)
            .all()
        )
        # The query yields newest-first; flip it back to chronological order.
        return [
            {"role": message.role, "content": message.content}
            for message in reversed(newest_first)
        ]
def get_cost_stats(user_id: str, days: int = 30) -> dict:
    """Aggregate LLM usage for one user over the last ``days`` days.

    Args:
        user_id: User whose sessions are included.
        days: Size of the look-back window in days.

    Returns:
        A dict with the keys ``total_cost``, ``total_tokens`` and
        ``call_count``. When no rows match, SQL ``SUM`` yields NULL, so the
        two totals are ``None`` while ``call_count`` is 0.
    """
    with Session(engine) as db:
        # ``created_at`` is written with ``datetime.utcnow``, so the window
        # must also be computed in UTC. ``NOW()`` would use the server's
        # local time zone and shift the window by the UTC offset.
        result = db.execute(
            text("""
                SELECT
                    SUM(cost) as total_cost,
                    SUM(prompt_tokens + completion_tokens) as total_tokens,
                    COUNT(*) as call_count
                FROM llm_call_logs l
                JOIN chat_sessions s ON l.session_id = s.session_id
                WHERE s.user_id = :user_id
                AND l.created_at >= DATE_SUB(UTC_TIMESTAMP(), INTERVAL :days DAY)
            """),
            {"user_id": user_id, "days": days},
        )
        # An aggregate without GROUP BY always produces exactly one row.
        return dict(result.mappings().one())

异步操作(FastAPI)
python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import async_sessionmaker

# Async engine backed by the aiomysql driver.
async_engine = create_async_engine(
    "mysql+aiomysql://user:password@localhost:3306/finance_db"
)
# Factory for AsyncSession objects. ``expire_on_commit=False`` keeps loaded
# attributes usable after ``commit()``; with the default, touching them would
# trigger an implicit refresh, which is not allowed under asyncio.
AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)
async def get_db():
    """Yield an async database session and close it once the caller is done."""
    async with AsyncSessionLocal() as db_session:
        yield db_session
# FastAPI 依赖注入
from fastapi import Depends
@app.post("/chat")
async def chat(query: str, db: AsyncSession = Depends(get_db)):
    """Handle a POST to ``/chat`` (placeholder implementation).

    Args:
        query: The chat query.
        db: Async session injected through ``Depends(get_db)``.
    """
    # Use ``db`` here for asynchronous database operations.
    ...