1. 首页 > Hadoop教程 > 正文

大数据教程FG164-Hadoop大数据流水线最佳实践

本文详细介绍大数据流水线最佳实践,包括工作流调度、批量处理、实时处理、CI/CD等内容,风哥教程参考Oozie、Airflow、DolphinScheduler官方文档,适合大数据开发工程师使用。学习交流加群风哥微信: itpux-com

Part01-基础概念与理论知识

1.1 大数据流水线概述

大数据流水线是指将数据从采集、清洗、转换、存储、分析到可视化的整个流程自动化处理的过程。更多视频教程www.fgedu.net.cn

大数据流水线核心目标:

  • 自动化:减少人工操作,提高效率
  • 可靠性:保证流程稳定运行
  • 可追溯:记录每一步执行情况
  • 可监控:实时监控流程状态
  • 可告警:异常时及时告警
  • 可扩展:支持流程的灵活扩展

1.2 流水线架构设计

典型的大数据流水线架构:

# 大数据流水线分层
数据采集层:
– 日志采集:Flume、Filebeat
– 数据库采集:Sqoop、DataX
– 实时采集:Kafka
– 文件采集:FTP、S3

数据存储层:
– 原始数据:HDFS
– 实时数据:Kafka
– 数据仓库:Hive
– 实时分析:Doris、ClickHouse

数据处理层:
– 批量处理:Spark、MapReduce
– 实时处理:Flink、Spark Streaming
– SQL处理:Hive、Presto、Impala

工作流调度层:
– Oozie
– Airflow
– DolphinScheduler
– Azkaban

数据服务层:
– API服务
– BI报表
– 数据可视化

1.3 常用工具介绍

常用调度工具对比:

  • Oozie:Apache开源,Hadoop原生,集成度高
  • Airflow:Python编写,灵活易用,生态丰富
  • DolphinScheduler:国产开源,可视化,易用性好
  • Azkaban:LinkedIn开源,简单易用
风哥提示:选择调度工具时要考虑团队技术栈、功能需求、易用性等因素。Airflow和DolphinScheduler是目前比较流行的选择。学习交流加群风哥QQ113257174

Part02-生产环境规划与建议

2.1 流水线架构规划

流水线架构规划要点:

# 服务器规划
调度服务器:
– 数量:2台(高可用)
– 配置:8核16GB
– 组件:Airflow/DolphinScheduler

元数据库:
– MySQL/PostgreSQL
– 高可用配置

对象存储:
– HDFS/S3
– 存储日志和临时文件

# 流水线分类
批量流水线:
– 定时调度
– 小时/天/周
– 处理历史数据

实时流水线:
– 持续运行
– 低延迟
– 处理实时数据

混合流水线:
– 结合批量和实时
– Lambda/Kappa架构

# 流水线设计原则
1. 单一职责:每个Job只做一件事
2. 可重跑:支持失败重跑
3. 可监控:有完善的监控
4. 可告警:异常时及时告警
5. 可回滚:支持回滚操作

2.2 开发规范与标准

开发规范与标准:

# 命名规范
Job命名:
– 格式:业务_场景_时间粒度
– 示例:fgedu_user_etl_daily
– 示例:fgedu_order_report_hourly

脚本命名:
– 格式:业务_场景_功能.后缀
– 示例:fgedu_user_etl.py
– 示例:fgedu_order_report.sql

目录结构:
/bigdata/fgdata/pipelines/
├── dags/ # DAG定义
├── scripts/ # 脚本
│ ├── python/
│ ├── shell/
│ └── sql/
├── conf/ # 配置文件
├── logs/ # 日志
└── data/ # 数据

# 代码规范
Python:
– 遵循PEP8
– 完善的注释
– 异常处理
– 日志记录

SQL:
– 统一格式化
– 注释说明
– 性能优化

2.3 监控与告警规划

监控与告警规划:

监控指标:

  • Job状态:成功、失败、运行中
  • 执行时间:每次执行时长
  • 数据量:处理的数据量
  • 成功率:Job成功率
  • 延迟:数据延迟

更多学习教程公众号风哥教程itpux_com

Part03-生产环境项目实施方案

3.1 Oozie工作流调度

3.1.1 Oozie安装配置

# 1. 下载Oozie
cd /bigdata/app
wget https://archive.apache.org/dist/oozie/5.2.1/oozie-5.2.1.tar.gz
tar -zxvf oozie-5.2.1.tar.gz
ln -s oozie-5.2.1 oozie

# 2. 准备WAR包
cd /bigdata/app/oozie
bin/oozie-setup.sh prepare-war

# 3. 创建数据库
mysql -u root -p
CREATE DATABASE oozie DEFAULT CHARACTER SET utf8;
CREATE USER ‘oozie’@’%’ IDENTIFIED BY ‘fgedu123’;
GRANT ALL PRIVILEGES ON oozie.* TO ‘oozie’@’%’;
FLUSH PRIVILEGES;

# 4. 配置oozie-site.xml
cat > /bigdata/app/oozie/conf/oozie-site.xml << ‘EOF’
<?xml version=”1.0″?>
<configuration>
<property>
<name>oozie.service.JPAService.jdbc.driver</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.url</name>
<value>jdbc:mysql://fgedu-mysql:3306/oozie</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.username</name>
<value>oozie</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.password</name>
<value>fgedu123</value>
</property>
<property>
<name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
<value>*=/bigdata/app/hadoop/etc/hadoop</value>
</property>
</configuration>
EOF

# 5. 上传ShareLib
bin/oozie-setup.sh sharelib create -fs hdfs://fgedu-nn:8020

# 6. 初始化数据库
bin/ooziedb.sh create -sqlfile oozie.sql -run

# 7. 启动Oozie
bin/oozied.sh start

# 8. 验证Oozie
bin/oozie admin -oozie http://localhost:11000/oozie -status

# 9. 编写Workflow
mkdir -p /bigdata/fgdata/pipelines/oozie/fgedu_etl
cat > /bigdata/fgdata/pipelines/oozie/fgedu_etl/workflow.xml << ‘EOF’
<workflow-app xmlns=”uri:oozie:workflow:0.5″ name=”fgedu-etl-wf”>
<start to=”fgedu-hive-node”/>

<action name=”fgedu-hive-node”>
<hive xmlns=”uri:oozie:hive-action:0.2″>
<job-tracker>fgedu-rm:8032</job-tracker>
<name-node>hdfs://fgedu-nn:8020</name-node>
<script>fgedu_etl.sql</script>
</hive>
<ok to=”end”/>
<error to=”fail”/>
</action>

<kill name=”fail”>
<message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>

<end name=”end”/>
</workflow-app>
EOF

# 10. 上传到HDFS
hdfs dfs -put /bigdata/fgdata/pipelines/oozie/fgedu_etl /bigdata/fgdata/oozie/

# 11. 编写job.properties
cat > /bigdata/fgdata/pipelines/oozie/fgedu_etl/job.properties << ‘EOF’
nameNode=hdfs://fgedu-nn:8020
jobTracker=fgedu-rm:8032
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/bigdata/fgdata/oozie/fgedu_etl
EOF

# 12. 提交Job
bin/oozie job -oozie http://localhost:11000/oozie -config job.properties -run

# 13. 查看Job状态
bin/oozie job -oozie http://localhost:11000/oozie -info <job-id>

3.2 Airflow工作流调度

3.2.1 Airflow安装配置

# 1. 安装Python依赖
yum install -y python3 python3-pip
pip3 install apache-airflow==2.6.3
pip3 install apache-airflow-providers-apache-hive
pip3 install apache-airflow-providers-apache-spark

# 2. 初始化Airflow
export AIRFLOW_HOME=/bigdata/app/airflow
airflow db init

# 3. 创建用户
airflow users create \
–username admin \
–firstname Admin \
–lastname User \
–role Admin \
–email admin@fgedu.net.cn

# 4. 配置airflow.cfg
cat > /bigdata/app/airflow/airflow.cfg << ‘EOF’
[core]
dags_folder = /bigdata/fgdata/pipelines/airflow/dags
base_log_folder = /bigdata/fgdata/logs/airflow
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:fgedu123@fgedu-pg:5432/airflow

[celery]
broker_url = redis://:fgedu123@fgedu-redis:6379/0
result_backend = db+postgresql://airflow:fgedu123@fgedu-pg:5432/airflow

[scheduler]
dag_dir_list_interval = 300

[webserver]
web_server_host = 0.0.0.0
web_server_port = 8080
EOF

# 5. 创建目录
mkdir -p /bigdata/fgdata/pipelines/airflow/dags
mkdir -p /bigdata/fgdata/logs/airflow

# 6. 初始化数据库
airflow db init

# 7. 启动Airflow
airflow webserver –daemon
airflow scheduler –daemon
airflow celery worker –daemon

# 8. 访问Airflow
# http://fgedu-airflow:8080

# 9. 编写DAG
cat > /bigdata/fgdata/pipelines/airflow/dags/fgedu_etl_dag.py << ‘EOF’
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator

default_args = {
‘owner’: ‘fgedu’,
‘depends_on_past’: False,
‘start_date’: datetime(2024, 4, 1),
’email’: [‘admin@fgedu.net.cn’],
’email_on_failure’: True,
’email_on_retry’: False,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
}

dag = DAG(
‘fgedu_etl_dag’,
default_args=default_args,
description=’Fgedu ETL DAG’,
schedule_interval=timedelta(days=1),
catchup=False,
)

def extract_data():
print(“Extracting data…”)

def transform_data():
print(“Transforming data…”)

def load_data():
print(“Loading data…”)

extract_task = PythonOperator(
task_id=’extract_data’,
python_callable=extract_data,
dag=dag,
)

transform_task = PythonOperator(
task_id=’transform_data’,
python_callable=transform_data,
dag=dag,
)

load_task = PythonOperator(
task_id=’load_data’,
python_callable=load_data,
dag=dag,
)

hive_task = HiveOperator(
task_id=’create_hive_table’,
hql=’CREATE TABLE IF NOT EXISTS fgedu_db.fgedu_user (id INT, name STRING)’,
hive_cli_conn_id=’hive_default’,
dag=dag,
)

extract_task >> transform_task >> load_task >> hive_task
EOF

# 10. 查看DAG
# Airflow Web UI查看DAG
# 手动触发DAG运行

3.3 DolphinScheduler调度

3.3.1 DolphinScheduler安装配置

# 1. 下载DolphinScheduler
cd /bigdata/app
wget https://archive.apache.org/dist/dolphinscheduler/3.1.9/apache-dolphinscheduler-3.1.9-bin.tar.gz
tar -zxvf apache-dolphinscheduler-3.1.9-bin.tar.gz
ln -s apache-dolphinscheduler-3.1.9-bin dolphinscheduler

# 2. 初始化数据库
mysql -u root -p
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8;
CREATE USER ‘dolphinscheduler’@’%’ IDENTIFIED BY ‘fgedu123’;
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO ‘dolphinscheduler’@’%’;
FLUSH PRIVILEGES;

# 3. 配置conf/config/install_config.conf
cd /bigdata/app/dolphinscheduler
vi conf/config/install_config.conf

# 关键配置
ips=”fgedu-ds01,fgedu-ds02″
masters=”fgedu-ds01″
workers=”fgedu-ds01,fgedu-ds02″
alertServer=”fgedu-ds01″
apiServers=”fgedu-ds01″
pythonGatewayServers=”fgedu-ds01″
dolphinschedulerPath=”/bigdata/app/dolphinscheduler”
dataBasedirPath=”/bigdata/fgdata/dolphinscheduler”
hdfsStartupSate=”false”
DATABASE_TYPE=”mysql”
SPRING_DATASOURCE_URL=”jdbc:mysql://fgedu-mysql:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8″
SPRING_DATASOURCE_USERNAME=”dolphinscheduler”
SPRING_DATASOURCE_PASSWORD=”fgedu123″

# 4. 创建目录
mkdir -p /bigdata/fgdata/dolphinscheduler

# 5. 一键安装
bash ./bin/install.sh

# 6. 启动DolphinScheduler
bash ./bin/start-all.sh

# 7. 访问DolphinScheduler
# http://fgedu-ds01:12345/dolphinscheduler
# 默认用户名: admin
# 默认密码: dolphinscheduler123

# 8. 创建项目
# Web UI创建项目
# 创建工作流
# 配置任务
# 定时调度

风哥提示:DolphinScheduler是国产开源调度工具,可视化界面友好,适合国内用户。如果团队技术栈允许,可以考虑使用。from bigdata视频:www.itpux.com

Part04-生产案例与实战讲解

4.1 批量数据处理实战

4.1.1 批量ETL流水线

# 批量ETL流程
# 1. 数据采集
# Flume采集日志到HDFS
# Sqoop从MySQL导入到HDFS

# 2. 数据清洗
# Spark清洗数据
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(“FgeduETL”).enableHiveSupport().getOrCreate()

df = spark.read.json(“hdfs://fgedu-nn:8020/bigdata/fgdata/raw/user_events/*”)

df_clean = df.select(
“user_id”,
“event_type”,
“event_time”,
“page”
).filter(df.user_id.isNotNull())

df_clean.write.mode(“overwrite”).saveAsTable(“fgedu_db.fgedu_user_events_clean”)

spark.stop()

# 3. 数据聚合
# Hive SQL聚合
INSERT OVERWRITE TABLE fgedu_db.fgedu_user_events_daily
SELECT
dt,
user_id,
COUNT(*) AS event_count,
COUNT(DISTINCT event_type) AS event_type_count
FROM fgedu_db.fgedu_user_events_clean
GROUP BY dt, user_id;

# 4. 数据导出
# Sqoop导出到MySQL
sqoop export \
–connect jdbc:mysql://fgedu-mysql:3306/fgedudb \
–username fgedu \
–password fgedu123 \
–table fgedu_user_events_daily \
–export-dir /bigdata/fgdata/hive/warehouse/fgedu_db.db/fgedu_user_events_daily \
–input-fields-terminated-by ‘\001’

# 5. Airflow DAG编排
# 按顺序执行:采集 -> 清洗 -> 聚合 -> 导出
# 配置失败重试
# 配置告警

4.2 实时数据处理实战

4.2.1 实时流水线

# 实时数据处理流程
# 1. 数据采集
# Flume/Fluentd采集日志
# 发送到Kafka

# 2. 实时处理
# Flink实时处理
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.api.common.serialization.SimpleStringSchema

object FgeduRealTimeETL {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000)

val kafkaProps = new Properties()
kafkaProps.setProperty(“bootstrap.servers”, “fgedu-kafka01:9092”)
kafkaProps.setProperty(“group.id”, “fgedu_flink_group”)

val kafkaConsumer = new FlinkKafkaConsumer[String](
“fgedu_user_events”,
new SimpleStringSchema(),
kafkaProps
)

val stream = env.addSource(kafkaConsumer)

val cleanedStream = stream
.map(parseEvent)
.filter(_ != null)
.keyBy(_.userId)
.timeWindow(Time.minutes(5))
.aggregate(new EventCountAgg())

cleanedStream.addSink(new FlinkKafkaProducer[String](
“fgedu_user_events_agg”,
new SimpleStringSchema(),
kafkaProps
))

env.execute(“Fgedu Real Time ETL”)
}

def parseEvent(json: String): Event = {
try {
// 解析JSON
} catch {
case _: Exception => null
}
}
}

# 3. 实时存储
# 写入Doris/ClickHouse实时查询
# 写入HBase用于查询

# 4. 实时监控
# Prometheus监控Flink指标
# Grafana可视化
# 配置告警

4.3 CI/CD实战

4.3.1 持续集成持续部署

# GitLab CI/CD
# .gitlab-ci.yml
stages:
– test
– build
– deploy

variables:
MAVEN_OPTS: “-Dmaven.repo.local=.m2/repository”

test:
stage: test
image: maven:3.8.6-openjdk-8
script:
– mvn test

build:
stage: build
image: maven:3.8.6-openjdk-8
script:
– mvn clean package
artifacts:
paths:
– target/*.jar

deploy:
stage: deploy
script:
– scp target/*.jar fgedu-deploy:/bigdata/fgdata/pipelines/jars/
– ssh fgedu-deploy “cd /bigdata/fgdata/pipelines && ./deploy.sh”
only:
– master

# deploy.sh
#!/bin/bash
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

echo “开始部署…”
JAR_FILE=fgedu-etl-1.0.0.jar
BACKUP_FILE=fgedu-etl-$(date +%Y%m%d_%H%M%S).jar

# 备份
cp /bigdata/fgdata/pipelines/jars/$JAR_FILE /bigdata/fgdata/pipelines/jars/$BACKUP_FILE

# 部署
cp /bigdata/fgdata/pipelines/jars/$JAR_FILE /bigdata/app/etl/

# 更新Airflow DAG
cp /bigdata/fgdata/pipelines/dags/*.py /bigdata/app/airflow/dags/

# 重启相关服务(如果需要)
echo “部署完成!”

# 代码审查
# Merge Request流程
# 自动化测试
# 人工审核
# 合并代码

# 环境分离
– 开发环境
– 测试环境
– 预发布环境
– 生产环境

生产环境建议:CI/CD可以大大提高开发效率和部署可靠性。建议从简单的自动化部署开始,逐步完善测试和审查流程。更多视频教程www.fgedu.net.cn

Part05-风哥经验总结与分享

5.1 流水线最佳实践

大数据流水线最佳实践:

  • 任务拆分:大任务拆分成小任务,便于调试和重试
  • 幂等性:任务支持多次执行,结果一致
  • 错误处理:完善的异常处理和告警
  • 监控日志:完善的日志记录和监控
  • 环境分离:开发、测试、生产环境分离
  • CI/CD:自动化测试和部署

5.2 常见问题处理

# 常见问题1:任务失败
– 查看日志
– 检查依赖
– 检查数据
– 重试任务
– 回滚操作

# 常见问题2:任务慢
– 查看执行计划
– 优化SQL
– 增加资源
– 调整并行度
– 优化数据倾斜

# 常见问题3:调度器挂了
– 检查调度器日志
– 检查资源
– 重启调度器
– 检查高可用

# 常见问题4:数据不一致
– 检查重跑逻辑
– 检查幂等性
– 检查时间窗口
– 重新跑数

# 常见问题5:告警风暴
– 优化告警规则
– 合并告警
– 分级告警
– 静默非关键告警

5.3 运维检查清单

# 流水线运维检查清单
– [ ] 调度器状态
– [ ] 任务执行状态
– [ ] 任务成功率
– [ ] 任务执行时间
– [ ] 数据量统计
– [ ] 数据延迟
– [ ] 告警检查
– [ ] 日志检查
– [ ] 资源使用
– [ ] 依赖服务状态
– [ ] 告警规则检查

# 日常巡检内容
1. 检查调度器状态
2. 检查失败任务
3. 检查慢任务
4. 检查数据延迟
5. 查看错误日志
6. 检查资源使用
7. 检查告警
8. 手动触发测试

风哥提示:大数据流水线建设是一个持续的过程,从简单的调度开始,逐步完善监控、告警、CI/CD等。要注重可观测性,确保出问题时能快速定位和解决。学习交流加群风哥微信: itpux-com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息