本文档风哥主要介绍Airflow工作流调度实战,包括架构原理、安装配置、DAG开发等内容,风哥教程参考Airflow官方文档Concepts、DAG等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 Airflow概述
Airflow是Apache开源的工作流管理平台,使用Python编写,支持以编程方式创建、调度和监控工作流。学习交流加群风哥微信: itpux-com
- Python编写:使用Python代码定义工作流
- DAG模型:有向无环图工作流
- 丰富的Operator:支持多种作业类型
- Web界面:可视化管理界面
Airflow是Apache开源的工作流管理平台,
由Airbnb开发并贡献给Apache基金会。
主要功能:
1. 工作流定义
– 使用Python代码定义DAG
– 支持复杂依赖关系
– 参数化配置
2. 调度管理
– 定时调度
– 手动触发
– 依赖触发
3. 监控管理
– Web UI监控
– 日志查看
– 告警通知
# Airflow核心概念
1. DAG (Directed Acyclic Graph)
– 有向无环图
– 定义工作流结构
– 包含Task集合
2. Task
– 任务单元
– DAG中的节点
– 执行具体操作
3. Operator
– 任务执行器
– 定义任务类型
– BashOperator, PythonOperator等
4. Task Instance
– 任务实例
– Task的一次运行
– 包含执行状态
# Airflow支持的Operator
Operator 说明
BashOperator 执行Bash命令
PythonOperator 执行Python函数
HiveOperator 执行Hive SQL
SparkSubmitOperator 提交Spark作业
MySqlOperator 执行MySQL SQL
PostgresOperator 执行PostgreSQL SQL
S3Operator 操作S3存储
DockerOperator 执行Docker容器
# Airflow优势
1. Python代码定义工作流
2. 丰富的Operator生态
3. 强大的Web UI
4. 可扩展性强
5. 活跃的社区
# Airflow vs Oozie vs Azkaban
特性 Airflow Oozie Azkaban
语言 Python XML Properties
界面 Web UI Web UI Web UI
学习曲线 平缓 陡峭 平缓
扩展性 强 中 中
社区活跃度 高 中 中
# Airflow应用场景
1. ETL数据管道
数据采集 -> 数据清洗 -> 数据加载
2. 机器学习管道
数据准备 -> 模型训练 -> 模型部署
3. 数据仓库
数据抽取 -> 数据转换 -> 数据加载
4. 定时报表
数据统计 -> 报表生成 -> 邮件发送
1.2 架构设计
Airflow架构设计详解:
┌─────────────────────────────────────────┐
│ Web Server │
│ ┌─────────────────────────────────┐ │
│ │ Web UI │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ REST API │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘
│
┌───────────────────┴─────────────────────┐
│ Scheduler │
│ ┌─────────────────────────────────┐ │
│ │ DAG Parser │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ Task Scheduler │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘
│
┌───────────────────┴─────────────────────┐
│ Executor │
│ ┌─────────────────────────────────┐ │
│ │ SequentialExecutor │ │
│ │ LocalExecutor │ │
│ │ CeleryExecutor │ │
│ │ KubernetesExecutor │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘
│
┌───────────────────┴─────────────────────┐
│ Metadata DB │
│ ┌─────────────────────────────────┐ │
│ │ PostgreSQL/MySQL │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
# 核心组件
1. Web Server
– Web UI界面
– REST API
– 用户管理
– DAG管理
2. Scheduler
– 解析DAG文件
– 调度任务
– 触发任务执行
3. Executor
– 执行任务
– 管理资源
– 任务队列
4. Metadata DB
– 存储元数据
– 存储执行状态
– 存储日志
# Executor类型
1. SequentialExecutor
– 单线程执行
– 适合开发测试
2. LocalExecutor
– 多线程执行
– 适合单机部署
3. CeleryExecutor
– 分布式执行
– 适合生产环境
4. KubernetesExecutor
– Kubernetes执行
– 弹性伸缩
# 工作原理
1. DAG解析
Scheduler -> 读取DAG文件 -> 解析DAG -> 存入数据库
2. 任务调度
Scheduler -> 检查调度时间 -> 创建TaskInstance -> 发送到队列
3. 任务执行
Executor -> 从队列获取任务 -> 执行任务 -> 更新状态
# 高可用设计
1. Web Server高可用
– 多实例部署
– 负载均衡
2. Scheduler高可用
– 多实例部署
– 主备切换
3. Executor高可用
– Celery集群
– 故障转移
# 任务状态
状态 说明
queued 排队中
running 运行中
success 成功
failed 失败
up_for_retry 等待重试
upstream_failed 上游失败
skipped 跳过
1.3 DAG模型详解
DAG模型详解:
DAG (Directed Acyclic Graph) 是Airflow中
工作流的核心概念,使用Python代码定义。
# 基本DAG示例
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# 默认参数
default_args = {
‘owner’: ‘fgedu’,
‘depends_on_past’: False,
’email’: [‘admin@fgedu.com’],
’email_on_failure’: True,
’email_on_retry’: False,
‘retries’: 1,
‘retry_delay’: timedelta(minutes=5),
}
# 定义DAG
dag = DAG(
‘fgedu_etl’,
default_args=default_args,
description=’ETL DAG’,
schedule_interval=’0 1 * * *’,
start_date=datetime(2026, 4, 1),
catchup=False,
tags=[‘etl’, ‘fgedu’],
)
# 定义任务
t1 = BashOperator(
task_id=’print_date’,
bash_command=’date’,
dag=dag,
)
t2 = BashOperator(
task_id=’sleep’,
bash_command=’sleep 5′,
retries=3,
dag=dag,
)
# 定义依赖
t1 >> t2
# DAG参数说明
参数 说明
dag_id DAG唯一标识
description DAG描述
schedule_interval 调度间隔
start_date 开始日期
end_date 结束日期
catchup 是否补跑历史
tags 标签
default_args 默认参数
# 任务依赖
# 1. 线性依赖
t1 >> t2 >> t3
# 2. 分支依赖
t1 >> [t2, t3]
# 3. 合并依赖
[t2, t3] >> t4
# 4. 复杂依赖
t1 >> [t2, t3] >> t4
# 调度表达式
表达式 说明
None 手动触发
@once 执行一次
@hourly 每小时
@daily 每天
@weekly 每周
@monthly 每月
@yearly 每年
0 1 * * * Cron表达式
# Cron表达式
格式: 分 时 日 月 周
示例:
0 1 * * * 每天1点
0 */2 * * * 每2小时
0 0 * * 1 每周一
0 0 1 * * 每月1日
# 任务参数
参数 说明
task_id 任务ID
owner 所有者
retries 重试次数
retry_delay 重试间隔
timeout 超时时间
email 告警邮箱
pool 资源池
queue 队列
# XCom数据传递
# 发送数据
def push_function(**context):
context[‘ti’].xcom_push(key=’data’, value=’hello’)
# 接收数据
def pull_function(**context):
data = context[‘ti’].xcom_pull(key=’data’, task_ids=’push_task’)
print(data)
# Jinja模板
# 使用模板
t1 = BashOperator(
task_id=’templated’,
bash_command=’echo “Date: {{ ds }}”‘,
dag=dag,
)
# 内置变量
{{ ds }} 执行日期 YYYY-MM-DD
{{ ds_nodash }} 执行日期 YYYYMMDD
{{ ts }} 执行时间戳
{{ yesterday_ds }} 昨天日期
{{ tomorrow_ds }} 明天日期
Part02-生产环境规划与建议
2.1 环境规划建议
环境规划建议:
Airflow版本 Python版本 数据库版本
2.8.x 3.8-3.11 PostgreSQL 12+
2.7.x 3.7-3.10 PostgreSQL 11+
推荐版本:Airflow 2.8.0 + Python 3.10 + PostgreSQL 14
# 硬件规划
组件 配置 数量
Web Server 8C/16G/100G 2
Scheduler 8C/16G/100G 2
Worker 16C/32G/200G 3
PostgreSQL 8C/16G/200G 2(主从)
Redis 4C/8G/50G 3(集群)
# 软件依赖
1. Python
– Python 3.8+
– pip包管理
2. 数据库
– PostgreSQL 12+ (推荐)
– MySQL 5.7+
3. 消息队列
– Redis (Celery)
– RabbitMQ
# 网络规划
端口 用途
8080 Web Server HTTP
8443 Web Server HTTPS
8793 Flower (Celery监控)
5432 PostgreSQL
6379 Redis
# 目录规划
目录 用途
/bigdata/app/airflow 安装目录
/bigdata/app/airflow/dags DAG文件目录
/bigdata/app/airflow/logs 日志目录
/bigdata/app/airflow/plugins 插件目录
# 数据库规划
数据库 大小
airflow_db 100GB
表空间规划:
– DAG表
– 任务表
– 执行表
– 日志表
# 高可用规划
1. Web Server
– 部署2个实例
– Nginx负载均衡
2. Scheduler
– 部署2个实例
– 主备切换
3. Worker
– 部署3个实例
– 负载均衡
2.2 DAG规划建议
DAG规划建议:
1. ETL DAG
用途:数据抽取、转换、加载
频率:每日/每小时
2. 报表 DAG
用途:生成报表
频率:每日/每周
3. 同步 DAG
用途:数据同步
频率:每小时
4. ML DAG
用途:机器学习管道
频率:每日
# DAG命名规范
格式:[业务]_[类型]_[频率]
示例:
user_etl_daily
order_report_weekly
data_sync_hourly
ml_training_daily
# DAG文件结构
dags/
├── user_etl_daily.py
├── order_report_weekly.py
├── data_sync_hourly.py
└── ml_training_daily.py
# DAG代码规范
1. 文件头部注释
2. 参数配置清晰
3. 任务命名规范
4. 依赖关系明确
5. 异常处理完善
# DAG参数规划
参数类型 示例
系统参数 owner, start_date
调度参数 schedule_interval, catchup
重试参数 retries, retry_delay
告警参数 email, email_on_failure
资源参数 pool, queue
# DAG依赖规划
1. 任务依赖
– 线性依赖
– 分支依赖
– 条件依赖
2. DAG依赖
– ExternalTaskSensor
– SubDagOperator
2.3 调度规划建议
调度规划建议:
1. 定时调度
– Cron表达式
– 预设间隔
– 自定义间隔
2. 手动触发
– Web UI触发
– API触发
– CLI触发
3. 事件触发
– 文件传感器
– HTTP传感器
– 自定义传感器
# 调度频率规划
频率 表达式 适用场景
分钟 */5 * * * * 实时监控
小时 0 * * * * 实时统计
天 0 1 * * * 日报表
周 0 2 * * 1 周报表
月 0 3 1 * * 月报表
# 调度时间窗口
业务 开始时间 结束时间
ETL 01:00 05:00
报表 06:00 08:00
同步 00:00 23:59
# 重试策略
参数 默认值 说明
retries 0 重试次数
retry_delay 300s 重试间隔
# 告警配置
告警类型 触发条件
任务失败 任务执行失败
DAG失败 DAG执行失败
超时 任务执行超时
# 监控指标
指标 告警阈值
DAG成功率 < 95%
任务延迟 > 30分钟
队列积压 > 100个
Part03-生产环境项目实施方案
3.1 安装配置实战
3.1.1 安装Airflow
$ python3 -m venv /bigdata/app/airflow/venv
$ source /bigdata/app/airflow/venv/bin/activate
# 2. 安装Airflow
$ pip install apache-airflow[celery,postgres,redis]==2.8.0 \
–constraint “https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt”
# 3. 配置环境变量
$ export AIRFLOW_HOME=/bigdata/app/airflow
# 4. 初始化数据库
$ airflow db init
# 5. 创建管理员用户
$ airflow users create \
–username admin \
–password admin123 \
–firstname Admin \
–lastname User \
–role Admin \
–email admin@fgedu.com
Admin user admin created
# 6. 配置airflow.cfg
$ cat > /bigdata/app/airflow/airflow.cfg << 'EOF'
[core]
dags_folder = /bigdata/app/airflow/dags
base_log_folder = /bigdata/app/airflow/logs
executor = CeleryExecutor
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow123@fgedu-node1:5432/airflow_db
[celery]
broker_url = redis://fgedu-node1:6379/0
result_backend = db+postgresql://airflow:airflow123@fgedu-node1:5432/airflow_db
[webserver]
web_server_host = 0.0.0.0
web_server_port = 8080
workers = 4
[scheduler]
scheduler_heartbeat_sec = 5
catchup_by_default = False
EOF
# 7. 创建PostgreSQL数据库
$ psql -U postgres
postgres=# CREATE DATABASE airflow_db;
postgres=# CREATE USER airflow WITH PASSWORD 'airflow123';
postgres=# GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow;
postgres=# \q
# 8. 初始化数据库
$ airflow db init
# 9. 启动Web Server
$ airflow webserver --port 8080 &
# 10. 启动Scheduler
$ airflow scheduler &
# 11. 启动Celery Worker
$ airflow celery worker &
# 12. 访问Web UI
http://fgedu-node1:8080
用户名: admin
密码: admin123
3.1.2 配置高可用
# 在fgedu-node1, fgedu-node2, fgedu-node3
$ cat > /etc/redis/redis.conf << 'EOF' bind 0.0.0.0 port 6379 cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 appendonly yes EOF $ systemctl restart redis # 2. 创建Redis集群 $ redis-cli --cluster create \ fgedu-node1:6379 fgedu-node2:6379 fgedu-node3:6379 \ --cluster-replicas 0 # 3. 配置PostgreSQL主从 # 主库配置 $ cat >> /var/lib/pgsql/data/postgresql.conf << 'EOF' wal_level = replica max_wal_senders = 3 EOF $ cat >> /var/lib/pgsql/data/pg_hba.conf << 'EOF' host replication airflow fgedu-node2/32 trust host replication airflow fgedu-node3/32 trust EOF $ systemctl restart postgresql # 从库配置 $ pg_basebackup -h fgedu-node1 -U airflow -D /var/lib/pgsql/data -P -R $ systemctl start postgresql # 4. 部署多Web Server # 在fgedu-node2 $ airflow webserver --port 8080 & # 5. 部署多Scheduler # 在fgedu-node2 $ airflow scheduler & # 6. 部署多Worker # 在fgedu-node2, fgedu-node3 $ airflow celery worker & # 7. 配置Nginx负载均衡 $ cat > /etc/nginx/conf.d/airflow.conf << 'EOF' upstream airflow_cluster { server fgedu-node1:8080; server fgedu-node2:8080; } server { listen 8080; server_name airflow.fgedu.net.cn; location / { proxy_pass http://airflow_cluster; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; } } EOF $ systemctl restart nginx # 8. 访问高可用地址 http://airflow.fgedu.net.cn:8080
3.2 DAG开发实战
3.2.1 创建ETL DAG
$ cat > /bigdata/app/airflow/dags/fgedu_etl_dag.py << 'EOF' """ fgedu_etl_dag.py from:www.itpux.com.qq113257174.wx:itpux-com web: http://www.fgedu.net.cn ETL DAG示例 """ from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.providers.apache.hive.operators.hive import HiveOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago from datetime import timedelta # 默认参数 default_args = { 'owner': 'fgedu', 'depends_on_past': False, 'email': ['admin@fgedu.com'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=5), } # 定义DAG dag = DAG( 'fgedu_etl_daily', default_args=default_args, description='Daily ETL DAG', schedule_interval='0 1 * * *', start_date=days_ago(1), catchup=False, tags=['etl', 'fgedu', 'daily'], ) # 数据采集任务 sqoop_import = BashOperator( task_id='sqoop_import', bash_command=''' sqoop import \ --connect jdbc:mysql://fgedu-node1:3306/fgedudb \ --username fgedu \ --password fgedu123 \ --table user_info \ --target-dir /user/fgedu/staging/user_info/{{ds}} \ --fields-terminated-by ',' \ --m 4 ''', dag=dag, ) # 数据清洗任务 hive_clean = HiveOperator( task_id='hive_clean', hive_cli_conn_id='hive_default', hql=''' INSERT OVERWRITE DIRECTORY '/user/fgedu/cleaned/user_info/{{ds}}' SELECT user_id, UPPER(name) as name, age, city FROM external_user_info WHERE dt = '{{ds}}' AND age > 0 AND age < 150 ''', dag=dag, ) # 数据转换任务 spark_transform = SparkSubmitOperator( task_id='spark_transform', application='/user/fgedu/lib/etl.jar', java_class='com.fgedu.etl.DataTransform', conn_id='spark_default', application_args=[ '/user/fgedu/cleaned/user_info/{{ds}}', '/user/fgedu/output/user_info/{{ds}}' ], conf={ 'spark.executor.memory': '4g', 'spark.executor.cores': '2', 'spark.num.executors': '10' }, dag=dag, ) # 数据验证任务 def validate_data(**context): import subprocess date = context['ds'] cmd = f'hdfs dfs -test -e /user/fgedu/output/user_info/{date}/_SUCCESS' result = subprocess.run(cmd, shell=True) if result.returncode != 0: raise Exception(f'Data validation failed for {date}') print(f'Data validation passed for {date}') validate = PythonOperator( task_id='validate', python_callable=validate_data, dag=dag, ) # 定义任务依赖 sqoop_import >> hive_clean >> spark_transform >> validate
EOF
# 2. 验证DAG
$ airflow dags list
dag_id | filepath
——————+————————
fgedu_etl_daily | fgedu_etl_dag.py
# 3. 测试DAG
$ airflow dags test fgedu_etl_daily 2026-04-08
[2026-04-08 15:00:00,000] {taskinstance.py:1234} INFO – Dependencies all met for
…
[2026-04-08 15:05:00,000] {taskinstance.py:1234} INFO – Task succeeded
# 4. 手动触发DAG
$ airflow dags trigger fgedu_etl_daily
Created
3.2.2 创建复杂DAG
$ cat > /bigdata/app/airflow/dags/fgedu_branch_dag.py << 'EOF' """ fgedu_branch_dag.py from:www.itpux.com.qq113257174.wx:itpux-com web: http://www.fgedu.net.cn 分支DAG示例 """ from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import BranchPythonOperator from airflow.utils.dates import days_ago def choose_branch(**context): import random branch = random.choice(['branch_a', 'branch_b']) print(f'Chosen branch: {branch}') return branch dag = DAG( 'fgedu_branch_dag', schedule_interval='0 2 * * *', start_date=days_ago(1), catchup=False, ) start = BashOperator( task_id='start', bash_command='echo "Start"', dag=dag, ) choose = BranchPythonOperator( task_id='choose_branch', python_callable=choose_branch, dag=dag, ) branch_a = BashOperator( task_id='branch_a', bash_command='echo "Branch A"', dag=dag, ) branch_b = BashOperator( task_id='branch_b', bash_command='echo "Branch B"', dag=dag, ) end = BashOperator( task_id='end', bash_command='echo "End"', trigger_rule='none_failed', dag=dag, ) start >> choose >> [branch_a, branch_b] >> end
EOF
# 2. 创建子DAG
$ cat > /bigdata/app/airflow/dags/fgedu_subdag_parent.py << 'EOF'
"""
fgedu_subdag_parent.py
from:www.itpux.com.qq113257174.wx:itpux-com
web: http://www.fgedu.net.cn
子DAG示例
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.dates import days_ago
def subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG(
dag_id=f'{parent_dag_name}.{child_dag_name}',
default_args=args,
schedule_interval=None,
)
with dag_subdag:
t1 = BashOperator(
task_id='task1',
bash_command='echo "SubDAG Task 1"',
)
t2 = BashOperator(
task_id='task2',
bash_command='echo "SubDAG Task 2"',
)
t1 >> t2
return dag_subdag
dag = DAG(
‘fgedu_subdag_parent’,
schedule_interval=’0 3 * * *’,
start_date=days_ago(1),
catchup=False,
)
start = BashOperator(
task_id=’start’,
bash_command=’echo “Start”‘,
dag=dag,
)
subdag_task = SubDagOperator(
task_id=’subdag_task’,
subdag=subdag(‘fgedu_subdag_parent’, ‘subdag_task’, dag.default_args),
dag=dag,
)
end = BashOperator(
task_id=’end’,
bash_command=’echo “End”‘,
dag=dag,
)
start >> subdag_task >> end
EOF
3.3 Operator使用实战
from airflow.operators.bash import BashOperator
bash_task = BashOperator(
task_id=’bash_task’,
bash_command=’echo “Hello Airflow”‘,
dag=dag,
)
# 2. PythonOperator
from airflow.operators.python import PythonOperator
def python_function(**context):
print(f”Execution date: {context[‘ds’]}”)
return ‘success’
python_task = PythonOperator(
task_id=’python_task’,
python_callable=python_function,
dag=dag,
)
# 3. HiveOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
hive_task = HiveOperator(
task_id=’hive_task’,
hive_cli_conn_id=’hive_default’,
hql=’SELECT COUNT(*) FROM user_info’,
dag=dag,
)
# 4. SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
spark_task = SparkSubmitOperator(
task_id=’spark_task’,
application=’/user/fgedu/lib/etl.jar’,
java_class=’com.fgedu.etl.SparkETL’,
conn_id=’spark_default’,
dag=dag,
)
# 5. MySqlOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
mysql_task = MySqlOperator(
task_id=’mysql_task’,
mysql_conn_id=’mysql_default’,
sql=’SELECT COUNT(*) FROM user_info’,
dag=dag,
)
# 6. DockerOperator
from airflow.providers.docker.operators.docker import DockerOperator
docker_task = DockerOperator(
task_id=’docker_task’,
image=’python:3.10′,
command=’python -c “print(\’Hello Docker\’)”‘,
dag=dag,
)
# 7. Sensor
from airflow.sensors.filesystem import FileSensor
file_sensor = FileSensor(
task_id=’file_sensor’,
filepath=’/user/fgedu/input/data.csv’,
fs_conn_id=’hdfs_default’,
poke_interval=60,
timeout=3600,
dag=dag,
)
# 8. 自定义Operator
from airflow.models import BaseOperator
class FgeduCustomOperator(BaseOperator):
def __init__(self, name, **kwargs):
super().__init__(**kwargs)
self.name = name
def execute(self, context):
self.log.info(f’Executing custom operator: {self.name}’)
return f’Hello {self.name}’
custom_task = FgeduCustomOperator(
task_id=’custom_task’,
name=’Fgedu’,
dag=dag,
)
Part04-生产案例与实战讲解
4.1 ETL调度案例
$ cat > /bigdata/app/airflow/dags/fgedu_complete_etl.py << 'EOF'
"""
fgedu_complete_etl.py
from:www.itpux.com.qq113257174.wx:itpux-com
web: http://www.fgedu.net.cn
完整ETL数据管道
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago
from datetime import timedelta
import smtplib
from email.mime.text import MIMEText
default_args = {
'owner': 'fgedu',
'depends_on_past': False,
'email': ['admin@fgedu.com'],
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'fgedu_complete_etl',
default_args=default_args,
schedule_interval='0 1 * * *',
start_date=days_ago(1),
catchup=False,
tags=['etl', 'complete'],
)
# 等待数据文件
wait_for_data = FileSensor(
task_id='wait_for_data',
filepath='/user/fgedu/input/{{ds}}/_SUCCESS',
fs_conn_id='hdfs_default',
poke_interval=300,
timeout=3600,
dag=dag,
)
# 数据采集
sqoop_import = BashOperator(
task_id='sqoop_import',
bash_command='''
sqoop import \
--connect jdbc:mysql://fgedu-node1:3306/fgedudb \
--username fgedu \
--password fgedu123 \
--query "SELECT * FROM user_info WHERE created_date='{{ds}}' AND \$CONDITIONS" \
--target-dir /user/fgedu/staging/user_info/{{ds}} \
--fields-terminated-by ',' \
--m 4 \
--split-by user_id
''',
dag=dag,
)
# 数据清洗
hive_clean = HiveOperator(
task_id='hive_clean',
hive_cli_conn_id='hive_default',
hql='''
INSERT OVERWRITE TABLE cleaned_user_info PARTITION(dt='{{ds}}')
SELECT
user_id,
UPPER(name) as name,
age,
city,
CURRENT_TIMESTAMP as etl_time
FROM staging_user_info
WHERE dt='{{ds}}' AND age > 0 AND age < 150
''',
dag=dag,
)
# 数据转换
spark_transform = SparkSubmitOperator(
task_id='spark_transform',
application='/user/fgedu/lib/transform.jar',
java_class='com.fgedu.etl.Transform',
conn_id='spark_default',
application_args=['{{ds}}'],
conf={
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
'spark.num.executors': '10'
},
dag=dag,
)
# 数据验证
def validate_data(**context):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Validate').getOrCreate()
date = context['ds']
df = spark.read.parquet(f'/user/fgedu/output/user_info/{date}')
count = df.count()
if count == 0:
raise Exception(f'No data found for {date}')
print(f'Validated {count} records for {date}')
return count
validate = PythonOperator(
task_id='validate',
python_callable=validate_data,
dag=dag,
)
# 发送通知
def send_notification(**context):
date = context['ds']
count = context['ti'].xcom_pull(task_ids='validate')
msg = MIMEText(f'ETL completed for {date}. Total records: {count}')
msg['Subject'] = f'ETL Notification - {date}'
msg['From'] = 'airflow@fgedu.com'
msg['To'] = 'admin@fgedu.com'
with smtplib.SMTP('smtp.fgedu.com') as server:
server.send_message(msg)
notify = PythonOperator(
task_id='notify',
python_callable=send_notification,
dag=dag,
)
# 定义依赖
wait_for_data >> sqoop_import >> hive_clean >> spark_transform >> validate >> notify
EOF
4.2 数据管道案例
$ cat > /bigdata/app/airflow/dags/fgedu_pipeline.py << 'EOF'
"""
fgedu_pipeline.py
from:www.itpux.com.qq113257174.wx:itpux-com
web: http://www.fgedu.net.cn
多数据源并行管道
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.utils.dates import days_ago
dag = DAG(
'fgedu_pipeline',
schedule_interval='0 2 * * *',
start_date=days_ago(1),
catchup=False,
)
# MySQL数据同步
mysql_sync = BashOperator(
task_id='mysql_sync',
bash_command='''
sqoop import \
--connect jdbc:mysql://fgedu-node1:3306/fgedudb \
--username fgedu \
--password fgedu123 \
--table orders \
--target-dir /user/fgedu/sync/mysql/orders/{{ds}}
''',
dag=dag,
)
# Oracle数据同步
oracle_sync = BashOperator(
task_id='oracle_sync',
bash_command='''
sqoop import \
--connect jdbc:oracle:thin:@fgedu-oracle:1521:fgedudb \
--username fgedu \
--password fgedu123 \
--table products \
--target-dir /user/fgedu/sync/oracle/products/{{ds}}
''',
dag=dag,
)
# PostgreSQL数据同步
postgres_sync = BashOperator(
task_id='postgres_sync',
bash_command='''
sqoop import \
--connect jdbc:postgresql://fgedu-pg:5432/fgedudb \
--username fgedu \
--password fgedu123 \
--table customers \
--target-dir /user/fgedu/sync/postgres/customers/{{ds}}
''',
dag=dag,
)
# 数据合并
merge_data = HiveOperator(
task_id='merge_data',
hive_cli_conn_id='hive_default',
hql='''
INSERT OVERWRITE TABLE merged_data PARTITION(dt='{{ds}}')
SELECT * FROM (
SELECT 'mysql' as source, * FROM mysql_orders WHERE dt='{{ds}}'
UNION ALL
SELECT 'oracle' as source, * FROM oracle_products WHERE dt='{{ds}}'
UNION ALL
SELECT 'postgres' as source, * FROM postgres_customers WHERE dt='{{ds}}'
) t
''',
dag=dag,
)
# 数据验证
validate = BashOperator(
task_id='validate',
bash_command='hdfs dfs -test -e /user/fgedu/merged/{{ds}}/_SUCCESS',
dag=dag,
)
# 定义依赖
[mysql_sync, oracle_sync, postgres_sync] >> merge_data >> validate
EOF
4.3 常见问题处理
4.3.1 DAG解析失败
# 排查步骤
# 1. 检查DAG文件
$ airflow dags list
# 2. 查看解析错误
$ airflow dags show fgedu_etl
# 解决方案
# 1. 检查Python语法
$ python -m py_compile /bigdata/app/airflow/dags/fgedu_etl.py
# 2. 检查导入错误
$ python /bigdata/app/airflow/dags/fgedu_etl.py
4.3.2 任务执行失败
# 排查步骤
# 1. 查看任务日志
Web UI -> DAG -> Task -> Log
# 2. 查看Worker日志
$ tail -f /bigdata/app/airflow/logs/scheduler/latest/*.log
# 解决方案
# 1. 清除任务状态
$ airflow tasks clear fgedu_etl -t sqoop_import -s 2026-04-08
# 2. 重试任务
Web UI -> DAG -> Task -> Clear
Part05-风哥经验总结与分享
5.1 Airflow最佳实践
Airflow最佳实践建议:
1. 使用Python代码定义DAG
2. 合理设置重试和超时
3. 使用XCom传递数据
4. 监控DAG执行状态
5. 定期清理历史数据
5.2 使用建议
使用建议:
- DAG设计要简洁
- 使用版本控制
- 配置监控告警
- 定期备份数据库
5.3 工具推荐
Airflow相关工具:
- Airflow Web UI:可视化管理
- Airflow CLI:命令行工具
- Flower:Celery监控
- 自定义插件:扩展功能
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
