LLM API Calling Conventions

The OpenAI-Compatible Interface Standard

Mainstream Chinese LLM providers are all compatible with the OpenAI Chat Completions API; switching providers only requires changing `base_url` and `api_key`.

```python
from openai import OpenAI

# Qwen
qwen = OpenAI(api_key="sk-xxx", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")

# DeepSeek
deepseek = OpenAI(api_key="sk-xxx", base_url="https://api.deepseek.com")

# Kimi
kimi = OpenAI(api_key="sk-xxx", base_url="https://api.moonshot.cn/v1")

# Local Ollama
ollama = OpenAI(api_key="ollama", base_url="http://localhost:11434/v1")
```
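
Because the interface is shared, a request written once runs against any of these clients unchanged. A minimal sketch, assuming the `qwen` client defined above and the `qwen-turbo` model:

```python
# Send one chat request; only the client object decides which provider answers.
resp = qwen.chat.completions.create(
    model="qwen-turbo",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Introduce yourself in one sentence."},
    ],
)
print(resp.choices[0].message.content)
```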

Request Parameters in Detail

```python
response = client.chat.completions.create(
    # Required
    model="qwen-turbo",
    messages=[...],

    # Generation control
    temperature=0.7,      # randomness in [0, 2]; 0 = deterministic, >1 = more random
    top_p=0.9,            # nucleus sampling; tune this or temperature, not both
    max_tokens=2048,      # maximum number of output tokens

    # Stop conditions
    stop=["###", "\n\n"], # stop generating when any of these strings appears

    # Output format
    response_format={"type": "json_object"},  # JSON mode

    # Streaming
    stream=True,

    # Tools
    tools=[...],
    tool_choice="auto",   # auto | none | required

    # Miscellaneous
    n=1,                  # number of candidate completions to generate
    seed=42,              # fixed random seed (supported by some models)
    user="user-123",      # end-user identifier, useful for monitoring
)
```
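
With stream=True the call returns an iterator of chunks instead of a single response. A minimal consumption sketch, assuming the same `client` and model as above:

```python
# Stream tokens as they arrive; each chunk carries an incremental delta.
stream = client.chat.completions.create(
    model="qwen-turbo",
    messages=[{"role": "user", "content": "Write a haiku about the sea."}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:  # the final chunk's delta may be None
        print(delta, end="", flush=True)
print()
```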

Error Handling and Retries

```python
import time
from openai import OpenAI, RateLimitError, APITimeoutError, APIConnectionError

def robust_chat(client, messages, model="qwen-turbo", max_retries=3):
    """LLM call with retry logic."""
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                timeout=30
            )
            return response.choices[0].message.content

        except RateLimitError:
            wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
            print(f"Rate limited, retrying in {wait}s...")
            time.sleep(wait)

        except APITimeoutError:
            print(f"Timed out, retry {attempt + 1}...")

        except APIConnectionError as e:
            print(f"Connection error: {e}")  # unlikely to recover, stop retrying
            break

    raise RuntimeError("LLM call failed, maximum retries reached")
```
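
Usage is a drop-in replacement for a bare create call; exponential backoff keeps retries from hammering an already rate-limited endpoint. A quick example, assuming the `qwen` client from the first section:

```python
# Same call shape as client.chat.completions.create, but resilient to transient errors.
answer = robust_chat(qwen, [{"role": "user", "content": "Summarize RAG in one sentence."}])
print(answer)
```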

Cost Monitoring

```python
def chat_with_cost_tracking(client, messages, model="qwen-turbo"):
    """Make a call and record its cost."""
    response = client.chat.completions.create(
        model=model, messages=messages
    )

    usage = response.usage

    # Qwen prices (CNY per million tokens): (input, output)
    prices = {
        "qwen-turbo": (0.3, 0.6),
        "qwen-plus": (0.8, 2.0),
        "qwen-max": (40, 120),
    }

    input_price, output_price = prices.get(model, (1, 2))
    cost = (
        usage.prompt_tokens * input_price / 1_000_000 +
        usage.completion_tokens * output_price / 1_000_000
    )

    print(f"Input: {usage.prompt_tokens} tokens")
    print(f"Output: {usage.completion_tokens} tokens")
    print(f"Cost: ¥{cost:.6f}")

    return response.choices[0].message.content, cost
```
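
Since the function returns the cost alongside the content, a hypothetical session-level accumulator is a few lines on top of it:

```python
# Illustrative only: sum the cost of several calls in one session.
total_cost = 0.0
for question in ["What does temperature do?", "What does top_p do?"]:
    _, cost = chat_with_cost_tracking(qwen, [{"role": "user", "content": question}])
    total_cost += cost
print(f"Session total: ¥{total_cost:.6f}")
```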

Best Practices

  1. Always set the timeout parameter in production.
  2. Use the seed parameter to make tests reproducible.
  3. Log the token usage of every call to keep costs monitored.
  4. Use the async client AsyncOpenAI for high-concurrency scenarios (see the sketch below).
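
A minimal AsyncOpenAI sketch for point 4, assuming the same DashScope endpoint as earlier; asyncio.gather fires the requests concurrently instead of serially:

```python
import asyncio
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI(
        api_key="sk-xxx",
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    )
    # Launch several requests concurrently and wait for all of them.
    tasks = [
        client.chat.completions.create(
            model="qwen-turbo",
            messages=[{"role": "user", "content": q}],
        )
        for q in ["Question A", "Question B", "Question C"]
    ]
    responses = await asyncio.gather(*tasks)
    for r in responses:
        print(r.choices[0].message.content)

asyncio.run(main())
```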
