1. 首页 > Hadoop教程 > 正文

大数据教程FG063-Spark性能调优实战

本文档风哥主要介绍Spark性能调优实战,包括Spark内存模型、Shuffle机制、内存调优、并行度调优、数据倾斜处理等内容,风哥教程参考Spark官方文档Tuning Guide、Configuration等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 Spark性能调优概述

Spark性能调优是提升Spark应用执行效率的关键环节,需要从资源配置、代码优化、数据倾斜处理等多个维度进行优化。学习交流加群风哥微信: itpux-com

Spark性能调优维度:

  • 资源配置:Executor数量、内存、核心数
  • 并行度:分区数、任务并行度
  • 内存管理:内存分配、缓存策略
  • Shuffle优化:减少Shuffle、优化Shuffle配置
  • 序列化:选择高效序列化方式
  • 数据倾斜:处理数据分布不均匀

1.2 Spark内存模型

Spark内存模型详解:

# Spark内存结构

+—————————+
| Executor Memory |
+—————————+
| Reserved Memory | 300MB(固定)
+—————————+
| User Memory | 用户自定义数据结构
+—————————+
| Spark Memory |
| +———————+ |
| | Storage Memory | | 缓存数据、广播变量
| +———————+ |
| | Execution Memory | | Shuffle、Join、聚合
| +———————+ |
+—————————+

# 内存配置参数
spark.memory.fraction = 0.6 # Spark内存占比
spark.memory.storageFraction = 0.5 # Storage内存占比
spark.memory.offHeap.enabled # 堆外内存
spark.memory.offHeap.size # 堆外内存大小

# 内存计算示例
Executor内存:16GB
Reserved Memory:300MB
Available Memory:16GB – 300MB = 15.7GB
Spark Memory:15.7GB * 0.6 = 9.4GB
Storage Memory:9.4GB * 0.5 = 4.7GB
Execution Memory:9.4GB * 0.5 = 4.7GB

1.3 Shuffle机制原理

Shuffle机制详解:

# Shuffle过程

1. Map阶段
– Map任务处理数据
– 写入内存缓冲区
– 溢写到磁盘文件

2. Shuffle Write
– 按分区排序
– 写入磁盘文件
– 生成索引文件

3. Shuffle Read
– 拉取数据
– 聚合/排序
– 传递给下游任务

# Shuffle Manager
– SortShuffleManager(默认)
– 支持三种写入模式:
1. Bypass Merge Sort:分区数<200 2. Unsafe Shuffle:序列化后排序 3. Sort Shuffle:默认模式 # Shuffle配置 spark.shuffle.compress=true # 压缩Shuffle输出 spark.shuffle.file.buffer=32k # Shuffle缓冲区大小 spark.shuffle.io.maxRetries=3 # IO重试次数 spark.shuffle.io.retryWait=5s # 重试等待时间 spark.reducer.maxSizeInFlight=48m # Reduce拉取数据大小

风哥提示:Shuffle是Spark中最耗资源的操作,应该尽量减少Shuffle次数。可以通过广播变量、合理分区等方式优化。

Part02-生产环境规划与建议

2.1 资源配置规划

资源配置规划建议:

# Executor配置规划

1. Executor数量
– 公式:num_executors = 总核心数 / 每个Executor核心数
– 建议:每个节点2-5个Executor

2. Executor核心数
– 建议:每个Executor 2-5个核心
– 过多:资源竞争,性能下降
– 过少:并行度不足

3. Executor内存
– 建议:每个Executor 4-8GB
– 公式:内存 = 数据量 * 2-3倍
– 注意:内存过大导致GC问题

# 生产环境配置示例
–num-executors 20
–executor-cores 4
–executor-memory 8g
–driver-memory 4g
–driver-cores 2

# YARN模式配置
–num-executors 20
–executor-cores 4
–executor-memory 8g
–driver-memory 4g
–conf spark.yarn.executor.memoryOverhead=2g

2.2 并行度规划

并行度规划建议:

# 并行度设置

1. 默认并行度
spark.default.parallelism = 总核心数 * 2-3

2. Shuffle分区数
spark.sql.shuffle.partitions = 200(默认)
建议:根据数据量调整

3. 分区数计算
– 小数据(<1GB):分区数 = 核心数 - 中等数据(1-100GB):分区数 = 核心数 * 2-3 - 大数据(>100GB):分区数 = 核心数 * 3-4

# 并行度配置示例
spark.default.parallelism=200
spark.sql.shuffle.partitions=400

# 分区大小建议
每个分区数据量:128MB-256MB
分区数 = 总数据量 / 分区大小

# 动态调整
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true

2.3 内存规划

内存规划建议:

# 内存分配策略

1. 缓存密集型应用
spark.memory.storageFraction=0.6
增加Storage Memory

2. 计算密集型应用
spark.memory.storageFraction=0.3
增加Execution Memory

3. 混合型应用
spark.memory.storageFraction=0.5
平衡分配

# 堆外内存配置
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g

# 内存配置示例
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g

# GC配置
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=20
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=20

生产环境建议:生产环境建议启用堆外内存,减少JVM GC压力。对于缓存密集型应用,增加Storage Memory比例。学习交流加群风哥QQ113257174

Part03-生产环境项目实施方案

3.1 内存调优实战

3.1.1 内存配置优化

# 查看当前内存配置
$ /bigdata/app/spark/bin/spark-shell \
–master spark://192.168.1.60:7077 \
–executor-memory 8g \
–conf spark.memory.fraction=0.6 \
–conf spark.memory.storageFraction=0.5

# 查看内存使用情况
scala> sc.getExecutorMemoryStatus
res0: scala.collection.Map[String,(Long,Long)] = Map(
192.168.1.61:45678 -> (8589934592,6442450944),
192.168.1.62:45678 -> (8589934592,6442450944)
)

# 查看缓存数据
scala> sc.getRDDStorageInfo
res1: Array[org.apache.spark.storage.RDDInfo] = Array(
RDD “fgedu-data” (3 partitions, 512MB, 2 replicas)
)

# 优化内存配置
# 方案1:增加Executor内存
–executor-memory 16g

# 方案2:调整内存比例
–conf spark.memory.fraction=0.7
–conf spark.memory.storageFraction=0.4

# 方案3:启用堆外内存
–conf spark.memory.offHeap.enabled=true
–conf spark.memory.offHeap.size=4g

3.1.2 缓存优化

# 缓存策略选择

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

# 仅内存
scala> rdd.persist(StorageLevel.MEMORY_ONLY)

# 内存+磁盘
scala> rdd.persist(StorageLevel.MEMORY_AND_DISK)

# 序列化存储(节省空间)
scala> rdd.persist(StorageLevel.MEMORY_ONLY_SER)

# 堆外内存
scala> rdd.persist(StorageLevel.OFF_HEAP)

# 多副本
scala> rdd.persist(StorageLevel.MEMORY_ONLY_2)

# 查看缓存效果
scala> rdd.persist(StorageLevel.MEMORY_AND_DISK)
scala> rdd.count() // 触发缓存
scala> rdd.count() // 使用缓存

# 释放缓存
scala> rdd.unpersist()

# 缓存最佳实践
# 1. 复用的RDD才缓存
# 2. 选择合适的存储级别
# 3. 及时释放不再使用的缓存
# 4. 监控缓存命中率

3.2 Shuffle调优实战

3.2.1 Shuffle配置优化

# Shuffle配置优化

# 增加缓冲区大小
spark.shuffle.file.buffer=64k

# 增加Reduce拉取数据大小
spark.reducer.maxSizeInFlight=96m

# 增加重试次数
spark.shuffle.io.maxRetries=5

# 减少重试等待时间
spark.shuffle.io.retryWait=3s

# 启用Shuffle服务
spark.shuffle.service.enabled=true

# 完整配置示例
$ /bigdata/app/spark/bin/spark-submit \
–master spark://192.168.1.60:7077 \
–conf spark.shuffle.file.buffer=64k \
–conf spark.reducer.maxSizeInFlight=96m \
–conf spark.shuffle.io.maxRetries=5 \
–conf spark.shuffle.service.enabled=true \
–conf spark.sql.shuffle.partitions=400 \
/bigdata/spark-apps/fgedu-app.jar

# 查看Shuffle统计
scala> spark.sparkContext.statusTracker
res2: org.apache.spark.StatusTracker = org.apache.spark.StatusTracker@7b5a12ae

3.2.2 减少Shuffle操作

# 减少Shuffle的方法

# 1. 使用广播变量替代大表Join
scala> val smallTable = Seq((“IT”, “信息技术部”), (“HR”, “人力资源部”))
smallTable: Seq[(String, String)] = List((IT,信息技术部), (HR,人力资源部))

scala> val broadcastTable = sc.broadcast(smallTable.toMap)
broadcastTable: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,String]] = Broadcast(0)

scala> val largeRDD = sc.parallelize(Seq((“fgedu01”, “IT”), (“fgedu02”, “HR”)))
largeRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at :24

scala> val result = largeRDD.map(x => (x._1, broadcastTable.value(x._2)))
result: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[1] at map at :28

# 2. 使用reduceByKey替代groupByKey
# 不推荐
scala> rdd.groupByKey().mapValues(_.sum)

# 推荐
scala> rdd.reduceByKey(_ + _)

# 3. 使用treeReduce替代reduce
scala> rdd.treeReduce(_ + _, depth=3)

# 4. 预聚合
scala> rdd.mapPartitions(iter => {
| iter.groupBy(_._1).mapValues(_.map(_._2).sum).iterator
| }).reduceByKey(_ + _)

风哥提示:Shuffle是Spark性能瓶颈的主要来源,应该尽量减少Shuffle操作。使用广播变量、reduceByKey等方式可以有效减少Shuffle。更多学习教程公众号风哥教程itpux_com

3.3 序列化调优实战

# 序列化配置

# 启用Kryo序列化
spark.serializer=org.apache.spark.serializer.KryoSerializer

# 配置Kryo缓冲区
spark.kryoserializer.buffer.max=512m
spark.kryoserializer.buffer=64k

# 注册自定义类
scala> import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.serializer.KryoRegistrator

scala> import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Kryo

scala> class FgeduRegistrator extends KryoRegistrator {
| override def registerClasses(kryo: Kryo): Unit = {
| kryo.register(classOf[com.fgedu.User])
| kryo.register(classOf[com.fgedu.Order])
| }
| }

# 配置自定义注册器
spark.kryo.registrator=com.fgedu.FgeduRegistrator

# 完整配置示例
$ /bigdata/app/spark/bin/spark-submit \
–master spark://192.168.1.60:7077 \
–conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
–conf spark.kryoserializer.buffer.max=512m \
–conf spark.kryo.registrationRequired=true \
/bigdata/spark-apps/fgedu-app.jar

# 序列化性能对比
# Java序列化:慢,但兼容性好
# Kryo序列化:快,但需要注册类

Part04-生产案例与实战讲解

4.1 数据倾斜调优实战

4.1.1 数据倾斜诊断

# 数据倾斜诊断方法

# 1. 查看Task执行时间
# 在Spark UI中查看Stage详情,找出执行时间长的Task

# 2. 查看数据分布
scala> val rdd = sc.parallelize(Seq(
| (“a”, 1), (“a”, 2), (“a”, 3),
| (“b”, 1), (“b”, 2),
| (“c”, 1)
| ))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at :24

scala> rdd.groupByKey().map(x => (x._1, x._2.size)).collect()
res3: Array[(String, Int)] = Array((a,3), (b,2), (c,1))

# 3. 查看Key分布
scala> rdd.map(_._1).countByValue().foreach(println)
(a,3)
(b,2)
(c,1)

# 4. 查看分区数据量
scala> rdd.mapPartitions(iter => Iterator(iter.size)).collect()
res4: Array[Int] = Array(2, 2, 2)

4.1.2 数据倾斜解决方案

# 方案1:增加分区数
scala> val repartitioned = rdd.repartition(100)

# 方案2:使用Salting技术
scala> val salted = rdd.map(x => {
| val salt = scala.util.Random.nextInt(10)
| (x._1 + “_” + salt, x._2)
| })
salted: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at :26

scala> val aggregated = salted.reduceByKey(_ + _)
aggregated: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at :26

scala> val result = aggregated.map(x => {
| val originalKey = x._1.split(“_”)(0)
| (originalKey, x._2)
| }).reduceByKey(_ + _)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at :28

# 方案3:广播小表
scala> val smallTable = Map(“a” -> “A”, “b” -> “B”, “c” -> “C”)
smallTable: scala.collection.immutable.Map[String,String] = Map(a -> A, b -> B, c -> C)

scala> val broadcastSmall = sc.broadcast(smallTable)
broadcastSmall: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,String]] = Broadcast(1)

scala> val joined = rdd.map(x => (x._1, broadcastSmall.value.getOrElse(x._1, “Unknown”), x._2))
joined: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[10] at map at :28

# 方案4:采样倾斜Key单独处理
scala> val sampled = rdd.sample(false, 0.1).map(_._1).distinct().collect()
sampled: Array[String] = Array(a, b, c)

scala> val skewedKeys = rdd.filter(x => sampled.contains(x._1))
scala> val normalKeys = rdd.filter(x => !sampled.contains(x._1))

4.2 Join优化实战

# Join优化策略

# 1. 广播Join(小表)
scala> val smallDF = Seq((“IT”, “信息技术部”)).toDF(“dept”, “name”)
smallDF: org.apache.spark.sql.DataFrame = [dept: string, name: string]

scala> val largeDF = Seq((“fgedu01”, “IT”), (“fgedu02”, “HR”)).toDF(“user”, “dept”)
largeDF: org.apache.spark.sql.DataFrame = [user: string, dept: string]

scala> largeDF.join(broadcast(smallDF), “dept”).show()
+—-+——-+———-+
|dept| user| name|
+—-+——-+———-+
| IT|fgedu01|信息技术部|
+—-+——-+———-+

# 2. Sort Merge Join(大表)
spark.sql.autoBroadcastJoinThreshold=-1
spark.sql.join.preferSortMergeJoin=true

# 3. Shuffle Hash Join
spark.sql.join.preferSortMergeJoin=false

# 4. 配置广播阈值
spark.sql.autoBroadcastJoinThreshold=10485760 # 10MB

# 5. 分区优化
scala> largeDF.repartition(100, $”dept”).join(smallDF, “dept”)

4.3 常见问题处理

4.3.1 OOM问题处理

# 问题现象:OutOfMemoryError

# 排查步骤
# 1. 查看Executor日志
$ grep -r “OutOfMemoryError” /bigdata/app/spark/logs/

# 2. 查看内存使用
scala> sc.getExecutorMemoryStatus

# 解决方案
# 1. 增加Executor内存
–executor-memory 16g

# 2. 减少分区数据量
spark.sql.shuffle.partitions=400

# 3. 启用堆外内存
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g

# 4. 调整内存比例
spark.memory.storageFraction=0.3

# 5. 优化GC
-XX:+UseG1GC -XX:MaxGCPauseMillis=20

4.3.2 GC问题处理

# 问题现象:GC时间过长

# 排查步骤
# 1. 启用GC日志
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps

# 2. 查看GC统计
# 在Spark UI Executors页面查看GC Time

# 解决方案
# 1. 使用G1垃圾回收器
-XX:+UseG1GC -XX:MaxGCPauseMillis=20

# 2. 增加年轻代大小
-XX:NewRatio=2

# 3. 减少对象创建
# 使用原始类型、避免装箱

# 4. 启用堆外内存
spark.memory.offHeap.enabled=true

# 5. 减少缓存数据量
# 只缓存必要的数据

Part05-风哥经验总结与分享

5.1 调优最佳实践

Spark调优最佳实践建议:

# 调优最佳实践
1. 合理配置资源
– Executor数量:节点数 * 2-5
– Executor核心:2-5个
– Executor内存:4-8GB

2. 优化并行度
– 分区数 = 核心数 * 2-4
– 每个分区128-256MB

3. 减少Shuffle
– 使用广播变量
– 使用reduceByKey

4. 处理数据倾斜
– 诊断倾斜Key
– 使用Salting技术

5. 内存优化
– 启用Kryo序列化
– 启用堆外内存
– 合理设置缓存

5.2 调优检查清单

调优检查清单:

Spark调优检查清单:

  • Executor资源配置是否合理
  • 并行度是否合适
  • 是否启用了Kryo序列化
  • 是否存在数据倾斜
  • Shuffle配置是否优化
  • 缓存策略是否合理

5.3 调优工具推荐

Spark调优工具:

  • Spark UI:查看Stage、Task详情
  • SQL Tab:查看SQL执行计划
  • Storage Tab:查看缓存使用情况
  • Environment Tab:查看配置信息
  • Executor Tab:查看Executor状态
风哥提示:Spark性能调优是一个持续的过程,需要根据实际应用场景和数据特点进行调整。建议先从资源配置和并行度入手,再处理数据倾斜等问题。from bigdata视频:www.itpux.com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息