This document by Fenge (风哥) is a hands-on introduction to Spark Structured Streaming, covering core Structured Streaming concepts, the programming model, Kafka integration, and window operations. It draws on the official Spark Structured Streaming Programming Guide and is intended for big-data development and operations staff to use for study and testing; validate everything independently before applying it to production.
Part01 - Fundamental Concepts and Theory
1.1 Structured Streaming Core Concepts
Structured Streaming is the stream-processing API introduced in Spark 2.0. Built on the Spark SQL engine, it provides declarative stream processing.
- Unbounded table: the stream is treated as a continuously growing table
- Incremental query: the query executes incrementally over newly arrived data
- End-to-end exactly-once: exactly-once semantics are supported (with replayable sources and idempotent sinks)
- Unified API: the same DataFrame API as batch processing
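To illustrate the unified API point, the sketch below reads the same JSON directory once as a batch job and once as a stream; only the entry point changes (`read` vs `readStream`). The path and `event_type` column are illustrative assumptions, not from the original text.

```scala
// Assuming a SparkSession `spark` (e.g. inside spark-shell); the path is hypothetical.
// Batch: read a static directory of JSON files
val batchDF = spark.read
  .format("json")
  .load("/data/events/")

// Stream: same source format, same transformations; only read -> readStream changes
val streamDF = spark.readStream
  .format("json")
  .schema(batchDF.schema) // streaming file sources require an explicit schema
  .load("/data/events/")

// Identical DataFrame logic applies to both
val batchCounts  = batchDF.groupBy("event_type").count()
val streamCounts = streamDF.groupBy("event_type").count()
```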
1.2 The Programming Model Explained
The Structured Streaming programming model:
1. Input table
- The stream is modeled as an unbounded table
- New data is appended to the table
- The table grows continuously
2. Query
- A query is defined over the input table
- It produces a result table
- Computation is incremental
3. Output
- The result table is written to external storage
- Multiple output modes are supported
# Processing flow
input stream -> input table -> query -> result table -> output
# Example
# Create a streaming DataFrame
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
# Run a query
val words = lines.as[String].flatMap(_.split(" "))
# Write the result
val query = words.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
# Comparison with Spark Streaming
| Feature | Structured Streaming | Spark Streaming |
|--------------|----------------------------------------------|------------------------------|
| API | DataFrame/Dataset | DStream |
| Model | Unbounded table | Micro-batch (DStream) |
| Latency | Sub-second (milliseconds in continuous mode) | Seconds |
| Exactly-once | Supported | Requires extra configuration |
| Event time | Native support | Manual handling required |
1.3 Output Modes Explained
Structured Streaming output modes:
1. Append mode
- Emits only newly appended rows
- Suited to stateless queries
- For aggregations, only usable with a watermark (rows are emitted once their window is finalized)
2. Complete mode
- Emits the full result table every trigger
- Suited to aggregation queries
- Requires keeping the complete state
3. Update mode
- Emits only the rows updated since the last trigger
- Suited to aggregation queries
- More efficient than Complete
# Choosing an output mode
- Simple queries (map, filter): Append
- Aggregations (groupBy): Complete or Update
- Watermarked aggregations: Update (or Append, emitting finalized windows only)
# Output mode examples
# Append mode
df.writeStream
.outputMode("append")
.format("console")
.start()
# Complete mode
aggDF.writeStream
.outputMode("complete")
.format("console")
.start()
# Update mode
aggDF.writeStream
.outputMode("update")
.format("console")
.start()
Part02 - Production Planning and Recommendations
2.1 Streaming Application Planning
Recommendations for planning a streaming application:
1. Source selection
- Kafka: high-throughput message queue
- File source: file streams
- Socket: testing only
- Rate source: testing only
2. Volume estimation
- Message throughput: records/second
- Message size: bytes
- State size: GB
3. Latency requirements
- Sub-second: Structured Streaming
- Seconds: Spark Streaming
- Minutes: batch processing
4. Fault-tolerance requirements
- Exactly-once: enable checkpointing
- At-least-once: simpler configuration
# Resource planning
# Trigger interval
- Default: start the next micro-batch as soon as the previous one finishes
- Fixed interval: Trigger.ProcessingTime
- Continuous processing: Trigger.Continuous (experimental)
# Example configuration
spark.sql.shuffle.partitions=200
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.checkpointLocation=/checkpoint/fgedu-streaming
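The trigger options above can be sketched as follows; `df` is assumed to be an already-defined streaming DataFrame, and the intervals are illustrative.

```scala
import org.apache.spark.sql.streaming.Trigger

// Default: a new micro-batch starts as soon as the previous one finishes
val q1 = df.writeStream.format("console").start()

// Fixed interval: micro-batches fire every 10 seconds
val q2 = df.writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .start()

// One-shot: process all available data, then stop (useful for batch-style runs)
val q3 = df.writeStream
  .trigger(Trigger.AvailableNow())  // Spark 3.3+; use Trigger.Once() on older versions
  .format("console")
  .start()

// Continuous processing (experimental, limited operator support):
// ~millisecond latency, checkpoint written every 1 second
val q4 = df.writeStream
  .trigger(Trigger.Continuous("1 second"))
  .format("console")
  .start()
```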
2.2 Checkpoint Planning
Checkpoint planning recommendations:
# Checkpoint directory layout
/checkpoint/fgedu-streaming/
├── offsets/ # offset information
├── commits/ # commit records
├── metadata/ # metadata
├── state/ # state data
└── sources/ # source information
# Configuring the checkpoint
val query = df.writeStream
.option("checkpointLocation", "/checkpoint/fgedu-streaming")
.start()
# Checkpoint configuration
spark.sql.streaming.checkpointLocation=/checkpoint/fgedu-streaming
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.stateStore.rocksdb.compactor.numThreads=2
# Checkpoint cleanup
# Retain only recent batches; older checkpoint files are cleaned up automatically
spark.sql.streaming.minBatchesToRetain=100
2.3 State Management Planning
State management recommendations:
1. In-memory state store (HDFS-backed)
- The default provider
- Suited to small state
- Fast, but consumes executor memory
2. RocksDB state store (Spark 3.2+)
- Keeps state on local disk
- Suited to large state
- Requires extra configuration
# RocksDB configuration
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
# RocksDB tuning
spark.sql.streaming.stateStore.rocksdb.blockSizeKB=64
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB=512
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB=64
# State TTL
# Bound how long per-key state is retained
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
# State timeout (the grouping key and handler function are elided here)
df.groupByKey(...)
.flatMapGroupsWithState(
OutputMode.Update,
GroupStateTimeout.ProcessingTimeTimeout
)
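A fuller sketch of the timeout pattern above, assuming a simple per-user event count with a processing-time TTL; the `Event`/`UserCount` case classes, the 30-minute timeout, and the field names are illustrative assumptions.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(userId: String, action: String)
case class UserCount(userId: String, count: Long)

object StateTimeoutSketch {
  // Called once per trigger for each key with new data or a fired timeout
  def updateCounts(
      userId: String,
      events: Iterator[Event],
      state: GroupState[Long]): Iterator[UserCount] = {
    if (state.hasTimedOut) {
      // TTL expired with no new data: drop the state, emit nothing
      state.remove()
      Iterator.empty
    } else {
      val newCount = state.getOption.getOrElse(0L) + events.size
      state.update(newCount)
      // Expire this key's state after 30 minutes without new events
      state.setTimeoutDuration("30 minutes")
      Iterator(UserCount(userId, newCount))
    }
  }

  def run(spark: SparkSession, events: Dataset[Event]): Unit = {
    import spark.implicits._
    val counts = events
      .groupByKey(_.userId)
      .flatMapGroupsWithState(
        OutputMode.Update,
        GroupStateTimeout.ProcessingTimeTimeout)(updateCounts)

    counts.writeStream
      .outputMode("update")
      .format("console")
      .start()
  }
}
```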
Part03 - Production Implementation
3.1 Kafka Source in Practice
3.1.1 Reading from Kafka
$ /bigdata/app/spark/bin/spark-shell \
--master spark://192.168.1.60:7077 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
# Create a Kafka stream
scala> val kafkaDF = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092")
| .option("subscribe", "fgedu-streaming-topic")
| .option("startingOffsets", "latest")
| .option("failOnDataLoss", "false")
| .load()
kafkaDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
# Inspect the schema
scala> kafkaDF.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
# Parse the payload
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val lines = kafkaDF.select(
| col("key").cast("string"),
| col("value").cast("string").alias("value"),
| col("topic"),
| col("partition"),
| col("offset"),
| col("timestamp")
| )
lines: org.apache.spark.sql.DataFrame = [key: string, value: string ... 4 more fields]
# Start the query
scala> val query = lines.writeStream
| .outputMode("append")
| .format("console")
| .option("truncate", "false")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - fgedu-streaming
# Send test data
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-streaming-topic \
--bootstrap-server 192.168.1.51:9092
>hello world
>hello spark
>hello fgedu
# Console output
-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----------+---------------------+---------+------+-------------------+
|key |value |topic |partition|offset|timestamp |
+----+-----------+---------------------+---------+------+-------------------+
|null|hello world|fgedu-streaming-topic|0 |0 |2026-04-08 13:00:00|
|null|hello spark|fgedu-streaming-topic|0 |1 |2026-04-08 13:00:01|
|null|hello fgedu|fgedu-streaming-topic|0 |2 |2026-04-08 13:00:02|
+----+-----------+---------------------+---------+------+-------------------+
3.1.2 Writing to Kafka
scala> val writeQuery = lines.writeStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("topic", "fgedu-output-topic")
| .option("checkpointLocation", "/checkpoint/fgedu-kafka-output")
| .start()
writeQuery: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - fgedu-kafka-output
# Consume the output topic
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
--topic fgedu-output-topic \
--from-beginning \
--bootstrap-server 192.168.1.51:9092
hello world
hello spark
hello fgedu
# Write records with a key
scala> val keyedDF = lines.select(
| col("key"),
| to_json(struct(col("*"))).alias("value")
| )
keyedDF: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> val keyedQuery = keyedDF.writeStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("topic", "fgedu-keyed-output")
| .option("checkpointLocation", "/checkpoint/fgedu-keyed-output")
| .start()
3.2 Stream Processing Operations in Practice
3.2.1 Basic Transformations
scala> val lines = spark.readStream
| .format("socket")
| .option("host", "localhost")
| .option("port", 9999)
| .load()
| .as[String]
lines: org.apache.spark.sql.Dataset[String] = [value: string]
# Tokenize
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]
# Filter
scala> val filtered = words.filter(_.length > 3)
filtered: org.apache.spark.sql.Dataset[String] = [value: string]
# Map
scala> val upperWords = words.map(_.toUpperCase)
upperWords: org.apache.spark.sql.Dataset[String] = [value: string]
# Aggregate
scala> val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]
# Emit the aggregated result
scala> val query = wordCounts.writeStream
| .outputMode("complete")
| .format("console")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - word-counts
# Start a socket server
$ nc -lk 9999
hello world hello spark
hello kafka hello fgedu
# Console output
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|value |count|
+------+-----+
|hello |4 |
|world |1 |
|spark |1 |
|kafka |1 |
|fgedu |1 |
+------+-----+
3.2.2 Event-Time Processing
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val events = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("subscribe", "fgedu-events")
| .load()
| .select(
| from_json(col("value").cast("string"), "name STRING, timestamp STRING").as("data")
| )
| .select(
| col("data.name").alias("name"),
| to_timestamp(col("data.timestamp")).alias("event_time")
| )
events: org.apache.spark.sql.DataFrame = [name: string, event_time: timestamp]
# Set a watermark
scala> val withWatermark = events
| .withWatermark("event_time", "10 minutes")
withWatermark: org.apache.spark.sql.DataFrame = [name: string, event_time: timestamp]
# Aggregate over event-time windows
scala> val windowedCounts = withWatermark
| .groupBy(
| window(col("event_time"), "5 minutes"),
| col("name")
| )
| .count()
windowedCounts: org.apache.spark.sql.DataFrame = [window: struct
# Emit the result
scala> val query = windowedCounts.writeStream
| .outputMode("update")
| .format("console")
| .option("truncate", "false")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - windowed-counts
3.3 Window Operations in Practice
1. Tumbling windows
- Fixed size, non-overlapping
- window(col("time"), "5 minutes")
2. Sliding windows
- Fixed size, may overlap
- window(col("time"), "10 minutes", "5 minutes")
# Tumbling window example
scala> val tumblingWindow = events
| .withWatermark("event_time", "10 minutes")
| .groupBy(
| window(col("event_time"), "5 minutes"),
| col("name")
| )
| .count()
tumblingWindow: org.apache.spark.sql.DataFrame = [window: struct
# Sliding window example
scala> val slidingWindow = events
| .withWatermark("event_time", "10 minutes")
| .groupBy(
| window(col("event_time"), "10 minutes", "5 minutes"),
| col("name")
| )
| .count()
slidingWindow: org.apache.spark.sql.DataFrame = [window: struct
# Session window example (Spark 3.2+)
scala> val sessionWindow = events
| .withWatermark("event_time", "10 minutes")
| .groupBy(
| session_window(col("event_time"), "5 minutes"),
| col("name")
| )
| .count()
sessionWindow: org.apache.spark.sql.DataFrame = [session_window: struct
# Handling late data
# Data arriving within the watermark delay is still processed
# Data arriving later than the watermark may be dropped
# Emit the windowed result
scala> val query = tumblingWindow.writeStream
| .outputMode("update")
| .format("console")
| .option("truncate", "false")
| .start()
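The same watermark mechanism also bounds state for streaming deduplication: once a key can no longer receive duplicates, its state is dropped. A minimal sketch, assuming `events` has hypothetical columns `eventId` (string) and `event_time` (timestamp):

```scala
// Deduplicate within the watermark horizon; dedup state stays bounded
// because entries older than the watermark are cleaned up.
val deduped = events
  .withWatermark("event_time", "10 minutes")
  .dropDuplicates("eventId", "event_time")
```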
Part04 - Production Cases and Walkthroughs
4.1 Real-Time ETL Case
# Create the Kafka source
scala> val kafkaSource = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("subscribe", "fgedu-raw-data")
| .option("startingOffsets", "latest")
| .load()
kafkaSource: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
# Define the schema
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = StructType(Array(
| StructField("user_id", IntegerType),
| StructField("event_type", StringType),
| StructField("item_id", StringType),
| StructField("timestamp", StringType)
| ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,IntegerType,true), StructField(event_type,StringType,true), StructField(item_id,StringType,true), StructField(timestamp,StringType,true))
# Parse the data
scala> val parsedDF = kafkaSource
| .select(from_json(col("value").cast("string"), schema).as("data"))
| .select(
| col("data.user_id").alias("user_id"),
| col("data.event_type").alias("event_type"),
| col("data.item_id").alias("item_id"),
| to_timestamp(col("data.timestamp")).alias("event_time")
| )
parsedDF: org.apache.spark.sql.DataFrame = [user_id: int, event_type: string ... 2 more fields]
# Clean the data
scala> val cleanedDF = parsedDF
| .filter(col("user_id").isNotNull)
| .filter(col("event_type").isin("click", "purchase", "view"))
| .withWatermark("event_time", "10 minutes")
cleanedDF: org.apache.spark.sql.DataFrame = [user_id: int, event_type: string ... 2 more fields]
# Write as Parquet (e.g. backing a Hive table)
scala> val query = cleanedDF.writeStream
| .format("parquet")
| .option("path", "hdfs://192.168.1.60:9000/data/dwd/user_events")
| .option("checkpointLocation", "/checkpoint/fgedu-etl")
| .partitionBy("event_type")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - fgedu-etl
# Check the running state
scala> query.status
res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
scala> query.lastProgress
res1: org.apache.spark.sql.streaming.StreamingQueryProgress =
{
"id" : "fgedu-etl",
"runId" : "xxx-xxx-xxx",
"timestamp" : "2026-04-08T13:00:00.000Z",
"numInputRows" : 1000,
"processedRowsPerSecond" : 100.0,
"durationMs" : {"triggerExecution" : 10000}
}
4.2 Stream-Stream Join Case
# Create the two streams
scala> val impressions = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("subscribe", "fgedu-impressions")
| .load()
| .select(
| from_json(col("value").cast("string"), "adId STRING, impressionTime TIMESTAMP").as("data")
| )
| .select(
| col("data.adId").alias("adId"),
| col("data.impressionTime").alias("impressionTime")
| )
| .withWatermark("impressionTime", "1 hour")
impressions: org.apache.spark.sql.DataFrame = [adId: string, impressionTime: timestamp]
scala> val clicks = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("subscribe", "fgedu-clicks")
| .load()
| .select(
| from_json(col("value").cast("string"), "adId STRING, clickTime TIMESTAMP").as("data")
| )
| .select(
| col("data.adId").alias("adId"),
| col("data.clickTime").alias("clickTime")
| )
| .withWatermark("clickTime", "1 hour")
clicks: org.apache.spark.sql.DataFrame = [adId: string, clickTime: timestamp]
# Stream-stream join (time-bounded so join state can be cleaned up)
scala> val joined = impressions.join(clicks,
| impressions("adId") === clicks("adId") &&
| impressions("impressionTime") <= clicks("clickTime") &&
| clicks("clickTime") <= impressions("impressionTime") + expr("INTERVAL 1 HOUR"),
| "inner"
| )
joined: org.apache.spark.sql.DataFrame = [adId: string, impressionTime: timestamp ... 2 more fields]
# Emit the join result
scala> val query = joined.writeStream
| .outputMode("append")
| .format("console")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - stream-join
# Stream-static join
scala> val staticData = spark.read
| .parquet("hdfs://192.168.1.60:9000/data/dim/ads/")
staticData: org.apache.spark.sql.DataFrame = [adId: string, adName: string ... 2 more fields]
scala> val streamBatchJoin = impressions.join(staticData, "adId")
streamBatchJoin: org.apache.spark.sql.DataFrame = [adId: string, impressionTime: timestamp ... 3 more fields]
4.3 Common Problems and Fixes
4.3.1 Latency Issues
# Diagnosis
# 1. Check query progress
scala> query.lastProgress
# 2. Check query status
scala> query.status
# Remedies
# 1. Tune the trigger interval
scala> import org.apache.spark.sql.streaming.Trigger
scala> val query = df.writeStream
| .trigger(Trigger.ProcessingTime("5 seconds"))
| .start()
# 2. Increase parallelism
spark.sql.shuffle.partitions=400
# 3. Optimize state storage
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
# 4. Use continuous processing (low latency; experimental, limited operator support)
scala> val query = df.writeStream
| .trigger(Trigger.Continuous("1 second"))
| .start()
4.3.2 Oversized State
# Diagnosis
# 1. Inspect the state size
scala> query.lastProgress.stateOperators
# Remedies
# 1. Set a watermark so expired state is cleaned up
scala> df.withWatermark("event_time", "1 hour")
# 2. Move large state to the RocksDB state store
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
# 3. Reduce the number of state keys
# Group at a coarser granularity
# 4. Retain fewer checkpoint batches
spark.sql.streaming.minBatchesToRetain=100
Part05 - Fenge's Experience and Takeaways
5.1 Structured Streaming Best Practices
Best-practice recommendations:
1. Use checkpointing for fault tolerance
2. Set a reasonable watermark delay
3. Choose the right output mode
4. Monitor query status
5. Handle late data
# Performance best practices
1. Use the RocksDB state store
2. Set parallelism appropriately
3. Keep state small
4. Use an appropriate trigger
5.2 Performance Tuning Recommendations
Tuning recommendations:
- Use the RocksDB state store for large state
- Set a reasonable watermark so expired state is cleaned up
- Tune parallelism to raise throughput
- Monitor query latency and state size
5.3 Recommended Tools
Structured Streaming tooling:
- StreamingQuery: query management
- StreamingQueryListener: query event callbacks
- StreamingQueryProgress: progress metrics
- Spark UI: visual monitoring
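As a sketch of the listener-based monitoring mentioned above, the callback below logs a few health metrics for every trigger of every query; the chosen metrics and log format are illustrative assumptions.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Logs basic health metrics for each micro-batch of each streaming query
class ProgressLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"query=${p.name} batch=${p.batchId} " +
      s"inputRows=${p.numInputRows} " +
      s"rowsPerSec=${p.processedRowsPerSecond}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query terminated: ${event.id}, exception=${event.exception}")
}

// Registration (assuming an existing SparkSession `spark`):
// spark.streams.addListener(new ProgressLogger)
```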
This article was compiled and published by Fenge Tutorials for learning and testing purposes only; when reposting, please credit the source: http://www.fgedu.net.cn/10327.html
