1. 首页 > Hadoop教程 > 正文

大数据教程FG062-Spark Streaming实时处理实战

本文档风哥主要介绍Spark Streaming实时处理实战,包括Spark Streaming核心概念、DStream数据模型、Kafka集成、窗口操作等内容,风哥教程参考Spark官方文档Spark Streaming Programming Guide、Structured Streaming等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 Spark Streaming核心概念

Spark Streaming是Spark核心API的扩展,支持高吞吐量、容错的实时数据流处理。学习交流加群风哥微信: itpux-com

Spark Streaming核心概念:

  • DStream:离散化流,是RDD的连续序列
  • 批处理间隔:数据切分成批的时间间隔
  • 微批处理:将流数据分成小批次处理
  • Receiver:接收器,从数据源接收数据
  • Checkpoint:检查点,保存处理状态
  • Backpressure:背压机制,动态调整接收速率

1.2 DStream数据模型

DStream数据模型详解:

# DStream特点

1. 时间序列的RDD
– DStream是RDD的连续序列
– 每个RDD包含一个时间间隔的数据
– 支持RDD的所有操作

2. 操作类型
– 转换操作:返回新DStream
– 输出操作:将结果写入外部系统

3. 容错机制
– 基于RDD的血缘关系
– 自动恢复失败的数据
– 支持精确一次语义

# DStream内部结构
DStream = RDD1(time: t1) + RDD2(time: t2) + RDD3(time: t3) + …

# 数据处理流程
数据源 -> Receiver -> DStream -> 转换操作 -> 输出操作 -> 外部系统
| | | |
接收数据 分批处理 处理数据 输出结果

# DStream操作示例
# 转换操作
lines.flatMap(_.split(” “)) // 扁平化
.map(word => (word, 1)) // 映射
.reduceByKey(_ + _) // 聚合

# 输出操作
wordCounts.print() // 打印
wordCounts.saveAsTextFiles() // 保存文件

1.3 Spark Streaming核心特性

Spark Streaming核心特性:

  • 高吞吐:支持每秒处理数百万条记录
  • 容错性:基于RDD的血缘关系自动恢复
  • 可扩展:支持横向扩展到数百个节点
  • 统一API:与Spark Core、Spark SQL共享API
  • 多数据源:支持Kafka、Flume、Kinesis等多种数据源
  • 精确一次:支持精确一次语义
风哥提示:Spark Streaming采用微批处理模型,将流数据分成小批次处理,这种设计兼顾了吞吐量和延迟,适合大多数实时处理场景。

Part02-生产环境规划与建议

2.1 流处理应用规划

流处理应用规划建议:

# 应用规划要点

1. 数据源选择
– Kafka:高吞吐消息队列
– Flume:日志收集系统
– Kinesis:AWS流服务
– Socket:测试用

2. 数据量评估
– 消息吞吐量:条/秒
– 消息大小:字节
– 总数据量:GB/小时

3. 延迟要求
– 秒级延迟:批处理间隔1-5秒
– 分钟级延迟:批处理间隔30-60秒

4. 资源规划
– CPU:核心数 = 并行任务数
– 内存:批处理数据量 * 2-3倍
– 网络:数据传输带宽

# 示例规划
数据量:100万条/秒
消息大小:1KB
批处理间隔:5秒
每批数据量:500万条 * 1KB = 5GB
内存需求:5GB * 3 = 15GB

2.2 批处理间隔规划

批处理间隔规划建议:

# 批处理间隔选择

1. 影响因素
– 数据量大小
– 处理复杂度
– 延迟要求
– 资源限制

2. 推荐设置
– 低延迟场景:1-2秒
– 一般场景:5-10秒
– 高吞吐场景:30-60秒

3. 调优原则
– 处理时间 < 批处理间隔 - 避免数据积压 - 监控处理延迟 # 配置示例 val ssc = new StreamingContext(sc, Seconds(5)) // 5秒批处理间隔 # 监控处理延迟 # 在Spark UI中查看Processing Time和Scheduling Delay

2.3 检查点规划

检查点规划建议:

# 检查点类型

1. 元数据检查点
– 配置信息
– DStream操作
– 未完成的批次

2. 数据检查点
– 有状态转换的中间结果
– 支持容错恢复

# 检查点配置
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(“hdfs://192.168.1.60:9000/spark-checkpoint/fgedu-streaming”)

# 检查点目录结构
/spark-checkpoint/fgedu-streaming/
├── checkpoint-1234567890
├── checkpoint-1234567895
├── receivedBlockMetadata/
└── offsets/

# 检查点清理
# 定期清理旧的检查点文件
spark.streaming.checkpoint.directory = hdfs://192.168.1.60:9000/spark-checkpoint
spark.cleaner.referenceTracking.cleanCheckpoints = true

生产环境建议:生产环境必须配置检查点目录,确保应用能够从故障中恢复。检查点目录建议使用HDFS等可靠存储。学习交流加群风哥QQ113257174

Part03-生产环境项目实施方案

3.1 Kafka集成实战

3.1.1 创建Kafka Topic

# 创建测试Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-streaming-topic \
–partitions 3 \
–replication-factor 3 \
–bootstrap-server 192.168.1.51:9092

Created topic fgedu-streaming-topic.

# 查看Topic信息
$ /bigdata/app/kafka/bin/kafka-topics.sh –describe \
–topic fgedu-streaming-topic \
–bootstrap-server 192.168.1.51:9092

Topic: fgedu-streaming-topic TopicId: xxx PartitionCount: 3 ReplicationFactor: 3
Topic: fgedu-streaming-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-streaming-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedu-streaming-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

3.1.2 Spark Streaming连接Kafka

# 启动Spark Shell(包含Kafka依赖)
$ /bigdata/app/spark/bin/spark-shell \
–master spark://192.168.1.60:7077 \
–executor-memory 4g \
–packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1

# 导入必要类
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
scala> import org.apache.spark.streaming.kafka010._
scala> import org.apache.kafka.common.serialization.StringDeserializer

# 创建StreamingContext
scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@7b5a12ae

# 配置Kafka参数
scala> val kafkaParams = Map[String, Object](
| “bootstrap.servers” -> “192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092”,
| “key.deserializer” -> classOf[StringDeserializer],
| “value.deserializer” -> classOf[StringDeserializer],
| “group.id” -> “fgedu-streaming-group”,
| “auto.offset.reset” -> “latest”,
| “enable.auto.commit” -> (false: java.lang.Boolean)
| )
kafkaParams: scala.collection.immutable.Map[String,Object] = Map(bootstrap.servers -> 192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092, key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, group.id -> fgedu-streaming-group, auto.offset.reset -> latest, enable.auto.commit -> false)

# 创建DStream
scala> val topics = Array(“fgedu-streaming-topic”)
topics: Array[String] = Array(fgedu-streaming-topic)

scala> val stream = KafkaUtils.createDirectStream[String, String](
| ssc,
| PreferConsistent,
| Subscribe[String, String](topics, kafkaParams)
| )
stream: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = DirectKafkaInputDStream[0]

# 处理数据
scala> val lines = stream.map(record => record.value())
lines: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[1]

scala> val words = lines.flatMap(_.split(” “))
words: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[2]

scala> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = ShuffledDStream[3]

# 输出结果
scala> wordCounts.print()

# 启动流处理
scala> ssc.start()
scala> ssc.awaitTermination()

3.2 流处理操作实战

3.2.1 基本转换操作

# 创建测试流
scala> val ssc = new StreamingContext(sc, Seconds(5))
scala> ssc.checkpoint(“hdfs://192.168.1.60:9000/spark-checkpoint/fgedu-test”)

# map操作
scala> val mapped = lines.map(_.toUpperCase)
mapped: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[4]

# filter操作
scala> val filtered = lines.filter(_.length > 5)
filtered: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[5]

# flatMap操作
scala> val words = lines.flatMap(_.split(” “))
words: org.apache.spark.streaming.dstream.DStream[String] = MapPartitionsDStream[6]

# reduce操作
scala> val reduced = lines.reduce(_ + ” ” + _)
reduced: org.apache.spark.streaming.dstream.DStream[String] = ReducedDStream[7]

# count操作
scala> val count = lines.count()
count: org.apache.spark.streaming.dstream.DStream[Long] = ForEachDStream[8]

3.2.2 有状态操作

# updateStateByKey操作
scala> def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
| val newCount = newValues.sum
| val previousCount = runningCount.getOrElse(0)
| Some(newCount + previousCount)
| }
updateFunction: (newValues: Seq[Int], runningCount: Option[Int])Option[Int]

scala> val runningCounts = wordCounts.updateStateByKey(updateFunction _)
runningCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = StateDStream[9]

scala> runningCounts.print()

# mapWithState操作(推荐)
scala> import org.apache.spark.streaming.State
import org.apache.spark.streaming.State

scala> val stateSpec = StateSpec.function((key: String, value: Option[Int], state: State[Int]) => {
| val newState = state.getOption().getOrElse(0) + value.getOrElse(0)
| state.update(newState)
| (key, newState)
| })
stateSpec: org.apache.spark.streaming.StateSpec[String,Int,Int,(String, Int)] = org.apache.spark.streaming.StateSpecImpl@5b37e0d2

scala> val stateDStream = wordCounts.mapWithState(stateSpec)
stateDStream: org.apache.spark.streaming.dstream.MapWithStateDStream[String,Int,Int,(String, Int)] = MapWithStateDStream[10]

scala> stateDStream.print()

风哥提示:有状态操作需要配置检查点目录,用于保存中间状态。mapWithState比updateStateByKey性能更好,推荐使用。更多学习教程公众号风哥教程itpux_com

3.3 窗口操作实战

# 窗口操作类型

1. 滚动窗口
– 窗口长度 = 滑动间隔
– 数据不重叠

2. 滑动窗口
– 窗口长度 > 滑动间隔
– 数据有重叠

# 窗口操作示例
scala> val ssc = new StreamingContext(sc, Seconds(5))

# window操作
scala> val windowed = lines.window(Seconds(30), Seconds(10))
windowed: org.apache.spark.streaming.dstream.DStream[String] = WindowedDStream[11]

# reduceByWindow操作
scala> val reducedWindow = lines.reduceByWindow(_ + ” ” + _, Seconds(30), Seconds(10))
reducedWindow: org.apache.spark.streaming.dstream.DStream[String] = ReducedWindowedDStream[12]

# countByWindow操作
scala> val countWindow = lines.countByWindow(Seconds(30), Seconds(10))
countWindow: org.apache.spark.streaming.dstream.DStream[Long] = ForEachDStream[13]

# reduceByKeyAndWindow操作
scala> val wordCountsWindow = wordCounts.reduceByKeyAndWindow(
| (a: Int, b: Int) => a + b, // reduce函数
| (a: Int, b: Int) => a – b, // inverse reduce函数
| Seconds(30), // 窗口长度
| Seconds(10) // 滑动间隔
| )
wordCountsWindow: org.apache.spark.streaming.dstream.DStream[(String, Int)] = ReducedWindowedDStream[14]

scala> wordCountsWindow.print()

# countByValueAndWindow操作
scala> val countByValueWindow = words.countByValueAndWindow(Seconds(30), Seconds(10))
countByValueWindow: org.apache.spark.streaming.dstream.DStream[(String, Long)] = ForEachDStream[15]

Part04-生产案例与实战讲解

4.1 实时数据分析案例

# 实时WordCount完整案例

# 创建应用
$ cat > /bigdata/spark-apps/FgeduStreamingWordCount.scala << 'EOF' package com.fgedu.spark import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010._ import org.apache.kafka.common.serialization.StringDeserializer object FgeduStreamingWordCount { def main(args: Array[String]): Unit = { // 配置 val conf = new SparkConf() .setAppName("fgedu-streaming-wordcount") .setMaster("spark://192.168.1.60:7077") // 创建StreamingContext val ssc = new StreamingContext(conf, Seconds(5)) ssc.checkpoint("hdfs://192.168.1.60:9000/spark-checkpoint/fgedu-wordcount") // Kafka配置 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> “192.168.1.51:9092”,
“key.deserializer” -> classOf[StringDeserializer],
“value.deserializer” -> classOf[StringDeserializer],
“group.id” -> “fgedu-wordcount-group”,
“auto.offset.reset” -> “latest”,
“enable.auto.commit” -> (false: java.lang.Boolean)
)

// 创建DStream
val topics = Array(“fgedu-streaming-topic”)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

// 处理数据
val lines = stream.map(record => record.value())
val words = lines.flatMap(_.split(” “))
val wordCounts = words.map(word => (word, 1))
.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b,
Seconds(30),
Seconds(10)
)

// 输出结果
wordCounts.print()

// 启动
ssc.start()
ssc.awaitTermination()
}
}
EOF

# 编译并提交应用
$ /bigdata/app/spark/bin/spark-submit \
–class com.fgedu.spark.FgeduStreamingWordCount \
–master spark://192.168.1.60:7077 \
–executor-memory 4g \
–packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
/bigdata/spark-apps/fgedu-streaming-wordcount.jar

# 发送测试数据
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
–topic fgedu-streaming-topic \
–bootstrap-server 192.168.1.51:9092

>hello world hello spark
>hello kafka hello fgedu
>spark streaming spark real-time

# 查看输出结果
——————————————-
Time: 2026-04-08 120000
——————————————-
(hello,4)
(spark,3)
(world,1)
(kafka,1)
(fgedu,1)
(streaming,1)
(real-time,1)

4.2 流批一体分析实战

# 流批一体分析案例

scala> val ssc = new StreamingContext(sc, Seconds(5))
scala> ssc.checkpoint(“hdfs://192.168.1.60:9000/spark-checkpoint/fgedu-join”)

# 创建静态数据(维度表)
scala> val dimData = Seq(
| (“IT”, “信息技术部”),
| (“HR”, “人力资源部”),
| (“Finance”, “财务部”)
| )
dimData: Seq[(String, String)] = List((IT,信息技术部), (HR,人力资源部), (Finance,财务部))

scala> val dimDF = dimData.toDF(“dept_code”, “dept_name”)
dimDF: org.apache.spark.sql.DataFrame = [dept_code: string, dept_name: string]

# 广播维度数据
scala> val dimBroadcast = sc.broadcast(dimDF.collect())
dimBroadcast: org.apache.spark.broadcast.Broadcast[Array[org.apache.spark.sql.Row]] = Broadcast(0)

# 创建流数据
scala> val streamData = KafkaUtils.createDirectStream[String, String](
| ssc,
| PreferConsistent,
| Subscribe[String, String](Array(“fgedu-streaming-topic”), kafkaParams)
| )

# 解析流数据
scala> import org.apache.spark.sql.Row
scala> val parsedStream = streamData.map(record => {
| val parts = record.value().split(“,”)
| (parts(0), parts(1), parts(2)) // (name, dept_code, salary)
| })

# 流批Join
scala> val enrichedStream = parsedStream.map(record => {
| val dimMap = dimBroadcast.value.map(r => (r.getString(0), r.getString(1))).toMap
| val deptName = dimMap.getOrElse(record._2, “Unknown”)
| (record._1, deptName, record._3)
| })

scala> enrichedStream.print()

4.3 常见问题处理

4.3.1 处理延迟问题

# 问题现象:Scheduling Delay持续增加

# 排查步骤
# 1. 检查处理时间
# 在Spark UI的Streaming Tab查看Processing Time

# 2. 检查资源使用
$ curl http://192.168.1.60:8080/json

# 解决方案
# 1. 增加批处理间隔
val ssc = new StreamingContext(sc, Seconds(10))

# 2. 增加并行度
spark.default.parallelism=200

# 3. 启用背压
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000

# 4. 增加Executor资源
–executor-memory 8g
–num-executors 10

4.3.2 数据丢失问题

# 问题现象:数据丢失或重复

# 排查步骤
# 1. 检查检查点配置
scala> ssc.checkpoint(“hdfs://192.168.1.60:9000/spark-checkpoint”)

# 2. 检查Kafka offset提交

# 解决方案
# 1. 启用Write Ahead Logs
spark.streaming.receiver.writeAheadLog.enable=true

# 2. 配置精确一次语义
spark.streaming.kafka.maxRatePerPartition=1000

# 3. 手动提交offset
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 处理数据
// 提交offset
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Part05-风哥经验总结与分享

5.1 Streaming最佳实践

Spark Streaming最佳实践建议:

# 开发最佳实践
1. 合理设置批处理间隔
2. 配置检查点目录
3. 使用mapWithState替代updateStateByKey
4. 启用背压机制
5. 监控处理延迟

# 部署最佳实践
1. 配置资源充足
2. 启用WAL提高可靠性
3. 监控应用状态
4. 配置告警机制
5. 定期清理检查点

5.2 性能调优建议

性能调优建议:

Spark Streaming性能调优建议:

  • 处理时间 < 批处理间隔
  • 启用背压机制动态调整速率
  • 使用Kryo序列化
  • 合理设置并行度
  • 监控Scheduling Delay

5.3 监控运维建议

监控运维建议:

  • Spark UI:查看Streaming Tab
  • Processing Time:监控处理时间
  • Scheduling Delay:监控调度延迟
  • Batch Info:查看批次详情
  • 告警配置:延迟超过阈值告警
风哥提示:Spark Streaming适合微批处理场景,如果需要更低延迟,建议使用Structured Streaming或Flink。from bigdata视频:www.itpux.com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息