# MongoDB — 文档数据库

## 简介

MongoDB 适合存储非结构化的 LLM 对话记录、知识库文档、用户画像等数据。
```bash
pip install pymongo motor  # motor 是异步驱动
```

## 基础操作
python
from datetime import datetime, timezone
from typing import Optional

from pymongo import MongoClient
# Connect to a local MongoDB instance and select the application database.
client = MongoClient("mongodb://localhost:27017/")
db = client["finance_ai"]
# Collection holding LLM conversation records, keyed by session_id (see below).
conversations = db["conversations"]
def save_conversation(session_id: str, messages: list, metadata: Optional[dict] = None):
    """Insert a new conversation document.

    Args:
        session_id: Caller-supplied session identifier.
        messages: Chat messages to store (role/content dicts, per the async
            helpers below).
        metadata: Optional extra fields; stored as {} when omitted.

    Returns:
        The pymongo InsertOneResult for the inserted document.
    """
    # Single timezone-aware timestamp: utcnow() is deprecated and naive, and
    # calling it twice could make created_at differ from updated_at.
    now = datetime.now(timezone.utc)
    doc = {
        "session_id": session_id,
        "messages": messages,
        "metadata": metadata or {},
        "created_at": now,
        "updated_at": now,
    }
    return conversations.insert_one(doc)
def get_conversation(session_id: str) -> Optional[dict]:
    """Return the conversation document for *session_id*, or None when absent.

    find_one() yields None for a missing document, so the return type is
    Optional[dict] (the original ``-> dict`` annotation was inaccurate).
    """
    return conversations.find_one({"session_id": session_id})
def update_conversation(session_id: str, new_message: dict):
    """Append *new_message* to a session's message list.

    $push appends to the messages array and updated_at is refreshed;
    upsert=True creates the conversation document if it does not exist yet.
    """
    conversations.update_one(
        {"session_id": session_id},
        {
            "$push": {"messages": new_message},
            # Timezone-aware timestamp; utcnow() is deprecated and naive.
            "$set": {"updated_at": datetime.now(timezone.utc)},
        },
        upsert=True,
    )

## 存储 RAG 文档
python
# Knowledge-base collection for RAG documents.
documents = db["knowledge_base"]
# Store a document together with its embedding vector
def store_document_with_embedding(content: str, embedding: list, metadata: dict):
    """Insert a knowledge-base document together with its embedding vector.

    Args:
        content: Raw document text.
        embedding: Pre-computed embedding vector (presumably floats — the
            producer is outside this file).
        metadata: Arbitrary metadata stored alongside (e.g. "source", which
            is indexed below).

    Returns:
        The pymongo InsertOneResult.
    """
    doc = {
        "content": content,
        "embedding": embedding,
        "metadata": metadata,
        # Timezone-aware timestamp; utcnow() is deprecated and naive.
        "created_at": datetime.now(timezone.utc),
    }
    return documents.insert_one(doc)
# Secondary indexes for common query patterns.
documents.create_index("metadata.source")
documents.create_index("created_at")
# Text index enabling $text full-text search over `content`
documents.create_index([("content", "text")])
# Full-text search, ranked by MongoDB's relevance score; `results` is a
# lazy cursor limited to the five best matches.
text_filter = {"$text": {"$search": "不良贷款率"}}
score_projection = {"score": {"$meta": "textScore"}}
results = (
    documents.find(text_filter, score_projection)
    .sort([("score", {"$meta": "textScore"})])
    .limit(5)
)

## 异步操作(FastAPI)
python
# Async (Motor) client for use inside FastAPI / asyncio request handlers.
import motor.motor_asyncio
async_client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017/")
async_db = async_client["finance_ai"]
async def async_save_message(session_id: str, role: str, content: str):
    """Append one chat message to a session, creating the document if needed.

    Async (Motor) counterpart of update_conversation(); stores the message
    as a role/content dict with a per-message timestamp.
    """
    # Timezone-aware timestamps; utcnow() is deprecated and naive.
    message = {"role": role, "content": content, "time": datetime.now(timezone.utc)}
    await async_db.conversations.update_one(
        {"session_id": session_id},
        {
            "$push": {"messages": message},
            "$set": {"updated_at": datetime.now(timezone.utc)},
        },
        upsert=True,
    )
async def async_get_history(session_id: str) -> list:
    """Return the stored message list for *session_id*, or [] when absent.

    Uses .get() so a document that exists but lacks a "messages" field
    yields [] instead of raising KeyError.
    """
    doc = await async_db.conversations.find_one({"session_id": session_id})
    return doc.get("messages", []) if doc else []

## 聚合统计
python
# Top-10 users by number of conversations since 2024-01-01, with total
# message counts per user. (The pipeline counts messages, not tokens.)
pipeline = [
    {"$match": {"created_at": {"$gte": datetime(2024, 1, 1)}}},
    {
        "$group": {
            "_id": "$metadata.user_id",
            "conversation_count": {"$sum": 1},
            "total_messages": {"$sum": {"$size": "$messages"}},
        }
    },
    {"$sort": {"conversation_count": -1}},
    {"$limit": 10},
]
stats = list(conversations.aggregate(pipeline))