饮墨

子安饮墨馀三斗,留与卿儿作赋来

Temporal 分布式工作流引擎生产实践:告别脆弱的 Cron Job 和消息队列

痛点:分布式任务编排的脆弱性

运维团队经常面对这样的场景:一个跨服务的业务流程——比如「用户注册 → 发邮件 → 创建资源 → 配置权限 → 通知上游」——被拆成 5 个 Cron Job + 3 个消息队列 + 若干重试逻辑。任何一个环节失败,排查成本极高:

  • Cron Job 没有状态持久化,失败后从头开始还是从断点续跑?靠人肉判断
  • 消息队列的消费者幂等性难保证,重试导致重复执行
  • 跨步骤的超时、取消、补偿逻辑散落在各处,维护噩梦
  • 流程可观测性几乎为零,只能靠日志 grep

Temporal 正是为解决这类问题而生的开源分布式工作流引擎。它将「工作流即代码」的理念落地,让你用普通编程语言(Go/Python/Java/TypeScript)定义复杂流程,引擎负责持久化、重试、超时、版本管理。

方案:Temporal 核心架构

Temporal 的核心设计:

组件 作用
Temporal Server 工作流状态持久化与调度(支持 PostgreSQL/MySQL/Cassandra 后端)
Worker 执行 Workflow 和 Activity 的进程,可水平扩展
Workflow 流程编排逻辑,必须确定性(Deterministic)
Activity 实际执行的任务(HTTP 调用、DB 操作等),可非确定性
Signal/Query 外部与运行中 Workflow 交互的机制

与传统方案对比:

特性 Cron + MQ Temporal
状态持久化 需自行实现 内置,自动记录每步状态
失败重试 手动编码 声明式配置(指数退避、最大次数)
超时管理 各服务各自实现 Workflow/Activity 级别统一配置
流程可视化 Web UI 实时查看每个 Workflow 执行状态
长时间运行 Cron 不适合 原生支持,Workflow 可运行数天甚至数月
版本管理 困难 内置 Versioning,平滑迁移

实操步骤

Step 1:部署 Temporal Server(Docker Compose 快速启动)

# 克隆官方 docker-compose 仓库
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose

# 使用 PostgreSQL 后端启动(生产推荐)
docker compose -f docker-compose-postgres.yml up -d

# 验证服务状态
docker compose ps
# temporal-server, temporal-ui, postgresql 均应为 running

# Web UI 访问:http://localhost:8080

生产环境推荐使用 Helm Chart 部署到 Kubernetes:

helm repo add temporal https://charts.temporal.io
helm install temporal temporal/temporal \
  --set server.replicaCount=3 \
  --set persistence.default.driver=sql \
  --set persistence.default.sql.driver=postgres \
  --set persistence.default.sql.host=your-pg-host \
  --set persistence.default.sql.port=5432 \
  --set persistence.default.sql.database=temporal \
  --namespace temporal --create-namespace

Step 2:用 Python SDK 编写工作流

安装 SDK:

pip install temporalio

定义一个运维场景的工作流——自动化服务器初始化:

# workflows.py
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
    from activities import (
        provision_instance,
        configure_network,
        install_agent,
        register_monitoring,
        notify_team,
    )


@workflow.defn
class ServerProvisionWorkflow:
    """服务器自动化初始化工作流"""

    @workflow.run
    async def run(self, server_spec: dict) -> dict:
        # Step 1: 创建实例
        instance = await workflow.execute_activity(
            provision_instance,
            server_spec,
            start_to_close_timeout=timedelta(minutes=10),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                backoff_coefficient=2.0,
            ),
        )

        # Step 2: 配置网络(依赖 Step 1 结果)
        await workflow.execute_activity(
            configure_network,
            instance["instance_id"],
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(maximum_attempts=5),
        )

        # Step 3: 安装监控 Agent
        await workflow.execute_activity(
            install_agent,
            instance["instance_id"],
            start_to_close_timeout=timedelta(minutes=8),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Step 4: 注册到监控系统
        await workflow.execute_activity(
            register_monitoring,
            instance,
            start_to_close_timeout=timedelta(minutes=2),
        )

        # Step 5: 通知团队
        await workflow.execute_activity(
            notify_team,
            {"instance": instance, "status": "ready"},
            start_to_close_timeout=timedelta(seconds=30),
        )

        return {"status": "completed", "instance": instance}
# activities.py
from temporalio import activity

@activity.defn
async def provision_instance(spec: dict) -> dict:
    """调用云 API 创建实例"""
    # 实际调用 AWS/GCP API
    activity.logger.info(f"Provisioning instance: {spec['name']}")
    # ... 实际逻辑
    return {"instance_id": "i-abc123", "ip": "10.0.1.50"}

@activity.defn
async def configure_network(instance_id: str) -> None:
    """配置安全组、VPC 路由等"""
    activity.logger.info(f"Configuring network for {instance_id}")
    # ... 实际逻辑

@activity.defn
async def install_agent(instance_id: str) -> None:
    """SSH 到实例安装监控 Agent"""
    activity.logger.info(f"Installing agent on {instance_id}")
    # ... SSH + ansible/script

@activity.defn
async def register_monitoring(instance: dict) -> None:
    """注册到 Prometheus/Grafana"""
    activity.logger.info(f"Registering {instance['instance_id']} to monitoring")

@activity.defn
async def notify_team(payload: dict) -> None:
    """发送通知到钉钉/Slack"""
    activity.logger.info(f"Notifying team: {payload['status']}")

Step 3:启动 Worker 并触发工作流

# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows import ServerProvisionWorkflow
from activities import (
    provision_instance, configure_network,
    install_agent, register_monitoring, notify_team,
)

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="server-provision",
        workflows=[ServerProvisionWorkflow],
        activities=[
            provision_instance, configure_network,
            install_agent, register_monitoring, notify_team,
        ],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

触发工作流执行:

# trigger.py
import asyncio
from temporalio.client import Client
from workflows import ServerProvisionWorkflow

async def main():
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        ServerProvisionWorkflow.run,
        {"name": "web-prod-03", "type": "c5.xlarge", "region": "us-east-1"},
        id="provision-web-prod-03",
        task_queue="server-provision",
    )
    print(f"Workflow completed: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Step 4:配置监控与告警

Temporal Server 暴露 Prometheus 指标,关键监控项:

# prometheus rules 片段
groups:
  - name: temporal
    rules:
      - alert: TemporalWorkflowTaskScheduleToStartLatencyHigh
        expr: histogram_quantile(0.95, sum(rate(temporal_workflow_task_schedule_to_start_latency_bucket[5m])) by (le, namespace)) > 2
        for: 5m
        annotations:
          summary: "Temporal Workflow Task 调度延迟过高(P95 > 2s)"

      - alert: TemporalActivityFailureRateHigh
        expr: sum(rate(temporal_activity_execution_failed_total[5m])) / sum(rate(temporal_activity_execution_total[5m])) > 0.1
        for: 3m
        annotations:
          summary: "Temporal Activity 失败率超过 10%"

      - alert: TemporalWorkflowRunning TooLong
        expr: temporal_workflow_endtoend_latency_seconds{quantile="0.99"} > 3600
        for: 10m
        annotations:
          summary: "存在 Workflow 运行超过 1 小时"

避坑指南

1. Workflow 代码必须确定性(Deterministic)

Workflow 函数中不能使用 time.time()random.random()、直接 HTTP 调用等非确定性操作。所有 I/O 必须通过 Activity 执行。违反此规则会导致 Workflow Replay 失败:

# ❌ 错误:Workflow 中直接调用外部服务
@workflow.defn
class BadWorkflow:
    @workflow.run
    async def run(self):
        import requests
        resp = requests.get("http://api.example.com")  # 禁止!

# ✅ 正确:通过 Activity 调用
@workflow.defn
class GoodWorkflow:
    @workflow.run
    async def run(self):
        result = await workflow.execute_activity(
            call_external_api,
            start_to_close_timeout=timedelta(seconds=30),
        )

2. 生产环境存储后端选型

  • 小规模(< 100 Workflow/s):PostgreSQL 即可,运维简单
  • 大规模(> 1000 Workflow/s):Cassandra/ScyllaDB,水平扩展能力强
  • 不要用 SQLite(仅开发测试),MySQL 性能介于 PG 和 Cassandra 之间

3. Worker 部署与扩缩容

  • Worker 是无状态进程,可以随意水平扩展
  • 建议 Worker 以 Kubernetes Deployment 部署,配合 HPA 按 CPU/任务积压量自动扩缩
  • 同一个 Task Queue 可以有多个 Worker 消费,Temporal Server 自动负载均衡
  • Worker 优雅退出:处理完当前任务再关闭,设置合理的 terminationGracePeriodSeconds

总结

Temporal 的核心价值在于:把分布式系统中最难的部分(状态管理、重试、超时、补偿)从业务代码中剥离出来,交给基础设施处理

适用场景判断: - 流程 > 3 步且有依赖关系 → 考虑 Temporal - 需要长时间运行(小时/天级别)→ 强烈推荐 - 需要人工审批节点 → Signal 机制完美适配 - 单步简单定时任务 → Cron 够用,别过度工程

生产落地建议:从一个非核心但痛点明显的流程开始试点(如服务器初始化、CI/CD 流水线编排),验证稳定后再推广到核心业务流程。

您还没有登录,请登录后发表评论。