痛点:分布式任务编排的脆弱性
运维团队经常面对这样的场景:一个跨服务的业务流程——比如「用户注册 → 发邮件 → 创建资源 → 配置权限 → 通知上游」——被拆成 5 个 Cron Job + 3 个消息队列 + 若干重试逻辑。任何一个环节失败,排查成本极高:
- Cron Job 没有状态持久化,失败后从头开始还是从断点续跑?靠人肉判断
- 消息队列的消费者幂等性难保证,重试导致重复执行
- 跨步骤的超时、取消、补偿逻辑散落在各处,维护噩梦
- 流程可观测性几乎为零,只能靠日志 grep
Temporal 正是为解决这类问题而生的开源分布式工作流引擎。它将「工作流即代码」的理念落地,让你用普通编程语言(Go/Python/Java/TypeScript)定义复杂流程,引擎负责持久化、重试、超时、版本管理。
方案:Temporal 核心架构
Temporal 的核心设计:
| 组件 | 作用 |
|---|---|
| Temporal Server | 工作流状态持久化与调度(支持 PostgreSQL/MySQL/Cassandra 后端) |
| Worker | 执行 Workflow 和 Activity 的进程,可水平扩展 |
| Workflow | 流程编排逻辑,必须确定性(Deterministic) |
| Activity | 实际执行的任务(HTTP 调用、DB 操作等),可非确定性 |
| Signal/Query | 外部与运行中 Workflow 交互的机制 |
与传统方案对比:
| 特性 | Cron + MQ | Temporal |
|---|---|---|
| 状态持久化 | 需自行实现 | 内置,自动记录每步状态 |
| 失败重试 | 手动编码 | 声明式配置(指数退避、最大次数) |
| 超时管理 | 各服务各自实现 | Workflow/Activity 级别统一配置 |
| 流程可视化 | 无 | Web UI 实时查看每个 Workflow 执行状态 |
| 长时间运行 | Cron 不适合 | 原生支持,Workflow 可运行数天甚至数月 |
| 版本管理 | 困难 | 内置 Versioning,平滑迁移 |
实操步骤
Step 1:部署 Temporal Server(Docker Compose 快速启动)
# 克隆官方 docker-compose 仓库
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
# 使用 PostgreSQL 后端启动(生产推荐)
docker compose -f docker-compose-postgres.yml up -d
# 验证服务状态
docker compose ps
# temporal-server, temporal-ui, postgresql 均应为 running
# Web UI 访问:http://localhost:8080
生产环境推荐使用 Helm Chart 部署到 Kubernetes:
helm repo add temporal https://charts.temporal.io
helm install temporal temporal/temporal \
--set server.replicaCount=3 \
--set persistence.default.driver=sql \
--set persistence.default.sql.driver=postgres \
--set persistence.default.sql.host=your-pg-host \
--set persistence.default.sql.port=5432 \
--set persistence.default.sql.database=temporal \
--namespace temporal --create-namespace
Step 2:用 Python SDK 编写工作流
安装 SDK:
pip install temporalio
定义一个运维场景的工作流——自动化服务器初始化:
# workflows.py
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
with workflow.unsafe.imports_passed_through():
from activities import (
provision_instance,
configure_network,
install_agent,
register_monitoring,
notify_team,
)
@workflow.defn
class ServerProvisionWorkflow:
"""服务器自动化初始化工作流"""
@workflow.run
async def run(self, server_spec: dict) -> dict:
# Step 1: 创建实例
instance = await workflow.execute_activity(
provision_instance,
server_spec,
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
maximum_attempts=3,
backoff_coefficient=2.0,
),
)
# Step 2: 配置网络(依赖 Step 1 结果)
await workflow.execute_activity(
configure_network,
instance["instance_id"],
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(maximum_attempts=5),
)
# Step 3: 安装监控 Agent
await workflow.execute_activity(
install_agent,
instance["instance_id"],
start_to_close_timeout=timedelta(minutes=8),
retry_policy=RetryPolicy(maximum_attempts=3),
)
# Step 4: 注册到监控系统
await workflow.execute_activity(
register_monitoring,
instance,
start_to_close_timeout=timedelta(minutes=2),
)
# Step 5: 通知团队
await workflow.execute_activity(
notify_team,
{"instance": instance, "status": "ready"},
start_to_close_timeout=timedelta(seconds=30),
)
return {"status": "completed", "instance": instance}
# activities.py
from temporalio import activity
@activity.defn
async def provision_instance(spec: dict) -> dict:
"""调用云 API 创建实例"""
# 实际调用 AWS/GCP API
activity.logger.info(f"Provisioning instance: {spec['name']}")
# ... 实际逻辑
return {"instance_id": "i-abc123", "ip": "10.0.1.50"}
@activity.defn
async def configure_network(instance_id: str) -> None:
"""配置安全组、VPC 路由等"""
activity.logger.info(f"Configuring network for {instance_id}")
# ... 实际逻辑
@activity.defn
async def install_agent(instance_id: str) -> None:
"""SSH 到实例安装监控 Agent"""
activity.logger.info(f"Installing agent on {instance_id}")
# ... SSH + ansible/script
@activity.defn
async def register_monitoring(instance: dict) -> None:
"""注册到 Prometheus/Grafana"""
activity.logger.info(f"Registering {instance['instance_id']} to monitoring")
@activity.defn
async def notify_team(payload: dict) -> None:
"""发送通知到钉钉/Slack"""
activity.logger.info(f"Notifying team: {payload['status']}")
Step 3:启动 Worker 并触发工作流
# worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows import ServerProvisionWorkflow
from activities import (
provision_instance, configure_network,
install_agent, register_monitoring, notify_team,
)
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="server-provision",
workflows=[ServerProvisionWorkflow],
activities=[
provision_instance, configure_network,
install_agent, register_monitoring, notify_team,
],
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
触发工作流执行:
# trigger.py
import asyncio
from temporalio.client import Client
from workflows import ServerProvisionWorkflow
async def main():
client = await Client.connect("localhost:7233")
result = await client.execute_workflow(
ServerProvisionWorkflow.run,
{"name": "web-prod-03", "type": "c5.xlarge", "region": "us-east-1"},
id="provision-web-prod-03",
task_queue="server-provision",
)
print(f"Workflow completed: {result}")
if __name__ == "__main__":
asyncio.run(main())
Step 4:配置监控与告警
Temporal Server 暴露 Prometheus 指标,关键监控项:
# prometheus rules 片段
groups:
- name: temporal
rules:
- alert: TemporalWorkflowTaskScheduleToStartLatencyHigh
expr: histogram_quantile(0.95, sum(rate(temporal_workflow_task_schedule_to_start_latency_bucket[5m])) by (le, namespace)) > 2
for: 5m
annotations:
summary: "Temporal Workflow Task 调度延迟过高(P95 > 2s)"
- alert: TemporalActivityFailureRateHigh
expr: sum(rate(temporal_activity_execution_failed_total[5m])) / sum(rate(temporal_activity_execution_total[5m])) > 0.1
for: 3m
annotations:
summary: "Temporal Activity 失败率超过 10%"
- alert: TemporalWorkflowRunning TooLong
expr: temporal_workflow_endtoend_latency_seconds{quantile="0.99"} > 3600
for: 10m
annotations:
summary: "存在 Workflow 运行超过 1 小时"
避坑指南
1. Workflow 代码必须确定性(Deterministic)
Workflow 函数中不能使用 time.time()、random.random()、直接 HTTP 调用等非确定性操作。所有 I/O 必须通过 Activity 执行。违反此规则会导致 Workflow Replay 失败:
# ❌ 错误:Workflow 中直接调用外部服务
@workflow.defn
class BadWorkflow:
@workflow.run
async def run(self):
import requests
resp = requests.get("http://api.example.com") # 禁止!
# ✅ 正确:通过 Activity 调用
@workflow.defn
class GoodWorkflow:
@workflow.run
async def run(self):
result = await workflow.execute_activity(
call_external_api,
start_to_close_timeout=timedelta(seconds=30),
)
2. 生产环境存储后端选型
- 小规模(< 100 Workflow/s):PostgreSQL 即可,运维简单
- 大规模(> 1000 Workflow/s):Cassandra/ScyllaDB,水平扩展能力强
- 不要用 SQLite(仅开发测试),MySQL 性能介于 PG 和 Cassandra 之间
3. Worker 部署与扩缩容
- Worker 是无状态进程,可以随意水平扩展
- 建议 Worker 以 Kubernetes Deployment 部署,配合 HPA 按 CPU/任务积压量自动扩缩
- 同一个 Task Queue 可以有多个 Worker 消费,Temporal Server 自动负载均衡
- Worker 优雅退出:处理完当前任务再关闭,设置合理的
terminationGracePeriodSeconds
总结
Temporal 的核心价值在于:把分布式系统中最难的部分(状态管理、重试、超时、补偿)从业务代码中剥离出来,交给基础设施处理。
适用场景判断: - 流程 > 3 步且有依赖关系 → 考虑 Temporal - 需要长时间运行(小时/天级别)→ 强烈推荐 - 需要人工审批节点 → Signal 机制完美适配 - 单步简单定时任务 → Cron 够用,别过度工程
生产落地建议:从一个非核心但痛点明显的流程开始试点(如服务器初始化、CI/CD 流水线编排),验证稳定后再推广到核心业务流程。