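This tutorial by Feng Ge (风哥) is a hands-on introduction to real-time processing with Spark Streaming. It covers the core concepts, the DStream data model, Kafka integration, and window operations, and draws on the official Spark documentation (the Spark Streaming Programming Guide and the Structured Streaming guide). It is intended for big data developers and operations engineers to use for learning and testing; verify everything yourself before applying it in production.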
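Part01-Basic Concepts and Theory
1.1 Spark Streaming Core Concepts
Spark Streaming is an extension of the core Spark API that supports high-throughput, fault-tolerant processing of live data streams.
- DStream: a discretized stream, represented as a continuous sequence of RDDs
- Batch interval: the time interval at which incoming data is cut into batches
- Micro-batching: the stream is processed as a series of small batches
- Receiver: a component that receives data from a source
- Checkpoint: periodically saved state used for recovery
- Backpressure: a mechanism that dynamically adjusts the ingestion rate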
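1.2 DStream Data Model
The DStream data model in detail:
1. A time-ordered series of RDDs
   - A DStream is a continuous sequence of RDDs
   - Each RDD holds the data from one batch interval
   - Most RDD transformations are available directly; arbitrary RDD operations can be applied through transform
2. Operation types
   - Transformations: return a new DStream
   - Output operations: write results to an external system
3. Fault tolerance
   - Based on RDD lineage
   - Lost data is recomputed automatically
   - Received data is processed exactly once; end-to-end exactly-once results additionally require idempotent or transactional output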
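# Internal structure of a DStream
DStream = RDD1(time: t1) + RDD2(time: t2) + RDD3(time: t3) + …
# Data processing flow
Source -> Receiver -> DStream -> Transformations -> Output operations -> External system
             |           |             |                   |
        Receive data  Batching    Process data        Emit results
# DStream operation examples
# Transformations
lines.flatMap(_.split(" ")) // flatten lines into words
.map(word => (word, 1)) // map each word to a (word, 1) pair
.reduceByKey(_ + _) // aggregate counts per word
# Output operations
wordCounts.print() // print the first elements of each batch
wordCounts.saveAsTextFiles("wordcounts") // save each batch as text files; "wordcounts" is an example file-name prefix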
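The micro-batch model above can be illustrated without a cluster. The following is a minimal plain-Scala sketch, not Spark code: each inner Seq stands in for the RDD of one batch interval, and the same flatMap, map, and per-key reduce steps run on one batch at a time. The object name MicroBatchModel and the sample lines are hypothetical.

```scala
// A plain-Scala model of micro-batching (no Spark dependency).
// Each inner Seq stands in for the RDD produced for one batch interval.
object MicroBatchModel {
  // Hypothetical input: lines that arrived during three consecutive batches.
  val batches: Seq[Seq[String]] = Seq(
    Seq("hello world", "hello spark"),
    Seq("spark streaming"),
    Seq.empty[String] // a batch interval in which nothing arrived
  )

  // The flatMap -> map -> reduce-by-key pipeline, applied to a single batch.
  def wordCount(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  // One result per batch, just as a DStream yields one RDD per interval.
  val perBatchCounts: Seq[Map[String, Int]] = batches.map(wordCount)
}
```

Note that the counts are independent per batch: "spark" is counted once in the first batch and once in the second, and nothing carries over unless a stateful or window operation is used.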
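1.3 Spark Streaming Core Features
Core features of Spark Streaming:
- High throughput: can reach millions of records per second, depending on cluster size and workload
- Fault tolerance: automatic recovery based on RDD lineage
- Scalability: scales out horizontally to hundreds of nodes
- Unified API: shares its API with Spark Core and Spark SQL
- Multiple sources: Kafka, Kinesis, and others (the built-in Flume connector exists only in Spark 2.x)
- Exactly-once: supported for processing; end-to-end guarantees depend on the output operation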
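Part02-Production Planning and Recommendations
2.1 Planning a Streaming Application
Recommendations for planning a streaming application:
1. Choose the data source
   - Kafka: high-throughput message queue
   - Flume: log collection system (the built-in connector was removed in Spark 3.0)
   - Kinesis: AWS streaming service
   - Socket: for testing only
2. Estimate the data volume
   - Message throughput: records/second
   - Message size: bytes
   - Total volume: GB/hour
3. Define the latency requirement
   - Second-level latency: batch interval of 1-5 seconds
   - Minute-level latency: batch interval of 30-60 seconds
4. Plan resources
   - CPU: number of cores = number of parallel tasks
   - Memory: 2-3 times the data volume of one batch
   - Network: bandwidth for data transfer
# Example plan
Data rate: 1 million records/second
Message size: 1 KB
Batch interval: 5 seconds
Data per batch: 5 million records * 1 KB = 5 GB
Memory required: 5 GB * 3 = 15 GB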
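The sizing arithmetic in the example plan can be written as a small helper so that it is easy to rerun with other inputs. This is only a sketch of the rule of thumb stated above; the object and method names are hypothetical, and 1 KB is taken as 1000 bytes to match the round figures in the plan.

```scala
// Rule-of-thumb sizing for one micro-batch (plain Scala, no Spark dependency).
object CapacityPlan {
  // Bytes arriving during one batch interval.
  def batchBytes(recordsPerSec: Long, recordBytes: Long, batchSeconds: Long): Long =
    recordsPerSec * recordBytes * batchSeconds

  // Memory estimate: batch size times a headroom factor (2-3x in the plan above).
  def memoryBytes(batch: Long, headroom: Int): Long = batch * headroom

  // The example plan: 1 million records/s, 1 KB each (assumed 1000 bytes), 5-second batches.
  val batch: Long = batchBytes(recordsPerSec = 1000000L, recordBytes = 1000L, batchSeconds = 5L)
  val memory: Long = memoryBytes(batch, headroom = 3)
}
```

With these inputs, batch is 5,000,000,000 bytes (5 GB) and memory is 15,000,000,000 bytes (15 GB), matching the figures in the plan. The estimate covers the batch data only, not executor overhead or state kept by stateful operations.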
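2.2 Choosing the Batch Interval
Recommendations for choosing the batch interval:
1. Factors to consider
   - Data volume
   - Processing complexity
   - Latency requirement
   - Resource limits
2. Suggested settings
   - Low-latency workloads: 1-2 seconds
   - General workloads: 5-10 seconds
   - High-throughput workloads: 30-60 seconds
3. Tuning principles
   - Keep processing time below the batch interval
   - Avoid a growing backlog of batches
   - Monitor processing delay
# Configuration example
val ssc = new StreamingContext(sc, Seconds(5)) // 5-second batch interval
# Monitoring processing delay
# Check Processing Time and Scheduling Delay in the Spark UI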
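Why processing time must stay below the batch interval can be seen with a small simulation. The sketch below is a simplified model, assuming batches run strictly one after another: whenever a batch takes longer than the interval, the excess is added to the wait of the next batch. The object name and the timings are hypothetical.

```scala
// Simplified model of scheduling delay (plain Scala, no Spark dependency).
// Assumes batches are processed strictly one at a time.
object BatchStability {
  // For each batch, how long it waits before it can start, given the
  // processing time of every batch (ms) and the batch interval (ms).
  def schedulingDelays(processingMs: Seq[Long], intervalMs: Long): Seq[Long] =
    processingMs
      .scanLeft(0L) { (backlog, p) => math.max(0L, backlog + p - intervalMs) }
      .init // drop the delay that a batch after the last one would see

  // 5-second interval with 6-second batches: the delay grows by 1 s per batch.
  val overloaded: Seq[Long] = schedulingDelays(Seq(6000L, 6000L, 6000L), 5000L)

  // 5-second interval with 4-second batches: no delay accumulates.
  val healthy: Seq[Long] = schedulingDelays(Seq(4000L, 4000L, 4000L), 5000L)
}
```

In the overloaded case the delays are 0, 1000, and 2000 ms and keep growing, which is the pattern to watch for in the Scheduling Delay graph.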
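2.3 Checkpoint Planning
Recommendations for checkpointing:
1. Metadata checkpointing
   - Configuration
   - DStream operations
   - Incomplete batches
2. Data checkpointing
   - Intermediate results of stateful transformations
   - Enables fault-tolerant recovery
# Checkpoint configuration
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("hdfs://192.168.1.60:9000/spark-checkpoint/fgedu-streaming")
# Checkpoint directory layout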
/spark-checkpoint/fgedu-streaming/
├── checkpoint-1234567890
├── checkpoint-1234567895
├── receivedBlockMetadata/
└── offsets/
# Checkpoint cleanup
# Clean up old checkpoint files regularly
spark.streaming.checkpoint.directory = hdfs://192.168.1.60:9000/spark-checkpoint
spark.cleaner.referenceTracking.cleanCheckpoints = true
Part03-Production Implementation
3.1 Kafka Integration in Practice
3.1.1 Create the Kafka Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh --create \
--topic fgedu-streaming-topic \
--partitions 3 \
--replication-factor 3 \
--bootstrap-server 192.168.1.51:9092
Created topic fgedu-streaming-topic.
# Describe the topic
$ /bigdata/app/kafka/bin/kafka-topics.sh --describe \
--topic fgedu-streaming-topic \
--bootstrap-server 192.168.1.51:9092
Topic: fgedu-streaming-topic TopicId: xxx PartitionCount: 3 ReplicationFactor: 3
Topic: fgedu-streaming-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-streaming-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedu-streaming-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
3.1.2 Connect Spark Streaming to Kafka
# The DStream API (org.apache.spark.streaming.kafka010) is provided by spark-streaming-kafka-0-10;
# spark-sql-kafka-0-10 is the Structured Streaming connector
$ /bigdata/app/spark/bin/spark-shell \
--master spark://192.168.1.60:7077 \
--executor-memory 4g \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1
# Import the required classes
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
scala> import org.apache.spark.streaming.kafka010._
scala> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
scala> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
scala> import org.apache.kafka.common.serialization.StringDeserializer
# Create the StreamingContext
scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@7b5a12ae
# Configure the Kafka parameters
scala> val kafkaParams = Map[String, Object](
| "bootstrap.servers" -> "192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092",
| "key.deserializer" -> classOf[StringDeserializer],
| "value.deserializer" -> classOf[StringDeserializer],
| "group.id" -> "fgedu-streaming-group",
| "auto.offset.reset" -> "latest",
| "enable.auto.commit" -> (false: java.lang.Boolean)
| )
kafkaParams: scala.collection.immutable.Map[String,Object] = Map(bootstrap.servers -> 192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092, key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, group.id -> fgedu-streaming-group, auto.offset.reset -> latest, enable.auto.commit -> false)
# Create the DStream
scala> val topics = Array("fgedu-streaming-topic")
topics: Array[String] = Array(fgedu-streaming-topic)
scala> val stream = KafkaUtils.createDirectStream[String, String](
| ssc,
| PreferConsistent,
| Subscribe[String, String](topics, kafkaParams)
| )
stream: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = DirectKafkaInputDStream[0]
# Process the data
scala> val lines = stream.map(record => record.value())
lines: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[1]
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[2]
scala> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = ShuffledDStream[3]
# Output the results
scala> wordCounts.print()
# Start stream processing
scala> ssc.start()
scala> ssc.awaitTermination()
3.2 Stream Processing Operations in Practice
3.2.1 Basic Transformations
scala> val ssc = new StreamingContext(sc, Seconds(5))
scala> ssc.checkpoint("hdfs://192.168.1.60:9000/spark-checkpoint/fgedu-test")
# map
scala> val mapped = lines.map(_.toUpperCase)
mapped: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[4]
# filter
scala> val filtered = lines.filter(_.length > 5)
filtered: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[5]
# flatMap
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[6]
# reduce
scala> val reduced = lines.reduce(_ + " " + _)
reduced: org.apache.spark.streaming.dstream.DStream[String] = ReducedDStream[7]
# count
scala> val count = lines.count()
count: org.apache.spark.streaming.dstream.DStream[Long] = ForEachDStream[8]
3.2.2 Stateful Operations
# updateStateByKey: keeps a running count per key (requires a checkpoint directory)
scala> def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
| val newCount = newValues.sum
| val previousCount = runningCount.getOrElse(0)
| Some(newCount + previousCount)
| }
updateFunction: (newValues: Seq[Int], runningCount: Option[Int])Option[Int]
scala> val runningCounts = wordCounts.updateStateByKey(updateFunction _)
runningCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = StateDStream[9]
scala> runningCounts.print()
# mapWithState (recommended)
scala> import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.{State, StateSpec}
scala> val stateSpec = StateSpec.function((key: String, value: Option[Int], state: State[Int]) => {
| val newState = state.getOption().getOrElse(0) + value.getOrElse(0)
| state.update(newState)
| (key, newState)
| })
stateSpec: org.apache.spark.streaming.StateSpec[String,Int,Int,(String, Int)] = org.apache.spark.streaming.StateSpecImpl@5b37e0d2
scala> val stateDStream = wordCounts.mapWithState(stateSpec)
stateDStream: org.apache.spark.streaming.dstream.MapWithStateDStream[String,Int,Int,(String, Int)] = MapWithStateDStream[10]
scala> stateDStream.print()
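To see what the update function does across batches, the following plain-Scala sketch replays a few batches through it by hand. It only models the semantics (the function is called once per key per batch, with an empty Seq when a known key has no new values) and is not how Spark stores state. The object name, applyBatch, and the sample batches are hypothetical.

```scala
// Plain-Scala model of running counts across batches (no Spark dependency).
object RunningCountModel {
  // Same shape as the function passed to updateStateByKey.
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(newValues.sum + runningCount.getOrElse(0))

  // Apply one batch of (word, count) pairs to the state accumulated so far.
  def applyBatch(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Every known key is visited, including keys with no new values in this batch.
    (state.keySet ++ grouped.keySet).toList.flatMap { k =>
      updateFunction(grouped.getOrElse(k, Seq.empty[Int]), state.get(k)).map(v => k -> v).toList
    }.toMap
  }

  // Hypothetical per-batch word counts.
  val batches: Seq[Seq[(String, Int)]] =
    Seq(Seq("hello" -> 2, "spark" -> 1), Seq("hello" -> 1), Seq("kafka" -> 3))

  val finalState: Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int])(applyBatch)
}
```

Because every key is visited on every batch, the cost of this style grows with the total number of keys, which is one reason mapWithState, which only touches keys present in the current batch, is usually preferred.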
3.3 Window Operations in Practice
1. Tumbling window
   - Window length = slide interval
   - Windows do not overlap
2. Sliding window
   - Window length > slide interval
   - Consecutive windows overlap
# Window operation examples
# The window length and slide interval must be multiples of the batch interval.
# countByWindow, countByValueAndWindow, and reduceByKeyAndWindow with an inverse function require ssc.checkpoint(...)
scala> val ssc = new StreamingContext(sc, Seconds(5))
# window
scala> val windowed = lines.window(Seconds(30), Seconds(10))
windowed: org.apache.spark.streaming.dstream.DStream[String] = WindowedDStream[11]
# reduceByWindow
scala> val reducedWindow = lines.reduceByWindow(_ + " " + _, Seconds(30), Seconds(10))
reducedWindow: org.apache.spark.streaming.dstream.DStream[String] = ReducedWindowedDStream[12]
# countByWindow
scala> val countWindow = lines.countByWindow(Seconds(30), Seconds(10))
countWindow: org.apache.spark.streaming.dstream.DStream[Long] = ForEachDStream[13]
# reduceByKeyAndWindow
scala> val wordCountsWindow = wordCounts.reduceByKeyAndWindow(
| (a: Int, b: Int) => a + b, // reduce function
| (a: Int, b: Int) => a - b, // inverse reduce function
| Seconds(30), // window length
| Seconds(10) // slide interval
| )
wordCountsWindow: org.apache.spark.streaming.dstream.DStream[(String, Int)] = ReducedWindowedDStream[14]
scala> wordCountsWindow.print()
# countByValueAndWindow
scala> val countByValueWindow = words.countByValueAndWindow(Seconds(30), Seconds(10))
countByValueWindow: org.apache.spark.streaming.dstream.DStream[(String, Long)] = ForEachDStream[15]
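The inverse reduce function passed to reduceByKeyAndWindow lets a sliding window be updated incrementally: the new window value is the previous value, plus the data that entered, minus the data that left. The sketch below models that idea for a single key in plain Scala and checks it against recomputing each window from scratch. The object name and the sample counts are hypothetical.

```scala
// Plain-Scala model of incremental sliding-window aggregation (no Spark dependency).
object SlidingWindowModel {
  // Hypothetical counts for one key in consecutive 10-second slides.
  val slides: Vector[Int] = Vector(3, 1, 4, 1, 5, 9)
  val slidesPerWindow: Int = 3 // 30-second window / 10-second slide

  // Naive approach: re-reduce every slide that falls inside the window ending at `end`.
  def naive(end: Int): Int =
    slides.slice(math.max(0, end - slidesPerWindow + 1), end + 1).sum

  // Incremental approach: previous window + entering slide - leaving slide.
  val incremental: Vector[Int] =
    slides.indices.foldLeft(Vector.empty[Int]) { (acc, i) =>
      val previous = acc.lastOption.getOrElse(0)
      val leaving = if (i >= slidesPerWindow) slides(i - slidesPerWindow) else 0
      acc :+ (previous + slides(i) - leaving) // "+" is the reduce, "-" the inverse reduce
    }
}
```

Both approaches give the same window values here; the incremental form does constant work per slide regardless of window length, which matters for long windows. It only works when the reduce function has an inverse, as addition does.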
Part04-Production Cases and Walkthroughs
4.1 Real-Time Data Analysis Case
# Create the application
$ cat > /bigdata/spark-apps/FgeduStreamingWordCount.scala << 'EOF'
package com.fgedu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

object FgeduStreamingWordCount {
  def main(args: Array[String]): Unit = {
    // Configuration
    val conf = new SparkConf()
      .setAppName("fgedu-streaming-wordcount")
      .setMaster("spark://192.168.1.60:7077")
    // Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://192.168.1.60:9000/spark-checkpoint/fgedu-wordcount")
    // Kafka configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.51:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fgedu-wordcount-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Create the DStream
    val topics = Array("fgedu-streaming-topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Process the data: word counts over a 30-second window, sliding every 10 seconds
    val lines = stream.map(record => record.value())
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1))
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b,
        Seconds(30),
        Seconds(10)
      )
    // Output the results
    wordCounts.print()
    // Start
    ssc.start()
    ssc.awaitTermination()
  }
}
EOF
# Compile and submit the application
$ /bigdata/app/spark/bin/spark-submit \
--class com.fgedu.spark.FgeduStreamingWordCount \
--master spark://192.168.1.60:7077 \
--executor-memory 4g \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1 \
/bigdata/spark-apps/fgedu-streaming-wordcount.jar
# Send test data
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-streaming-topic \
--bootstrap-server 192.168.1.51:9092
>hello world hello spark
>hello kafka hello fgedu
>spark streaming spark real-time
# Check the output
-------------------------------------------
Time: 2026-04-08 120000
-------------------------------------------
(hello,4)
(spark,3)
(world,1)
(kafka,1)
(fgedu,1)
(streaming,1)
(real-time,1)
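The counts shown above can be checked by hand with plain Scala collections. This sketch assumes that all three test lines arrive within the same 30-second window, which is what the sample output implies; the object name is hypothetical.

```scala
// Recompute the expected window counts from the three test lines (no Spark dependency).
object ExpectedWindowCounts {
  // The three lines typed into kafka-console-producer.
  val lines: Seq[String] = Seq(
    "hello world hello spark",
    "hello kafka hello fgedu",
    "spark streaming spark real-time"
  )

  // Split on single spaces and count occurrences of each word.
  val counts: Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
}
```

If the lines are typed more than 30 seconds apart, they fall into different windows and the printed counts will be lower than these totals.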
4.2 Joining Streaming and Batch Data in Practice
scala> val ssc = new StreamingContext(sc, Seconds(5))
scala> ssc.checkpoint("hdfs://192.168.1.60:9000/spark-checkpoint/fgedu-join")
# Create the static data (dimension table)
scala> val dimData = Seq(
| ("IT", "Information Technology"),
| ("HR", "Human Resources"),
| ("Finance", "Finance")
| )
dimData: Seq[(String, String)] = List((IT,Information Technology), (HR,Human Resources), (Finance,Finance))
scala> val dimDF = dimData.toDF("dept_code", "dept_name")
dimDF: org.apache.spark.sql.DataFrame = [dept_code: string, dept_name: string]
# Broadcast the dimension data
scala> val dimBroadcast = sc.broadcast(dimDF.collect())
dimBroadcast: org.apache.spark.broadcast.Broadcast[Array[org.apache.spark.sql.Row]] = Broadcast(0)
# Create the stream
scala> val streamData = KafkaUtils.createDirectStream[String, String](
| ssc,
| PreferConsistent,
| Subscribe[String, String](Array("fgedu-streaming-topic"), kafkaParams)
| )
# Parse the stream records
scala> import org.apache.spark.sql.Row
scala> val parsedStream = streamData.map(record => {
| val parts = record.value().split(",")
| (parts(0), parts(1), parts(2)) // (name, dept_code, salary)
| })
# Join the stream with the static data
scala> val enrichedStream = parsedStream.mapPartitions(records => {
| // Build the lookup map once per partition instead of once per record
| val dimMap = dimBroadcast.value.map(r => (r.getString(0), r.getString(1))).toMap
| records.map(record => (record._1, dimMap.getOrElse(record._2, "Unknown"), record._3))
| })
scala> enrichedStream.print()
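The enrichment step is a dictionary lookup per record, and it can be exercised in isolation before wiring it into a stream. The sketch below is a plain-Scala version that also tolerates malformed lines, which the REPL example does not (there, parts(2) throws if a line has fewer than three fields). The object name, the Enriched case class, and the dimension values are illustrative assumptions.

```scala
// Plain-Scala model of the dimension lookup (no Spark dependency).
object EnrichModel {
  final case class Enriched(name: String, deptName: String, salary: String)

  // Dimension table as a Map, built once, as it would be before broadcasting.
  val dim: Map[String, String] = Map(
    "IT" -> "Information Technology",
    "HR" -> "Human Resources",
    "Finance" -> "Finance"
  )

  // Parse "name,dept_code,salary"; return None for malformed lines instead of throwing.
  def enrich(line: String): Option[Enriched] =
    line.split(",").map(_.trim) match {
      case Array(name, deptCode, salary) =>
        Some(Enriched(name, dim.getOrElse(deptCode, "Unknown"), salary))
      case _ => None
    }
}
```

In a stream this function could be applied with flatMap so that malformed records are dropped rather than failing the batch; whether to drop, log, or route them elsewhere is a design decision for the application.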
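4.3 Troubleshooting Common Problems
4.3.1 Processing Delay
# Diagnosis
# 1. Check the processing time
# Look at Processing Time in the Streaming tab of the Spark UI
# 2. Check resource usage
$ curl http://192.168.1.60:8080/json
# Solutions
# 1. Increase the batch interval
val ssc = new StreamingContext(sc, Seconds(10))
# 2. Increase parallelism
spark.default.parallelism=200
# 3. Enable backpressure
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
# 4. Add executor resources
# (--num-executors applies on YARN and Kubernetes; on a standalone cluster use --total-executor-cores)
--executor-memory 8g
--num-executors 10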
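4.3.2 Data Loss
# Diagnosis
# 1. Check the checkpoint configuration
scala> ssc.checkpoint("hdfs://192.168.1.60:9000/spark-checkpoint")
# 2. Check Kafka offset commits
# Solutions
# 1. Enable write-ahead logs (applies to receiver-based sources; the direct Kafka stream has no receiver)
spark.streaming.receiver.writeAheadLog.enable=true
# 2. Cap the per-partition read rate so that no single batch is overloaded
# (this is a rate limit; exactly-once results additionally require idempotent or transactional output)
spark.streaming.kafka.maxRatePerPartition=1000
# 3. Commit offsets manually, after the output has succeeded
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// process the data
// commit the offsets
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}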
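The effect of spark.streaming.kafka.maxRatePerPartition on batch size is simple arithmetic: the setting is a per-partition rate in records per second, so the most a single batch can contain is that rate times the number of partitions times the batch interval. The helper below is a hypothetical sketch of that calculation, using the 3-partition topic and 5-second interval from earlier sections.

```scala
// Upper bound on records per batch under a per-partition rate cap (plain Scala).
object RateCap {
  // maxRatePerPartition is in records/second per Kafka partition.
  def maxRecordsPerBatch(maxRatePerPartition: Long, partitions: Int, batchSeconds: Long): Long =
    maxRatePerPartition * partitions * batchSeconds

  // 1000 records/s per partition, 3 partitions, 5-second batches.
  val cap: Long = maxRecordsPerBatch(maxRatePerPartition = 1000L, partitions = 3, batchSeconds = 5L)
}
```

Here cap is 15,000 records per batch. If the producers write faster than that, the backlog stays in Kafka rather than in Spark, so consumer lag should be monitored alongside Processing Time.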
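Part05-Feng Ge's Lessons Learned
5.1 Streaming Best Practices
Spark Streaming best practices:
1. Choose a sensible batch interval
2. Configure a checkpoint directory
3. Prefer mapWithState over updateStateByKey
4. Enable backpressure
5. Monitor processing delay
# Deployment best practices
1. Provision sufficient resources
2. Enable the WAL for receiver-based sources to improve reliability
3. Monitor application status
4. Configure alerting
5. Clean up checkpoints regularly
5.2 Performance Tuning
Performance tuning recommendations:
- Keep processing time below the batch interval
- Enable backpressure to adjust the ingestion rate dynamically
- Use Kryo serialization
- Set parallelism appropriately
- Monitor Scheduling Delay
5.3 Monitoring and Operations
Monitoring and operations recommendations:
- Spark UI: check the Streaming tab
- Processing Time: monitor how long each batch takes
- Scheduling Delay: monitor how long batches wait to be scheduled
- Batch Info: inspect per-batch details
- Alerting: alert when delay exceeds a threshold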
Compiled and published by Feng Ge Tutorials (风哥教程), for learning and testing only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
