本文详细介绍Spark性能调优实战,包括内存配置、Shuffle优化、代码优化、SQL优化、数据倾斜处理等内容,风哥教程参考Spark官方文档Performance Tuning部分,适合大数据开发工程师使用。学习交流加群风哥微信: itpux-com
Part01-基础概念与理论知识
1.1 Spark性能调优概述
Spark性能调优是通过调整配置、优化代码、改进数据处理方式等手段,提升Spark应用的执行效率和资源利用率。更多视频教程www.fgedu.net.cn
- 资源调优:Executor、内存、CPU配置
- 并行度调优:分区数、Task数
- Shuffle调优:Shuffle参数优化
- 内存调优:RDD缓存、内存管理
- 代码调优:算子优化、避免Shuffle
- 数据调优:数据格式、压缩、序列化
1.2 Spark执行架构
Spark核心执行概念:
Application:
– 用户提交的Spark应用
– 包含一个Driver和多个Executor
Driver:
– 运行main函数
– 创建SparkContext
– 构建DAG图
– 调度Task执行
Executor:
– 运行在Worker节点上
– 执行Task
– 存储数据
– 可配置多个
Job:
– 每次Action触发一个Job
– 包含多个Stage
Stage:
– Job的划分单位
– 宽依赖划分Stage
– 包含多个Task
Task:
– 最小执行单位
– 运行在Executor上
– 每个Task处理一个分区
1.3 性能指标介绍
Spark关键性能指标:
- GC时间:垃圾回收时间,越小越好
- Shuffle数据量:Shuffle读写数据量,越小越好
- Task执行时间:Task执行时间,越均匀越好
- 内存使用:Executor内存使用,避免OOM
- CPU使用:CPU利用率,越高越好
Part02-生产环境规划与建议
2.1 资源配置规划
资源配置规划要点:
Executor数量:
– 小应用:2-5个
– 中应用:10-30个
– 大应用:50-100个
Executor内存:
– 小应用:2GB-4GB
– 中应用:8GB-16GB
– 大应用:16GB-64GB
Executor CPU:
– 建议:2-8核
– 推荐:4-6核
# 示例配置(100核200GB集群)
# –executor-memory 16G
# –executor-cores 5
# –num-executors 10
# Driver memory: 4G
# 内存分配
堆内内存:
– 存储内存:60%
– 执行内存:20%
– 其他内存:20%
堆外内存:
– 建议:Executor内存的10%-20%
– 配置:spark.memory.offHeap.enabled=true
– 配置:spark.memory.offHeap.size=2g
2.2 并行度规划
并行度规划要点:
spark.default.parallelism:
– 建议:CPU核数的2-3倍
– 例如:100核 = 200-300
spark.sql.shuffle.partitions:
– 建议:200-500
– 根据数据量调整
– 数据量大调大,数据小调小
# 分区大小
目标分区大小:
– 建议:64MB-256MB
– 推荐:128MB
调整方法:
– 减少分区:coalesce
– 增加分区:repartition
2.3 序列化规划
序列化规划要点:
- Java序列化:默认,慢,体积大
- Kryo序列化:快,体积小,推荐
更多学习教程公众号风哥教程itpux_com
Part03-生产环境项目实施方案
3.1 内存配置优化
3.1.1 内存配置参数
spark-submit \
–class com.fgedu.SparkJob \
–master yarn \
–deploy-mode cluster \
–driver-memory 4g \
–executor-memory 16g \
–executor-cores 5 \
–num-executors 10 \
–conf spark.driver.memoryOverhead=2g \
–conf spark.executor.memoryOverhead=2g \
–conf spark.memory.fraction=0.8 \
–conf spark.memory.storageFraction=0.5 \
–conf spark.memory.offHeap.enabled=true \
–conf spark.memory.offHeap.size=4g \
–conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
–conf spark.kryo.registrationRequired=true \
–conf spark.kryo.classesToRegister=com.fgedu.User,com.fgedu.Order \
–conf spark.default.parallelism=200 \
–conf spark.sql.shuffle.partitions=200 \
–conf spark.sql.adaptive.enabled=true \
–conf spark.sql.adaptive.coalescePartitions.enabled=true \
–conf spark.sql.adaptive.skewJoin.enabled=true \
fgedu-spark-job.jar
# 内存配置说明
spark.driver.memory:Driver内存
spark.executor.memory:Executor内存
spark.driver.memoryOverhead:Driver堆外内存
spark.executor.memoryOverhead:Executor堆外内存
spark.memory.fraction:存储和执行内存比例
spark.memory.storageFraction:存储内存比例
spark.memory.offHeap.enabled:启用堆外内存
spark.memory.offHeap.size:堆外内存大小
3.2 Shuffle优化
3.2.1 Shuffle参数配置
spark.shuffle.file.buffer:64k(默认32k)
spark.reducer.maxSizeInFlight:96m(默认48m)
spark.shuffle.io.maxRetries:10(默认3)
spark.shuffle.io.retryWait:10s(默认5s)
spark.shuffle.compress:true(默认true)
spark.shuffle.spill.compress:true(默认true)
spark.io.compression.codec:snappy(默认lz4)
# 提交示例
spark-submit \
–conf spark.shuffle.file.buffer=64k \
–conf spark.reducer.maxSizeInFlight=96m \
–conf spark.shuffle.io.maxRetries=10 \
–conf spark.shuffle.io.retryWait=10s \
–conf spark.shuffle.compress=true \
–conf spark.shuffle.spill.compress=true \
–conf spark.io.compression.codec=snappy \
–conf spark.shuffle.service.enabled=true \
fgedu-spark-job.jar
# 减少Shuffle的方法
1. 使用map-side聚合:reduceByKey、aggregateByKey
2. 复用RDD:cache、persist
3. 广播小表:broadcast join
4. 过滤数据:尽早filter
5. 使用SQL优化:AQE
3.3 代码优化
3.3.1 算子优化
# 不推荐
rdd.groupByKey().map(x => (x._1, x._2.sum))
# 推荐
rdd.reduceByKey(_ + _)
# 优化2:使用广播join
# 不推荐
largeRDD.join(smallRDD)
# 推荐
val broadcastSmallRDD = sc.broadcast(smallRDD.collectAsMap())
largeRDD.map(x => (x._1, (x._2, broadcastSmallRDD.value.get(x._1))))
# 优化3:复用RDD
# 不推荐
rdd.map(…).filter(…)
rdd.map(…).groupBy(…)
# 推荐
val cachedRDD = rdd.cache()
cachedRDD.map(…).filter(…)
cachedRDD.map(…).groupBy(…)
# 优化4:合理使用持久化级别
rdd.persist(StorageLevel.MEMORY_ONLY) // 内存
rdd.persist(StorageLevel.MEMORY_ONLY_SER) // 内存序列化
rdd.persist(StorageLevel.MEMORY_AND_DISK) // 内存+磁盘
rdd.persist(StorageLevel.DISK_ONLY) // 磁盘
# 优化5:避免创建大量对象
// 不推荐
rdd.map(x => new User(x._1, x._2))
// 推荐:使用数组或原始类型
# 优化6:使用foreachPartition代替foreach
// 不推荐
rdd.foreach(x => {
val conn = createConnection()
conn.write(x)
conn.close()
})
// 推荐
rdd.foreachPartition(iter => {
val conn = createConnection()
iter.foreach(x => conn.write(x))
conn.close()
})
Part04-生产案例与实战讲解
4.1 SQL优化实战
4.1.1 AQE(自适应查询执行)优化
spark-submit \
–conf spark.sql.adaptive.enabled=true \
–conf spark.sql.adaptive.coalescePartitions.enabled=true \
–conf spark.sql.adaptive.skewJoin.enabled=true \
–conf spark.sql.adaptive.optimizer.enabled=true \
fgedu-spark-sql.jar
# 编写优化的SQL
— 优化1:尽早过滤
— 不推荐
SELECT a.*, b.*
FROM a JOIN b ON a.id = b.id
WHERE a.date = ‘2024-04-08’;
— 推荐
SELECT a.*, b.*
FROM (SELECT * FROM a WHERE date = ‘2024-04-08’) a
JOIN (SELECT * FROM b WHERE date = ‘2024-04-08’) b
ON a.id = b.id;
— 优化2:使用广播join
— 小表自动广播(默认10MB)
SELECT /*+ BROADCAST(b) */ a.*, b.*
FROM a JOIN b ON a.id = b.id;
— 优化3:合理设置分区
SET spark.sql.shuffle.partitions=200;
— 优化4:使用开窗函数优化
— 不推荐
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) rn
FROM table
) t WHERE rn = 1;
— 推荐:如果数据量大,考虑其他方式
— 优化5:避免笛卡尔积
— 不推荐
SELECT * FROM a, b;
— 推荐:一定要加join条件
4.2 数据倾斜处理实战
4.2.1 数据倾斜处理
– 某几个Task执行时间特别长
– 其他Task很快完成
– Spark UI看到Task时间不均匀
– 可能OOM
# 解决方案1:过滤倾斜key
val skewedKeys = Array(“key1”, “key2”, “key3”)
val rddWithoutSkew = rdd.filter(x => !skewedKeys.contains(x._1))
val rddSkew = rdd.filter(x => skewedKeys.contains(x._1))
// 分别处理再合并
# 解决方案2:加盐处理
// 给倾斜的key加随机前缀
val rddWithSalt = rdd.map(x => {
if (isSkewedKey(x._1)) {
(x._1 + “_” + Random.nextInt(100), x._2)
} else {
x
}
})
// 聚合后去掉前缀
val result = rddWithSalt.reduceByKey(_ + _)
.map(x => (x._1.split(“_”)(0), x._2))
.reduceByKey(_ + _)
# 解决方案3:广播小表
// 如果是join导致的倾斜
val smallTableDF = spark.read.table(“small_table”)
val broadcastSmallTable = broadcast(smallTableDF)
val result = largeTableDF.join(broadcastSmallTable, “id”)
# 解决方案4:调整并行度
SET spark.sql.shuffle.partitions=500;
# 解决方案5:使用AQE自动处理
— 启用AQE倾斜join优化
SET spark.sql.adaptive.skewJoin.enabled=true;
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB;
4.3 监控与分析实战
4.3.1 Spark UI使用
# YARN模式:http://fgedu-rm:8088
# 点击ApplicationMaster进入Spark UI
# Spark UI页面
Jobs页:
– 查看所有Job
– 查看Job状态
– 点击Job查看详情
Stages页:
– 查看所有Stage
– 查看Stage状态
– 点击Stage查看Task详情
Storage页:
– 查看缓存的RDD
– 查看内存使用
– 查看缓存大小
Environment页:
– 查看配置参数
– 查看类路径
– 查看系统属性
Executors页:
– 查看Executor列表
– 查看内存使用
– 查看GC时间
– 查看Task统计
SQL页:
– 查看SQL执行
– 查看执行计划
– 查看数据扫描量
# 分析方法
1. 看哪个Job慢
2. 看Job里哪个Stage慢
3. 看Stage里哪个Task慢
4. 看Shuffle数据量
5. 看GC时间
6. 看Spill情况
Part05-风哥经验总结与分享
5.1 性能调优最佳实践
Spark性能调优最佳实践:
- 先分析后优化:先看Spark UI,找出瓶颈
- 资源配置合理:Executor数、内存、CPU要合理
- 并行度合适:分区数、并行度要合适
- 减少Shuffle:能不Shuffle就不Shuffle
- 复用数据:合理使用cache/persist
- 数据倾斜:发现倾斜及时处理
5.2 常见问题处理
– 增加Executor内存
– 减少并行度
– 增加分区数
– 使用堆外内存
– 优化数据结构
# 常见问题2:Shuffle数据量大
– 尽早过滤数据
– 使用map-side聚合
– 增加并行度
– 使用高效的序列化
# 常见问题3:Task执行时间不均匀
– 检查数据倾斜
– 调整并行度
– 使用AQE
– 加盐处理
# 常见问题4:GC频繁
– 减少创建对象
– 使用Kryo序列化
– 调整内存比例
– 增加堆外内存
# 常见问题5:作业运行慢
– 看Spark UI找瓶颈
– 优化Shuffle
– 优化代码
– 调整资源配置
– 使用AQE
5.3 调优检查清单
– [ ] Spark UI已查看
– [ ] 资源配置合理
– [ ] 并行度合适
– [ ] 序列化已优化
– [ ] Shuffle已优化
– [ ] 数据已缓存
– [ ] 代码已优化
– [ ] 数据倾斜已处理
– [ ] AQE已启用
– [ ] GC时间正常
– [ ] 无Spill
– [ ] Task时间均匀
# 调优步骤
1. 运行Spark应用
2. 打开Spark UI
3. 分析哪个阶段慢
4. 找出瓶颈原因
5. 针对性优化
6. 重新运行测试
7. 对比性能提升
8. 重复直到满意
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
