1. 首页 > Linux教程 > 正文

Linux教程FG483-Linux自动化运维平台

内容简介:本文风哥教程参考Linux官方文档、Red Hat Enterprise Linux官方文档、Ansible Automation Platform官方文档、Docker官方文档、Kubernetes官方文档和Podman官方文档等内容,详细介绍了相关技术的配置和使用方法。

本文档介绍Linux自动化运维平台搭建实战。

风哥提示:

Part01-平台架构设计

1.1 运维平台规划

# 运维平台架构设计
[root@fgedu-ops ~]# cat > /root/ops-platform-design.txt << 'EOF'
FGEDU自动化运维平台架构
======================

1. 核心组件
   - CMDB: 资产管理
   - Ansible: 配置管理
   - Jenkins: CI/CD
   - Prometheus: 监控告警
   - Grafana: 可视化

2. 功能模块
   - 资产管理: 主机、应用、服务
   - 配置管理: 批量配置、部署
   - 任务调度: 定时任务、作业管理
   - 监控告警: 性能监控、告警通知
   - 日志管理: 集中日志、分析

3. 技术栈
   - 后端: Python Django/Flask
   - 前端: Vue.js
   - 数据库: MySQL/PostgreSQL
   - 消息队列: Redis/RabbitMQ
   - 任务调度: Celery

4. 集成接口
   - API接口: RESTful
   - Webhook: 事件触发
   - 插件系统: 功能扩展
EOF

Part02-CMDB资产管理

2.1 资产采集系统

# 创建资产采集脚本
[root@fgedu-ops ~]# cat > /usr/local/bin/asset-collector.py << 'EOF' #!/usr/bin/env python3 # -*- coding: utf-8 -*- # asset-collector.py # from:www.itpux.com.qq113257174.wx:itpux-com # web: http://www.fgedu.net.cn import json import subprocess import platform import psutil import socket import requests def get_system_info(): info = {} info['hostname'] = socket.gethostname() info['os'] = platform.system() info['os_version'] = platform.version() info['kernel'] = platform.release() info['arch'] = platform.machine() return info def get_cpu_info(): cpu_info = {} cpu_info['cores'] = psutil.cpu_count(logical=False) cpu_info['threads'] = psutil.cpu_count(logical=True) cpu_info['usage'] = psutil.cpu_percent(interval=1) return cpu_info def get_memory_info(): mem = psutil.virtual_memory() mem_info = {} mem_info['total'] = round(mem.total / (1024**3), 2) mem_info['used'] = round(mem.used / (1024**3), 2) mem_info['percent'] = mem.percent return mem_info def get_disk_info(): disks = [] for partition in psutil.disk_partitions(): try: usage = psutil.disk_usage(partition.mountpoint) disks.append({ 'device': partition.device, 'mountpoint': partition.mountpoint, 'fstype': partition.fstype, 'total': round(usage.total / (1024**3), 2), 'used': round(usage.used / (1024**3), 2), 'percent': usage.percent }) except: pass return disks def get_network_info(): net_info = {} for name, addrs in psutil.net_if_addrs().items(): for addr in addrs: if addr.family == socket.AF_INET: net_info[name] = addr.address return net_info def collect_all(): asset = { 'system': get_system_info(), 'cpu': get_cpu_info(), 'memory': get_memory_info(), 'disk': get_disk_info(), 'network': get_network_info() } return asset if __name__ == '__main__': asset_data = collect_all() print(json.dumps(asset_data, indent=2)) EOF [root@fgedu-ops ~]# chmod +x /usr/local/bin/asset-collector.py # 执行资产采集 [root@fgedu-ops ~]# python3 /usr/local/bin/asset-collector.py { "system": { "hostname": "fgedu-ops", "os": "Linux", "os_version": 
"#1 SMP PREEMPT_DYNAMIC Sat Apr 4 23:00:00 CST 2026", "kernel": "5.14.0-284.11.1.el9_2.x86_64", "arch": "x86_64" }, "cpu": { "cores": 4, "threads": 8, "usage": 5.2 }, "memory": { "total": 31.25, "used": 8.5, "percent": 27.2 }, "disk": [ { "device": "/dev/sda1", "mountpoint": "/", "fstype": "xfs", "total": 100.0, "used": 25.5, "percent": 25.5 } ], "network": { "eth0": "192.168.1.100" } } # 创建批量采集脚本 [root@fgedu-ops ~]# cat > /usr/local/bin/batch-collect.sh << 'EOF' #!/bin/bash # batch-collect.sh # from:www.itpux.com.qq113257174.wx:itpux-com # web: http://www.fgedu.net.cn HOSTS_FILE="/etc/ansible/hosts" OUTPUT_DIR="/var/log/assets" mkdir -p $OUTPUT_DIR for host in $(grep -E "^[0-9]" $HOSTS_FILE | awk '{print $1}'); do echo "采集主机: $host" ssh root@$host "python3 /usr/local/bin/asset-collector.py" > $OUTPUT_DIR/${host}.json 2>/dev/null
done

echo “资产采集完成,结果保存在 $OUTPUT_DIR”
EOF

[root@fgedu-ops ~]# chmod +x /usr/local/bin/batch-collect.sh

Part03-任务调度系统

3.1 定时任务管理

# 创建任务调度脚本
[root@fgedu-ops ~]# cat > /usr/local/bin/task-scheduler.py << 'EOF'
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# task-scheduler.py
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
"""Poll a JSON task list once a minute and run due tasks in background threads."""

import os
import json
import time
import subprocess
from datetime import datetime
import threading

TASKS_FILE = "/etc/ops/tasks.json"
LOG_FILE = "/var/log/ops/tasks.log"


def load_tasks():
    """Return the task list from TASKS_FILE, or an empty list if it is missing."""
    if os.path.exists(TASKS_FILE):
        # Task names contain non-ASCII text; do not rely on the locale default.
        with open(TASKS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return []


def save_tasks(tasks):
    """Write the task list back to TASKS_FILE as indented JSON."""
    with open(TASKS_FILE, 'w', encoding='utf-8') as f:
        json.dump(tasks, f, indent=2)


def run_task(task):
    """Run one task's command through the shell and log the outcome.

    The command is killed after task['timeout'] seconds (default 300).
    A notification is sent only when the command finishes and the task has
    'notify' set; timeouts and other exceptions are logged but not notified.
    """
    log(f"执行任务: {task['name']}")
    start_time = time.time()

    try:
        # shell=True: the command string comes from the local task
        # configuration file and is executed as written.
        result = subprocess.run(
            task['command'],
            shell=True,
            capture_output=True,
            text=True,
            timeout=task.get('timeout', 300)
        )

        duration = time.time() - start_time
        status = 'success' if result.returncode == 0 else 'failed'

        log(f"任务完成: {task['name']}, 状态: {status}, 耗时: {duration:.2f}s")

        if task.get('notify'):
            send_notification(task['name'], status)

    except subprocess.TimeoutExpired:
        log(f"任务超时: {task['name']}")
    except Exception as e:
        log(f"任务异常: {task['name']}, 错误: {str(e)}")


def log(message):
    """Append a timestamped message to LOG_FILE and echo it to stdout."""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # The log directory is not created anywhere else, so create it on demand.
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {message}\n")
    print(f"[{timestamp}] {message}")


def send_notification(task_name, status):
    """Placeholder: no notification is actually sent."""
    pass


def check_and_run():
    """Start a thread for every enabled task whose schedule is due now."""
    tasks = load_tasks()
    current_time = datetime.now()

    for task in tasks:
        if not task.get('enabled', True):
            continue

        schedule = task.get('schedule', {})
        if should_run(schedule, current_time):
            threading.Thread(target=run_task, args=(task,)).start()


def should_run(schedule, current_time):
    """Placeholder: always returns False.

    Until schedule matching is implemented here, check_and_run never
    starts any task.
    """
    return False


if __name__ == '__main__':
    print("任务调度器启动...")
    while True:
        check_and_run()
        time.sleep(60)
EOF

# Create the task configuration
[root@fgedu-ops ~]# mkdir -p /etc/ops
[root@fgedu-ops ~]# cat > /etc/ops/tasks.json << 'EOF'
[
  {
    "id": 1,
    "name": "系统健康检查",
    "command": "/usr/local/bin/health-check.sh",
    "schedule": {
      "type": "interval",
      "minutes": 30
    },
    "timeout": 300,
    "enabled": true,
    "notify": true
  },
  {
    "id": 2,
    "name": "日志清理",
    "command": "find /var/log -name '*.log' -mtime +30 -delete",
    "schedule": {
      "type": "daily",
      "hour": 2,
      "minute": 0
    },
    "timeout": 600,
    "enabled": true,
    "notify": false
  },
  {
    "id": 3,
    "name": "备份任务",
    "command": "/usr/local/bin/backup.sh",
    "schedule": {
      "type": "daily",
      "hour": 3,
      "minute": 0
    },
    "timeout": 3600,
    "enabled": true,
    "notify": true
  }
]
EOF

# Create the health check script
[root@fgedu-ops ~]# cat > /usr/local/bin/health-check.sh << 'EOF'
#!/bin/bash
# health-check.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

echo "=== 系统健康检查 ==="
echo "检查时间: $(date)"
echo ""

# Second field of top's "Cpu(s)" summary line.
echo "1. CPU使用率: $(top -bn1 | grep 'Cpu(s)' | awk '{print $2}')%"
# used / total from the "Mem" row of free, as a percentage.
echo "2. 内存使用率: $(free | awk '/Mem/{printf("%.1f"), $3/$2*100}')%"
echo "3. 磁盘使用率:"
df -h | grep -E '^/dev'
echo ""

echo "4. 关键服务状态:"
for svc in nginx mysql redis; do
    systemctl is-active "$svc" > /dev/null 2>&1 && echo "$svc: 运行中" || echo "$svc: 已停止"
done

echo ""
echo "=== 检查完成 ==="
EOF

[root@fgedu-ops ~]# chmod +x /usr/local/bin/health-check.sh

Part04-监控告警集成

4.1 告警通知系统

# 创建告警通知脚本
[root@fgedu-ops ~]# cat > /usr/local/bin/alert-notify.py << 'EOF'
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# alert-notify.py
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
"""Send alert notifications by e-mail and/or webhook using a JSON config file."""

import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests


class AlertNotifier:
    """Dispatch alerts to the channels described in the JSON configuration."""

    def __init__(self, config_file='/etc/ops/alert-config.json'):
        """Load the configuration from config_file.

        Raises whatever open() or json.load() raise when the file is
        missing or is not valid JSON.
        """
        with open(config_file, 'r', encoding='utf-8') as f:
            self.config = json.load(f)

    def send_email(self, subject, message):
        """Send a plain-text UTF-8 mail to every address in config['email']['to'].

        SMTP errors are caught and printed; they are not re-raised.
        """
        email_cfg = self.config['email']

        msg = MIMEMultipart()
        msg['From'] = email_cfg['from']
        msg['To'] = ','.join(email_cfg['to'])
        msg['Subject'] = subject

        msg.attach(MIMEText(message, 'plain', 'utf-8'))

        try:
            # The context manager closes the connection even when starttls()
            # or login() fails.
            with smtplib.SMTP(email_cfg['smtp_server'], email_cfg['smtp_port']) as server:
                server.starttls()
                server.login(email_cfg['username'], email_cfg['password'])
                server.send_message(msg)
            print("邮件发送成功")
        except Exception as e:
            print(f"邮件发送失败: {e}")

    def send_webhook(self, alert_data):
        """POST alert_data as JSON to config['webhook']['url'] (10 s timeout).

        Request errors, including HTTP error statuses, are caught and
        printed; they are not re-raised.
        """
        try:
            response = requests.post(
                self.config['webhook']['url'],
                json=alert_data,
                timeout=10
            )
            # Treat 4xx/5xx responses as failures instead of reporting success.
            response.raise_for_status()
            print(f"Webhook通知发送成功: {response.status_code}")
        except Exception as e:
            print(f"Webhook通知发送失败: {e}")

    def send_alert(self, alert_type, subject, message, data=None):
        """Route an alert to 'email', 'webhook' or 'all' channels.

        When data is empty, the webhook payload is built from subject and
        message. Any other alert_type is ignored silently.
        """
        if alert_type == 'email':
            self.send_email(subject, message)
        elif alert_type == 'webhook':
            self.send_webhook(data or {'subject': subject, 'message': message})
        elif alert_type == 'all':
            self.send_email(subject, message)
            self.send_webhook(data or {'subject': subject, 'message': message})


if __name__ == '__main__':
    notifier = AlertNotifier()
    notifier.send_alert('email', '测试告警', '这是一条测试告警消息')
EOF

[root@fgedu-ops ~]# chmod +x /usr/local/bin/alert-notify.py

# Create the alert configuration
[root@fgedu-ops ~]# cat > /etc/ops/alert-config.json << 'EOF'
{
  "email": {
    "smtp_server": "smtp.fgedu.net.cn",
    "smtp_port": 587,
    "from": "ops@fgedu.net.cn",
    "to": ["admin@fgedu.net.cn"],
    "username": "ops@fgedu.net.cn",
    "password": "password123"
  },
  "webhook": {
    "url": "https://api.fgedu.net.cn/alert/webhook"
  },
  "rules": [
    {
      "name": "cpu_high",
      "condition": "cpu_usage > 80",
      "duration": 300,
      "severity": "warning",
      "notify": ["email", "webhook"]
    },
    {
      "name": "disk_full",
      "condition": "disk_usage > 90",
      "duration": 60,
      "severity": "critical",
      "notify": ["email", "webhook"]
    }
  ]
}
EOF
风哥针对自动化运维建议:

  • 建立完善的资产管理体系
  • 实现任务自动化调度
  • 配置多渠道告警通知
  • 定期审查和优化流程
  • 建立运维知识库

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息