
Big Data Tutorial FG060 - Spark RDD Core Programming in Practice

In this document, Fengge walks through hands-on Spark RDD core programming: RDD core concepts, creating and operating on RDDs, transformations and actions, and RDD caching and persistence. The tutorial draws on the official Spark documentation (the RDD Programming Guide and the RDD API). It is intended for big data development and operations engineers to use for study and testing; verify everything yourself before applying it to a production environment. More video tutorials: www.fgedu.net.cn

Part01 - Basic Concepts and Theory

1.1 RDD Core Concepts

An RDD (Resilient Distributed Dataset) is Spark's core data abstraction: an immutable, partitioned collection of elements that can be operated on in parallel.

Key aspects of the RDD concept:

  • Resilient: data can automatically move between memory and disk
  • Distributed: data is spread across multiple nodes in the cluster
  • Dataset: a collection of data, supporting many data types
  • Immutable: once created, an RDD cannot be modified
  • Partitioned: data is split into multiple partitions and processed in parallel
  • Dependencies: RDDs have well-defined dependencies on one another

1.2 Core Characteristics of RDDs

The core characteristics in detail:

# The five main characteristics of an RDD

1. A list of partitions
- The data is split into multiple partitions
- Each partition can be processed independently, typically on different nodes
- The number of partitions determines the degree of parallelism

2. A function for computing each partition
- Each partition is computed by applying the same function independently
- The function is applied to the elements of the partition
- Custom computation logic is supported

3. Dependencies on other RDDs
- RDDs have well-defined dependencies on their parent RDDs
- Together they form a DAG (directed acyclic graph)
- This enables lineage tracking and fault tolerance

4. An optional partitioner
- Controls the partitioning strategy for key-value RDDs
- Hash partitioning and range partitioning are built in
- Custom partitioners are supported

5. Preferred locations
- Enables data locality optimization
- Tasks are scheduled onto the nodes that hold the data
- Reduces network transfer

# RDD lineage example
RDD A -> map -> RDD B -> filter -> RDD C -> reduce -> Result
(B depends on A, C depends on B, and the final result is computed from C)
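
A minimal spark-shell sketch of this lineage (it assumes the sc of an existing session; the data is illustrative):

// Build the A -> B -> C chain from the example above
val rddA = sc.parallelize(1 to 10)   // RDD A
val rddB = rddA.map(_ * 2)           // RDD B depends on A
val rddC = rddB.filter(_ > 5)        // RDD C depends on B
val result = rddC.reduce(_ + _)      // the action computes the whole chain

// toDebugString prints the lineage Spark recorded while building the DAG
println(rddC.toDebugString)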

1.3 RDD Operation Types

RDDs support two types of operations:

# RDD operation types

1. Transformations
- Return a new RDD
- Lazily evaluated (computation is deferred)
- Build up the DAG

Common transformations:
- map: map each element
- filter: keep elements matching a predicate
- flatMap: map each element to zero or more elements and flatten
- mapPartitions: map over each partition
- union: union of two RDDs
- intersection: intersection of two RDDs
- distinct: remove duplicates
- groupByKey: group values by key
- reduceByKey: aggregate values by key
- sortByKey: sort by key
- join: join two Pair RDDs

2. Actions
- Return a result to the driver or write to external storage
- Trigger execution of the DAG
- Each action launches a job

Common actions:
- collect: return all elements to the driver
- count: count the elements
- first: return the first element
- take: return the first N elements
- reduce: aggregate the elements
- aggregate: aggregate with custom functions
- countByKey: count elements per key
- foreach: apply a function to each element
- saveAsTextFile: save as text files

Fengge's tip: RDD transformations are lazy; nothing is actually computed until an action is triggered. This design lets Spark optimize the execution plan and improve performance. A minimal sketch follows.
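
A minimal sketch of this laziness, run in spark-shell (the data is illustrative):

// Transformations only record the DAG; no job is launched here
val nums    = sc.parallelize(1 to 1000000)
val doubled = nums.map(_ * 2)         // lazy
val big     = doubled.filter(_ > 100) // lazy

// The action triggers planning and execution of the whole DAG as one job
val n = big.count()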

Part02 - Production Planning and Recommendations

2.1 RDD Design Principles

Recommended RDD design principles:

# RDD design principles

1. Minimize shuffles
- Avoid unnecessary shuffle operations
- Broadcast the small side of a join instead of shuffling both sides (see the sketch at the end of this section)
- Choose partitioners deliberately

2. Partition sensibly
- Number of partitions ≈ total cores * 2-4
- Target partition size: 128MB-256MB
- Avoid having far too many or far too few partitions

3. Cache deliberately
- Cache RDDs that are reused
- Pick an appropriate storage level
- Unpersist caches that are no longer needed

4. Data locality
- Prefer processing data where it already resides
- Avoid data skew
- Set the degree of parallelism appropriately

# Partition count example
Cluster cores: 80
Recommended partitions: 160-320
Data per partition: 128MB-256MB
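
A hedged sketch of the broadcast idea from principle 1: the small table is broadcast to every executor and joined with a map-side lookup, so no shuffle is needed. The table contents below are illustrative only.

// A small dimension table that fits comfortably in driver and executor memory
val smallDim = Map(1 -> "fgedu01", 2 -> "fgedu02", 3 -> "fgedu03")
val dimBc = sc.broadcast(smallDim)

// The large Pair RDD is joined by looking keys up in the broadcast copy, with no shuffle
val orders = sc.parallelize(Seq((1, "Order001"), (1, "Order002"), (2, "Order003")))
val joined = orders.map { case (uid, order) =>
  (uid, (dimBc.value.getOrElse(uid, "unknown"), order))
}
joined.collect().foreach(println)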

2.2 Partitioning Strategy

Partitioning strategy recommendations:

# Partitioning strategies

1. Hash partitioning
- The default partitioning strategy for Pair RDDs
- Works well for evenly distributed keys
- partition = key.hashCode % numPartitions

2. Range partitioning
- Used for sort operations
- Data is distributed by key ranges
- Can still lead to data skew with uneven key distributions

3. Custom partitioning
- Extend the Partitioner class (see the sketch at the end of this section)
- Gives full control over data placement
- A common tool for fixing data skew

# Recommended partition counts
- Small dataset (<1GB): partitions = number of cores
- Medium dataset (1GB-100GB): partitions = cores * 2-3
- Large dataset (>100GB): partitions = cores * 3-4

# Adjusting the number of partitions
# Specify the partition count when creating the RDD
val rdd = sc.parallelize(data, 100)

# Repartitioning
val repartitioned = rdd.repartition(200) // increase partitions (always shuffles)
val coalesced = rdd.coalesce(50) // decrease partitions (avoids a full shuffle by default)
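
A hedged sketch of a custom partitioner (strategy 3 above). The class is hypothetical: it routes one known hot key to its own partition and hash-distributes everything else.

import org.apache.spark.Partitioner

// Hypothetical partitioner: isolate one hot key, hash the rest across the remaining partitions
class HotKeyPartitioner(partitions: Int, hotKey: String) extends Partitioner {
  require(partitions >= 2, "need at least 2 partitions")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int =
    if (key.toString == hotKey) 0
    else 1 + math.abs(key.toString.hashCode % (partitions - 1))
}

// Apply it to a Pair RDD with partitionBy
val kv = sc.parallelize(Seq(("hot", 1), ("a", 1), ("hot", 1), ("b", 1)))
val custom = kv.partitionBy(new HotKeyPartitioner(4, "hot"))
custom.glom().map(_.length).collect()   // rough view of how many records landed in each partition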

2.3 Caching Strategy

RDD caching strategy planning:

# Storage levels

MEMORY_ONLY # memory only (the default)
MEMORY_ONLY_SER # memory only, serialized
MEMORY_AND_DISK # memory, spilling to disk
MEMORY_AND_DISK_SER # memory + disk, serialized
DISK_ONLY # disk only
MEMORY_ONLY_2 # memory only, 2 replicas
MEMORY_AND_DISK_2 # memory + disk, 2 replicas

# Choosing a caching strategy

1. Plenty of memory
- MEMORY_ONLY
- Best for frequently accessed data

2. Limited memory
- MEMORY_AND_DISK
- Partitions that do not fit in memory spill to disk

3. Large data volumes
- MEMORY_ONLY_SER
- Serialization saves space

4. High availability requirements
- MEMORY_ONLY_2
- Replication improves reliability

# Cache usage examples
import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.cache() // equivalent to persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist() // release the cache

Production recommendation: for an RDD that will be used more than once, cache it with persist() and choose a storage level that matches the available memory. A short sketch of this pattern follows.
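
A minimal sketch of that pattern (the path follows the HDFS layout used in Part03; the filter is illustrative):

import org.apache.spark.storage.StorageLevel

// Persist an RDD that several actions will reuse
val lines = sc.textFile("hdfs://192.168.1.60:9000/spark-data/fgedu.txt")
val hits  = lines.filter(_.contains("hello")).persist(StorageLevel.MEMORY_AND_DISK)

hits.count()       // first action: computes the RDD and fills the cache
hits.take(2)       // second action: served from the cached partitions, no recomputation

hits.unpersist()   // release the cache once the reuse is finished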

Part03 - Production Implementation

3.1 Creating RDDs in Practice

3.1.1 Creating an RDD from a Collection

# Start the Spark shell
$ /bigdata/app/spark/bin/spark-shell \
--master spark://192.168.1.60:7077 \
--executor-memory 4g

# Create an RDD from a collection
scala> val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val rdd = sc.parallelize(data, 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.partitions.size
res0: Int = 3

scala> rdd.collect()
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

# Create an RDD with makeRDD
scala> val rdd2 = sc.makeRDD(data, 3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:26

# Create an RDD with an explicit number of partitions
scala> val rdd3 = sc.parallelize(1 to 100, 10)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:26

scala> rdd3.getNumPartitions
res2: Int = 10

3.1.2 Creating an RDD from External Files

# Create a test data file
$ mkdir -p /bigdata/spark-data
$ echo -e "hello world\nhello spark\nhello fgedu" > /bigdata/spark-data/fgedu.txt
$ hdfs dfs -mkdir -p /spark-data
$ hdfs dfs -put /bigdata/spark-data/fgedu.txt /spark-data/

# Create an RDD from a local file
scala> val textFile = sc.textFile("file:///bigdata/spark-data/fgedu.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///bigdata/spark-data/fgedu.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> textFile.collect()
res3: Array[String] = Array(hello world, hello spark, hello fgedu)

# Create an RDD from HDFS
scala> val hdfsFile = sc.textFile("hdfs://192.168.1.60:9000/spark-data/fgedu.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.1.60:9000/spark-data/fgedu.txt MapPartitionsRDD[5] at textFile at <console>:24

scala> hdfsFile.collect()
res4: Array[String] = Array(hello world, hello spark, hello fgedu)

# Create an RDD from multiple files
scala> val multiFile = sc.textFile("hdfs://192.168.1.60:9000/spark-data/*.txt")
multiFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.1.60:9000/spark-data/*.txt MapPartitionsRDD[7] at textFile at <console>:24

# Check the partition count
scala> hdfsFile.getNumPartitions
res5: Int = 2

3.2 RDD Transformations in Practice

3.2.1 Basic Transformations

# map
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val mapped = rdd.map(_ * 2)
mapped: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26

scala> mapped.collect()
res6: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

# filter
scala> val filtered = rdd.filter(_ > 5)
filtered: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at filter at <console>:26

scala> filtered.collect()
res7: Array[Int] = Array(6, 7, 8, 9, 10)

# flatMap
scala> val words = sc.parallelize(Array("hello world", "hello spark"))
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> val flatMapped = words.flatMap(_.split(" "))
flatMapped: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at flatMap at <console>:26

scala> flatMapped.collect()
res8: Array[String] = Array(hello, world, hello, spark)

# mapPartitions
scala> val partitionMapped = rdd.mapPartitions(iter => {
| iter.map(_ * 3)
| })
partitionMapped: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at mapPartitions at <console>:27

scala> partitionMapped.collect()
res9: Array[Int] = Array(3, 6, 9, 12, 15, 18, 21, 24, 27, 30)

3.2.2 Key-Value Transformations

# Create a Pair RDD
scala> val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:24

# reduceByKey
scala> val reduced = pairs.reduceByKey(_ + _)
reduced: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:26

scala> reduced.collect()
res10: Array[(String, Int)] = Array((a,4), (b,6), (c,5))

# groupByKey
scala> val grouped = pairs.groupByKey()
grouped: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[18] at groupByKey at <console>:26

scala> grouped.collect()
res11: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 3)), (b,CompactBuffer(2, 4)), (c,CompactBuffer(5)))

# sortByKey
scala> val sorted = pairs.sortByKey()
sorted: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at sortByKey at <console>:26

scala> sorted.collect()
res12: Array[(String, Int)] = Array((a,1), (a,3), (b,2), (b,4), (c,5))

# mapValues
scala> val mappedValues = pairs.mapValues(_ * 2)
mappedValues: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at mapValues at <console>:26

scala> mappedValues.collect()
res13: Array[(String, Int)] = Array((a,2), (b,4), (a,6), (b,8), (c,10))

# keys and values
scala> pairs.keys.collect()
res14: Array[String] = Array(a, b, a, b, c)

scala> pairs.values.collect()
res15: Array[Int] = Array(1, 2, 3, 4, 5)

Fengge's tip: reduceByKey performs better than groupByKey because reduceByKey combines values locally before the shuffle, which reduces network transfer. A short comparison sketch follows.
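
A short comparison using the pairs RDD from above (both versions give the same per-key sums; reduceByKey just shuffles far less data):

// reduceByKey: values are combined inside each partition before the shuffle
val viaReduce = pairs.reduceByKey(_ + _)

// groupByKey: every (key, value) pair is shuffled first, then summed on the reducer side
val viaGroup = pairs.groupByKey().mapValues(_.sum)

viaReduce.collect()   // Array((a,4), (b,6), (c,5))
viaGroup.collect()    // same values, more shuffle traffic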

3.3 RDD Actions in Practice

# Create a test RDD
scala> val rdd = sc.parallelize(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

# collect
scala> rdd.collect()
res16: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

# count
scala> rdd.count()
res17: Long = 100

# first
scala> rdd.first()
res18: Int = 1

# take
scala> rdd.take(5)
res19: Array[Int] = Array(1, 2, 3, 4, 5)

# reduce
scala> rdd.reduce(_ + _)
res20: Int = 5050

# aggregate
scala> rdd.aggregate(0)(_ + _, _ + _)
res21: Int = 5050

# Statistics
scala> rdd.sum()
res22: Double = 5050.0

scala> rdd.mean()
res23: Double = 50.5

scala> rdd.max()
res24: Int = 100

scala> rdd.min()
res25: Int = 1

scala> rdd.variance()
res26: Double = 833.25

scala> rdd.stdev()
res27: Double = 28.86607004772212

# countByValue
scala> val rdd2 = sc.parallelize(Array(1, 1, 2, 2, 2, 3, 3, 3, 3))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> rdd2.countByValue()
res28: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 3, 3 -> 4)

# foreach
scala> rdd.take(10).foreach(println)
1
2
3
4
5
6
7
8
9
10

# saveAsTextFile
scala> rdd.saveAsTextFile("hdfs://192.168.1.60:9000/spark-output/fgedu-rdd-output")

# View the output
$ hdfs dfs -ls /spark-output/fgedu-rdd-output
Found 3 items
-rw-r--r-- 3 spark supergroup 0 2026-04-08 11:00 /spark-output/fgedu-rdd-output/_SUCCESS
-rw-r--r-- 3 spark supergroup 400 2026-04-08 11:00 /spark-output/fgedu-rdd-output/part-00000
-rw-r--r-- 3 spark supergroup 400 2026-04-08 11:00 /spark-output/fgedu-rdd-output/part-00001

Part04 - Production Cases and Walkthroughs

4.1 WordCount Example

# A complete WordCount example

# Prepare the data
$ cat > /bigdata/spark-data/fgedu-wordcount.txt << 'EOF'
hello world
hello spark
spark is fast
spark is easy
hello fgedu
hello bigdata
bigdata hadoop spark kafka
EOF

$ hdfs dfs -put /bigdata/spark-data/fgedu-wordcount.txt /spark-data/

# WordCount implementation
scala> val textFile = sc.textFile("hdfs://192.168.1.60:9000/spark-data/fgedu-wordcount.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.1.60:9000/spark-data/fgedu-wordcount.txt MapPartitionsRDD[25] at textFile at <console>:24

scala> val counts = textFile
| .flatMap(_.split(" "))
| .map(word => (word, 1))
| .reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:26

scala> counts.collect()
res29: Array[(String, Int)] = Array((hello,4), (world,1), (spark,3), (is,2), (fast,1), (easy,1), (fgedu,1), (bigdata,2), (hadoop,1), (kafka,1))

# Sort by word frequency
scala> val sortedCounts = counts
| .map(_.swap)
| .sortByKey(ascending = false)
| .map(_.swap)
sortedCounts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[35] at map at <console>:28

scala> sortedCounts.collect()
res30: Array[(String, Int)] = Array((hello,4), (spark,3), (is,2), (bigdata,2), (world,1), (fast,1), (easy,1), (fgedu,1), (hadoop,1), (kafka,1))

# Save the result
scala> sortedCounts.saveAsTextFile("hdfs://192.168.1.60:9000/spark-output/fgedu-wordcount-output")

# View the result
$ hdfs dfs -cat /spark-output/fgedu-wordcount-output/part-00000

(hello,4)
(spark,3)
(is,2)
(bigdata,2)

4.2 RDD Join Operations in Practice

# Create test data
scala> val users = sc.parallelize(Array(
| (1, "fgedu01"),
| (2, "fgedu02"),
| (3, "fgedu03")
| ))
users: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[36] at parallelize at <console>:24

scala> val orders = sc.parallelize(Array(
| (1, "Order001"),
| (1, "Order002"),
| (2, "Order003"),
| (4, "Order004")
| ))
orders: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24

# join
scala> val joined = users.join(orders)
joined: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[40] at join at <console>:28

scala> joined.collect()
res31: Array[(Int, (String, String))] = Array((1,(fgedu01,Order001)), (1,(fgedu01,Order002)), (2,(fgedu02,Order003)))

# leftOuterJoin
scala> val leftJoined = users.leftOuterJoin(orders)
leftJoined: org.apache.spark.rdd.RDD[(Int, (String, Option[String]))] = MapPartitionsRDD[43] at leftOuterJoin at <console>:28

scala> leftJoined.collect()
res32: Array[(Int, (String, Option[String]))] = Array((1,(fgedu01,Some(Order001))), (1,(fgedu01,Some(Order002))), (2,(fgedu02,Some(Order003))), (3,(fgedu03,None)))

# rightOuterJoin
scala> val rightJoined = users.rightOuterJoin(orders)
rightJoined: org.apache.spark.rdd.RDD[(Int, (Option[String], String))] = MapPartitionsRDD[46] at rightOuterJoin at <console>:28

scala> rightJoined.collect()
res33: Array[(Int, (Option[String], String))] = Array((1,(Some(fgedu01),Order001)), (1,(Some(fgedu01),Order002)), (2,(Some(fgedu02),Order003)), (4,(None,Order004)))

# cogroup
scala> val cogrouped = users.cogroup(orders)
cogrouped: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[48] at cogroup at <console>:28

scala> cogrouped.collect()
res34: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(fgedu01),CompactBuffer(Order001, Order002))), (2,(CompactBuffer(fgedu02),CompactBuffer(Order003))), (3,(CompactBuffer(fgedu03),CompactBuffer())), (4,(CompactBuffer(),CompactBuffer(Order004))))

4.3 Troubleshooting Common Problems

4.3.1 Data Skew

# Symptom: a few tasks take far longer than the rest

# Troubleshooting steps
# 1. Check task execution times
# Look at the stage details in the Spark UI

# 2. Check the key distribution
scala> pairs.map(_._1).countByValue().foreach(println)

# Fixes
# 1. Increase the number of partitions
scala> val repartitioned = pairs.repartition(100)

# 2. Salt the keys
scala> val salted = pairs.map(x => (x._1 + "_" + scala.util.Random.nextInt(10), x._2))

# 3. Broadcast the small table
scala> val broadcastVar = sc.broadcast(smallTable.collect())
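
To complete the salting approach in step 2, here is a hedged sketch of the usual two-stage aggregation (a salt width of 10 is assumed, matching Random.nextInt(10) above):

// Stage 1: aggregate on the salted key, so a hot key is spread over up to 10 reduce tasks
val saltedPairs = pairs.map { case (k, v) => (k + "_" + scala.util.Random.nextInt(10), v) }
val partial = saltedPairs.reduceByKey(_ + _)

// Stage 2: strip the salt and aggregate again to get the final per-key totals
val finalCounts = partial
  .map { case (saltedKey, v) => (saltedKey.substring(0, saltedKey.lastIndexOf("_")), v) }
  .reduceByKey(_ + _)
finalCounts.collect()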

4.3.2 Out-of-Memory Problems

# Symptom: OutOfMemoryError

# Troubleshooting steps
# 1. Check the executor memory configuration
$ grep executor.memory /bigdata/app/spark/conf/spark-defaults.conf

# 2. Check how much data is cached
scala> sc.getRDDStorageInfo.foreach(println)

# Fixes
# 1. Increase executor memory
spark.executor.memory 16g

# 2. Use serialized storage
scala> rdd.persist(StorageLevel.MEMORY_ONLY_SER)

# 3. Reduce the amount of data per partition
scala> val rdd = sc.textFile("path", 200)

# 4. Unpersist caches that are no longer needed
scala> rdd.unpersist()

Part05 - Fengge's Experience and Takeaways

5.1 RDD Programming Best Practices

Recommended RDD programming best practices:

# Programming best practices
1. Use reduceByKey instead of groupByKey
2. Set a sensible number of partitions
3. Cache RDDs that are reused
4. Avoid collect() on large datasets
5. Share read-only data with broadcast variables
6. Collect job metrics with accumulators (see the sketch below)
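
A minimal sketch of item 6: a long accumulator counting blank lines while a transformation runs (the file and the check are illustrative).

// Accumulators collect side metrics without changing the data flow
val badLines = sc.longAccumulator("badLines")

val lines = sc.textFile("hdfs://192.168.1.60:9000/spark-data/fgedu.txt")
val kept = lines.map { line =>
  if (line.trim.isEmpty) badLines.add(1)   // record a metric for blank lines
  line
}

kept.count()                 // accumulator values are only reliable after an action has run
println(badLines.value)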

# Performance tuning advice
1. Use Kryo serialization
2. Optimize for data locality
3. Avoid unnecessary shuffles
4. Set the degree of parallelism appropriately
5. Monitor memory usage

5.2 Performance Tuning Recommendations

Spark RDD performance tuning recommendations:

  • Set parallelism sensibly: partitions = cores * 2-4
  • Use Kryo serialization for better performance (see the sketch below)
  • Cache RDDs that are reused
  • Avoid unnecessary shuffles
  • Use broadcast variables to reduce data transfer
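
A hedged sketch of the Kryo setting when building a SparkConf (the app name is illustrative; the same keys can also go into spark-defaults.conf or be passed with --conf on spark-submit):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Switch the serializer used for shuffle data and serialized caches to Kryo
val conf = new SparkConf()
  .setAppName("fgedu-rdd-demo")
  .set("spark.serializer", classOf[KryoSerializer].getName)
  .set("spark.kryoserializer.buffer.max", "128m")   // raise if large records fail to serialize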

5.3 Debugging Tips

RDD debugging tips:

  • View RDD lineage: rdd.toDebugString
  • View the partition count: rdd.getNumPartitions
  • View cache information: sc.getRDDStorageInfo
  • Set the log level: sc.setLogLevel("DEBUG")
  • View stage information: the Spark Web UI

Fengge's tip: the RDD is Spark's core data abstraction, and RDD programming is the foundation of everything else in Spark. Practice these RDD operations until lazy evaluation and lineage feel natural.

This article was compiled and published by Fengge Tutorials for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
