本文档风哥主要介绍Spark性能调优实战,包括Spark内存模型、Shuffle机制、内存调优、并行度调优、数据倾斜处理等内容,风哥教程参考Spark官方文档Tuning Guide、Configuration等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 Spark性能调优概述
Spark性能调优是提升Spark应用执行效率的关键环节,需要从资源配置、代码优化、数据倾斜处理等多个维度进行优化。学习交流加群风哥微信: itpux-com
- 资源配置:Executor数量、内存、核心数
- 并行度:分区数、任务并行度
- 内存管理:内存分配、缓存策略
- Shuffle优化:减少Shuffle、优化Shuffle配置
- 序列化:选择高效序列化方式
- 数据倾斜:处理数据分布不均匀
1.2 Spark内存模型
Spark内存模型详解:
+—————————+
| Executor Memory |
+—————————+
| Reserved Memory | 300MB(固定)
+—————————+
| User Memory | 用户自定义数据结构
+—————————+
| Spark Memory |
| +———————+ |
| | Storage Memory | | 缓存数据、广播变量
| +———————+ |
| | Execution Memory | | Shuffle、Join、聚合
| +———————+ |
+—————————+
# 内存配置参数
spark.memory.fraction = 0.6 # Spark内存占比
spark.memory.storageFraction = 0.5 # Storage内存占比
spark.memory.offHeap.enabled # 堆外内存
spark.memory.offHeap.size # 堆外内存大小
# 内存计算示例
Executor内存:16GB
Reserved Memory:300MB
Available Memory:16GB – 300MB = 15.7GB
Spark Memory:15.7GB * 0.6 = 9.4GB
Storage Memory:9.4GB * 0.5 = 4.7GB
Execution Memory:9.4GB * 0.5 = 4.7GB
1.3 Shuffle机制原理
Shuffle机制详解:
1. Map阶段
– Map任务处理数据
– 写入内存缓冲区
– 溢写到磁盘文件
2. Shuffle Write
– 按分区排序
– 写入磁盘文件
– 生成索引文件
3. Shuffle Read
– 拉取数据
– 聚合/排序
– 传递给下游任务
# Shuffle Manager
– SortShuffleManager(默认)
– 支持三种写入模式:
1. Bypass Merge Sort:分区数<200
2. Unsafe Shuffle:序列化后排序
3. Sort Shuffle:默认模式
# Shuffle配置
spark.shuffle.compress=true # 压缩Shuffle输出
spark.shuffle.file.buffer=32k # Shuffle缓冲区大小
spark.shuffle.io.maxRetries=3 # IO重试次数
spark.shuffle.io.retryWait=5s # 重试等待时间
spark.reducer.maxSizeInFlight=48m # Reduce拉取数据大小
Part02-生产环境规划与建议
2.1 资源配置规划
资源配置规划建议:
1. Executor数量
– 公式:num_executors = 总核心数 / 每个Executor核心数
– 建议:每个节点2-5个Executor
2. Executor核心数
– 建议:每个Executor 2-5个核心
– 过多:资源竞争,性能下降
– 过少:并行度不足
3. Executor内存
– 建议:每个Executor 4-8GB
– 公式:内存 = 数据量 * 2-3倍
– 注意:内存过大导致GC问题
# 生产环境配置示例
–num-executors 20
–executor-cores 4
–executor-memory 8g
–driver-memory 4g
–driver-cores 2
# YARN模式配置
–num-executors 20
–executor-cores 4
–executor-memory 8g
–driver-memory 4g
–conf spark.yarn.executor.memoryOverhead=2g
2.2 并行度规划
并行度规划建议:
1. 默认并行度
spark.default.parallelism = 总核心数 * 2-3
2. Shuffle分区数
spark.sql.shuffle.partitions = 200(默认)
建议:根据数据量调整
3. 分区数计算
– 小数据(<1GB):分区数 = 核心数
- 中等数据(1-100GB):分区数 = 核心数 * 2-3
- 大数据(>100GB):分区数 = 核心数 * 3-4
# 并行度配置示例
spark.default.parallelism=200
spark.sql.shuffle.partitions=400
# 分区大小建议
每个分区数据量:128MB-256MB
分区数 = 总数据量 / 分区大小
# 动态调整
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
2.3 内存规划
内存规划建议:
1. 缓存密集型应用
spark.memory.storageFraction=0.6
增加Storage Memory
2. 计算密集型应用
spark.memory.storageFraction=0.3
增加Execution Memory
3. 混合型应用
spark.memory.storageFraction=0.5
平衡分配
# 堆外内存配置
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
# 内存配置示例
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
# GC配置
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=20
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=20
Part03-生产环境项目实施方案
3.1 内存调优实战
3.1.1 内存配置优化
$ /bigdata/app/spark/bin/spark-shell \
–master spark://192.168.1.60:7077 \
–executor-memory 8g \
–conf spark.memory.fraction=0.6 \
–conf spark.memory.storageFraction=0.5
# 查看内存使用情况
scala> sc.getExecutorMemoryStatus
res0: scala.collection.Map[String,(Long,Long)] = Map(
192.168.1.61:45678 -> (8589934592,6442450944),
192.168.1.62:45678 -> (8589934592,6442450944)
)
# 查看缓存数据
scala> sc.getRDDStorageInfo
res1: Array[org.apache.spark.storage.RDDInfo] = Array(
RDD “fgedu-data” (3 partitions, 512MB, 2 replicas)
)
# 优化内存配置
# 方案1:增加Executor内存
–executor-memory 16g
# 方案2:调整内存比例
–conf spark.memory.fraction=0.7
–conf spark.memory.storageFraction=0.4
# 方案3:启用堆外内存
–conf spark.memory.offHeap.enabled=true
–conf spark.memory.offHeap.size=4g
3.1.2 缓存优化
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel
# 仅内存
scala> rdd.persist(StorageLevel.MEMORY_ONLY)
# 内存+磁盘
scala> rdd.persist(StorageLevel.MEMORY_AND_DISK)
# 序列化存储(节省空间)
scala> rdd.persist(StorageLevel.MEMORY_ONLY_SER)
# 堆外内存
scala> rdd.persist(StorageLevel.OFF_HEAP)
# 多副本
scala> rdd.persist(StorageLevel.MEMORY_ONLY_2)
# 查看缓存效果
scala> rdd.persist(StorageLevel.MEMORY_AND_DISK)
scala> rdd.count() // 触发缓存
scala> rdd.count() // 使用缓存
# 释放缓存
scala> rdd.unpersist()
# 缓存最佳实践
# 1. 复用的RDD才缓存
# 2. 选择合适的存储级别
# 3. 及时释放不再使用的缓存
# 4. 监控缓存命中率
3.2 Shuffle调优实战
3.2.1 Shuffle配置优化
# 增加缓冲区大小
spark.shuffle.file.buffer=64k
# 增加Reduce拉取数据大小
spark.reducer.maxSizeInFlight=96m
# 增加重试次数
spark.shuffle.io.maxRetries=5
# 减少重试等待时间
spark.shuffle.io.retryWait=3s
# 启用Shuffle服务
spark.shuffle.service.enabled=true
# 完整配置示例
$ /bigdata/app/spark/bin/spark-submit \
–master spark://192.168.1.60:7077 \
–conf spark.shuffle.file.buffer=64k \
–conf spark.reducer.maxSizeInFlight=96m \
–conf spark.shuffle.io.maxRetries=5 \
–conf spark.shuffle.service.enabled=true \
–conf spark.sql.shuffle.partitions=400 \
/bigdata/spark-apps/fgedu-app.jar
# 查看Shuffle统计
scala> spark.sparkContext.statusTracker
res2: org.apache.spark.StatusTracker = org.apache.spark.StatusTracker@7b5a12ae
3.2.2 减少Shuffle操作
# 1. 使用广播变量替代大表Join
scala> val smallTable = Seq((“IT”, “信息技术部”), (“HR”, “人力资源部”))
smallTable: Seq[(String, String)] = List((IT,信息技术部), (HR,人力资源部))
scala> val broadcastTable = sc.broadcast(smallTable.toMap)
broadcastTable: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,String]] = Broadcast(0)
scala> val largeRDD = sc.parallelize(Seq((“fgedu01”, “IT”), (“fgedu02”, “HR”)))
largeRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at
scala> val result = largeRDD.map(x => (x._1, broadcastTable.value(x._2)))
result: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[1] at map at
# 2. 使用reduceByKey替代groupByKey
# 不推荐
scala> rdd.groupByKey().mapValues(_.sum)
# 推荐
scala> rdd.reduceByKey(_ + _)
# 3. 使用treeReduce替代reduce
scala> rdd.treeReduce(_ + _, depth=3)
# 4. 预聚合
scala> rdd.mapPartitions(iter => {
| iter.groupBy(_._1).mapValues(_.map(_._2).sum).iterator
| }).reduceByKey(_ + _)
3.3 序列化调优实战
# 启用Kryo序列化
spark.serializer=org.apache.spark.serializer.KryoSerializer
# 配置Kryo缓冲区
spark.kryoserializer.buffer.max=512m
spark.kryoserializer.buffer=64k
# 注册自定义类
scala> import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.serializer.KryoRegistrator
scala> import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Kryo
scala> class FgeduRegistrator extends KryoRegistrator {
| override def registerClasses(kryo: Kryo): Unit = {
| kryo.register(classOf[com.fgedu.User])
| kryo.register(classOf[com.fgedu.Order])
| }
| }
# 配置自定义注册器
spark.kryo.registrator=com.fgedu.FgeduRegistrator
# 完整配置示例
$ /bigdata/app/spark/bin/spark-submit \
–master spark://192.168.1.60:7077 \
–conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
–conf spark.kryoserializer.buffer.max=512m \
–conf spark.kryo.registrationRequired=true \
/bigdata/spark-apps/fgedu-app.jar
# 序列化性能对比
# Java序列化:慢,但兼容性好
# Kryo序列化:快,但需要注册类
Part04-生产案例与实战讲解
4.1 数据倾斜调优实战
4.1.1 数据倾斜诊断
# 1. 查看Task执行时间
# 在Spark UI中查看Stage详情,找出执行时间长的Task
# 2. 查看数据分布
scala> val rdd = sc.parallelize(Seq(
| (“a”, 1), (“a”, 2), (“a”, 3),
| (“b”, 1), (“b”, 2),
| (“c”, 1)
| ))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at
scala> rdd.groupByKey().map(x => (x._1, x._2.size)).collect()
res3: Array[(String, Int)] = Array((a,3), (b,2), (c,1))
# 3. 查看Key分布
scala> rdd.map(_._1).countByValue().foreach(println)
(a,3)
(b,2)
(c,1)
# 4. 查看分区数据量
scala> rdd.mapPartitions(iter => Iterator(iter.size)).collect()
res4: Array[Int] = Array(2, 2, 2)
4.1.2 数据倾斜解决方案
scala> val repartitioned = rdd.repartition(100)
# 方案2:使用Salting技术
scala> val salted = rdd.map(x => {
| val salt = scala.util.Random.nextInt(10)
| (x._1 + “_” + salt, x._2)
| })
salted: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at
scala> val aggregated = salted.reduceByKey(_ + _)
aggregated: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at
scala> val result = aggregated.map(x => {
| val originalKey = x._1.split(“_”)(0)
| (originalKey, x._2)
| }).reduceByKey(_ + _)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at
# 方案3:广播小表
scala> val smallTable = Map(“a” -> “A”, “b” -> “B”, “c” -> “C”)
smallTable: scala.collection.immutable.Map[String,String] = Map(a -> A, b -> B, c -> C)
scala> val broadcastSmall = sc.broadcast(smallTable)
broadcastSmall: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,String]] = Broadcast(1)
scala> val joined = rdd.map(x => (x._1, broadcastSmall.value.getOrElse(x._1, “Unknown”), x._2))
joined: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[10] at map at
# 方案4:采样倾斜Key单独处理
scala> val sampled = rdd.sample(false, 0.1).map(_._1).distinct().collect()
sampled: Array[String] = Array(a, b, c)
scala> val skewedKeys = rdd.filter(x => sampled.contains(x._1))
scala> val normalKeys = rdd.filter(x => !sampled.contains(x._1))
4.2 Join优化实战
# 1. 广播Join(小表)
scala> val smallDF = Seq((“IT”, “信息技术部”)).toDF(“dept”, “name”)
smallDF: org.apache.spark.sql.DataFrame = [dept: string, name: string]
scala> val largeDF = Seq((“fgedu01”, “IT”), (“fgedu02”, “HR”)).toDF(“user”, “dept”)
largeDF: org.apache.spark.sql.DataFrame = [user: string, dept: string]
scala> largeDF.join(broadcast(smallDF), “dept”).show()
+—-+——-+———-+
|dept| user| name|
+—-+——-+———-+
| IT|fgedu01|信息技术部|
+—-+——-+———-+
# 2. Sort Merge Join(大表)
spark.sql.autoBroadcastJoinThreshold=-1
spark.sql.join.preferSortMergeJoin=true
# 3. Shuffle Hash Join
spark.sql.join.preferSortMergeJoin=false
# 4. 配置广播阈值
spark.sql.autoBroadcastJoinThreshold=10485760 # 10MB
# 5. 分区优化
scala> largeDF.repartition(100, $”dept”).join(smallDF, “dept”)
4.3 常见问题处理
4.3.1 OOM问题处理
# 排查步骤
# 1. 查看Executor日志
$ grep -r “OutOfMemoryError” /bigdata/app/spark/logs/
# 2. 查看内存使用
scala> sc.getExecutorMemoryStatus
# 解决方案
# 1. 增加Executor内存
–executor-memory 16g
# 2. 减少分区数据量
spark.sql.shuffle.partitions=400
# 3. 启用堆外内存
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
# 4. 调整内存比例
spark.memory.storageFraction=0.3
# 5. 优化GC
-XX:+UseG1GC -XX:MaxGCPauseMillis=20
4.3.2 GC问题处理
# 排查步骤
# 1. 启用GC日志
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps
# 2. 查看GC统计
# 在Spark UI Executors页面查看GC Time
# 解决方案
# 1. 使用G1垃圾回收器
-XX:+UseG1GC -XX:MaxGCPauseMillis=20
# 2. 增加年轻代大小
-XX:NewRatio=2
# 3. 减少对象创建
# 使用原始类型、避免装箱
# 4. 启用堆外内存
spark.memory.offHeap.enabled=true
# 5. 减少缓存数据量
# 只缓存必要的数据
Part05-风哥经验总结与分享
5.1 调优最佳实践
Spark调优最佳实践建议:
1. 合理配置资源
– Executor数量:节点数 * 2-5
– Executor核心:2-5个
– Executor内存:4-8GB
2. 优化并行度
– 分区数 = 核心数 * 2-4
– 每个分区128-256MB
3. 减少Shuffle
– 使用广播变量
– 使用reduceByKey
4. 处理数据倾斜
– 诊断倾斜Key
– 使用Salting技术
5. 内存优化
– 启用Kryo序列化
– 启用堆外内存
– 合理设置缓存
5.2 调优检查清单
调优检查清单:
- Executor资源配置是否合理
- 并行度是否合适
- 是否启用了Kryo序列化
- 是否存在数据倾斜
- Shuffle配置是否优化
- 缓存策略是否合理
5.3 调优工具推荐
Spark调优工具:
- Spark UI:查看Stage、Task详情
- SQL Tab:查看SQL执行计划
- Storage Tab:查看缓存使用情况
- Environment Tab:查看配置信息
- Executor Tab:查看Executor状态
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
