1. 首页 > Linux教程 > 正文

Linux教程FG396-Python自动化运维实战

内容简介:本文风哥教程参考Linux官方文档、Red Hat Enterprise Linux官方文档、Ansible Automation Platform官方文档、Docker官方文档、Kubernetes官方文档和Podman官方文档等内容,详细介绍了相关技术的配置和使用方法。

风哥提示:

本文档介绍Python自动化运维的实战应用。

Part01-SSH远程管理

1.1 Paramiko远程执行

# 创建SSH远程管理脚本
[root@linux ~]# cat > /fglinux/ssh_manager.py << 'EOF'
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ssh_manager.py
# from: www.itpux.com (study group QQ 113257174, WeChat itpux-com)
# web: http://www.fgedu.net.cn

import paramiko
import time
from concurrent.futures import ThreadPoolExecutor


class SSHManager:
    """Manage one SSH connection to a single host and run commands on it.

    Authenticates with ``key_file`` when given, otherwise with ``password``.
    Can be used as a context manager so the connection is always closed.
    """

    def __init__(self, host, username, password=None, key_file=None, port=22):
        self.host = host
        self.username = username
        self.password = password
        self.key_file = key_file
        self.port = port
        self.client = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def connect(self):
        """Open the SSH connection.

        Returns:
            True once the connection is established.

        Raises:
            Whatever ``paramiko.SSHClient.connect`` raises (authentication,
            network or key-loading errors).
        """
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification. Acceptable for a lab; in production load known_hosts
        # and use RejectPolicy instead.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            if self.key_file:
                # key_filename lets paramiko detect the key type itself, so
                # Ed25519/ECDSA keys work as well as RSA ones.
                client.connect(self.host, self.port, self.username,
                               key_filename=self.key_file)
            else:
                client.connect(self.host, self.port, self.username,
                               self.password)
        except Exception:
            # Do not keep a half-open client around after a failed attempt.
            client.close()
            raise
        self.client = client
        return True

    def execute(self, command, timeout=30):
        """Run ``command`` on the host, connecting first if necessary.

        Args:
            command: Shell command line to run remotely.
            timeout: Channel timeout in seconds passed to ``exec_command``.

        Returns:
            dict with keys ``host``, ``command``, ``stdout``, ``stderr`` and
            ``exit_code``.
        """
        if not self.client:
            self.connect()
        stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
        # Drain the output streams before asking for the exit status.
        out_text = stdout.read().decode('utf-8')
        err_text = stderr.read().decode('utf-8')
        return {
            "host": self.host,
            "command": command,
            "stdout": out_text,
            "stderr": err_text,
            "exit_code": stdout.channel.recv_exit_status()
        }

    def close(self):
        """Close the connection if one is open."""
        if self.client:
            self.client.close()
            # Reset so a later execute() reconnects instead of using a dead client.
            self.client = None


def batch_execute(hosts, command, username, password):
    """Run ``command`` on every host concurrently.

    Args:
        hosts: Iterable of host names / IP addresses.
        command: Shell command line to run on each host.
        username: SSH user name used for every host.
        password: SSH password used for every host.

    Returns:
        A list with one dict per host, in the same order as ``hosts``.
        Successful entries have the shape returned by ``SSHManager.execute``;
        failed entries are ``{"host": ..., "error": ...}``.
    """
    def run_on_host(host):
        ssh = SSHManager(host, username, password)
        try:
            return ssh.execute(command)
        except Exception as e:
            # Best effort: one unreachable host must not abort the batch.
            return {"host": host, "error": str(e)}
        finally:
            ssh.close()

    with ThreadPoolExecutor(max_workers=10) as executor:
        # executor.map yields results in input order, so the output lines up
        # with ``hosts`` no matter which host answers first.
        return list(executor.map(run_on_host, hosts))


if __name__ == "__main__":
    # Single-host execution
    print("=== 单主机执行 ===")
    ssh = SSHManager("192.168.1.20", "root", "password")
    try:
        result = ssh.execute("hostname && uptime")
        print(f"主机: {result['host']}")
        print(f"输出: {result['stdout']}")
    finally:
        ssh.close()

    # Batch execution
    print("\n=== 批量执行 ===")
    hosts = ["192.168.1.20", "192.168.1.21", "192.168.1.22"]
    command = "df -h / | tail -1"
    results = batch_execute(hosts, command, "root", "password")
    for r in results:
        if 'error' in r:
            print(f"{r['host']}: 错误 - {r['error']}")
        else:
            print(f"{r['host']}: {r['stdout'].strip()}")
EOF

# Run the SSH management script
[root@linux ~]# python3 /fglinux/ssh_manager.py
=== 单主机执行 ===
主机: 192.168.1.20
输出: web-server-1
 00:00:00 up 10 days, 2:30, 1 user, load average: 0.00, 0.01, 0.05

=== 批量执行 ===
192.168.1.20: /dev/mapper/centos-root 50G 5.0G 45G 10% /
192.168.1.21: /dev/mapper/centos-root 50G 8.0G 42G 16% /
192.168.1.22: /dev/mapper/centos-root 50G 6.5G 43G 13% /

Part02-批量部署工具

2.1 自动化部署脚本

# 创建批量部署脚本
[root@linux ~]# cat > /fglinux/deploy_tool.py << 'EOF' #!/usr/bin/env python3 # -*- coding: utf-8 -*- # deploy_tool.py # from:www.itpux.com.qq113257174.wx:itpux-com # web: http://www.fgedu.net.cn import os import sys import json import time import paramiko from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed class DeployManager: def __init__(self, config_file): self.config = self.load_config(config_file) self.results = [] def load_config(self, config_file): """加载配置文件""" with open(config_file, 'r') as f: return json.load(f) def ssh_connect(self, host): """建立SSH连接""" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect( host['ip'], host.get('port', 22), host['username'], host['password'] ) return client def deploy_to_host(self, host, tasks): """部署到单个主机""" result = { "host": host['name'], "ip": host['ip'], "status": "success", "tasks": [], "start_time": datetime.now().isoformat() } try: client = self.ssh_connect(host) for task in tasks: task_result = {"name": task['name'], "status": "success"} try: if task['type'] == 'command': stdin, stdout, stderr = client.exec_command(task['command']) exit_code = stdout.channel.recv_exit_status() task_result['output'] = stdout.read().decode('utf-8') task_result['exit_code'] = exit_code if exit_code != 0: task_result['status'] = 'failed' task_result['error'] = stderr.read().decode('utf-8') elif task['type'] == 'script': script = task['content'] stdin, stdout, stderr = client.exec_command(f"bash -s") stdin.write(script) stdin.close() task_result['output'] = stdout.read().decode('utf-8') task_result['exit_code'] = stdout.channel.recv_exit_status() elif task['type'] == 'upload': sftp = client.open_sftp() sftp.put(task['local_path'], task['remote_path']) sftp.close() task_result['output'] = f"Uploaded {task['local_path']} to {task['remote_path']}" except Exception as e: task_result['status'] = 'failed' task_result['error'] = str(e) 
result['tasks'].append(task_result) client.close() except Exception as e: result['status'] = 'failed' result['error'] = str(e) result['end_time'] = datetime.now().isoformat() return result def deploy_all(self): """部署到所有主机""" hosts = self.config['hosts'] tasks = self.config['tasks'] with ThreadPoolExecutor(max_workers=self.config.get('max_workers', 10)) as executor: futures = { from PG视频:www.itpux.com executor.submit(self.deploy_to_host, host,更多视频教程www.fgedu.net.cn tasks): host for host in hosts } for future in as_completed(futures): result = future.result() self.results.append(result) self.print_result(result) def print_result(self, result): """打印结果""" print(f"\n{'='*60}") print(f"主机: {result['host']} ({result['ip']})") print(f"状态: {result['status']}") print(f"时间: {result['start_time']}") for task in result['tasks']: status_icon = "✓" if task['status'] == 'success' else "✗" print(f" [{status_icon}] {task['name']}") if 'output' in task and task['output']: print(f" {task['output'][:100]}") if 'error' in task: print(f" 错误: {task['error']}") print(f"{'='*60}") def generate_report(self): """生成报告""" success = sum(1 for r in self.results if r['status'] == 'success') failed = len(self.results) - success print(f"\n部署完成!") print(f"成功: {success}, 失败: {failed}") return { "total": len(self.results), "success": success, "failed": failed, 学习交流加群风哥微信: itpux-com "details": self.results } if __name__ == "__main__": # 创建配置文件 config = { "hosts": [ {"name": "web1", "ip": "192.168.1.20", "username": "root", "password": "password"}, {"name": "web2", "ip": "192.168.1.21", "username": "root", "password": "password"} ], "tasks": [ {"name": "检查系统", "type": "command", "command": "uptime"}, {"name": "创建目录", "type": "command", "command": "mkdir -p /opt/app"}, {"name": "检查磁盘", "type": "command", "command": "df -h /"} ], "max_workers": 5 } with open('/tmp/deploy_config.json', 'w') as f: json.dump(config, f, indent=2) # 执行部署 deployer = DeployManager('/tmp/deploy_config.json') deployer.deploy_all() 
report = deployer.generate_report() EOF # 执行部署脚本 [root@linux ~]# python3 /fglinux/deploy_tool.py ============================================================ 主机: web1 (192.168.1.20) 状态: success 时间: 2026-04-04T01:00:00.123456 [✓] 检查系统 01:00:00 up 10 days, 2:30, 1 user, load average: 0.00, 0.01, 0.05 [✓] 创建目录 [✓] 检查磁盘 /dev/mapper/centos-root 50G 5.0G 45G 10% / ============================================================ ============================================================ 主机: web2 (192.168.1.21) 状态: success 时间: 2026-04-04T01:00:01.123456 [✓] 检查系统 01:00:01 up 5 days, 1:20, 1 user, load average: 0.01, 0.02, 0.05 [✓] 创建目录 [✓] 检查磁盘 /dev/mapper/centos-root 50G 8.0G 42G 16% / ============================================================ 部署完成! 成功: 2, 失败: 0
风哥针对Python自动化运维建议:

  • 使用配置文件管理主机信息
  • 实现并发执行提高效率
  • 添加详细的日志记录
  • 实现错误重试机制
  • 生成部署报告便于审计

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息