1. 首页 > Hadoop教程 > 正文

大数据教程FG087-Azkaban工作流调度实战

本文档风哥主要介绍Azkaban工作流调度实战,包括架构原理、安装配置、项目开发等内容,风哥教程参考Azkaban官方文档Getting Started、Workflow等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 Azkaban概述

Azkaban是由LinkedIn开源的批量工作流调度系统,用于管理Hadoop作业的执行。学习交流加群风哥微信: itpux-com

Azkaban核心特性:

  • Web界面:友好的Web管理界面
  • 工作流管理:支持DAG工作流
  • 定时调度:支持Cron表达式
  • 权限管理:支持用户权限管理
# Azkaban简介

Azkaban是LinkedIn开源的批量工作流调度系统,
专门用于解决Hadoop作业依赖问题。

主要功能:
1. 工作流管理
– 定义作业依赖关系
– 可视化工作流
– 工作流执行控制

2. 调度管理
– 定时调度
– 手动触发
– 依赖触发

3. 用户管理
– 用户权限
– 项目权限
– 操作审计

# Azkaban组件

1. Web Server
– Web界面
– REST API
– 用户管理
– 项目管理

2. Executor Server
– 作业执行
– 资源管理
– 日志管理

3. MySQL
– 存储元数据
– 存储作业状态
– 存储用户信息

# Azkaban支持的作业类型

作业类型 说明
command Shell命令
java Java程序
hive Hive作业
pig Pig作业
spark Spark作业
hadoopShell Hadoop Shell

# Azkaban优势

1. 简单易用的Web界面
2. 支持复杂工作流
3. 支持作业重试
4. 支持邮件告警
5. 支持权限管理

# Azkaban vs Oozie

特性 Azkaban Oozie
界面 Web界面 Web界面
配置 简单 复杂
工作流 Flow Workflow XML
调度 Cron Coordinator
学习曲线 平缓 陡峭

# Azkaban应用场景

1. ETL数据管道
数据采集 -> 数据清洗 -> 数据加载

2. 定时报表
数据统计 -> 报表生成 -> 邮件发送

3. 数据同步
数据导出 -> 数据传输 -> 数据导入

4. 批量处理
批量作业 -> 并行执行 -> 结果汇总

1.2 架构设计

Azkaban架构设计详解:

# Azkaban架构

┌─────────────────────────────────────────┐
│ Web Server │
│ ┌─────────────────────────────────┐ │
│ │ Web UI │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ REST API │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ Project Manager │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘

┌───────────────────┴─────────────────────┐
│ MySQL │
│ ┌─────────────────────────────────┐ │
│ │ 元数据存储 │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘

┌───────────────────┴─────────────────────┐
│ Executor Server │
│ ┌─────────────────────────────────┐ │
│ │ Job Runner │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ Thread Pool │ │
│ └─────────────────────────────────┘ │
└───────────────────┬─────────────────────┘

┌───────────────────┴─────────────────────┐
│ Hadoop Cluster │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ HDFS │ │ YARN │ │ Hive │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘

# Web Server

1. Web UI
– 项目管理
– 工作流管理
– 执行历史
– 用户管理

2. REST API
– 项目API
– 执行API
– 用户API

3. Project Manager
– 项目上传
– 项目解析
– 项目分发

# Executor Server

1. Job Runner
– 作业执行
– 状态监控
– 日志收集

2. Thread Pool
– 并发执行
– 资源管理
– 队列管理

# MySQL

1. 元数据表
– projects: 项目信息
– flows: 工作流信息
– jobs: 作业信息

2. 执行表
– executions: 执行记录
– execution_jobs: 作业执行
– execution_flows: 流程执行

3. 用户表
– users: 用户信息
– permissions: 权限信息

# 工作原理

1. 项目上传
用户 -> Web UI -> 上传ZIP -> 解析 -> MySQL

2. 作业执行
Web Server -> Executor -> 执行作业 -> 回调通知

3. 状态更新
Executor -> Web Server -> 更新状态 -> MySQL

# 高可用设计

1. Web Server高可用
– 多实例部署
– 负载均衡
– Session共享

2. Executor高可用
– 多Executor部署
– 故障转移
– 负载均衡

# 作业生命周期

1. READY:准备状态
2. QUEUED:排队状态
3. RUNNING:运行状态
4. SUCCEEDED:成功状态
5. FAILED:失败状态
6. KILLED:终止状态

1.3 工作流模型详解

工作流模型详解:

# 工作流定义

Azkaban使用.properties文件定义作业,
使用.job文件定义工作流。

# Job文件示例

# fgedu_etl.job
type=command
command=echo “Hello Azkaban”

# 带依赖的Job

# job1.job
type=command
command=echo “Job 1”

# job2.job
type=command
dependencies=job1
command=echo “Job 2”

# job3.job
type=command
dependencies=job1
command=echo “Job 3”

# job4.job
type=command
dependencies=job2,job3
command=echo “Job 4”

# 工作流图

job1
/ \
job2 job3
\ /
job4

# Hive作业

# hive_etl.job
type=hive
hive.script=etl.hql
hive.param.input=/user/fgedu/input
hive.param.output=/user/fgedu/output

# Spark作业

# spark_etl.job
type=spark
spark.class=com.fgedu.etl.SparkETL
spark.master=yarn
spark.deploy-mode=cluster
spark.jars=/user/fgedu/lib/etl.jar

# Command作业

# shell_etl.job
type=command
command=/bin/bash /user/fgedu/scripts/etl.sh
command.1=/bin/bash /user/fgedu/scripts/validate.sh

# 工作流参数

# flow.properties
input.path=/user/fgedu/input
output.path=/user/fgedu/output
date=2026-04-08

# job文件引用参数
type=command
command=echo ${input.path}

# 条件执行

# job1.job
type=command
command=echo “Job 1”
condition=success

# job2.job
type=command
dependencies=job1
command=echo “Job 2”
condition=job1:success

# 重试配置

# retry.job
type=command
command=echo “Retry Job”
retries=3
retry.backoff=30000

# 告警配置

# alert.job
type=command
command=echo “Alert Job”
failure.emails=admin@fgedu.com
success.emails=admin@fgedu.com
notify.emails=admin@fgedu.com

# 资源配置

# resource.job
type=command
command=echo “Resource Job”
job.memory=4096
job.cpus=2

风哥提示:Azkaban使用简单的配置文件定义工作流,比Oozie的XML配置更简洁易用。支持多种作业类型,可以满足大部分调度需求。

Part02-生产环境规划与建议

2.1 环境规划建议

环境规划建议:

# 版本兼容性

Azkaban版本 Hadoop版本 Java版本
3.90.x 3.x 8+
3.80.x 2.x 8+

推荐版本:Azkaban 3.90.0 + Hadoop 3.3.6 + Java 8

# 硬件规划

组件 配置 数量
Web Server 8C/16G/100G 2(高可用)
Executor Server 16C/32G/200G 3
MySQL 8C/16G/200G 2(主从)

# 软件依赖

1. Java
– JDK 8+
– 配置JAVA_HOME

2. MySQL
– MySQL 5.7+
– 字符集UTF-8

3. Hadoop
– HDFS
– YARN

# 网络规划

端口 用途
8081 Web Server HTTP
8443 Web Server HTTPS
12321 Executor Server

# 目录规划

目录 用途
/bigdata/app/azkaban 安装目录
/bigdata/app/azkaban/web Web Server
/bigdata/app/azkaban/exec Executor Server
/bigdata/app/azkaban/logs 日志目录

# 数据库规划

数据库 大小
azkaban_db 50GB

表空间规划:
– 项目表
– 执行表
– 用户表
– 日志表

# 高可用规划

1. Web Server
– 部署2个实例
– Nginx负载均衡
– Session共享

2. Executor Server
– 部署3个实例
– 自动故障转移
– 负载均衡

2.2 项目规划建议

项目规划建议:

# 项目分类

1. ETL项目
用途:数据抽取、转换、加载
频率:每日/每小时

2. 报表项目
用途:生成报表
频率:每日/每周

3. 同步项目
用途:数据同步
频率:每小时

4. 分析项目
用途:数据分析
频率:每日

# 项目命名规范

格式:[业务]_[类型]_[环境]

示例:
user_etl_prod
order_report_prod
data_sync_prod

# 项目目录结构

fgedu_etl/
├── etl.job
├── hive_etl.job
├── spark_etl.job
├── validate.job
├── flow.properties
├── scripts/
│ ├── etl.sh
│ └── validate.sh
└── hql/
└── etl.hql

# 项目权限规划

角色 权限
管理员 所有项目
开发者 指定项目
运维 执行权限
只读 查看权限

# 项目参数规划

参数类型 示例
系统参数 ${azkaban.flow.projectid}
时间参数 ${azkaban.flow.starttime}
业务参数 ${input.path}, ${output.path}
自定义参数 ${date}, ${hour}

2.3 调度规划建议

调度规划建议:

# 调度策略

1. 定时调度
– Cron表达式
– 固定周期
– 时间窗口

2. 手动触发
– 立即执行
– 参数传入
– 指定时间

3. 依赖触发
– 前置作业完成
– 数据可用
– 条件满足

# 调度频率规划

频率 Cron表达式 适用场景
分钟 */5 * * * * 实时监控
小时 0 * * * * 实时统计
天 0 1 * * * 日报表
周 0 2 * * 1 周报表
月 0 3 1 * * 月报表

# 调度时间窗口

业务 开始时间 结束时间
ETL 01:00 05:00
报表 06:00 08:00
同步 00:00 23:59

# 重试策略

参数 默认值 说明
retries 0 重试次数
retry.backoff 0 重试间隔(ms)

# 告警配置

告警类型 触发条件
作业失败 作业执行失败
作业超时 作业执行超时
流程失败 流程执行失败

# 监控指标

指标 告警阈值
作业成功率 < 95% 作业延迟 > 30分钟
队列积压 > 100个

生产环境建议:生产环境建议配置Azkaban高可用,合理规划项目和调度策略。设置合适的重试和告警参数,保障作业稳定执行。学习交流加群风哥QQ113257174

Part03-生产环境项目实施方案

3.1 安装配置实战

3.1.1 安装Azkaban

# 1. 下载Azkaban
$ cd /bigdata/app
$ wget https://github.com/azkaban/azkaban/archive/refs/tags/3.90.0.tar.gz
$ tar -xzf 3.90.0.tar.gz
$ ln -s azkaban-3.90.0 azkaban

# 2. 编译Azkaban
$ cd /bigdata/app/azkaban
$ ./gradlew build installDist -x test

BUILD SUCCESSFUL in 10m 30s

# 3. 创建数据库
$ mysql -u root -p

mysql> CREATE DATABASE azkaban_db DEFAULT CHARACTER SET utf8mb4;
mysql> CREATE USER ‘azkaban’@’%’ IDENTIFIED BY ‘azkaban123’;
mysql> GRANT ALL PRIVILEGES ON azkaban_db.* TO ‘azkaban’@’%’;
mysql> FLUSH PRIVILEGES;

# 4. 导入数据库脚本
$ mysql -u azkaban -p azkaban_db < /bigdata/app/azkaban/azkaban-db/build/distributions/azkaban-db-3.90.0/create-all-sql-3.90.0.sql # 5. 配置Web Server $ cd /bigdata/app/azkaban/azkaban-web-server/build/distributions $ tar -xzf azkaban-web-server-3.90.0.tar.gz $ mv azkaban-web-server-3.90.0 /bigdata/app/azkaban/web-server $ cat > /bigdata/app/azkaban/web-server/conf/azkaban.properties << 'EOF' # Azkaban Web Server配置 default.timezone.id=Asia/Shanghai # MySQL配置 database.type=mysql mysql.port=3306 mysql.host=fgedu-node1 mysql.database=azkaban_db mysql.user=azkaban mysql.password=azkaban123 mysql.numconnections=100 # Web Server配置 web.resource.dir=/bigdata/app/azkaban/web-server/web web.template.dir=/bigdata/app/azkaban/web-server/web/template web.servlet.cache=false web.server.port=8081 web.server.ssl.port=8443 web.server.use.ssl=false # Executor配置 azkaban.executorselector.filters=StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1 azkaban.executorselector.comparator.Memory=1 azkaban.executorselector.comparator.LastDispatched=1 azkaban.executorselector.comparator.CpuUsage=1 EOF # 6. 配置Executor Server $ cd /bigdata/app/azkaban/azkaban-exec-server/build/distributions $ tar -xzf azkaban-exec-server-3.90.0.tar.gz $ mv azkaban-exec-server-3.90.0 /bigdata/app/azkaban/exec-server $ cat > /bigdata/app/azkaban/exec-server/conf/azkaban.properties << 'EOF' # Azkaban Executor Server配置 default.timezone.id=Asia/Shanghai # MySQL配置 database.type=mysql mysql.port=3306 mysql.host=fgedu-node1 mysql.database=azkaban_db mysql.user=azkaban mysql.password=azkaban123 mysql.numconnections=100 # Executor配置 executor.maxThreads=50 executor.port=12321 executor.flow.threads=30 EOF # 7. 启动Executor Server $ cd /bigdata/app/azkaban/exec-server $ bin/start-exec.sh # 8. 激活Executor $ curl http://localhost:12321/executor?action=activate {"status":"success"} # 9. 启动Web Server $ cd /bigdata/app/azkaban/web-server $ bin/start-web.sh # 10. 访问Web UI http://fgedu-node1:8081 默认用户名: azkaban 默认密码: azkaban

3.1.2 配置高可用

# 1. 配置多Executor
# 在fgedu-node2
$ cd /bigdata/app/azkaban/exec-server
$ bin/start-exec.sh
$ curl http://localhost:12321/executor?action=activate

# 在fgedu-node3
$ cd /bigdata/app/azkaban/exec-server
$ bin/start-exec.sh
$ curl http://localhost:12321/executor?action=activate

# 2. 查看Executor列表
$ mysql -u azkaban -p azkaban_db -e “SELECT host, port, active FROM executors;”

+————-+——-+——–+
| host | port | active |
+————-+——-+——–+
| fgedu-node1 | 12321 | 1 |
| fgedu-node2 | 12321 | 1 |
| fgedu-node3 | 12321 | 1 |
+————-+——-+——–+

# 3. 配置Web Server高可用
# 在fgedu-node2部署Web Server
$ cd /bigdata/app/azkaban/web-server
$ bin/start-web.sh

# 4. 配置Nginx负载均衡
$ cat > /etc/nginx/conf.d/azkaban.conf << 'EOF' upstream azkaban_cluster { server fgedu-node1:8081; server fgedu-node2:8081; } server { listen 8081; server_name azkaban.fgedu.net.cn; location / { proxy_pass http://azkaban_cluster; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; } } EOF $ systemctl restart nginx # 5. 访问高可用地址 http://azkaban.fgedu.net.cn:8081

3.2 项目开发实战

3.2.1 创建ETL项目

# 1. 创建项目目录
$ mkdir -p /tmp/fgedu_etl_project
$ cd /tmp/fgedu_etl_project

# 2. 创建job文件
# 数据采集job
$ cat > sqoop_import.job << 'EOF' type=command command=sqoop import \ --connect jdbc:mysql://fgedu-node1:3306/fgedudb \ --username fgedu \ --password fgedu123 \ --table user_info \ --target-dir /user/fgedu/staging/user_info \ --fields-terminated-by ',' \ --m 4 EOF # 数据清洗job $ cat > hive_clean.job << 'EOF' type=hive dependencies=sqoop_import hive.script=clean_data.hql hive.param.input=/user/fgedu/staging/user_info hive.param.output=/user/fgedu/cleaned/user_info EOF # Hive脚本 $ cat > clean_data.hql << 'EOF' -- clean_data.hql -- from:www.itpux.com.qq113257174.wx:itpux-com -- web: http://www.fgedu.net.cn CREATE EXTERNAL TABLE IF NOT EXISTS temp_user ( user_id STRING, name STRING, age INT, city STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '${hivevar:input}'; INSERT OVERWRITE DIRECTORY '${hivevar:output}' SELECT user_id, UPPER(name) as name, age, city FROM temp_user WHERE age > 0 AND age < 150; EOF # 数据转换job $ cat > spark_transform.job << 'EOF' type=spark dependencies=hive_clean spark.class=com.fgedu.etl.DataTransform spark.master=yarn spark.deploy-mode=cluster spark.jars=/user/fgedu/lib/etl.jar spark.args=/user/fgedu/cleaned/user_info /user/fgedu/output/user_info EOF # 数据验证job $ cat > validate.job << 'EOF' type=command dependencies=spark_transform command=/bin/bash validate.sh EOF # 验证脚本 $ cat > validate.sh << 'EOF' #!/bin/bash # validate.sh # from:www.itpux.com.qq113257174.wx:itpux-com # web: http://www.fgedu.net.cn echo "验证数据..." hdfs dfs -test -e /user/fgedu/output/user_info/_SUCCESS if [ $? -eq 0 ]; then echo "验证成功" exit 0 else echo "验证失败" exit 1 fi EOF $ chmod +x validate.sh # 3. 创建flow.properties $ cat > flow.properties << 'EOF' input.path=/user/fgedu/staging/user_info output.path=/user/fgedu/output/user_info EOF # 4. 打包项目 $ zip -r fgedu_etl.zip *.job *.hql *.sh flow.properties # 5. 上传项目 # 访问Azkaban Web UI # 创建项目: fgedu_etl # 上传ZIP文件: fgedu_etl.zip # 6. 执行项目 # 点击Execute Flow # 选择要执行的flow # 点击Execute

3.2.2 创建定时调度

# 1. 在Web UI配置调度
# 项目页面 -> Schedule -> Set Schedule

# 2. 使用API创建调度
$ cat > /tmp/schedule.json << 'EOF' { "projectId": 1, "flow": "fgedu_etl", "scheduleTime": "01:00", "scheduleDate": "01/04/2026", "isRecurring": "on", "period": "1d", "concurrentOption": "skip" } EOF $ curl -k -X POST \ -d "ajax=scheduleCronFlow" \ -d "projectId=1" \ -d "flow=fgedu_etl" \ -d "cronExpression=0 0 1 * * ? *" \ -b "azkaban.browser.session.id=..." \ http://localhost:8081/schedule {"message":"Schedule created successfully","status":"success"} # 3. 查看调度列表 $ curl -k -X GET \ -d "ajax=fetchSchedules" \ -b "azkaban.browser.session.id=..." \ http://localhost:8081/schedule # 4. 手动触发执行 $ curl -k -X POST \ -d "ajax=executeFlow" \ -d "project=fgedu_etl" \ -d "flow=fgedu_etl" \ -b "azkaban.browser.session.id=..." \ http://localhost:8081/executor {"message":"Execution submitted successfully with exec id 1","execid":"1"}

3.3 调度配置实战

# 1. 配置告警邮件
$ cat > alert.job << 'EOF' type=command command=echo "Alert Job" failure.emails=admin@fgedu.com success.emails=admin@fgedu.com notify.emails=admin@fgedu.com EOF # 2. 配置重试 $ cat > retry.job << 'EOF' type=command command=echo "Retry Job" retries=3 retry.backoff=30000 EOF # 3. 配置超时 $ cat > timeout.job << 'EOF' type=command command=echo "Timeout Job" execution.timeout=3600 EOF # 4. 配置资源 $ cat > resource.job << 'EOF' type=command command=echo "Resource Job" job.memory=4096 job.cpus=2 EOF # 5. 配置并发控制 $ cat > concurrent.job << 'EOF' type=command command=echo "Concurrent Job" concurrent.runs=1 EOF # 6. 监控脚本 $ cat > /tmp/monitor_azkaban.sh << 'EOF' #!/bin/bash # monitor_azkaban.sh # from:www.itpux.com.qq113257174.wx:itpux-com # web: http://www.fgedu.net.cn AZKABAN_URL="http://localhost:8081" echo "=== Azkaban监控 $(date) ===" # 获取运行中的执行 RUNNING=$(curl -s -k -X GET \ -d "ajax=getRunning" \ -b "azkaban.browser.session.id=..." \ $AZKABAN_URL/executor | grep -o '"execid"' | wc -l) echo "运行中执行: $RUNNING" # 获取失败的执行 FAILED=$(curl -s -k -X GET \ -d "ajax=fetchExecutions&project=fgedu_etl&flow=fgedu_etl&status=FAILED" \ -b "azkaban.browser.session.id=..." \ $AZKABAN_URL/executor | grep -o '"execid"' | wc -l) echo "失败执行: $FAILED" # 检查Executor状态 EXECUTORS=$(curl -s -k -X GET \ -d "ajax=getExecutors" \ -b "azkaban.browser.session.id=..." \ $AZKABAN_URL/executor) echo "Executor状态: $EXECUTORS" # 告警 if [ "$FAILED" -gt 0 ]; then echo "告警: 存在失败执行" fi EOF $ chmod +x /tmp/monitor_azkaban.sh $ /tmp/monitor_azkaban.sh === Azkaban监控 2026年 04月 08日 17:00:00 CST === 运行中执行: 3 失败执行: 0 Executor状态: {"executors":[...]}
风哥提示:Azkaban项目开发需要遵循规范,合理组织目录结构。使用Web UI可以方便地管理项目和调度,也可以使用API进行自动化管理。更多学习教程公众号风哥教程itpux_com

Part04-生产案例与实战讲解

4.1 ETL调度案例

# 场景:每日ETL数据管道

# 1. 完整ETL项目结构
$ mkdir -p /tmp/daily_etl
$ cd /tmp/daily_etl

# 2. 创建所有job文件
# start.job
$ cat > start.job << 'EOF' type=noop EOF # sqoop_import.job $ cat > sqoop_import.job << 'EOF' type=command dependencies=start command=sqoop import \ --connect jdbc:mysql://fgedu-node1:3306/fgedudb \ --username fgedu \ --password fgedu123 \ --query "SELECT * FROM user_info WHERE \$CONDITIONS" \ --target-dir /user/fgedu/staging/user_info/${date} \ --fields-terminated-by ',' \ --m 4 \ --split-by user_id EOF # hive_etl.job $ cat > hive_etl.job << 'EOF' type=hive dependencies=sqoop_import hive.script=etl.hql hive.param.date=${date} hive.param.input=/user/fgedu/staging/user_info/${date} hive.param.output=/user/fgedu/output/user_info/${date} EOF # spark_analysis.job $ cat > spark_analysis.job << 'EOF' type=spark dependencies=hive_etl spark.class=com.fgedu.analysis.UserAnalysis spark.master=yarn spark.deploy-mode=cluster spark.jars=/user/fgedu/lib/analysis.jar spark.args=/user/fgedu/output/user_info/${date} /user/fgedu/analysis/${date} EOF # report.job $ cat > report.job << 'EOF' type=command dependencies=spark_analysis command=/bin/bash generate_report.sh ${date} EOF # email.job $ cat > email.job << 'EOF' type=command dependencies=report command=echo "Send email notification" failure.emails=admin@fgedu.com success.emails=admin@fgedu.com EOF # end.job $ cat > end.job << 'EOF' type=noop dependencies=email EOF # 3. 打包上传 $ zip -r daily_etl.zip *.job *.hql *.sh # 4. 创建调度 # Cron: 0 0 1 * * ? * # 每天凌晨1点执行

4.2 数据管道案例

# 场景:多数据源同步管道

# 1. 创建并行管道项目
$ mkdir -p /tmp/data_pipeline
$ cd /tmp/data_pipeline

# 2. 创建并行job
# start.job
$ cat > start.job << 'EOF' type=noop EOF # mysql_sync.job $ cat > mysql_sync.job << 'EOF' type=command dependencies=start command=sqoop import \ --connect jdbc:mysql://fgedu-node1:3306/fgedudb \ --username fgedu \ --password fgedu123 \ --table orders \ --target-dir /user/fgedu/sync/mysql/orders/${date} EOF # oracle_sync.job $ cat > oracle_sync.job << 'EOF' type=command dependencies=start command=sqoop import \ --connect jdbc:oracle:thin:@fgedu-oracle:1521:fgedudb \ --username fgedu \ --password fgedu123 \ --table products \ --target-dir /user/fgedu/sync/oracle/products/${date} EOF # postgres_sync.job $ cat > postgres_sync.job << 'EOF' type=command dependencies=start command=sqoop import \ --connect jdbc:postgresql://fgedu-pg:5432/fgedudb \ --username fgedu \ --password fgedu123 \ --table customers \ --target-dir /user/fgedu/sync/postgres/customers/${date} EOF # merge.job $ cat > merge.job << 'EOF' type=hive dependencies=mysql_sync,oracle_sync,postgres_sync hive.script=merge_data.hql EOF # validate.job $ cat > validate.job << 'EOF' type=command dependencies=merge command=/bin/bash validate.sh failure.emails=admin@fgedu.com EOF # end.job $ cat > end.job << 'EOF' type=noop dependencies=validate EOF # 3. 工作流图 start / | \ mysql oracle postgres \ | / merge | validate | end # 4. 打包上传 $ zip -r data_pipeline.zip *.job *.hql *.sh

4.3 常见问题处理

4.3.1 作业失败

# 问题现象:作业执行失败

# 排查步骤
# 1. 查看作业日志
Web UI -> Execution -> Job -> View Logs

# 2. 查看错误信息
Web UI -> Execution -> Job -> Error

# 3. 查看Executor日志
$ tail -f /bigdata/app/azkaban/exec-server/logs/azkaban-execserver.log

# 解决方案
# 1. 重试作业
Web UI -> Execution -> Retry Failed

# 2. 手动执行
Web UI -> Execution -> Execute Flow

4.3.2 Executor不可用

# 问题现象:Executor不可用

# 排查步骤
# 1. 检查Executor状态
$ curl http://localhost:12321/status

# 2. 检查数据库
$ mysql -u azkaban -p azkaban_db -e “SELECT * FROM executors WHERE active=0;”

# 解决方案
# 1. 重启Executor
$ cd /bigdata/app/azkaban/exec-server
$ bin/shutdown-exec.sh
$ bin/start-exec.sh

# 2. 激活Executor
$ curl http://localhost:12321/executor?action=activate

# 3. 更新数据库
$ mysql -u azkaban -p azkaban_db -e “UPDATE executors SET active=1 WHERE host=’fgedu-node1′;”

Part05-风哥经验总结与分享

5.1 Azkaban最佳实践

Azkaban最佳实践建议:

# Azkaban最佳实践
1. 合理设计工作流
2. 配置重试和告警
3. 使用参数化配置
4. 监控作业状态
5. 定期清理历史数据

5.2 使用建议

使用建议:

Azkaban使用建议:

  • 工作流设计要简洁
  • 合理设置重试次数
  • 配置邮件告警
  • 定期清理历史执行

5.3 工具推荐

Azkaban相关工具:

  • Azkaban Web UI:可视化管理
  • Azkaban API:自动化管理
  • 监控脚本:自定义监控
  • 告警系统:邮件/钉钉告警
风哥提示:Azkaban是简单易用的工作流调度系统,适合快速构建数据管道。相比Oozie,Azkaban配置更简单,学习曲线更平缓。from bigdata视频:www.itpux.com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息