1. 首页 > Hadoop教程 > 正文

大数据教程FG028-Spark核心架构与运行原理实战

内容简介:本文详细介绍Spark核心架构与运行原理实战,包括Spark架构组件、RDD原理、作业执行流程、资源调度等核心内容。风哥教程参考Spark官方文档Architecture、RDD Programming Guide等内容。

目录大纲

Part01-基础概念与理论知识
  1.1 Spark架构概述
  1.2 RDD核心概念
  1.3 作业执行原理
Part02-生产环境规划与建议
  2.1 集群部署规划
  2.2 资源配置规划
  2.3 参数优化规划
Part03-生产环境项目实施方案
  3.1 Spark安装配置
  3.2 RDD操作实战
  3.3 作业提交管理
  3.4 监控与调优
Part04-生产案例与实战讲解
  4.1 批处理案例
  4.2 交互式查询案例
  4.3 性能调优案例
Part05-风哥经验总结与分享
  5.1 Spark最佳实践
  5.2 性能优化经验总结

Part01-基础概念与理论知识

1.1 Spark架构概述

Spark是快速通用的分布式计算引擎。更多视频教程www.fgedu.net.cn Spark提供内存计算能力,比MapReduce快100倍。

风哥提示:Spark的核心优势是内存计算和DAG执行引擎,适合迭代计算场景。

1.2 RDD核心概念

RDD是Spark的核心数据抽象。学习交流加群风哥微信: itpux-com

RDD核心特性:
– 弹性:自动进行内存和磁盘数据交换
– 分布式:数据分布在多个节点
– 数据集:只读的分区记录集合
– 容错性:自动恢复丢失分区

1.3 作业执行原理

Spark作业执行分为多个阶段。from bigdata视频:www.itpux.com

# 查看Spark版本
spark-submit –version

# 查看Spark配置
cat /bigdata/app/spark/conf/spark-defaults.conf | head -20

# Spark版本
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/

# Spark配置
spark.master yarn
spark.submit.deployMode client
spark.driver.memory 4g
spark.executor.memory 8g
spark.executor.cores 4
spark.executor.instances 10

Part02-生产环境规划与建议

2.1 集群部署规划

Spark集群部署需要考虑资源分配和高可用。更多学习教程公众号风哥教程itpux_com

集群部署建议:
– 部署模式:YARN模式
– Driver运行模式:根据应用特点选择
– 资源隔离:使用YARN队列
– 高可用:配置多Master

2.2 资源配置规划

资源配置影响作业执行效率。学习交流加群风哥QQ113257174

# 查看YARN资源
yarn node -list -showDetails

# 查看队列资源
yarn queue -status default

# 查看Spark资源使用
spark-submit –master yarn –status application_1705536000000_0001

# YARN资源
Total Nodes: 6
Node-Id State Available-VCores Available-Memory
fgedu01:8041 RUNNING 16 65536
fgedu02:8041 RUNNING 16 65536
fgedu03:8041 RUNNING 16 65536
fgedu04:8041 RUNNING 16 65536
fgedu05:8041 RUNNING 16 65536
fgedu06:8041 RUNNING 16 65536

# 队列资源
Queue Name: default
Queue State: RUNNING
Current Capacity: 50%

# Spark资源
Application report for application_1705536000000_0001
State: RUNNING
Final Status: UNDEFINED
Tracking URL: http://fgedu01:8088/proxy/application_1705536000000_0001/

2.3 参数优化规划

参数优化是提升性能的关键。风哥提示:需要根据应用特点调整参数。

关键参数配置:
– spark.driver.memory:Driver内存
– spark.executor.memory:Executor内存
– spark.executor.cores:Executor核数
– spark.executor.instances:Executor数量
– spark.default.parallelism:默认并行度

Part03-生产环境项目实施方案

3.1 Spark安装配置

3.1.1 安装Spark

# 下载Spark
wget https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
tar -xzf spark-3.3.2-bin-hadoop3.tgz -C /bigdata/app/
ln -s /bigdata/app/spark-3.3.2-bin-hadoop3 /bigdata/app/spark

# 配置环境变量
export SPARK_HOME=/bigdata/app/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

# 配置spark-env.sh
cat > /bigdata/app/spark/conf/spark-env.sh << 'EOF'
export JAVA_HOME=/bigdata/app/java
export HADOOP_CONF_DIR=/bigdata/app/hadoop/etc/hadoop
export YARN_CONF_DIR=/bigdata/app/hadoop/etc/hadoop
export SPARK_MASTER_HOST=fgedu01
export SPARK_MASTER_PORT=7077
EOF

# 验证安装
spark-submit –version

# 下载解压
# 完成

# 环境变量
# 配置完成

# 配置文件
# 配置完成

# 验证安装
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Using Scala version 2.12.15, Java 1.8.0_362
# Spark安装成功

3.1.2 配置Spark

# 配置spark-defaults.conf
cat > /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
spark.master yarn
spark.submit.deployMode client
spark.driver.memory 4g
spark.executor.memory 8g
spark.executor.cores 4
spark.executor.instances 10
spark.default.parallelism 200
spark.sql.shuffle.partitions 200
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
EOF

# 验证配置
cat /bigdata/app/spark/conf/spark-defaults.conf

# 配置文件
# 配置完成

# 验证配置
spark.master yarn
spark.submit.deployMode client
spark.driver.memory 4g
spark.executor.memory 8g
spark.executor.cores 4
spark.executor.instances 10
spark.default.parallelism 200
spark.sql.shuffle.partitions 200
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true

3.2 RDD操作实战

3.2.1 创建RDD

# 启动spark-shell
spark-shell –master yarn –deploy-mode client

// 从集合创建RDD
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
rdd.collect()

// 从文件创建RDD
val fileRdd = sc.textFile(“/bigdata/warehouse/fgedu/data.txt”)
fileRdd.take(5)

// 从HDFS创建RDD
val hdfsRdd = sc.textFile(“hdfs://fgedu01:9000/bigdata/warehouse/fgedu/data.parquet”)
hdfsRdd.count()

# spark-shell启动
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Spark context Web UI available at http://fgedu01:4040
Spark context available as ‘sc’ (master = yarn, app id = application_1705536000000_0001)
Spark session available as ‘spark’.

// 集合RDD
data: Array[Int] = Array(1, 2, 3, 4, 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
res0: Array[Int] = Array(1, 2, 3, 4, 5)

// 文件RDD
fileRdd: org.apache.spark.rdd.RDD[String] = /bigdata/warehouse/fgedu/data.txt MapPartitionsRDD[1]
res1: Array[String] = Array(line1, line2, line3, line4, line5)

// HDFS RDD
hdfsRdd: org.apache.spark.rdd.RDD[String] = hdfs://fgedu01:9000/bigdata/warehouse/fgedu/data.parquet MapPartitionsRDD[2]
res2: Long = 1000000

3.2.2 RDD转换操作

// map转换
val mappedRdd = rdd.map(x => x * 2)
mappedRdd.collect()

// filter转换
val filteredRdd = rdd.filter(x => x > 2)
filteredRdd.collect()

// flatMap转换
val flatMappedRdd = fileRdd.flatMap(line => line.split(” “))
flatMappedRdd.take(10)

// reduceByKey转换
val pairRdd = flatMappedRdd.map(word => (word, 1))
val wordCount = pairRdd.reduceByKey((a, b) => a + b)
wordCount.take(10)

// map结果
mappedRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3]
res3: Array[Int] = Array(2, 4, 6, 8, 10)

// filter结果
filteredRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4]
res4: Array[Int] = Array(3, 4, 5)

// flatMap结果
flatMappedRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5]
res5: Array[String] = Array(hello, world, spark, hadoop, hive, …)

// reduceByKey结果
pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6]
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8]
res6: Array[(String, Int)] = Array((hello,100), (world,80), (spark,50), …)

3.2.3 RDD行动操作

// collect行动
val result = rdd.collect()

// count行动
val count = rdd.count()

// reduce行动
val sum = rdd.reduce((a, b) => a + b)

// saveAsTextFile行动
wordCount.saveAsTextFile(“/bigdata/output/wordcount”)

// foreach行动
rdd.foreach(x => println(x))

// collect结果
result: Array[Int] = Array(1, 2, 3, 4, 5)

// count结果
count: Long = 5

// reduce结果
sum: Int = 15

// saveAsTextFile结果
// 保存成功

// foreach结果
1
2
3
4
5

3.3 作业提交管理

3.3.1 提交作业

# 提交Spark作业
spark-submit \
–master yarn \
–deploy-mode cluster \
–driver-memory 4g \
–executor-memory 8g \
–executor-cores 4 \
–num-executors 10 \
–class org.apache.spark.examples.SparkPi \
/bigdata/app/spark/examples/jars/spark-examples_2.12-3.3.2.jar \
100

# 查看作业状态
yarn application -list -appStates RUNNING

# 查看作业日志
yarn logs -applicationId application_1705536000000_0002

# 作业提交
24/01/18 16:00:00 INFO client.RMProxy: Connecting to ResourceManager at fgedu01/192.168.1.10:8032
24/01/18 16:00:05 INFO yarn.Client: Requesting a new application from cluster with 6 NodeManagers
24/01/18 16:00:10 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability
24/01/18 16:05:00 INFO spark.SparkContext: Successfully stopped SparkContext
Pi is roughly 3.1415926535

# 作业状态
Total Applications:1
Application-Id Application-Name State
app_1705536000000_0002 Spark Pi RUNNING

# 作业日志
Container: container_1705536000000_0002_01_000001 on fgedu01_8041
LogType:stdout
Pi is roughly 3.1415926535

3.3.2 管理作业

# 查看所有作业
yarn application -list

# 杀死作业
yarn application -kill application_1705536000000_0002

# 查看作业详情
yarn application -status application_1705536000000_0002

# 查看Spark UI
echo “Spark UI: http://fgedu01:8088/proxy/application_1705536000000_0002/”

# 所有作业
Total Applications:2
Application-Id Application-Name State Final-State
app_1705536000000_0001 Spark shell RUNNING UNDEFINED
app_1705536000000_0002 Spark Pi FINISHED SUCCEEDED

# 杀死作业
Killing application application_1705536000000_0002
24/01/18 16:10:00 INFO impl.YarnClientImpl: Killed application application_1705536000000_0002

# 作业详情
Application Report :
Application-Id: application_1705536000000_0002
Application-Name: Spark Pi
Application-Type: SPARK
State: FINISHED
Final-State: SUCCEEDED

# Spark UI
Spark UI: http://fgedu01:8088/proxy/application_1705536000000_0002/

3.4 监控与调优

3.4.1 监控作业

# 查看Spark历史服务器
ls /bigdata/app/spark/logs/

# 启动历史服务器
start-history-server.sh

# 查看历史作业
curl http://fgedu01:18080/api/v1/applications

# 查看作业指标
curl http://fgedu01:4040/metrics/prometheus

# 日志目录
total 100
-rw-r–r– 1 fgedu fgedu 5000 Jan 18 16:00 spark-fgedu-org.apache.spark.deploy.worker.Worker-1-fgedu01.out
-rw-r–r– 1 fgedu fgedu 5000 Jan 18 16:00 spark-fgedu-org.apache.spark.deploy.master.Master-1-fgedu01.out

# 启动历史服务器
starting org.apache.spark.deploy.history.HistoryServer, logging to /bigdata/app/spark/logs/spark-fgedu-org.apache.spark.deploy.history.HistoryServer-1-fgedu01.out

# 历史作业
[
{
“id” : “application_1705536000000_0001”,
“name” : “Spark shell”,
“attempts” : [ {
“startTime” : “2024-01-18T16:00:00.000Z”,
“endTime” : “2024-01-18T16:30:00.000Z”,
“completed” : true
} ]
}
]

# 作业指标
# HELP spark_driver_BlockManager_memory_memUsed Memory used by BlockManager
# TYPE spark_driver_BlockManager_memory_memUsed gauge
spark_driver_BlockManager_memory_memUsed 4.0E8

3.4.2 性能调优

# 内存调优
spark-submit \
–master yarn \
–driver-memory 8g \
–executor-memory 16g \
–executor-cores 4 \
–num-executors 20 \
–conf spark.memory.fraction=0.6 \
–conf spark.memory.storageFraction=0.5 \
–conf spark.rdd.compress=true \
–class com.fgedu.spark.DataProcess \
/bigdata/app/spark/jars/fgedu-spark.jar

# 并行度调优
spark-submit \
–master yarn \
–conf spark.default.parallelism=400 \
–conf spark.sql.shuffle.partitions=400 \
–class com.fgedu.spark.DataProcess \
/bigdata/app/spark/jars/fgedu-spark.jar

# 内存调优
24/01/18 16:30:00 INFO spark.SparkContext: Running Spark version 3.3.2
24/01/18 16:30:05 INFO resource.ResourceUtils: Custom resources detected
24/01/18 17:00:00 INFO spark.SparkContext: Successfully stopped SparkContext
Job completed successfully

# 并行度调优
24/01/18 17:00:00 INFO spark.SparkContext: Running Spark version 3.3.2
24/01/18 17:30:00 INFO spark.SparkContext: Successfully stopped SparkContext
Job completed successfully

Part04-生产案例与实战讲解

4.1 批处理案例

批处理是Spark的核心应用场景。更多视频教程www.fgedu.net.cn

#!/bin/bash
# spark_batch_process.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

DT=$(date -d “1 day ago” +%Y%m%d)
echo “=== Spark Batch Process for ${DT} ===”

spark-submit \
–master yarn \
–deploy-mode cluster \
–driver-memory 4g \
–executor-memory 8g \
–executor-cores 4 \
–num-executors 10 \
–conf spark.sql.shuffle.partitions=200 \
–class com.fgedu.spark.UserOrderProcess \
/bigdata/app/spark/jars/fgedu-spark.jar \
${DT}

echo “=== Batch Process Completed ===”

# 批处理执行
./spark_batch_process.sh
=== Spark Batch Process for 20240117 ===

24/01/18 18:00:00 INFO spark.SparkContext: Running Spark version 3.3.2
24/01/18 18:00:05 INFO yarn.Client: Requesting a new application from cluster
24/01/18 18:30:00 INFO spark.SparkContext: Successfully stopped SparkContext

Processing Summary:
Input Records: 1000000
Output Records: 500000
Processing Time: 30 minutes

=== Batch Process Completed ===

4.2 交互式查询案例

Spark SQL支持交互式查询。学习交流加群风哥微信: itpux-com

# 启动spark-sql
spark-sql –master yarn –deploy-mode client

— 创建数据库
CREATE DATABASE IF NOT EXISTS fgedudb;
USE fgedudb;

— 创建表
CREATE TABLE IF NOT EXISTS fgedu_user (
user_id BIGINT,
user_name STRING,
age INT,
gender STRING
) USING PARQUET
PARTITIONED BY (dt STRING);

— 查询数据
SELECT gender, COUNT(*) AS cnt, AVG(age) AS avg_age
FROM fgedu_user
WHERE dt=’20240118′
GROUP BY gender;

— 保存查询结果
CREATE TABLE fgedu_user_agg AS
SELECT user_id, COUNT(*) AS order_cnt
FROM fgedu_order
WHERE dt=’20240118′
GROUP BY user_id;

# spark-sql启动
spark-sql>

— 创建数据库
Time taken: 0.5 seconds

— 创建表
Time taken: 1.0 seconds

— 查询结果
gender cnt avg_age
M 500000 30.5
F 500000 28.3
Time taken: 30.0 seconds

— 保存结果
Time taken: 60.0 seconds

4.3 性能调优案例

4.3.1 数据倾斜优化

// 数据倾斜问题
val skewedRdd = sc.textFile(“/bigdata/data/skewed”)
.map(line => (line.split(“,”)(0), 1))
.reduceByKey(_ + _)

// 优化方案:加盐
val saltedRdd = sc.textFile(“/bigdata/data/skewed”)
.map(line => {
val key = line.split(“,”)(0)
val salt = scala.util.Random.nextInt(10)
(s”${key}_$salt”, 1)
})
.reduceByKey(_ + _)
.map { case (key, count) => (key.split(“_”)(0), count) }
.reduceByKey(_ + _)

// 查看执行计划
skewedRdd.toDebugString

// 数据倾斜问题
skewedRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10]
Time taken: 1800.0 seconds

// 优化方案
saltedRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15]
Time taken: 300.0 seconds
// 性能提升6倍

// 执行计划
(10) ShuffledRDD[10] at reduceByKey
| MapPartitionsRDD[9] at map
| /bigdata/data/skewed MapPartitionsRDD[8] at textFile
| /bigdata/data/skewed HadoopRDD[7] at textFile

4.3.2 内存优化

# 内存配置优化
spark-submit \
–master yarn \
–driver-memory 8g \
–executor-memory 16g \
–conf spark.memory.fraction=0.6 \
–conf spark.memory.storageFraction=0.5 \
–conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
–conf spark.rdd.compress=true \
–conf spark.shuffle.compress=true \
–conf spark.broadcast.compress=true \
–class com.fgedu.spark.MemoryIntensiveJob \
/bigdata/app/spark/jars/fgedu-spark.jar

# 查看内存使用
curl http://fgedu01:4040/api/v1/applications/application_1705536000000_0003/executors

# 内存优化执行
24/01/18 19:00:00 INFO spark.SparkContext: Running Spark version 3.3.2
24/01/18 19:00:05 INFO spark.SparkContext: Submitted application: MemoryIntensiveJob
24/01/18 19:30:00 INFO spark.SparkContext: Successfully stopped SparkContext
Job completed successfully

# 内存使用
[
{
“id” : “driver”,
“hostPort” : “fgedu01:4040”,
“isActive” : true,
“rddBlocks” : 0,
“memoryUsed” : 4294967296,
“memoryRemaining” : 4294967296,
“diskUsed” : 0
},
{
“id” : “1”,
“hostPort” : “fgedu02:8041”,
“isActive” : true,
“rddBlocks” : 100,
“memoryUsed” : 8589934592,
“memoryRemaining” : 8589934592,
“diskUsed” : 0
}
]

Part05-风哥经验总结与分享

5.1 Spark最佳实践

在实际生产环境中,Spark使用需要注意以下几点:from bigdata视频:www.itpux.com

风哥经验总结:
1. 合理设置并行度
2. 使用Kryo序列化
3. 避免数据倾斜
4. 合理使用缓存
5. 监控资源使用

5.2 性能优化经验总结

5.2.1 优化建议

风哥提示:Spark性能优化需要从多个角度考虑,包括代码、配置、资源等。

性能优化要点:
– 合理设置内存比例
– 使用广播变量
– 避免创建过多小文件
– 使用DataFrame代替RDD
– 定期清理缓存

5.2.2 Spark运维脚本

#!/bin/bash
# spark_maintenance.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

echo “=== Spark Maintenance ===”
echo “Date: $(date)”

# 1. 清理日志
echo “=== Clean Logs ===”
find /bigdata/app/spark/logs -name “*.out” -mtime +7 -delete
find /bigdata/app/spark/logs -name “*.log” -mtime +7 -delete

# 2. 清理临时文件
echo “=== Clean Temp Files ===”
hdfs dfs -rm -r -skipTrash /tmp/spark-* 2>/dev/null

# 3. 检查资源使用
echo “=== Check Resource Usage ===”
yarn node -list -showDetails | grep -E “RUNNING|UNHEALTHY”

# 4. 检查作业状态
echo “=== Check Application Status ===”
yarn application -list -appStates RUNNING,FAILED

echo “=== Maintenance Completed ===”

# 维护执行
./spark_maintenance.sh
=== Spark Maintenance ===
Date: Thu Jan 18 20:00:00 CST 2024

=== Clean Logs ===
# 清理完成

=== Clean Temp Files ===
# 清理完成

=== Check Resource Usage ===
fgedu01:8041 RUNNING
fgedu02:8041 RUNNING
fgedu03:8041 RUNNING
fgedu04:8041 RUNNING
fgedu05:8041 RUNNING
fgedu06:8041 RUNNING

=== Check Application Status ===
Total Applications:0

=== Maintenance Completed ===

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息