本文详细介绍大数据流水线最佳实践,包括工作流调度、批量处理、实时处理、CI/CD等内容,风哥教程参考Oozie、Airflow、DolphinScheduler官方文档,适合大数据开发工程师使用。学习交流加群风哥微信: itpux-com
Part01-基础概念与理论知识
1.1 大数据流水线概述
大数据流水线是指将数据从采集、清洗、转换、存储、分析到可视化的整个流程自动化处理的过程。更多视频教程www.fgedu.net.cn
- 自动化:减少人工操作,提高效率
- 可靠性:保证流程稳定运行
- 可追溯:记录每一步执行情况
- 可监控:实时监控流程状态
- 可告警:异常时及时告警
- 可扩展:支持流程的灵活扩展
1.2 流水线架构设计
典型的大数据流水线架构:
数据采集层:
– 日志采集:Flume、Filebeat
– 数据库采集:Sqoop、DataX
– 实时采集:Kafka
– 文件采集:FTP、S3
数据存储层:
– 原始数据:HDFS
– 实时数据:Kafka
– 数据仓库:Hive
– 实时分析:Doris、ClickHouse
数据处理层:
– 批量处理:Spark、MapReduce
– 实时处理:Flink、Spark Streaming
– SQL处理:Hive、Presto、Impala
工作流调度层:
– Oozie
– Airflow
– DolphinScheduler
– Azkaban
数据服务层:
– API服务
– BI报表
– 数据可视化
1.3 常用工具介绍
常用调度工具对比:
- Oozie:Apache开源,Hadoop原生,集成度高
- Airflow:Python编写,灵活易用,生态丰富
- DolphinScheduler:国产开源,可视化,易用性好
- Azkaban:LinkedIn开源,简单易用
Part02-生产环境规划与建议
2.1 流水线架构规划
流水线架构规划要点:
调度服务器:
– 数量:2台(高可用)
– 配置:8核16GB
– 组件:Airflow/DolphinScheduler
元数据库:
– MySQL/PostgreSQL
– 高可用配置
对象存储:
– HDFS/S3
– 存储日志和临时文件
# 流水线分类
批量流水线:
– 定时调度
– 小时/天/周
– 处理历史数据
实时流水线:
– 持续运行
– 低延迟
– 处理实时数据
混合流水线:
– 结合批量和实时
– Lambda/Kappa架构
# 流水线设计原则
1. 单一职责:每个Job只做一件事
2. 可重跑:支持失败重跑
3. 可监控:有完善的监控
4. 可告警:异常时及时告警
5. 可回滚:支持回滚操作
2.2 开发规范与标准
开发规范与标准:
Job命名:
– 格式:业务_场景_时间粒度
– 示例:fgedu_user_etl_daily
– 示例:fgedu_order_report_hourly
脚本命名:
– 格式:业务_场景_功能.后缀
– 示例:fgedu_user_etl.py
– 示例:fgedu_order_report.sql
目录结构:
/bigdata/fgdata/pipelines/
├── dags/ # DAG定义
├── scripts/ # 脚本
│ ├── python/
│ ├── shell/
│ └── sql/
├── conf/ # 配置文件
├── logs/ # 日志
└── data/ # 数据
# 代码规范
Python:
– 遵循PEP8
– 完善的注释
– 异常处理
– 日志记录
SQL:
– 统一格式化
– 注释说明
– 性能优化
2.3 监控与告警规划
监控与告警规划:
- Job状态:成功、失败、运行中
- 执行时间:每次执行时长
- 数据量:处理的数据量
- 成功率:Job成功率
- 延迟:数据延迟
更多学习教程公众号风哥教程itpux_com
Part03-生产环境项目实施方案
3.1 Oozie工作流调度
3.1.1 Oozie安装配置
cd /bigdata/app
wget https://archive.apache.org/dist/oozie/5.2.1/oozie-5.2.1.tar.gz
tar -zxvf oozie-5.2.1.tar.gz
ln -s oozie-5.2.1 oozie
# 2. 准备WAR包
cd /bigdata/app/oozie
bin/oozie-setup.sh prepare-war
# 3. 创建数据库
mysql -u root -p
CREATE DATABASE oozie DEFAULT CHARACTER SET utf8;
CREATE USER ‘oozie’@’%’ IDENTIFIED BY ‘fgedu123’;
GRANT ALL PRIVILEGES ON oozie.* TO ‘oozie’@’%’;
FLUSH PRIVILEGES;
# 4. 配置oozie-site.xml
cat > /bigdata/app/oozie/conf/oozie-site.xml << ‘EOF’
<?xml version=”1.0″?>
<configuration>
<property>
<name>oozie.service.JPAService.jdbc.driver</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.url</name>
<value>jdbc:mysql://fgedu-mysql:3306/oozie</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.username</name>
<value>oozie</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.password</name>
<value>fgedu123</value>
</property>
<property>
<name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
<value>*=/bigdata/app/hadoop/etc/hadoop</value>
</property>
</configuration>
EOF
# 5. 上传ShareLib
bin/oozie-setup.sh sharelib create -fs hdfs://fgedu-nn:8020
# 6. 初始化数据库
bin/ooziedb.sh create -sqlfile oozie.sql -run
# 7. 启动Oozie
bin/oozied.sh start
# 8. 验证Oozie
bin/oozie admin -oozie http://localhost:11000/oozie -status
# 9. 编写Workflow
mkdir -p /bigdata/fgdata/pipelines/oozie/fgedu_etl
cat > /bigdata/fgdata/pipelines/oozie/fgedu_etl/workflow.xml << ‘EOF’
<workflow-app xmlns=”uri:oozie:workflow:0.5″ name=”fgedu-etl-wf”>
<start to=”fgedu-hive-node”/>
<action name=”fgedu-hive-node”>
<hive xmlns=”uri:oozie:hive-action:0.2″>
<job-tracker>fgedu-rm:8032</job-tracker>
<name-node>hdfs://fgedu-nn:8020</name-node>
<script>fgedu_etl.sql</script>
</hive>
<ok to=”end”/>
<error to=”fail”/>
</action>
<kill name=”fail”>
<message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name=”end”/>
</workflow-app>
EOF
# 10. 上传到HDFS
hdfs dfs -put /bigdata/fgdata/pipelines/oozie/fgedu_etl /bigdata/fgdata/oozie/
# 11. 编写job.properties
cat > /bigdata/fgdata/pipelines/oozie/fgedu_etl/job.properties << ‘EOF’
nameNode=hdfs://fgedu-nn:8020
jobTracker=fgedu-rm:8032
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/bigdata/fgdata/oozie/fgedu_etl
EOF
# 12. 提交Job
bin/oozie job -oozie http://localhost:11000/oozie -config job.properties -run
# 13. 查看Job状态
bin/oozie job -oozie http://localhost:11000/oozie -info <job-id>
3.2 Airflow工作流调度
3.2.1 Airflow安装配置
yum install -y python3 python3-pip
pip3 install apache-airflow==2.6.3
pip3 install apache-airflow-providers-apache-hive
pip3 install apache-airflow-providers-apache-spark
# 2. 初始化Airflow
export AIRFLOW_HOME=/bigdata/app/airflow
airflow db init
# 3. 创建用户
airflow users create \
–username admin \
–firstname Admin \
–lastname User \
–role Admin \
–email admin@fgedu.net.cn
# 4. 配置airflow.cfg
cat > /bigdata/app/airflow/airflow.cfg << ‘EOF’
[core]
dags_folder = /bigdata/fgdata/pipelines/airflow/dags
base_log_folder = /bigdata/fgdata/logs/airflow
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:fgedu123@fgedu-pg:5432/airflow
[celery]
broker_url = redis://:fgedu123@fgedu-redis:6379/0
result_backend = db+postgresql://airflow:fgedu123@fgedu-pg:5432/airflow
[scheduler]
dag_dir_list_interval = 300
[webserver]
web_server_host = 0.0.0.0
web_server_port = 8080
EOF
# 5. 创建目录
mkdir -p /bigdata/fgdata/pipelines/airflow/dags
mkdir -p /bigdata/fgdata/logs/airflow
# 6. 初始化数据库
airflow db init
# 7. 启动Airflow
airflow webserver –daemon
airflow scheduler –daemon
airflow celery worker –daemon
# 8. 访问Airflow
# http://fgedu-airflow:8080
# 9. 编写DAG
cat > /bigdata/fgdata/pipelines/airflow/dags/fgedu_etl_dag.py << ‘EOF’
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
default_args = {
‘owner’: ‘fgedu’,
‘depends_on_past’: False,
‘start_date’: datetime(2024, 4, 1),
’email’: [‘admin@fgedu.net.cn’],
’email_on_failure’: True,
’email_on_retry’: False,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
}
dag = DAG(
‘fgedu_etl_dag’,
default_args=default_args,
description=’Fgedu ETL DAG’,
schedule_interval=timedelta(days=1),
catchup=False,
)
def extract_data():
print(“Extracting data…”)
def transform_data():
print(“Transforming data…”)
def load_data():
print(“Loading data…”)
extract_task = PythonOperator(
task_id=’extract_data’,
python_callable=extract_data,
dag=dag,
)
transform_task = PythonOperator(
task_id=’transform_data’,
python_callable=transform_data,
dag=dag,
)
load_task = PythonOperator(
task_id=’load_data’,
python_callable=load_data,
dag=dag,
)
hive_task = HiveOperator(
task_id=’create_hive_table’,
hql=’CREATE TABLE IF NOT EXISTS fgedu_db.fgedu_user (id INT, name STRING)’,
hive_cli_conn_id=’hive_default’,
dag=dag,
)
extract_task >> transform_task >> load_task >> hive_task
EOF
# 10. 查看DAG
# Airflow Web UI查看DAG
# 手动触发DAG运行
3.3 DolphinScheduler调度
3.3.1 DolphinScheduler安装配置
cd /bigdata/app
wget https://archive.apache.org/dist/dolphinscheduler/3.1.9/apache-dolphinscheduler-3.1.9-bin.tar.gz
tar -zxvf apache-dolphinscheduler-3.1.9-bin.tar.gz
ln -s apache-dolphinscheduler-3.1.9-bin dolphinscheduler
# 2. 初始化数据库
mysql -u root -p
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8;
CREATE USER ‘dolphinscheduler’@’%’ IDENTIFIED BY ‘fgedu123’;
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO ‘dolphinscheduler’@’%’;
FLUSH PRIVILEGES;
# 3. 配置conf/config/install_config.conf
cd /bigdata/app/dolphinscheduler
vi conf/config/install_config.conf
# 关键配置
ips=”fgedu-ds01,fgedu-ds02″
masters=”fgedu-ds01″
workers=”fgedu-ds01,fgedu-ds02″
alertServer=”fgedu-ds01″
apiServers=”fgedu-ds01″
pythonGatewayServers=”fgedu-ds01″
dolphinschedulerPath=”/bigdata/app/dolphinscheduler”
dataBasedirPath=”/bigdata/fgdata/dolphinscheduler”
hdfsStartupSate=”false”
DATABASE_TYPE=”mysql”
SPRING_DATASOURCE_URL=”jdbc:mysql://fgedu-mysql:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8″
SPRING_DATASOURCE_USERNAME=”dolphinscheduler”
SPRING_DATASOURCE_PASSWORD=”fgedu123″
# 4. 创建目录
mkdir -p /bigdata/fgdata/dolphinscheduler
# 5. 一键安装
bash ./bin/install.sh
# 6. 启动DolphinScheduler
bash ./bin/start-all.sh
# 7. 访问DolphinScheduler
# http://fgedu-ds01:12345/dolphinscheduler
# 默认用户名: admin
# 默认密码: dolphinscheduler123
# 8. 创建项目
# Web UI创建项目
# 创建工作流
# 配置任务
# 定时调度
Part04-生产案例与实战讲解
4.1 批量数据处理实战
4.1.1 批量ETL流水线
# 1. 数据采集
# Flume采集日志到HDFS
# Sqoop从MySQL导入到HDFS
# 2. 数据清洗
# Spark清洗数据
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(“FgeduETL”).enableHiveSupport().getOrCreate()
df = spark.read.json(“hdfs://fgedu-nn:8020/bigdata/fgdata/raw/user_events/*”)
df_clean = df.select(
“user_id”,
“event_type”,
“event_time”,
“page”
).filter(df.user_id.isNotNull())
df_clean.write.mode(“overwrite”).saveAsTable(“fgedu_db.fgedu_user_events_clean”)
spark.stop()
# 3. 数据聚合
# Hive SQL聚合
INSERT OVERWRITE TABLE fgedu_db.fgedu_user_events_daily
SELECT
dt,
user_id,
COUNT(*) AS event_count,
COUNT(DISTINCT event_type) AS event_type_count
FROM fgedu_db.fgedu_user_events_clean
GROUP BY dt, user_id;
# 4. 数据导出
# Sqoop导出到MySQL
sqoop export \
–connect jdbc:mysql://fgedu-mysql:3306/fgedudb \
–username fgedu \
–password fgedu123 \
–table fgedu_user_events_daily \
–export-dir /bigdata/fgdata/hive/warehouse/fgedu_db.db/fgedu_user_events_daily \
–input-fields-terminated-by ‘\001’
# 5. Airflow DAG编排
# 按顺序执行:采集 -> 清洗 -> 聚合 -> 导出
# 配置失败重试
# 配置告警
4.2 实时数据处理实战
4.2.1 实时流水线
# 1. 数据采集
# Flume/Fluentd采集日志
# 发送到Kafka
# 2. 实时处理
# Flink实时处理
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.api.common.serialization.SimpleStringSchema
object FgeduRealTimeETL {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000)
val kafkaProps = new Properties()
kafkaProps.setProperty(“bootstrap.servers”, “fgedu-kafka01:9092”)
kafkaProps.setProperty(“group.id”, “fgedu_flink_group”)
val kafkaConsumer = new FlinkKafkaConsumer[String](
“fgedu_user_events”,
new SimpleStringSchema(),
kafkaProps
)
val stream = env.addSource(kafkaConsumer)
val cleanedStream = stream
.map(parseEvent)
.filter(_ != null)
.keyBy(_.userId)
.timeWindow(Time.minutes(5))
.aggregate(new EventCountAgg())
cleanedStream.addSink(new FlinkKafkaProducer[String](
“fgedu_user_events_agg”,
new SimpleStringSchema(),
kafkaProps
))
env.execute(“Fgedu Real Time ETL”)
}
def parseEvent(json: String): Event = {
try {
// 解析JSON
} catch {
case _: Exception => null
}
}
}
# 3. 实时存储
# 写入Doris/ClickHouse实时查询
# 写入HBase用于查询
# 4. 实时监控
# Prometheus监控Flink指标
# Grafana可视化
# 配置告警
4.3 CI/CD实战
4.3.1 持续集成持续部署
# .gitlab-ci.yml
stages:
– test
– build
– deploy
variables:
MAVEN_OPTS: “-Dmaven.repo.local=.m2/repository”
test:
stage: test
image: maven:3.8.6-openjdk-8
script:
– mvn test
build:
stage: build
image: maven:3.8.6-openjdk-8
script:
– mvn clean package
artifacts:
paths:
– target/*.jar
deploy:
stage: deploy
script:
– scp target/*.jar fgedu-deploy:/bigdata/fgdata/pipelines/jars/
– ssh fgedu-deploy “cd /bigdata/fgdata/pipelines && ./deploy.sh”
only:
– master
# deploy.sh
#!/bin/bash
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
echo “开始部署…”
JAR_FILE=fgedu-etl-1.0.0.jar
BACKUP_FILE=fgedu-etl-$(date +%Y%m%d_%H%M%S).jar
# 备份
cp /bigdata/fgdata/pipelines/jars/$JAR_FILE /bigdata/fgdata/pipelines/jars/$BACKUP_FILE
# 部署
cp /bigdata/fgdata/pipelines/jars/$JAR_FILE /bigdata/app/etl/
# 更新Airflow DAG
cp /bigdata/fgdata/pipelines/dags/*.py /bigdata/app/airflow/dags/
# 重启相关服务(如果需要)
echo “部署完成!”
# 代码审查
# Merge Request流程
# 自动化测试
# 人工审核
# 合并代码
# 环境分离
– 开发环境
– 测试环境
– 预发布环境
– 生产环境
Part05-风哥经验总结与分享
5.1 流水线最佳实践
大数据流水线最佳实践:
- 任务拆分:大任务拆分成小任务,便于调试和重试
- 幂等性:任务支持多次执行,结果一致
- 错误处理:完善的异常处理和告警
- 监控日志:完善的日志记录和监控
- 环境分离:开发、测试、生产环境分离
- CI/CD:自动化测试和部署
5.2 常见问题处理
– 查看日志
– 检查依赖
– 检查数据
– 重试任务
– 回滚操作
# 常见问题2:任务慢
– 查看执行计划
– 优化SQL
– 增加资源
– 调整并行度
– 优化数据倾斜
# 常见问题3:调度器挂了
– 检查调度器日志
– 检查资源
– 重启调度器
– 检查高可用
# 常见问题4:数据不一致
– 检查重跑逻辑
– 检查幂等性
– 检查时间窗口
– 重新跑数
# 常见问题5:告警风暴
– 优化告警规则
– 合并告警
– 分级告警
– 静默非关键告警
5.3 运维检查清单
– [ ] 调度器状态
– [ ] 任务执行状态
– [ ] 任务成功率
– [ ] 任务执行时间
– [ ] 数据量统计
– [ ] 数据延迟
– [ ] 告警检查
– [ ] 日志检查
– [ ] 资源使用
– [ ] 依赖服务状态
– [ ] 告警规则检查
# 日常巡检内容
1. 检查调度器状态
2. 检查失败任务
3. 检查慢任务
4. 检查数据延迟
5. 查看错误日志
6. 检查资源使用
7. 检查告警
8. 手动触发测试
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
