本文详细介绍Apache Hudi实时入湖的实战技术,包括Hudi概述、表类型、架构原理、环境部署、数据写入更新、查询优化等内容,风哥教程参考Hudi官方文档Getting Started、Core Concepts、Configuration等内容,适合大数据工程师在生产环境中使用。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 Hudi概述与核心特性
Apache Hudi(Hadoop Upserts Deletes and Incrementals)是一个开源的大数据存储管理框架,用于在HDFS或云存储上构建增量数据管道和数据湖。学习交流加群风哥微信: itpux-com
- Upserts/Deletes:支持更新和删除操作
- 增量处理:支持增量数据读取
- ACID事务:保证数据一致性
- 时间旅行:支持数据版本回溯
- Schema演进:支持表结构变更
- 多种查询引擎:支持Spark、Hive、Presto等
1.2 Hudi表类型与存储模型
Hudi提供两种主要的表类型,分别适用于不同的业务场景:
– 写入时同步合并数据
– 读性能好,写性能较差
– 适用于读多写少场景
– 数据文件格式:Parquet
# Merge-on-Read (MoR) 表
– 写入时先写日志文件
– 读取时异步合并
– 写性能好,读性能有优化空间
– 适用于写多读少场景
– 数据文件格式:Parquet + Avro
1.3 Hudi架构与核心组件
Hudi架构包含以下核心组件:
- Timeline:时间轴,记录所有表操作
- File Groups:文件组,管理数据文件
- Compaction:压缩,合并小文件和日志
- Clustering:聚类,优化数据布局
- Indexing:索引,快速定位记录
Part02-生产环境规划与建议
2.1 Hudi生产环境规划
Hudi生产环境规划要点:
– 数据存储路径:/bigdata/fgdata/hudi
– 建议使用HDFS或云存储(S3/OSS/COS)
– 数据目录权限:hdfs:hdfs 755
– 预留存储空间:数据量的3-5倍
# 表设计规划
– RecordKey:唯一标识字段,如id
– PrecombineField:预组合字段,如update_time
– PartitionPath:分区字段,如dt
– 表类型选择:根据读写比例
# 资源规划
– Spark Executor内存:8GB-32GB
– Spark Executor核数:2-8核
– 并行度:根据数据量调整
– Compaction频率:根据业务需求
2.2 Hudi核心参数配置
Hudi核心参数配置建议:
hoodie.table.name=fgedu_hudi_table
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.precombine.field=update_time
hoodie.datasource.write.partitionpath.field=dt
hoodie.datasource.write.table.type=COPY_ON_WRITE
# 写入性能配置
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=200
hoodie.parquet.small.file.limit=104857600
# 压缩配置
hoodie.compact.inline=true
hoodie.compact.inline.max.delta.commits=10
hoodie.cleaner.commits.retained=10
hoodie.clean.automatic=true
# 索引配置
hoodie.index.type=BLOOM
hoodie.bloom.index.filter.type=DYNAMIC_V0
hoodie.bloom.index.parallelism=100
2.3 Hudi资源配置建议
Hudi资源配置建议:
- 小型集群:10TB以下数据,Spark Executor 8GB内存,2核,50个并行度
- 中型集群:10TB-100TB数据,Spark Executor 16GB内存,4核,100-200个并行度
- 大型集群:100TB以上数据,Spark Executor 32GB内存,8核,200-500个并行度
更多学习教程公众号风哥教程itpux_com
Part03-生产环境项目实施方案
3.1 Hudi环境安装部署
3.1.1 下载并配置Hudi
$ cd /bigdata/app
$ wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.3-bundle_2.12/0.14.0/hudi-spark3.3-bundle_2.12-0.14.0.jar
# 2. 配置Spark环境
$ vi /bigdata/app/spark/conf/spark-defaults.conf
spark.jars=/bigdata/app/hudi-spark3.3-bundle_2.12-0.14.0.jar
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.hive.convertMetastoreParquet=false
# 3. 创建数据目录
$ hdfs dfs -mkdir -p /bigdata/fgdata/hudi
$ hdfs dfs -chown hdfs:hdfs /bigdata/fgdata/hudi
$ hdfs dfs -chmod 755 /bigdata/fgdata/hudi
# 4. 验证环境
$ spark-shell –packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0
3.2 Hudi表创建与数据写入
3.2.1 使用Spark创建Hudi表
$ spark-shell –packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0 \
–conf “spark.serializer=org.apache.spark.serializer.KryoSerializer”
# 导入Hudi包
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
# 配置表信息
val tableName = “fgedu_hudi_table”
val basePath = “hdfs://fgedu.net.cn:8020/bigdata/fgdata/hudi/fgedu_hudi_table”
# 创建测试数据
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(100))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
# 写入Hudi表
df.write.format(“hudi”).
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, “ts”).
option(RECORDKEY_FIELD_OPT_KEY, “uuid”).
option(PARTITIONPATH_FIELD_OPT_KEY, “partitionpath”).
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
# 验证数据
val roViewDF = spark.read.format(“hudi”).load(basePath)
roViewDF.createOrReplaceTempView(“fgedu_hudi_table”)
spark.sql(“select uuid, partitionpath, rider, begin_lon, begin_lat, ts from fgedu_hudi_table”).show()
3.3 Hudi数据更新与删除
3.3.1 Hudi数据更新操作
val updates = convertToStringList(dataGen.generateUpdates(10))
val updateDF = spark.read.json(spark.sparkContext.parallelize(updates, 2))
# 执行Upsert操作
updateDF.write.format(“hudi”).
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, “ts”).
option(RECORDKEY_FIELD_OPT_KEY, “uuid”).
option(PARTITIONPATH_FIELD_OPT_KEY, “partitionpath”).
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
# 验证更新后的数据
val roViewDFAfterUpdate = spark.read.format(“hudi”).load(basePath)
roViewDFAfterUpdate.createOrReplaceTempView(“fgedu_hudi_table_after_update”)
spark.sql(“select uuid, partitionpath, rider, begin_lon, begin_lat, ts from fgedu_hudi_table_after_update order by ts desc”).show()
# 增量查询
val increments = spark.read.format(“hudi”).
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, “0”).
load(basePath)
increments.show()
Part04-生产案例与实战讲解
4.1 实时数据流写入Hudi实战
4.1.1 使用Spark Streaming写入Hudi
import org.apache.spark.sql.SparkSession
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
object FgeduHudiStreaming {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(“FgeduHudiStreaming”)
.config(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
.getOrCreate()
import spark.implicits._
val tableName = “fgedu_streaming_table”
val basePath = “hdfs://fgedu.net.cn:8020/bigdata/fgdata/hudi/fgedu_streaming_table”
// 模拟读取Kafka数据
val kafkaDF = spark.readStream
.format(“kafka”)
.option(“kafka.bootstrap.servers”, “192.168.1.100:9092,192.168.1.101:9092”)
.option(“subscribe”, “fgedu_events”)
.load()
// 解析JSON数据
val parsedDF = kafkaDF.selectExpr(“CAST(value AS STRING)”)
.select(from_json($”value”, schema).as(“data”))
.select(“data.*”)
// 写入Hudi
val query = parsedDF.writeStream
.format(“hudi”)
.option(TABLE_NAME, tableName)
.option(RECORDKEY_FIELD_OPT_KEY, “id”)
.option(PRECOMBINE_FIELD_OPT_KEY, “ts”)
.option(PARTITIONPATH_FIELD_OPT_KEY, “dt”)
.option(“hoodie.datasource.write.operation”, “upsert”)
.option(“checkpointLocation”, “/bigdata/fgdata/checkpoints/fgedu_streaming”)
.start(basePath)
query.awaitTermination()
}
}
# 提交应用
$ spark-submit –class FgeduHudiStreaming \
–packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0 \
–master yarn \
–deploy-mode cluster \
–num-executors 10 \
–executor-memory 16G \
–executor-cores 4 \
fgedu-hudi-streaming.jar
4.2 Hudi Merge-on-Read表实战
4.2.1 创建MoR表并写入数据
val tableName = “fgedu_mor_table”
val basePath = “hdfs://fgedu.net.cn:8020/bigdata/fgdata/hudi/fgedu_mor_table”
# 写入MoR表
df.write.format(“hudi”).
option(TABLE_NAME, tableName).
option(RECORDKEY_FIELD_OPT_KEY, “uuid”).
option(PRECOMBINE_FIELD_OPT_KEY, “ts”).
option(PARTITIONPATH_FIELD_OPT_KEY, “partitionpath”).
option(WRITE_OPERATION_OPT_KEY, “insert”).
option(TABLE_TYPE_OPT_KEY, “MERGE_ON_READ”).
option(COMPACTION_INLINE_OPT_KEY, “true”).
option(COMPACTION_INLINE_MAX_DELTA_COMMITS_OPT_KEY, “5”).
mode(Overwrite).
save(basePath)
# 读取优化视图(只读Parquet)
val roViewDF = spark.read.format(“hudi”).
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
load(basePath)
roViewDF.show()
# 读取实时视图(合并日志文件)
val rtViewDF = spark.read.format(“hudi”).
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL).
load(basePath)
rtViewDF.show()
4.3 Hudi数据查询与优化
4.3.1 使用Hive查询Hudi表
CREATE EXTERNAL TABLE fgedu_hudi_hive_table (
uuid string,
partitionpath string,
rider string,
begin_lon double,
begin_lat double,
ts bigint
)
STORED AS INPUTFORMAT ‘org.apache.hudi.hadoop.hive.HoodieParquetInputFormat’
OUTPUTFORMAT ‘org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat’
LOCATION ‘hdfs://fgedu.net.cn:8020/bigdata/fgdata/hudi/fgedu_hudi_table’;
# 查询Hudi表
SELECT * FROM fgedu_hudi_hive_table LIMIT 10;
# 使用Presto查询Hudi表
# 在Presto中配置Hudi连接器
SELECT * FROM hudi.fgedu_hudi_table WHERE partitionpath = ‘2024/01/01’ LIMIT 10;
# 查询优化建议
# 1. 使用分区过滤
# 2. 使用RecordKey过滤
# 3. 使用增量查询
# 4. 合理设置并行度
# 5. 定期执行Compaction和Clustering
Part05-风哥经验总结与分享
5.1 Hudi生产最佳实践
Hudi生产最佳实践:
- 表类型选择:读多写少选CoW,写多读少选MoR
- 索引选择:中小表用Bloom索引,大表用Global Index
- 分区设计:根据查询模式合理设计分区
- Compaction:MoR表必须配置Compaction策略
- Clustering:定期执行Clustering优化数据布局
- 监控告警:监控写入延迟、文件数量、Compaction状态
5.2 Hudi常见问题处理
– 增加并行度
– 使用Bulk Insert替代Upsert
– 优化索引配置
– 调整Executor资源
# 常见问题2:小文件过多
– 增加hoodie.parquet.small.file.limit
– 配置自动Cleaner
– 定期执行Clustering
# 常见问题3:查询性能慢
– 使用分区过滤
– MoR表使用Read Optimized查询
– 执行Compaction
– 更新统计信息
# 常见问题4:数据一致性问题
– 检查RecordKey设计
– 验证PrecombineField
– 使用时间旅行回溯数据
– 检查Timeline状态
5.3 Hudi运维检查清单
– [ ] 表类型选择合理
– [ ] RecordKey和PrecombineField设计正确
– [ ] 分区策略合理
– [ ] 索引配置优化
– [ ] Compaction策略配置
– [ ] Cleaner策略配置
– [ ] 资源配置合理
– [ ] 监控告警配置
– [ ] 定期备份元数据
– [ ] 定期执行Clustering
– [ ] 小文件治理
– [ ] 查询性能监控
– [ ] 数据一致性校验
# 日常巡检内容
1. 检查写入延迟
2. 检查文件数量
3. 检查Compaction状态
4. 检查Cleaner状态
5. 检查存储空间使用
6. 检查查询性能
7. 检查错误日志
8. 检查资源使用情况
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
