Skip to content

MongoDB — 文档数据库

简介

MongoDB 适合存储非结构化或半结构化的数据，例如 LLM 对话记录、知识库文档、用户画像等。

bash
pip install pymongo motor  # motor is the async driver

基础操作

python
from datetime import datetime, timezone
from typing import Optional

from pymongo import MongoClient

# Client for the MongoDB server at localhost:27017, and the "finance_ai"
# database used by every snippet in this file.
client = MongoClient("mongodb://localhost:27017/")
db = client.get_database("finance_ai")

# Collection holding the stored conversation records.
conversations = db.get_collection("conversations")

def save_conversation(
    session_id: str,
    messages: list,
    metadata: Optional[dict] = None,
):
    """Insert a new conversation document into the ``conversations`` collection.

    Args:
        session_id: Value stored in the document's ``session_id`` field.
        messages: Message list stored unchanged in the ``messages`` field.
        metadata: Optional extra data. An empty dict is stored when it is
            omitted or falsy.

    Returns:
        The result object returned by ``conversations.insert_one``.
    """
    # Take the timestamp once so created_at and updated_at are identical on a
    # fresh document. An aware UTC value replaces the deprecated, naive
    # datetime.utcnow(); PyMongo stores both as the same UTC instant.
    now = datetime.now(timezone.utc)
    doc = {
        "session_id": session_id,
        "messages": messages,
        "metadata": metadata or {},
        "created_at": now,
        "updated_at": now,
    }
    return conversations.insert_one(doc)

def get_conversation(session_id: str) -> Optional[dict]:
    """Fetch one conversation document by its ``session_id``.

    Args:
        session_id: Value to match against the ``session_id`` field.

    Returns:
        The matching document, or ``None`` when ``find_one`` finds no match.
    """
    return conversations.find_one({"session_id": session_id})

def update_conversation(session_id: str, new_message: dict):
    """Append a message to a conversation, creating the document if needed.

    Args:
        session_id: Value to match against the ``session_id`` field.
        new_message: Message appended to the document's ``messages`` array.
    """
    # One aware UTC timestamp shared by every field written in this call.
    now = datetime.now(timezone.utc)
    conversations.update_one(
        {"session_id": session_id},
        {
            "$push": {"messages": new_message},
            "$set": {"updated_at": now},
            # Applied only when the upsert inserts a new document, so that
            # conversations created through this path also carry created_at
            # and are visible to queries that filter on that field.
            "$setOnInsert": {"created_at": now},
        },
        upsert=True,
    )

存储 RAG 文档

python
# Collection holding the knowledge-base documents.
documents = db.get_collection("knowledge_base")

def store_document_with_embedding(content: str, embedding: list, metadata: dict):
    """Insert a document together with its embedding vector.

    Args:
        content: Text stored in the ``content`` field.
        embedding: Vector stored unchanged in the ``embedding`` field.
        metadata: Dict stored unchanged in the ``metadata`` field.

    Returns:
        The result object returned by ``documents.insert_one``.
    """
    doc = {
        "content": content,
        "embedding": embedding,
        "metadata": metadata,
        # Aware UTC timestamp instead of the deprecated, naive
        # datetime.utcnow(); PyMongo stores both as the same UTC instant.
        "created_at": datetime.now(timezone.utc),
    }
    return documents.insert_one(doc)

# Ascending single-field indexes on the source metadata and creation time.
documents.create_index([("metadata.source", 1)])
documents.create_index([("created_at", 1)])

# Text index on the document body, required by the $text query below.
documents.create_index([("content", "text")])

# Full-text search: project each hit's text score, order by that score and
# keep the top five results.
results = (
    documents.find(
        filter={"$text": {"$search": "不良贷款率"}},
        projection={"score": {"$meta": "textScore"}},
    )
    .sort([("score", {"$meta": "textScore"})])
    .limit(5)
)

异步操作(FastAPI)

python
import motor.motor_asyncio

# Motor (asyncio) client and database handle for the same "finance_ai" database.
async_client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017/")
async_db = async_client.get_database("finance_ai")

async def async_save_message(session_id: str, role: str, content: str):
    """Append one message to a conversation, creating the document if needed.

    Args:
        session_id: Value to match against the ``session_id`` field.
        role: Stored in the message's ``role`` field.
        content: Stored in the message's ``content`` field.
    """
    # One aware UTC timestamp so the message time and updated_at agree exactly.
    now = datetime.now(timezone.utc)
    await async_db.conversations.update_one(
        {"session_id": session_id},
        {
            "$push": {"messages": {"role": role, "content": content, "time": now}},
            "$set": {"updated_at": now},
            # Applied only when the upsert inserts a new document, so that
            # conversations created through this path also carry created_at
            # and are visible to queries that filter on that field.
            "$setOnInsert": {"created_at": now},
        },
        upsert=True,
    )

async def async_get_history(session_id: str) -> list:
    """Return the stored messages for a session.

    Args:
        session_id: Value to match against the ``session_id`` field.

    Returns:
        The document's ``messages`` value, or an empty list when no document
        matches.
    """
    query = {"session_id": session_id}
    record = await async_db.conversations.find_one(query)
    if not record:
        return []
    return record["messages"]

聚合统计

python
# Per-user statistics: number of conversations and total number of messages.
# NOTE: this pipeline only counts conversations and messages; it does not
# compute token usage.
pipeline = [
    # Only conversations created on or after 2024-01-01.
    {"$match": {"created_at": {"$gte": datetime(2024, 1, 1)}}},
    {"$group": {
        "_id": "$metadata.user_id",
        "conversation_count": {"$sum": 1},
        # $size errors on a missing or null field and would abort the whole
        # aggregation, so treat such documents as having zero messages.
        "total_messages": {"$sum": {"$size": {"$ifNull": ["$messages", []]}}},
    }},
    # Top 10 users by number of conversations.
    {"$sort": {"conversation_count": -1}},
    {"$limit": 10},
]

stats = list(conversations.aggregate(pipeline))

本站内容由 褚成志 整理编写,仅供学习参考