饮墨

子安饮墨馀三斗,留与卿儿作赋来

AI Agent 生产环境可观测性:用 Langfuse 实现 LLM 调用链追踪与成本监控

痛点:AI Agent 上线后成了黑盒

你的 AI Agent 终于部署到生产环境了。用户开始使用,账单开始跑——然后问题来了:

  • 某个用户的请求为什么花了 45 秒才返回?是 LLM 慢还是 Tool Call 卡住了?
  • 这个月 Token 消耗比上月翻了 3 倍,到底哪个 Agent 在烧钱?
  • Agent 的多步推理链路中,哪一步的回答质量最差、用户反馈最多?

传统 APM(Datadog、Prometheus)能监控 HTTP 延迟和错误率,但对 LLM 应用的语义级可观测性完全无能为力。你需要的是能追踪每一次 LLM 调用的 prompt、completion、token 数、延迟、成本,并将多步 Agent 执行串成完整 trace 的专用工具。

Langfuse 是目前最成熟的开源 LLM 可观测性平台,填补了这个关键空白。

方案:Langfuse 核心架构

Langfuse 采用 Trace → Span 的 OpenTelemetry 式分层模型,专为 LLM 应用设计:

Trace (一次完整的 Agent 执行)
├── Span: user_input_processing
├── Generation: gpt-4o 调用 ( prompt/completion/token/cost)
├── Span: tool_call_web_search
├── Generation: gpt-4o 第二轮调用
└── Span: response_formatting

核心能力:

功能 说明
Trace 追踪 多步 Agent 执行串联为完整链路
Token & 成本统计 按模型、用户、功能维度拆分
Prompt 管理 版本化管理 prompt template,A/B 对比
评分体系 人工标注 + 自动评估(LLM-as-judge)
Dataset & 评估 构建评测集,CI 中跑回归测试

实操步骤

第一步:部署 Langfuse(Docker Compose 自托管)

对于生产环境,推荐自托管以保证数据安全:

# docker-compose.yml
version: "3.9"
services:
  langfuse:
    image: langfuse/langfuse:2
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://langfuse:langfuse@db:5432/langfuse
      - NEXTAUTH_URL=http://localhost:3000
      - NEXTAUTH_SECRET=your-secret-here-change-it
      - SALT=your-salt-here-change-it
      - ENCRYPTION_KEY=0000000000000000000000000000000000000000000000000000000000000000
    depends_on:
      - db

  db:
    image: postgres:16-alpine
    environment:
      - POSTGRES_USER=langfuse
      - POSTGRES_PASSWORD=langfuse
      - POSTGRES_DB=langfuse
    volumes:
      - pgdata:/var/lib/postgresql/data

volumes:
  pgdata:
docker compose up -d
# 访问 http://localhost:3000 完成初始化
# 创建项目 → 获取 Public Key + Secret Key

第二步:Python Agent 集成(装饰器模式)

Langfuse 提供 @observe() 装饰器,零侵入集成:

# pip install langfuse openai
from langfuse.decorators import observe, langfuse_context
from openai import OpenAI

client = OpenAI()

@observe(as_type="generation")
def call_llm(messages: list, model: str = "gpt-4o") -> str:
    """LLM 调用自动追踪:记录 input/output/token/latency"""
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.7,
    )
    # Langfuse 自动捕获 token 用量和延迟
    langfuse_context.update_current_observation(
        model=model,
        usage={
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens,
        },
    )
    return response.choices[0].message.content

@observe(as_type="span")
def search_knowledge_base(query: str) -> str:
    """工具调用也作为 Span 追踪"""
    # 模拟向量搜索
    import time
    time.sleep(0.3)  # 模拟检索延迟
    return f"检索结果: 与 '{query}' 相关的 3 条文档"

@observe()  # 顶层 Trace
def run_agent(user_question: str) -> str:
    """完整 Agent 执行链路追踪"""
    # Step 1: 检索
    context = search_knowledge_base(user_question)

    # Step 2: LLM 推理
    messages = [
        {"role": "system", "content": f"基于以下上下文回答问题:\n{context}"},
        {"role": "user", "content": user_question},
    ]
    answer = call_llm(messages)

    # 添加元数据用于后续筛选
    langfuse_context.update_current_trace(
        user_id="user_12345",
        metadata={"source": "api", "version": "v2.1"},
        tags=["rag", "production"],
    )
    return answer

# 执行
result = run_agent("Kubernetes Pod OOMKilled 怎么排查?")
print(result)

第三步:LangChain / LangGraph 集成(回调模式)

如果你的 Agent 基于 LangChain 构建:

from langfuse.callback import CallbackHandler

# 创建 Langfuse callback handler
langfuse_handler = CallbackHandler(
    public_key="pk-lf-xxx",
    secret_key="sk-lf-xxx",
    host="http://your-langfuse:3000",
)

# LangChain Agent 执行时传入
from langchain.agents import AgentExecutor

agent_executor = AgentExecutor(agent=agent, tools=tools)
result = agent_executor.invoke(
    {"input": "分析最近一周的服务器 CPU 趋势"},
    config={"callbacks": [langfuse_handler]},
)

第四步:成本监控 + 告警(Prometheus 集成)

Langfuse 提供 API 导出指标,配合 Prometheus 实现成本告警:

# exporter.py — 定时拉取 Langfuse 统计,暴露为 Prometheus 指标
from prometheus_client import start_http_server, Gauge
from langfuse import Langfuse
import time

langfuse = Langfuse()

cost_gauge = Gauge(
    "langfuse_daily_cost_usd",
    "Daily LLM cost in USD",
    ["project", "model"]
)

token_gauge = Gauge(
    "langfuse_daily_tokens_total",
    "Daily token consumption",
    ["project", "model", "type"]  # type: input/output
)

latency_gauge = Gauge(
    "langfuse_generation_latency_p95_seconds",
    "P95 generation latency",
    ["project", "model"]
)

def collect_metrics():
    """每 5 分钟采集一次"""
    # 通过 Langfuse API 获取统计数据
    # 实际生产中用 /api/public/metrics 端点
    generations = langfuse.get_generations(limit=100)
    # ... 聚合计算后更新 gauge
    pass

if __name__ == "__main__":
    start_http_server(8000)
    while True:
        collect_metrics()
        time.sleep(300)

Prometheus 告警规则:

# prometheus-rules.yml
groups:
  - name: llm_cost_alerts
    rules:
      - alert: LLMDailyCostHigh
        expr: sum(langfuse_daily_cost_usd) > 50
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "LLM 日消耗超过 $50"
          description: "当前日消耗: ${{ $value }},请检查是否有异常调用"

      - alert: LLMLatencyHigh
        expr: langfuse_generation_latency_p95_seconds > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "LLM 调用 P95 延迟超过 10s"

避坑指南

1. 异步上报避免阻塞业务请求

Langfuse SDK 默认异步批量上报,但需要确认配置正确:

from langfuse import Langfuse

# ✅ 正确:启用异步 + 批量
langfuse = Langfuse(
    public_key="pk-lf-xxx",
    secret_key="sk-lf-xxx",
    host="http://langfuse:3000",
    flush_at=20,       # 积累 20 条后批量发送
    flush_interval=5,  # 或每 5 秒发送一次
)

# ❌ 错误:程序退出前未 flush,丢失最后一批数据
# 正确做法:
import atexit
atexit.register(langfuse.flush)

2. 自托管 PostgreSQL 要配索引和定期清理

Langfuse 写入量大,生产环境务必:

-- 检查表膨胀
SELECT schemaname, tablename, 
       pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables 
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
LIMIT 10;

-- 配置自动清理(保留 90 天数据)
-- 在 Langfuse 环境变量中设置:
-- LANGFUSE_RETENTION_DAYS=90

3. 多 Agent 环境的 Trace 关联

当一个请求触发多个 Agent 协作时,用 trace_id 串联:

import uuid

# 主 Agent 生成 trace_id
trace_id = str(uuid.uuid4())

@observe(trace_id=trace_id)
def main_agent(query):
    sub_result = sub_agent(query, trace_id=trace_id)
    return synthesize(sub_result)

# 子 Agent 复用相同 trace_id → 在 Langfuse UI 中显示为同一条完整链路

总结

维度 传统 APM Langfuse
追踪粒度 HTTP 请求级 LLM 调用 + Token 级
成本归因 按用户/模型/功能拆分
质量评估 仅错误率 人工评分 + LLM-as-judge
Prompt 管理 版本化 + A/B 测试
部署方式 SaaS/自托管 开源自托管 / Cloud

核心结论:

  1. AI Agent 上生产必须有专用可观测性,传统 APM 不够用
  2. Langfuse 开源自托管,数据不出内网,适合企业级场景
  3. 集成成本极低:@observe() 装饰器 + 5 分钟搞定
  4. 成本监控是刚需——一个失控的 Agent 一天能烧掉上千美元

先跑起来,再逐步完善评估体系。LLM 可观测性不是锦上添花,是生产环境的底线。

您还没有登录,请登录后发表评论。