
Big Data Tutorial FG068: Spark Structured Streaming in Practice

This tutorial by Fengge covers Spark Structured Streaming in practice, including core concepts, the programming model, Kafka integration, and window operations. It draws on the official Structured Streaming Programming Guide, and is intended for big data developers and operators to use in learning and testing; validate everything yourself before applying it to production.

Part01-Basic Concepts and Theory

1.1 Structured Streaming Core Concepts

Structured Streaming is the stream processing API introduced in Spark 2.0. It is built on the Spark SQL engine and provides declarative stream processing.

Core concepts of Structured Streaming:

  • Unbounded table: the stream is modeled as a continuously growing table
  • Incremental query: the query is executed incrementally over newly arrived data
  • End-to-end exactly-once: exactly-once semantics are supported
  • Unified API: the same DataFrame API as batch processing (see the sketch below)
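
To make the "unified API" point concrete, here is a minimal sketch (the input path and the event_type column are hypothetical): the same transformation runs unchanged on a batch DataFrame and on a streaming DataFrame.

# One transformation, batch and streaming (path and columns are hypothetical)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Batch read: a bounded table
val batchDF = spark.read.json("/data/events")

// Streaming read over the same directory: an unbounded table
// (file-based streaming sources need an explicit schema)
val streamDF = spark.readStream.schema(batchDF.schema).json("/data/events")

// Identical logic for both: count events per type
def countByType(df: DataFrame): DataFrame = df.groupBy(col("event_type")).count()

val batchCounts  = countByType(batchDF)   // computed once
val streamCounts = countByType(streamDF)  // recomputed incrementally each trigger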

1.2 The Programming Model in Detail

The Structured Streaming programming model in detail:

# Programming model

1. Input table
- Stream data is treated as an unbounded table
- New data is appended to the table
- The table grows continuously

2. Query
- A query is executed against the input table
- It produces a result table
- Computation is incremental

3. Output
- The result table is written to external storage
- Multiple output modes are supported

# Processing flow
input stream -> input table -> query -> result table -> output

# Example
# Create a streaming DataFrame from a socket source
# (start a socket server first, e.g. nc -lk 9999)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

# Run the query: split lines into words
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))

# Write the results to the console
val query = words.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

# Comparison with Spark Streaming
| Feature      | Structured Streaming | Spark Streaming        |
|--------------|----------------------|------------------------|
| API          | DataFrame/Dataset    | DStream                |
| Model        | Unbounded table      | Micro-batch processing |
| Latency      | Milliseconds         | Seconds                |
| Exactly-once | Supported            | Requires configuration |
| Event time   | Native support       | Manual handling        |

1.3 Output Modes in Detail

Structured Streaming output modes in detail:

# Output mode types

1. Append mode
- Writes only newly appended rows
- Suits stateless queries
- Not suited to aggregation queries (unless watermarked)

2. Complete mode
- Writes the entire result table every trigger
- Suits aggregation queries
- Must keep the complete result state

3. Update mode
- Writes only rows updated since the last trigger
- Suits aggregation queries
- More efficient than Complete

# Choosing an output mode
- Simple queries (map, filter): Append
- Aggregations (groupBy): Complete or Update
- Watermarked aggregations: Update (or Append, once the watermark finalizes each window)

# Output mode examples
# Append mode
df.writeStream
  .outputMode("append")
  .format("console")
  .start()

# Complete mode
aggDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()

# Update mode
aggDF.writeStream
  .outputMode("update")
  .format("console")
  .start()
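
One caveat worth showing: Append mode rejects a streaming aggregation that has no watermark, because the engine cannot know when an aggregate row is final. A sketch of the failure:

# Caveat: Append mode plus an unwatermarked aggregation
// start() throws an AnalysisException: append output mode is not supported
// for streaming aggregations without a watermark
df.groupBy("value").count()
  .writeStream
  .outputMode("append")
  .format("console")
  .start()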

Fengge's tip: Structured Streaming models a stream as an unbounded table and reuses the batch DataFrame API, which greatly simplifies stream processing development.

Part02-Production Planning and Recommendations

2.1 Planning the Streaming Application

Recommendations for planning a streaming application:

# Planning checklist

1. Choosing a data source
- Kafka: high-throughput message queue
- File source: file streams
- Socket: testing only
- Rate source: testing only

2. Sizing the data
- Message throughput: records/second
- Message size: bytes
- State size: GB

3. Latency requirements
- Milliseconds: Structured Streaming
- Seconds: Spark Streaming
- Minutes: batch processing

4. Fault-tolerance requirements
- Exactly-once: enable checkpointing
- At-least-once: simpler configuration

# Resource planning
# Trigger interval (see the sketch after the sample configuration below)
- Default: start each micro-batch as soon as the previous one finishes
- Fixed interval: ProcessingTime
- Continuous processing: Continuous

# Sample configuration
spark.sql.shuffle.partitions=200
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.checkpointLocation=/checkpoint/fgedu-streaming
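
The trigger choices above, as a sketch in code (df and the intervals are placeholders):

# Trigger examples
import org.apache.spark.sql.streaming.Trigger

// Default: a new micro-batch starts as soon as the previous one finishes
df.writeStream.format("console").start()

// Fixed-interval micro-batches
df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")).format("console").start()

// Continuous processing (experimental; map-like queries only)
df.writeStream.trigger(Trigger.Continuous("1 second")).format("console").start()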

2.2 Checkpoint Planning

Checkpoint planning recommendations:

# Checkpoint configuration

# Checkpoint directory layout
/checkpoint/fgedu-streaming/
├── offsets/ # offset tracking
├── commits/ # batch commit records
├── metadata/ # query metadata
├── state/ # operator state data
└── sources/ # source metadata

# Setting the checkpoint location
val query = df.writeStream
  .option("checkpointLocation", "/checkpoint/fgedu-streaming")
  .start()

# Checkpoint-related settings
spark.sql.streaming.checkpointLocation=/checkpoint/fgedu-streaming
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.stateStore.rocksdb.compactor.numThreads=2

# Checkpoint cleanup
# Retain only recent batches; older ones are pruned automatically
spark.sql.streaming.minBatchesToRetain=100

2.3 State Management Planning

State management recommendations:

# State store types

1. In-memory state store
- The default provider
- Suits small state
- Fast, but holds state in executor memory

2. RocksDB state store
- Keeps state on local disk
- Suits large state
- Requires extra configuration

# Enabling RocksDB
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

# RocksDB tuning
spark.sql.streaming.stateStore.rocksdb.blockSizeKB=64
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB=512
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB=64

# State TTL
# Bound how long state is retained
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

# State timeout (key selector and state function omitted; see the fuller sketch below)
df.groupByKey(…)
  .flatMapGroupsWithState(
    OutputMode.Update,
    GroupStateTimeout.ProcessingTimeTimeout
  )
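
The call above omits the grouping key and the state-update function. Below is a fuller sketch, assuming a hypothetical Dataset[UserEvent] named eventsDS; the case classes and the 30-minute timeout are illustrative only.

# State TTL, fuller sketch (UserEvent, UserSummary, and eventsDS are hypothetical)
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class UserEvent(userId: String, count: Long)
case class UserSummary(userId: String, total: Long)

// Keep a running total per user; state expires 30 minutes after its last update
def updateTotals(userId: String,
                 events: Iterator[UserEvent],
                 state: GroupState[Long]): Iterator[UserSummary] = {
  if (state.hasTimedOut) {
    state.remove()                          // drop the expired state
    Iterator.empty
  } else {
    val total = state.getOption.getOrElse(0L) + events.map(_.count).sum
    state.update(total)
    state.setTimeoutDuration("30 minutes")  // reset the TTL
    Iterator(UserSummary(userId, total))
  }
}

val summaries = eventsDS
  .groupByKey(_.userId)
  .flatMapGroupsWithState(OutputMode.Update,
    GroupStateTimeout.ProcessingTimeTimeout)(updateTotals)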

Production advice: use the RocksDB state store in production to handle large state, and always configure a checkpoint directory so queries can recover after failures.

Part03-Production Implementation

3.1 Kafka Source in Practice

3.1.1 Reading from Kafka

# Start the Spark shell
$ /bigdata/app/spark/bin/spark-shell \
--master spark://192.168.1.60:7077 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1

# Create the Kafka stream
scala> val kafkaDF = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092")
| .option("subscribe", "fgedu-streaming-topic")
| .option("startingOffsets", "latest")
| .option("failOnDataLoss", "false")
| .load()
kafkaDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

# Inspect the schema
scala> kafkaDF.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

# Parse the records
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val lines = kafkaDF.select(
| col("key").cast("string"),
| col("value").cast("string").alias("value"),
| col("topic"),
| col("partition"),
| col("offset"),
| col("timestamp")
| )
lines: org.apache.spark.sql.DataFrame = [key: string, value: string ... 4 more fields]

# Start the query
scala> val query = lines.writeStream
| .outputMode("append")
| .format("console")
| .option("truncate", "false")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - fgedu-streaming

# Send test data
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-streaming-topic \
--bootstrap-server 192.168.1.51:9092

>hello world
>hello spark
>hello fgedu

# Console output
-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----------+---------------------+---------+------+-------------------+
|key |value      |topic                |partition|offset|timestamp          |
+----+-----------+---------------------+---------+------+-------------------+
|null|hello world|fgedu-streaming-topic|0        |0     |2026-04-08 13:00:00|
|null|hello spark|fgedu-streaming-topic|0        |1     |2026-04-08 13:00:01|
|null|hello fgedu|fgedu-streaming-topic|0        |2     |2026-04-08 13:00:02|
+----+-----------+---------------------+---------+------+-------------------+

3.1.2 Writing to Kafka

# Write to Kafka
scala> val writeQuery = lines.writeStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("topic", "fgedu-output-topic")
| .option("checkpointLocation", "/checkpoint/fgedu-kafka-output")
| .start()
writeQuery: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - fgedu-kafka-output

# Consume the output topic
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
--topic fgedu-output-topic \
--from-beginning \
--bootstrap-server 192.168.1.51:9092

hello world
hello spark
hello fgedu

# Writing records with a key
# (the Kafka sink expects string or binary "key" and "value" columns)
scala> val keyedDF = lines.select(
| col("key"),
| to_json(struct(col("*"))).alias("value")
| )
keyedDF: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> val keyedQuery = keyedDF.writeStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("topic", "fgedu-keyed-output")
| .option("checkpointLocation", "/checkpoint/fgedu-keyed-output")
| .start()

3.2 Stream Processing Operations in Practice

3.2.1 Basic Transformations

# Create a streaming Dataset
scala> val lines = spark.readStream
| .format("socket")
| .option("host", "localhost")
| .option("port", 9999)
| .load()
| .as[String]
lines: org.apache.spark.sql.Dataset[String] = [value: string]

# Split into words
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]

# Filter
scala> val filtered = words.filter(_.length > 3)
filtered: org.apache.spark.sql.Dataset[String] = [value: string]

# Map
scala> val upperWords = words.map(_.toUpperCase)
upperWords: org.apache.spark.sql.Dataset[String] = [value: string]

# Aggregate
scala> val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

# Write the aggregated counts
scala> val query = wordCounts.writeStream
| .outputMode("complete")
| .format("console")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - word-counts

# Start a socket server and type some input
$ nc -lk 9999

hello world hello spark
hello kafka hello fgedu

# Console output
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|value |count|
+------+-----+
|hello |4    |
|world |1    |
|spark |1    |
|kafka |1    |
|fgedu |1    |
+------+-----+

3.2.2 Event-Time Processing

# Create a stream with event timestamps
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val events = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("subscribe", "fgedu-events")
| .load()
| .select(
| from_json(col("value").cast("string"), "name STRING, timestamp STRING", Map.empty[String, String]).as("data")
| )
| .select(
| col("data.name").alias("name"),
| to_timestamp(col("data.timestamp")).alias("event_time")
| )
events: org.apache.spark.sql.DataFrame = [name: string, event_time: timestamp]

# Set a watermark
scala> val withWatermark = events
| .withWatermark("event_time", "10 minutes")
withWatermark: org.apache.spark.sql.DataFrame = [name: string, event_time: timestamp]

# Aggregate over event-time windows
scala> val windowedCounts = withWatermark
| .groupBy(
| window(col("event_time"), "5 minutes"),
| col("name")
| )
| .count()
windowedCounts: org.apache.spark.sql.DataFrame = [window: struct, name: string ... 1 more field]

# Write the results
scala> val query = windowedCounts.writeStream
| .outputMode("update")
| .format("console")
| .option("truncate", "false")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - windowed-counts
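
To exercise the watermark by hand, one can publish JSON events carrying explicit timestamps to the fgedu-events topic (the payload shape matches the from_json schema above; to_timestamp parses the "yyyy-MM-dd HH:mm:ss" format by default):

# Send test events with event-time timestamps
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-events \
--bootstrap-server 192.168.1.51:9092

>{"name": "fgedu", "timestamp": "2026-04-08 13:00:00"}
>{"name": "spark", "timestamp": "2026-04-08 13:02:30"}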

Fengge's tip: Structured Streaming supports event time natively, and a watermark lets the engine handle late data. Set the watermark according to how late your data can realistically arrive.

3.3 Window Operations in Practice

# Window types

1. Tumbling window
- Fixed size, non-overlapping
- window(col("time"), "5 minutes")

2. Sliding window
- Fixed size, may overlap
- window(col("time"), "10 minutes", "5 minutes")

# Tumbling window example
scala> val tumblingWindow = events
| .withWatermark("event_time", "10 minutes")
| .groupBy(
| window(col("event_time"), "5 minutes"),
| col("name")
| )
| .count()
tumblingWindow: org.apache.spark.sql.DataFrame = [window: struct, name: string ... 1 more field]

# Sliding window example
scala> val slidingWindow = events
| .withWatermark("event_time", "10 minutes")
| .groupBy(
| window(col("event_time"), "10 minutes", "5 minutes"),
| col("name")
| )
| .count()
slidingWindow: org.apache.spark.sql.DataFrame = [window: struct, name: string ... 1 more field]

# Session window example (Spark 3.2+)
scala> val sessionWindow = events
| .withWatermark("event_time", "10 minutes")
| .groupBy(
| session_window(col("event_time"), "5 minutes"),
| col("name")
| )
| .count()
sessionWindow: org.apache.spark.sql.DataFrame = [session_window: struct, name: string ... 1 more field]

# Handling late data
# Late events arriving within the watermark are still processed; events later than
# the watermark are dropped. For example, with a 10-minute watermark, once the engine
# has seen event_time 12:30 the watermark sits at 12:20, so an event stamped 12:15
# that arrives now is discarded.

# Write the window results
scala> val query = tumblingWindow.writeStream
| .outputMode("update")
| .format("console")
| .option("truncate", "false")
| .start()

Part04-Production Cases and Walkthroughs

4.1 Real-Time ETL Case

# End-to-end real-time ETL example

# Create the Kafka source
scala> val kafkaSource = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("subscribe", "fgedu-raw-data")
| .option("startingOffsets", "latest")
| .load()
kafkaSource: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

# Define the schema
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = StructType(Array(
| StructField("user_id", IntegerType),
| StructField("event_type", StringType),
| StructField("item_id", StringType),
| StructField("timestamp", StringType)
| ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,IntegerType,true), StructField(event_type,StringType,true), StructField(item_id,StringType,true), StructField(timestamp,StringType,true))

# Parse the records
scala> val parsedDF = kafkaSource
| .select(from_json(col("value").cast("string"), schema).as("data"))
| .select(
| col("data.user_id").alias("user_id"),
| col("data.event_type").alias("event_type"),
| col("data.item_id").alias("item_id"),
| to_timestamp(col("data.timestamp")).alias("event_time")
| )
parsedDF: org.apache.spark.sql.DataFrame = [user_id: int, event_type: string ... 2 more fields]

# Clean the data
scala> val cleanedDF = parsedDF
| .filter(col("user_id").isNotNull)
| .filter(col("event_type").isin("click", "purchase", "view"))
| .withWatermark("event_time", "10 minutes")
cleanedDF: org.apache.spark.sql.DataFrame = [user_id: int, event_type: string ... 2 more fields]

# Write to a Hive-readable Parquet table
scala> val query = cleanedDF.writeStream
| .format("parquet")
| .option("path", "hdfs://192.168.1.60:9000/data/dwd/user_events")
| .option("checkpointLocation", "/checkpoint/fgedu-etl")
| .partitionBy("event_type")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - fgedu-etl

# Check the query status
scala> query.status
res0: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}

scala> query.lastProgress
res1: org.apache.spark.sql.streaming.StreamingQueryProgress =
{
"id" : "fgedu-etl",
"runId" : "xxx-xxx-xxx",
"timestamp" : "2026-04-08T13:00:00.000Z",
"numInputRows" : 1000,
"processedRowsPerSecond" : 100.0,
"durationMs" : {"triggerExecution" : 10000}
}
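
For a quick look at everything running in the session, iterating over the active queries works (a sketch; lastProgress is null until the first batch completes):

# List all active queries and their latest throughput
spark.streams.active.foreach { q =>
  val rate = Option(q.lastProgress).map(_.processedRowsPerSecond).getOrElse(0.0)
  println(s"${q.id}: $rate rows/sec")
}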

4.2 Stream-Stream Join Case

# Stream-stream join

# Create the two streams
scala> val impressions = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("subscribe", "fgedu-impressions")
| .load()
| .select(
| from_json(col("value").cast("string"), "adId STRING, impressionTime TIMESTAMP", Map.empty[String, String]).as("data")
| )
| .select(
| col("data.adId").alias("adId"),
| col("data.impressionTime").alias("impressionTime")
| )
| .withWatermark("impressionTime", "1 hour")
impressions: org.apache.spark.sql.DataFrame = [adId: string, impressionTime: timestamp]

scala> val clicks = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.51:9092")
| .option("subscribe", "fgedu-clicks")
| .load()
| .select(
| from_json(col("value").cast("string"), "adId STRING, clickTime TIMESTAMP", Map.empty[String, String]).as("data")
| )
| .select(
| col("data.adId").alias("adId"),
| col("data.clickTime").alias("clickTime")
| )
| .withWatermark("clickTime", "1 hour")
clicks: org.apache.spark.sql.DataFrame = [adId: string, clickTime: timestamp]

# Join the two streams
scala> val joined = impressions.join(clicks,
| impressions("adId") === clicks("adId") &&
| impressions("impressionTime") <= clicks("clickTime") &&
| clicks("clickTime") <= impressions("impressionTime") + expr("INTERVAL 1 HOUR"),
| "inner"
| )
joined: org.apache.spark.sql.DataFrame = [adId: string, impressionTime: timestamp ... 2 more fields]

# Write the join results
scala> val query = joined.writeStream
| .outputMode("append")
| .format("console")
| .start()
query: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - stream-join

# Stream-static join
scala> val staticData = spark.read
| .parquet("hdfs://192.168.1.60:9000/data/dim/ads/")
staticData: org.apache.spark.sql.DataFrame = [adId: string, adName: string ... 2 more fields]

scala> val streamBatchJoin = impressions.join(staticData, "adId")
streamBatchJoin: org.apache.spark.sql.DataFrame = [adId: string, impressionTime: timestamp ... 3 more fields]

4.3 Handling Common Problems

4.3.1 High Latency

# Symptom: high processing latency

# Diagnosis
# 1. Check the query progress
scala> query.lastProgress

# 2. Check the query status
scala> query.status

# Remedies
# 1. Adjust the trigger interval
scala> import org.apache.spark.sql.streaming.Trigger
scala> val query = df.writeStream
| .trigger(Trigger.ProcessingTime("5 seconds"))
| .start()

# 2. Increase parallelism
spark.sql.shuffle.partitions=400

# 3. Use the RocksDB state store
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

# 4. Use continuous processing for the lowest latency
# (supports only map-like operations with Kafka sources and sinks)
scala> val query = df.writeStream
| .trigger(Trigger.Continuous("1 second"))
| .start()

4.3.2 Oversized State

# Symptom: state data grows too large

# Diagnosis
# 1. Check the per-operator state size
scala> query.lastProgress.stateOperators

# Remedies
# 1. Set a watermark so expired state is cleaned up
scala> df.withWatermark("event_time", "1 hour")

# 2. Use a state TTL (GroupStateTimeout, see 2.3) and move state to RocksDB
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

# 3. Reduce the number of state keys
# (group at a coarser granularity)

# 4. Limit how many old batches the checkpoint retains
spark.sql.streaming.minBatchesToRetain=100

Part05-Fengge's Experience and Takeaways

5.1 Structured Streaming Best Practices

Structured Streaming best-practice recommendations:

# Development best practices
1. Use checkpoints for fault tolerance
2. Set a sensible watermark
3. Choose the right output mode
4. Monitor query status
5. Plan for late data

# Performance best practices (a combined skeleton follows this list)
1. Use the RocksDB state store
2. Set parallelism appropriately
3. Keep state bounded
4. Use an appropriate trigger
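
A combined skeleton tying these practices together (a sketch only: the topic, servers, paths, and intervals are placeholders):

# Best-practice skeleton (all names and values are placeholders)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// RocksDB state store and parallelism, set on the session
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.sql.shuffle.partitions", "200")

// Watermarked windowed aggregation over a Kafka topic
val counts = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.51:9092")
  .option("subscribe", "fgedu-streaming-topic")
  .load()
  .select(col("value").cast("string").alias("value"),
    col("timestamp").alias("event_time"))
  .withWatermark("event_time", "10 minutes")   // bounds state, tolerates late data
  .groupBy(window(col("event_time"), "5 minutes"), col("value"))
  .count()

// Update mode + explicit trigger + checkpoint for recovery
val query = counts.writeStream
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .option("checkpointLocation", "/checkpoint/fgedu-best-practice")
  .format("console")
  .start()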

5.2 Performance Tuning

Structured Streaming performance tuning recommendations:

  • Use the RocksDB state store for large state
  • Set a sensible watermark so expired state is cleaned up
  • Tune parallelism to raise throughput
  • Monitor query latency and state size

5.3 Recommended Tools

Structured Streaming tooling:

  • StreamingQuery: query management
  • StreamingQueryListener: query lifecycle callbacks (a sketch follows this list)
  • StreamingQueryProgress: progress metrics
  • Spark UI: visual monitoring
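
A minimal StreamingQueryListener sketch (the log lines are illustrative):

# Register a listener once per session; callbacks fire for every streaming query
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress.processedRowsPerSecond} rows/sec")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"terminated: ${event.id}")
})
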
Fengge's tip: Structured Streaming is Spark's newer stream processing API, with lower latency and a simpler API than Spark Streaming. New projects should prefer Structured Streaming.

This article was compiled and published by Fengge Tutorials for learning and testing only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
