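Table of Contents
Part01-Basic Concepts and Theory
1.1 Spark Architecture Overview
1.2 RDD Core Concepts
1.3 Job Execution Principles
Part02-Production Environment Planning and Recommendations
2.1 Cluster Deployment Planning
2.2 Resource Configuration Planning
2.3 Parameter Tuning Planning
Part03-Production Project Implementation
3.1 Spark Installation and Configuration
3.2 Hands-on RDD Operations
3.3 Job Submission and Management
3.4 Monitoring and Tuning
Part04-Production Cases and Hands-on Walkthroughs
4.1 Batch Processing Case
4.2 Interactive Query Case
4.3 Performance Tuning Case
Part05-Fengge's Experience Summary and Sharing
5.1 Spark Best Practices
5.2 Performance Optimization Lessons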
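Part01-Basic Concepts and Theory
1.1 Spark Architecture Overview
Spark is a fast, general-purpose distributed computing engine. It can keep intermediate data in memory, so iterative and interactive workloads can run up to 100 times faster than on MapReduce.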
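1.2 RDD Core Concepts
The RDD (Resilient Distributed Dataset) is Spark's core data abstraction. Its main properties are:
– Resilient: data moves between memory and disk automatically
– Distributed: data is partitioned across multiple nodes
– Dataset: a read-only, partitioned collection of records
– Fault tolerant: lost partitions are recovered automatically by recomputing them from their lineage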
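Lineage-based fault tolerance can be illustrated with a toy model in plain Scala. Each partition remembers how it was computed from its parent, so a lost partition is rebuilt by replaying that function rather than restored from a replica. `ToyRdd` and `LineageDemo` are made-up names for illustration only. This is not Spark's RDD implementation.

```scala
// Toy model (not Spark's implementation): every partition can be rebuilt
// from its parent partition by replaying the recorded function.
object LineageDemo {
  final case class ToyRdd[A](partitions: Vector[Option[Seq[A]]], recompute: Int => Seq[A]) {
    // Simulate losing a partition, e.g. because its executor died.
    def lose(i: Int): ToyRdd[A] = copy(partitions = partitions.updated(i, None))

    // Rebuild every missing partition by replaying its lineage.
    def recover: ToyRdd[A] =
      copy(partitions = partitions.indices.map(i => partitions(i).orElse(Some(recompute(i)))).toVector)

    // Fails if a partition is missing and has not been recovered.
    def collect: Seq[A] = partitions.flatMap(_.getOrElse(sys.error("partition lost")))
  }

  // Read-only source data in two partitions.
  val source: Vector[Seq[Int]] = Vector(Seq(1, 2), Seq(3, 4, 5))

  // Equivalent of rdd.map(x => x * 2); its lineage is "parent partition i, then * 2".
  def doubled: ToyRdd[Int] = {
    val f: Int => Seq[Int] = i => source(i).map(_ * 2)
    ToyRdd(source.indices.map(i => Option(f(i))).toVector, f)
  }

  def main(args: Array[String]): Unit = {
    val damaged = doubled.lose(1)
    println(damaged.recover.collect) // Vector(2, 4, 6, 8, 10)
  }
}
```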
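1.3 Job Execution Principles
Spark splits a job into stages at shuffle boundaries. Each stage runs as a set of parallel tasks, one per partition. Before running jobs, check the installed version and the default configuration:
# Check the Spark version
spark-submit --version
# View the Spark configuration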
cat /bigdata/app/spark/conf/spark-defaults.conf | head -20
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
# Spark configuration
spark.master yarn
spark.submit.deployMode client
spark.driver.memory 4g
spark.executor.memory 8g
spark.executor.cores 4
spark.executor.instances 10
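The stage boundaries described in 1.3 can be sketched with a toy model. Operations with narrow dependencies (map, filter, flatMap) are pipelined into one stage, and each wide dependency (a shuffle such as reduceByKey) starts a new stage. This is a simplified illustration, not Spark's DAGScheduler, and the `Dep`/`Op` types are made up.

```scala
// Toy model: split a linear chain of operations into stages at shuffle boundaries.
object StageSplitDemo {
  sealed trait Dep
  case object Narrow extends Dep // e.g. map, filter, flatMap: pipelined in the same stage
  case object Wide extends Dep   // e.g. reduceByKey, groupByKey, join: needs a shuffle

  final case class Op(name: String, dep: Dep)

  /** Each wide operation begins a new stage; narrow ones join the current stage. */
  def stages(ops: List[Op]): List[List[String]] =
    ops.foldLeft(List(List.empty[String])) { (acc, op) =>
      op.dep match {
        case Narrow => (acc.head :+ op.name) :: acc.tail
        case Wide   => List(op.name) :: acc
      }
    }.reverse.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val wordCount = List(
      Op("textFile", Narrow), Op("flatMap", Narrow), Op("map", Narrow),
      Op("reduceByKey", Wide), Op("filter", Narrow))
    stages(wordCount).zipWithIndex.foreach { case (s, i) =>
      println(s"Stage $i: ${s.mkString(" -> ")}")
    }
  }
}
```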
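Part02-Production Environment Planning and Recommendations
2.1 Cluster Deployment Planning
A Spark cluster deployment must account for resource allocation and high availability:
– Deployment mode: YARN
– Driver deploy mode: choose client or cluster mode to suit the application
– Resource isolation: use YARN queues
– High availability: configure multiple Masters in standalone mode; on YARN, availability depends on ResourceManager HA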
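2.2 Resource Configuration Planning
Resource configuration directly affects job execution efficiency.
# Check node resources
yarn node -list -showDetails
# Check queue resources
yarn queue -status default
# Check the Spark application status (spark-submit --status is not supported on YARN, so use the YARN CLI)
yarn application -status application_1705536000000_0001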
Total Nodes: 6
Node-Id State Available-VCores Available-Memory
fgedu01:8041 RUNNING 16 65536
fgedu02:8041 RUNNING 16 65536
fgedu03:8041 RUNNING 16 65536
fgedu04:8041 RUNNING 16 65536
fgedu05:8041 RUNNING 16 65536
fgedu06:8041 RUNNING 16 65536
# Queue resources
Queue Name: default
Queue State: RUNNING
Current Capacity: 50%
# Spark application status
Application report for application_1705536000000_0001
State: RUNNING
Final Status: UNDEFINED
Tracking URL: http://fgedu01:8088/proxy/application_1705536000000_0001/
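The node report above shows 16 vcores and 65536 MB per NodeManager. The sketch below plans executors from those numbers. It assumes 4 cores per executor, as in this article's configuration, and Spark 3.x's default memory overhead of max(384 MB, 10% of executor memory). The object and function names are hypothetical. In practice, leave some headroom for the ApplicationMaster and the driver, and keep in mind that YARN may round container requests up to its minimum allocation.

```scala
// Hypothetical sizing helper: executors per YARN node, and the largest
// --executor-memory whose heap + overhead still fits in the node's share per executor.
object ExecutorSizing {
  val OverheadFactor = 0.10 // default overhead: 10% of executor memory
  val MinOverheadMb = 384   // default minimum overhead per executor

  def overheadMb(heapMb: Int): Int =
    math.max(MinOverheadMb, (heapMb * OverheadFactor).toInt)

  def executorsPerNode(nodeVcores: Int, coresPerExecutor: Int): Int =
    nodeVcores / coresPerExecutor

  /** Largest heap (MB) such that heap + overhead <= containerMb. */
  def maxHeapMb(containerMb: Int): Int = {
    var heap = (containerMb / (1 + OverheadFactor)).toInt
    while (heap > 0 && heap + overheadMb(heap) > containerMb) heap -= 1
    heap
  }

  def main(args: Array[String]): Unit = {
    val nodes = 6; val vcores = 16; val memMb = 65536; val cores = 4
    val perNode = executorsPerNode(vcores, cores)
    val heap = maxHeapMb(memMb / perNode)
    println(s"executors per node = $perNode, total = ${perNode * nodes}, " +
      s"max heap = $heap MB (+${overheadMb(heap)} MB overhead)")
  }
}
```

For the cluster above this yields 4 executors per node (24 in total) and a maximum heap of about 14.5 GB. The 8g used elsewhere in this article leaves extra room in each container.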
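2.3 Parameter Tuning Planning
Parameter tuning is key to performance. Fengge's tip: tune parameters to match each application's characteristics.
– spark.driver.memory: Driver memory
– spark.executor.memory: memory (JVM heap) per Executor
– spark.executor.cores: cores per Executor
– spark.executor.instances: number of Executors
– spark.default.parallelism: default number of partitions for RDD operations such as reduceByKey, join and parallelize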
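Spark's tuning guide recommends roughly 2-3 tasks per CPU core in the cluster as a starting point for parallelism. The minimal sketch below applies that rule of thumb to this article's 10 executors with 4 cores each. The helper names are hypothetical.

```scala
// Rule-of-thumb starting point for spark.default.parallelism / spark.sql.shuffle.partitions:
// total executor cores multiplied by 2-3 tasks per core.
object ParallelismHint {
  def totalCores(executors: Int, coresPerExecutor: Int): Int = executors * coresPerExecutor

  /** (low, high) suggestion: 2x and 3x the total core count. */
  def suggestedRange(executors: Int, coresPerExecutor: Int): (Int, Int) = {
    val cores = totalCores(executors, coresPerExecutor)
    (2 * cores, 3 * cores)
  }

  def main(args: Array[String]): Unit = {
    val (low, high) = suggestedRange(executors = 10, coresPerExecutor = 4)
    println(s"total cores = ${totalCores(10, 4)}, suggested parallelism = $low-$high")
  }
}
```

With 40 cores this suggests 80-120. The value 200 configured later is above that range: more partitions give smaller tasks, which can help with large inputs, at the cost of more scheduling overhead.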
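Part03-Production Project Implementation
3.1 Spark Installation and Configuration
3.1.1 Install Spark
# Download and extract Spark (older releases such as 3.3.2 are served from the Apache archive)
wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz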
tar -xzf spark-3.3.2-bin-hadoop3.tgz -C /bigdata/app/
ln -s /bigdata/app/spark-3.3.2-bin-hadoop3 /bigdata/app/spark
# Set environment variables
export SPARK_HOME=/bigdata/app/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
# Configure spark-env.sh
cat > /bigdata/app/spark/conf/spark-env.sh << 'EOF'
export JAVA_HOME=/bigdata/app/java
export HADOOP_CONF_DIR=/bigdata/app/hadoop/etc/hadoop
export YARN_CONF_DIR=/bigdata/app/hadoop/etc/hadoop
# SPARK_MASTER_* are only used by the standalone cluster manager, not by YARN
export SPARK_MASTER_HOST=fgedu01
export SPARK_MASTER_PORT=7077
EOF
# Verify the installation
spark-submit --version
# Download and extract: done
# Environment variables: configured
# spark-env.sh: configured
# Installation check:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Using Scala version 2.12.15, Java 1.8.0_362
# Spark installed successfully
3.1.2 Configure Spark
cat > /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
spark.master yarn
spark.submit.deployMode client
spark.driver.memory 4g
spark.executor.memory 8g
spark.executor.cores 4
spark.executor.instances 10
spark.default.parallelism 200
spark.sql.shuffle.partitions 200
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
EOF
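# Note: spark.shuffle.service.enabled=true requires the Spark external shuffle service (the spark_shuffle aux-service) on every YARN NodeManager
# Note: with dynamic allocation enabled, spark.executor.instances becomes the initial executor count (if larger than minExecutors/initialExecutors)
# Verify the configuration
cat /bigdata/app/spark/conf/spark-defaults.conf
# Configuration written
# Verify the configuration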
spark.master yarn
spark.submit.deployMode client
spark.driver.memory 4g
spark.executor.memory 8g
spark.executor.cores 4
spark.executor.instances 10
spark.default.parallelism 200
spark.sql.shuffle.partitions 200
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
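The dynamic allocation and shuffle service settings above depend on cluster-side setup that is easy to forget. The sketch below parses spark-defaults.conf (one whitespace-separated key and value per line, `#` for comments) and flags these cases. `DefaultsCheck` and its warning texts are hypothetical. The default path is this article's installation path.

```scala
import scala.io.Source

// Hypothetical checker for spark-defaults.conf: one "key value" pair per line, '#' starts a comment.
object DefaultsCheck {
  def parse(text: String): Map[String, String] =
    text.split("\\r?\\n").iterator
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      .map { line =>
        val parts = line.split("\\s+", 2) // key, then the rest of the line as the value
        parts(0) -> (if (parts.length > 1) parts(1).trim else "")
      }
      .toMap

  def isTrue(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.equalsIgnoreCase("true"))

  def warnings(conf: Map[String, String]): List[String] = {
    val dynamic = isTrue(conf, "spark.dynamicAllocation.enabled")
    val shuffleService = isTrue(conf, "spark.shuffle.service.enabled")
    val tracking = isTrue(conf, "spark.dynamicAllocation.shuffleTracking.enabled")
    List(
      if (dynamic && !shuffleService && !tracking)
        Some("dynamic allocation usually needs the external shuffle service or shuffle tracking")
      else None,
      if (shuffleService)
        Some("make sure the spark_shuffle aux-service is configured on every NodeManager")
      else None,
      if (dynamic && conf.contains("spark.executor.instances"))
        Some("spark.executor.instances sets the initial executor count unless min/initialExecutors is larger")
      else None
    ).flatten
  }

  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse("/bigdata/app/spark/conf/spark-defaults.conf")
    val src = Source.fromFile(path)
    try warnings(parse(src.mkString)).foreach(w => println(s"WARN: $w"))
    finally src.close()
  }
}
```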
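3.2 Hands-on RDD Operations
3.2.1 Create RDDs
spark-shell --master yarn --deploy-mode client
// Create an RDD from a local collection
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
rdd.collect()
// Create an RDD from a text file (a path without a scheme resolves against fs.defaultFS, usually HDFS; use file:// for local files)
val fileRdd = sc.textFile("/bigdata/warehouse/fgedu/data.txt")
fileRdd.take(5)
// Create an RDD from a fully qualified HDFS URI
// Note: textFile splits raw bytes into lines; Parquet data should be read with spark.read.parquet(...) instead
val hdfsRdd = sc.textFile("hdfs://fgedu01:9000/bigdata/warehouse/fgedu/data.parquet")
hdfsRdd.count()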
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Spark context Web UI available at http://fgedu01:4040
Spark context available as 'sc' (master = yarn, app id = application_1705536000000_0001)
Spark session available as 'spark'.
// Collection RDD
data: Array[Int] = Array(1, 2, 3, 4, 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
res0: Array[Int] = Array(1, 2, 3, 4, 5)
// File RDD
fileRdd: org.apache.spark.rdd.RDD[String] = /bigdata/warehouse/fgedu/data.txt MapPartitionsRDD[1]
res1: Array[String] = Array(line1, line2, line3, line4, line5)
// HDFS RDD
hdfsRdd: org.apache.spark.rdd.RDD[String] = hdfs://fgedu01:9000/bigdata/warehouse/fgedu/data.parquet MapPartitionsRDD[2]
res2: Long = 1000000
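If no slice count is given, sc.parallelize uses sc.defaultParallelism, which comes from spark.default.parallelism when that is set. The sketch below mirrors, for a plain Seq, the start/end position arithmetic that Spark's ParallelCollectionRDD uses to cut a collection into slices. Spark special-cases Ranges, which this sketch does not cover. `SliceDemo` is a hypothetical name.

```scala
// Sketch of how a plain Seq is cut into numSlices partitions
// (same integer position arithmetic as ParallelCollectionRDD uses for non-Range sequences).
object SliceDemo {
  def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "numSlices must be positive")
    val length = seq.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      seq.slice(start, end) // partition i holds elements [start, end)
    }
  }

  def main(args: Array[String]): Unit =
    slice(List(1, 2, 3, 4, 5), 2).zipWithIndex.foreach { case (p, i) => println(s"partition $i: $p") }
}
```

With 5 elements and 2 slices, partition 0 gets 1 and 2 and partition 1 gets 3, 4 and 5. Asking for more slices than elements simply produces some empty partitions.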
3.2.2 RDD Transformations
// map transformation
val mappedRdd = rdd.map(x => x * 2)
mappedRdd.collect()
// filter transformation
val filteredRdd = rdd.filter(x => x > 2)
filteredRdd.collect()
// flatMap transformation
val flatMappedRdd = fileRdd.flatMap(line => line.split(" "))
flatMappedRdd.take(10)
// reduceByKey transformation
val pairRdd = flatMappedRdd.map(word => (word, 1))
val wordCount = pairRdd.reduceByKey((a, b) => a + b)
wordCount.take(10)
mappedRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3]
res3: Array[Int] = Array(2, 4, 6, 8, 10)
// filter result
filteredRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4]
res4: Array[Int] = Array(3, 4, 5)
// flatMap result
flatMappedRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5]
res5: Array[String] = Array(hello, world, spark, hadoop, hive, …)
// reduceByKey result
pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6]
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8]
res6: Array[(String, Int)] = Array((hello,100), (world,80), (spark,50), …)
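Tracing the same word-count pipeline on a local collection shows what each step produces. This plain-Scala sketch is only an analogy: Scala collections run on one machine, while reduceByKey also combines values inside each partition before the shuffle. Unlike the RDD code above, it drops the empty tokens that split(" ") yields for consecutive spaces. `WordCountDemo` is a hypothetical name.

```scala
// Word count mirrored with plain Scala collections:
// flatMap -> map to (word, 1) -> sum the values per key.
object WordCountDemo {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // like fileRdd.flatMap(line => line.split(" "))
      .filter(_.nonEmpty)     // drop empty tokens produced by repeated spaces
      .map(word => (word, 1)) // like .map(word => (word, 1))
      .foldLeft(Map.empty[String, Int]) { case (acc, (w, n)) =>
        acc.updated(w, acc.getOrElse(w, 0) + n) // like .reduceByKey(_ + _)
      }

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("hello world", "hello spark")))
}
```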
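3.2.3 RDD Actions
// collect action (brings all data to the driver; use only on small RDDs)
val result = rdd.collect()
// count action
val count = rdd.count()
// reduce action
val sum = rdd.reduce((a, b) => a + b)
// saveAsTextFile action (fails if the output directory already exists)
wordCount.saveAsTextFile("/bigdata/output/wordcount")
// foreach action: runs on the executors, so on YARN the println output goes to the executor stdout logs
// (to print a small RDD on the driver, use rdd.collect().foreach(println))
rdd.foreach(x => println(x))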
result: Array[Int] = Array(1, 2, 3, 4, 5)
// count result
count: Long = 5
// reduce result
sum: Int = 15
// saveAsTextFile result
// Saved successfully
// foreach output (on YARN this appears in the executor logs)
1
2
3
4
5
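reduce runs in parallel. Each partition is reduced on its own, and then the partial results are merged. For that reason the function passed to reduce must be commutative and associative. The small plain-Scala model below shows the difference between `+` and `-`. For simplicity it merges partial results in order, whereas Spark merges them as tasks finish. `ReduceDemo` is a hypothetical name.

```scala
// Model of RDD.reduce: reduce each partition, then reduce the per-partition results.
object ReduceDemo {
  def partitionedReduce(partitions: Seq[Seq[Int]])(f: (Int, Int) => Int): Int =
    partitions.filter(_.nonEmpty).map(_.reduce(f)).reduce(f)

  def main(args: Array[String]): Unit = {
    val onePartition = Seq(Seq(1, 2, 3, 4, 5))
    val twoPartitions = Seq(Seq(1, 2), Seq(3, 4, 5))
    // Addition is commutative and associative: the partitioning does not matter.
    println(partitionedReduce(onePartition)(_ + _))  // 15
    println(partitionedReduce(twoPartitions)(_ + _)) // 15
    // Subtraction is not: the result depends on how the data is partitioned.
    println(partitionedReduce(onePartition)(_ - _))  // -13
    println(partitionedReduce(twoPartitions)(_ - _)) // 5
  }
}
```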
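3.3 Job Submission and Management
3.3.1 Submit a Job
# Submit the bundled SparkPi example in cluster mode; the trailing 100 is the number of slices (partitions)
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--class org.apache.spark.examples.SparkPi \
/bigdata/app/spark/examples/jars/spark-examples_2.12-3.3.2.jar \
100
# Check job status
yarn application -list -appStates RUNNING
# View job logs (relies on YARN log aggregation)
yarn logs -applicationId application_1705536000000_0002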
24/01/18 16:00:00 INFO client.RMProxy: Connecting to ResourceManager at fgedu01/192.168.1.10:8032
24/01/18 16:00:05 INFO yarn.Client: Requesting a new application from cluster with 6 NodeManagers
24/01/18 16:00:10 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability
24/01/18 16:05:00 INFO spark.SparkContext: Successfully stopped SparkContext
Pi is roughly 3.1415926535
# Job status
Total Applications:1
Application-Id Application-Name State
app_1705536000000_0002 Spark Pi RUNNING
# Job logs
Container: container_1705536000000_0002_01_000001 on fgedu01_8041
LogType:stdout
Pi is roughly 3.1415926535
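SparkPi estimates π by Monte Carlo sampling. Random points in the square [-1, 1] × [-1, 1] fall inside the unit circle with probability π/4. The bundled example draws about 100,000 × slices samples (100 slices above) and spreads them across the executors. The single-machine sketch below computes the same estimate with a fixed seed so runs are repeatable. `PiDemo` is a hypothetical name. With a million samples, only the first two or three decimal places are reliable.

```scala
import scala.util.Random

// Single-machine version of the Monte Carlo estimate that SparkPi distributes.
object PiDemo {
  def estimatePi(samples: Int, seed: Long): Double = {
    val rnd = new Random(seed)
    var inside = 0
    var i = 0
    while (i < samples) {
      val x = rnd.nextDouble() * 2 - 1 // uniform in [-1, 1)
      val y = rnd.nextDouble() * 2 - 1
      if (x * x + y * y <= 1) inside += 1 // point lies inside the unit circle
      i += 1
    }
    4.0 * inside / samples // (circle area / square area) = pi / 4
  }

  def main(args: Array[String]): Unit =
    println(s"Pi is roughly ${estimatePi(1000000, 42L)}")
}
```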
3.3.2 Manage Jobs
# List applications
yarn application -list
# Kill a job
yarn application -kill application_1705536000000_0002
# View job details
yarn application -status application_1705536000000_0002
# View the Spark UI through the ResourceManager web proxy
echo "Spark UI: http://fgedu01:8088/proxy/application_1705536000000_0002/"
Total Applications:2
Application-Id Application-Name State Final-State
app_1705536000000_0001 Spark shell RUNNING UNDEFINED
app_1705536000000_0002 Spark Pi FINISHED SUCCEEDED
# Kill the job
Killing application application_1705536000000_0002
24/01/18 16:10:00 INFO impl.YarnClientImpl: Killed application application_1705536000000_0002
# Job details
Application Report :
Application-Id: application_1705536000000_0002
Application-Name: Spark Pi
Application-Type: SPARK
State: FINISHED
Final-State: SUCCEEDED
# Spark UI
Spark UI: http://fgedu01:8088/proxy/application_1705536000000_0002/
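Job-management scripts often need the ResourceManager proxy URL shown above, which has the form http://<rm-host>:8088/proxy/<application-id>/. The hypothetical helper below checks the application ID format (application_<clusterTimestamp>_<sequence>) before it builds the URL.

```scala
// Hypothetical helper: validate a YARN application ID and build the RM web-proxy URL.
object YarnAppId {
  private val Pattern = """application_(\d+)_(\d+)""".r // must match the whole string

  def proxyUrl(rmHostPort: String, appId: String): Either[String, String] = appId match {
    case Pattern(_, _) => Right(s"http://$rmHostPort/proxy/$appId/")
    case _             => Left(s"not a YARN application ID: $appId")
  }

  def main(args: Array[String]): Unit =
    println(proxyUrl("fgedu01:8088", "application_1705536000000_0002"))
}
```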
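3.4 Monitoring and Tuning
3.4.1 Monitor Jobs
# List the Spark log directory
ls /bigdata/app/spark/logs/
# Start the History Server (it only shows applications that wrote event logs: set spark.eventLog.enabled=true and point spark.history.fs.logDirectory at spark.eventLog.dir)
start-history-server.sh
# List historical applications via the History Server REST API
curl http://fgedu01:18080/api/v1/applications
# Fetch driver metrics in Prometheus format (requires the PrometheusServlet sink to be enabled in conf/metrics.properties)
curl http://fgedu01:4040/metrics/prometheus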
total 100
-rw-r--r-- 1 fgedu fgedu 5000 Jan 18 16:00 spark-fgedu-org.apache.spark.deploy.worker.Worker-1-fgedu01.out
-rw-r--r-- 1 fgedu fgedu 5000 Jan 18 16:00 spark-fgedu-org.apache.spark.deploy.master.Master-1-fgedu01.out
# Start the History Server
starting org.apache.spark.deploy.history.HistoryServer, logging to /bigdata/app/spark/logs/spark-fgedu-org.apache.spark.deploy.history.HistoryServer-1-fgedu01.out
# Historical applications
[
{
"id" : "application_1705536000000_0001",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2024-01-18T16:00:00.000Z",
"endTime" : "2024-01-18T16:30:00.000Z",
"completed" : true
} ]
}
]
# Job metrics
# HELP spark_driver_BlockManager_memory_memUsed Memory used by BlockManager
# TYPE spark_driver_BlockManager_memory_memUsed gauge
spark_driver_BlockManager_memory_memUsed 4.0E8
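The Prometheus endpoint returns plain text: `# HELP` and `# TYPE` comment lines followed by sample lines of the form metric-name and value. A minimal parser is enough for quick checks in scripts or tests. The sketch below assumes label values contain no spaces and ignores timestamps. `PromParse` is a hypothetical name, and anything beyond quick checks should use a real Prometheus client library.

```scala
// Minimal parser for Prometheus text output: skips comment lines, keeps "name value" samples.
object PromParse {
  def parse(text: String): Map[String, Double] =
    text.split("\\r?\\n").iterator
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#")) // skip # HELP / # TYPE lines
      .flatMap { line =>
        line.split("\\s+") match {
          case Array(name, value, _*) =>
            scala.util.Try(value.toDouble).toOption.map(v => name -> v).toList
          case _ => Nil
        }
      }
      .toMap

  def main(args: Array[String]): Unit = {
    val sample =
      """# HELP spark_driver_BlockManager_memory_memUsed Memory used by BlockManager
        |# TYPE spark_driver_BlockManager_memory_memUsed gauge
        |spark_driver_BlockManager_memory_memUsed 4.0E8""".stripMargin
    parse(sample).foreach { case (k, v) => println(s"$k = $v") }
  }
}
```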
3.4.2 Performance Tuning
# Memory tuning (spark.memory.fraction=0.6 and spark.memory.storageFraction=0.5 are already Spark's defaults and are set here only for explicitness)
spark-submit \
--master yarn \
--driver-memory 8g \
--executor-memory 16g \
--executor-cores 4 \
--num-executors 20 \
--conf spark.memory.fraction=0.6 \
--conf spark.memory.storageFraction=0.5 \
--conf spark.rdd.compress=true \
--class com.fgedu.spark.DataProcess \
/bigdata/app/spark/jars/fgedu-spark.jar
# Parallelism tuning
spark-submit \
--master yarn \
--conf spark.default.parallelism=400 \
--conf spark.sql.shuffle.partitions=400 \
--class com.fgedu.spark.DataProcess \
/bigdata/app/spark/jars/fgedu-spark.jar
24/01/18 16:30:00 INFO spark.SparkContext: Running Spark version 3.3.2
24/01/18 16:30:05 INFO resource.ResourceUtils: Custom resources detected
24/01/18 17:00:00 INFO spark.SparkContext: Successfully stopped SparkContext
Job completed successfully
# Parallelism tuning
24/01/18 17:00:00 INFO spark.SparkContext: Running Spark version 3.3.2
24/01/18 17:30:00 INFO spark.SparkContext: Successfully stopped SparkContext
Job completed successfully
Part04-Production Cases and Hands-on Walkthroughs
4.1 Batch Processing Case
Batch processing is Spark's core use case. The following script processes the previous day's data:
#!/bin/bash
# spark_batch_process.sh
# Stop on the first failing command, so the completion message only appears if spark-submit succeeds
set -e
DT=$(date -d "1 day ago" +%Y%m%d)
echo "=== Spark Batch Process for ${DT} ==="
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--conf spark.sql.shuffle.partitions=200 \
--class com.fgedu.spark.UserOrderProcess \
/bigdata/app/spark/jars/fgedu-spark.jar \
"${DT}"
echo "=== Batch Process Completed ==="
./spark_batch_process.sh
=== Spark Batch Process for 20240117 ===
24/01/18 18:00:00 INFO spark.SparkContext: Running Spark version 3.3.2
24/01/18 18:00:05 INFO yarn.Client: Requesting a new application from cluster
24/01/18 18:30:00 INFO spark.SparkContext: Successfully stopped SparkContext
Processing Summary:
Input Records: 1000000
Output Records: 500000
Processing Time: 30 minutes
=== Batch Process Completed ===
4.2 Interactive Query Case
Spark SQL supports interactive queries:
spark-sql --master yarn --deploy-mode client
-- Create a database
CREATE DATABASE IF NOT EXISTS fgedudb;
USE fgedudb;
-- Create a partitioned Parquet table (dt is declared as a column and used as the partition key)
CREATE TABLE IF NOT EXISTS fgedu_user (
user_id BIGINT,
user_name STRING,
age INT,
gender STRING,
dt STRING
) USING PARQUET
PARTITIONED BY (dt);
-- Query data
SELECT gender, COUNT(*) AS cnt, AVG(age) AS avg_age
FROM fgedu_user
WHERE dt='20240118'
GROUP BY gender;
-- Save query results into a new table (reads the existing fgedu_order table)
CREATE TABLE fgedu_user_agg AS
SELECT user_id, COUNT(*) AS order_cnt
FROM fgedu_order
WHERE dt='20240118'
GROUP BY user_id;
spark-sql>
-- Create database
Time taken: 0.5 seconds
-- Create table
Time taken: 1.0 seconds
-- Query result
gender cnt avg_age
M 500000 30.5
F 500000 28.3
Time taken: 30.0 seconds
-- Save results
Time taken: 60.0 seconds
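To make the aggregate semantics of the GROUP BY query concrete, the sketch below runs the same computation on an in-memory Seq: filter on dt, group by gender, then compute COUNT(*) and AVG(age) per group. The `User` case class mirrors the fgedu_user columns. The rows are made-up sample data, not the table's contents.

```scala
// Plain-Scala mirror of:
//   SELECT gender, COUNT(*) AS cnt, AVG(age) AS avg_age FROM fgedu_user WHERE dt = ? GROUP BY gender
object GroupByDemo {
  final case class User(userId: Long, userName: String, age: Int, gender: String, dt: String)

  /** Returns gender -> (cnt, avg_age) for one dt partition. */
  def genderStats(users: Seq[User], dt: String): Map[String, (Long, Double)] =
    users
      .filter(_.dt == dt) // WHERE dt = '...'
      .groupBy(_.gender)  // GROUP BY gender
      .map { case (gender, rows) =>
        val cnt = rows.size.toLong
        (gender, (cnt, rows.map(_.age.toLong).sum.toDouble / cnt)) // COUNT(*), AVG(age)
      }

  def main(args: Array[String]): Unit = {
    val users = Seq(
      User(1L, "u1", 30, "M", "20240118"),
      User(2L, "u2", 20, "M", "20240118"),
      User(3L, "u3", 28, "F", "20240118"),
      User(4L, "u4", 99, "F", "20240117")) // other partition, filtered out
    genderStats(users, "20240118").foreach { case (g, (cnt, avg)) => println(s"$g\t$cnt\t$avg") }
  }
}
```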
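4.3 Performance Tuning Case
4.3.1 Data Skew Optimization
// In spark-shell, enter multi-line chains in :paste mode so they are parsed as one expression
// Baseline: plain reduceByKey on skewed keys
val skewedRdd = sc.textFile("/bigdata/data/skewed")
.map(line => (line.split(",")(0), 1))
.reduceByKey(_ + _)
// Optimization: salting (two-stage aggregation)
val saltedRdd = sc.textFile("/bigdata/data/skewed")
.map(line => {
val key = line.split(",")(0)
// A random salt 0-9 spreads one hot key over up to 10 reduce keys
val salt = scala.util.Random.nextInt(10)
(s"${key}_$salt", 1)
})
.reduceByKey(_ + _)
// Strip the salt at the last '_' so keys that themselves contain '_' stay intact
.map { case (key, count) => (key.substring(0, key.lastIndexOf('_')), count) }
.reduceByKey(_ + _)
// View the lineage (RDD dependency chain)
skewedRdd.toDebugString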
skewedRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10]
Time taken: 1800.0 seconds
// Optimized version
saltedRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15]
Time taken: 300.0 seconds
// About 6x faster (1800 s down to 300 s)
// Lineage
(10) ShuffledRDD[10] at reduceByKey
| MapPartitionsRDD[9] at map
| /bigdata/data/skewed MapPartitionsRDD[8] at textFile
| /bigdata/data/skewed HadoopRDD[7] at textFile
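A plain-Scala simulation shows how salting changes the load balance. It is not Spark code, and the sample keys are made up. Stage 1 aggregates by key_salt, which splits the hot key into up to `buckets` groups. Stage 2 strips the salt at the last '_' and aggregates again, so the final counts must match the unsalted result. `SaltingDemo` is a hypothetical name.

```scala
import scala.util.Random

// Simulation of two-stage salted aggregation on local collections.
object SaltingDemo {
  /** Unsalted reference: count per key. */
  def plainCount(keys: Seq[String]): Map[String, Int] =
    keys.groupBy(identity).map { case (k, ks) => k -> ks.size }

  /** Returns (final counts, size of the largest stage-1 group). */
  def saltedCount(keys: Seq[String], buckets: Int, seed: Long): (Map[String, Int], Int) = {
    val rnd = new Random(seed)
    // Stage 1: count per salted key, e.g. "hot_key_7"
    val stage1: Map[String, Int] = keys
      .map(k => s"${k}_${rnd.nextInt(buckets)}")
      .groupBy(identity)
      .map { case (sk, ks) => sk -> ks.size }
    val maxStage1 = if (stage1.isEmpty) 0 else stage1.values.max
    // Stage 2: strip the salt at the LAST '_' (keys may contain '_') and sum again
    val stage2: Map[String, Int] = stage1.toSeq
      .map { case (sk, n) => (sk.substring(0, sk.lastIndexOf('_')), n) }
      .groupBy(_._1)
      .map { case (k, pairs) => k -> pairs.map(_._2).sum }
    (stage2, maxStage1)
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq.fill(10000)("hot_key") ++ Seq("a", "b", "c") // one hot key, a few cold ones
    val (counts, maxLoad) = saltedCount(keys, buckets = 10, seed = 1L)
    println(s"counts = $counts")
    println(s"largest stage-1 group = $maxLoad (unsalted: ${plainCount(keys).values.max})")
  }
}
```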
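4.3.2 Memory Optimization
# Kryo serialization plus RDD compression; spark.shuffle.compress and spark.broadcast.compress are already true by default
spark-submit \
--master yarn \
--driver-memory 8g \
--executor-memory 16g \
--conf spark.memory.fraction=0.6 \
--conf spark.memory.storageFraction=0.5 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.rdd.compress=true \
--conf spark.shuffle.compress=true \
--conf spark.broadcast.compress=true \
--class com.fgedu.spark.MemoryIntensiveJob \
/bigdata/app/spark/jars/fgedu-spark.jar
# Check memory usage through the driver's REST API (served on port 4040 while the application is running)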
curl http://fgedu01:4040/api/v1/applications/application_1705536000000_0003/executors
24/01/18 19:00:00 INFO spark.SparkContext: Running Spark version 3.3.2
24/01/18 19:00:05 INFO spark.SparkContext: Submitted application: MemoryIntensiveJob
24/01/18 19:30:00 INFO spark.SparkContext: Successfully stopped SparkContext
Job completed successfully
# Memory usage
[
{
"id" : "driver",
"hostPort" : "fgedu01:4040",
"isActive" : true,
"rddBlocks" : 0,
"memoryUsed" : 4294967296,
"memoryRemaining" : 4294967296,
"diskUsed" : 0
},
{
"id" : "1",
"hostPort" : "fgedu02:8041",
"isActive" : true,
"rddBlocks" : 100,
"memoryUsed" : 8589934592,
"memoryRemaining" : 8589934592,
"diskUsed" : 0
}
]
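Numbers such as memoryUsed and memoryRemaining are easier to interpret once you know how Spark's unified memory manager (Spark 2.0 and later) sizes its regions. It reserves 300 MB of the heap. Execution and storage share spark.memory.fraction of the remainder, and spark.memory.storageFraction of that shared region is the storage share protected from eviction. The back-of-the-envelope sketch below uses a hypothetical `UnifiedMemory` object. The JVM usually reports a slightly smaller maximum heap than -Xmx, so real values come out a little lower.

```scala
// Rough model of Spark's unified memory regions for a given executor heap.
object UnifiedMemory {
  val ReservedMb = 300.0 // reserved system memory

  /** Region shared by execution and storage: (heap - reserved) * spark.memory.fraction. */
  def unifiedMb(heapMb: Double, memoryFraction: Double): Double =
    (heapMb - ReservedMb) * memoryFraction

  /** Storage share protected from eviction: unified * spark.memory.storageFraction. */
  def storageMb(heapMb: Double, memoryFraction: Double, storageFraction: Double): Double =
    unifiedMb(heapMb, memoryFraction) * storageFraction

  def main(args: Array[String]): Unit = {
    val heap = 16 * 1024.0 // --executor-memory 16g
    println(f"unified: ${unifiedMb(heap, 0.6)}%.1f MB, storage: ${storageMb(heap, 0.6, 0.5)}%.1f MB")
  }
}
```

With --executor-memory 16g and the default fractions, this gives about 9,650 MB of unified memory, of which about 4,825 MB is the protected storage share.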
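Part05-Fengge's Experience Summary and Sharing
5.1 Spark Best Practices
In production, keep the following points in mind when using Spark:
1. Set parallelism appropriately
2. Use Kryo serialization
3. Avoid data skew
4. Cache judiciously
5. Monitor resource usage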
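5.2 Performance Optimization Lessons
5.2.1 Recommendations
– Set memory fractions appropriately
– Use broadcast variables for small lookup data
– Avoid producing many small files
– Prefer DataFrames over RDDs
– Unpersist cached data when it is no longer needed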
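Broadcast variables help when a large dataset is joined with a small lookup table. The table is sent once to each executor and joined locally, so the large side is never shuffled. In Spark this would be a value from sc.broadcast(lookupMap) read inside map or mapPartitions. The plain-Scala sketch below models only that per-partition local join, with made-up data. `BroadcastJoinDemo` is a hypothetical name.

```scala
// Model of a broadcast (map-side) join: every partition of the large dataset
// looks keys up in the same small, read-only Map, so the large side is never regrouped.
object BroadcastJoinDemo {
  def mapSideJoin[K, A, B](bigPartitions: Seq[Seq[(K, A)]], small: Map[K, B]): Seq[Seq[(K, (A, B))]] =
    bigPartitions.map { part =>
      // Inner-join semantics: rows whose key is missing from `small` are dropped.
      part.flatMap { case (k, a) => small.get(k).map(b => (k, (a, b))) }
    }

  def main(args: Array[String]): Unit = {
    val orders = Seq(Seq(1L -> 100.0, 2L -> 50.0), Seq(3L -> 20.0)) // (userId, amount) in 2 partitions
    val users = Map(1L -> "alice", 3L -> "carol")                    // small lookup table
    mapSideJoin(orders, users).zipWithIndex.foreach { case (p, i) => println(s"partition $i: $p") }
  }
}
```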
5.2.2 Spark Maintenance Script
#!/bin/bash
# spark_maintenance.sh
echo "=== Spark Maintenance ==="
echo "Date: $(date)"
# 1. Delete Spark logs not modified for more than 7 days
echo "=== Clean Logs ==="
find /bigdata/app/spark/logs -name "*.out" -mtime +7 -delete
find /bigdata/app/spark/logs -name "*.log" -mtime +7 -delete
# 2. Clean up temporary files on HDFS (quote the glob so HDFS expands it, not the local shell)
echo "=== Clean Temp Files ==="
hdfs dfs -rm -r -skipTrash '/tmp/spark-*' 2>/dev/null
# 3. Check resource usage (-all so that UNHEALTHY nodes are listed too)
echo "=== Check Resource Usage ==="
yarn node -list -all | grep -E "RUNNING|UNHEALTHY"
# 4. Check application status
echo "=== Check Application Status ==="
yarn application -list -appStates RUNNING,FAILED
echo "=== Maintenance Completed ==="
./spark_maintenance.sh
=== Spark Maintenance ===
Date: Thu Jan 18 20:00:00 CST 2024
=== Clean Logs ===
# Cleanup done
=== Clean Temp Files ===
# Cleanup done
=== Check Resource Usage ===
fgedu01:8041 RUNNING
fgedu02:8041 RUNNING
fgedu03:8041 RUNNING
fgedu04:8041 RUNNING
fgedu05:8041 RUNNING
fgedu06:8041 RUNNING
=== Check Application Status ===
Total Applications:0
=== Maintenance Completed ===
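This article was compiled and published by Fengge Tutorials (风哥教程) for learning and testing purposes only. When reposting, please credit the source: http://www.fgedu.net.cn/10327.html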
