饮墨

子安饮墨馀三斗,留与卿儿作赋来

用 Python asyncio 加速运维自动化:批量巡检 200 台服务器从 10 分钟压到 30 秒

运维脚本跑得慢,不是逻辑复杂,而是大量时间花在等待网络 IO 上。for 循环逐台 SSH 巡检 200 台服务器要 10 分钟,用 asyncio 并发执行只要 30 秒。本文给出 3 个可直接落地的 asyncio 运维场景,附完整代码。

痛点:串行脚本吃掉运维时间

每天早上巡检是运维的日常。一个典型场景:

  • 200 台服务器,逐台 SSH 执行 df -h + free -h + uptime
  • 每台连接 + 执行约 3 秒(网络延迟 + 命令执行)
  • 串行跑完:200 × 3 = 600 秒 ≈ 10 分钟

问题不在命令本身,而在 IO 等待。SSH 建连、等待响应的时间占了 90%,CPU 几乎在空转。

传统解决方案是多线程或 multiprocessing,但线程有 GIL 争抢、进程有内存开销。对于 IO 密集型运维任务,asyncio 是更优解——单线程事件循环,资源占用极低,代码结构清晰。

方案:asyncio + asyncssh 并发巡检

核心思路:把每台服务器的巡检封装为一个协程(coroutine),用 asyncio.gather() 并发执行全部任务。

依赖安装

pip install asyncssh aiohttp

场景一:批量 SSH 巡检

import asyncio
import asyncssh
import time

SERVERS = [
    {"host": "10.0.1.1", "username": "ops"},
    {"host": "10.0.1.2", "username": "ops"},
    # ... 200 台
]

COMMANDS = ["df -h /", "free -h | grep Mem", "uptime"]

async def check_server(server: dict) -> dict:
    """单台服务器巡检"""
    try:
        async with asyncssh.connect(
            server["host"],
            username=server["username"],
            known_hosts=None,
            connect_timeout=10,
        ) as conn:
            results = {}
            for cmd in COMMANDS:
                result = await conn.run(cmd, timeout=5)
                results[cmd] = result.stdout.strip()
            return {"host": server["host"], "status": "ok", "data": results}
    except Exception as e:
        return {"host": server["host"], "status": "fail", "error": str(e)}

async def main():
    start = time.time()
    # 限制并发数,避免同时打开 200 个 SSH 连接打爆 fd 限制
    semaphore = asyncio.Semaphore(50)

    async def limited_check(s):
        async with semaphore:
            return await check_server(s)

    tasks = [limited_check(s) for s in SERVERS]
    results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r["status"] == "ok")
    fail = sum(1 for r in results if r["status"] == "fail")
    elapsed = time.time() - start
    print(f"巡检完成: {ok} 成功, {fail} 失败, 耗时 {elapsed:.1f}s")

    # 输出失败节点
    for r in results:
        if r["status"] == "fail":
            print(f"  ❌ {r['host']}: {r['error']}")

asyncio.run(main())

关键点asyncio.Semaphore(50) 控制并发上限。200 台服务器并不是同时打开 200 个连接,而是始终保持最多 50 个并发,避免文件描述符耗尽或被目标机器拒绝连接。

场景二:并发 HTTP 健康检查

微服务架构下,几十个服务的 /health 接口需要定时探测:

import asyncio
import aiohttp

ENDPOINTS = [
    "https://svc-a.internal:8080/health",
    "https://svc-b.internal:8080/health",
    "https://svc-c.internal:8080/health",
    # ... 更多服务
]

async def check_health(session: aiohttp.ClientSession, url: str) -> dict:
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
            return {"url": url, "status": resp.status, "healthy": resp.status == 200}
    except Exception as e:
        return {"url": url, "status": -1, "healthy": False, "error": str(e)}

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [check_health(session, url) for url in ENDPOINTS]
        results = await asyncio.gather(*tasks)

    unhealthy = [r for r in results if not r["healthy"]]
    if unhealthy:
        print(f"⚠️ {len(unhealthy)} 个服务异常:")
        for r in unhealthy:
            print(f"  {r['url']} -> {r.get('error', f'HTTP {r['status']}')}")
    else:
        print(f"✅ 全部 {len(results)} 个服务正常")

asyncio.run(main())

场景三:并发日志采集 + 关键词告警

从多台服务器并发拉取最近日志,匹配关键词后集中告警:

async def grep_remote_log(conn, pattern="ERROR|OOMKilled|panic"):
    """远程抓取最近 100 行日志并匹配关键词"""
    cmd = f'journalctl -n 100 --no-pager | grep -iE "{pattern}" || true'
    result = await conn.run(cmd, timeout=10)
    return result.stdout.strip()

async def scan_all_logs(servers):
    semaphore = asyncio.Semaphore(30)
    alerts = []

    async def scan_one(server):
        async with semaphore:
            try:
                async with asyncssh.connect(
                    server["host"], username=server["username"],
                    known_hosts=None, connect_timeout=10
                ) as conn:
                    output = await grep_remote_log(conn)
                    if output:
                        alerts.append({
                            "host": server["host"],
                            "matches": output.split("\n")
                        })
            except Exception as e:
                alerts.append({"host": server["host"], "error": str(e)})

    await asyncio.gather(*[scan_one(s) for s in servers])
    return alerts

避坑:3 个 asyncio 运维常见问题

坑 1:并发不加限制,连接被拒

直接 gather() 200 个 SSH 任务,目标机器 MaxStartups 默认 10:30:100,超过阈值会随机丢弃连接。

解法:始终用 asyncio.Semaphore 控制并发上限。SSH 场景建议 30-50,HTTP 场景可以放到 100-200。

坑 2:单个任务超时拖垮整体

一台机器网络不通,connect_timeout 不设的话默认等 TCP 超时(2 分钟+),拖慢整个批次。

解法:三层超时防护:

# 1. 连接超时
asyncssh.connect(..., connect_timeout=10)
# 2. 命令超时
conn.run(cmd, timeout=5)
# 3. 全局超时
await asyncio.wait_for(task, timeout=30)

坑 3:异常吞没,静默失败

asyncio.gather() 默认一个任务抛异常就中断所有任务。

解法:加 return_exceptions=True,让异常作为返回值而不是中断执行:

results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, Exception):
        print(f"任务异常: {r}")

性能对比

方式 200 台巡检耗时 内存占用 代码复杂度
for 循环串行 ~600s
ThreadPool(50) ~15s 中(线程栈) ⭐⭐
asyncio + Semaphore(50) ~12s 极低 ⭐⭐
multiprocessing ~15s ⭐⭐⭐

asyncio 在 IO 密集场景下性能接近多线程,但内存占用远低于线程池和多进程方案。

总结

  • IO 密集型运维任务(SSH 巡检、HTTP 探测、日志采集)是 asyncio 的最佳场景
  • Semaphore 是必需品,不是可选项——不加限制等于 DDoS 自己的服务器
  • 三层超时(连接、命令、全局)缺一不可,否则一台故障机拖垮整个批次
  • 代码量和多线程差不多,但资源消耗更低、调试更直观(没有锁竞争)
  • 适合封装为每日巡检 cron,配合 Prometheus Pushgateway 推送巡检结果做可视化

下次再写 for server in servers 之前,想一想——这些等待时间,是不是可以并发掉?

您还没有登录,请登录后发表评论。