1. ETL Monitoring Overview
ETL (Extract-Transform-Load) monitoring is a key part of data warehouse operations. By watching job execution status, data quality, and performance metrics in real time, it ensures that data is processed accurately and on schedule. Monitoring needs to cover the entire extract, transform, and load pipeline so that abnormal situations are detected and handled promptly.
The core objectives of ETL monitoring include:
- Monitor the execution status and progress of ETL jobs
- Monitor data quality and data integrity
- Monitor ETL performance and resource usage
- Detect and handle ETL errors promptly
- Ensure data is processed on time
- Provide visual dashboards of ETL runs
2. Monitoring Architecture Design
2.1 Monitoring System Architecture
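The containers below attach to a user-defined Docker network named etl-monitoring. Assuming it does not exist yet, create it first:
# Create the shared network used by the monitoring containers (skip if it already exists)
$ docker network create etl-monitoring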
# Deploy Prometheus
$ docker run -d --name prometheus \
    --network etl-monitoring \
    -p 9090:9090 \
    -v /data/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml \
    prom/prometheus:latest

# Deploy Grafana for visualization
$ docker run -d --name grafana \
    --network etl-monitoring \
    -p 3000:3000 \
    -v /data/grafana:/var/lib/grafana \
    grafana/grafana:latest

# Deploy Alertmanager for alerting
$ docker run -d --name alertmanager \
    --network etl-monitoring \
    -p 9093:9093 \
    -v /data/alertmanager:/etc/alertmanager \
    prom/alertmanager:latest
$ docker ps
CONTAINER ID   IMAGE                      COMMAND                  CREATED         STATUS         PORTS                    NAMES
a1b2c3d4e5f6   prom/prometheus:latest     "/bin/prometheus --c…"   5 seconds ago   Up 4 seconds   0.0.0.0:9090->9090/tcp   prometheus
f6e5d4c3b2a1   grafana/grafana:latest     "/run.sh"                3 seconds ago   Up 2 seconds   0.0.0.0:3000->3000/tcp   grafana
1a2b3c4d5e6f   prom/alertmanager:latest   "/bin/alertmanager -…"   1 second ago    Up 1 second    0.0.0.0:9093->9093/tcp   alertmanager
2.2 Monitoring Metrics Configuration
$ cat > /data/prometheus/prometheus.yml << 'EOF'
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

rule_files:
  - "etl_alert_rules.yml"

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['fgedudb:9090']
  - job_name: 'etl-server'
    static_configs:
      - targets: ['etl-server:9091']
  - job_name: 'kafka-exporter'
    static_configs:
      - targets: ['kafka-exporter:9308']
  - job_name: 'mysql-exporter'
    static_configs:
      - targets: ['mysql-exporter:9104']
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']
EOF
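Prometheus only reads this file at startup or on reload, so it is worth validating it first. A minimal check, assuming promtool is available (it ships with the Prometheus release and is included in the prom/prometheus image, where the file is mounted at /etc/prometheus/prometheus.yml):
# Validate the configuration, then restart the container to apply it
$ docker exec prometheus promtool check config /etc/prometheus/prometheus.yml
$ docker restart prometheus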
3. Job Monitoring
3.1 Job Status Monitoring
$ cat > /usr/local/bin/etl_job_monitor.sh << 'EOF'
#!/bin/bash
# List of ETL jobs and the scripts they run
declare -A ETL_JOBS=(
    ["daily_extract"]="extract_data.sh"
    ["daily_transform"]="transform_data.sh"
    ["daily_load"]="load_data.sh"
    ["hourly_sync"]="sync_data.sh"
)

# Check the status of each job
for job_name in "${!ETL_JOBS[@]}"; do
    job_script="${ETL_JOBS[$job_name]}"

    # Is the job currently running?
    if pgrep -f "$job_script" > /dev/null; then
        echo "etl_job_status{job=\"$job_name\"} 1"
        # Elapsed running time of the job process
        PID=$(pgrep -f "$job_script" | head -1)
        RUNTIME=$(ps -p "$PID" -o etimes= | tr -d ' ')
        echo "etl_job_runtime_seconds{job=\"$job_name\"} ${RUNTIME:-0}"
    else
        echo "etl_job_status{job=\"$job_name\"} 0"
        echo "etl_job_runtime_seconds{job=\"$job_name\"} 0"
    fi

    # Time since the job last ran (based on log file mtime)
    LOG_FILE="/var/log/etl/${job_name}.log"
    if [ -f "$LOG_FILE" ]; then
        LAST_RUN=$(stat -c %Y "$LOG_FILE")
        CURRENT=$(date +%s)
        TIME_SINCE_LAST=$((CURRENT - LAST_RUN))
        echo "etl_job_time_since_last_run_seconds{job=\"$job_name\"} $TIME_SINCE_LAST"
    fi
done
EOF
$ chmod +x /usr/local/bin/etl_job_monitor.sh
etl_job_status{job="daily_extract"} 1
etl_job_runtime_seconds{job="daily_extract"} 1800
etl_job_status{job="daily_transform"} 0
etl_job_runtime_seconds{job="daily_transform"} 0
etl_job_status{job="daily_load"} 0
etl_job_runtime_seconds{job="daily_load"} 0
etl_job_status{job="hourly_sync"} 1
etl_job_runtime_seconds{job="hourly_sync"} 300
etl_job_time_since_last_run_seconds{job="daily_extract"} 1800
etl_job_time_since_last_run_seconds{job="daily_transform"} 3600
etl_job_time_since_last_run_seconds{job="daily_load"} 7200
etl_job_time_since_last_run_seconds{job="hourly_sync"} 300
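The script prints metrics in the Prometheus text exposition format, so one straightforward way to have them scraped is the node_exporter textfile collector: run the script from cron and write its output into the collector directory. A sketch, assuming node_exporter is started with --collector.textfile.directory=/var/lib/node_exporter/textfile_collector (the directory path is an assumption):
# crontab entry: refresh ETL job metrics every minute, publishing the file atomically
* * * * * /usr/local/bin/etl_job_monitor.sh > /var/lib/node_exporter/textfile_collector/etl_jobs.prom.tmp && mv /var/lib/node_exporter/textfile_collector/etl_jobs.prom.tmp /var/lib/node_exporter/textfile_collector/etl_jobs.prom
The same pattern works for the other monitoring scripts in this article.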
3.2 Job Progress Monitoring
$ cat > /usr/local/bin/etl_progress_monitor.sh << 'EOF'
#!/bin/bash
# Report the progress of a single ETL job
monitor_job_progress() {
    local job_name=$1
    local progress_file="/var/log/etl/${job_name}.progress"

    if [ -f "$progress_file" ]; then
        # Read progress counters from the progress file
        TOTAL=$(grep "total_records" "$progress_file" | awk -F '=' '{print $2}')
        PROCESSED=$(grep "processed_records" "$progress_file" | awk -F '=' '{print $2}')
        FAILED=$(grep "failed_records" "$progress_file" | awk -F '=' '{print $2}')

        # Calculate the progress percentage
        if [ "${TOTAL:-0}" -gt 0 ]; then
            PROGRESS=$((PROCESSED * 100 / TOTAL))
        else
            PROGRESS=0
        fi

        # Emit metrics
        echo "etl_job_total_records{job=\"$job_name\"} ${TOTAL:-0}"
        echo "etl_job_processed_records{job=\"$job_name\"} ${PROCESSED:-0}"
        echo "etl_job_failed_records{job=\"$job_name\"} ${FAILED:-0}"
        echo "etl_job_progress_percent{job=\"$job_name\"} ${PROGRESS:-0}"
    else
        echo "etl_job_total_records{job=\"$job_name\"} 0"
        echo "etl_job_processed_records{job=\"$job_name\"} 0"
        echo "etl_job_failed_records{job=\"$job_name\"} 0"
        echo "etl_job_progress_percent{job=\"$job_name\"} 0"
    fi
}

# Report progress for all jobs
for job in daily_extract daily_transform daily_load hourly_sync; do
    monitor_job_progress "$job"
done
EOF
$ chmod +x /usr/local/bin/etl_progress_monitor.sh
etl_job_total_records{job="daily_extract"} 1000000
etl_job_processed_records{job="daily_extract"} 750000
etl_job_failed_records{job="daily_extract"} 50
etl_job_progress_percent{job="daily_extract"} 75
etl_job_total_records{job="daily_transform"} 750000
etl_job_processed_records{job="daily_transform"} 0
etl_job_failed_records{job="daily_transform"} 0
etl_job_progress_percent{job="daily_transform"} 0
etl_job_total_records{job="daily_load"} 0
etl_job_processed_records{job="daily_load"} 0
etl_job_failed_records{job="daily_load"} 0
etl_job_progress_percent{job="daily_load"} 0
etl_job_total_records{job="hourly_sync"} 50000
etl_job_processed_records{job="hourly_sync"} 50000
etl_job_failed_records{job="hourly_sync"} 0
etl_job_progress_percent{job="hourly_sync"} 100
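The progress monitor only reads counters; the ETL jobs themselves are expected to maintain a simple key=value progress file. An illustrative example (the format is inferred from the grep patterns above, not prescribed by any tool):
$ cat /var/log/etl/daily_extract.progress
total_records=1000000
processed_records=750000
failed_records=50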
4. Data Quality Monitoring
4.1 Data Integrity Monitoring
$ cat > /usr/local/bin/data_quality_monitor.sh << 'EOF'
#!/bin/bash
# Database connection settings
DB_HOST="fgedudb"
DB_USER="monitor"
DB_PASS="monitor_password"
DB_NAME="data_warehouse"

# Check data integrity for a table
monitor_data_integrity() {
    local table=$1
    local date_column=$2

    # Number of records loaded today
    RECORD_COUNT=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS $DB_NAME -N -e \
        "SELECT COUNT(*) FROM $table WHERE DATE($date_column) = CURDATE()")

    # Number of today's rows with a NULL id
    NULL_COUNT=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS $DB_NAME -N -e \
        "SELECT COUNT(*) FROM $table WHERE DATE($date_column) = CURDATE() AND id IS NULL")

    # Number of duplicate rows today (by id)
    DUPLICATE_COUNT=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS $DB_NAME -N -e \
        "SELECT COUNT(*) - COUNT(DISTINCT id) FROM $table WHERE DATE($date_column) = CURDATE()")

    # Emit metrics
    echo "etl_data_record_count{table=\"$table\"} ${RECORD_COUNT:-0}"
    echo "etl_data_null_count{table=\"$table\"} ${NULL_COUNT:-0}"
    echo "etl_data_duplicate_count{table=\"$table\"} ${DUPLICATE_COUNT:-0}"
}

# Key tables to check
monitor_data_integrity "fact_fgsales" "sale_date"
monitor_data_integrity "fact_orders" "order_date"
monitor_data_integrity "dim_customers" "created_date"
monitor_data_integrity "dim_products" "created_date"
EOF
$ chmod +x /usr/local/bin/data_quality_monitor.sh
etl_data_record_count{table="fact_fgsales"} 125000
etl_data_null_count{table="fact_fgsales"} 0
etl_data_duplicate_count{table="fact_fgsales"} 0
etl_data_record_count{table="fact_orders"} 85000
etl_data_null_count{table="fact_orders"} 0
etl_data_duplicate_count{table="fact_orders"} 0
etl_data_record_count{table="dim_customers"} 1500
etl_data_null_count{table="dim_customers"} 0
etl_data_duplicate_count{table="dim_customers"} 0
etl_data_record_count{table="dim_products"} 250
etl_data_null_count{table="dim_products"} 0
etl_data_duplicate_count{table="dim_products"} 0
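Absolute counts are easier to alert on when expressed as ratios. A minimal Prometheus recording-rule sketch (the rule names are illustrative; the group would go into a rule file listed under rule_files, for example appended to etl_alert_rules.yml):
groups:
  - name: etl_data_quality_ratios
    rules:
      - record: etl_data_null_ratio
        expr: etl_data_null_count / etl_data_record_count
      - record: etl_data_duplicate_ratio
        expr: etl_data_duplicate_count / etl_data_record_count
When a table has no rows for the day the ratio evaluates to NaN, which simply yields no usable sample.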
4.2 Data Consistency Monitoring
$ cat > /usr/local/bin/data_consistency_monitor.sh << 'EOF'
#!/bin/bash
# DB_USER and DB_PASS are assumed to be exported in the environment
# (the same monitoring account used by data_quality_monitor.sh).

# Compare record counts between a source table and its target table
monitor_data_consistency() {
    local source_table=$1
    local target_table=$2
    local key_column=$3

    # Record count in the source system
    SOURCE_COUNT=$(mysql -h source-db -u $DB_USER -p$DB_PASS source_db -N -e \
        "SELECT COUNT(*) FROM $source_table")

    # Record count in the target system
    TARGET_COUNT=$(mysql -h target-db -u $DB_USER -p$DB_PASS target_db -N -e \
        "SELECT COUNT(*) FROM $target_table")

    # Absolute difference
    DIFF=$((SOURCE_COUNT - TARGET_COUNT))

    # Consistency percentage
    if [ "${SOURCE_COUNT:-0}" -gt 0 ]; then
        CONSISTENCY=$((TARGET_COUNT * 100 / SOURCE_COUNT))
    else
        CONSISTENCY=0
    fi

    # Emit metrics
    echo "etl_source_record_count{table=\"$source_table\"} ${SOURCE_COUNT:-0}"
    echo "etl_target_record_count{table=\"$target_table\"} ${TARGET_COUNT:-0}"
    echo "etl_data_difference{source=\"$source_table\",target=\"$target_table\"} ${DIFF:-0}"
    echo "etl_data_consistency_percent{source=\"$source_table\",target=\"$target_table\"} ${CONSISTENCY:-0}"
}

# Key source/target table pairs
monitor_data_consistency "source_fgsales" "fact_fgsales" "sale_id"
monitor_data_consistency "source_orders" "fact_orders" "order_id"
monitor_data_consistency "source_customers" "dim_customers" "customer_id"
EOF
$ chmod +x /usr/local/bin/data_consistency_monitor.sh
etl_source_record_count{table="source_fgsales"} 125000
etl_target_record_count{table="fact_fgsales"} 125000
etl_data_difference{source="source_fgsales",target="fact_fgsales"} 0
etl_data_consistency_percent{source="source_fgsales",target="fact_fgsales"} 100
etl_source_record_count{table="source_orders"} 85000
etl_target_record_count{table="fact_orders"} 85000
etl_data_difference{source="source_orders",target="fact_orders"} 0
etl_data_consistency_percent{source="source_orders",target="fact_orders"} 100
etl_source_record_count{table="source_customers"} 1500
etl_target_record_count{table="dim_customers"} 1500
etl_data_difference{source="source_customers",target="dim_customers"} 0
etl_data_consistency_percent{source="source_customers",target="dim_customers"} 100
5. Performance Monitoring
5.1 ETL Performance Monitoring
$ cat > /usr/local/bin/etl_performance_monitor.sh << 'EOF'
#!/bin/bash
# Measure duration and throughput of an ETL job from its log file
monitor_etl_performance() {
    local job_name=$1
    local log_file="/var/log/etl/${job_name}.log"

    if [ -f "$log_file" ]; then
        # Timestamps of the most recent start/completion entries
        START_TIME=$(grep "Job started" "$log_file" | tail -1 | awk '{print $1" "$2}')
        END_TIME=$(grep "Job completed" "$log_file" | tail -1 | awk '{print $1" "$2}')

        if [ -n "$START_TIME" ] && [ -n "$END_TIME" ]; then
            START_EPOCH=$(date -d "$START_TIME" +%s)
            END_EPOCH=$(date -d "$END_TIME" +%s)
            DURATION=$((END_EPOCH - START_EPOCH))
        else
            DURATION=0
        fi

        # Number of records processed, from the last "Records processed" entry
        RECORDS_PROCESSED=$(grep "Records processed" "$log_file" | tail -1 | awk '{print $3}')

        # Processing rate (records per second)
        if [ "$DURATION" -gt 0 ] && [ -n "$RECORDS_PROCESSED" ]; then
            RECORDS_PER_SECOND=$((RECORDS_PROCESSED / DURATION))
        else
            RECORDS_PER_SECOND=0
        fi

        # Emit metrics
        echo "etl_job_duration_seconds{job=\"$job_name\"} ${DURATION:-0}"
        echo "etl_job_records_processed{job=\"$job_name\"} ${RECORDS_PROCESSED:-0}"
        echo "etl_job_records_per_second{job=\"$job_name\"} ${RECORDS_PER_SECOND:-0}"
    fi
}

# Measure all jobs
for job in daily_extract daily_transform daily_load hourly_sync; do
    monitor_etl_performance "$job"
done
EOF
$ chmod +x /usr/local/bin/etl_performance_monitor.sh
etl_job_duration_seconds{job="daily_extract"} 1800
etl_job_records_processed{job="daily_extract"} 1000000
etl_job_records_per_second{job="daily_extract"} 555
etl_job_duration_seconds{job="daily_transform"} 2400
etl_job_records_processed{job="daily_transform"} 1000000
etl_job_records_per_second{job="daily_transform"} 416
etl_job_duration_seconds{job="daily_load"} 3600
etl_job_records_processed{job="daily_load"} 1000000
etl_job_records_per_second{job="daily_load"} 277
etl_job_duration_seconds{job="hourly_sync"} 300
etl_job_records_processed{job="hourly_sync"} 50000
etl_job_records_per_second{job="hourly_sync"} 166
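The parser above greps for "Job started", "Job completed" and "Records processed" entries, taking the timestamp from the first two fields and the record count from the third field. Illustrative log lines in that assumed format (not produced by any specific tool):
2026-04-03 02:00:01 Job started: daily_extract
2026-04-03 02:30:01 Job completed: daily_extract
Records processed: 1000000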
5.2 Database Performance Monitoring
$ cat > /usr/local/bin/db_performance_monitor.sh << 'EOF'
#!/bin/bash
# DB_HOST, DB_USER and DB_PASS are assumed to be exported in the environment
# (the same monitoring account used by data_quality_monitor.sh).

# Current client connections
CONNECTIONS=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS -N -e \
    "SHOW GLOBAL STATUS LIKE 'Threads_connected'" | awk '{print $2}')

# Total statements executed
QUERIES=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS -N -e \
    "SHOW GLOBAL STATUS LIKE 'Questions'" | awk '{print $2}')

# Slow queries
SLOW_QUERIES=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS -N -e \
    "SHOW GLOBAL STATUS LIKE 'Slow_queries'" | awk '{print $2}')

# Table lock waits
LOCK_WAITS=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS -N -e \
    "SHOW GLOBAL STATUS LIKE 'Table_locks_waited'" | awk '{print $2}')

# InnoDB buffer pool usage, computed from free/total pages
POOL_TOTAL=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS -N -e \
    "SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_pages_total'" | awk '{print $2}')
POOL_FREE=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASS -N -e \
    "SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_pages_free'" | awk '{print $2}')
BUFFER_POOL_USAGE=$(awk -v total="$POOL_TOTAL" -v free="$POOL_FREE" \
    'BEGIN { if (total > 0) printf "%.1f", (1 - free / total) * 100; else print 0 }')

# Emit metrics
echo "etl_db_connections ${CONNECTIONS:-0}"
echo "etl_db_queries_total ${QUERIES:-0}"
echo "etl_db_slow_queries_total ${SLOW_QUERIES:-0}"
echo "etl_db_lock_waits_total ${LOCK_WAITS:-0}"
echo "etl_db_buffer_pool_usage_percent ${BUFFER_POOL_USAGE:-0}"
EOF
$ chmod +x /usr/local/bin/db_performance_monitor.sh
etl_db_connections 45
etl_db_queries_total 1234567
etl_db_slow_queries_total 12
etl_db_lock_waits_total 5
etl_db_buffer_pool_usage_percent 85.5
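The prometheus.yml in section 2.2 already scrapes a mysql-exporter target on port 9104. Instead of (or alongside) the shell script, the official mysqld_exporter can expose database metrics continuously. A minimal sketch (the exporter account and the DATA_SOURCE_NAME variable are assumptions; recent mysqld_exporter releases read credentials from a .my.cnf file instead):
$ docker run -d --name mysql-exporter \
    --network etl-monitoring \
    -p 9104:9104 \
    -e DATA_SOURCE_NAME="monitor:monitor_password@(fgedudb:3306)/" \
    prom/mysqld-exporter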
6. Error Handling Monitoring
6.1 Error Statistics Monitoring
$ cat > /usr/local/bin/error_monitor.sh << 'EOF'
#!/bin/bash
# Count today's errors for an ETL job, broken down by type
monitor_etl_errors() {
    local job_name=$1
    local error_log="/var/log/etl/${job_name}_errors.log"

    if [ -f "$error_log" ]; then
        # Total errors logged today
        TODAY=$(date +%Y-%m-%d)
        ERROR_COUNT=$(grep "$TODAY" "$error_log" | wc -l)

        # Errors by category (keyword-based classification)
        CONNECTION_ERRORS=$(grep "$TODAY" "$error_log" | grep -i "connection" | wc -l)
        TIMEOUT_ERRORS=$(grep "$TODAY" "$error_log" | grep -i "timeout" | wc -l)
        DATA_ERRORS=$(grep "$TODAY" "$error_log" | grep -i "data" | wc -l)
        VALIDATION_ERRORS=$(grep "$TODAY" "$error_log" | grep -i "validation" | wc -l)

        # Emit metrics
        echo "etl_errors_total{job=\"$job_name\"} $ERROR_COUNT"
        echo "etl_connection_errors{job=\"$job_name\"} $CONNECTION_ERRORS"
        echo "etl_timeout_errors{job=\"$job_name\"} $TIMEOUT_ERRORS"
        echo "etl_data_errors{job=\"$job_name\"} $DATA_ERRORS"
        echo "etl_validation_errors{job=\"$job_name\"} $VALIDATION_ERRORS"
    else
        echo "etl_errors_total{job=\"$job_name\"} 0"
        echo "etl_connection_errors{job=\"$job_name\"} 0"
        echo "etl_timeout_errors{job=\"$job_name\"} 0"
        echo "etl_data_errors{job=\"$job_name\"} 0"
        echo "etl_validation_errors{job=\"$job_name\"} 0"
    fi
}

# Check all jobs
for job in daily_extract daily_transform daily_load hourly_sync; do
    monitor_etl_errors "$job"
done
EOF
$ chmod +x /usr/local/bin/error_monitor.sh
etl_errors_total{job="daily_extract"} 5
etl_connection_errors{job="daily_extract"} 2
etl_timeout_errors{job="daily_extract"} 1
etl_data_errors{job="daily_extract"} 2
etl_validation_errors{job="daily_extract"} 0
etl_errors_total{job="daily_transform"} 3
etl_connection_errors{job="daily_transform"} 0
etl_timeout_errors{job="daily_transform"} 0
etl_data_errors{job="daily_transform"} 3
etl_validation_errors{job="daily_transform"} 0
etl_errors_total{job="daily_load"} 0
etl_connection_errors{job="daily_load"} 0
etl_timeout_errors{job="daily_load"} 0
etl_data_errors{job="daily_load"} 0
etl_validation_errors{job="daily_load"} 0
etl_errors_total{job="hourly_sync"} 1
etl_connection_errors{job="hourly_sync"} 1
etl_timeout_errors{job="hourly_sync"} 0
etl_data_errors{job="hourly_sync"} 0
etl_validation_errors{job="hourly_sync"} 0
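The classification above is purely keyword based: each line in the per-job error log is expected to contain the date plus a recognizable keyword such as connection, timeout, data or validation. Illustrative entries (the format is assumed, not produced by any specific tool):
2026-04-03 02:15:32 ERROR connection refused while reading from source-db
2026-04-03 02:18:05 ERROR data type mismatch in column sale_amount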
6.2 Error Trend Analysis
$ cat > /usr/local/bin/error_trend_analysis.sh << 'EOF'
#!/bin/bash
# Analyse the error trend over the last 7 days for an ETL job
analyze_error_trend() {
    local job_name=$1
    local error_log="/var/log/etl/${job_name}_errors.log"

    if [ -f "$error_log" ]; then
        # Error count per day for the last 7 days
        TOTAL_ERRORS=0
        for i in {0..6}; do
            DATE=$(date -d "$i days ago" +%Y-%m-%d)
            COUNT=$(grep "$DATE" "$error_log" | wc -l)
            echo "etl_errors_trend{job=\"$job_name\",date=\"$DATE\"} $COUNT"
            TOTAL_ERRORS=$((TOTAL_ERRORS + COUNT))
        done

        # 7-day average error count
        AVG_ERRORS=$((TOTAL_ERRORS / 7))
        echo "etl_errors_average_7days{job=\"$job_name\"} $AVG_ERRORS"
    fi
}

# Analyse all jobs
for job in daily_extract daily_transform daily_load hourly_sync; do
    analyze_error_trend "$job"
done
EOF
$ chmod +x /usr/local/bin/error_trend_analysis.sh
etl_errors_trend{job="daily_extract",date="2026-04-03"} 5
etl_errors_trend{job="daily_extract",date="2026-04-02"} 3
etl_errors_trend{job="daily_extract",date="2026-04-01"} 4
etl_errors_trend{job="daily_extract",date="2026-03-31"} 2
etl_errors_trend{job="daily_extract",date="2026-03-30"} 6
etl_errors_trend{job="daily_extract",date="2026-03-29"} 3
etl_errors_trend{job="daily_extract",date="2026-03-28"} 4
etl_errors_average_7days{job="daily_extract"} 3
7. Resource Usage Monitoring
7.1 CPU and Memory Monitoring
$ cat > /usr/local/bin/resource_monitor.sh << 'EOF'
#!/bin/bash
# Host CPU usage (user CPU % from top; column layout can vary between top versions)
CPU_USAGE=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)

# Host memory usage (%)
MEM_USAGE=$(free | grep Mem | awk '{print ($3/$2) * 100.0}')

# Aggregate CPU and memory usage of ETL-related processes
ETL_CPU=$(ps aux | grep -E "etl|extract|transform|load" | grep -v grep | awk '{sum+=$3} END {print sum}')
ETL_MEM=$(ps aux | grep -E "etl|extract|transform|load" | grep -v grep | awk '{sum+=$4} END {print sum}')

# Disk I/O for sda from the second iostat sample (column positions depend on the sysstat version)
DISK_READ=$(iostat -x 1 2 | grep "sda" | tail -1 | awk '{print $3}')
DISK_WRITE=$(iostat -x 1 2 | grep "sda" | tail -1 | awk '{print $4}')

# Network I/O counters for eth0 (adjust the interface name as needed)
NETWORK_IN=$(cat /proc/net/dev | grep eth0 | awk '{print $2}')
NETWORK_OUT=$(cat /proc/net/dev | grep eth0 | awk '{print $10}')

# Emit metrics
echo "etl_cpu_usage_percent ${CPU_USAGE:-0}"
echo "etl_memory_usage_percent ${MEM_USAGE:-0}"
echo "etl_process_cpu_usage_percent ${ETL_CPU:-0}"
echo "etl_process_memory_usage_percent ${ETL_MEM:-0}"
echo "etl_disk_read_kbps ${DISK_READ:-0}"
echo "etl_disk_write_kbps ${DISK_WRITE:-0}"
echo "etl_network_in_bytes ${NETWORK_IN:-0}"
echo "etl_network_out_bytes ${NETWORK_OUT:-0}"
EOF
$ chmod +x /usr/local/bin/resource_monitor.sh
etl_cpu_usage_percent 45.2
etl_memory_usage_percent 72.5
etl_process_cpu_usage_percent 125.5
etl_process_memory_usage_percent 35.2
etl_disk_read_kbps 1256.78
etl_disk_write_kbps 987.45
etl_network_in_bytes 1234567890
etl_network_out_bytes 987654321
7.2 Disk Space Monitoring
$ cat > /usr/local/bin/disk_monitor.sh << 'EOF'
#!/bin/bash
# Report disk usage for a mount point
monitor_disk_usage() {
    local mount_point=$1
    local usage=$(df -h "$mount_point" | tail -1 | awk '{print $5}' | tr -d '%')
    local available=$(df -BG "$mount_point" | tail -1 | awk '{print $4}' | tr -d 'G')

    echo "etl_disk_usage_percent{mount=\"$mount_point\"} ${usage:-0}"
    echo "etl_disk_available_gb{mount=\"$mount_point\"} ${available:-0}"
}

# ETL-related directories
monitor_disk_usage "/data/etl"
monitor_disk_usage "/data/staging"
monitor_disk_usage "/data/warehouse"
monitor_disk_usage "/var/log/etl"

# Temporary files left behind by ETL jobs
TEMP_FILE_COUNT=$(find /tmp -name "etl_*" -type f | wc -l)
TEMP_FILE_SIZE=$(du -cm /tmp/etl_* 2>/dev/null | awk 'END {print $1}')
echo "etl_temp_file_count $TEMP_FILE_COUNT"
echo "etl_temp_file_size_mb ${TEMP_FILE_SIZE:-0}"
EOF
$ chmod +x /usr/local/bin/disk_monitor.sh
etl_disk_usage_percent{mount="/data/etl"} 65
etl_disk_available_gb{mount="/data/etl"} 350
etl_disk_usage_percent{mount="/data/staging"} 45
etl_disk_available_gb{mount="/data/staging"} 550
etl_disk_usage_percent{mount="/data/warehouse"} 78
etl_disk_available_gb{mount="/data/warehouse"} 220
etl_disk_usage_percent{mount="/var/log/etl"} 32
etl_disk_available_gb{mount="/var/log/etl"} 680
etl_temp_file_count 15
etl_temp_file_size_mb 1250
Note: resource usage monitoring is an important part of ETL monitoring. CPU, memory, disk, and network usage should be tracked in real time to make sure ETL jobs have enough resources to run.
8. Alert Management
8.1 Alert Rule Configuration
$ cat > /data/prometheus/etl_alert_rules.yml << 'EOF'
groups:
  - name: etl_alerts
    rules:
      # Job failure alert
      - alert: ETLJobFailed
        expr: etl_job_status == 0 and etl_job_time_since_last_run_seconds > 3600
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "ETL job failed"
          description: "ETL job {{ $labels.job }} has not run for more than 1 hour"

      # Long-running job alert
      - alert: ETLJobSlow
        expr: etl_job_duration_seconds > 7200
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "ETL job running too long"
          description: "ETL job {{ $labels.job }} has been running for more than 2 hours"

      # Data quality alert
      - alert: DataQualityIssue
        expr: etl_data_null_count > 100 or etl_data_duplicate_count > 50
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Data quality issue"
          description: "Table {{ $labels.table }} has data quality problems"

      # Data consistency alert
      - alert: DataInconsistency
        expr: etl_data_consistency_percent < 95
        for: 15m
        labels:
          severity: critical
        annotations:
          summary: "Data inconsistency"
          description: "Source and target table consistency is below 95%"

      # Error rate alert
      - alert: HighErrorRate
        expr: etl_errors_total > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High ETL error rate"
          description: "ETL job {{ $labels.job }} has too many errors"

      # Resource usage alert
      - alert: HighResourceUsage
        expr: etl_cpu_usage_percent > 80 or etl_memory_usage_percent > 85
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High resource usage"
          description: "CPU or memory usage is too high"

      # Disk space alert
      - alert: DiskSpaceLow
        expr: etl_disk_usage_percent > 85
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Low disk space"
          description: "Mount point {{ $labels.mount }} usage is above 85%"
EOF
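The rule file can be validated before Prometheus loads it; a quick check, assuming promtool is installed on the host (or run via the container). Note that the container started in section 2.1 only mounts prometheus.yml, so the rule file must also be made visible at /etc/prometheus/etl_alert_rules.yml, for example by mounting the whole /data/prometheus directory.
$ promtool check rules /data/prometheus/etl_alert_rules.yml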
8.2 Alert Notification Configuration
$ cat > /data/alertmanager/alertmanager.yml << 'EOF'
global:
  resolve_timeout: 5m
  smtp_smarthost: 'smtp.fgedu.net.cn:587'
  smtp_from: 'etl-alert@fgedu.net.cn'
  smtp_auth_username: 'etl-alert@fgedu.net.cn'
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'etl-team'
  routes:
    - match:
        severity: critical
      receiver: 'etl-critical'
    - match:
        severity: warning
      receiver: 'etl-warning'

receivers:
  - name: 'etl-team'
    email_configs:
      - to: 'etl-team@fgedu.net.cn'
        send_resolved: true
  - name: 'etl-critical'
    email_configs:
      - to: 'etl-team@fgedu.net.cn,manager@fgedu.net.cn'
        send_resolved: true
    webhook_configs:
      - url: 'http://webhook-server:5000/alert'
        send_resolved: true
  - name: 'etl-warning'
    email_configs:
      - to: 'etl-team@fgedu.net.cn'
        send_resolved: true

inhibit_rules:
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname', 'job']
EOF
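Alertmanager ships with an amtool binary that can validate this file. A quick check, assuming amtool from the prom/alertmanager image (the file is visible at /etc/alertmanager/alertmanager.yml thanks to the volume mount from section 2.1):
$ docker exec alertmanager amtool check-config /etc/alertmanager/alertmanager.yml
$ docker restart alertmanager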
9. Log Management
9.1 Log Collection
$ cat > /etc/rsyslog.d/etl.conf << 'EOF'
# ETL log collection (legacy imfile syntax)
$ModLoad imfile
$InputFilePollInterval 10

# Extract job log
$InputFileName /var/log/etl/daily_extract.log
$InputFileTag etl_extract:
$InputFileStateFile stat-etl-extract
$InputFileSeverity info
$InputFileFacility local3
$InputRunFileMonitor

# Transform job log
$InputFileName /var/log/etl/daily_transform.log
$InputFileTag etl_transform:
$InputFileStateFile stat-etl-transform
$InputFileSeverity info
$InputFileFacility local3
$InputRunFileMonitor

# Load job log
$InputFileName /var/log/etl/daily_load.log
$InputFileTag etl_load:
$InputFileStateFile stat-etl-load
$InputFileSeverity info
$InputFileFacility local3
$InputRunFileMonitor

# Forward to the central log server
local3.* @@logserver.fgedu.net.cn:514
EOF

# Restart rsyslog to pick up the new configuration
$ systemctl restart rsyslog
If the restart fails, check "systemctl status rsyslog.service" and "journalctl -xe" for the exact error; typos in the directives above and missing log files are the usual causes.
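On newer rsyslog releases (v8 and later) the legacy $Input directives are deprecated, and the same collection can be written with the input() syntax. A sketch for one of the log files (repeat the input() block per file; parameter names follow the imfile module documentation):
module(load="imfile" PollingInterval="10")

input(type="imfile"
      File="/var/log/etl/daily_extract.log"
      Tag="etl_extract:"
      Severity="info"
      Facility="local3")

local3.* @@logserver.fgedu.net.cn:514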
9.2 Log Analysis
$ cat > /usr/local/bin/log_analysis.sh << 'EOF'
#!/bin/bash
# Summarise an ETL log file by severity
analyze_etl_logs() {
    local log_file=$1
    local job_name=$2

    if [ -f "$log_file" ]; then
        # Total number of log lines
        TOTAL_LINES=$(wc -l < "$log_file")

        # Counts by severity keyword
        ERROR_COUNT=$(grep -i "error" "$log_file" | wc -l)
        WARNING_COUNT=$(grep -i "warning" "$log_file" | wc -l)
        INFO_COUNT=$(grep -i "info" "$log_file" | wc -l)

        # Emit metrics
        echo "etl_log_total_lines{job=\"$job_name\"} $TOTAL_LINES"
        echo "etl_log_error_count{job=\"$job_name\"} $ERROR_COUNT"
        echo "etl_log_warning_count{job=\"$job_name\"} $WARNING_COUNT"
        echo "etl_log_info_count{job=\"$job_name\"} $INFO_COUNT"
    fi
}

# Analyse all ETL logs
analyze_etl_logs "/var/log/etl/daily_extract.log" "daily_extract"
analyze_etl_logs "/var/log/etl/daily_transform.log" "daily_transform"
analyze_etl_logs "/var/log/etl/daily_load.log" "daily_load"
analyze_etl_logs "/var/log/etl/hourly_sync.log" "hourly_sync"
EOF
$ chmod +x /usr/local/bin/log_analysis.sh
etl_log_total_lines{job="daily_extract"} 1250
etl_log_error_count{job="daily_extract"} 5
etl_log_warning_count{job="daily_extract"} 12
etl_log_info_count{job="daily_extract"} 1233
etl_log_total_lines{job="daily_transform"} 980
etl_log_error_count{job="daily_transform"} 3
etl_log_warning_count{job="daily_transform"} 8
etl_log_info_count{job="daily_transform"} 969
etl_log_total_lines{job="daily_load"} 1500
etl_log_error_count{job="daily_load"} 0
etl_log_warning_count{job="daily_load"} 5
etl_log_info_count{job="daily_load"} 1495
etl_log_total_lines{job="hourly_sync"} 320
etl_log_error_count{job="hourly_sync"} 1
etl_log_warning_count{job="hourly_sync"} 2
etl_log_info_count{job="hourly_sync"} 317
10. Best Practices
10.1 Monitoring System Design Principles
- Deploy the monitoring system independently of the ETL system to avoid a single point of failure
- Use a multi-layer monitoring architecture covering jobs, data, performance, and resources
- Collect and store monitoring data in real time, and keep history for trend analysis
- Tier alert rules according to business importance
- Regularly verify that the monitoring system itself is working
10.2 Monitoring Metric Selection
1. Job metrics
- Job status
- Job progress
- Job duration
- Job success rate
2. Data quality metrics
- Data integrity
- Data consistency
- Data accuracy
- Data timeliness
3. Performance metrics
- Processing rate
- Response time
- Throughput
- Resource utilization
4. Error metrics
- Error count
- Error type
- Error trend
- Error recovery time
10.3 Alert Management Best Practices
- Manage alerts by severity level, with different response-time expectations per level
- Avoid alert storms by setting reasonable thresholds and "for" durations
- Deliver notifications over multiple channels, such as email, SMS, and phone
- Include enough context in each alert to locate the problem quickly
- Review and tune alert rules regularly to reduce false positives and missed alerts
10.4 Monitoring Data Management
$ cat > /usr/local/bin/monitor_data_management.sh << 'EOF'
#!/bin/bash
# Retention policy for monitoring data
RETENTION_DAYS=90

# Remove expired monitoring data
clean_old_data() {
    echo "Cleaning monitoring data older than $RETENTION_DAYS days..."

    # Prometheus normally manages its own retention (--storage.tsdb.retention.time);
    # deleting TSDB files by hand should only be done with Prometheus stopped.
    find /data/prometheus/data -type f -mtime +$RETENTION_DAYS -delete

    # Clean old Grafana data
    find /data/grafana/data -type f -mtime +$RETENTION_DAYS -delete

    # Clean old ETL log files
    find /var/log/etl -type f -name "*.log" -mtime +$RETENTION_DAYS -delete

    echo "Monitoring data cleanup finished"
}

# Back up monitoring configuration and dashboards
backup_monitor_data() {
    BACKUP_DIR="/backup/etl-monitor-$(date +%Y%m%d)"
    mkdir -p "$BACKUP_DIR"

    echo "Backing up monitoring data to $BACKUP_DIR..."

    # Prometheus configuration
    cp -r /data/prometheus/*.yml "$BACKUP_DIR/"

    # Grafana dashboards
    cp -r /data/grafana/dashboards "$BACKUP_DIR/"

    # Alert rules
    cp -r /data/prometheus/etl_alert_rules.yml "$BACKUP_DIR/"

    echo "Monitoring data backup finished"
}

# Run maintenance tasks
clean_old_data
backup_monitor_data
EOF
$ chmod +x /usr/local/bin/monitor_data_management.sh
Cleaning monitoring data older than 90 days...
Monitoring data cleanup finished
Backing up monitoring data to /backup/etl-monitor-20260403...
Monitoring data backup finished
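The housekeeping script can be scheduled with cron; an illustrative weekly entry (the schedule and log path are assumptions):
# Run monitoring-data housekeeping every Sunday at 02:00
0 2 * * 0 /usr/local/bin/monitor_data_management.sh >> /var/log/etl/monitor_data_management.log 2>&1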
10.5 Monitoring System Maintenance
$ cat > /usr/local/bin/monitor_health_check.sh << 'EOF'
#!/bin/bash
# Check Prometheus health
check_prometheus() {
    if curl -f -s http://fgedudb:9090/-/healthy > /dev/null; then
        echo "prometheus_status 1"
    else
        echo "prometheus_status 0"
    fi
}

# Check Grafana health
check_grafana() {
    if curl -f -s http://fgedudb:3000/api/health > /dev/null; then
        echo "grafana_status 1"
    else
        echo "grafana_status 0"
    fi
}

# Check Alertmanager health
check_alertmanager() {
    if curl -f -s http://fgedudb:9093/-/healthy > /dev/null; then
        echo "alertmanager_status 1"
    else
        echo "alertmanager_status 0"
    fi
}

# Check that metrics are actually being scraped
check_data_collection() {
    # "up" is 1 when the first target was scraped successfully in the last cycle
    LAST_SCRAPE=$(curl -s 'http://fgedudb:9090/api/v1/query?query=up' | jq -r '.data.result[0].value[1]')
    if [ "$LAST_SCRAPE" = "1" ]; then
        echo "data_collection_status 1"
    else
        echo "data_collection_status 0"
    fi
}

# Run all health checks
check_prometheus
check_grafana
check_alertmanager
check_data_collection
EOF
$ chmod +x /usr/local/bin/monitor_health_check.sh
prometheus_status 1
grafana_status 1
alertmanager_status 1
data_collection_status 1
10.6 ETL Monitoring Dashboard
$ cat > /data/grafana/dashboards/etl-monitoring.json << 'EOF'
{
  "dashboard": {
    "title": "ETL Monitoring Dashboard",
    "panels": [
      {
        "title": "Job Status",
        "type": "stat",
        "targets": [
          { "expr": "etl_job_status", "legendFormat": "{{job}}" }
        ]
      },
      {
        "title": "Job Progress",
        "type": "gauge",
        "targets": [
          { "expr": "etl_job_progress_percent", "legendFormat": "{{job}}" }
        ]
      },
      {
        "title": "Data Quality",
        "type": "graph",
        "targets": [
          { "expr": "etl_data_record_count", "legendFormat": "{{table}}" }
        ]
      },
      {
        "title": "Error Statistics",
        "type": "graph",
        "targets": [
          { "expr": "etl_errors_total", "legendFormat": "{{job}}" }
        ]
      },
      {
        "title": "Resource Usage",
        "type": "graph",
        "targets": [
          { "expr": "etl_cpu_usage_percent", "legendFormat": "CPU" },
          { "expr": "etl_memory_usage_percent", "legendFormat": "Memory" }
        ]
      }
    ]
  }
}
EOF
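For Grafana to load this file automatically it needs a dashboard provisioning provider pointing at the dashboards directory, and the provisioning directory must be mounted into the container. A minimal sketch, assuming the JSON files live under /var/lib/grafana/dashboards inside the container (which is where /data/grafana/dashboards ends up with the volume mount from section 2.1):
$ cat > /etc/grafana/provisioning/dashboards/etl.yml << 'EOF'
apiVersion: 1
providers:
  - name: 'etl-dashboards'
    type: file
    options:
      path: /var/lib/grafana/dashboards
EOF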
- Run ETL drills regularly to verify that the monitoring system works as expected
- The monitoring system itself should be highly available, with no single point of failure
- Back up monitoring data regularly to keep it safe
- Keep optimizing and adjusting monitoring metrics as business needs change
- Review alert rules periodically to keep alerts accurate and effective
