1. 首页 > Hadoop教程 > 正文

大数据教程FG161-Hadoop Spark性能调优实战

本文详细介绍Spark性能调优实战,包括内存配置、Shuffle优化、代码优化、SQL优化、数据倾斜处理等内容,风哥教程参考Spark官方文档Performance Tuning部分,适合大数据开发工程师使用。学习交流加群风哥微信: itpux-com

Part01-基础概念与理论知识

1.1 Spark性能调优概述

Spark性能调优是通过调整配置、优化代码、改进数据处理方式等手段,提升Spark应用的执行效率和资源利用率。更多视频教程www.fgedu.net.cn

Spark性能调优方向:

  • 资源调优:Executor、内存、CPU配置
  • 并行度调优:分区数、Task数
  • Shuffle调优:Shuffle参数优化
  • 内存调优:RDD缓存、内存管理
  • 代码调优:算子优化、避免Shuffle
  • 数据调优:数据格式、压缩、序列化

1.2 Spark执行架构

Spark核心执行概念:

# Spark核心概念
Application:
– 用户提交的Spark应用
– 包含一个Driver和多个Executor

Driver:
– 运行main函数
– 创建SparkContext
– 构建DAG图
– 调度Task执行

Executor:
– 运行在Worker节点上
– 执行Task
– 存储数据
– 可配置多个

Job:
– 每次Action触发一个Job
– 包含多个Stage

Stage:
– Job的划分单位
– 宽依赖划分Stage
– 包含多个Task

Task:
– 最小执行单位
– 运行在Executor上
– 每个Task处理一个分区

1.3 性能指标介绍

Spark关键性能指标:

  • GC时间:垃圾回收时间,越小越好
  • Shuffle数据量:Shuffle读写数据量,越小越好
  • Task执行时间:Task执行时间,越均匀越好
  • 内存使用:Executor内存使用,避免OOM
  • CPU使用:CPU利用率,越高越好
风哥提示:Spark性能调优要先从Spark UI入手,分析哪个阶段慢、哪个Task慢、Shuffle数据量有多少,然后针对性地优化。不要盲目调整参数。学习交流加群风哥QQ113257174

Part02-生产环境规划与建议

2.1 资源配置规划

资源配置规划要点:

# Executor配置
Executor数量:
– 小应用:2-5个
– 中应用:10-30个
– 大应用:50-100个

Executor内存:
– 小应用:2GB-4GB
– 中应用:8GB-16GB
– 大应用:16GB-64GB

Executor CPU:
– 建议:2-8核
– 推荐:4-6核

# 示例配置(100核200GB集群)
# –executor-memory 16G
# –executor-cores 5
# –num-executors 10
# Driver memory: 4G

# 内存分配
堆内内存:
– 存储内存:60%
– 执行内存:20%
– 其他内存:20%

堆外内存:
– 建议:Executor内存的10%-20%
– 配置:spark.memory.offHeap.enabled=true
– 配置:spark.memory.offHeap.size=2g

2.2 并行度规划

并行度规划要点:

# 并行度配置
spark.default.parallelism:
– 建议:CPU核数的2-3倍
– 例如:100核 = 200-300

spark.sql.shuffle.partitions:
– 建议:200-500
– 根据数据量调整
– 数据量大调大,数据小调小

# 分区大小
目标分区大小:
– 建议:64MB-256MB
– 推荐:128MB

调整方法:
– 减少分区:coalesce
– 增加分区:repartition

2.3 序列化规划

序列化规划要点:

序列化对比:

  • Java序列化:默认,慢,体积大
  • Kryo序列化:快,体积小,推荐

更多学习教程公众号风哥教程itpux_com

Part03-生产环境项目实施方案

3.1 内存配置优化

3.1.1 内存配置参数

# 提交Spark应用示例
spark-submit \
–class com.fgedu.SparkJob \
–master yarn \
–deploy-mode cluster \
–driver-memory 4g \
–executor-memory 16g \
–executor-cores 5 \
–num-executors 10 \
–conf spark.driver.memoryOverhead=2g \
–conf spark.executor.memoryOverhead=2g \
–conf spark.memory.fraction=0.8 \
–conf spark.memory.storageFraction=0.5 \
–conf spark.memory.offHeap.enabled=true \
–conf spark.memory.offHeap.size=4g \
–conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
–conf spark.kryo.registrationRequired=true \
–conf spark.kryo.classesToRegister=com.fgedu.User,com.fgedu.Order \
–conf spark.default.parallelism=200 \
–conf spark.sql.shuffle.partitions=200 \
–conf spark.sql.adaptive.enabled=true \
–conf spark.sql.adaptive.coalescePartitions.enabled=true \
–conf spark.sql.adaptive.skewJoin.enabled=true \
fgedu-spark-job.jar

# 内存配置说明
spark.driver.memory:Driver内存
spark.executor.memory:Executor内存
spark.driver.memoryOverhead:Driver堆外内存
spark.executor.memoryOverhead:Executor堆外内存
spark.memory.fraction:存储和执行内存比例
spark.memory.storageFraction:存储内存比例
spark.memory.offHeap.enabled:启用堆外内存
spark.memory.offHeap.size:堆外内存大小

3.2 Shuffle优化

3.2.1 Shuffle参数配置

# Shuffle优化参数
spark.shuffle.file.buffer:64k(默认32k)
spark.reducer.maxSizeInFlight:96m(默认48m)
spark.shuffle.io.maxRetries:10(默认3)
spark.shuffle.io.retryWait:10s(默认5s)
spark.shuffle.compress:true(默认true)
spark.shuffle.spill.compress:true(默认true)
spark.io.compression.codec:snappy(默认lz4)

# 提交示例
spark-submit \
–conf spark.shuffle.file.buffer=64k \
–conf spark.reducer.maxSizeInFlight=96m \
–conf spark.shuffle.io.maxRetries=10 \
–conf spark.shuffle.io.retryWait=10s \
–conf spark.shuffle.compress=true \
–conf spark.shuffle.spill.compress=true \
–conf spark.io.compression.codec=snappy \
–conf spark.shuffle.service.enabled=true \
fgedu-spark-job.jar

# 减少Shuffle的方法
1. 使用map-side聚合:reduceByKey、aggregateByKey
2. 复用RDD:cache、persist
3. 广播小表:broadcast join
4. 过滤数据:尽早filter
5. 使用SQL优化:AQE

3.3 代码优化

3.3.1 算子优化

# 优化1:使用map-side聚合
# 不推荐
rdd.groupByKey().map(x => (x._1, x._2.sum))
# 推荐
rdd.reduceByKey(_ + _)

# 优化2:使用广播join
# 不推荐
largeRDD.join(smallRDD)
# 推荐
val broadcastSmallRDD = sc.broadcast(smallRDD.collectAsMap())
largeRDD.map(x => (x._1, (x._2, broadcastSmallRDD.value.get(x._1))))

# 优化3:复用RDD
# 不推荐
rdd.map(…).filter(…)
rdd.map(…).groupBy(…)
# 推荐
val cachedRDD = rdd.cache()
cachedRDD.map(…).filter(…)
cachedRDD.map(…).groupBy(…)

# 优化4:合理使用持久化级别
rdd.persist(StorageLevel.MEMORY_ONLY) // 内存
rdd.persist(StorageLevel.MEMORY_ONLY_SER) // 内存序列化
rdd.persist(StorageLevel.MEMORY_AND_DISK) // 内存+磁盘
rdd.persist(StorageLevel.DISK_ONLY) // 磁盘

# 优化5:避免创建大量对象
// 不推荐
rdd.map(x => new User(x._1, x._2))
// 推荐:使用数组或原始类型

# 优化6:使用foreachPartition代替foreach
// 不推荐
rdd.foreach(x => {
val conn = createConnection()
conn.write(x)
conn.close()
})
// 推荐
rdd.foreachPartition(iter => {
val conn = createConnection()
iter.foreach(x => conn.write(x))
conn.close()
})

风哥提示:代码优化是Spark性能调优的重要部分。要避免Shuffle、复用RDD、使用高效的算子。同时要合理配置序列化和内存。from bigdata视频:www.itpux.com

Part04-生产案例与实战讲解

4.1 SQL优化实战

4.1.1 AQE(自适应查询执行)优化

# 启用AQE
spark-submit \
–conf spark.sql.adaptive.enabled=true \
–conf spark.sql.adaptive.coalescePartitions.enabled=true \
–conf spark.sql.adaptive.skewJoin.enabled=true \
–conf spark.sql.adaptive.optimizer.enabled=true \
fgedu-spark-sql.jar

# 编写优化的SQL
— 优化1:尽早过滤
— 不推荐
SELECT a.*, b.*
FROM a JOIN b ON a.id = b.id
WHERE a.date = ‘2024-04-08’;
— 推荐
SELECT a.*, b.*
FROM (SELECT * FROM a WHERE date = ‘2024-04-08’) a
JOIN (SELECT * FROM b WHERE date = ‘2024-04-08’) b
ON a.id = b.id;

— 优化2:使用广播join
— 小表自动广播(默认10MB)
SELECT /*+ BROADCAST(b) */ a.*, b.*
FROM a JOIN b ON a.id = b.id;

— 优化3:合理设置分区
SET spark.sql.shuffle.partitions=200;

— 优化4:使用开窗函数优化
— 不推荐
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) rn
FROM table
) t WHERE rn = 1;
— 推荐:如果数据量大,考虑其他方式

— 优化5:避免笛卡尔积
— 不推荐
SELECT * FROM a, b;
— 推荐:一定要加join条件

4.2 数据倾斜处理实战

4.2.1 数据倾斜处理

# 数据倾斜表现
– 某几个Task执行时间特别长
– 其他Task很快完成
– Spark UI看到Task时间不均匀
– 可能OOM

# 解决方案1:过滤倾斜key
val skewedKeys = Array(“key1”, “key2”, “key3”)
val rddWithoutSkew = rdd.filter(x => !skewedKeys.contains(x._1))
val rddSkew = rdd.filter(x => skewedKeys.contains(x._1))
// 分别处理再合并

# 解决方案2:加盐处理
// 给倾斜的key加随机前缀
val rddWithSalt = rdd.map(x => {
if (isSkewedKey(x._1)) {
(x._1 + “_” + Random.nextInt(100), x._2)
} else {
x
}
})
// 聚合后去掉前缀
val result = rddWithSalt.reduceByKey(_ + _)
.map(x => (x._1.split(“_”)(0), x._2))
.reduceByKey(_ + _)

# 解决方案3:广播小表
// 如果是join导致的倾斜
val smallTableDF = spark.read.table(“small_table”)
val broadcastSmallTable = broadcast(smallTableDF)
val result = largeTableDF.join(broadcastSmallTable, “id”)

# 解决方案4:调整并行度
SET spark.sql.shuffle.partitions=500;

# 解决方案5:使用AQE自动处理
— 启用AQE倾斜join优化
SET spark.sql.adaptive.skewJoin.enabled=true;
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB;

4.3 监控与分析实战

4.3.1 Spark UI使用

# 访问Spark UI
# YARN模式:http://fgedu-rm:8088
# 点击ApplicationMaster进入Spark UI

# Spark UI页面
Jobs页:
– 查看所有Job
– 查看Job状态
– 点击Job查看详情

Stages页:
– 查看所有Stage
– 查看Stage状态
– 点击Stage查看Task详情

Storage页:
– 查看缓存的RDD
– 查看内存使用
– 查看缓存大小

Environment页:
– 查看配置参数
– 查看类路径
– 查看系统属性

Executors页:
– 查看Executor列表
– 查看内存使用
– 查看GC时间
– 查看Task统计

SQL页:
– 查看SQL执行
– 查看执行计划
– 查看数据扫描量

# 分析方法
1. 看哪个Job慢
2. 看Job里哪个Stage慢
3. 看Stage里哪个Task慢
4. 看Shuffle数据量
5. 看GC时间
6. 看Spill情况

生产环境建议:Spark UI是性能调优的最佳工具。每次调优都要先看Spark UI,找出瓶颈,再针对性地优化。不要盲目调整参数。更多视频教程www.fgedu.net.cn

Part05-风哥经验总结与分享

5.1 性能调优最佳实践

Spark性能调优最佳实践:

  • 先分析后优化:先看Spark UI,找出瓶颈
  • 资源配置合理:Executor数、内存、CPU要合理
  • 并行度合适:分区数、并行度要合适
  • 减少Shuffle:能不Shuffle就不Shuffle
  • 复用数据:合理使用cache/persist
  • 数据倾斜:发现倾斜及时处理

5.2 常见问题处理

# 常见问题1:OOM
– 增加Executor内存
– 减少并行度
– 增加分区数
– 使用堆外内存
– 优化数据结构

# 常见问题2:Shuffle数据量大
– 尽早过滤数据
– 使用map-side聚合
– 增加并行度
– 使用高效的序列化

# 常见问题3:Task执行时间不均匀
– 检查数据倾斜
– 调整并行度
– 使用AQE
– 加盐处理

# 常见问题4:GC频繁
– 减少创建对象
– 使用Kryo序列化
– 调整内存比例
– 增加堆外内存

# 常见问题5:作业运行慢
– 看Spark UI找瓶颈
– 优化Shuffle
– 优化代码
– 调整资源配置
– 使用AQE

5.3 调优检查清单

# Spark调优检查清单
– [ ] Spark UI已查看
– [ ] 资源配置合理
– [ ] 并行度合适
– [ ] 序列化已优化
– [ ] Shuffle已优化
– [ ] 数据已缓存
– [ ] 代码已优化
– [ ] 数据倾斜已处理
– [ ] AQE已启用
– [ ] GC时间正常
– [ ] 无Spill
– [ ] Task时间均匀

# 调优步骤
1. 运行Spark应用
2. 打开Spark UI
3. 分析哪个阶段慢
4. 找出瓶颈原因
5. 针对性优化
6. 重新运行测试
7. 对比性能提升
8. 重复直到满意

风哥提示:Spark性能调优是一个持续迭代的过程,不是一次就能调到最优。要不断测试、分析、优化,逐步提升性能。同时要注意不要过度优化,保持代码的可读性。学习交流加群风哥微信: itpux-com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息