
Big Data Tutorial FG104-Hands-On Real-Time Data Processing with Spark

This tutorial covers methods and hands-on techniques for real-time data processing with Spark, including the Spark Streaming and Structured Streaming components. It draws on the official Spark documentation for Spark Streaming and Structured Streaming.

By the end of this tutorial you will be able to process and analyze real-time data with Spark, providing timely data support for business decisions.

Outline

Part01-Basic Concepts and Theory

1.1 Overview of Real-Time Processing with Spark

Real-time processing with Spark means continuously processing and analyzing data as it is generated. The main options are:

  • Spark Streaming: micro-batch stream processing built on the legacy DStream API
  • Structured Streaming: stream processing built on the DataFrame/Dataset API
  • Spark Streaming + Kafka: Kafka integration for high-throughput real-time pipelines

Real-time processing is characterized by low latency and high throughput, so systems can respond to business events as they happen.
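
As a point of reference, here is a minimal sketch of the legacy DStream (Spark Streaming) API: a socket word count that runs one micro-batch every 5 seconds. The host and port are placeholders.

# dstream_wordcount.py -- minimal DStream sketch (legacy micro-batch API)
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStreamWordCount")
ssc = StreamingContext(sc, 5)  # one micro-batch every 5 seconds

# Read lines from a TCP socket (host and port are placeholders)
lines = ssc.socketTextStream("localhost", 9999)

# Classic word count over each micro-batch
counts = lines.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()  # print each batch's counts to the console

ssc.start()
ssc.awaitTermination()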

1.2 How Structured Streaming Works

The core ideas behind Structured Streaming (a minimal sketch follows the list):

  • Treats the stream as an unboundedly growing table
  • Operated on with SQL or the DataFrame API
  • Supports both event time and processing time
  • Provides end-to-end consistency guarantees (exactly-once with replayable sources and idempotent sinks)
  • Supports fault tolerance and state management
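
To make the "unbounded table" idea concrete, the minimal sketch below uses the built-in rate source (which generates timestamp/value rows at a fixed rate) and applies ordinary DataFrame operations to the stream:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("UnboundedTableDemo").getOrCreate()

# The rate source emits rows of (timestamp, value) at a fixed rate
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Ordinary DataFrame operations apply to the ever-growing table
even_rows = stream_df.filter(col("value") % 2 == 0)

query = even_rows.writeStream.outputMode("append").format("console").start()
query.awaitTermination()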

1.3 Real-Time Data Processing Architecture

A typical real-time processing architecture has these layers:

  • Ingestion layer: collect real-time data with tools such as Flume and Kafka
  • Transport layer: move data through a message queue such as Kafka
  • Processing layer: process the stream with Spark
  • Storage layer: store results in HBase, Elasticsearch, or similar
  • Presentation layer: visualize live data with Tableau, Grafana, etc.

Part02-Production Planning and Recommendations

2.1 Cluster Sizing

Fenge's tip: size a real-time processing cluster according to data volume and processing requirements so that it delivers low latency and high reliability.

Suggested cluster configuration (expressed in code after the list):

  • Master node: 8 CPU cores, 32 GB RAM
  • Worker nodes: 16 CPU cores, 64 GB RAM
  • Network: 10 Gbps Ethernet
  • Storage: SSDs, for faster reads and writes
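
In code, such sizing shows up as submission settings. The sketch below is illustrative only; every value is an example to adapt, not a recommendation for your cluster:

from pyspark.sql import SparkSession

# Example resource settings for a streaming job on YARN
# (all values are illustrative; size them to your own cluster)
spark = SparkSession.builder \
    .appName("StreamingResourceExample") \
    .master("yarn") \
    .config("spark.executor.instances", "8") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()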

2.2 Choosing a Data Source

Common real-time data sources:

  • Kafka: a high-throughput, highly reliable message queue
  • Flume: a log collection tool
  • Kinesis: AWS's managed real-time data streaming service
  • RabbitMQ: a general-purpose message broker

2.3 Designing for Fault Tolerance

Fault-tolerance measures:

  • Enable checkpointing: periodically persist processing state (see the sketch after this list)
  • Use transactions: keep data processing consistent
  • Implement retries: ride out transient failures
  • Deploy a highly available cluster: avoid single points of failure
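
As referenced in the checkpoint item above, here is a minimal checkpointing sketch; the broker, topic, and path are placeholders. With a checkpointLocation set, a restarted query resumes from its saved Kafka offsets and state instead of starting over.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CheckpointExample").getOrCreate()

# Broker, topic, and checkpoint path below are placeholders
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("subscribe", "events") \
    .load()

# With a checkpointLocation, a restarted query resumes from the saved
# offsets and state rather than reprocessing from scratch
query = events.writeStream \
    .format("console") \
    .option("checkpointLocation", "/user/fgedu/checkpoint/demo") \
    .start()
query.awaitTermination()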

Part03-Production Project Implementation

3.1 A Structured Streaming Job

Processing Kafka data with Structured Streaming:

# structured_streaming.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StringType, TimestampType

# Create the SparkSession
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .master("yarn") \
    .getOrCreate()

# Define the schema of the incoming JSON events
schema = StructType() \
    .add("user_id", StringType()) \
    .add("item_id", StringType()) \
    .add("behavior_type", StringType()) \
    .add("timestamp", TimestampType())

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "fgedu.net.cn:9092") \
    .option("subscribe", "user_behavior") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the JSON payload
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Windowed aggregation: 5-minute windows with a 10-minute watermark
windowed_counts = parsed_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("behavior_type")
    ) \
    .count()

# Write to the console
query = windowed_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

3.2 The Real-Time Processing Pipeline

The end-to-end pipeline:

  1. Collection: gather log data with Flume
  2. Transport: move the data through Kafka
  3. Processing: process the stream with Spark Structured Streaming
  4. Storage: write results to HBase or Elasticsearch
  5. Presentation: visualize the live data with Grafana

3.3 Writing and Storing Results

Two example sinks follow. Note that Spark ships no built-in streaming HBase sink, so the HBase example uses foreachBatch with a batch HBase connector; the connector format and options are environment-dependent and shown here as an assumption.

# Write to HBase via foreachBatch (the format and options below follow the
# Apache HBase Spark connector and are an assumption about your environment;
# flatten the window column as needed before mapping)
def write_batch_to_hbase(batch_df, batch_id):
    batch_df.write \
        .format("org.apache.hadoop.hbase.spark") \
        .option("hbase.table", "user_behavior_counts") \
        .option("hbase.columns.mapping",
                "behavior_type STRING :key, count LONG cf:count") \
        .save()

hbase_query = windowed_counts.writeStream \
    .outputMode("update") \
    .foreachBatch(write_batch_to_hbase) \
    .option("checkpointLocation", "/user/fgedu/checkpoint/hbase") \
    .start()

# Write to Elasticsearch (requires the elasticsearch-spark connector;
# the index name passed to start() is an assumed example)
es_query = windowed_counts.writeStream \
    .outputMode("append") \
    .format("es") \
    .option("es.nodes", "fgedu.net.cn:9200") \
    .option("es.index.auto.create", "true") \
    .option("checkpointLocation", "/user/fgedu/checkpoint/es") \
    .start("user_behavior_counts")

Part04-Production Case Studies

4.1 Real-Time User Behavior Analysis

Case study: analyzing user behavior data in real time

# Start a Kafka console producer

$ kafka-console-producer.sh --broker-list fgedu.net.cn:9092 --topic user_behavior
> {"user_id": "1001", "item_id": "item001", "behavior_type": "click", "timestamp": "2026-04-08T10:00:00Z"}
> {"user_id": "1002", "item_id": "item002", "behavior_type": "view", "timestamp": "2026-04-08T10:00:01Z"}
> {"user_id": "1003", "item_id": "item003", "behavior_type": "purchase", "timestamp": "2026-04-08T10:00:02Z"}
> {"user_id": "1004", "item_id": "item004", "behavior_type": "click", "timestamp": "2026-04-08T10:00:03Z"}
> {"user_id": "1005", "item_id": "item005", "behavior_type": "view", "timestamp": "2026-04-08T10:00:04Z"}

# Run the Structured Streaming application

$ spark-submit --master yarn --deploy-mode cluster --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.0 structured_streaming.py
2026-04-08 10:00:00,000 INFO SparkContext: Running Spark version 3.3.0
2026-04-08 10:00:01,000 INFO SparkContext: Submitted application: StructuredStreamingExample
2026-04-08 10:00:02,000 INFO YarnClientImpl: Submitted application application_1617778210345_0001
2026-04-08 10:00:03,000 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
2026-04-08 10:00:04,000 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> fgedu.net.cn, PROXY_URI_BASES -> http://fgedu.net.cn:8088/proxy/application_1617778210345_0001), /proxy/application_1617778210345_0001
2026-04-08 10:00:05,000 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://fgedu.net.cn:4040

# Inspect the results

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-------------+-----+
|window                                    |behavior_type|count|
+------------------------------------------+-------------+-----+
|{2026-04-08 10:00:00, 2026-04-08 10:05:00}|click        |2    |
|{2026-04-08 10:00:00, 2026-04-08 10:05:00}|view         |2    |
|{2026-04-08 10:00:00, 2026-04-08 10:05:00}|purchase     |1    |
+------------------------------------------+-------------+-----+

4.2 Real-Time Log Processing

Case study: processing website access logs in real time

# Create the Flume configuration

# flume.conf
agent.sources = web_log
agent.channels = memory_channel
agent.sinks = kafka_sink

# Source configuration
agent.sources.web_log.type = exec
agent.sources.web_log.command = tail -F /var/log/nginx/access.log
agent.sources.web_log.channels = memory_channel

# Channel configuration
agent.channels.memory_channel.type = memory
agent.channels.memory_channel.capacity = 10000
agent.channels.memory_channel.transactionCapacity = 1000

# Sink configuration
agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink.channel = memory_channel
agent.sinks.kafka_sink.kafka.bootstrap.servers = fgedu.net.cn:9092
agent.sinks.kafka_sink.kafka.topic = web_log
agent.sinks.kafka_sink.kafka.producer.acks = 1

# Start Flume

$ flume-ng agent --name agent --conf /bigdata/app/flume/conf --conf-file /bigdata/app/flume/conf/flume.conf -Dflume.root.logger=INFO,console
2026-04-08 10:00:00,000 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2026-04-08 10:00:01,000 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/bigdata/app/flume/conf/flume.conf
2026-04-08 10:00:02,000 INFO conf.FlumeConfiguration: Processing:kafka_sink
2026-04-08 10:00:03,000 INFO conf.FlumeConfiguration: Processing:kafka_sink
2026-04-08 10:00:04,000 INFO conf.FlumeConfiguration: Processing:kafka_sink
2026-04-08 10:00:05,000 INFO conf.FlumeConfiguration: Processing:kafka_sink
2026-04-08 10:00:06,000 INFO conf.FlumeConfiguration: Processing:kafka_sink
2026-04-08 10:00:07,000 INFO node.AbstractConfigurationProvider: Flume configuration successfully parsed
2026-04-08 10:00:08,000 INFO node.AbstractConfigurationProvider: Creating channels
2026-04-08 10:00:09,000 INFO channel.DefaultChannelFactory: Creating instance of channel memory_channel type memory
2026-04-08 10:00:10,000 INFO node.AbstractConfigurationProvider: Created channel memory_channel
2026-04-08 10:00:11,000 INFO source.DefaultSourceFactory: Creating instance of source web_log, type exec
2026-04-08 10:00:12,000 INFO sink.DefaultSinkFactory: Creating instance of sink: kafka_sink, type: org.apache.flume.sink.kafka.KafkaSink
2026-04-08 10:00:13,000 INFO node.AbstractConfigurationProvider: Created sink kafka_sink
2026-04-08 10:00:14,000 INFO node.AbstractConfigurationProvider: Channel memory_channel connected to [web_log, kafka_sink]
2026-04-08 10:00:15,000 INFO node.Application: Starting new configuration:{ sourceRunners:{web_log=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:web_log,state:IDLE} }} sinkRunners:{kafka_sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4f2b5a7f counterGroup:{ name:null counters:{} } }} channels:{memory_channel=org.apache.flume.channel.MemoryChannel{name: memory_channel}} }
2026-04-08 10:00:16,000 INFO node.Application: Starting Channel memory_channel
2026-04-08 10:00:17,000 INFO channel.MemoryChannel: MemoryChannel memory_channel started
2026-04-08 10:00:18,000 INFO node.Application: Starting Sink kafka_sink
2026-04-08 10:00:19,000 INFO sink.kafka.KafkaSink: Kafka sink kafka_sink started
2026-04-08 10:00:20,000 INFO node.Application: Starting Source web_log
2026-04-08 10:00:21,000 INFO source.ExecSource: Exec source starting with command: tail -F /var/log/nginx/access.log
2026-04-08 10:00:22,000 INFO source.ExecSource: Exec source started successfully

4.3 Real-Time Monitoring and Alerting

Case study: monitoring system metrics in real time and raising alerts

# Create the monitoring script

# monitoring.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, from_json, window
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

# Create the SparkSession
spark = SparkSession.builder \
    .appName("MonitoringExample") \
    .master("yarn") \
    .getOrCreate()

# Define the schema of the metric events
schema = StructType() \
    .add("host", StringType()) \
    .add("cpu_usage", DoubleType()) \
    .add("memory_usage", DoubleType()) \
    .add("disk_usage", DoubleType()) \
    .add("timestamp", TimestampType())

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "fgedu.net.cn:9092") \
    .option("subscribe", "system_metrics") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the JSON payload
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Per-host averages over 1-minute windows with a 5-minute watermark
windowed_avg = parsed_df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("host")
    ) \
    .agg(
        avg("cpu_usage").alias("avg_cpu"),
        avg("memory_usage").alias("avg_memory"),
        avg("disk_usage").alias("avg_disk")
    )

# Keep only rows that breach the alert thresholds
alerts = windowed_avg \
    .filter((col("avg_cpu") > 80) | (col("avg_memory") > 90) | (col("avg_disk") > 95))

# Write alerts to the console
alert_query = alerts.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

alert_query.awaitTermination()
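
Printing alerts to the console is only a demo. In practice you would push each alert to an alerting channel; the sketch below uses foreachBatch with a hypothetical webhook URL (the URL and payload shape are assumptions, not part of any standard API):

import json
import urllib.request

# Hypothetical alert delivery: POST each alert row to a webhook.
# The URL and JSON payload shape are placeholders for your alerting system.
def send_alerts(batch_df, batch_id):
    for row in batch_df.collect():  # alert batches are small, so collect() is fine
        payload = json.dumps(row.asDict(), default=str).encode("utf-8")
        req = urllib.request.Request(
            "http://alertmanager.example.com/webhook",  # placeholder URL
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(req)

webhook_query = alerts.writeStream \
    .outputMode("append") \
    .foreachBatch(send_alerts) \
    .option("checkpointLocation", "/user/fgedu/checkpoint/alerts") \
    .start()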

# Run the monitoring application

$ spark-submit --master yarn --deploy-mode cluster --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.0 monitoring.py
2026-04-08 10:00:00,000 INFO SparkContext: Running Spark version 3.3.0
2026-04-08 10:00:01,000 INFO SparkContext: Submitted application: MonitoringExample
2026-04-08 10:00:02,000 INFO YarnClientImpl: Submitted application application_1617778210345_0002
2026-04-08 10:00:03,000 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
2026-04-08 10:00:04,000 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> fgedu.net.cn, PROXY_URI_BASES -> http://fgedu.net.cn:8088/proxy/application_1617778210345_0002), /proxy/application_1617778210345_0002
2026-04-08 10:00:05,000 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://fgedu.net.cn:4040

Part05-Fenge's Experience and Takeaways

5.1 Performance Tuning Tips

Performance tuning tips (a code sketch follows the list):

  • Batch size: adjust the micro-batch size to the data volume and cluster resources
  • Parallelism: set a sensible partition count to raise parallel processing capacity
  • Caching: cache hot data to speed up queries
  • Checkpointing: pick a checkpoint interval that balances fault tolerance against performance
  • Serialization: use efficient serialization formats such as Avro or Parquet
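
A few of these knobs as they appear in code (values are illustrative; windowed_counts is the aggregation from Part03):

# Shuffle/state parallelism for aggregations (example value)
spark.conf.set("spark.sql.shuffle.partitions", "64")

# The trigger interval controls the effective micro-batch size and latency
tuned_query = windowed_counts.writeStream \
    .outputMode("update") \
    .trigger(processingTime="30 seconds") \
    .format("console") \
    .option("checkpointLocation", "/user/fgedu/checkpoint/tuned") \
    .start()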

5.2 Common Problems and Solutions

Common problems and solutions:

  • Data skew: mitigate with random key prefixes (salting) and partial aggregation, as sketched below
  • Out-of-memory errors: adjust JVM parameters and increase memory allocation
  • Task failures: check the error logs, find the root cause, then adjust parameters or repair the data
  • High latency: tune the micro-batch size and the parallelism
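
For the data-skew item, a common remedy is two-stage aggregation with a random salt. A minimal sketch, assuming a DataFrame df with key and value columns (in a streaming job, apply this inside foreachBatch):

from pyspark.sql import functions as F

# Stage 0: add a random salt so one hot key spreads over up to 10 partitions
salted = df.withColumn("salt", (F.rand() * 10).cast("int"))

# Stage 1: partial aggregation on (key, salt)
partial = salted.groupBy("key", "salt").agg(F.sum("value").alias("partial_sum"))

# Stage 2: final aggregation on key alone combines the partial sums
result = partial.groupBy("key").agg(F.sum("partial_sum").alias("total"))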

5.3 Best Practices

Fenge's tip: in real-time data processing, put reliability and low latency first so that results are both accurate and timely.

Best practices:

  • End-to-end testing: exercise the whole pipeline to verify data completeness and consistency
  • Monitoring and alerting: build thorough monitoring and alerting so problems surface and get fixed early
  • Version control: keep code and configuration under version control for easy rollback and management
  • Documentation: record the system architecture and processing flow in detail to ease maintenance and extension

This tutorial has covered the methods and hands-on techniques of real-time data processing with Spark. In production, choose the real-time processing framework and approach that fit your business scenario and data characteristics, so that you can process and analyze real-time data quickly and give business decisions timely data support.


Compiled and published by the Fenge tutorial team for learning and testing purposes only. When reposting, please credit the source: http://www.fgedu.net.cn/10327.html
