This document presents hands-on Spark project cases, covering typical scenarios such as user behavior analysis, real-time reporting, and data warehouse construction. It draws on the official Spark Programming Guide and related materials, and is intended for big data development and operations engineers to use in learning and test environments; verify everything yourself before applying it to production.
Part01-Basic Concepts and Theory
1.1 Project Background and Requirements
This project is an e-commerce user behavior analysis system: it collects user behavior data in real time, runs both real-time analysis and offline statistics, and supplies data to support operational decisions.
- Real-time requirements: real-time PV/UV statistics, real-time recommendation, real-time alerting
- Offline requirements: user profiling, product analysis, sales reporting
- Data volume: 1 million daily active users, ~1 billion behavior events per day
- Latency targets: real-time processing < 1 second, offline processing < 2 hours
1.2 Architecture Design
System architecture:
┌──────────────────────────────────────────────────────────┐
│                     Data Source Layer                     │
│    Web logs │ App logs │ Business DBs │ Third-party data  │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│                   Data Collection Layer                   │
│              Flume │ Kafka │ Canal │ Sqoop                │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│                   Data Processing Layer                   │
│        Spark Streaming │ Spark SQL │ Spark MLlib          │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│                    Data Storage Layer                     │
│               HDFS │ HBase │ MySQL │ Redis                │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│                    Data Service Layer                     │
│          REST API │ WebSocket │ Scheduled jobs            │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│                     Application Layer                     │
│     Dashboards │ Reports │ Recommendations │ Alerts       │
└──────────────────────────────────────────────────────────┘
# Data flow
1. Collection: Flume ships logs into Kafka
2. Real-time processing: Spark Streaming consumes data from Kafka
3. Offline processing: Spark SQL processes data on HDFS
4. Storage: results land in HBase/MySQL
5. Serving: results are exposed through query APIs
1.3 Technology Stack Selection
Technology stack selection:
1. Data collection
- Flume: log collection
- Kafka: message queue
- Canal: database change capture
2. Data processing
- Spark Core: core computation
- Spark SQL: SQL processing
- Spark Streaming: stream processing
- Spark MLlib: machine learning
3. Data storage
- HDFS: distributed storage
- HBase: NoSQL store
- MySQL: relational store
- Redis: cache
4. Scheduling and resource management
- YARN: resource management
- Oozie/Azkaban: job scheduling
5. Monitoring and operations
- Prometheus: monitoring
- Grafana: visualization
- Spark Web UI: application monitoring
# Versions
- Spark: 3.5.1
- Hadoop: 3.3.6
- Kafka: 3.6.1
- HBase: 2.5.5
- Hive: 3.1.3
Part02-Production Environment Planning and Recommendations
2.1 Environment Planning
Environment planning recommendations:
# Production environment (10 nodes)
Role                       Hostname   IP address     Spec
NameNode                   fgedu-nn1  192.168.1.60   32 cores/128GB/2TB
NameNode (standby)         fgedu-nn2  192.168.1.61   32 cores/128GB/2TB
ResourceManager            fgedu-rm1  192.168.1.60   32 cores/128GB/2TB
ResourceManager (standby)  fgedu-rm2  192.168.1.61   32 cores/128GB/2TB
DataNode                   fgedu-dn1  192.168.1.62   32 cores/128GB/10TB
DataNode                   fgedu-dn2  192.168.1.63   32 cores/128GB/10TB
DataNode                   fgedu-dn3  192.168.1.64   32 cores/128GB/10TB
DataNode                   fgedu-dn4  192.168.1.65   32 cores/128GB/10TB
DataNode                   fgedu-dn5  192.168.1.66   32 cores/128GB/10TB
DataNode                   fgedu-dn6  192.168.1.67   32 cores/128GB/10TB
# Service layout
Service               Nodes
HDFS NameNode         fgedu-nn1, fgedu-nn2
HDFS DataNode         fgedu-dn1-6
YARN ResourceManager  fgedu-rm1, fgedu-rm2
YARN NodeManager      fgedu-dn1-6
Spark Master          fgedu-nn1
Spark Worker          fgedu-dn1-6
Kafka Broker          fgedu-dn1-3
HBase Master          fgedu-nn1, fgedu-nn2
HBase RegionServer    fgedu-dn1-6
ZooKeeper             fgedu-nn1, fgedu-nn2, fgedu-dn1
# Resource allocation
Service    CPU      Memory  Disk
HDFS       4 cores  16GB    data disks
YARN       4 cores  16GB    -
Spark      8 cores  64GB    -
Kafka      4 cores  16GB    500GB
HBase      8 cores  32GB    data disks
ZooKeeper  2 cores  4GB     50GB
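To turn the Spark row of this plan into configuration, a minimal spark-defaults.conf sketch is shown below. The values mirror the plan and the spark-submit flags used later in this document; treat them as a starting point rather than tuned settings, and note that the event-log path is an assumption, not part of the original plan.
$ cat > /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
# Run on YARN (see the service layout above)
spark.master                     yarn
# Per-executor sizing matching the spark-submit commands in Part03
spark.executor.memory            8g
spark.executor.cores             4
# Shuffle parallelism; 400 is reused in the tuning sections of Part04
spark.sql.shuffle.partitions     400
# Event logs for the history server (path is an assumption)
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://192.168.1.60:9000/spark-logs
EOF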
2.2 Data Planning
Data planning recommendations:
1. ODS layer (raw data)
- ods_user_behavior: user behavior logs
- ods_order_detail: order details
- ods_product_info: product information
- ods_user_info: user information
2. DWD layer (detail data)
- dwd_user_behavior_detail: user behavior detail
- dwd_order_detail: order detail
- dwd_product_detail: product detail
3. DWS layer (summary data)
- dws_user_behavior_summary: user behavior summary
- dws_order_summary: order summary
- dws_product_summary: product summary
4. ADS layer (application data)
- ads_user_profile: user profiles
- ads_product_analysis: product analysis
- ads_sales_report: sales reports
# Table design examples
-- User behavior table
CREATE TABLE IF NOT EXISTS fgedu_ods.ods_user_behavior (
  user_id BIGINT,
  item_id STRING,
  category_id STRING,
  behavior_type STRING,
  timestamp BIGINT,
  dt STRING
)
USING PARQUET
PARTITIONED BY (dt);  -- dt is already declared above; repeating its type here is rejected
-- User behavior summary table
CREATE TABLE IF NOT EXISTS fgedu_dws.dws_user_behavior_summary (
  user_id BIGINT,
  pv_count BIGINT,
  buy_count BIGINT,
  cart_count BIGINT,
  fav_count BIGINT,
  dt STRING
)
USING PARQUET
PARTITIONED BY (dt);
2.3 Task Planning
Task planning recommendations:
# Real-time task plan
1. Real-time PV/UV statistics
- Source: Kafka topic fgedu-user-behavior
- Processing: Spark Structured Streaming
- Sink: Redis
- Latency: < 1 second
2. Real-time user profiling
- Source: Kafka topic fgedu-user-behavior
- Processing: Spark Structured Streaming
- Sink: HBase
- Latency: < 5 seconds
3. Real-time alerting
- Source: Kafka topic fgedu-user-behavior
- Processing: Spark Structured Streaming
- Sink: Kafka topic fgedu-alerts
- Latency: < 1 second
# Offline task plan
1. User behavior ETL
- Schedule: hourly
- Source: HDFS /data/ods/user_behavior/
- Processing: Spark SQL
- Sink: Hive tables
2. User profile computation
- Schedule: daily, early morning
- Source: Hive tables
- Processing: Spark SQL + MLlib
- Sink: Hive tables + HBase
3. Sales report generation
- Schedule: daily, early morning
- Source: Hive tables
- Processing: Spark SQL
- Sink: MySQL
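If no dedicated scheduler such as Oozie or Azkaban is in place yet, the same plan can be wired up with plain cron. A minimal sketch follows; the jar paths and classes match Part03, while the log paths are assumptions for illustration:
$ crontab -l
# Hourly user behavior ETL (class and jar from section 3.2.2; log path is hypothetical)
0 * * * * /bigdata/app/spark/bin/spark-submit --master yarn --class com.fgedu.spark.FgeduOfflineEtl /bigdata/spark-apps/fgedu-offline-etl.jar $(date +\%Y-\%m-\%d) >> /var/log/fgedu/offline-etl.log 2>&1
# Daily user analysis at 02:00 over the previous day (class from section 3.3)
0 2 * * * /bigdata/app/spark/bin/spark-submit --master yarn --class com.fgedu.spark.FgeduUserAnalysis /bigdata/spark-apps/fgedu-user-analysis.jar $(date -d yesterday +\%Y-\%m-\%d) >> /var/log/fgedu/user-analysis.log 2>&1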
Part03-Production Implementation Plan
3.1 Data Collection Implementation
3.1.1 Flume Collection Configuration
$ cat > /bigdata/app/flume/conf/fgedu-user-behavior.conf << 'EOF'
# Agent name
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# Source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/nginx/access.log
agent1.sources.source1.channels = channel1
# Channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100000
agent1.channels.channel1.transactionCapacity = 10000
# Sink
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.topic = fgedu-user-behavior
agent1.sinks.sink1.brokerList = 192.168.1.62:9092,192.168.1.63:9092,192.168.1.64:9092
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.batchSize = 1000
EOF
# Start Flume
$ /bigdata/app/flume/bin/flume-ng agent \
--conf /bigdata/app/flume/conf \
--conf-file /bigdata/app/flume/conf/fgedu-user-behavior.conf \
--name agent1 \
-Dflume.root.logger=INFO,console &
Info: Sourcing environment configuration script /bigdata/app/flume/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+ exec /usr/lib/jvm/java-17-openjdk/bin/java -Xmx2048m -Dflume.root.logger=INFO,console -cp '/bigdata/app/flume/conf:/bigdata/app/flume/lib/*' org.apache.flume.node.Application --conf-file /bigdata/app/flume/conf/fgedu-user-behavior.conf --name agent1
...
2026-04-08 13:00:00,000 INFO [conf-file-poller-0] node.Application: Starting new configuration:{ sourceRun:{source1:{ runner:{ component: { name: source1 runner } } } } sinkRun:{sink1:{ runner:{ component: { name: sink1 runner } } } } }
2026-04-08 13:00:00,001 INFO [lifecycleSupervisor-1-0] sink.KafkaSink: Starting KafkaSink sink1
2026-04-08 13:00:00,002 INFO [lifecycleSupervisor-1-0] source.ExecSource: Exec source starting with command: tail -F /var/log/nginx/access.log
3.1.2 Creating the Kafka Topic
$ /bigdata/app/kafka/bin/kafka-topics.sh \
--create \
--topic fgedu-user-behavior \
--partitions 10 \
--replication-factor 3 \
--bootstrap-server 192.168.1.62:9092
Created topic fgedu-user-behavior.
# Describe the topic
$ /bigdata/app/kafka/bin/kafka-topics.sh \
--describe \
--topic fgedu-user-behavior \
--bootstrap-server 192.168.1.62:9092
Topic: fgedu-user-behavior PartitionCount: 10 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: fgedu-user-behavior Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-user-behavior Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedu-user-behavior Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
…
# Consumer test
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
--topic fgedu-user-behavior \
--from-beginning \
--bootstrap-server 192.168.1.62:9092
{"user_id":1001,"item_id":"item_001","category_id":"cat_001","behavior_type":"pv","timestamp":1680940800}
{"user_id":1002,"item_id":"item_002","category_id":"cat_002","behavior_type":"buy","timestamp":1680940801}
{"user_id":1003,"item_id":"item_003","category_id":"cat_001","behavior_type":"cart","timestamp":1680940802}
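# Producer test (optional)
For ad-hoc testing without Flume, records of the same JSON shape can be pushed straight into the topic with the console producer that ships with Kafka; the record below reuses one of the samples above:
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-user-behavior \
--bootstrap-server 192.168.1.62:9092
>{"user_id":1001,"item_id":"item_001","category_id":"cat_001","behavior_type":"pv","timestamp":1680940800}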
3.2 ETL Implementation
3.2.1 Real-time ETL
$ cat > /bigdata/spark-apps/FgeduRealtimeEtl.scala << 'EOF'
package com.fgedu.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object FgeduRealtimeEtl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fgedu-realtime-etl")
      .getOrCreate()

    import spark.implicits._

    // Define the event schema
    val schema = StructType(Array(
      StructField("user_id", LongType),
      StructField("item_id", StringType),
      StructField("category_id", StringType),
      StructField("behavior_type", StringType),
      StructField("timestamp", LongType)
    ))

    // Read from Kafka
    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.62:9092,192.168.1.63:9092,192.168.1.64:9092")
      .option("subscribe", "fgedu-user-behavior")
      .option("startingOffsets", "latest")
      .load()

    // Parse the JSON payload
    val parsedDF = kafkaDF
      .select(from_json(col("value").cast("string"), schema).as("data"))
      .select(
        col("data.user_id").alias("user_id"),
        col("data.item_id").alias("item_id"),
        col("data.category_id").alias("category_id"),
        col("data.behavior_type").alias("behavior_type"),
        from_unixtime(col("data.timestamp")).alias("event_time"),
        date_format(from_unixtime(col("data.timestamp")), "yyyy-MM-dd").alias("dt")
      )

    // Clean the data
    val cleanedDF = parsedDF
      .filter(col("user_id").isNotNull)
      .filter(col("behavior_type").isin("pv", "buy", "cart", "fav"))

    // Write to HDFS
    val query = cleanedDF.writeStream
      .format("parquet")
      .option("path", "hdfs://192.168.1.60:9000/data/ods/user_behavior")
      .option("checkpointLocation", "/checkpoint/fgedu-realtime-etl")
      .partitionBy("dt", "behavior_type")
      .start()

    query.awaitTermination()
  }
}
EOF
# Submit the job
$ /bigdata/app/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.fgedu.spark.FgeduRealtimeEtl \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
/bigdata/spark-apps/fgedu-realtime-etl.jar
2026-04-08 13:00:00 INFO SparkContext:54 - Running Spark version 3.5.1
2026-04-08 13:00:01 INFO StreamingQuery:54 - Starting new streaming query.
2026-04-08 13:00:02 INFO KafkaSource:54 - Initial offsets: {"fgedu-user-behavior":{"0":0,"1":0,"2":0,...}}
2026-04-08 13:00:03 INFO FileStreamSink:54 - Committed batch 0 to hdfs://192.168.1.60:9000/data/ods/user_behavior
3.2.2 Offline ETL
$ cat > /bigdata/spark-apps/FgeduOfflineEtl.scala << 'EOF'
package com.fgedu.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object FgeduOfflineEtl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fgedu-offline-etl")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._  // required for the $"..." column syntax below

    val dt = args(0)  // date argument, e.g. 2026-04-08

    // Read the ODS layer
    val odsDF = spark.table("fgedu_ods.ods_user_behavior")
      .filter($"dt" === dt)

    // Clean the data
    val cleanedDF = odsDF
      .filter($"user_id".isNotNull)
      .filter($"behavior_type".isin("pv", "buy", "cart", "fav"))
      .filter($"timestamp" > 0)
    // Transform
    val transformedDF = cleanedDF
      .withColumn("event_time", from_unixtime($"timestamp"))
      .withColumn("hour", hour(col("event_time")))

    // Write to the DWD layer. insertInto() uses the target table's own partition
    // spec and matches columns by position, so it must not be combined with
    // partitionBy(); order the columns as in the table, partition columns last
    // (assuming the DWD table declares them in this order).
    transformedDF
      .select("user_id", "item_id", "category_id", "behavior_type",
              "timestamp", "event_time", "dt", "hour")
      .write
      .mode("overwrite")
      .insertInto("fgedu_dwd.dwd_user_behavior_detail")

    // Aggregate
    val summaryDF = transformedDF
      .groupBy($"user_id", $"dt")
      .agg(
        sum(when($"behavior_type" === "pv", 1).otherwise(0)).alias("pv_count"),
        sum(when($"behavior_type" === "buy", 1).otherwise(0)).alias("buy_count"),
        sum(when($"behavior_type" === "cart", 1).otherwise(0)).alias("cart_count"),
        sum(when($"behavior_type" === "fav", 1).otherwise(0)).alias("fav_count")
      )

    // Write to the DWS layer; again, reorder to match the table definition
    // from section 2.2 because insertInto() is positional
    summaryDF
      .select("user_id", "pv_count", "buy_count", "cart_count", "fav_count", "dt")
      .write
      .mode("overwrite")
      .insertInto("fgedu_dws.dws_user_behavior_summary")

    spark.stop()
  }
}
EOF
# Submit the job
$ /bigdata/app/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.fgedu.spark.FgeduOfflineEtl \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 20 \
--queue production \
/bigdata/spark-apps/fgedu-offline-etl.jar \
2026-04-08
2026-04-08 13:00:00 INFO SparkContext:54 - Running Spark version 3.5.1
2026-04-08 13:00:01 INFO HiveSharedState:54 - Warehouse path: hdfs://192.168.1.60:9000/user/hive/warehouse
2026-04-08 13:00:02 INFO InsertIntoHiveTable:54 - Inserting data into Hive table fgedu_dwd.dwd_user_behavior_detail
2026-04-08 13:00:30 INFO InsertIntoHiveTable:54 - Inserting data into Hive table fgedu_dws.dws_user_behavior_summary
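# Sanity check (optional)
After the run, a quick check in spark-shell confirms that the partition landed; the query below is illustrative:
scala> spark.sql("SELECT dt, hour, COUNT(*) AS cnt FROM fgedu_dwd.dwd_user_behavior_detail WHERE dt='2026-04-08' GROUP BY dt, hour ORDER BY hour").show()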
3.3 Data Analysis Implementation
$ cat > /bigdata/spark-apps/FgeduUserAnalysis.scala << 'EOF'
package com.fgedu.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object FgeduUserAnalysis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fgedu-user-analysis")
      .enableHiveSupport()
      .getOrCreate()

    val dt = args(0)

    // 1. PV/UV statistics
    val pvuvDF = spark.sql(s"""
      SELECT dt, hour,
             COUNT(*) as pv,
             COUNT(DISTINCT user_id) as uv
      FROM fgedu_dwd.dwd_user_behavior_detail
      WHERE dt = '$dt'
      GROUP BY dt, hour
      ORDER BY hour
    """)
    pvuvDF.write
      .mode("overwrite")
      .insertInto("fgedu_ads.ads_pvuv_hourly")

    // 2. User activity analysis
    val activeDF = spark.sql(s"""
      SELECT user_id,
             COUNT(*) as total_actions,
             COUNT(DISTINCT item_id) as unique_items,
             COUNT(DISTINCT category_id) as unique_categories
      FROM fgedu_dwd.dwd_user_behavior_detail
      WHERE dt = '$dt'
      GROUP BY user_id
      ORDER BY total_actions DESC
      LIMIT 1000
    """)
    activeDF.write
      .mode("overwrite")
      .insertInto("fgedu_ads.ads_active_users")

    // 3. Hot item analysis
    val hotItemsDF = spark.sql(s"""
      SELECT item_id, category_id,
             SUM(CASE WHEN behavior_type = 'pv' THEN 1 ELSE 0 END) as pv_count,
             SUM(CASE WHEN behavior_type = 'buy' THEN 1 ELSE 0 END) as buy_count,
             SUM(CASE WHEN behavior_type = 'cart' THEN 1 ELSE 0 END) as cart_count,
             SUM(CASE WHEN behavior_type = 'fav' THEN 1 ELSE 0 END) as fav_count
      FROM fgedu_dwd.dwd_user_behavior_detail
      WHERE dt = '$dt'
      GROUP BY item_id, category_id
      ORDER BY buy_count DESC
      LIMIT 100
    """)
    hotItemsDF.write
      .mode("overwrite")
      .insertInto("fgedu_ads.ads_hot_items")

    // 4. Conversion rate analysis
    val conversionDF = spark.sql(s"""
      SELECT category_id,
             COUNT(DISTINCT CASE WHEN behavior_type = 'pv' THEN user_id END) as pv_users,
             COUNT(DISTINCT CASE WHEN behavior_type = 'cart' THEN user_id END) as cart_users,
             COUNT(DISTINCT CASE WHEN behavior_type = 'buy' THEN user_id END) as buy_users,
             ROUND(COUNT(DISTINCT CASE WHEN behavior_type = 'buy' THEN user_id END) * 100.0 /
                   NULLIF(COUNT(DISTINCT CASE WHEN behavior_type = 'pv' THEN user_id END), 0), 2) as conversion_rate
      FROM fgedu_dwd.dwd_user_behavior_detail
      WHERE dt = '$dt'
      GROUP BY category_id
      ORDER BY conversion_rate DESC
    """)
    conversionDF.write
      .mode("overwrite")
      .insertInto("fgedu_ads.ads_conversion_rate")

    spark.stop()
  }
}
EOF
# Submit the analysis job
$ /bigdata/app/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.fgedu.spark.FgeduUserAnalysis \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 20 \
/bigdata/spark-apps/fgedu-user-analysis.jar \
2026-04-08
Part04-Production Cases and Walkthroughs
4.1 User Behavior Analysis Case
# 1. Data preparation
scala> spark.sql("""
     | CREATE TABLE IF NOT EXISTS fgedu_ads.ads_user_behavior_analysis (
     |   user_id BIGINT,
     |   total_pv BIGINT,
     |   total_buy BIGINT,
     |   total_cart BIGINT,
     |   total_fav BIGINT,
     |   active_days INT,
     |   avg_daily_actions DOUBLE,
     |   user_level STRING,
     |   dt STRING
     | )
     | USING PARQUET
     | PARTITIONED BY (dt)
     | """)
res0: org.apache.spark.sql.DataFrame = []
# 2. User behavior analysis
scala> spark.sql("""
     | INSERT INTO TABLE fgedu_ads.ads_user_behavior_analysis PARTITION(dt='2026-04-08')
     | SELECT
     |   user_id,
     |   SUM(pv_count) as total_pv,
     |   SUM(buy_count) as total_buy,
     |   SUM(cart_count) as total_cart,
     |   SUM(fav_count) as total_fav,
     |   COUNT(*) as active_days,
     |   AVG(pv_count + buy_count + cart_count + fav_count) as avg_daily_actions,
     |   CASE
     |     WHEN SUM(buy_count) >= 10 THEN 'VIP'
     |     WHEN SUM(buy_count) >= 5 THEN 'Active'
     |     WHEN SUM(pv_count) >= 100 THEN 'Potential'
     |     ELSE 'Normal'
     |   END as user_level
     | FROM fgedu_dws.dws_user_behavior_summary
     | WHERE dt BETWEEN '2026-04-01' AND '2026-04-08'
     | GROUP BY user_id
     | """)
res1: org.apache.spark.sql.DataFrame = []
# 3. Inspect the results
scala> spark.sql("""
     | SELECT user_level, COUNT(*) as user_count, AVG(total_buy) as avg_buy
     | FROM fgedu_ads.ads_user_behavior_analysis
     | WHERE dt='2026-04-08'
     | GROUP BY user_level
     | ORDER BY user_count DESC
     | """).show()
+----------+----------+-------+
|user_level|user_count|avg_buy|
+----------+----------+-------+
|    Normal|    500000|    0.0|
| Potential|    300000|    0.5|
|    Active|    150000|    6.5|
|       VIP|     50000|   15.2|
+----------+----------+-------+
4.2 Real-time Reporting Case
# 1. Real-time PV/UV statistics
scala> // schema is the StructType defined in section 3.2.1
scala> val pvuvQuery = spark.readStream
     |   .format("kafka")
     |   .option("kafka.bootstrap.servers", "192.168.1.62:9092")
     |   .option("subscribe", "fgedu-user-behavior")
     |   .load()
     |   .select(from_json(col("value").cast("string"), schema).as("data"))
     |   .select(
     |     col("data.user_id").alias("user_id"),
     |     col("data.behavior_type").alias("behavior_type"),
     |     current_timestamp().alias("process_time")
     |   )
     |   .withWatermark("process_time", "10 seconds")
     |   .groupBy(window(col("process_time"), "1 minute"))
     |   .agg(
     |     count("*").alias("pv"),
     |     // exact countDistinct is not supported on streaming DataFrames,
     |     // so use the HyperLogLog-based approximation instead
     |     approx_count_distinct("user_id").alias("uv")
     |   )
     |   .writeStream
     |   .outputMode("update")
     |   .format("console")
     |   .start()
pvuvQuery: org.apache.spark.sql.streaming.StreamingQuery = StreamingQuery - pvuv
# 2. Sample output
-------------------------------------------
Batch: 10
-------------------------------------------
+--------------------+------+-----+
|              window|    pv|   uv|
+--------------------+------+-----+
|[2026-04-08 13:00...|100000| 5000|
|[2026-04-08 13:01...|120000| 5500|
|[2026-04-08 13:02...| 95000| 4800|
+--------------------+------+-----+
# 3. Writing to Redis
scala> // pvuvDF is the aggregated DataFrame from step 1, before .writeStream
scala> val redisQuery = pvuvDF.writeStream
     |   .foreach(new RedisSink())
     |   .start()
# Redis storage format
# Key: pvuv:2026-04-08:13:00
# Value: {"pv":100000,"uv":5000}
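RedisSink above is a user-defined class, not a Spark built-in. A minimal sketch of what it might look like as a ForeachWriter using the Jedis client follows; the Redis address and the key layout are assumptions taken from the comments above, and connection pooling and error handling are omitted:
$ cat > /bigdata/spark-apps/RedisSink.scala << 'EOF'
package com.fgedu.spark

import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.Jedis

class RedisSink extends ForeachWriter[Row] {
  private var jedis: Jedis = _

  // Called once per partition per epoch; open the connection here
  override def open(partitionId: Long, epochId: Long): Boolean = {
    jedis = new Jedis("192.168.1.62", 6379)  // assumed Redis address
    true
  }

  // Row layout matches the aggregation above: window (struct), pv, uv
  override def process(row: Row): Unit = {
    val windowStart = row.getStruct(0).getTimestamp(0)
    val key = s"pvuv:$windowStart"
    val value = s"""{"pv":${row.getLong(1)},"uv":${row.getLong(2)}}"""
    jedis.set(key, value)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}
EOF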
4.3 Common Issues
4.3.1 Data Skew
# Troubleshooting
# 1. Check stage details
# Spark UI -> Stages -> look at the task duration distribution
# 2. Check the data distribution
scala> spark.sql("""
     | SELECT user_id, COUNT(*) as cnt
     | FROM fgedu_dwd.dwd_user_behavior_detail
     | WHERE dt='2026-04-08'
     | GROUP BY user_id
     | ORDER BY cnt DESC
     | LIMIT 10
     | """).show()
# Solutions
# 1. Increase shuffle parallelism
spark.sql.shuffle.partitions=400
# 2. Salt the skewed key
scala> val saltedDF = df
     |   .withColumn("salt", (rand() * 10).cast("int"))
     |   .withColumn("salted_key", concat(col("user_id"), lit("_"), col("salt")))
# 3. Broadcast join
scala> val result = largeDF.join(broadcast(smallDF), "key")
# 4. Sample out skewed keys and handle them separately (full sketch below)
scala> val skewedKeys = df.groupBy("key").count().filter($"count" > 1000000).select("key")
scala> val skewedData = df.join(skewedKeys, Seq("key"))
scala> val normalData = df.join(skewedKeys, Seq("key"), "left_anti")
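Putting step 4 together end to end: join each part with the dimension table in the way that suits it, then union the results. Here dimDF is a hypothetical dimension table; df and the key column are as above:
scala> val skewedJoined = skewedData.join(broadcast(dimDF), Seq("key"))  // hot keys: broadcast, no shuffle
scala> val normalJoined = normalData.join(dimDF, Seq("key"))             // long tail: ordinary shuffle join
scala> val result = skewedJoined.unionByName(normalJoined)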
4.3.2 Out-of-Memory Issues
# Troubleshooting
# 1. Check executor logs
$ yarn logs -applicationId application_xxx | grep -i "OutOfMemory"
# 2. Check memory usage
# Spark UI -> Executors -> Memory Used
# Solutions
# 1. Increase executor memory
--executor-memory 16g
# 2. Adjust memory fractions
spark.memory.fraction=0.8
spark.memory.storageFraction=0.5
# 3. Reduce per-partition data volume
spark.sql.shuffle.partitions=400
# 4. Use broadcast variables (broadcast the small side instead of shuffling it)
scala> val broadcastVar = spark.sparkContext.broadcast(lookupData)
# 5. Clear cached data
scala> spark.catalog.clearCache()
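# 6. Cache with a serialized storage level (sketch)
When a DataFrame genuinely has to be cached, a serialized, disk-spilling storage level trims the heap footprint compared with the default MEMORY_AND_DISK, at some CPU cost. A minimal sketch, where df stands for any DataFrame that is reused several times:
scala> import org.apache.spark.storage.StorageLevel
scala> val cached = df.persist(StorageLevel.MEMORY_AND_DISK_SER)
scala> // ... run the jobs that reuse it, then release the memory
scala> cached.unpersist()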
Part05-Experience Summary and Sharing
5.1 Project Best Practices
Spark project best-practice recommendations:
1. Design clean data layers
2. Standardize naming and code style
3. Handle exceptions thoroughly
4. Size resources sensibly
5. Build complete monitoring and alerting
6. Tune performance regularly
5.2 Optimization Tips
Project optimization tips:
- Set parallelism and partition counts sensibly
- Use broadcast variables to cut data transfer
- Unpersist caches promptly to free memory
- Monitor job execution status
5.3 Recommended Tools
Development tools:
- IntelliJ IDEA: Scala IDE
- Maven/SBT: build tools
- Git: version control
- Jenkins: continuous integration
This article is published for learning and testing purposes only. When reposting, credit the source: http://www.fgedu.net.cn/10327.html
