PostgreSQL教程FG292-PG运维自动化实战:Shell/Python脚本编写
本文档风哥主要介绍PostgreSQL运维自动化的实战经验,包括Shell和Python脚本的编写,用于自动化备份、监控、维护等任务。风哥教程参考PostgreSQL官方文档和运维最佳实践,适合企业级PostgreSQL运维自动化。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 运维自动化概述
运维自动化是指使用脚本、工具等手段,自动完成日常运维任务,提高运维效率和可靠性。PostgreSQL运维自动化的核心目标:
- 提高效率:减少手动操作,节省时间和人力
- 提高可靠性:减少人为错误,确保操作一致性
- 提高可追溯性:记录操作过程,便于审计和排查问题
- 实现标准化:统一操作流程,确保环境一致性
自动化可以显著提高运维效率,减少人为错误,确保操作的一致性和可靠性。对于PostgreSQL这样的数据库系统,自动化尤为重要,因为它涉及到数据的安全性和可靠性。
1.2 脚本语言选择
PostgreSQL运维自动化常用的脚本语言:
## 1. Shell脚本
– **优势:**
– 简单易用,适合系统级操作
– 与系统命令紧密集成
– 适合简单的自动化任务
– 不需要额外安装依赖
– **劣势:**
– 复杂逻辑处理能力有限
– 错误处理机制较弱
– 跨平台兼容性差
## 2. Python脚本
– **优势:**
– 强大的处理能力,适合复杂逻辑
– 丰富的库支持(如psycopg2、argparse)
– 良好的错误处理机制
– 跨平台兼容性好
– **劣势:**
– 需要安装Python环境
– 对于简单任务可能过于复杂
## 3. 选择建议
– **Shell脚本:**适合简单的系统级操作,如备份、启动/停止服务
– **Python脚本:**适合复杂的业务逻辑,如监控、数据分析
– **混合使用:**根据任务类型选择合适的语言,或在Shell脚本中调用Python脚本
1.3 PostgreSQL自动化任务
PostgreSQL常见的自动化任务:
- 备份与恢复:定期备份、备份验证、恢复测试
- 监控与告警:性能监控、状态检查、异常告警
- 日常维护:VACUUM、ANALYZE、索引重建
- 配置管理:参数调整、配置版本控制
- 部署与升级:自动化部署、版本升级
- 安全管理:权限检查、安全审计
Part02-生产环境规划与建议
2.1 环境搭建
自动化脚本的运行环境搭建:
## 1. Shell环境
– 确保bash版本在4.0以上
– 安装必要的系统工具:
“`bash
apt-get install -y curl wget cron rsync
“`
## 2. Python环境
– 安装Python 3.8+:
“`bash
apt-get install -y python3 python3-pip
“`
– 安装必要的Python库:
“`bash
pip3 install psycopg2-binary argparse logging configparser
“`
## 3. PostgreSQL客户端
– 确保安装psql客户端:
“`bash
apt-get install -y postgresql-client
“`
## 4. 环境变量配置
– 创建.env文件存储环境变量:
“`bash
# PostgreSQL连接信息
export PGHOST=localfgedu.net.cn
export PGPORT=5432
export PGUSER=pgsql export PGPASSWORD=your_password
export PGDATABASE=pgsql “`
– 在脚本中加载环境变量:
“`bash
source /path/to/.env
“`
2.2 脚本组织
脚本的组织和管理:
## 1. 目录结构
“`
/path/to/scripts/
├── backup/ # 备份脚本
│ ├── full_backup.sh
│ ├── incremental_backup.sh
│ └── verify_backup.sh
├── monitoring/ # 监控脚本
│ ├── check_status.py
│ ├── collect_metrics.py
│ └── alert.sh
├── maintenance/ # 维护脚本
│ ├── vacuum.sh
│ ├── reindex.sh
│ └── analyze.sh
├── deployment/ # 部署脚本
│ ├── deploy.sh
│ ├── upgrade.sh
│ └── rollback.sh
├── common/ # 通用函数
│ ├── functions.sh
│ └── utils.py
├── config/ # 配置文件
│ ├── backup.conf
│ ├── monitoring.conf
│ └── maintenance.conf
└── logs/ # 日志文件
├── backup.log
├── monitoring.log
└── maintenance.log
“`
## 2. 版本控制
– 使用Git进行版本控制:
“`bash
git init
git add .
git commit -m “Initial commit”
“`
## 3. 权限管理
– 设置脚本执行权限:
“`bash
chmod +x *.sh *.py
“`
– 限制配置文件权限:
“`bash
chmod 600 config/*.conf
chmod 600 .env
“`
2.3 安全考虑
自动化脚本的安全考虑:
- 密码管理:使用环境变量或加密的配置文件存储密码,避免硬编码
- 权限控制:限制脚本和配置文件的访问权限
- 日志管理:避免在日志中记录敏感信息
- 错误处理:妥善处理错误,避免暴露敏感信息
- 网络安全:使用SSH密钥认证,避免使用密码登录
- 定期审计:定期检查脚本和配置文件的安全性
Part03-生产环境项目实施方案
3.1 Shell脚本实现
3.1.1 备份脚本
# 全量备份脚本
# 加载环境变量
source /path/to/.env
# 配置
BACKUP_DIR=”/backup/pg_backup”
DATE=$(date +%Y%m%d%H%M%S)
BACKUP_FILE=”$BACKUP_DIR/full_backup_$DATE.tar.gz”
LOG_FILE=”$BACKUP_DIR/backup.log”
# 创建备份目录
mkdir -p $BACKUP_DIR
# 记录开始时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始全量备份” >> $LOG_FILE
# 执行备份
pg_basebackup -h $PGHOST -p $PGPORT -U $PGUSER -D $BACKUP_DIR/tmp -F t -X fetch -z
# 检查备份是否成功
if [ $? -eq 0 ]; then
# 压缩备份文件
tar -czf $BACKUP_FILE -C $BACKUP_DIR/tmp .
# 清理临时文件
rm -rf $BACKUP_DIR/tmp
# 记录成功信息
echo “[$(date +%Y-%m-%d%H:%M:%S)] 全量备份成功:$BACKUP_FILE” >> $LOG_FILE
# 清理过期备份(保留30天)
find $BACKUP_DIR -name “full_backup_*.tar.gz” -mtime +30 -delete
else
# 记录失败信息
echo “[$(date +%Y-%m-%d%H:%M:%S)] 全量备份失败” >> $LOG_FILE
exit 1
fi
# 记录结束时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 全量备份完成” >> $LOG_FILE
3.1.2 监控脚本
# 数据库状态检查脚本
# 加载环境变量
source /path/to/.env
# 配置
LOG_FILE=”/path/to/scripts/logs/monitoring.log”
ALERT_FILE=”/path/to/scripts/logs/alert.log”
# 检查数据库连接
check_connection() {
psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE -c “SELECT 1” > /dev/null 2>&1
return $?
}
# 检查数据库状态
check_status() {
STATUS=$(psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE -c “SELECT pg_is_in_recovery()” -t)
return $?
}
# 检查连接数
check_connections() {
CONNECTIONS=$(psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE -c “SELECT count(*) FROM
pg_stat_activity” -t)
return $?
}
# 发送告警
send_alert() {
local message=”$1″
echo “[$(date +%Y-%m-%d%H:%M:%S)] ALERT: $message” >> $ALERT_FILE
# 可以添加邮件或短信告警
# echo “$message” | mail -s “PostgreSQL Alert” admin@fgedu.net.cn
}
# 记录开始时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始监控检查” >> $LOG_FILE
# 检查数据库连接
if ! check_connection; then
send_alert “数据库连接失败”
echo “[$(date +%Y-%m-%d%H:%M:%S)] 数据库连接失败” >> $LOG_FILE
else
echo “[$(date +%Y-%m-%d%H:%M:%S)] 数据库连接正常” >> $LOG_FILE
# 检查数据库状态
if check_status; then
echo “[$(date +%Y-%m-%d%H:%M:%S)] 数据库状态正常” >> $LOG_FILE
else
send_alert “数据库状态异常”
echo “[$(date +%Y-%m-%d%H:%M:%S)] 数据库状态异常” >> $LOG_FILE
fi
# 检查连接数
if check_connections; then
echo “[$(date +%Y-%m-%d%H:%M:%S)] 连接数检查完成” >> $LOG_FILE
else
send_alert “连接数检查失败”
echo “[$(date +%Y-%m-%d%H:%M:%S)] 连接数检查失败” >> $LOG_FILE
fi
fi
# 记录结束时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 监控检查完成” >> $LOG_FILE
3.2 Python脚本实现
3.2.1 性能监控脚本
# 性能监控脚本
import psycopg2
import time
import logging
import argparse
import configparser
# 配置日志
logging.basicConfig(
filename=’/path/to/scripts/logs/performance.log’,
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’
)
# 读取配置
config = configparser.ConfigParser()
config.read(‘/path/to/scripts/config/monitoring.conf’)
# 数据库连接信息
db_config = {
‘fgedu.net.cn’: config.get(‘fgedudb’, ‘fgedu.net.cn’),
‘port’: config.get(‘fgedudb’, ‘port’),
‘fgedu’: config.get(‘fgedudb’, ‘fgedu’),
‘password’: config.get(‘fgedudb’, ‘password’),
‘fgedudb’: config.get(‘fgedudb’, ‘fgedudb’)
}
# 性能阈值
thresholds = {
‘cpu_usage’: config.getfloat(‘thresholds’, ‘cpu_usage’),
‘memory_usage’: config.getfloat(‘thresholds’, ‘memory_usage’),
‘disk_usage’: config.getfloat(‘thresholds’, ‘disk_usage’),
‘connection_count’: config.getint(‘thresholds’, ‘connection_count’),
‘slow_query_time’: config.getfloat(‘thresholds’, ‘slow_query_time’)
}
def get_db_connection():
“””获取数据库连接”””
try:
conn = psycopg2.connect(**db_config)
return conn
except Exception as e:
logging.error(f”数据库连接失败: {e}”)
return None
def check_performance():
“””检查数据库性能”””
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cur:
# 检查连接数
cur.execute(“SELECT count(*) FROM pg_stat_activity”)
connection_count = cur.fetchone()[0]
logging.info(f”当前连接数: {connection_count}”)
if connection_count > thresholds[‘connection_count’]:
logging.warning(f”连接数超过阈值: {connection_count} > {thresholds[‘connection_count’]}”)
# 检查慢查询
cur.execute(“””
SELECT pid, usename, datname, now() – query_start as duration, query
FROM pg_stat_activity
WHERE state = ‘active’ AND now() – query_start > interval ‘5 seconds’
ORDER BY duration DESC
“””)
slow_queries = cur.fetchall()
if slow_queries:
logging.warning(f”发现慢查询: {len(slow_queries)}个”)
for query in slow_queries:
logging.warning(f”慢查询: PID={query[0]}, 用户={query[1]}, 数据库={query[2]}, 持续时间={query[3]}”)
# 检查表膨胀
cur.execute(“””
SELECT schemaname, tablename,
round(CASE WHEN otta=0 THEN 0.0 ELSE sml.relpages/otta::numeric END,1) AS tbloat,
CASE WHEN relpages < otta THEN 0 ELSE bs*(sml.relpages-otta)::bigint END AS wastedbytes FROM ( SELECT
schemaname, tablename, cc.reltuples, cc.relpages, bs, CEIL((cc.reltuples*((datahdr+ma- (CASE WHEN
datahdr%ma=0 THEN ma ELSE datahdr%ma END))+nullhdr2+4))/(bs-20::float)) AS otta FROM ( SELECT
ma,bs,schemaname,tablename, (datawidth+(hdr+ma-(case when hdr%ma=0 THEN ma ELSE hdr%ma END)))::numeric
AS datahdr, (maxfracsum*(nullhdr+ma-(case when nullhdr%ma=0 THEN ma ELSE nullhdr%ma END))) AS nullhdr2
FROM ( SELECT schemaname, tablename, hdr, ma, bs, SUM((1-null_frac)*avg_width) AS datawidth,
MAX(null_frac) AS maxfracsum, hdr+( SELECT 1+count(*)/8 FROM pg_stats s2 WHERE null_frac<>0 AND
s2.schemaname = s.schemaname AND s2.tablename = s.tablename
) AS nullhdr
FROM pg_stats s, (
SELECT
(SELECT current_setting(‘block_size’)::numeric) AS bs,
CASE WHEN substring(v,12,3) IN (‘8.0′,’8.1′,’8.2’) THEN 27 ELSE 23 END AS hdr,
CASE WHEN v ~ ‘mingw32′ THEN 8 ELSE 4 END AS ma
FROM (SELECT version() AS v) AS foo
) AS constants
GROUP BY 1,2,3,4,5
) AS foo
) AS rs
JOIN pg_class cc ON cc.relname = rs.tablename
JOIN pg_namespace nn ON cc.relnamespace = nn.oid AND nn.nspname = rs.schemaname
) AS sml
WHERE sml.relpages – otta > 128
ORDER BY wastedbytes DESC
LIMIT 10
“””)
table_bloat = cur.fetchall()
if table_bloat:
logging.warning(f”发现表膨胀: {len(table_bloat)}个”)
for bloat in table_bloat:
logging.warning(f”表膨胀: {bloat[0]}.{bloat[1]}, 膨胀率: {bloat[2]}, 浪费空间: {bloat[3]}字节”)
except Exception as e:
logging.error(f”性能检查失败: {e}”)
finally:
if conn:
conn.close()
def main():
“””主函数”””
parser = argparse.ArgumentParser(description=’PostgreSQL性能监控脚本’)
parser.add_argument(‘–check’, action=’store_true’, help=’执行性能检查’)
args = parser.parse_args()
if args.check:
logging.info(“开始性能检查”)
check_performance()
logging.info(“性能检查完成”)
if __name__ == ‘__main__’:
main()
3.2.2 自动化维护脚本
# 自动化维护脚本
import psycopg2
import time
import logging
import argparse
import configparser
# 配置日志
logging.basicConfig(
filename=’/path/to/scripts/logs/maintenance.log’,
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’
)
# 读取配置
config = configparser.ConfigParser()
config.read(‘/path/to/scripts/config/maintenance.conf’)
# 数据库连接信息
db_config = {
‘fgedu.net.cn’: config.get(‘fgedudb’, ‘fgedu.net.cn’),
‘port’: config.get(‘fgedudb’, ‘port’),
‘fgedu’: config.get(‘fgedudb’, ‘fgedu’),
‘password’: config.get(‘fgedudb’, ‘password’),
‘fgedudb’: config.get(‘fgedudb’, ‘fgedudb’)
}
def get_db_connection():
“””获取数据库连接”””
try:
conn = psycopg2.connect(**db_config)
return conn
except Exception as e:
logging.error(f”数据库连接失败: {e}”)
return None
def run_vacuum():
“””执行VACUUM操作”””
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cur:
# 执行VACUUM ANALYZE
logging.info(“开始执行VACUUM ANALYZE”)
cur.execute(“VACUUM ANALYZE”)
conn.commit()
logging.info(“VACUUM ANALYZE执行完成”)
except Exception as e:
logging.error(f”VACUUM执行失败: {e}”)
conn.rollback()
finally:
if conn:
conn.close()
def run_reindex():
“””执行索引重建”””
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cur:
# 获取需要重建的索引
cur.execute(“””
SELECT schemaname, tablename, indexname
FROM pg_stat_fgedu_indexes
WHERE idx_scan = 0
ORDER BY schemaname, tablename, indexname
“””)
indexes = cur.fetchall()
if indexes:
logging.info(f”发现 {len(indexes)} 个未使用的索引,开始重建”)
for schema, table, index in indexes:
try:
cur.execute(f”REINDEX INDEX CONCURRENTLY {schema}.{index}”)
conn.commit()
logging.info(f”重建索引 {schema}.{index} 成功”)
except Exception as e:
logging.error(f”重建索引 {schema}.{index} 失败: {e}”)
conn.rollback()
else:
logging.info(“没有发现需要重建的索引”)
except Exception as e:
logging.error(f”索引重建失败: {e}”)
conn.rollback()
finally:
if conn:
conn.close()
def analyze_tables():
“””分析表统计信息”””
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cur:
# 获取需要分析的表
cur.execute(“””
SELECT schemaname, tablename
FROM pg_stat_fgedu_tables
WHERE n_mod_since_analyze > 0
ORDER BY n_mod_since_analyze DESC
LIMIT 10
“””)
tables = cur.fetchall()
if tables:
logging.info(f”发现 {len(tables)} 个需要分析的表”)
for schema, table in tables:
try:
cur.execute(f”ANALYZE {schema}.{table}”)
conn.commit()
logging.info(f”分析表 {schema}.{table} 成功”)
except Exception as e:
logging.error(f”分析表 {schema}.{table} 失败: {e}”)
conn.rollback()
else:
logging.info(“没有发现需要分析的表”)
except Exception as e:
logging.error(f”表分析失败: {e}”)
conn.rollback()
finally:
if conn:
conn.close()
def main():
“””主函数”””
parser = argparse.ArgumentParser(description=’PostgreSQL自动化维护脚本’)
parser.add_argument(‘–vacuum’, action=’store_true’, help=’执行VACUUM操作’)
parser.add_argument(‘–reindex’, action=’store_true’, help=’执行索引重建’)
parser.add_argument(‘–analyze’, action=’store_true’, help=’执行表分析’)
parser.add_argument(‘–all’, action=’store_true’, help=’执行所有维护操作’)
args = parser.parse_args()
if args.vacuum or args.all:
run_vacuum()
if args.reindex or args.all:
run_reindex()
if args.analyze or args.all:
analyze_tables()
if __name__ == ‘__main__’:
main()
3.3 任务调度
3.3.1 Crontab配置
# 编辑crontab
# crontab -e
# 全量备份(每天凌晨2点)
0 2 * * * /path/to/scripts/backup/full_backup.sh
# 增量备份(每小时)
0 * * * * /path/to/scripts/backup/incremental_backup.sh
# 备份验证(每天凌晨3点)
0 3 * * * /path/to/scripts/backup/verify_backup.sh
# 监控检查(每5分钟)
*/5 * * * * /path/to/scripts/monitoring/check_status.sh
# 性能监控(每15分钟)
*/15 * * * * /path/to/scripts/monitoring/collect_metrics.py –check
# 维护操作(每周日凌晨4点)
0 4 * * 0 /path/to/scripts/maintenance/maintenance.py –all
# 清理日志(每周日凌晨5点)
0 5 * * 0 /path/to/scripts/common/cleanup_logs.sh
3.3.2 Systemd服务
# 创建服务文件
# /etc/systemd/system/pg-monitoring.service
[Unit]
Description=PostgreSQL Monitoring Service
After=network.target
[Service]
Type=simple
User=pgsql ExecStart=/usr/bin/python3 /path/to/scripts/monitoring/collect_metrics.py –check
Restart=always
RestartSec=300
[Install]
WantedBy=multi-fgedu.target
# 启用服务
systemctl enable pg-monitoring.service
systemctl start pg-monitoring.service
# 查看服务状态
systemctl status pg-monitoring.service
Part04-生产案例与实战讲解
4.1 备份自动化
4.1.1 场景描述
备份是数据库运维的重要组成部分,需要定期执行全量备份和增量备份,并验证备份的有效性。自动化备份可以确保备份的及时性和可靠性。
4.1.2 实现方案
## 1. 全量备份脚本
#!/bin/bash
# 全量备份脚本
# 加载环境变量
source /path/to/.env
# 配置
BACKUP_DIR=”/backup/pg_backup”
DATE=$(date +%Y%m%d%H%M%S)
BACKUP_FILE=”$BACKUP_DIR/full_backup_$DATE.tar.gz”
LOG_FILE=”$BACKUP_DIR/backup.log”
# 创建备份目录
mkdir -p $BACKUP_DIR
# 记录开始时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始全量备份” >> $LOG_FILE
# 执行备份
pg_basebackup -h $PGHOST -p $PGPORT -U $PGUSER -D $BACKUP_DIR/tmp -F t -X fetch -z
# 检查备份是否成功
if [ $? -eq 0 ]; then
# 压缩备份文件
tar -czf $BACKUP_FILE -C $BACKUP_DIR/tmp .
# 清理临时文件
rm -rf $BACKUP_DIR/tmp
# 记录成功信息
echo “[$(date +%Y-%m-%d%H:%M:%S)] 全量备份成功:$BACKUP_FILE” >> $LOG_FILE
# 清理过期备份(保留30天)
find $BACKUP_DIR -name “full_backup_*.tar.gz” -mtime +30 -delete
# 发送通知
echo “PostgreSQL全量备份成功: $BACKUP_FILE” | mail -s “PostgreSQL Backup Success” admin@fgedu.net.cn
else
# 记录失败信息
echo “[$(date +%Y-%m-%d%H:%M:%S)] 全量备份失败” >> $LOG_FILE
# 发送告警
echo “PostgreSQL全量备份失败” | mail -s “PostgreSQL Backup Failed” admin@fgedu.net.cn
exit 1
fi
# 记录结束时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 全量备份完成” >> $LOG_FILE
## 2. 增量备份脚本
#!/bin/bash
# 增量备份脚本
# 加载环境变量
source /path/to/.env
# 配置
BACKUP_DIR=”/backup/pg_backup/incremental”
DATE=$(date +%Y%m%d%H%M%S)
BACKUP_FILE=”$BACKUP_DIR/incremental_backup_$DATE.tar.gz”
LOG_FILE=”$BACKUP_DIR/backup.log”
# 创建备份目录
mkdir -p $BACKUP_DIR
# 记录开始时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始增量备份” >> $LOG_FILE
# 执行增量备份(使用wal-e)
wal-e backup-push $BACKUP_DIR
# 检查备份是否成功
if [ $? -eq 0 ]; then
# 记录成功信息
echo “[$(date +%Y-%m-%d%H:%M:%S)] 增量备份成功” >> $LOG_FILE
else
# 记录失败信息
echo “[$(date +%Y-%m-%d%H:%M:%S)] 增量备份失败” >> $LOG_FILE
# 发送告警
echo “PostgreSQL增量备份失败” | mail -s “PostgreSQL Incremental Backup Failed” admin@fgedu.net.cn
exit 1
fi
# 记录结束时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 增量备份完成” >> $LOG_FILE
## 3. 备份验证脚本
#!/bin/bash
# 备份验证脚本
# 加载环境变量
source /path/to/.env
# 配置
BACKUP_DIR=”/backup/pg_backup”
LOG_FILE=”$BACKUP_DIR/verify.log”
# 记录开始时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始备份验证” >> $LOG_FILE
# 获取最新的全量备份
LATEST_BACKUP=$(ls -t $BACKUP_DIR/full_backup_*.tar.gz | head -1)
if [ -z “$LATEST_BACKUP” ]; then
echo “[$(date +%Y-%m-%d%H:%M:%S)] 没有找到全量备份” >> $LOG_FILE
exit 1
fi
# 验证备份文件
echo “[$(date +%Y-%m-%d%H:%M:%S)] 验证备份文件: $LATEST_BACKUP” >> $LOG_FILE
# 检查文件完整性
tar -tzf $LATEST_BACKUP > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo “[$(date +%Y-%m-%d%H:%M:%S)] 备份文件完整性检查成功” >> $LOG_FILE
else
echo “[$(date +%Y-%m-%d%H:%M:%S)] 备份文件完整性检查失败” >> $LOG_FILE
# 发送告警
echo “PostgreSQL备份文件损坏: $LATEST_BACKUP” | mail -s “PostgreSQL Backup Corrupted” admin@fgedu.net.cn
exit 1
fi
# 记录结束时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 备份验证完成” >> $LOG_FILE
4.2 监控自动化
4.2.1 场景描述
监控是数据库运维的重要组成部分,需要实时监控数据库的状态、性能和资源使用情况,及时发现和解决问题。
4.2.2 实现方案
## 1. 状态监控脚本
#!/bin/bash
# 数据库状态监控脚本
# 加载环境变量
source /path/to/.env
# 配置
LOG_FILE=”/path/to/scripts/logs/monitoring.log”
ALERT_FILE=”/path/to/scripts/logs/alert.log”
# 检查数据库连接
check_connection() {
psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE -c “SELECT 1” > /dev/null 2>&1
return $?
}
# 检查数据库状态
check_status() {
STATUS=$(psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE -c “SELECT pg_is_in_recovery()” -t)
return $?
}
# 检查复制状态
check_replication() {
if [ “$(psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE -c “SELECT pg_is_in_recovery()” -t)” = “f” ];
then
# 主库检查从库状态
psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE -c “SELECT slot_name,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as lag FROM pg_replication_slots” > /dev/null 2>&1
else
# 从库检查复制延迟
psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE -c “SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(),
pg_last_wal_receive_lsn()) as lag” > /dev/null 2>&1
fi
return $?
}
# 检查资源使用情况
check_resources() {
# 检查CPU使用率
CPU_USAGE=$(top -bn1 | grep “Cpu(s)” | sed “s/.*, *\([0-9.]*\)%* id.*/\1/” | awk ‘{print 100 – $1}’)
# 检查内存使用率
MEM_USAGE=$(free -m | awk ‘/Mem:/ {print $3/$2 * 100.0}’)
# 检查磁盘使用率
DISK_USAGE=$(df -h | grep ‘/dev/sda1’ | awk ‘{print $5}’ | sed ‘s/%//’)
echo “CPU使用率: $CPU_USAGE%” >> $LOG_FILE
echo “内存使用率: $MEM_USAGE%” >> $LOG_FILE
echo “磁盘使用率: $DISK_USAGE%” >> $LOG_FILE
# 检查阈值
if (( $(echo “$CPU_USAGE > 80” | bc -l) )); then
send_alert “CPU使用率超过80%: $CPU_USAGE%”
fi
if (( $(echo “$MEM_USAGE > 80” | bc -l) )); then
send_alert “内存使用率超过80%: $MEM_USAGE%”
fi
if [ $DISK_USAGE -gt 80 ]; then
send_alert “磁盘使用率超过80%: $DISK_USAGE%”
fi
}
# 发送告警
send_alert() {
local message=”$1″
echo “[$(date +%Y-%m-%d%H:%M:%S)] ALERT: $message” >> $ALERT_FILE
# 发送邮件告警
echo “$message” | mail -s “PostgreSQL Alert” admin@fgedu.net.cn
# 可以添加短信告警或其他告警方式
}
# 记录开始时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始监控检查” >> $LOG_FILE
# 检查数据库连接
if ! check_connection; then
send_alert “数据库连接失败”
echo “[$(date +%Y-%m-%d%H:%M:%S)] 数据库连接失败” >> $LOG_FILE
else
echo “[$(date +%Y-%m-%d%H:%M:%S)] 数据库连接正常” >> $LOG_FILE
# 检查数据库状态
if check_status; then
echo “[$(date +%Y-%m-%d%H:%M:%S)] 数据库状态正常” >> $LOG_FILE
else
send_alert “数据库状态异常”
echo “[$(date +%Y-%m-%d%H:%M:%S)] 数据库状态异常” >> $LOG_FILE
fi
# 检查复制状态
if check_replication; then
echo “[$(date +%Y-%m-%d%H:%M:%S)] 复制状态正常” >> $LOG_FILE
else
send_alert “复制状态异常”
echo “[$(date +%Y-%m-%d%H:%M:%S)] 复制状态异常” >> $LOG_FILE
fi
# 检查资源使用情况
check_resources
fi
# 记录结束时间
echo “[$(date +%Y-%m-%d%H:%M:%S)] 监控检查完成” >> $LOG_FILE
## 2. 性能监控脚本
#!/usr/bin/env python3
# 性能监控脚本
import psycopg2
import time
import logging
import argparse
import configparser
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# 配置日志
logging.basicConfig(
filename=’/path/to/scripts/logs/performance.log’,
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’
)
# 读取配置
config = configparser.ConfigParser()
config.read(‘/path/to/scripts/config/monitoring.conf’)
# 数据库连接信息
db_config = {
‘fgedu.net.cn’: config.get(‘fgedudb’, ‘fgedu.net.cn’),
‘port’: config.get(‘fgedudb’, ‘port’),
‘fgedu’: config.get(‘fgedudb’, ‘fgedu’),
‘password’: config.get(‘fgedudb’, ‘password’),
‘fgedudb’: config.get(‘fgedudb’, ‘fgedudb’)
}
# 邮件配置
email_config = {
‘smtp_server’: config.get(’email’, ‘smtp_server’),
‘smtp_port’: config.get(’email’, ‘smtp_port’),
‘smtp_fgedu’: config.get(’email’, ‘smtp_fgedu’),
‘smtp_password’: config.get(’email’, ‘smtp_password’),
‘recipient’: config.get(’email’, ‘recipient’)
}
# 性能阈值
thresholds = {
‘cpu_usage’: config.getfloat(‘thresholds’, ‘cpu_usage’),
‘memory_usage’: config.getfloat(‘thresholds’, ‘memory_usage’),
‘disk_usage’: config.getfloat(‘thresholds’, ‘disk_usage’),
‘connection_count’: config.getint(‘thresholds’, ‘connection_count’),
‘slow_query_time’: config.getfloat(‘thresholds’, ‘slow_query_time’),
‘replication_lag’: config.getint(‘thresholds’, ‘replication_lag’)
}
def send_email(subject, message):
“””发送邮件”””
try:
msg = MIMEMultipart()
msg[‘From’] = email_config[‘smtp_fgedu’]
msg[‘To’] = email_config[‘recipient’]
msg[‘Subject’] = subject
msg.attach(MIMEText(message, ‘plain’))
server = smtplib.SMTP(email_config[‘smtp_server’], email_config[‘smtp_port’])
server.starttls()
server.login(email_config[‘smtp_fgedu’], email_config[‘smtp_password’])
text = msg.as_string()
server.sendmail(email_config[‘smtp_fgedu’], email_config[‘recipient’], text)
server.quit()
logging.info(“邮件发送成功”)
except Exception as e:
logging.error(f”邮件发送失败: {e}”)
def get_db_connection():
“””获取数据库连接”””
try:
conn = psycopg2.connect(**db_config)
return conn
except Exception as e:
logging.error(f”数据库连接失败: {e}”)
send_email(“PostgreSQL监控告警”, f”数据库连接失败: {e}”)
return None
def check_performance():
“””检查数据库性能”””
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cur:
# 检查连接数
cur.execute(“SELECT count(*) FROM pg_stat_activity”)
connection_count = cur.fetchone()[0]
logging.info(f”当前连接数: {connection_count}”)
if connection_count > thresholds[‘connection_count’]:
alert_msg = f”连接数超过阈值: {connection_count} > {thresholds[‘connection_count’]}”
logging.warning(alert_msg)
send_email(“PostgreSQL监控告警”, alert_msg)
# 检查慢查询
cur.execute(“””
SELECT pid, usename, datname, now() – query_start as duration, query
FROM pg_stat_activity
WHERE state = ‘active’ AND now() – query_start > interval ‘5 seconds’
ORDER BY duration DESC
“””)
slow_queries = cur.fetchall()
if slow_queries:
alert_msg = f”发现慢查询: {len(slow_queries)}个\n”
for query in slow_queries:
alert_msg += f”PID={query[0]}, 用户={query[1]}, 数据库={query[2]}, 持续时间={query[3]}\n”
logging.warning(alert_msg)
send_email(“PostgreSQL监控告警”, alert_msg)
# 检查复制延迟
cur.execute(“SELECT pg_is_in_recovery()”)
is_replica = cur.fetchone()[0]
if is_replica:
cur.execute(“””
SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(), pg_last_wal_receive_lsn()) as lag
“””)
lag = cur.fetchone()[0]
if lag and lag > thresholds[‘replication_lag’]:
alert_msg = f”复制延迟超过阈值: {lag} > {thresholds[‘replication_lag’]}”
logging.warning(alert_msg)
send_email(“PostgreSQL监控告警”, alert_msg)
# 检查表膨胀
cur.execute(“””
SELECT schemaname, tablename,
round(CASE WHEN otta=0 THEN 0.0 ELSE sml.relpages/otta::numeric END,1) AS tbloat,
CASE WHEN relpages < otta THEN 0 ELSE bs*(sml.relpages-otta)::bigint END AS wastedbytes FROM ( SELECT
schemaname, tablename, cc.reltuples, cc.relpages, bs, CEIL((cc.reltuples*((datahdr+ma- (CASE WHEN
datahdr%ma=0 THEN ma ELSE datahdr%ma END))+nullhdr2+4))/(bs-20::float)) AS otta FROM ( SELECT
ma,bs,schemaname,tablename, (datawidth+(hdr+ma-(case when hdr%ma=0 THEN ma ELSE hdr%ma END)))::numeric
AS datahdr, (maxfracsum*(nullhdr+ma-(case when nullhdr%ma=0 THEN ma ELSE nullhdr%ma END))) AS nullhdr2
FROM ( SELECT schemaname, tablename, hdr, ma, bs, SUM((1-null_frac)*avg_width) AS datawidth,
MAX(null_frac) AS maxfracsum, hdr+( SELECT 1+count(*)/8 FROM pg_stats s2 WHERE null_frac<>0 AND
s2.schemaname = s.schemaname AND s2.tablename = s.tablename
) AS nullhdr
FROM pg_stats s, (
SELECT
(SELECT current_setting(‘block_size’)::numeric) AS bs,
CASE WHEN substring(v,12,3) IN (‘8.0′,’8.1′,’8.2’) THEN 27 ELSE 23 END AS hdr,
CASE WHEN v ~ ‘mingw32′ THEN 8 ELSE 4 END AS ma
FROM (SELECT version() AS v) AS foo
) AS constants
GROUP BY 1,2,3,4,5
) AS foo
) AS rs
JOIN pg_class cc ON cc.relname = rs.tablename
JOIN pg_namespace nn ON cc.relnamespace = nn.oid AND nn.nspname = rs.schemaname
) AS sml
WHERE sml.relpages – otta > 128
ORDER BY wastedbytes DESC
LIMIT 5
“””)
table_bloat = cur.fetchall()
if table_bloat:
alert_msg = f”发现表膨胀: {len(table_bloat)}个\n”
for bloat in table_bloat:
alert_msg += f”表: {bloat[0]}.{bloat[1]}, 膨胀率: {bloat[2]}, 浪费空间: {bloat[3]}字节\n”
logging.warning(alert_msg)
send_email(“PostgreSQL监控告警”, alert_msg)
except Exception as e:
logging.error(f”性能检查失败: {e}”)
send_email(“PostgreSQL监控告警”, f”性能检查失败: {e}”)
finally:
if conn:
conn.close()
def main():
“””主函数”””
parser = argparse.ArgumentParser(description=’PostgreSQL性能监控脚本’)
parser.add_argument(‘–check’, action=’store_true’, help=’执行性能检查’)
args = parser.parse_args()
if args.check:
logging.info(“开始性能检查”)
check_performance()
logging.info(“性能检查完成”)
if __name__ == ‘__main__’:
main()
4.3 维护自动化
4.3.1 场景描述
数据库维护是确保数据库性能和可靠性的重要措施,包括VACUUM、ANALYZE、索引重建等操作。自动化维护可以确保这些操作的定期执行。
4.3.2 实现方案
## 1. 维护脚本
#!/usr/bin/env python3
# 自动化维护脚本
import psycopg2
import time
import logging
import argparse
import configparser
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# 配置日志
logging.basicConfig(
filename=’/path/to/scripts/logs/maintenance.log’,
level=logging.INFO,
format=’%(asctime)s – %(levelname)s – %(message)s’
)
# 读取配置
config = configparser.ConfigParser()
config.read(‘/path/to/scripts/config/maintenance.conf’)
# 数据库连接信息
db_config = {
‘fgedu.net.cn’: config.get(‘fgedudb’, ‘fgedu.net.cn’),
‘port’: config.get(‘fgedudb’, ‘port’),
‘fgedu’: config.get(‘fgedudb’, ‘fgedu’),
‘password’: config.get(‘fgedudb’, ‘password’),
‘fgedudb’: config.get(‘fgedudb’, ‘fgedudb’)
}
# 邮件配置
email_config = {
‘smtp_server’: config.get(’email’, ‘smtp_server’),
‘smtp_port’: config.get(’email’, ‘smtp_port’),
‘smtp_fgedu’: config.get(’email’, ‘smtp_fgedu’),
‘smtp_password’: config.get(’email’, ‘smtp_password’),
‘recipient’: config.get(’email’, ‘recipient’)
}
def send_email(subject, message):
“””发送邮件”””
try:
msg = MIMEMultipart()
msg[‘From’] = email_config[‘smtp_fgedu’]
msg[‘To’] = email_config[‘recipient’]
msg[‘Subject’] = subject
msg.attach(MIMEText(message, ‘plain’))
server = smtplib.SMTP(email_config[‘smtp_server’], email_config[‘smtp_port’])
server.starttls()
server.login(email_config[‘smtp_fgedu’], email_config[‘smtp_password’])
text = msg.as_string()
server.sendmail(email_config[‘smtp_fgedu’], email_config[‘recipient’], text)
server.quit()
logging.info(“邮件发送成功”)
except Exception as e:
logging.error(f”邮件发送失败: {e}”)
def get_db_connection():
“””获取数据库连接”””
try:
conn = psycopg2.connect(**db_config)
return conn
except Exception as e:
logging.error(f”数据库连接失败: {e}”)
send_email(“PostgreSQL维护告警”, f”数据库连接失败: {e}”)
return None
def run_vacuum():
“””执行VACUUM操作”””
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cur:
# 执行VACUUM ANALYZE
logging.info(“开始执行VACUUM ANALYZE”)
start_time = time.time()
cur.execute(“VACUUM ANALYZE”)
conn.commit()
end_time = time.time()
duration = end_time – start_time
logging.info(f”VACUUM ANALYZE执行完成,耗时: {duration:.2f}秒”)
send_email(“PostgreSQL维护完成”, f”VACUUM ANALYZE执行完成,耗时: {duration:.2f}秒”)
except Exception as e:
logging.error(f”VACUUM执行失败: {e}”)
send_email(“PostgreSQL维护失败”, f”VACUUM执行失败: {e}”)
conn.rollback()
finally:
if conn:
conn.close()
def run_reindex():
“””执行索引重建”””
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cur:
# 获取需要重建的索引
cur.execute(“””
SELECT schemaname, tablename, indexname
FROM pg_stat_fgedu_indexes
WHERE idx_scan = 0
ORDER BY schemaname, tablename, indexname
“””)
indexes = cur.fetchall()
if indexes:
logging.info(f”发现 {len(indexes)} 个未使用的索引,开始重建”)
重建_count = 0
失败_count = 0
for schema, table, index in indexes:
try:
start_time = time.time()
cur.execute(f”REINDEX INDEX CONCURRENTLY {schema}.{index}”)
conn.commit()
end_time = time.time()
duration = end_time – start_time
logging.info(f”重建索引 {schema}.{index} 成功,耗时: {duration:.2f}秒”)
重建_count += 1
except Exception as e:
logging.error(f”重建索引 {schema}.{index} 失败: {e}”)
失败_count += 1
conn.rollback()
send_email(“PostgreSQL维护完成”, f”索引重建完成,成功: {重建_count}, 失败: {失败_count}”)
else:
logging.info(“没有发现需要重建的索引”)
send_email(“PostgreSQL维护完成”, “没有发现需要重建的索引”)
except Exception as e:
logging.error(f”索引重建失败: {e}”)
send_email(“PostgreSQL维护失败”, f”索引重建失败: {e}”)
conn.rollback()
finally:
if conn:
conn.close()
def analyze_tables():
“””分析表统计信息”””
conn = get_db_connection()
if not conn:
return
try:
with conn.cursor() as cur:
# 获取需要分析的表
cur.execute(“””
SELECT schemaname, tablename
FROM pg_stat_fgedu_tables
WHERE n_mod_since_analyze > 0
ORDER BY n_mod_since_analyze DESC
LIMIT 10
“””)
tables = cur.fetchall()
if tables:
logging.info(f”发现 {len(tables)} 个需要分析的表”)
分析_count = 0
失败_count = 0
for schema, table in tables:
try:
start_time = time.time()
cur.execute(f”ANALYZE {schema}.{table}”)
conn.commit()
end_time = time.time()
duration = end_time – start_time
logging.info(f”分析表 {schema}.{table} 成功,耗时: {duration:.2f}秒”)
分析_count += 1
except Exception as e:
logging.error(f”分析表 {schema}.{table} 失败: {e}”)
失败_count += 1
conn.rollback()
send_email(“PostgreSQL维护完成”, f”表分析完成,成功: {分析_count}, 失败: {失败_count}”)
else:
logging.info(“没有发现需要分析的表”)
send_email(“PostgreSQL维护完成”, “没有发现需要分析的表”)
except Exception as e:
logging.error(f”表分析失败: {e}”)
send_email(“PostgreSQL维护失败”, f”表分析失败: {e}”)
conn.rollback()
finally:
if conn:
conn.close()
def main():
“””主函数”””
parser = argparse.ArgumentParser(description=’PostgreSQL自动化维护脚本’)
parser.add_argument(‘–vacuum’, action=’store_true’, help=’执行VACUUM操作’)
parser.add_argument(‘–reindex’, action=’store_true’, help=’执行索引重建’)
parser.add_argument(‘–analyze’, action=’store_true’, help=’执行表分析’)
parser.add_argument(‘–all’, action=’store_true’, help=’执行所有维护操作’)
args = parser.parse_args()
if args.vacuum or args.all:
run_vacuum()
if args.reindex or args.all:
run_reindex()
if args.analyze or args.all:
analyze_tables()
if __name__ == ‘__main__’:
main()
## 2. 清理脚本
#!/bin/bash
# 清理脚本
# 配置
LOG_DIR=”/path/to/scripts/logs”
BACKUP_DIR=”/backup/pg_backup”
# 清理日志文件(保留30天)
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始清理日志文件” >> $LOG_DIR/cleanup.log
find $LOG_DIR -name “*.log” -mtime +30 -delete
echo “[$(date +%Y-%m-%d%H:%M:%S)] 日志文件清理完成” >> $LOG_DIR/cleanup.log
# 清理备份文件(保留30天)
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始清理备份文件” >> $LOG_DIR/cleanup.log
find $BACKUP_DIR -name “*.tar.gz” -mtime +30 -delete
echo “[$(date +%Y-%m-%d%H:%M:%S)] 备份文件清理完成” >> $LOG_DIR/cleanup.log
# 清理临时文件
echo “[$(date +%Y-%m-%d%H:%M:%S)] 开始清理临时文件” >> $LOG_DIR/cleanup.log
rm -rf /tmp/pg_*
echo “[$(date +%Y-%m-%d%H:%M:%S)] 临时文件清理完成” >> $LOG_DIR/cleanup.log
Part05-风哥经验总结与分享
5.1
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
