1. 首页 > Hadoop教程 > 正文

大数据教程FG166-Hadoop湖仓一体架构设计实战

本文详细介绍Hadoop湖仓一体架构设计实战,包括Hudi、Iceberg、Delta Lake等技术的选型、部署和实战,参考Apache Hudi、Iceberg、Delta Lake官方文档,适合大数据架构师和开发工程师使用。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 湖仓一体概述

湖仓一体(Lakehouse)是一种将数据湖和数据仓库的优势结合在一起的架构,既能存储海量原始数据,又能提供ACID事务、数据质量和高性能查询。学习交流加群风哥微信: itpux-com

湖仓一体核心特性:

  • ACID事务支持
  • 支持批处理和流处理
  • Schema演化
  • 数据版本控制
  • 索引优化
  • 并发读写

1.2 湖仓一体架构介绍

湖仓一体架构:

# 湖仓一体分层架构
原始数据层(Bronze):
– 原始数据
– 数据采集落地
– 保持原始格式
– 保留所有历史

清洗数据层(Silver):
– 数据清洗
– 数据标准化
– 轻度聚合
– 数据质量检查

业务数据层(Gold):
– 业务建模
– 维度建模
– 汇总数据
– 面向应用

# 核心组件
存储层:
– HDFS
– S3/OSS
– 云存储

表格式层:
– Apache Hudi
– Apache Iceberg
– Delta Lake

计算引擎层:
– Spark
– Flink
– Presto/Trino
– Hive
– Impala

服务层:
– 数据服务
– BI报表
– 数据可视化

1.3 数据湖vs数据仓库vs湖仓一体

对比分析:

  • 数据湖:存储原始数据,成本低,灵活性高,但缺少ACID和数据质量
  • 数据仓库:结构化数据,高性能,ACID,但成本高,灵活性低
  • 湖仓一体:兼顾两者优点,成本低,灵活性高,同时支持ACID和数据质量
风哥提示:湖仓一体是当前的主流趋势。建议根据业务需求选择Hudi、Iceberg或Delta Lake,它们各有特点。更多学习教程公众号风哥教程itpux_com

Part02-生产环境规划与建议

2.1 湖仓一体架构规划

湖仓一体架构规划要点:

# 存储规划
HDFS/S3路径:
/bigdata/fgdata/lakehouse/
├── bronze/
│ ├── fgedu_user_log/
│ ├── fgedu_order/
│ └── fgedu_user/
├── silver/
│ ├── fgedu_user_log_clean/
│ ├── fgedu_order_clean/
│ └── fgedu_user_clean/
└── gold/
├── fgedu_user_agg/
├── fgedu_order_agg/
└── fgedu_sales_report/

# 技术选型建议
实时更新场景:
– 推荐:Apache Hudi
– 特点:Upsert/Delete性能好

数据科学场景:
– 推荐:Delta Lake
– 特点:Spark生态好

大数据量场景:
– 推荐:Apache Iceberg
– 特点:可扩展性好

# 集群规划
Hadoop集群:
– NameNode:2台
– DataNode:10台+
– 存储:PB级

Spark集群:
– Driver:高配置
– Executor:按需分配

Flink集群:
– JobManager:2台
– TaskManager:按需分配

2.2 技术选型

三大开源表格式对比:

三大表格式对比:

  • Apache Hudi:Upsert/Delete性能好,支持增量查询,适合实时入湖
  • Apache Iceberg:Schema演化好,支持多种引擎,适合大数据量
  • Delta Lake:Spark生态好,易用性好,适合数据科学

from bigdata视频:www.itpux.com

2.3 开发规范

开发规范:

# 命名规范
表名:
– Bronze层:bronze_业务_表名
– Silver层:silver_业务_表名
– Gold层:gold_业务_主题

字段名:
– 统一小写
– 下划线分隔

# 目录规范
HDFS路径:
/bigdata/fgdata/lakehouse/
├── bronze/
├── silver/
└── gold/

# 时间旅行
保留历史版本:
– Bronze层:30天
– Silver层:90天
– Gold层:永久

Part03-生产环境项目实施方案

3.1 Hudi湖仓一体实战

3.1.1 Hudi部署配置

# 1. 下载Hudi
cd /bigdata/app
wget https://archive.apache.org/dist/hudi/0.14.0/hudi-0.14.0.jar

# 2. Spark配置
cat > /bigdata/app/spark/conf/spark-defaults.conf << ‘EOF’
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.hive.convertMetastoreParquet false
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
EOF

# 3. 创建Hudi表(Spark Scala)
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val basePath = “hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log”

val df = spark.read.json(“hdfs://fgedu-nn:8020/bigdata/fgdata/raw/user_log/*”)

df.write.format(“hudi”)
.option(TABLE_NAME, “fgedu_user_log”)
.option(RECORDKEY_FIELD_OPT_KEY, “user_id”)
.option(PRECOMBINE_FIELD_OPT_KEY, “log_time”)
.option(PARTITIONPATH_FIELD_OPT_KEY, “dt”)
.option(TABLE_TYPE_OPT_KEY, “MERGE_ON_READ”)
.mode(SaveMode.Append)
.save(basePath)

# 4. 读取Hudi表
val hudiDF = spark.read.format(“hudi”).load(basePath)
hudiDF.show()

# 5. 时间旅行查询
val hudiDFTimeTravel = spark.read.format(“hudi”)
.option(“as.of.instant”, “20240408000000”)
.load(basePath)

# 6. 增量查询
val hudiDFIncremental = spark.read.format(“hudi”)
.option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(BEGIN_INSTANTTIME_OPT_KEY, “20240407000000”)
.load(basePath)

# 7. Flink写入Hudi
# Flink SQL
CREATE TABLE fgedu_user_log_hudi (
log_time TIMESTAMP(3),
user_id BIGINT,
event_type STRING,
page STRING,
dt STRING
)
PARTITIONED BY (dt)
WITH (
‘connector’ = ‘hudi’,
‘path’ = ‘hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log’,
‘table.type’ = ‘MERGE_ON_READ’,
‘hoodie.datasource.write.recordkey.field’ = ‘user_id’,
‘hoodie.datasource.write.precombine.field’ = ‘log_time’,
‘hoodie.datasource.write.partitionpath.field’ = ‘dt’
);

3.2 Iceberg湖仓一体实战

3.2.1 Iceberg部署配置

# 1. 下载Iceberg
cd /bigdata/app
wget https://archive.apache.org/dist/iceberg/1.4.3/iceberg-spark-runtime-3.3_2.12-1.4.3.jar

# 2. Spark配置
cat > /bigdata/app/spark/conf/spark-defaults.conf << ‘EOF’
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type hive
spark.sql.catalog.iceberg org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type hadoop
spark.sql.catalog.iceberg.warehouse hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/iceberg
EOF

# 3. 创建Iceberg表(Spark SQL)
spark-sql
USE iceberg;

CREATE TABLE fgedu_user_log (
log_time TIMESTAMP,
user_id BIGINT,
event_type STRING,
page STRING,
item_id BIGINT,
dt STRING
)
PARTITIONED BY (dt)
USING iceberg
LOCATION ‘hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log’;

# 4. 写入数据
INSERT INTO iceberg.fgedu_user_log
SELECT
TO_TIMESTAMP(log_time) AS log_time,
user_id,
event_type,
page,
item_id,
dt
FROM fgedu_ods.ods_fgedu_user_log
WHERE dt = ‘2024-04-08’;

# 5. 时间旅行查询
SELECT * FROM iceberg.fgedu_user_log TIMESTAMP AS OF ‘2024-04-08 10:00:00’;

# 6. 查询快照
SELECT * FROM iceberg.fgedu_user_log.snapshots;

# 7. 回滚
CALL iceberg.system.rollback_to_snapshot(
‘fgedu_user_log’,
123456789012345
);

# 8. 合并小文件
CALL iceberg.system.rewrite_data_files(
table => ‘iceberg.fgedu_user_log’,
strategy => ‘binpack’,
options => map(‘min-input-files’, ’10’)
);

# 9. 删除旧快照
CALL iceberg.system.expire_snapshots(
‘iceberg.fgedu_user_log’,
TIMESTAMP ‘2024-03-01 00:00:00’
);

3.3 Delta Lake湖仓一体实战

3.3.1 Delta Lake部署配置

# 1. 下载Delta Lake
cd /bigdata/app
wget https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.4.0/delta-core_2.12-2.4.0.jar

# 2. Spark配置
cat > /bigdata/app/spark/conf/spark-defaults.conf << ‘EOF’
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
EOF

# 3. 创建Delta表(Spark)
import io.delta.tables._

val basePath = “hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log”

val df = spark.read.json(“hdfs://fgedu-nn:8020/bigdata/fgdata/raw/user_log/*”)

df.write.format(“delta”)
.mode(“overwrite”)
.option(“path”, basePath)
.saveAsTable(“fgedu_user_log_delta”)

# 4. 读取Delta表
val deltaDF = spark.read.format(“delta”).load(basePath)
deltaDF.show()

# 5. Upsert操作
val updatesDF = …

val deltaTable = DeltaTable.forPath(spark, basePath)

deltaTable.as(“target”)
.merge(
updatesDF.as(“updates”),
“target.user_id = updates.user_id”
)
.whenMatched
.updateExpr(
Map(
“event_type” -> “updates.event_type”,
“page” -> “updates.page”,
“log_time” -> “updates.log_time”
)
)
.whenNotMatched
.insertAll()
.execute()

# 6. 时间旅行查询
val deltaDFTimeTravel = spark.read.format(“delta”)
.option(“timestampAsOf”, “2024-04-08 10:00:00”)
.load(basePath)

# 7. 查询版本
deltaTable.history().show()

# 8. 回滚
deltaTable.restoreToVersion(10)
deltaTable.restoreToTimestamp(“2024-04-07 00:00:00”)

# 9. 优化
deltaTable.optimize().execute()
deltaTable.vacuum(168)

风哥提示:这三个表格式都各有特点。如果以实时更新为主,建议Hudi;如果以Spark生态为主,建议Delta Lake;如果需要跨引擎支持,建议Iceberg。学习交流加群风哥QQ113257174

Part04-生产案例与实战讲解

4.1 实时入湖实战

4.1.1 Flink+Hudi实时入湖

— Flink SQL实时入湖
— 1. 创建Kafka源表
CREATE TABLE fgedu_user_log_kafka (
log_time STRING,
user_id BIGINT,
event_type STRING,
page STRING,
item_id BIGINT,
duration INT,
ip STRING,
dt STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘fgedu_user_events’,
‘properties.bootstrap.servers’ = ‘fgedu-kafka01:9092’,
‘properties.group.id’ = ‘fgedu_flink_hudi’,
‘format’ = ‘json’,
‘scan.startup.mode’ = ‘latest-offset’
);

— 2. 创建Hudi目标表
CREATE TABLE fgedu_user_log_hudi (
log_time TIMESTAMP(3),
user_id BIGINT,
event_type STRING,
page STRING,
item_id BIGINT,
duration INT,
ip STRING,
dt STRING,
WATERMARK FOR log_time AS log_time – INTERVAL ‘5’ SECOND
)
PARTITIONED BY (dt)
WITH (
‘connector’ = ‘hudi’,
‘path’ = ‘hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log’,
‘table.type’ = ‘MERGE_ON_READ’,
‘hoodie.datasource.write.recordkey.field’ = ‘user_id’,
‘hoodie.datasource.write.precombine.field’ = ‘log_time’,
‘hoodie.datasource.write.partitionpath.field’ = ‘dt’,
‘hoodie.datasource.hive_sync.enable’ = ‘true’,
‘hoodie.datasource.hive_sync.table’ = ‘fgedu_user_log_hudi’,
‘hoodie.datasource.hive_sync.database’ = ‘fgedu_lakehouse’
);

— 3. 实时写入Hudi
INSERT INTO fgedu_user_log_hudi
SELECT
TO_TIMESTAMP(log_time) AS log_time,
user_id,
event_type,
page,
item_id,
duration,
ip,
dt
FROM fgedu_user_log_kafka;

— 4. 查看写入进度
— Flink Web UI查看作业状态
— http://fgedu-flink:8081

4.2 交互式查询实战

4.2.1 Presto查询湖仓

— 1. 配置Presto Hudi连接器
cat > /bigdata/app/presto/etc/catalog/hudi.properties << ‘EOF’
connector.name=hudi
hive.metastore.uri=thrift://fgedu-hms:9083
hive.config.resources=/bigdata/app/hadoop/etc/hadoop/core-site.xml,/bigdata/app/hadoop/etc/hadoop/hdfs-site.xml
EOF

— 2. 配置Presto Iceberg连接器
cat > /bigdata/app/presto/etc/catalog/iceberg.properties << ‘EOF’
connector.name=iceberg
hive.metastore.uri=thrift://fgedu-hms:9083
hive.config.resources=/bigdata/app/hadoop/etc/hadoop/core-site.xml,/bigdata/app/hadoop/etc/hadoop/hdfs-site.xml
EOF

— 3. Presto查询Hudi表
presto-cli
USE hudi.fgedu_lakehouse;

SELECT
dt,
user_id,
COUNT(*) AS event_count
FROM fgedu_user_log_hudi
WHERE dt BETWEEN ‘2024-04-01’ AND ‘2024-04-08’
GROUP BY dt, user_id
ORDER BY dt DESC, event_count DESC
LIMIT 100;

— 4. Presto查询Iceberg表
USE iceberg.fgedu_lakehouse;

SELECT
dt,
event_type,
COUNT(*) AS cnt
FROM fgedu_user_log
WHERE dt = ‘2024-04-08’
GROUP BY dt, event_type;

— 5. 时间旅行查询(Iceberg)
SELECT * FROM iceberg.fgedu_lakehouse.fgedu_user_log
FOR SYSTEM_TIME AS OF TIMESTAMP ‘2024-04-07 10:00:00’;

— 6. 联合查询
SELECT
h.user_id,
h.name,
l.event_count
FROM iceberg.fgedu_lakehouse.fgedu_user h
JOIN (
SELECT user_id, COUNT(*) AS event_count
FROM hudi.fgedu_lakehouse.fgedu_user_log_hudi
WHERE dt = ‘2024-04-08’
GROUP BY user_id
) l ON h.user_id = l.user_id
LIMIT 100;

4.3 数据治理实战

4.3.1 数据治理

— 1. 元数据管理
— 使用Apache Atlas管理湖仓元数据
— 配置Atlas Hudi Hook
— 配置Atlas Iceberg Hook

— 2. 数据质量检查
— 使用Great Expectations
— 或自定义检查
— 检查NULL值
SELECT
SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) AS user_id_null_cnt,
SUM(CASE WHEN log_time IS NULL THEN 1 ELSE 0 END) AS log_time_null_cnt,
COUNT(*) AS total_cnt
FROM hudi.fgedu_lakehouse.fgedu_user_log_hudi
WHERE dt = ‘2024-04-08’;

— 检查数据量
SELECT dt, COUNT(*)
FROM hudi.fgedu_lakehouse.fgedu_user_log_hudi
GROUP BY dt
ORDER BY dt DESC;

— 3. 数据安全
— 使用Apache Ranger进行权限控制
— Ranger Hudi Plugin
— Ranger Iceberg Plugin

— 4. 数据血缘
— 使用Apache Atlas查看数据血缘
— 从Kafka到Hudi
— 从Hudi到DWS
— 从DWS到ADS

— 5. 数据生命周期管理
— 自动清理旧分区
— 自动清理旧快照
— 自动合并小文件

— 6. 监控告警
— Prometheus指标
— Grafana可视化
— 配置告警规则

生产环境建议:湖仓一体需要完善的数据治理体系。建议从元数据管理、数据质量、数据安全等方面逐步建设。更多视频教程www.fgedu.net.cn

Part05-风哥经验总结与分享

5.1 湖仓一体最佳实践

湖仓一体最佳实践:

  • 技术选型:根据业务场景选择合适的表格式
  • 分层设计:Bronze-Silver-Gold分层
  • 小文件合并:定期合并小文件,提高查询性能
  • 快照管理:定期清理旧快照,节省存储空间
  • 数据质量:建立完善的数据质量检查体系

5.2 常见问题处理

# 常见问题1:小文件过多
– 自动合并
– 调整写参数
– 定期optimize
– 调整并行度

# 常见问题2:查询慢
– 合并小文件
– 分区裁剪
– 增加索引
– 更新统计信息
– 使用合适的引擎

# 常见问题3:写入失败
– 检查权限
– 检查空间
– 检查事务冲突
– 重试
– 检查日志

# 常见问题4:元数据问题
– 同步元数据
– 修复表
– 检查HMS
– 检查HDFS

# 常见问题5:时间旅行失败
– 检查快照是否存在
– 检查保留时间
– 检查快照清理策略

5.3 运维检查清单

# 湖仓一体运维检查清单
– [ ] 写入作业正常
– [ ] 查询性能正常
– [ ] 小文件合并
– [ ] 快照管理
– [ ] 元数据同步
– [ ] 数据质量检查
– [ ] 存储空间检查
– [ ] 权限检查
– [ ] 监控告警
– [ ] 日志检查
– [ ] 备份状态

# 日常巡检内容
1. 检查写入作业
2. 检查查询性能
3. 检查小文件
4. 检查存储空间
5. 检查快照
6. 检查元数据
7. 检查告警
8. 检查日志

风哥提示:湖仓一体是大数据架构的发展趋势。建议从试点开始,逐步推广。要注重运维体系建设,包括监控、告警、备份等。学习交流加群风哥微信: itpux-com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息