1. ETL Overview
ETL (Extract-Transform-Load) is the core process of data warehouse construction. It consists of three stages: data extraction, transformation, and loading.
ETL workflow:
┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   Extract   │  ->  │  Transform  │  ->  │    Load     │
└─────────────┘      └─────────────┘      └─────────────┘
       │                    │                    │
       v                    v                    v
┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│ Sources:    │      │ Transforms: │      │ Targets:    │
│  databases, │      │  cleansing, │      │  warehouse, │
│  files,     │      │  mapping,   │      │  data marts,│
│  APIs       │      │  aggregation│      │  reporting  │
└─────────────┘      └─────────────┘      └─────────────┘
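As a minimal sketch of how the three stages fit together in code (the file and column names here are assumptions for illustration, not the pipeline built later in this article):
#!/usr/bin/env python3
# Minimal ETL sketch: extract from a CSV, transform, load to another CSV.
import pandas as pd

def extract(path: str) -> pd.DataFrame:
    return pd.read_csv(path)                      # pull raw rows

def transform(df: pd.DataFrame) -> pd.DataFrame:
    df = df.dropna()                              # cleanse
    df["amount"] = pd.to_numeric(df["amount"])    # standardize types
    return df

def load(df: pd.DataFrame, path: str) -> None:
    df.to_csv(path, index=False)                  # write to the target

if __name__ == "__main__":
    load(transform(extract("orders_raw.csv")), "orders_dwh.csv")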
# Check ETL job status
# curl -s http://etl.fgedu.net.cn/api/jobs | jq
{
  "jobs": [
    {
      "id": "ETL_001",
      "name": "fgedu_fgsales_daily",
      "status": "SUCCESS",
      "start_time": "2026-04-03T02:00:00",
      "end_time": "2026-04-03T02:30:00",
      "records_processed": 1000000,
      "records_failed": 0
    },
    {
      "id": "ETL_002",
      "name": "fgedu_customer_sync",
      "status": "RUNNING",
      "start_time": "2026-04-03T03:00:00",
      "records_processed": 500000,
      "records_failed": 0
    }
  ]
}
# View the ETL configuration
# cat /opt/etl/config/etl_config.yaml
etl:
  name: fgedu_etl_pipeline
  version: "1.0"
  schedule: "0 2 * * *"
  source:
    type: mysql
    host: 192.168.1.10
    port: 3306
    database: fgedu_source
    username: etl_user
    password: ${MYSQL_PASSWORD}
  target:
    type: postgresql
    host: 192.168.1.20
    port: 5432
    database: fgedu_dwh
    username: etl_user
    password: ${PG_PASSWORD}
  transform:
    rules: /opt/etl/rules/transform_rules.json
    quality_check: true
    error_threshold: 0.01
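The configuration references /opt/etl/rules/transform_rules.json, but the rule file itself is never shown. One purely illustrative shape such a file could take (an assumed schema, not a standard format) is:
{
  "rename":    {"order_id": "ORDER_ID", "customer_id": "CUSTOMER_ID"},
  "drop_null": ["order_id", "customer_id"],
  "type_cast": {"quantity": "int", "amount": "float", "order_date": "datetime"},
  "derived":   {"UNIT_PRICE": "AMOUNT / QTY"},
  "value_map": {"status": {"NEW": 1, "PROCESSING": 2, "COMPLETED": 3, "CANCELLED": 4}}
}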
2. Data Extraction
Data extraction is the process of retrieving data from source systems, typically as either a full or an incremental extract.
# cat > /opt/etl/scripts/full_extract.py << 'EOF'
#!/usr/bin/env python3
import pymysql
import pandas as pd
from datetime import datetime

def full_extract():
    """Full extraction of the orders table."""
    conn = pymysql.connect(
        host='192.168.1.10',
        port=3306,
        user='etl_user',
        password='Fgedu@ETL123',
        database='fgedu_source'
    )
    query = """
        SELECT order_id, customer_id, product_id, order_date,
               quantity, amount, status
        FROM orders
    """
    df = pd.read_sql(query, conn)
    output_file = f'/data/etl/extract/orders_full_{datetime.now().strftime("%Y%m%d")}.csv'
    df.to_csv(output_file, index=False)
    print(f"Full extraction complete: {len(df)} records")
    print(f"Output file: {output_file}")
    conn.close()
    return len(df)

if __name__ == '__main__':
    records = full_extract()
    print(f"Records extracted: {records}")
EOF

# python3 /opt/etl/scripts/full_extract.py
Full extraction complete: 1000000 records
Output file: /data/etl/extract/orders_full_20260403.csv
Records extracted: 1000000

# Incremental extraction
# cat > /opt/etl/scripts/incremental_extract.py << 'EOF'
#!/usr/bin/env python3
import pymysql
import pandas as pd
from datetime import datetime, timedelta

def incremental_extract():
    """Incremental extraction: rows updated since the last run."""
    conn = pymysql.connect(
        host='192.168.1.10',
        port=3306,
        user='etl_user',
        password='Fgedu@ETL123',
        database='fgedu_source'
    )
    # Watermark approximated here as "now minus one day"
    last_extract_time = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S')
    query = f"""
        SELECT order_id, customer_id, product_id, order_date,
               quantity, amount, status, update_time
        FROM orders
        WHERE update_time > '{last_extract_time}'
    """
    df = pd.read_sql(query, conn)
    output_file = f'/data/etl/extract/orders_incr_{datetime.now().strftime("%Y%m%d")}.csv'
    df.to_csv(output_file, index=False)
    print(f"Incremental extraction complete: {len(df)} records")
    print(f"Last extract time: {last_extract_time}")
    conn.close()
    return len(df)

if __name__ == '__main__':
    records = incremental_extract()
    print(f"Records extracted: {records}")
EOF

# python3 /opt/etl/scripts/incremental_extract.py
Incremental extraction complete: 50000 records
Last extract time: 2026-04-02 10:00:00
Records extracted: 50000
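The script above approximates the watermark as "now minus one day". A more robust pattern is to persist the last successful extract time and advance it only after the run succeeds. A sketch under an assumed etl_watermark table (not part of this article's schema):
#!/usr/bin/env python3
# Watermark-based incremental extraction (sketch; the etl_watermark table
# and its columns are assumptions, not shown elsewhere in this article).
import pymysql
import pandas as pd
from datetime import datetime

def get_watermark(conn, table):
    # Read the last successful extract time for this table
    with conn.cursor() as cur:
        cur.execute("SELECT last_extract_time FROM etl_watermark WHERE table_name = %s", (table,))
        row = cur.fetchone()
    return row[0] if row else '1970-01-01 00:00:00'

def set_watermark(conn, table, ts):
    # Advance the watermark only after the extract file has been written
    with conn.cursor() as cur:
        cur.execute("UPDATE etl_watermark SET last_extract_time = %s WHERE table_name = %s",
                    (ts, table))
    conn.commit()

def incremental_extract_with_watermark():
    conn = pymysql.connect(host='192.168.1.10', port=3306, user='etl_user',
                           password='Fgedu@ETL123', database='fgedu_source')
    since = get_watermark(conn, 'orders')
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    df = pd.read_sql("SELECT * FROM orders WHERE update_time > %s AND update_time <= %s",
                     conn, params=(since, now))
    df.to_csv(f'/data/etl/extract/orders_incr_{datetime.now():%Y%m%d}.csv', index=False)
    set_watermark(conn, 'orders', now)
    conn.close()
    return len(df)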
# cat > /opt/etl/scripts/multi_source_extract.py << 'EOF'
#!/usr/bin/env python3
import pymysql
import psycopg2
import pandas as pd
import requests
from datetime import datetime

def extract_from_mysql():
    """Extract from MySQL."""
    conn = pymysql.connect(
        host='192.168.1.10', port=3306, user='etl_user',
        password='Fgedu@ETL123', database='fgedu_source'
    )
    df = pd.read_sql("SELECT * FROM customers", conn)
    conn.close()
    return df

def extract_from_postgresql():
    """Extract from PostgreSQL."""
    conn = psycopg2.connect(
        host='192.168.1.11', port=5432, user='etl_user',
        password='Fgedu@ETL123', database='fgedu_crm'
    )
    df = pd.read_sql("SELECT * FROM contacts", conn)
    conn.close()
    return df

def extract_from_api():
    """Extract from a REST API."""
    url = "http://api.fgedu.net.cn/v1/products"
    headers = {"Authorization": "Bearer token123"}
    response = requests.get(url, headers=headers)
    data = response.json()
    return pd.DataFrame(data['products'])

def extract_from_file():
    """Extract from a flat file."""
    file_path = "/data/source/suppliers.csv"
    return pd.read_csv(file_path)

def main():
    """Multi-source extraction."""
    print("Starting multi-source extraction...")
    print("1. Extracting MySQL data...")
    mysql_df = extract_from_mysql()
    print(f"   MySQL data: {len(mysql_df)} records")
    print("2. Extracting PostgreSQL data...")
    pg_df = extract_from_postgresql()
    print(f"   PostgreSQL data: {len(pg_df)} records")
    print("3. Extracting API data...")
    api_df = extract_from_api()
    print(f"   API data: {len(api_df)} records")
    print("4. Extracting file data...")
    file_df = extract_from_file()
    print(f"   File data: {len(file_df)} records")
    print("Multi-source extraction complete")

if __name__ == '__main__':
    main()
EOF

# python3 /opt/etl/scripts/multi_source_extract.py
Starting multi-source extraction...
1. Extracting MySQL data...
   MySQL data: 100000 records
2. Extracting PostgreSQL data...
   PostgreSQL data: 50000 records
3. Extracting API data...
   API data: 10000 records
4. Extracting file data...
   File data: 5000 records
Multi-source extraction complete
3. Data Transformation
Data transformation covers cleansing, mapping, and aggregation.
# cat > /opt/etl/scripts/data_cleanse.py << 'EOF'
#!/usr/bin/env python3
import pandas as pd
import numpy as np
from datetime import datetime

def cleanse_data(input_file, output_file):
    """Data cleansing."""
    df = pd.read_csv(input_file)
    original_count = len(df)
    print(f"Raw data: {original_count} records")
    # 1. Drop duplicate records
    before = len(df)
    df = df.drop_duplicates()
    print(f"Duplicates removed: {before - len(df)} records")
    # 2. Drop rows missing key fields
    before = len(df)
    df = df.dropna(subset=['order_id', 'customer_id'])
    print(f"Nulls removed: {before - len(df)} records")
    # 3. Data type conversion
    df['order_date'] = pd.to_datetime(df['order_date'])
    df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce')
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
    # 4. Data standardization
    df['status'] = df['status'].str.upper().str.strip()
    # 5. Outlier handling
    df = df[df['quantity'] > 0]
    df = df[df['amount'] > 0]
    # 6. Fill default values
    df['status'] = df['status'].fillna('UNKNOWN')
    # 7. Tag the cleansed rows
    df['cleanse_time'] = datetime.now()
    df['cleanse_flag'] = 'Y'
    df.to_csv(output_file, index=False)
    print(f"Cleansed data: {len(df)} records")
    print(f"Output file: {output_file}")
    return len(df)

if __name__ == '__main__':
    input_file = '/data/etl/extract/orders_full_20260403.csv'
    output_file = '/data/etl/transform/orders_cleanse_20260403.csv'
    cleanse_data(input_file, output_file)
EOF
# python3 /opt/etl/scripts/data_cleanse.py
Raw data: 1000000 records
Duplicates removed: 100 records
Nulls removed: 50 records
Cleansed data: 999850 records
Output file: /data/etl/transform/orders_cleanse_20260403.csv
# Data transformation
# cat > /opt/etl/scripts/data_transform.py << 'EOF'
#!/usr/bin/env python3
import pandas as pd
from datetime import datetime

def transform_data(input_file, output_file):
    """Data transformation."""
    df = pd.read_csv(input_file)
    print(f"Input data: {len(df)} records")
    # 1. Field mapping
    df = df.rename(columns={
        'order_id': 'ORDER_ID',
        'customer_id': 'CUSTOMER_ID',
        'product_id': 'PRODUCT_ID',
        'order_date': 'ORDER_DATE',
        'quantity': 'QTY',
        'amount': 'AMOUNT',
        'status': 'STATUS'
    })
    # 2. Derived fields
    df['ORDER_DATE'] = pd.to_datetime(df['ORDER_DATE'])
    df['YEAR'] = df['ORDER_DATE'].dt.year
    df['MONTH'] = df['ORDER_DATE'].dt.month
    df['DAY'] = df['ORDER_DATE'].dt.day
    df['WEEKDAY'] = df['ORDER_DATE'].dt.weekday
    df['UNIT_PRICE'] = df['AMOUNT'] / df['QTY']
    # 3. Bucketing by amount
    df['AMOUNT_LEVEL'] = pd.cut(df['AMOUNT'],
                                bins=[0, 100, 500, 1000, float('inf')],
                                labels=['SMALL', 'MEDIUM', 'LARGE', 'EXTRA_LARGE'])
    # 4. Status encoding
    status_map = {'NEW': 1, 'PROCESSING': 2, 'COMPLETED': 3, 'CANCELLED': 4}
    df['STATUS_CODE'] = df['STATUS'].map(status_map)
    # 5. Transformation timestamp
    df['TRANSFORM_TIME'] = datetime.now()
    df.to_csv(output_file, index=False)
    print(f"Transformed data: {len(df)} records")
    print(f"Output file: {output_file}")
    return len(df)

if __name__ == '__main__':
    input_file = '/data/etl/transform/orders_cleanse_20260403.csv'
    output_file = '/data/etl/transform/orders_transform_20260403.csv'
    transform_data(input_file, output_file)
EOF
# python3 /opt/etl/scripts/data_transform.py
Input data: 999850 records
Transformed data: 999850 records
Output file: /data/etl/transform/orders_transform_20260403.csv
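This section lists aggregation as part of transformation, but the scripts above only cleanse and map. A small sketch of an aggregation step that rolls orders up to one row per customer per day (the output path is an assumption for illustration):
#!/usr/bin/env python3
# Aggregation sketch: daily order totals per customer from the transformed file.
import pandas as pd

def aggregate_daily(input_file, output_file):
    df = pd.read_csv(input_file, parse_dates=['ORDER_DATE'])
    agg = (df.groupby(['CUSTOMER_ID', df['ORDER_DATE'].dt.date])
             .agg(ORDER_COUNT=('ORDER_ID', 'count'),
                  TOTAL_QTY=('QTY', 'sum'),
                  TOTAL_AMOUNT=('AMOUNT', 'sum'))
             .reset_index()
             .rename(columns={'ORDER_DATE': 'ORDER_DAY'}))
    agg.to_csv(output_file, index=False)
    print(f"Aggregated rows: {len(agg)}")

if __name__ == '__main__':
    aggregate_daily('/data/etl/transform/orders_transform_20260403.csv',
                    '/data/etl/transform/orders_daily_agg_20260403.csv')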
4. Data Loading
Data loading writes the transformed data into the target system.
# cat > /opt/etl/scripts/full_load.py << 'EOF'
#!/usr/bin/env python3
import psycopg2
import pandas as pd
from datetime import datetime

def full_load(input_file):
    """Full load."""
    df = pd.read_csv(input_file)
    conn = psycopg2.connect(
        host='192.168.1.20', port=5432, user='etl_user',
        password='Fgedu@ETL123', database='fgedu_dwh'
    )
    cursor = conn.cursor()
    # Truncate the target table
    cursor.execute("TRUNCATE TABLE fact_orders")
    conn.commit()
    # Batch insert
    records = []
    for _, row in df.iterrows():
        record = (
            row['ORDER_ID'], row['CUSTOMER_ID'], row['PRODUCT_ID'],
            row['ORDER_DATE'], row['QTY'], row['AMOUNT'], row['STATUS'],
            row['YEAR'], row['MONTH'], row['DAY'], datetime.now()
        )
        records.append(record)
    insert_sql = """
        INSERT INTO fact_orders (
            order_id, customer_id, product_id, order_date,
            qty, amount, status, year, month, day, load_time
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    cursor.executemany(insert_sql, records)
    conn.commit()
    print(f"Full load complete: {len(records)} records")
    cursor.close()
    conn.close()
    return len(records)

if __name__ == '__main__':
    input_file = '/data/etl/transform/orders_transform_20260403.csv'
    full_load(input_file)
EOF

# python3 /opt/etl/scripts/full_load.py
Full load complete: 999850 records

# Incremental load
# cat > /opt/etl/scripts/incremental_load.py << 'EOF'
#!/usr/bin/env python3
import psycopg2
import pandas as pd
from datetime import datetime

def incremental_load(input_file):
    """Incremental load: insert new rows, update existing ones."""
    df = pd.read_csv(input_file)
    conn = psycopg2.connect(
        host='192.168.1.20', port=5432, user='etl_user',
        password='Fgedu@ETL123', database='fgedu_dwh'
    )
    cursor = conn.cursor()
    insert_count = 0
    update_count = 0
    for _, row in df.iterrows():
        # Check whether the record already exists
        cursor.execute(
            "SELECT 1 FROM fact_orders WHERE order_id = %s",
            (row['ORDER_ID'],)
        )
        if cursor.fetchone():
            # Update the existing record
            update_sql = """
                UPDATE fact_orders
                SET customer_id = %s, product_id = %s, order_date = %s,
                    qty = %s, amount = %s, status = %s, update_time = %s
                WHERE order_id = %s
            """
            cursor.execute(update_sql, (
                row['CUSTOMER_ID'], row['PRODUCT_ID'], row['ORDER_DATE'],
                row['QTY'], row['AMOUNT'], row['STATUS'],
                datetime.now(), row['ORDER_ID']
            ))
            update_count += 1
        else:
            # Insert a new record
            insert_sql = """
                INSERT INTO fact_orders (
                    order_id, customer_id, product_id, order_date,
                    qty, amount, status, load_time
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_sql, (
                row['ORDER_ID'], row['CUSTOMER_ID'], row['PRODUCT_ID'],
                row['ORDER_DATE'], row['QTY'], row['AMOUNT'],
                row['STATUS'], datetime.now()
            ))
            insert_count += 1
    conn.commit()
    print("Incremental load complete:")
    print(f"  Inserted: {insert_count} records")
    print(f"  Updated: {update_count} records")
    cursor.close()
    conn.close()
    return insert_count, update_count

if __name__ == '__main__':
    input_file = '/data/etl/transform/orders_transform_20260403.csv'
    incremental_load(input_file)
EOF

# python3 /opt/etl/scripts/incremental_load.py
Incremental load complete:
  Inserted: 45000 records
  Updated: 5000 records
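The incremental load above issues one SELECT plus one INSERT or UPDATE per row, which gets slow at scale. The same upsert can be expressed as a single batched statement with PostgreSQL's INSERT ... ON CONFLICT. A sketch, assuming fact_orders has a unique constraint on order_id (which this article does not show explicitly):
#!/usr/bin/env python3
# Upsert sketch using INSERT ... ON CONFLICT and psycopg2's execute_values.
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd

def upsert_load(input_file):
    df = pd.read_csv(input_file)
    rows = list(df[['ORDER_ID', 'CUSTOMER_ID', 'PRODUCT_ID', 'ORDER_DATE',
                    'QTY', 'AMOUNT', 'STATUS']].itertuples(index=False, name=None))
    conn = psycopg2.connect(host='192.168.1.20', port=5432, user='etl_user',
                            password='Fgedu@ETL123', database='fgedu_dwh')
    with conn, conn.cursor() as cur:
        execute_values(cur, """
            INSERT INTO fact_orders
                (order_id, customer_id, product_id, order_date, qty, amount, status)
            VALUES %s
            ON CONFLICT (order_id) DO UPDATE SET
                customer_id = EXCLUDED.customer_id,
                product_id  = EXCLUDED.product_id,
                order_date  = EXCLUDED.order_date,
                qty         = EXCLUDED.qty,
                amount      = EXCLUDED.amount,
                status      = EXCLUDED.status
        """, rows, page_size=10000)
    conn.close()
    print(f"Upserted {len(rows)} records")

if __name__ == '__main__':
    upsert_load('/data/etl/transform/orders_transform_20260403.csv')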
5. Data Quality
Data quality checks verify that the data is accurate and complete.
# cat > /opt/etl/scripts/data_quality.py << 'EOF'
#!/usr/bin/env python3
import pandas as pd
import numpy as np
from datetime import datetime

def quality_check(input_file):
    """Data quality checks."""
    df = pd.read_csv(input_file)
    report = {
        'check_time': datetime.now(),
        'total_records': len(df),
        'checks': []
    }
    # 1. Completeness check
    null_count = df.isnull().sum().sum()
    null_rate = null_count / (len(df) * len(df.columns)) * 100
    report['checks'].append({
        'name': 'Completeness',
        'result': 'PASS' if null_rate < 1 else 'FAIL',
        'detail': f'Null rate: {null_rate:.2f}%'
    })
    # 2. Uniqueness check
    dup_count = df.duplicated().sum()
    dup_rate = dup_count / len(df) * 100
    report['checks'].append({
        'name': 'Uniqueness',
        'result': 'PASS' if dup_rate < 0.1 else 'FAIL',
        'detail': f'Duplicate rate: {dup_rate:.2f}%'
    })
    # 3. Accuracy check
    if 'AMOUNT' in df.columns:
        negative_amount = (df['AMOUNT'] < 0).sum()
        report['checks'].append({
            'name': 'Amount accuracy',
            'result': 'PASS' if negative_amount == 0 else 'FAIL',
            'detail': f'Negative-amount records: {negative_amount}'
        })
    # 4. Consistency check
    if 'QTY' in df.columns and 'AMOUNT' in df.columns:
        df['CALC_PRICE'] = df['AMOUNT'] / df['QTY']
        price_variance = df['CALC_PRICE'].var()
        report['checks'].append({
            'name': 'Price consistency',
            'result': 'PASS' if price_variance < 1000 else 'FAIL',
            'detail': f'Price variance: {price_variance:.2f}'
        })
    # 5. Timeliness check
    if 'ORDER_DATE' in df.columns:
        df['ORDER_DATE'] = pd.to_datetime(df['ORDER_DATE'])
        max_date = df['ORDER_DATE'].max()
        days_old = (datetime.now() - max_date).days
        report['checks'].append({
            'name': 'Timeliness',
            'result': 'PASS' if days_old < 7 else 'FAIL',
            'detail': f'Newest data is {days_old} day(s) old'
        })
    # Print the report
    print("=" * 50)
    print("Data Quality Report")
    print("=" * 50)
    print(f"Check time: {report['check_time']}")
    print(f"Total records: {report['total_records']}")
    print("-" * 50)
    for check in report['checks']:
        status = "✓" if check['result'] == 'PASS' else "✗"
        print(f"{status} {check['name']}: {check['detail']}")
    print("=" * 50)
    return report

if __name__ == '__main__':
    input_file = '/data/etl/transform/orders_transform_20260403.csv'
    quality_check(input_file)
EOF

# python3 /opt/etl/scripts/data_quality.py
==================================================
Data Quality Report
==================================================
Check time: 2026-04-03 10:00:00
Total records: 999850
--------------------------------------------------
✓ Completeness: Null rate: 0.05%
✓ Uniqueness: Duplicate rate: 0.00%
✓ Amount accuracy: Negative-amount records: 0
✓ Price consistency: Price variance: 123.45
✓ Timeliness: Newest data is 1 day(s) old
==================================================
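The configuration in section 1 sets error_threshold: 0.01, but nothing above actually enforces it. A sketch of how the pipeline could abort when the failed-row ratio exceeds that threshold (reading the YAML requires PyYAML, and the processed/failed counters are assumed to come from the calling step):
#!/usr/bin/env python3
# Enforcing the configured error threshold (illustrative sketch).
import sys
import yaml

def enforce_error_threshold(records_processed, records_failed,
                            config_path='/opt/etl/config/etl_config.yaml'):
    with open(config_path) as f:
        config = yaml.safe_load(f)
    threshold = config['etl']['transform']['error_threshold']
    error_rate = records_failed / max(records_processed, 1)
    print(f"Error rate: {error_rate:.4f} (threshold {threshold})")
    if error_rate > threshold:
        print("ERROR: error rate exceeds threshold, aborting the load step")
        sys.exit(1)

if __name__ == '__main__':
    # Example: 999850 rows processed, 150 rejected during cleansing
    enforce_error_threshold(999850, 150)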
6. ETL Tooling
Dedicated ETL tools such as Apache Airflow and Kettle (PDI) improve development efficiency.
# cat > /opt/airflow/dags/fgedu_etl_dag.py << 'EOF'
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'fgedu',
    'depends_on_past': False,
    'start_date': datetime(2026, 4, 1),
    'email': ['admin@fgedu.net.cn'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'fgedu_etl_pipeline',
    default_args=default_args,
    description='FGedu ETL Pipeline',
    schedule_interval='0 2 * * *',
    catchup=False
)

def extract_data(**context):
    print("Running data extraction...")
    # extraction logic
    return 1000000

def transform_data(**context):
    print("Running data transformation...")
    # transformation logic
    return 999850

def load_data(**context):
    print("Running data load...")
    # load logic
    return 999850

def quality_check(**context):
    print("Running quality checks...")
    # quality-check logic
    return "PASS"

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

quality_task = PythonOperator(
    task_id='quality_check',
    python_callable=quality_check,
    dag=dag,
)

extract_task >> transform_task >> load_task >> quality_task
EOF
# Check Airflow DAG status
# airflow dags list
dag_id             | filepath
-------------------+------------------
fgedu_etl_pipeline | fgedu_etl_dag.py
# Trigger a DAG run
# airflow dags trigger fgedu_etl_pipeline
# Using Kettle (PDI)
# Kettle job definition (.kjb is an XML job file)
# cat > /opt/kettle/jobs/fgedu_etl.kjb << 'EOF'
EOF
7. Scheduling
Scheduling ensures that ETL jobs run on time.
# crontab -l
# ETL job schedule
0 2 * * * /opt/etl/scripts/run_etl.sh >> /var/log/etl/etl.log 2>&1
0 6 * * * /opt/etl/scripts/run_incremental.sh >> /var/log/etl/incr.log 2>&1
0 */4 * * * /opt/etl/scripts/run_realtime.sh >> /var/log/etl/realtime.log 2>&1
# ETL driver script
# cat > /opt/etl/scripts/run_etl.sh << 'EOF'
#!/bin/bash
LOG_DIR="/var/log/etl"
DATE=$(date +%Y%m%d)
LOG_FILE="${LOG_DIR}/etl_${DATE}.log"
echo "==========================================" >> $LOG_FILE
echo "ETL job started: $(date)" >> $LOG_FILE
echo "==========================================" >> $LOG_FILE
# 1. Data extraction
echo "1. Running data extraction..." >> $LOG_FILE
python3 /opt/etl/scripts/full_extract.py >> $LOG_FILE 2>&1
if [ $? -ne 0 ]; then
    echo "ERROR: data extraction failed" >> $LOG_FILE
    exit 1
fi
# 2. Data cleansing (produces the input for the transform step)
echo "2. Running data cleansing..." >> $LOG_FILE
python3 /opt/etl/scripts/data_cleanse.py >> $LOG_FILE 2>&1
if [ $? -ne 0 ]; then
    echo "ERROR: data cleansing failed" >> $LOG_FILE
    exit 1
fi
# 3. Data transformation
echo "3. Running data transformation..." >> $LOG_FILE
python3 /opt/etl/scripts/data_transform.py >> $LOG_FILE 2>&1
if [ $? -ne 0 ]; then
    echo "ERROR: data transformation failed" >> $LOG_FILE
    exit 1
fi
# 4. Data loading
echo "4. Running data load..." >> $LOG_FILE
python3 /opt/etl/scripts/full_load.py >> $LOG_FILE 2>&1
if [ $? -ne 0 ]; then
    echo "ERROR: data load failed" >> $LOG_FILE
    exit 1
fi
# 5. Data quality checks
echo "5. Running quality checks..." >> $LOG_FILE
python3 /opt/etl/scripts/data_quality.py >> $LOG_FILE 2>&1
echo "==========================================" >> $LOG_FILE
echo "ETL job finished: $(date)" >> $LOG_FILE
echo "==========================================" >> $LOG_FILE
EOF
# chmod +x /opt/etl/scripts/run_etl.sh
8. Monitoring and Alerting
Monitoring and alerting make sure ETL problems are detected promptly.
# cat > /opt/etl/scripts/etl_monitor.sh << 'EOF'
#!/bin/bash
LOG_DIR="/var/log/etl"
ALERT_EMAIL="admin@fgedu.net.cn"

check_etl_status() {
    LATEST_LOG=$(ls -t ${LOG_DIR}/etl_*.log | head -1)
    if grep -q "ERROR" $LATEST_LOG; then
        echo "ETL job failed" | mail -s "ETL alert" $ALERT_EMAIL
        return 1
    fi
    if grep -q "ETL job finished" $LATEST_LOG; then
        echo "ETL job succeeded"
        return 0
    else
        echo "ETL job status unknown"
        return 2
    fi
}

check_data_freshness() {
    LATEST_FILE=$(ls -t /data/etl/extract/*.csv | head -1)
    FILE_TIME=$(stat -c %Y $LATEST_FILE)
    CURRENT_TIME=$(date +%s)
    DIFF=$((CURRENT_TIME - FILE_TIME))
    if [ $DIFF -gt 86400 ]; then
        echo "Stale data: no update in over 24 hours" | mail -s "ETL stale data alert" $ALERT_EMAIL
        return 1
    fi
    echo "Data freshness OK"
    return 0
}

check_record_count() {
    LATEST_FILE=$(ls -t /data/etl/extract/*.csv | head -1)
    RECORD_COUNT=$(wc -l < $LATEST_FILE)
    if [ $RECORD_COUNT -lt 100000 ]; then
        echo "Abnormal record count: $RECORD_COUNT" | mail -s "ETL record count alert" $ALERT_EMAIL
        return 1
    fi
    echo "Record count OK: $RECORD_COUNT"
    return 0
}

echo "ETL monitoring checks"
echo "=========================================="
check_etl_status
check_data_freshness
check_record_count
echo "=========================================="
EOF
# chmod +x /opt/etl/scripts/etl_monitor.sh
# Schedule the monitor
# crontab -e
*/30 * * * * /opt/etl/scripts/etl_monitor.sh >> /var/log/etl/monitor.log 2>&1
9. Performance Tuning
Performance tuning improves ETL throughput.
# cat > /opt/etl/scripts/parallel_etl.py << 'EOF'
#!/usr/bin/env python3
from multiprocessing import Pool
import numpy as np
import pandas as pd

def process_chunk(chunk_file):
    """Process one data chunk."""
    df = pd.read_csv(chunk_file)
    # transformation logic
    df['processed'] = True
    return df

def parallel_process(input_file, output_file, num_processes=4):
    """Parallel processing."""
    # Split the input into chunks
    df = pd.read_csv(input_file)
    chunks = np.array_split(df, num_processes)
    # Write temporary chunk files
    chunk_files = []
    for i, chunk in enumerate(chunks):
        chunk_file = f'/tmp/chunk_{i}.csv'
        chunk.to_csv(chunk_file, index=False)
        chunk_files.append(chunk_file)
    # Process the chunks in parallel
    with Pool(num_processes) as pool:
        results = pool.map(process_chunk, chunk_files)
    # Merge the results
    final_df = pd.concat(results)
    final_df.to_csv(output_file, index=False)
    print(f"Parallel processing complete: {len(final_df)} records")

if __name__ == '__main__':
    parallel_process(
        '/data/etl/extract/orders_full_20260403.csv',
        '/data/etl/transform/orders_parallel_20260403.csv'
    )
EOF

# Bulk insert optimization
# cat > /opt/etl/scripts/batch_load.py << 'EOF'
#!/usr/bin/env python3
import psycopg2
import pandas as pd
from io import StringIO

def batch_load(input_file, batch_size=10000):
    """Bulk load using COPY."""
    df = pd.read_csv(input_file)
    conn = psycopg2.connect(
        host='192.168.1.20', port=5432, user='etl_user',
        password='Fgedu@ETL123', database='fgedu_dwh'
    )
    cursor = conn.cursor()
    # Bulk import with the COPY command
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    cursor.copy_from(buffer, 'fact_orders', sep=',', null='')
    conn.commit()
    print(f"Bulk load complete: {len(df)} records")
    cursor.close()
    conn.close()

if __name__ == '__main__':
    batch_load('/data/etl/transform/orders_transform_20260403.csv')
EOF
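Both scripts above read the entire CSV into memory before doing any work. For files that do not fit in memory, pandas can stream the input in fixed-size chunks instead; a sketch (the chunk size and output path are illustrative assumptions):
#!/usr/bin/env python3
# Chunked-read sketch: stream a large CSV through a transform step
# without loading it all at once.
import pandas as pd

def process_in_chunks(input_file, output_file, chunksize=100000):
    first = True
    total = 0
    for chunk in pd.read_csv(input_file, chunksize=chunksize):
        chunk = chunk[chunk['quantity'] > 0]           # example transform
        chunk.to_csv(output_file, index=False,
                     mode='w' if first else 'a',        # write the header once
                     header=first)
        total += len(chunk)
        first = False
    print(f"Processed {total} records in chunks of {chunksize}")

if __name__ == '__main__':
    process_in_chunks('/data/etl/extract/orders_full_20260403.csv',
                      '/data/etl/transform/orders_chunked_20260403.csv')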
10. Best Practices
These practices keep ETL processing efficient and reliable.
# cat > /opt/etl/docs/best_practices.md << 'EOF'
# ETL Best Practices

## 1. Data Extraction
- Use incremental extraction to reduce data volume
- Build a change data capture (CDC) mechanism
- Log extraction runs and statistics
- Retry failed extractions

## 2. Data Transformation
- Make cleansing rules configurable
- Add data quality checkpoints
- Keep a history of transformations
- Handle bad records explicitly

## 3. Data Loading
- Use bulk loading for throughput
- Implement incremental updates
- Roll back failed loads
- Validate the loaded result

## 4. Scheduling
- Choose schedule windows carefully
- Model job dependencies
- Retry failed jobs automatically
- Keep execution logs

## 5. Monitoring and Alerting
- Monitor job execution status
- Monitor data freshness
- Monitor data quality
- Send alerts promptly

## 6. Performance
- Process data in parallel
- Batch database operations
- Tune queries
- Use caching where appropriate

## 7. Security
- Encrypt sensitive data
- Control access permissions
- Audit operations
- Review security regularly

## 8. Documentation
- Maintain ETL documentation
- Record change history
- Write runbooks
- Keep documents up to date
EOF

# ETL health check script
# cat > /opt/etl/scripts/health_check.sh << 'EOF'
#!/bin/bash
echo "ETL health check"
echo "=========================================="
echo "1. Check the source system connection"
python3 -c "
import pymysql
conn = pymysql.connect(host='192.168.1.10', port=3306, user='etl_user', password='Fgedu@ETL123')
print('MySQL connection: OK')
conn.close()
"
echo ""
echo "2. Check the target system connection"
python3 -c "
import psycopg2
conn = psycopg2.connect(host='192.168.1.20', port=5432, user='etl_user', password='Fgedu@ETL123', database='fgedu_dwh')
print('PostgreSQL connection: OK')
conn.close()
"
echo ""
echo "3. Check disk space"
df -h /data/etl | tail -1 | awk '{print "ETL directory usage: "$5}'
echo ""
echo "4. Check recent job logs"
ls -lt /var/log/etl/*.log | head -3
echo ""
echo "=========================================="
EOF
# chmod +x /opt/etl/scripts/health_check.sh