运维脚本跑得慢,不是逻辑复杂,而是大量时间花在等待网络 IO 上。
for循环逐台 SSH 巡检 200 台服务器要 10 分钟,用 asyncio 并发执行只要 30 秒。本文给出 3 个可直接落地的 asyncio 运维场景,附完整代码。
痛点:串行脚本吃掉运维时间
每天早上巡检是运维的日常。一个典型场景:
- 200 台服务器,逐台 SSH 执行
df -h+free -h+uptime - 每台连接 + 执行约 3 秒(网络延迟 + 命令执行)
- 串行跑完:200 × 3 = 600 秒 ≈ 10 分钟
问题不在命令本身,而在 IO 等待。SSH 建连、等待响应的时间占了 90%,CPU 几乎在空转。
传统解决方案是多线程或 multiprocessing,但线程有 GIL 争抢、进程有内存开销。对于 IO 密集型运维任务,asyncio 是更优解——单线程事件循环,资源占用极低,代码结构清晰。
方案:asyncio + asyncssh 并发巡检
核心思路:把每台服务器的巡检封装为一个协程(coroutine),用 asyncio.gather() 并发执行全部任务。
依赖安装
pip install asyncssh aiohttp
场景一:批量 SSH 巡检
import asyncio
import asyncssh
import time
SERVERS = [
{"host": "10.0.1.1", "username": "ops"},
{"host": "10.0.1.2", "username": "ops"},
# ... 200 台
]
COMMANDS = ["df -h /", "free -h | grep Mem", "uptime"]
async def check_server(server: dict) -> dict:
"""单台服务器巡检"""
try:
async with asyncssh.connect(
server["host"],
username=server["username"],
known_hosts=None,
connect_timeout=10,
) as conn:
results = {}
for cmd in COMMANDS:
result = await conn.run(cmd, timeout=5)
results[cmd] = result.stdout.strip()
return {"host": server["host"], "status": "ok", "data": results}
except Exception as e:
return {"host": server["host"], "status": "fail", "error": str(e)}
async def main():
start = time.time()
# 限制并发数,避免同时打开 200 个 SSH 连接打爆 fd 限制
semaphore = asyncio.Semaphore(50)
async def limited_check(s):
async with semaphore:
return await check_server(s)
tasks = [limited_check(s) for s in SERVERS]
results = await asyncio.gather(*tasks)
ok = sum(1 for r in results if r["status"] == "ok")
fail = sum(1 for r in results if r["status"] == "fail")
elapsed = time.time() - start
print(f"巡检完成: {ok} 成功, {fail} 失败, 耗时 {elapsed:.1f}s")
# 输出失败节点
for r in results:
if r["status"] == "fail":
print(f" ❌ {r['host']}: {r['error']}")
asyncio.run(main())
关键点:asyncio.Semaphore(50) 控制并发上限。200 台服务器并不是同时打开 200 个连接,而是始终保持最多 50 个并发,避免文件描述符耗尽或被目标机器拒绝连接。
场景二:并发 HTTP 健康检查
微服务架构下,几十个服务的 /health 接口需要定时探测:
import asyncio
import aiohttp
ENDPOINTS = [
"https://svc-a.internal:8080/health",
"https://svc-b.internal:8080/health",
"https://svc-c.internal:8080/health",
# ... 更多服务
]
async def check_health(session: aiohttp.ClientSession, url: str) -> dict:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return {"url": url, "status": resp.status, "healthy": resp.status == 200}
except Exception as e:
return {"url": url, "status": -1, "healthy": False, "error": str(e)}
async def main():
async with aiohttp.ClientSession() as session:
tasks = [check_health(session, url) for url in ENDPOINTS]
results = await asyncio.gather(*tasks)
unhealthy = [r for r in results if not r["healthy"]]
if unhealthy:
print(f"⚠️ {len(unhealthy)} 个服务异常:")
for r in unhealthy:
print(f" {r['url']} -> {r.get('error', f'HTTP {r['status']}')}")
else:
print(f"✅ 全部 {len(results)} 个服务正常")
asyncio.run(main())
场景三:并发日志采集 + 关键词告警
从多台服务器并发拉取最近日志,匹配关键词后集中告警:
async def grep_remote_log(conn, pattern="ERROR|OOMKilled|panic"):
"""远程抓取最近 100 行日志并匹配关键词"""
cmd = f'journalctl -n 100 --no-pager | grep -iE "{pattern}" || true'
result = await conn.run(cmd, timeout=10)
return result.stdout.strip()
async def scan_all_logs(servers):
semaphore = asyncio.Semaphore(30)
alerts = []
async def scan_one(server):
async with semaphore:
try:
async with asyncssh.connect(
server["host"], username=server["username"],
known_hosts=None, connect_timeout=10
) as conn:
output = await grep_remote_log(conn)
if output:
alerts.append({
"host": server["host"],
"matches": output.split("\n")
})
except Exception as e:
alerts.append({"host": server["host"], "error": str(e)})
await asyncio.gather(*[scan_one(s) for s in servers])
return alerts
避坑:3 个 asyncio 运维常见问题
坑 1:并发不加限制,连接被拒
直接 gather() 200 个 SSH 任务,目标机器 MaxStartups 默认 10:30:100,超过阈值会随机丢弃连接。
解法:始终用 asyncio.Semaphore 控制并发上限。SSH 场景建议 30-50,HTTP 场景可以放到 100-200。
坑 2:单个任务超时拖垮整体
一台机器网络不通,connect_timeout 不设的话默认等 TCP 超时(2 分钟+),拖慢整个批次。
解法:三层超时防护:
# 1. 连接超时
asyncssh.connect(..., connect_timeout=10)
# 2. 命令超时
conn.run(cmd, timeout=5)
# 3. 全局超时
await asyncio.wait_for(task, timeout=30)
坑 3:异常吞没,静默失败
asyncio.gather() 默认一个任务抛异常就中断所有任务。
解法:加 return_exceptions=True,让异常作为返回值而不是中断执行:
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
print(f"任务异常: {r}")
性能对比
| 方式 | 200 台巡检耗时 | 内存占用 | 代码复杂度 |
|---|---|---|---|
| for 循环串行 | ~600s | 低 | ⭐ |
| ThreadPool(50) | ~15s | 中(线程栈) | ⭐⭐ |
| asyncio + Semaphore(50) | ~12s | 极低 | ⭐⭐ |
| multiprocessing | ~15s | 高 | ⭐⭐⭐ |
asyncio 在 IO 密集场景下性能接近多线程,但内存占用远低于线程池和多进程方案。
总结
- IO 密集型运维任务(SSH 巡检、HTTP 探测、日志采集)是 asyncio 的最佳场景
Semaphore是必需品,不是可选项——不加限制等于 DDoS 自己的服务器- 三层超时(连接、命令、全局)缺一不可,否则一台故障机拖垮整个批次
- 代码量和多线程差不多,但资源消耗更低、调试更直观(没有锁竞争)
- 适合封装为每日巡检 cron,配合 Prometheus Pushgateway 推送巡检结果做可视化
下次再写 for server in servers 之前,想一想——这些等待时间,是不是可以并发掉?