1. 首页 > Hadoop教程 > 正文

大数据教程FG151-Hadoop Hudi实时入湖实战

本文详细介绍Apache Hudi实时入湖的实战技术,包括Hudi概述、表类型、架构原理、环境部署、数据写入更新、查询优化等内容,风哥教程参考Hudi官方文档Getting Started、Core Concepts、Configuration等内容,适合大数据工程师在生产环境中使用。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 Hudi概述与核心特性

Apache Hudi(Hadoop Upserts Deletes and Incrementals)是一个开源的大数据存储管理框架,用于在HDFS或云存储上构建增量数据管道和数据湖。学习交流加群风哥微信: itpux-com

Hudi核心特性:

  • Upserts/Deletes:支持更新和删除操作
  • 增量处理:支持增量数据读取
  • ACID事务:保证数据一致性
  • 时间旅行:支持数据版本回溯
  • Schema演进:支持表结构变更
  • 多种查询引擎:支持Spark、Hive、Presto等

1.2 Hudi表类型与存储模型

Hudi提供两种主要的表类型,分别适用于不同的业务场景:

# Copy-on-Write (CoW) 表
– 写入时同步合并数据
– 读性能好,写性能较差
– 适用于读多写少场景
– 数据文件格式:Parquet

# Merge-on-Read (MoR) 表
– 写入时先写日志文件
– 读取时异步合并
– 写性能好,读性能有优化空间
– 适用于写多读少场景
– 数据文件格式:Parquet + Avro

1.3 Hudi架构与核心组件

Hudi架构包含以下核心组件:

  • Timeline:时间轴,记录所有表操作
  • File Groups:文件组,管理数据文件
  • Compaction:压缩,合并小文件和日志
  • Clustering:聚类,优化数据布局
  • Indexing:索引,快速定位记录
风哥提示:选择表类型时要根据业务读写比例来决定。如果读多写少,选择CoW表;如果写多读少,选择MoR表。学习交流加群风哥QQ113257174

Part02-生产环境规划与建议

2.1 Hudi生产环境规划

Hudi生产环境规划要点:

# 存储规划
– 数据存储路径:/bigdata/fgdata/hudi
– 建议使用HDFS或云存储(S3/OSS/COS)
– 数据目录权限:hdfs:hdfs 755
– 预留存储空间:数据量的3-5倍

# 表设计规划
– RecordKey:唯一标识字段,如id
– PrecombineField:预组合字段,如update_time
– PartitionPath:分区字段,如dt
– 表类型选择:根据读写比例

# 资源规划
– Spark Executor内存:8GB-32GB
– Spark Executor核数:2-8核
– 并行度:根据数据量调整
– Compaction频率:根据业务需求

2.2 Hudi核心参数配置

Hudi核心参数配置建议:

# 表基本配置
hoodie.table.name=fgedu_hudi_table
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.precombine.field=update_time
hoodie.datasource.write.partitionpath.field=dt
hoodie.datasource.write.table.type=COPY_ON_WRITE

# 写入性能配置
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=200
hoodie.parquet.small.file.limit=104857600

# 压缩配置
hoodie.compact.inline=true
hoodie.compact.inline.max.delta.commits=10
hoodie.cleaner.commits.retained=10
hoodie.clean.automatic=true

# 索引配置
hoodie.index.type=BLOOM
hoodie.bloom.index.filter.type=DYNAMIC_V0
hoodie.bloom.index.parallelism=100

2.3 Hudi资源配置建议

Hudi资源配置建议:

生产环境资源建议:

  • 小型集群:10TB以下数据,Spark Executor 8GB内存,2核,50个并行度
  • 中型集群:10TB-100TB数据,Spark Executor 16GB内存,4核,100-200个并行度
  • 大型集群:100TB以上数据,Spark Executor 32GB内存,8核,200-500个并行度

更多学习教程公众号风哥教程itpux_com

Part03-生产环境项目实施方案

3.1 Hudi环境安装部署

3.1.1 下载并配置Hudi

# 1. 下载Hudi
$ cd /bigdata/app
$ wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.3-bundle_2.12/0.14.0/hudi-spark3.3-bundle_2.12-0.14.0.jar

# 2. 配置Spark环境
$ vi /bigdata/app/spark/conf/spark-defaults.conf

spark.jars=/bigdata/app/hudi-spark3.3-bundle_2.12-0.14.0.jar
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.hive.convertMetastoreParquet=false

# 3. 创建数据目录
$ hdfs dfs -mkdir -p /bigdata/fgdata/hudi
$ hdfs dfs -chown hdfs:hdfs /bigdata/fgdata/hudi
$ hdfs dfs -chmod 755 /bigdata/fgdata/hudi

# 4. 验证环境
$ spark-shell –packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0

3.2 Hudi表创建与数据写入

3.2.1 使用Spark创建Hudi表

# 启动Spark Shell
$ spark-shell –packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0 \
–conf “spark.serializer=org.apache.spark.serializer.KryoSerializer”

# 导入Hudi包
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

# 配置表信息
val tableName = “fgedu_hudi_table”
val basePath = “hdfs://fgedu.net.cn:8020/bigdata/fgdata/hudi/fgedu_hudi_table”

# 创建测试数据
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(100))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

# 写入Hudi表
df.write.format(“hudi”).
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, “ts”).
option(RECORDKEY_FIELD_OPT_KEY, “uuid”).
option(PARTITIONPATH_FIELD_OPT_KEY, “partitionpath”).
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

# 验证数据
val roViewDF = spark.read.format(“hudi”).load(basePath)
roViewDF.createOrReplaceTempView(“fgedu_hudi_table”)
spark.sql(“select uuid, partitionpath, rider, begin_lon, begin_lat, ts from fgedu_hudi_table”).show()

3.3 Hudi数据更新与删除

3.3.1 Hudi数据更新操作

# 生成更新数据
val updates = convertToStringList(dataGen.generateUpdates(10))
val updateDF = spark.read.json(spark.sparkContext.parallelize(updates, 2))

# 执行Upsert操作
updateDF.write.format(“hudi”).
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, “ts”).
option(RECORDKEY_FIELD_OPT_KEY, “uuid”).
option(PARTITIONPATH_FIELD_OPT_KEY, “partitionpath”).
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

# 验证更新后的数据
val roViewDFAfterUpdate = spark.read.format(“hudi”).load(basePath)
roViewDFAfterUpdate.createOrReplaceTempView(“fgedu_hudi_table_after_update”)
spark.sql(“select uuid, partitionpath, rider, begin_lon, begin_lat, ts from fgedu_hudi_table_after_update order by ts desc”).show()

# 增量查询
val increments = spark.read.format(“hudi”).
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, “0”).
load(basePath)
increments.show()

风哥提示:Hudi的Upsert操作会自动处理更新和插入。对于删除操作,需要在数据中添加_hoodie_is_deleted=true标识。from bigdata视频:www.itpux.com

Part04-生产案例与实战讲解

4.1 实时数据流写入Hudi实战

4.1.1 使用Spark Streaming写入Hudi

# 创建实时写入Hudi的Spark Streaming应用
import org.apache.spark.sql.SparkSession
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

object FgeduHudiStreaming {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(“FgeduHudiStreaming”)
.config(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
.getOrCreate()

import spark.implicits._

val tableName = “fgedu_streaming_table”
val basePath = “hdfs://fgedu.net.cn:8020/bigdata/fgdata/hudi/fgedu_streaming_table”

// 模拟读取Kafka数据
val kafkaDF = spark.readStream
.format(“kafka”)
.option(“kafka.bootstrap.servers”, “192.168.1.100:9092,192.168.1.101:9092”)
.option(“subscribe”, “fgedu_events”)
.load()

// 解析JSON数据
val parsedDF = kafkaDF.selectExpr(“CAST(value AS STRING)”)
.select(from_json($”value”, schema).as(“data”))
.select(“data.*”)

// 写入Hudi
val query = parsedDF.writeStream
.format(“hudi”)
.option(TABLE_NAME, tableName)
.option(RECORDKEY_FIELD_OPT_KEY, “id”)
.option(PRECOMBINE_FIELD_OPT_KEY, “ts”)
.option(PARTITIONPATH_FIELD_OPT_KEY, “dt”)
.option(“hoodie.datasource.write.operation”, “upsert”)
.option(“checkpointLocation”, “/bigdata/fgdata/checkpoints/fgedu_streaming”)
.start(basePath)

query.awaitTermination()
}
}

# 提交应用
$ spark-submit –class FgeduHudiStreaming \
–packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0 \
–master yarn \
–deploy-mode cluster \
–num-executors 10 \
–executor-memory 16G \
–executor-cores 4 \
fgedu-hudi-streaming.jar

4.2 Hudi Merge-on-Read表实战

4.2.1 创建MoR表并写入数据

# 配置MoR表
val tableName = “fgedu_mor_table”
val basePath = “hdfs://fgedu.net.cn:8020/bigdata/fgdata/hudi/fgedu_mor_table”

# 写入MoR表
df.write.format(“hudi”).
option(TABLE_NAME, tableName).
option(RECORDKEY_FIELD_OPT_KEY, “uuid”).
option(PRECOMBINE_FIELD_OPT_KEY, “ts”).
option(PARTITIONPATH_FIELD_OPT_KEY, “partitionpath”).
option(WRITE_OPERATION_OPT_KEY, “insert”).
option(TABLE_TYPE_OPT_KEY, “MERGE_ON_READ”).
option(COMPACTION_INLINE_OPT_KEY, “true”).
option(COMPACTION_INLINE_MAX_DELTA_COMMITS_OPT_KEY, “5”).
mode(Overwrite).
save(basePath)

# 读取优化视图(只读Parquet)
val roViewDF = spark.read.format(“hudi”).
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
load(basePath)
roViewDF.show()

# 读取实时视图(合并日志文件)
val rtViewDF = spark.read.format(“hudi”).
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL).
load(basePath)
rtViewDF.show()

4.3 Hudi数据查询与优化

4.3.1 使用Hive查询Hudi表

# 创建Hive外部表
CREATE EXTERNAL TABLE fgedu_hudi_hive_table (
uuid string,
partitionpath string,
rider string,
begin_lon double,
begin_lat double,
ts bigint
)
STORED AS INPUTFORMAT ‘org.apache.hudi.hadoop.hive.HoodieParquetInputFormat’
OUTPUTFORMAT ‘org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat’
LOCATION ‘hdfs://fgedu.net.cn:8020/bigdata/fgdata/hudi/fgedu_hudi_table’;

# 查询Hudi表
SELECT * FROM fgedu_hudi_hive_table LIMIT 10;

# 使用Presto查询Hudi表
# 在Presto中配置Hudi连接器
SELECT * FROM hudi.fgedu_hudi_table WHERE partitionpath = ‘2024/01/01’ LIMIT 10;

# 查询优化建议
# 1. 使用分区过滤
# 2. 使用RecordKey过滤
# 3. 使用增量查询
# 4. 合理设置并行度
# 5. 定期执行Compaction和Clustering

生产环境建议:对于MoR表,建议配置自动Compaction,避免日志文件过多影响查询性能。同时定期执行Clustering优化数据布局。更多视频教程www.fgedu.net.cn

Part05-风哥经验总结与分享

5.1 Hudi生产最佳实践

Hudi生产最佳实践:

  • 表类型选择:读多写少选CoW,写多读少选MoR
  • 索引选择:中小表用Bloom索引,大表用Global Index
  • 分区设计:根据查询模式合理设计分区
  • Compaction:MoR表必须配置Compaction策略
  • Clustering:定期执行Clustering优化数据布局
  • 监控告警:监控写入延迟、文件数量、Compaction状态

5.2 Hudi常见问题处理

# 常见问题1:写入性能慢
– 增加并行度
– 使用Bulk Insert替代Upsert
– 优化索引配置
– 调整Executor资源

# 常见问题2:小文件过多
– 增加hoodie.parquet.small.file.limit
– 配置自动Cleaner
– 定期执行Clustering

# 常见问题3:查询性能慢
– 使用分区过滤
– MoR表使用Read Optimized查询
– 执行Compaction
– 更新统计信息

# 常见问题4:数据一致性问题
– 检查RecordKey设计
– 验证PrecombineField
– 使用时间旅行回溯数据
– 检查Timeline状态

5.3 Hudi运维检查清单

# Hudi运维检查清单
– [ ] 表类型选择合理
– [ ] RecordKey和PrecombineField设计正确
– [ ] 分区策略合理
– [ ] 索引配置优化
– [ ] Compaction策略配置
– [ ] Cleaner策略配置
– [ ] 资源配置合理
– [ ] 监控告警配置
– [ ] 定期备份元数据
– [ ] 定期执行Clustering
– [ ] 小文件治理
– [ ] 查询性能监控
– [ ] 数据一致性校验

# 日常巡检内容
1. 检查写入延迟
2. 检查文件数量
3. 检查Compaction状态
4. 检查Cleaner状态
5. 检查存储空间使用
6. 检查查询性能
7. 检查错误日志
8. 检查资源使用情况

风哥提示:Hudi是一个功能强大的实时入湖框架,但要充分发挥其性能,需要根据业务场景合理配置参数。建议先在测试环境充分验证后再上线生产环境。学习交流加群风哥微信: itpux-com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息