本文档风哥主要介绍Azkaban工作流调度实战,包括架构原理、安装配置、项目开发等内容,风哥教程参考Azkaban官方文档Getting Started、Workflow等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 Azkaban概述
Azkaban是由LinkedIn开源的批量工作流调度系统,用于管理Hadoop作业的执行。学习交流加群风哥微信: itpux-com
- Web界面:友好的Web管理界面
- 工作流管理:支持DAG工作流
- 定时调度:支持Cron表达式
- 权限管理:支持用户权限管理
Azkaban是LinkedIn开源的批量工作流调度系统,
专门用于解决Hadoop作业依赖问题。
主要功能:
1. 工作流管理
– 定义作业依赖关系
– 可视化工作流
– 工作流执行控制
2. 调度管理
– 定时调度
– 手动触发
– 依赖触发
3. 用户管理
– 用户权限
– 项目权限
– 操作审计
# Azkaban组件
1. Web Server
– Web界面
– REST API
– 用户管理
– 项目管理
2. Executor Server
– 作业执行
– 资源管理
– 日志管理
3. MySQL
– 存储元数据
– 存储作业状态
– 存储用户信息
# Azkaban支持的作业类型
作业类型 说明
command Shell命令
java Java程序
hive Hive作业
pig Pig作业
spark Spark作业
hadoopShell Hadoop Shell
# Azkaban优势
1. 简单易用的Web界面
2. 支持复杂工作流
3. 支持作业重试
4. 支持邮件告警
5. 支持权限管理
# Azkaban vs Oozie
特性 Azkaban Oozie
界面 Web界面 Web界面
配置 简单 复杂
工作流 Flow Workflow XML
调度 Cron Coordinator
学习曲线 平缓 陡峭
# Azkaban应用场景
1. ETL数据管道
数据采集 -> 数据清洗 -> 数据加载
2. 定时报表
数据统计 -> 报表生成 -> 邮件发送
3. 数据同步
数据导出 -> 数据传输 -> 数据导入
4. 批量处理
批量作业 -> 并行执行 -> 结果汇总
1.2 架构设计
Azkaban架构设计详解:
┌─────────────────────────────────────────┐
│ Web Server │
│ ┌─────────────────────────────────┐ │
│ │ Web UI │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ REST API │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ Project Manager │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘
│
┌───────────────────┴─────────────────────┐
│ MySQL │
│ ┌─────────────────────────────────┐ │
│ │ 元数据存储 │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘
│
┌───────────────────┴─────────────────────┐
│ Executor Server │
│ ┌─────────────────────────────────┐ │
│ │ Job Runner │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ Thread Pool │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘
│
┌───────────────────┴─────────────────────┐
│ Hadoop Cluster │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ HDFS │ │ YARN │ │ Hive │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘
# Web Server
1. Web UI
– 项目管理
– 工作流管理
– 执行历史
– 用户管理
2. REST API
– 项目API
– 执行API
– 用户API
3. Project Manager
– 项目上传
– 项目解析
– 项目分发
# Executor Server
1. Job Runner
– 作业执行
– 状态监控
– 日志收集
2. Thread Pool
– 并发执行
– 资源管理
– 队列管理
# MySQL
1. 元数据表
– projects: 项目信息
– flows: 工作流信息
– jobs: 作业信息
2. 执行表
– executions: 执行记录
– execution_jobs: 作业执行
– execution_flows: 流程执行
3. 用户表
– users: 用户信息
– permissions: 权限信息
# 工作原理
1. 项目上传
用户 -> Web UI -> 上传ZIP -> 解析 -> MySQL
2. 作业执行
Web Server -> Executor -> 执行作业 -> 回调通知
3. 状态更新
Executor -> Web Server -> 更新状态 -> MySQL
# 高可用设计
1. Web Server高可用
– 多实例部署
– 负载均衡
– Session共享
2. Executor高可用
– 多Executor部署
– 故障转移
– 负载均衡
# 作业生命周期
1. READY:准备状态
2. QUEUED:排队状态
3. RUNNING:运行状态
4. SUCCEEDED:成功状态
5. FAILED:失败状态
6. KILLED:终止状态
1.3 工作流模型详解
工作流模型详解:
Azkaban使用.properties文件定义作业,
使用.job文件定义工作流。
# Job文件示例
# fgedu_etl.job
type=command
command=echo “Hello Azkaban”
# 带依赖的Job
# job1.job
type=command
command=echo “Job 1”
# job2.job
type=command
dependencies=job1
command=echo “Job 2”
# job3.job
type=command
dependencies=job1
command=echo “Job 3”
# job4.job
type=command
dependencies=job2,job3
command=echo “Job 4”
# 工作流图
job1
/ \
job2 job3
\ /
job4
# Hive作业
# hive_etl.job
type=hive
hive.script=etl.hql
hive.param.input=/user/fgedu/input
hive.param.output=/user/fgedu/output
# Spark作业
# spark_etl.job
type=spark
spark.class=com.fgedu.etl.SparkETL
spark.master=yarn
spark.deploy-mode=cluster
spark.jars=/user/fgedu/lib/etl.jar
# Command作业
# shell_etl.job
type=command
command=/bin/bash /user/fgedu/scripts/etl.sh
command.1=/bin/bash /user/fgedu/scripts/validate.sh
# 工作流参数
# flow.properties
input.path=/user/fgedu/input
output.path=/user/fgedu/output
date=2026-04-08
# job文件引用参数
type=command
command=echo ${input.path}
# 条件执行
# job1.job
type=command
command=echo “Job 1”
condition=success
# job2.job
type=command
dependencies=job1
command=echo “Job 2”
condition=job1:success
# 重试配置
# retry.job
type=command
command=echo “Retry Job”
retries=3
retry.backoff=30000
# 告警配置
# alert.job
type=command
command=echo “Alert Job”
failure.emails=admin@fgedu.com
success.emails=admin@fgedu.com
notify.emails=admin@fgedu.com
# 资源配置
# resource.job
type=command
command=echo “Resource Job”
job.memory=4096
job.cpus=2
Part02-生产环境规划与建议
2.1 环境规划建议
环境规划建议:
Azkaban版本 Hadoop版本 Java版本
3.90.x 3.x 8+
3.80.x 2.x 8+
推荐版本:Azkaban 3.90.0 + Hadoop 3.3.6 + Java 8
# 硬件规划
组件 配置 数量
Web Server 8C/16G/100G 2(高可用)
Executor Server 16C/32G/200G 3
MySQL 8C/16G/200G 2(主从)
# 软件依赖
1. Java
– JDK 8+
– 配置JAVA_HOME
2. MySQL
– MySQL 5.7+
– 字符集UTF-8
3. Hadoop
– HDFS
– YARN
# 网络规划
端口 用途
8081 Web Server HTTP
8443 Web Server HTTPS
12321 Executor Server
# 目录规划
目录 用途
/bigdata/app/azkaban 安装目录
/bigdata/app/azkaban/web Web Server
/bigdata/app/azkaban/exec Executor Server
/bigdata/app/azkaban/logs 日志目录
# 数据库规划
数据库 大小
azkaban_db 50GB
表空间规划:
– 项目表
– 执行表
– 用户表
– 日志表
# 高可用规划
1. Web Server
– 部署2个实例
– Nginx负载均衡
– Session共享
2. Executor Server
– 部署3个实例
– 自动故障转移
– 负载均衡
2.2 项目规划建议
项目规划建议:
1. ETL项目
用途:数据抽取、转换、加载
频率:每日/每小时
2. 报表项目
用途:生成报表
频率:每日/每周
3. 同步项目
用途:数据同步
频率:每小时
4. 分析项目
用途:数据分析
频率:每日
# 项目命名规范
格式:[业务]_[类型]_[环境]
示例:
user_etl_prod
order_report_prod
data_sync_prod
# 项目目录结构
fgedu_etl/
├── etl.job
├── hive_etl.job
├── spark_etl.job
├── validate.job
├── flow.properties
├── scripts/
│ ├── etl.sh
│ └── validate.sh
└── hql/
└── etl.hql
# 项目权限规划
角色 权限
管理员 所有项目
开发者 指定项目
运维 执行权限
只读 查看权限
# 项目参数规划
参数类型 示例
系统参数 ${azkaban.flow.projectid}
时间参数 ${azkaban.flow.starttime}
业务参数 ${input.path}, ${output.path}
自定义参数 ${date}, ${hour}
2.3 调度规划建议
调度规划建议:
1. 定时调度
– Cron表达式
– 固定周期
– 时间窗口
2. 手动触发
– 立即执行
– 参数传入
– 指定时间
3. 依赖触发
– 前置作业完成
– 数据可用
– 条件满足
# 调度频率规划
频率 Cron表达式 适用场景
分钟 */5 * * * * 实时监控
小时 0 * * * * 实时统计
天 0 1 * * * 日报表
周 0 2 * * 1 周报表
月 0 3 1 * * 月报表
# 调度时间窗口
业务 开始时间 结束时间
ETL 01:00 05:00
报表 06:00 08:00
同步 00:00 23:59
# 重试策略
参数 默认值 说明
retries 0 重试次数
retry.backoff 0 重试间隔(ms)
# 告警配置
告警类型 触发条件
作业失败 作业执行失败
作业超时 作业执行超时
流程失败 流程执行失败
# 监控指标
指标 告警阈值
作业成功率 < 95%
作业延迟 > 30分钟
队列积压 > 100个
Part03-生产环境项目实施方案
3.1 安装配置实战
3.1.1 安装Azkaban
$ cd /bigdata/app
$ wget https://github.com/azkaban/azkaban/archive/refs/tags/3.90.0.tar.gz
$ tar -xzf 3.90.0.tar.gz
$ ln -s azkaban-3.90.0 azkaban
# 2. 编译Azkaban
$ cd /bigdata/app/azkaban
$ ./gradlew build installDist -x test
BUILD SUCCESSFUL in 10m 30s
# 3. 创建数据库
$ mysql -u root -p
mysql> CREATE DATABASE azkaban_db DEFAULT CHARACTER SET utf8mb4;
mysql> CREATE USER ‘azkaban’@’%’ IDENTIFIED BY ‘azkaban123’;
mysql> GRANT ALL PRIVILEGES ON azkaban_db.* TO ‘azkaban’@’%’;
mysql> FLUSH PRIVILEGES;
# 4. 导入数据库脚本
$ mysql -u azkaban -p azkaban_db < /bigdata/app/azkaban/azkaban-db/build/distributions/azkaban-db-3.90.0/create-all-sql-3.90.0.sql
# 5. 配置Web Server
$ cd /bigdata/app/azkaban/azkaban-web-server/build/distributions
$ tar -xzf azkaban-web-server-3.90.0.tar.gz
$ mv azkaban-web-server-3.90.0 /bigdata/app/azkaban/web-server
$ cat > /bigdata/app/azkaban/web-server/conf/azkaban.properties << 'EOF'
# Azkaban Web Server配置
default.timezone.id=Asia/Shanghai
# MySQL配置
database.type=mysql
mysql.port=3306
mysql.host=fgedu-node1
mysql.database=azkaban_db
mysql.user=azkaban
mysql.password=azkaban123
mysql.numconnections=100
# Web Server配置
web.resource.dir=/bigdata/app/azkaban/web-server/web
web.template.dir=/bigdata/app/azkaban/web-server/web/template
web.servlet.cache=false
web.server.port=8081
web.server.ssl.port=8443
web.server.use.ssl=false
# Executor配置
azkaban.executorselector.filters=StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus
azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1
azkaban.executorselector.comparator.Memory=1
azkaban.executorselector.comparator.LastDispatched=1
azkaban.executorselector.comparator.CpuUsage=1
EOF
# 6. 配置Executor Server
$ cd /bigdata/app/azkaban/azkaban-exec-server/build/distributions
$ tar -xzf azkaban-exec-server-3.90.0.tar.gz
$ mv azkaban-exec-server-3.90.0 /bigdata/app/azkaban/exec-server
$ cat > /bigdata/app/azkaban/exec-server/conf/azkaban.properties << 'EOF'
# Azkaban Executor Server配置
default.timezone.id=Asia/Shanghai
# MySQL配置
database.type=mysql
mysql.port=3306
mysql.host=fgedu-node1
mysql.database=azkaban_db
mysql.user=azkaban
mysql.password=azkaban123
mysql.numconnections=100
# Executor配置
executor.maxThreads=50
executor.port=12321
executor.flow.threads=30
EOF
# 7. 启动Executor Server
$ cd /bigdata/app/azkaban/exec-server
$ bin/start-exec.sh
# 8. 激活Executor
$ curl http://localhost:12321/executor?action=activate
{"status":"success"}
# 9. 启动Web Server
$ cd /bigdata/app/azkaban/web-server
$ bin/start-web.sh
# 10. 访问Web UI
http://fgedu-node1:8081
默认用户名: azkaban
默认密码: azkaban
3.1.2 配置高可用
# 在fgedu-node2
$ cd /bigdata/app/azkaban/exec-server
$ bin/start-exec.sh
$ curl http://localhost:12321/executor?action=activate
# 在fgedu-node3
$ cd /bigdata/app/azkaban/exec-server
$ bin/start-exec.sh
$ curl http://localhost:12321/executor?action=activate
# 2. 查看Executor列表
$ mysql -u azkaban -p azkaban_db -e “SELECT host, port, active FROM executors;”
+————-+——-+——–+
| host | port | active |
+————-+——-+——–+
| fgedu-node1 | 12321 | 1 |
| fgedu-node2 | 12321 | 1 |
| fgedu-node3 | 12321 | 1 |
+————-+——-+——–+
# 3. 配置Web Server高可用
# 在fgedu-node2部署Web Server
$ cd /bigdata/app/azkaban/web-server
$ bin/start-web.sh
# 4. 配置Nginx负载均衡
$ cat > /etc/nginx/conf.d/azkaban.conf << 'EOF'
upstream azkaban_cluster {
server fgedu-node1:8081;
server fgedu-node2:8081;
}
server {
listen 8081;
server_name azkaban.fgedu.net.cn;
location / {
proxy_pass http://azkaban_cluster;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
EOF
$ systemctl restart nginx
# 5. 访问高可用地址
http://azkaban.fgedu.net.cn:8081
3.2 项目开发实战
3.2.1 创建ETL项目
$ mkdir -p /tmp/fgedu_etl_project
$ cd /tmp/fgedu_etl_project
# 2. 创建job文件
# 数据采集job
$ cat > sqoop_import.job << 'EOF'
type=command
command=sqoop import \
--connect jdbc:mysql://fgedu-node1:3306/fgedudb \
--username fgedu \
--password fgedu123 \
--table user_info \
--target-dir /user/fgedu/staging/user_info \
--fields-terminated-by ',' \
--m 4
EOF
# 数据清洗job
$ cat > hive_clean.job << 'EOF'
type=hive
dependencies=sqoop_import
hive.script=clean_data.hql
hive.param.input=/user/fgedu/staging/user_info
hive.param.output=/user/fgedu/cleaned/user_info
EOF
# Hive脚本
$ cat > clean_data.hql << 'EOF'
-- clean_data.hql
-- from:www.itpux.com.qq113257174.wx:itpux-com
-- web: http://www.fgedu.net.cn
CREATE EXTERNAL TABLE IF NOT EXISTS temp_user (
user_id STRING,
name STRING,
age INT,
city STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '${hivevar:input}';
INSERT OVERWRITE DIRECTORY '${hivevar:output}'
SELECT
user_id,
UPPER(name) as name,
age,
city
FROM temp_user
WHERE age > 0 AND age < 150;
EOF
# 数据转换job
$ cat > spark_transform.job << 'EOF'
type=spark
dependencies=hive_clean
spark.class=com.fgedu.etl.DataTransform
spark.master=yarn
spark.deploy-mode=cluster
spark.jars=/user/fgedu/lib/etl.jar
spark.args=/user/fgedu/cleaned/user_info /user/fgedu/output/user_info
EOF
# 数据验证job
$ cat > validate.job << 'EOF'
type=command
dependencies=spark_transform
command=/bin/bash validate.sh
EOF
# 验证脚本
$ cat > validate.sh << 'EOF'
#!/bin/bash
# validate.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
echo "验证数据..."
hdfs dfs -test -e /user/fgedu/output/user_info/_SUCCESS
if [ $? -eq 0 ]; then
echo "验证成功"
exit 0
else
echo "验证失败"
exit 1
fi
EOF
$ chmod +x validate.sh
# 3. 创建flow.properties
$ cat > flow.properties << 'EOF'
input.path=/user/fgedu/staging/user_info
output.path=/user/fgedu/output/user_info
EOF
# 4. 打包项目
$ zip -r fgedu_etl.zip *.job *.hql *.sh flow.properties
# 5. 上传项目
# 访问Azkaban Web UI
# 创建项目: fgedu_etl
# 上传ZIP文件: fgedu_etl.zip
# 6. 执行项目
# 点击Execute Flow
# 选择要执行的flow
# 点击Execute
3.2.2 创建定时调度
# 项目页面 -> Schedule -> Set Schedule
# 2. 使用API创建调度
$ cat > /tmp/schedule.json << 'EOF'
{
"projectId": 1,
"flow": "fgedu_etl",
"scheduleTime": "01:00",
"scheduleDate": "01/04/2026",
"isRecurring": "on",
"period": "1d",
"concurrentOption": "skip"
}
EOF
$ curl -k -X POST \
-d "ajax=scheduleCronFlow" \
-d "projectId=1" \
-d "flow=fgedu_etl" \
-d "cronExpression=0 0 1 * * ? *" \
-b "azkaban.browser.session.id=..." \
http://localhost:8081/schedule
{"message":"Schedule created successfully","status":"success"}
# 3. 查看调度列表
$ curl -k -X GET \
-d "ajax=fetchSchedules" \
-b "azkaban.browser.session.id=..." \
http://localhost:8081/schedule
# 4. 手动触发执行
$ curl -k -X POST \
-d "ajax=executeFlow" \
-d "project=fgedu_etl" \
-d "flow=fgedu_etl" \
-b "azkaban.browser.session.id=..." \
http://localhost:8081/executor
{"message":"Execution submitted successfully with exec id 1","execid":"1"}
3.3 调度配置实战
$ cat > alert.job << 'EOF' type=command command=echo "Alert Job" failure.emails=admin@fgedu.com success.emails=admin@fgedu.com notify.emails=admin@fgedu.com EOF # 2. 配置重试 $ cat > retry.job << 'EOF' type=command command=echo "Retry Job" retries=3 retry.backoff=30000 EOF # 3. 配置超时 $ cat > timeout.job << 'EOF' type=command command=echo "Timeout Job" execution.timeout=3600 EOF # 4. 配置资源 $ cat > resource.job << 'EOF' type=command command=echo "Resource Job" job.memory=4096 job.cpus=2 EOF # 5. 配置并发控制 $ cat > concurrent.job << 'EOF' type=command command=echo "Concurrent Job" concurrent.runs=1 EOF # 6. 监控脚本 $ cat > /tmp/monitor_azkaban.sh << 'EOF' #!/bin/bash # monitor_azkaban.sh # from:www.itpux.com.qq113257174.wx:itpux-com # web: http://www.fgedu.net.cn AZKABAN_URL="http://localhost:8081" echo "=== Azkaban监控 $(date) ===" # 获取运行中的执行 RUNNING=$(curl -s -k -X GET \ -d "ajax=getRunning" \ -b "azkaban.browser.session.id=..." \ $AZKABAN_URL/executor | grep -o '"execid"' | wc -l) echo "运行中执行: $RUNNING" # 获取失败的执行 FAILED=$(curl -s -k -X GET \ -d "ajax=fetchExecutions&project=fgedu_etl&flow=fgedu_etl&status=FAILED" \ -b "azkaban.browser.session.id=..." \ $AZKABAN_URL/executor | grep -o '"execid"' | wc -l) echo "失败执行: $FAILED" # 检查Executor状态 EXECUTORS=$(curl -s -k -X GET \ -d "ajax=getExecutors" \ -b "azkaban.browser.session.id=..." \ $AZKABAN_URL/executor) echo "Executor状态: $EXECUTORS" # 告警 if [ "$FAILED" -gt 0 ]; then echo "告警: 存在失败执行" fi EOF $ chmod +x /tmp/monitor_azkaban.sh $ /tmp/monitor_azkaban.sh === Azkaban监控 2026年 04月 08日 17:00:00 CST === 运行中执行: 3 失败执行: 0 Executor状态: {"executors":[...]}
Part04-生产案例与实战讲解
4.1 ETL调度案例
# 1. 完整ETL项目结构
$ mkdir -p /tmp/daily_etl
$ cd /tmp/daily_etl
# 2. 创建所有job文件
# start.job
$ cat > start.job << 'EOF'
type=noop
EOF
# sqoop_import.job
$ cat > sqoop_import.job << 'EOF'
type=command
dependencies=start
command=sqoop import \
--connect jdbc:mysql://fgedu-node1:3306/fgedudb \
--username fgedu \
--password fgedu123 \
--query "SELECT * FROM user_info WHERE \$CONDITIONS" \
--target-dir /user/fgedu/staging/user_info/${date} \
--fields-terminated-by ',' \
--m 4 \
--split-by user_id
EOF
# hive_etl.job
$ cat > hive_etl.job << 'EOF'
type=hive
dependencies=sqoop_import
hive.script=etl.hql
hive.param.date=${date}
hive.param.input=/user/fgedu/staging/user_info/${date}
hive.param.output=/user/fgedu/output/user_info/${date}
EOF
# spark_analysis.job
$ cat > spark_analysis.job << 'EOF'
type=spark
dependencies=hive_etl
spark.class=com.fgedu.analysis.UserAnalysis
spark.master=yarn
spark.deploy-mode=cluster
spark.jars=/user/fgedu/lib/analysis.jar
spark.args=/user/fgedu/output/user_info/${date} /user/fgedu/analysis/${date}
EOF
# report.job
$ cat > report.job << 'EOF'
type=command
dependencies=spark_analysis
command=/bin/bash generate_report.sh ${date}
EOF
# email.job
$ cat > email.job << 'EOF'
type=command
dependencies=report
command=echo "Send email notification"
failure.emails=admin@fgedu.com
success.emails=admin@fgedu.com
EOF
# end.job
$ cat > end.job << 'EOF'
type=noop
dependencies=email
EOF
# 3. 打包上传
$ zip -r daily_etl.zip *.job *.hql *.sh
# 4. 创建调度
# Cron: 0 0 1 * * ? *
# 每天凌晨1点执行
4.2 数据管道案例
# 1. 创建并行管道项目
$ mkdir -p /tmp/data_pipeline
$ cd /tmp/data_pipeline
# 2. 创建并行job
# start.job
$ cat > start.job << 'EOF'
type=noop
EOF
# mysql_sync.job
$ cat > mysql_sync.job << 'EOF'
type=command
dependencies=start
command=sqoop import \
--connect jdbc:mysql://fgedu-node1:3306/fgedudb \
--username fgedu \
--password fgedu123 \
--table orders \
--target-dir /user/fgedu/sync/mysql/orders/${date}
EOF
# oracle_sync.job
$ cat > oracle_sync.job << 'EOF'
type=command
dependencies=start
command=sqoop import \
--connect jdbc:oracle:thin:@fgedu-oracle:1521:fgedudb \
--username fgedu \
--password fgedu123 \
--table products \
--target-dir /user/fgedu/sync/oracle/products/${date}
EOF
# postgres_sync.job
$ cat > postgres_sync.job << 'EOF'
type=command
dependencies=start
command=sqoop import \
--connect jdbc:postgresql://fgedu-pg:5432/fgedudb \
--username fgedu \
--password fgedu123 \
--table customers \
--target-dir /user/fgedu/sync/postgres/customers/${date}
EOF
# merge.job
$ cat > merge.job << 'EOF'
type=hive
dependencies=mysql_sync,oracle_sync,postgres_sync
hive.script=merge_data.hql
EOF
# validate.job
$ cat > validate.job << 'EOF'
type=command
dependencies=merge
command=/bin/bash validate.sh
failure.emails=admin@fgedu.com
EOF
# end.job
$ cat > end.job << 'EOF'
type=noop
dependencies=validate
EOF
# 3. 工作流图
start
/ | \
mysql oracle postgres
\ | /
merge
|
validate
|
end
# 4. 打包上传
$ zip -r data_pipeline.zip *.job *.hql *.sh
4.3 常见问题处理
4.3.1 作业失败
# 排查步骤
# 1. 查看作业日志
Web UI -> Execution -> Job -> View Logs
# 2. 查看错误信息
Web UI -> Execution -> Job -> Error
# 3. 查看Executor日志
$ tail -f /bigdata/app/azkaban/exec-server/logs/azkaban-execserver.log
# 解决方案
# 1. 重试作业
Web UI -> Execution -> Retry Failed
# 2. 手动执行
Web UI -> Execution -> Execute Flow
4.3.2 Executor不可用
# 排查步骤
# 1. 检查Executor状态
$ curl http://localhost:12321/status
# 2. 检查数据库
$ mysql -u azkaban -p azkaban_db -e “SELECT * FROM executors WHERE active=0;”
# 解决方案
# 1. 重启Executor
$ cd /bigdata/app/azkaban/exec-server
$ bin/shutdown-exec.sh
$ bin/start-exec.sh
# 2. 激活Executor
$ curl http://localhost:12321/executor?action=activate
# 3. 更新数据库
$ mysql -u azkaban -p azkaban_db -e “UPDATE executors SET active=1 WHERE host=’fgedu-node1′;”
Part05-风哥经验总结与分享
5.1 Azkaban最佳实践
Azkaban最佳实践建议:
1. 合理设计工作流
2. 配置重试和告警
3. 使用参数化配置
4. 监控作业状态
5. 定期清理历史数据
5.2 使用建议
使用建议:
- 工作流设计要简洁
- 合理设置重试次数
- 配置邮件告警
- 定期清理历史执行
5.3 工具推荐
Azkaban相关工具:
- Azkaban Web UI:可视化管理
- Azkaban API:自动化管理
- 监控脚本:自定义监控
- 告警系统:邮件/钉钉告警
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
