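This article walks through the design and hands-on implementation of a lakehouse architecture on Hadoop, covering the selection, deployment, and practical use of Hudi, Iceberg, and Delta Lake. It draws on the official Apache Hudi, Apache Iceberg, and Delta Lake documentation and is aimed at big data architects and development engineers.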
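Part01-Basic Concepts and Theory
1.1 Lakehouse Overview
A lakehouse is an architecture that combines the strengths of the data lake and the data warehouse: it stores large volumes of raw data while also providing ACID transactions, data quality controls, and high-performance queries. Its core features are:
- ACID transaction support
- Batch and stream processing
- Schema evolution
- Data versioning
- Index optimization
- Concurrent reads and writes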
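1.2 Lakehouse Architecture
Lakehouse architecture:
Raw data layer (Bronze):
- Raw data
- Landing zone for ingested data
- Keeps the original format
- Retains all history
Cleansed data layer (Silver):
- Data cleansing
- Data standardization
- Light aggregation
- Data quality checks
Business data layer (Gold):
- Business modeling
- Dimensional modeling
- Summary data
- Application-facing
# Core components
Storage layer:
- HDFS
- S3/OSS
- Other cloud object storage
Table format layer:
- Apache Hudi
- Apache Iceberg
- Delta Lake
Compute engine layer:
- Spark
- Flink
- Presto/Trino
- Hive
- Impala
Serving layer:
- Data services
- BI reports
- Data visualization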
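1.3 Data Lake vs. Data Warehouse vs. Lakehouse
Comparison:
- Data lake: stores raw data at low cost with high flexibility, but lacks ACID guarantees and data quality controls
- Data warehouse: structured data, high performance, and ACID guarantees, but expensive and inflexible
- Lakehouse: aims to combine both, pairing low-cost, flexible storage with ACID transactions and data quality controls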
Part02-Production Environment Planning and Recommendations
2.1 Lakehouse Architecture Planning
Key planning points:
HDFS/S3 paths:
/bigdata/fgdata/lakehouse/
├── bronze/
│   ├── fgedu_user_log/
│   ├── fgedu_order/
│   └── fgedu_user/
├── silver/
│   ├── fgedu_user_log_clean/
│   ├── fgedu_order_clean/
│   └── fgedu_user_clean/
└── gold/
    ├── fgedu_user_agg/
    ├── fgedu_order_agg/
    └── fgedu_sales_report/
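The layout above can be generated rather than typed by hand. The following is a minimal sketch, assuming the three layers and the bronze table directories listed above; the function name `lakehouse_path` and the variable `LAKEHOUSE_ROOT` are illustrative and not part of any tool. It only prints the `hdfs dfs -mkdir -p` commands, so it can be reviewed before anything is created.

```shell
#!/bin/sh
# Root of the lakehouse layout described above.
LAKEHOUSE_ROOT="/bigdata/fgdata/lakehouse"

# lakehouse_path LAYER TABLE
# Prints the directory for a table and rejects unknown layers.
lakehouse_path() {
  case "$1" in
    bronze|silver|gold) printf '%s/%s/%s\n' "$LAKEHOUSE_ROOT" "$1" "$2" ;;
    *) echo "unknown layer: $1" >&2; return 1 ;;
  esac
}

# Print (not run) the commands that would create the bronze directories.
for t in fgedu_user_log fgedu_order fgedu_user; do
  echo "hdfs dfs -mkdir -p $(lakehouse_path bronze "$t")"
done
```

Keeping the layer check in one function means a typo such as `bronz` fails early instead of silently creating a fourth top-level directory.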
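# Technology selection recommendations
Real-time update scenarios:
- Recommended: Apache Hudi
- Strength: efficient upserts and deletes
Data science scenarios:
- Recommended: Delta Lake
- Strength: tight Spark integration
Large-data-volume scenarios:
- Recommended: Apache Iceberg
- Strength: scales well
# Cluster planning
Hadoop cluster:
- NameNode: 2 nodes
- DataNode: 10+ nodes
- Storage: petabyte scale
Spark cluster:
- Driver: high-spec node
- Executors: allocated on demand
Flink cluster:
- JobManager: 2 nodes
- TaskManager: allocated on demand
2.2 Technology Selection
Comparison of the three open-source table formats:
- Apache Hudi: efficient upserts and deletes, incremental queries; suited to real-time ingestion
- Apache Iceberg: strong schema evolution, multi-engine support; suited to very large datasets
- Delta Lake: tight Spark integration, easy to use; suited to data science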
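2.3 Development Standards
Development standards:
Table names:
- Bronze layer: bronze_<business>_<table>
- Silver layer: silver_<business>_<table>
- Gold layer: gold_<business>_<subject>
Column names:
- All lowercase
- Words separated by underscores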
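A naming convention is easier to keep when it is checked automatically. The sketch below assumes the pattern stated above (a layer prefix, a business name, then a table or subject name, all lowercase with underscores); the function name `is_valid_table_name` and the exact regular expression are illustrative choices, not something the convention itself prescribes.

```shell
#!/bin/sh
# is_valid_table_name NAME
# Succeeds when NAME looks like <layer>_<business>_<table-or-subject>:
# it starts with bronze_, silver_ or gold_, and uses only lowercase
# letters, digits and underscores after that.
is_valid_table_name() {
  printf '%s\n' "$1" |
    grep -Eq '^(bronze|silver|gold)_[a-z][a-z0-9]*_[a-z][a-z0-9_]*$'
}

for name in bronze_fgedu_user_log Silver_fgedu_order gold_sales; do
  if is_valid_table_name "$name"; then
    echo "ok:      $name"
  else
    echo "invalid: $name"
  fi
done
```

Of the three sample names, only `bronze_fgedu_user_log` passes: the second uses an uppercase letter and the third lacks the business segment.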
# Directory conventions
HDFS paths:
/bigdata/fgdata/lakehouse/
├── bronze/
├── silver/
└── gold/
# Time travel
History retention:
- Bronze layer: 30 days
- Silver layer: 90 days
- Gold layer: kept indefinitely
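Maintenance jobs need these retention periods in machine-readable form. The sketch below assumes the three values listed above; the function names `retention_days` and `retention_hours` are hypothetical. The hour conversion is included because Delta Lake's `vacuum()` takes its retention argument in hours.

```shell
#!/bin/sh
# retention_days LAYER
# Prints the number of days of history to keep, or "forever" for gold.
retention_days() {
  case "$1" in
    bronze) echo 30 ;;
    silver) echo 90 ;;
    gold)   echo forever ;;
    *) echo "unknown layer: $1" >&2; return 1 ;;
  esac
}

# retention_hours LAYER
# Same policy expressed in hours; "forever" is passed through unchanged.
retention_hours() {
  days=$(retention_days "$1") || return 1
  if [ "$days" = forever ]; then
    echo forever
  else
    echo $((days * 24))
  fi
}

retention_hours bronze   # prints 720
```

A cleanup job would skip any layer for which the function prints `forever`.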
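Part03-Production Implementation
3.1 Hudi Lakehouse in Practice
3.1.1 Hudi Deployment and Configuration
# 1. Download the Hudi Spark bundle from Maven Central
#    (the Spark 3.3 / Scala 2.12 build is assumed here, matching the Iceberg runtime used in 3.2.1)
cd /bigdata/app
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.3-bundle_2.12/0.14.0/hudi-spark3.3-bundle_2.12-0.14.0.jar
# 2. Spark configuration (this overwrites any existing spark-defaults.conf; back it up first)
cat > /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.hive.convertMetastoreParquet false
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
EOF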
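Because `cat >` replaces the whole of spark-defaults.conf, one alternative is to keep a separate properties file per table format and pass it at launch time. The sketch below assumes a scratch directory (`CONF_DIR`, a hypothetical location) and reuses the four Hudi settings shown above; `--properties-file` is a standard option of `spark-shell` and `spark-submit`. The launch command is only printed, not executed.

```shell
#!/bin/sh
# Directory for per-format Spark property files (hypothetical location).
CONF_DIR="${CONF_DIR:-/tmp/spark-lakehouse-conf}"
mkdir -p "$CONF_DIR"

# Hudi settings from the step above, written to their own file so that
# spark-defaults.conf is left untouched.
cat > "$CONF_DIR/hudi.conf" << 'EOF'
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.hive.convertMetastoreParquet false
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
EOF

# Print the launch command instead of running it.
echo "spark-shell --properties-file $CONF_DIR/hudi.conf"
```

In Spark 3.x, a file given with `--properties-file` is read instead of conf/spark-defaults.conf, so any cluster-wide settings that are still needed have to be copied into it.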
# 3. Create the Hudi table (Spark Scala; enter multi-line statements in spark-shell with :paste)
import org.apache.spark.sql.SaveMode
val basePath = "hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log"
val df = spark.read.json("hdfs://fgedu-nn:8020/bigdata/fgdata/raw/user_log/*")
// Hudi's default write operation is upsert. With user_id as the record key,
// only the latest row per user (by log_time) is kept in each partition; use a
// unique event identifier as the key if every log event must be retained.
df.write.format("hudi")
  .option("hoodie.table.name", "fgedu_user_log")
  .option("hoodie.datasource.write.recordkey.field", "user_id")
  .option("hoodie.datasource.write.precombine.field", "log_time")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .mode(SaveMode.Append)
  .save(basePath)
# 4. Read the Hudi table
val hudiDF = spark.read.format("hudi").load(basePath)
hudiDF.show()
# 5. Time travel query (instant in yyyyMMddHHmmss form)
val hudiDFTimeTravel = spark.read.format("hudi")
  .option("as.of.instant", "20240408000000")
  .load(basePath)
# 6. Incremental query (returns records committed after the begin instant)
val hudiDFIncremental = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20240407000000")
  .load(basePath)
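The time travel and incremental queries above take instants written as 14 digits (yyyyMMddHHmmss). When a scheduler supplies a timestamp in the more readable "YYYY-MM-DD HH:MM:SS" form, it can be converted by stripping the separators. This is a minimal sketch; the function name `to_hudi_instant` is hypothetical, and no validation of the input is attempted.

```shell
#!/bin/sh
# to_hudi_instant "YYYY-MM-DD HH:MM:SS"
# Removes spaces, colons and hyphens to produce the yyyyMMddHHmmss form
# used in the queries above.
to_hudi_instant() {
  printf '%s\n' "$1" | tr -d ' :-'
}

to_hudi_instant "2024-04-08 00:00:00"   # prints 20240408000000
```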
# 7. Write to Hudi from Flink
# Flink SQL
CREATE TABLE fgedu_user_log_hudi (
  log_time TIMESTAMP(3),
  user_id BIGINT,
  event_type STRING,
  page STRING,
  dt STRING
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'user_id',
  'hoodie.datasource.write.precombine.field' = 'log_time',
  'hoodie.datasource.write.partitionpath.field' = 'dt'
);
3.2 Iceberg Lakehouse in Practice
3.2.1 Iceberg Deployment and Configuration
# 1. Download the Iceberg Spark runtime from Maven Central
cd /bigdata/app
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.4.3/iceberg-spark-runtime-3.3_2.12-1.4.3.jar
# 2. Spark configuration (this overwrites any existing spark-defaults.conf; back it up first)
cat > /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type hive
spark.sql.catalog.iceberg org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type hadoop
spark.sql.catalog.iceberg.warehouse hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/iceberg
EOF
# 3. Create the Iceberg table (Spark SQL)
spark-sql
-- A Hadoop-type catalog derives each table's location from the warehouse path
-- and rejects a custom LOCATION, so none is given here. The table is created
-- under <warehouse>/fgedu_lakehouse/fgedu_user_log.
CREATE NAMESPACE IF NOT EXISTS iceberg.fgedu_lakehouse;
USE iceberg.fgedu_lakehouse;
CREATE TABLE fgedu_user_log (
  log_time TIMESTAMP,
  user_id BIGINT,
  event_type STRING,
  page STRING,
  item_id BIGINT,
  dt STRING
)
USING iceberg
PARTITIONED BY (dt);
# 4. Insert data (the source table lives in the session catalog, so it is qualified with spark_catalog)
INSERT INTO iceberg.fgedu_lakehouse.fgedu_user_log
SELECT
  TO_TIMESTAMP(log_time) AS log_time,
  user_id,
  event_type,
  page,
  item_id,
  dt
FROM spark_catalog.fgedu_ods.ods_fgedu_user_log
WHERE dt = '2024-04-08';
# 5. Time travel query
SELECT * FROM iceberg.fgedu_lakehouse.fgedu_user_log TIMESTAMP AS OF '2024-04-08 10:00:00';
# 6. List snapshots
SELECT * FROM iceberg.fgedu_lakehouse.fgedu_user_log.snapshots;
# 7. Roll back (replace the placeholder with a snapshot_id returned by step 6)
CALL iceberg.system.rollback_to_snapshot(
  'fgedu_lakehouse.fgedu_user_log',
  123456789012345
);
# 8. Compact small files
CALL iceberg.system.rewrite_data_files(
  table => 'fgedu_lakehouse.fgedu_user_log',
  strategy => 'binpack',
  options => map('min-input-files', '10')
);
# 9. Expire old snapshots
CALL iceberg.system.expire_snapshots(
  'fgedu_lakehouse.fgedu_user_log',
  TIMESTAMP '2024-03-01 00:00:00'
);
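Compaction and snapshot expiry are usually run on a schedule rather than by hand. The sketch below generates the two procedure calls for one table into a SQL file that `spark-sql -f` can execute. It assumes the Spark catalog is named `iceberg` as configured above and that the table lives in a namespace called `fgedu_lakehouse` (the name used in the Presto queries later in this article); the function name and the output path under /tmp are hypothetical.

```shell
#!/bin/sh
# iceberg_maintenance_sql TABLE CUTOFF
# Prints a compaction call and a snapshot-expiry call for one table.
# TABLE is namespace.table; CUTOFF is 'YYYY-MM-DD HH:MM:SS'.
iceberg_maintenance_sql() {
  cat << EOF
CALL iceberg.system.rewrite_data_files(table => '$1', strategy => 'binpack', options => map('min-input-files', '10'));
CALL iceberg.system.expire_snapshots(table => '$1', older_than => TIMESTAMP '$2');
EOF
}

iceberg_maintenance_sql fgedu_lakehouse.fgedu_user_log "2024-03-01 00:00:00" \
  > /tmp/iceberg_maintenance.sql

# Print the command that would run the file instead of running it.
echo "spark-sql -f /tmp/iceberg_maintenance.sql"
```

Compacting before expiring is a deliberate order: the files replaced by compaction only become removable once the snapshots that reference them have expired.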
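3.3 Delta Lake Lakehouse in Practice
3.3.1 Delta Lake Deployment and Configuration
# 1. Download Delta Lake (2.4.0 is built for Spark 3.4; use Delta Lake 2.3.0 with Spark 3.3)
cd /bigdata/app
wget https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.4.0/delta-core_2.12-2.4.0.jar
# delta-core also needs the delta-storage jar on the classpath
wget https://repo1.maven.org/maven2/io/delta/delta-storage/2.4.0/delta-storage-2.4.0.jar
# 2. Spark configuration (this overwrites any existing spark-defaults.conf; back it up first)
cat > /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
EOF
# 3. Create the Delta table (Spark Scala; enter multi-line statements in spark-shell with :paste)
import io.delta.tables._
// The Hudi, Iceberg, and Delta walkthroughs are alternatives:
// do not write more than one table format to the same path.
val basePath = "hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log"
val df = spark.read.json("hdfs://fgedu-nn:8020/bigdata/fgdata/raw/user_log/*")
df.write.format("delta")
  .mode("overwrite")
  .option("path", basePath)
  .saveAsTable("fgedu_user_log_delta")
# 4. Read the Delta table
val deltaDF = spark.read.format("delta").load(basePath)
deltaDF.show()
# 5. Upsert (merge)
val updatesDF = …
val deltaTable = DeltaTable.forPath(spark, basePath)
deltaTable.as("target")
  .merge(
    updatesDF.as("updates"),
    "target.user_id = updates.user_id"
  )
  .whenMatched
  .updateExpr(
    Map(
      "event_type" -> "updates.event_type",
      "page" -> "updates.page",
      "log_time" -> "updates.log_time"
    )
  )
  .whenNotMatched
  .insertAll()
  .execute()
# 6. Time travel query
val deltaDFTimeTravel = spark.read.format("delta")
  .option("timestampAsOf", "2024-04-08 10:00:00")
  .load(basePath)
# 7. Show version history
deltaTable.history().show()
# 8. Roll back (use one of the two: by version or by timestamp)
deltaTable.restoreToVersion(10)
deltaTable.restoreToTimestamp("2024-04-07 00:00:00")
# 9. Optimize
// Compact small files, then remove unreferenced files older than 168 hours (7 days)
deltaTable.optimize().executeCompaction()
deltaTable.vacuum(168)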
Part04-Production Cases and Walkthroughs
4.1 Real-Time Ingestion into the Lake
4.1.1 Real-Time Ingestion with Flink and Hudi
-- 1. Create the Kafka source table
CREATE TABLE fgedu_user_log_kafka (
  log_time STRING,
  user_id BIGINT,
  event_type STRING,
  page STRING,
  item_id BIGINT,
  duration INT,
  ip STRING,
  dt STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'fgedu_user_events',
  'properties.bootstrap.servers' = 'fgedu-kafka01:9092',
  'properties.group.id' = 'fgedu_flink_hudi',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);
-- 2. Create the Hudi target table
-- Hive sync in the Flink connector is configured with the hive_sync.* options;
-- the metastore URI is the one used in the Presto catalogs in section 4.2.1.
CREATE TABLE fgedu_user_log_hudi (
  log_time TIMESTAMP(3),
  user_id BIGINT,
  event_type STRING,
  page STRING,
  item_id BIGINT,
  duration INT,
  ip STRING,
  dt STRING,
  WATERMARK FOR log_time AS log_time - INTERVAL '5' SECOND
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://fgedu-nn:8020/bigdata/fgdata/lakehouse/bronze/fgedu_user_log',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'user_id',
  'hoodie.datasource.write.precombine.field' = 'log_time',
  'hoodie.datasource.write.partitionpath.field' = 'dt',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://fgedu-hms:9083',
  'hive_sync.table' = 'fgedu_user_log_hudi',
  'hive_sync.db' = 'fgedu_lakehouse'
);
-- 3. Stream data into Hudi
-- The Hudi sink commits data on Flink checkpoints, so checkpointing must be enabled for the job.
INSERT INTO fgedu_user_log_hudi
SELECT
  TO_TIMESTAMP(log_time) AS log_time,
  user_id,
  event_type,
  page,
  item_id,
  duration,
  ip,
  dt
FROM fgedu_user_log_kafka;
-- 4. Check ingestion progress
-- Check the job status in the Flink Web UI
-- http://fgedu-flink:8081
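The same job status shown in the Web UI is available from Flink's REST API at `/jobs/overview`, which is convenient for scripted checks. The sketch below counts jobs in a given state from that JSON. It uses `grep` rather than a JSON parser, so it assumes the compact `"state":"RUNNING"` form; the function name `count_jobs_in_state` and the sample response (trimmed to three fields) are hypothetical.

```shell
#!/bin/sh
# count_jobs_in_state STATE
# Reads the JSON returned by Flink's /jobs/overview endpoint on stdin
# and prints how many jobs report the given state.
count_jobs_in_state() {
  grep -o "\"state\":\"$1\"" | wc -l | tr -d ' '
}

# Against a live cluster (host taken from the Web UI address above):
#   curl -s http://fgedu-flink:8081/jobs/overview | count_jobs_in_state RUNNING

# Hypothetical sample response, trimmed to the fields used here.
sample='{"jobs":[{"jid":"a1","name":"hudi_ingest","state":"RUNNING"},{"jid":"b2","name":"old_job","state":"FAILED"}]}'
printf '%s\n' "$sample" | count_jobs_in_state RUNNING   # prints 1
```

An alerting script could treat a RUNNING count of zero, or any FAILED count above zero, as a reason to page someone.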
4.2 Interactive Queries
4.2.1 Querying the Lakehouse with Presto
-- 1. Configure the Presto Hudi connector
cat > /bigdata/app/presto/etc/catalog/hudi.properties << 'EOF'
connector.name=hudi
hive.metastore.uri=thrift://fgedu-hms:9083
hive.config.resources=/bigdata/app/hadoop/etc/hadoop/core-site.xml,/bigdata/app/hadoop/etc/hadoop/hdfs-site.xml
EOF
-- 2. Configure the Presto Iceberg connector
-- Note: this catalog reads tables registered in the Hive Metastore. Iceberg tables
-- created through the Hadoop-type Spark catalog in section 3.2.1 are not registered
-- there; create them through a Hive-type catalog to query them from Presto.
cat > /bigdata/app/presto/etc/catalog/iceberg.properties << 'EOF'
connector.name=iceberg
hive.metastore.uri=thrift://fgedu-hms:9083
hive.config.resources=/bigdata/app/hadoop/etc/hadoop/core-site.xml,/bigdata/app/hadoop/etc/hadoop/hdfs-site.xml
EOF
-- 3. Query the Hudi table from Presto
presto-cli
USE hudi.fgedu_lakehouse;
SELECT
  dt,
  user_id,
  COUNT(*) AS event_count
FROM fgedu_user_log_hudi
WHERE dt BETWEEN '2024-04-01' AND '2024-04-08'
GROUP BY dt, user_id
ORDER BY dt DESC, event_count DESC
LIMIT 100;
-- 4. Query the Iceberg table from Presto
USE iceberg.fgedu_lakehouse;
SELECT
  dt,
  event_type,
  COUNT(*) AS cnt
FROM fgedu_user_log
WHERE dt = '2024-04-08'
GROUP BY dt, event_type;
-- 5. Time travel query (Iceberg)
SELECT * FROM iceberg.fgedu_lakehouse.fgedu_user_log
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-04-07 10:00:00';
-- 6. Cross-catalog join (Iceberg user table with Hudi event log)
SELECT
  h.user_id,
  h.name,
  l.event_count
FROM iceberg.fgedu_lakehouse.fgedu_user h
JOIN (
  SELECT user_id, COUNT(*) AS event_count
  FROM hudi.fgedu_lakehouse.fgedu_user_log_hudi
  WHERE dt = '2024-04-08'
  GROUP BY user_id
) l ON h.user_id = l.user_id
LIMIT 100;
4.3 Data Governance
4.3.1 Data Governance
-- 1. Metadata management
-- Manage lakehouse metadata with Apache Atlas
-- Configure an Atlas hook for Hudi tables
-- Configure an Atlas hook for Iceberg tables
-- 2. Data quality checks
-- Use Great Expectations
-- or write custom checks
-- Check for NULL values
SELECT
  SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) AS user_id_null_cnt,
  SUM(CASE WHEN log_time IS NULL THEN 1 ELSE 0 END) AS log_time_null_cnt,
  COUNT(*) AS total_cnt
FROM hudi.fgedu_lakehouse.fgedu_user_log_hudi
WHERE dt = '2024-04-08';
-- Check row counts per partition
SELECT dt, COUNT(*) AS row_cnt
FROM hudi.fgedu_lakehouse.fgedu_user_log_hudi
GROUP BY dt
ORDER BY dt DESC;
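The counts returned by the NULL check above only become a quality gate once they are compared against a threshold. The sketch below does that comparison in `awk`; the function name `null_rate_ok` and the 1% threshold in the usage lines are assumptions chosen for illustration, and the counts are passed in as plain arguments rather than fetched from Presto.

```shell
#!/bin/sh
# null_rate_ok NULL_COUNT TOTAL_COUNT MAX_RATE
# Succeeds when NULL_COUNT / TOTAL_COUNT does not exceed MAX_RATE
# (a fraction such as 0.01). An empty partition (TOTAL_COUNT = 0)
# is treated as a failure.
null_rate_ok() {
  awk -v n="$1" -v t="$2" -v max="$3" \
    'BEGIN { exit !(t > 0 && n / t <= max) }'
}

# Example: 5 NULL user_id values out of 1000 rows, 1% allowed.
if null_rate_ok 5 1000 0.01; then
  echo "user_id null rate within threshold"
else
  echo "user_id null rate too high"
fi
```

Treating an empty partition as a failure is a design choice: a day with zero rows is more often a broken ingestion job than a genuinely quiet day.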
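-- 3. Data security
-- Use Apache Ranger for access control
-- Ranger policies for Hudi tables
-- Ranger policies for Iceberg tables
-- 4. Data lineage
-- View data lineage in Apache Atlas
-- From Kafka to Hudi
-- From Hudi to DWS
-- From DWS to ADS
-- 5. Data lifecycle management
-- Automatically clean up old partitions
-- Automatically expire old snapshots
-- Automatically compact small files
-- 6. Monitoring and alerting
-- Prometheus metrics
-- Grafana dashboards
-- Configure alert rules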
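Part05-Fengge's Lessons Learned
5.1 Lakehouse Best Practices
Lakehouse best practices:
- Technology selection: choose the table format that fits the business scenario
- Layered design: Bronze, Silver, and Gold layers
- Small-file compaction: compact small files regularly to improve query performance
- Snapshot management: expire old snapshots regularly to save storage
- Data quality: build a thorough data quality checking process
5.2 Troubleshooting Common Problems
# Common problem 1: too many small files
- Enable automatic compaction
- Tune write parameters
- Run optimize regularly
- Adjust write parallelism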
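Before tuning anything, it helps to measure how many small files a table actually has. The sketch below counts them from the output of `hdfs dfs -ls -R`, where the first column holds the permissions and the fifth the size in bytes. The function name `count_small_files` and the 32 MB threshold are assumptions for illustration; a suitable threshold depends on the target file size of the table.

```shell
#!/bin/sh
# count_small_files THRESHOLD_BYTES
# Reads `hdfs dfs -ls -R` output on stdin and prints how many regular
# files are smaller than the threshold. Directory lines (permissions
# starting with "d") are skipped.
count_small_files() {
  awk -v limit="$1" \
    '$1 ~ /^-/ && $5 + 0 < limit + 0 { n++ } END { print n + 0 }'
}

# Against a live cluster, with a 32 MB threshold:
#   hdfs dfs -ls -R /bigdata/fgdata/lakehouse/bronze/fgedu_user_log \
#     | count_small_files 33554432
```

Running it per partition directory shows whether small files are spread evenly or concentrated in the most recent partitions, which is typical for streaming writes.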
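# Common problem 2: slow queries
- Compact small files
- Make sure partition pruning applies
- Add indexes
- Refresh table statistics
- Use an engine suited to the workload
# Common problem 3: write failures
- Check permissions
- Check free space
- Check for transaction conflicts
- Retry
- Check the logs
# Common problem 4: metadata problems
- Sync metadata
- Repair the table
- Check the Hive Metastore (HMS)
- Check HDFS
# Common problem 5: time travel fails
- Check that the snapshot exists
- Check the retention period
- Check the snapshot expiration policy
5.3 Operations Checklist
- [ ] Write jobs running normally
- [ ] Query performance normal
- [ ] Small files compacted
- [ ] Snapshots managed
- [ ] Metadata in sync
- [ ] Data quality checks passing
- [ ] Storage space checked
- [ ] Permissions checked
- [ ] Monitoring and alerting in place
- [ ] Logs checked
- [ ] Backup status checked
# Daily inspection items
1. Check write jobs
2. Check query performance
3. Check small files
4. Check storage space
5. Check snapshots
6. Check metadata
7. Check alerts
8. Check logs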
Compiled and published by Fengge Tutorials for learning and testing purposes only. Please credit the source when republishing: http://www.fgedu.net.cn/10327.html
