
Big Data Tutorial FG071 - Spark Hands-On Project Cases

In this document, Fenge walks through hands-on Spark project cases, covering typical scenarios such as user behavior analysis, real-time reporting, and data warehouse construction. The tutorial draws on the official Spark Programming Guide and related documentation. It is intended for big data development and operations staff to use for learning and testing; verify everything yourself before applying it to a production environment.

Part01-Basic Concepts and Theory

1.1 Project Background and Requirements

This project is an e-commerce user behavior analysis system: it collects user behavior data in real time and performs both real-time analysis and offline statistics to support operational decision-making.

Project requirements:

  • Real-time requirements: real-time PV/UV statistics, real-time recommendations, real-time alerting
  • Offline requirements: user profiling, product analysis, sales reports
  • Data volume: 1 million daily active users, about 1 billion behavior events per day
  • Latency requirements: real-time processing latency < 1 second; offline processing < 2 hours

1.2 Architecture Design

System architecture:

# System architecture

┌─────────────────────────────────────────────────────────┐
│ Data source layer                                       │
│ Web logs │ App logs │ Business DBs │ Third-party data   │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│ Data ingestion layer                                    │
│ Flume │ Kafka │ Canal │ Sqoop                           │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│ Data processing layer                                   │
│ Spark Streaming │ Spark SQL │ Spark MLlib               │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│ Data storage layer                                      │
│ HDFS │ HBase │ MySQL │ Redis                            │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│ Data service layer                                      │
│ REST API │ WebSocket │ Scheduled jobs                   │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│ Application layer                                       │
│ Dashboards │ Reporting │ Recommendations │ Alerting     │
└─────────────────────────────────────────────────────────┘

# Data flow
1. Ingestion: Flume collects logs into Kafka
2. Real-time processing: Spark Streaming consumes the Kafka data
3. Offline processing: Spark SQL processes the data on HDFS
4. Storage: results are written to HBase/MySQL
5. Serving: query APIs expose the results

1.3 Technology Stack Selection

Technology stack choices:

# Technology stack

1. Data ingestion
- Flume: log collection
- Kafka: message queue
- Canal: database change capture

2. Data processing
- Spark Core: core computation
- Spark SQL: SQL processing
- Spark Streaming: stream processing
- Spark MLlib: machine learning

3. Data storage
- HDFS: distributed storage
- HBase: NoSQL store
- MySQL: relational store
- Redis: cache

4. Scheduling and resource management
- YARN: resource management
- Oozie/Azkaban: job scheduling

5. Monitoring and operations
- Prometheus: monitoring
- Grafana: visualization
- Spark Web UI: application monitoring

# Versions
- Spark: 3.5.1
- Hadoop: 3.3.6
- Kafka: 3.6.1
- HBase: 2.5.5
- Hive: 3.1.3

Fenge's tip: technology selection should weigh business requirements, data volume, and team skills together. This project uses Spark as the core compute engine, covering both real-time and offline processing.

Part02-Production Environment Planning and Recommendations

2.1 Environment Planning

Environment planning recommendations:

# Cluster plan

# Production environment (8 hosts, 10 master/worker roles; the ResourceManagers
# share hosts with the NameNodes)
Role                      Hostname   IP Address     Spec
NameNode                  fgedu-nn1  192.168.1.60   32 cores / 128GB / 2TB
NameNode (standby)        fgedu-nn2  192.168.1.61   32 cores / 128GB / 2TB
ResourceManager           fgedu-rm1  192.168.1.60   32 cores / 128GB / 2TB
ResourceManager (standby) fgedu-rm2  192.168.1.61   32 cores / 128GB / 2TB
DataNode                  fgedu-dn1  192.168.1.62   32 cores / 128GB / 10TB
DataNode                  fgedu-dn2  192.168.1.63   32 cores / 128GB / 10TB
DataNode                  fgedu-dn3  192.168.1.64   32 cores / 128GB / 10TB
DataNode                  fgedu-dn4  192.168.1.65   32 cores / 128GB / 10TB
DataNode                  fgedu-dn5  192.168.1.66   32 cores / 128GB / 10TB
DataNode                  fgedu-dn6  192.168.1.67   32 cores / 128GB / 10TB

# Service layout
Service              Nodes
HDFS NameNode        fgedu-nn1, fgedu-nn2
HDFS DataNode        fgedu-dn1-6
YARN ResourceManager fgedu-rm1, fgedu-rm2
YARN NodeManager     fgedu-dn1-6
Spark Master         fgedu-nn1
Spark Worker         fgedu-dn1-6
Kafka Broker         fgedu-dn1-3
HBase Master         fgedu-nn1, fgedu-nn2
HBase RegionServer   fgedu-dn1-6
ZooKeeper            fgedu-nn1, fgedu-nn2, fgedu-dn1

# Resource allocation
Service    CPU      Memory  Disk
HDFS       4 cores  16GB    data disks
YARN       4 cores  16GB    -
Spark      8 cores  64GB    -
Kafka      4 cores  16GB    500GB
HBase      8 cores  32GB    data disks
ZooKeeper  2 cores  4GB     50GB
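To turn the Spark row above into concrete defaults, here is a minimal spark-defaults.conf sketch; the settings are standard Spark configuration keys, but the values mirror the table only loosely and are illustrative assumptions rather than tuned numbers:

# spark-defaults.conf (sketch; values are illustrative assumptions)
spark.executor.memory                 8g
spark.executor.cores                  4
spark.dynamicAllocation.enabled       true
spark.dynamicAllocation.maxExecutors  40
# dynamic allocation on YARN requires the external shuffle service
spark.shuffle.service.enabled         true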

2.2 Data Planning

Data planning recommendations:

# Database layout

1. ODS layer (raw data)
- ods_user_behavior: user behavior logs
- ods_order_detail: order line items
- ods_product_info: product information
- ods_user_info: user information

2. DWD layer (detail data)
- dwd_user_behavior_detail: user behavior detail
- dwd_order_detail: order detail
- dwd_product_detail: product detail

3. DWS layer (summary data)
- dws_user_behavior_summary: user behavior summary
- dws_order_summary: order summary
- dws_product_summary: product summary

4. ADS layer (application data)
- ads_user_profile: user profiles
- ads_product_analysis: product analysis
- ads_sales_report: sales reports

# Table design examples
-- User behavior table
CREATE TABLE IF NOT EXISTS fgedu_ods.ods_user_behavior (
user_id BIGINT,
item_id STRING,
category_id STRING,
behavior_type STRING,
timestamp BIGINT,
dt STRING
)
USING PARQUET
PARTITIONED BY (dt); -- dt is declared above; USING tables reference it by name only

-- User behavior summary table
CREATE TABLE IF NOT EXISTS fgedu_dws.dws_user_behavior_summary (
user_id BIGINT,
pv_count BIGINT,
buy_count BIGINT,
cart_count BIGINT,
fav_count BIGINT,
dt STRING
)
USING PARQUET
PARTITIONED BY (dt);
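A quick way to confirm both DDLs took effect is to inspect the schema and partition layout from spark-sql (a minimal check, assuming the fgedu_ods database already exists):

-- Verify schema and partition layout
DESCRIBE TABLE fgedu_ods.ods_user_behavior;
SHOW PARTITIONS fgedu_ods.ods_user_behavior;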

2.3 Job Planning

Job planning recommendations:

# Real-time job plan

1. Real-time PV/UV statistics
- Source: Kafka topic fgedu-user-behavior
- Processing: Spark Structured Streaming
- Output: Redis
- Latency: < 1 second

2. Real-time user profiling
- Source: Kafka topic fgedu-user-behavior
- Processing: Spark Structured Streaming
- Output: HBase
- Latency: < 5 seconds

3. Real-time alerting
- Source: Kafka topic fgedu-user-behavior
- Processing: Spark Structured Streaming
- Output: Kafka topic fgedu-alerts
- Latency: < 1 second

# Offline job plan

1. User behavior ETL
- Schedule: hourly
- Source: HDFS /data/ods/user_behavior/
- Processing: Spark SQL
- Output: Hive tables

2. User profile computation
- Schedule: daily, early morning
- Source: Hive tables
- Processing: Spark SQL + MLlib
- Output: Hive tables + HBase

3. Sales report generation
- Schedule: daily, early morning
- Source: Hive tables
- Processing: Spark SQL
- Output: MySQL

Production note: plan cluster resources carefully and keep real-time and offline jobs isolated from each other; use YARN queues for resource allocation, as in the sketch below.
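As a sketch of that isolation, the YARN CapacityScheduler can split the cluster into dedicated queues. These are standard capacity-scheduler.xml properties, but the queue names and percentages below are illustrative assumptions:

# capacity-scheduler.xml (sketch; queue names and capacities are assumptions)
yarn.scheduler.capacity.root.queues=realtime,offline
yarn.scheduler.capacity.root.realtime.capacity=40
yarn.scheduler.capacity.root.offline.capacity=60
# submit a job to a queue with: spark-submit --queue realtime ...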

Part03-Production Implementation Plan

3.1 Data Ingestion Implementation

3.1.1 Flume Agent Configuration

# Flume configuration file
$ cat > /bigdata/app/flume/conf/fgedu-user-behavior.conf << 'EOF'
# Agent name
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# Source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/nginx/access.log
agent1.sources.source1.channels = channel1

# Channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100000
agent1.channels.channel1.transactionCapacity = 10000

# Sink
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.topic = fgedu-user-behavior
agent1.sinks.sink1.brokerList = 192.168.1.62:9092,192.168.1.63:9092,192.168.1.64:9092
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.batchSize = 1000
EOF

# Start Flume
$ /bigdata/app/flume/bin/flume-ng agent \
--conf /bigdata/app/flume/conf \
--conf-file /bigdata/app/flume/conf/fgedu-user-behavior.conf \
--name agent1 \
-Dflume.root.logger=INFO,console &

Info: Sourcing environment configuration script /bigdata/app/flume/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+ exec /usr/lib/jvm/java-17-openjdk/bin/java -Xmx2048m -Dflume.root.logger=INFO,console -cp '/bigdata/app/flume/conf:/bigdata/app/flume/lib/*' org.apache.flume.node.Application --conf-file /bigdata/app/flume/conf/fgedu-user-behavior.conf --name agent1
...
2026-04-08 13:00:00,000 INFO [conf-file-poller-0] node.Application: Starting new configuration:{ sourceRun:{source1:{ runner:{ component: { name: source1 runner } } } } sinkRun:{sink1:{ runner:{ component: { name: sink1 runner } } } } }
2026-04-08 13:00:00,001 INFO [lifecycleSupervisor-1-0] sink.KafkaSink: Starting KafkaSink sink1
2026-04-08 13:00:00,002 INFO [lifecycleSupervisor-1-0] source.ExecSource: Exec source starting with command: tail -F /var/log/nginx/access.log

3.1.2 Creating the Kafka Topic

# Create the topic
$ /bigdata/app/kafka/bin/kafka-topics.sh \
--create \
--topic fgedu-user-behavior \
--partitions 10 \
--replication-factor 3 \
--bootstrap-server 192.168.1.62:9092

Created topic fgedu-user-behavior.

# Describe the topic
$ /bigdata/app/kafka/bin/kafka-topics.sh \
--describe \
--topic fgedu-user-behavior \
--bootstrap-server 192.168.1.62:9092

Topic: fgedu-user-behavior PartitionCount: 10 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: fgedu-user-behavior Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-user-behavior Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedu-user-behavior Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

# Consumption test
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
--topic fgedu-user-behavior \
--from-beginning \
--bootstrap-server 192.168.1.62:9092

{"user_id":1001,"item_id":"item_001","category_id":"cat_001","behavior_type":"pv","timestamp":1680940800}
{"user_id":1002,"item_id":"item_002","category_id":"cat_002","behavior_type":"buy","timestamp":1680940801}
{"user_id":1003,"item_id":"item_003","category_id":"cat_001","behavior_type":"cart","timestamp":1680940802}
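To push a few hand-written events for testing, the console producer pairs with the consumer above; a quick smoke test, with the sample record following the same JSON format:

# Producer test
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-user-behavior \
--bootstrap-server 192.168.1.62:9092
> {"user_id":1004,"item_id":"item_004","category_id":"cat_002","behavior_type":"fav","timestamp":1680940803}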

3.2 ETL Implementation

3.2.1 Real-time ETL

# Real-time ETL code
$ cat > /bigdata/spark-apps/FgeduRealtimeEtl.scala << 'EOF'
package com.fgedu.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object FgeduRealtimeEtl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fgedu-realtime-etl")
      .getOrCreate()

    import spark.implicits._

    // Schema of the incoming JSON events
    val schema = StructType(Array(
      StructField("user_id", LongType),
      StructField("item_id", StringType),
      StructField("category_id", StringType),
      StructField("behavior_type", StringType),
      StructField("timestamp", LongType)
    ))

    // Read from Kafka
    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.62:9092,192.168.1.63:9092,192.168.1.64:9092")
      .option("subscribe", "fgedu-user-behavior")
      .option("startingOffsets", "latest")
      .load()

    // Parse the JSON payload
    val parsedDF = kafkaDF
      .select(from_json(col("value").cast("string"), schema).as("data"))
      .select(
        col("data.user_id").alias("user_id"),
        col("data.item_id").alias("item_id"),
        col("data.category_id").alias("category_id"),
        col("data.behavior_type").alias("behavior_type"),
        from_unixtime(col("data.timestamp")).alias("event_time"),
        date_format(from_unixtime(col("data.timestamp")), "yyyy-MM-dd").alias("dt")
      )

    // Basic cleansing
    val cleanedDF = parsedDF
      .filter(col("user_id").isNotNull)
      .filter(col("behavior_type").isin("pv", "buy", "cart", "fav"))

    // Write to HDFS as partitioned Parquet
    val query = cleanedDF.writeStream
      .format("parquet")
      .option("path", "hdfs://192.168.1.60:9000/data/ods/user_behavior")
      .option("checkpointLocation", "/checkpoint/fgedu-realtime-etl")
      .partitionBy("dt", "behavior_type")
      .start()

    query.awaitTermination()
  }
}
EOF

# Package the source into fgedu-realtime-etl.jar (e.g. with sbt or Maven), then submit
$ /bigdata/app/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.fgedu.spark.FgeduRealtimeEtl \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
/bigdata/spark-apps/fgedu-realtime-etl.jar

2026-04-08 13:00:00 INFO SparkContext:54 - Running Spark version 3.5.1
2026-04-08 13:00:01 INFO StreamingQuery:54 - Starting new streaming query.
2026-04-08 13:00:02 INFO KafkaSource:54 - Initial offsets: {"fgedu-user-behavior":{"0":0,"1":0,"2":0,...}}
2026-04-08 13:00:03 INFO FileStreamSink:54 - Committed batch 0 to hdfs://192.168.1.60:9000/data/ods/user_behavior
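If the stream ever lags behind Kafka, a common adjustment is to bound each micro-batch and fix the trigger interval. A hedged sketch, reusing cleanedDF from the job above; maxOffsetsPerTrigger is a standard Kafka source option, and the 10-second interval is an illustrative assumption:

// Optional throttling sketch (drop-in changes to FgeduRealtimeEtl above)
import org.apache.spark.sql.streaming.Trigger

// On the source side, cap how many records each micro-batch pulls from Kafka:
//   .option("maxOffsetsPerTrigger", "100000")

// On the sink side, process on a fixed interval instead of as fast as possible
val throttledQuery = cleanedDF.writeStream
  .format("parquet")
  .option("path", "hdfs://192.168.1.60:9000/data/ods/user_behavior")
  .option("checkpointLocation", "/checkpoint/fgedu-realtime-etl")
  .partitionBy("dt", "behavior_type")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()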

3.2.2 Offline ETL

# Offline ETL code
$ cat > /bigdata/spark-apps/FgeduOfflineEtl.scala << 'EOF'
package com.fgedu.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object FgeduOfflineEtl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fgedu-offline-etl")
      .enableHiveSupport()
      .getOrCreate()

    // needed for the $"col" syntax below
    import spark.implicits._

    val dt = args(0) // date argument, e.g. 2026-04-08

    // Read the ODS layer
    val odsDF = spark.table("fgedu_ods.ods_user_behavior")
      .filter($"dt" === dt)

    // Cleansing
    val cleanedDF = odsDF
      .filter($"user_id".isNotNull)
      .filter($"behavior_type".isin("pv", "buy", "cart", "fav"))
      .filter($"timestamp" > 0)

    // Transformations
    val transformedDF = cleanedDF
      .withColumn("event_time", from_unixtime($"timestamp"))
      .withColumn("hour", hour(col("event_time")))

    // Write to the DWD layer: insertInto matches columns by position and uses the
    // table's existing partitioning, so it must not be combined with partitionBy()
    transformedDF.write
      .mode("overwrite")
      .insertInto("fgedu_dwd.dwd_user_behavior_detail")

    // Aggregate per user per day
    val summaryDF = transformedDF
      .groupBy($"user_id", $"dt")
      .agg(
        sum(when($"behavior_type" === "pv", 1).otherwise(0)).alias("pv_count"),
        sum(when($"behavior_type" === "buy", 1).otherwise(0)).alias("buy_count"),
        sum(when($"behavior_type" === "cart", 1).otherwise(0)).alias("cart_count"),
        sum(when($"behavior_type" === "fav", 1).otherwise(0)).alias("fav_count")
      )

    // Write to the DWS layer (columns ordered to match fgedu_dws.dws_user_behavior_summary)
    summaryDF
      .select($"user_id", $"pv_count", $"buy_count", $"cart_count", $"fav_count", $"dt")
      .write
      .mode("overwrite")
      .insertInto("fgedu_dws.dws_user_behavior_summary")

    spark.stop()
  }
}
EOF

# Submit the job (build fgedu-offline-etl.jar first)
$ /bigdata/app/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.fgedu.spark.FgeduOfflineEtl \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 20 \
--queue production \
/bigdata/spark-apps/fgedu-offline-etl.jar \
2026-04-08

2026-04-08 13:00:00 INFO SparkContext:54 - Running Spark version 3.5.1
2026-04-08 13:00:01 INFO HiveSharedState:54 - Warehouse path: hdfs://192.168.1.60:9000/user/hive/warehouse
2026-04-08 13:00:02 INFO InsertIntoHiveTable:54 - Inserting data into Hive table fgedu_dwd.dwd_user_behavior_detail
2026-04-08 13:00:30 INFO InsertIntoHiveTable:54 - Inserting data into Hive table fgedu_dws.dws_user_behavior_summary
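The job plan in 2.3 runs this ETL on a daily schedule; a minimal cron entry would look like the sketch below. This is an assumption for illustration — the plan also names Oozie/Azkaban, which would be configured in their own formats instead:

# crontab sketch: run at 01:00 every day for yesterday's partition (one line)
0 1 * * * /bigdata/app/spark/bin/spark-submit --master yarn --deploy-mode client --class com.fgedu.spark.FgeduOfflineEtl --queue production /bigdata/spark-apps/fgedu-offline-etl.jar $(date -d yesterday +\%Y-\%m-\%d) >> /var/log/fgedu-offline-etl.log 2>&1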

3.3 Data Analysis Implementation

# User behavior analysis code
$ cat > /bigdata/spark-apps/FgeduUserAnalysis.scala << 'EOF'
package com.fgedu.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object FgeduUserAnalysis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fgedu-user-analysis")
      .enableHiveSupport()
      .getOrCreate()

    val dt = args(0)

    // 1. Hourly PV/UV
    val pvuvDF = spark.sql(s"""
      SELECT
        dt,
        hour,
        COUNT(*) as pv,
        COUNT(DISTINCT user_id) as uv
      FROM fgedu_dwd.dwd_user_behavior_detail
      WHERE dt = '$dt'
      GROUP BY dt, hour
      ORDER BY hour
    """)

    pvuvDF.write
      .mode("overwrite")
      .insertInto("fgedu_ads.ads_pvuv_hourly")

    // 2. User activity analysis
    val activeDF = spark.sql(s"""
      SELECT
        user_id,
        COUNT(*) as total_actions,
        COUNT(DISTINCT item_id) as unique_items,
        COUNT(DISTINCT category_id) as unique_categories
      FROM fgedu_dwd.dwd_user_behavior_detail
      WHERE dt = '$dt'
      GROUP BY user_id
      ORDER BY total_actions DESC
      LIMIT 1000
    """)

    activeDF.write
      .mode("overwrite")
      .insertInto("fgedu_ads.ads_active_users")

    // 3. Hot items
    val hotItemsDF = spark.sql(s"""
      SELECT
        item_id,
        category_id,
        SUM(CASE WHEN behavior_type = 'pv' THEN 1 ELSE 0 END) as pv_count,
        SUM(CASE WHEN behavior_type = 'buy' THEN 1 ELSE 0 END) as buy_count,
        SUM(CASE WHEN behavior_type = 'cart' THEN 1 ELSE 0 END) as cart_count,
        SUM(CASE WHEN behavior_type = 'fav' THEN 1 ELSE 0 END) as fav_count
      FROM fgedu_dwd.dwd_user_behavior_detail
      WHERE dt = '$dt'
      GROUP BY item_id, category_id
      ORDER BY buy_count DESC
      LIMIT 100
    """)

    hotItemsDF.write
      .mode("overwrite")
      .insertInto("fgedu_ads.ads_hot_items")

    // 4. Conversion rate per category
    val conversionDF = spark.sql(s"""
      SELECT
        category_id,
        COUNT(DISTINCT CASE WHEN behavior_type = 'pv' THEN user_id END) as pv_users,
        COUNT(DISTINCT CASE WHEN behavior_type = 'cart' THEN user_id END) as cart_users,
        COUNT(DISTINCT CASE WHEN behavior_type = 'buy' THEN user_id END) as buy_users,
        ROUND(COUNT(DISTINCT CASE WHEN behavior_type = 'buy' THEN user_id END) * 100.0 /
              NULLIF(COUNT(DISTINCT CASE WHEN behavior_type = 'pv' THEN user_id END), 0), 2) as conversion_rate
      FROM fgedu_dwd.dwd_user_behavior_detail
      WHERE dt = '$dt'
      GROUP BY category_id
      ORDER BY conversion_rate DESC
    """)

    conversionDF.write
      .mode("overwrite")
      .insertInto("fgedu_ads.ads_conversion_rate")

    spark.stop()
  }
}
EOF

# Submit the analysis job (build fgedu-user-analysis.jar first)
$ /bigdata/app/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.fgedu.spark.FgeduUserAnalysis \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 20 \
/bigdata/spark-apps/fgedu-user-analysis.jar \
2026-04-08
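The job plan sends the sales report to MySQL; here is a hedged sketch of that last hop using Spark's built-in JDBC writer. The database name, credentials, and the presence of the MySQL JDBC driver on the classpath are all assumptions:

// Export an ADS table to MySQL over JDBC (e.g. from spark-shell with Hive support)
val reportDF = spark.table("fgedu_ads.ads_sales_report")

reportDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.60:3306/fgedu_report") // assumed host/db
  .option("dbtable", "ads_sales_report")
  .option("user", "report")      // assumed credentials
  .option("password", "********")
  .mode("overwrite")
  .save()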
Fenge's tip: in real projects, design the data layering and ETL flows around the actual business requirements to guarantee data quality and processing efficiency.

Part04-Production Cases and Walkthroughs

4.1 User Behavior Analysis Case

# End-to-end user behavior analysis

# 1. Data preparation
scala> spark.sql("""
| CREATE TABLE IF NOT EXISTS fgedu_ads.ads_user_behavior_analysis (
| user_id BIGINT,
| total_pv BIGINT,
| total_buy BIGINT,
| total_cart BIGINT,
| total_fav BIGINT,
| active_days INT,
| avg_daily_actions DOUBLE,
| user_level STRING,
| dt STRING
| )
| USING PARQUET
| PARTITIONED BY (dt)
| """)
res0: org.apache.spark.sql.DataFrame = []

# 2. User behavior analysis
scala> spark.sql("""
| INSERT INTO TABLE fgedu_ads.ads_user_behavior_analysis PARTITION(dt='2026-04-08')
| SELECT
| user_id,
| SUM(pv_count) as total_pv,
| SUM(buy_count) as total_buy,
| SUM(cart_count) as total_cart,
| SUM(fav_count) as total_fav,
| COUNT(*) as active_days,
| AVG(pv_count + buy_count + cart_count + fav_count) as avg_daily_actions,
| CASE
| WHEN SUM(buy_count) >= 10 THEN 'VIP'
| WHEN SUM(buy_count) >= 5 THEN 'Active'
| WHEN SUM(pv_count) >= 100 THEN 'Potential'
| ELSE 'Normal'
| END as user_level
| FROM fgedu_dws.dws_user_behavior_summary
| WHERE dt BETWEEN '2026-04-01' AND '2026-04-08'
| GROUP BY user_id
| """)
res1: org.apache.spark.sql.DataFrame = []

# 3. Inspect the results
scala> spark.sql("""
| SELECT user_level, COUNT(*) as user_count, AVG(total_buy) as avg_buy
| FROM fgedu_ads.ads_user_behavior_analysis
| WHERE dt='2026-04-08'
| GROUP BY user_level
| ORDER BY user_count DESC
| """).show()
+----------+----------+------------------+
|user_level|user_count|           avg_buy|
+----------+----------+------------------+
|    Normal|    500000|               0.0|
| Potential|    300000|               0.5|
|    Active|    150000|               6.5|
|       VIP|     50000|              15.2|
+----------+----------+------------------+

4.2 Real-time Reporting Case

# Real-time report implementation

# 1. Real-time PV/UV statistics
scala> val pvuvQuery = spark.readStream
| .format("kafka")
| .option("kafka.bootstrap.servers", "192.168.1.62:9092")
| .option("subscribe", "fgedu-user-behavior")
| .load()
| .select(from_json(col("value").cast("string"), schema).as("data"))
| .select(
| col("data.user_id").alias("user_id"),
| col("data.behavior_type").alias("behavior_type"),
| current_timestamp().alias("process_time")
| )
| .withWatermark("process_time", "10 seconds")
| .groupBy(window(col("process_time"), "1 minute"))
| .agg(
| count("*").alias("pv"),
| approx_count_distinct("user_id").alias("uv") // exact COUNT(DISTINCT) is unsupported on streams
| )
| .writeStream
| .outputMode("update")
| .format("console")
| .start()
pvuvQuery: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - pvuv

# 2. Sample output
-------------------------------------------
Batch: 10
-------------------------------------------
+--------------------+------+-----+
|              window|    pv|   uv|
+--------------------+------+-----+
|[2026-04-08 13:00...|100000| 5000|
|[2026-04-08 13:01...|120000| 5500|
|[2026-04-08 13:02...| 95000| 4800|
+--------------------+------+-----+

# 3. Write to Redis (pvuvDF is the aggregated DataFrame above, taken before
# .writeStream; a sketch of RedisSink follows below)
scala> val redisQuery = pvuvDF.writeStream
| .foreach(new RedisSink())
| .start()

# Redis storage format
# Key: pvuv:2026-04-08:13:00
# Value: {"pv":100000,"uv":5000}
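RedisSink itself is not defined in this excerpt; here is a minimal sketch of one as a Structured Streaming ForeachWriter. The Jedis client dependency and the Redis address are assumptions:

// Minimal RedisSink sketch (assumptions: Jedis on the classpath, Redis at 192.168.1.60:6379)
import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.Jedis

class RedisSink extends ForeachWriter[Row] {
  private var jedis: Jedis = _

  // Called once per partition per epoch; open the connection here
  override def open(partitionId: Long, epochId: Long): Boolean = {
    jedis = new Jedis("192.168.1.60", 6379)
    true
  }

  // Called for each output row of the aggregation above
  override def process(row: Row): Unit = {
    // window is a struct<start: timestamp, end: timestamp>
    val windowStart = row.getStruct(row.fieldIndex("window")).getTimestamp(0)
    val key = s"pvuv:$windowStart"
    val value = s"""{"pv":${row.getAs[Long]("pv")},"uv":${row.getAs[Long]("uv")}}"""
    jedis.set(key, value)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}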

4.3 Common Problems

4.3.1 Data Skew

# Symptom: a few tasks take far longer than the rest

# Diagnosis
# 1. Inspect the stage details
# Spark UI -> Stages -> check the task duration distribution

# 2. Check the data distribution
scala> spark.sql("""
| SELECT user_id, COUNT(*) as cnt
| FROM fgedu_dwd.dwd_user_behavior_detail
| WHERE dt='2026-04-08'
| GROUP BY user_id
| ORDER BY cnt DESC
| LIMIT 10
| """).show()

# Solutions
# 1. Increase the shuffle partition count
spark.sql.shuffle.partitions=400

# 2. Salting (a combined two-stage sketch follows these examples)
scala> val saltedDF = df
| .withColumn("salt", (rand() * 10).cast("int"))
| .withColumn("salted_key", concat(col("user_id"), lit("_"), col("salt")))

# 3. Broadcast join
scala> val result = largeDF.join(broadcast(smallDF), "key")

# 4. Isolate heavily skewed keys and process them separately
scala> val skewedKeys = df.groupBy("key").count().filter($"count" > 1000000).select("key")
scala> val skewedData = df.join(skewedKeys, "key")
scala> val normalData = df.join(skewedKeys, "key", "left_anti")
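Putting the salting idea together: a two-stage aggregation first aggregates on the salted key, then merges the partial results, so one hot user_id is spread across 10 reducers. A sketch, with column names assumed from this project's tables:

// Two-stage salted aggregation sketch
import org.apache.spark.sql.functions._

val partial = df
  .withColumn("salt", (rand() * 10).cast("int"))
  .groupBy(col("user_id"), col("salt"))    // stage 1: partial counts per salted key
  .agg(count(lit(1)).alias("partial_cnt"))

val counts = partial
  .groupBy(col("user_id"))                 // stage 2: merge partials per real key
  .agg(sum(col("partial_cnt")).alias("cnt"))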

4.3.2 Out-of-Memory Errors

# Symptom: executors fail with OutOfMemoryError

# Diagnosis
# 1. Check the executor logs
$ yarn logs -applicationId application_xxx | grep -i "OutOfMemory"

# 2. Check memory usage
# Spark UI -> Executors -> Memory Used

# Solutions
# 1. Increase executor memory
--executor-memory 16g

# 2. Tune the memory fractions
spark.memory.fraction=0.8
spark.memory.storageFraction=0.5

# 3. Reduce the data volume per partition
spark.sql.shuffle.partitions=400

# 4. Broadcast large lookup data once per executor instead of shipping it with every task
scala> val broadcastVar = spark.sparkContext.broadcast(largeData)

# 5. Clear cached data
scala> spark.catalog.clearCache()
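Combining these knobs into a single submit command might look like the sketch below; spark.executor.memoryOverhead is a standard setting worth raising alongside the heap, and the exact values here are illustrative assumptions:

# spark-submit with the memory settings above (values are illustrative)
$ /bigdata/app/spark/bin/spark-submit \
--master yarn \
--executor-memory 16g \
--conf spark.executor.memoryOverhead=4g \
--conf spark.memory.fraction=0.8 \
--conf spark.sql.shuffle.partitions=400 \
--class com.fgedu.spark.FgeduOfflineEtl \
/bigdata/spark-apps/fgedu-offline-etl.jar 2026-04-08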

Part05-Fenge's Experience and Takeaways

5.1 Project Best Practices

Spark project best practice recommendations:

# Best practices
1. Design the data layers sensibly
2. Standardize naming and code style
3. Handle exceptions thoroughly
4. Allocate resources appropriately
5. Build complete monitoring and alerting
6. Optimize performance regularly

5.2 Optimization Recommendations

Spark project optimization recommendations:

  • Set parallelism and partition counts appropriately
  • Use broadcast variables to reduce data transfer
  • Release cached data promptly to free memory
  • Monitor job execution status

5.3 Recommended Tools

Project development tools:

  • IDEA: Scala development IDE
  • Maven/SBT: build tools
  • Git: version control
  • Jenkins: continuous integration

Fenge's tip: real project development has to balance business requirements, technical architecture, and performance optimization. Start small and iterate.

This article was compiled and published by Fenge Tutorials for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
