目录大纲
Part01-基础概念与理论知识
1.1 Spark SQL概述
1.2 DataFrame与Dataset
1.3 Catalyst优化器
Part02-生产环境规划与建议
2.1 数据源规划
2.2 Schema设计规划
2.3 性能优化规划
Part03-生产环境项目实施方案
3.1 DataFrame基础操作
3.2 SQL查询开发
3.3 数据源集成
3.4 UDF开发
Part04-生产案例与实战讲解
4.1 数据清洗案例
4.2 数据分析案例
4.3 数据导出案例
Part05-风哥经验总结与分享
5.1 Spark SQL最佳实践
5.2 性能优化经验总结
Part01-基础概念与理论知识
1.1 Spark SQL概述
Spark SQL是Spark的结构化数据处理模块。更多视频教程www.fgedu.net.cn 提供SQL查询接口和DataFrame API。
1.2 DataFrame与Dataset
DataFrame是带有Schema的分布式数据集合。学习交流加群风哥微信: itpux-com
– 带有Schema的分布式数据集合
– 支持SQL查询
– 自动优化执行计划
– 支持多种数据源
1.3 Catalyst优化器
Catalyst是Spark SQL的查询优化器。from bigdata视频:www.itpux.com
spark-sql –master yarn -e “EXPLAIN EXTENDED SELECT * FROM fgedudb.fgedu_user WHERE dt=’20240118′;”
== Parsed Logical Plan ==
‘Project [*]
+- ‘Filter (‘dt = 20240118)
+- ‘UnresolvedRelation [fgedudb, fgedu_user], [], false
== Analyzed Logical Plan ==
user_id: bigint, user_name: string, age: int, gender: string, dt: string
Project [user_id#0L, user_name#1, age#2, gender#3, dt#4]
+- Filter (dt#4 = 20240118)
+- Relation fgedudb.fgedu_user[user_id#0L,user_name#1,age#2,gender#3,dt#4] parquet
== Optimized Logical Plan ==
Project [user_id#0L, user_name#1, age#2, gender#3, dt#4]
+- Filter (isnotnull(dt#4) AND (dt#4 = 20240118))
+- Relation fgedudb.fgedu_user[user_id#0L,user_name#1,age#2,gender#3,dt#4] parquet
Part02-生产环境规划与建议
2.1 数据源规划
Spark SQL支持多种数据源。更多学习教程公众号风哥教程itpux_com
– Parquet:列式存储,适合分析
– ORC:列式存储,压缩比高
– JSON:半结构化数据
– CSV:文本格式
– JDBC:关系型数据库
2.2 Schema设计规划
Schema设计影响查询性能。学习交流加群风哥QQ113257174
spark-sql –master yarn -e “DESCRIBE FORMATTED fgedudb.fgedu_user;”
# 查看分区信息
spark-sql –master yarn -e “SHOW PARTITIONS fgedudb.fgedu_user;”
# col_name data_type comment
user_id bigint
user_name string
age int
gender string
dt string
# Detailed Table Information
Database: fgedudb
Table: fgedu_user
Provider: parquet
# 分区信息
dt=20240117
dt=20240118
Time taken: 0.5 seconds
2.3 性能优化规划
性能优化需要从多个角度考虑。风哥提示:合理的分区和文件大小是性能优化的基础。
– 合理设置分区数
– 控制文件大小
– 使用列式存储
– 开启向量化执行
– 使用分区裁剪
Part03-生产环境项目实施方案
3.1 DataFrame基础操作
3.1.1 创建DataFrame
spark-shell –master yarn –deploy-mode client
// 从Seq创建DataFrame
import spark.implicits._
val df = Seq((1, “fgedu01”, 25), (2, “fgedu02”, 30)).toDF(“id”, “name”, “age”)
df.show()
// 从RDD创建DataFrame
val rdd = sc.parallelize(Seq((1, “fgedu01”, 25), (2, “fgedu02”, 30)))
val dfFromRdd = rdd.toDF(“id”, “name”, “age”)
dfFromRdd.show()
// 从文件创建DataFrame
val dfFromFile = spark.read.parquet(“/bigdata/warehouse/fgedu/user.parquet”)
dfFromFile.show(5)
// 从表创建DataFrame
val dfFromTable = spark.table(“fgedudb.fgedu_user”)
dfFromTable.show(5)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Spark context available as ‘sc’
Spark session available as ‘spark’.
// Seq创建DataFrame
df: org.apache.spark.sql.DataFrame = [id: int, name: string … 1 more field]
+—+——-+—+
| id| name|age|
+—+——-+—+
| 1|fgedu01| 25|
| 2|fgedu02| 30|
+—+——-+—+
// RDD创建DataFrame
dfFromRdd: org.apache.spark.sql.DataFrame = [id: int, name: string … 1 more field]
+—+——-+—+
| id| name|age|
+—+——-+—+
| 1|fgedu01| 25|
| 2|fgedu02| 30|
+—+——-+—+
// 文件创建DataFrame
dfFromFile: org.apache.spark.sql.DataFrame = [user_id: bigint, user_name: string … 3 more fields]
+——-+———+—+——+———-+
|user_id|user_name|age|gender|create_time|
+——-+———+—+——+———-+
| 1| fgedu01| 25| M|2024-01-18|
| 2| fgedu02| 30| F|2024-01-18|
+——-+———+—+——+———-+
// 表创建DataFrame
dfFromTable: org.apache.spark.sql.DataFrame = [user_id: bigint, user_name: string … 4 more fields]
+——-+———+—+——+—+
|user_id|user_name|age|gender| dt|
+——-+———+—+——+—+
| 1| fgedu01| 25| M|…|
| 2| fgedu02| 30| F|…|
+——-+———+—+——+—+
3.1.2 DataFrame查询操作
val selected = df.select(“id”, “name”)
selected.show()
// filter操作
val filtered = df.filter($”age” > 25)
filtered.show()
// groupBy操作
val grouped = df.groupBy(“age”).count()
grouped.show()
// orderBy操作
val ordered = df.orderBy($”age”.desc)
ordered.show()
// join操作
val df1 = Seq((1, “fgedu01”), (2, “fgedu02”)).toDF(“id”, “name”)
val df2 = Seq((1, 100), (2, 200)).toDF(“id”, “amount”)
val joined = df1.join(df2, “id”)
joined.show()
+—+——-+
| id| name|
+—+——-+
| 1|fgedu01|
| 2|fgedu02|
+—+——-+
// filter结果
+—+——-+—+
| id| name|age|
+—+——-+—+
| 2|fgedu02| 30|
+—+——-+—+
// groupBy结果
+—+—–+
|age|count|
+—+—–+
| 25| 1|
| 30| 1|
+—+—–+
// orderBy结果
+—+——-+—+
| id| name|age|
+—+——-+—+
| 2|fgedu02| 30|
| 1|fgedu01| 25|
+—+——-+—+
// join结果
+—+——-+——+
| id| name|amount|
+—+——-+——+
| 1|fgedu01| 100|
| 2|fgedu02| 200|
+—+——-+——+
3.2 SQL查询开发
3.2.1 创建临时视图
df.createOrReplaceTempView(“users”)
// 执行SQL查询
val result = spark.sql(“SELECT * FROM users WHERE age > 25”)
result.show()
// 创建全局临时视图
df.createGlobalTempView(“global_users”)
// 查询全局临时视图
val globalResult = spark.sql(“SELECT * FROM global_temp.global_users”)
globalResult.show()
// 创建成功
// SQL查询结果
+—+——-+—+
| id| name|age|
+—+——-+—+
| 2|fgedu02| 30|
+—+——-+—+
// 创建全局临时视图
// 创建成功
// 全局临时视图查询
+—+——-+—+
| id| name|age|
+—+——-+—+
| 1|fgedu01| 25|
| 2|fgedu02| 30|
+—+——-+—+
3.2.2 复杂SQL查询
spark.sql(“””
SELECT user_id, user_name, age,
ROW_NUMBER() OVER (PARTITION BY gender ORDER BY age DESC) AS rank
FROM fgedudb.fgedu_user
WHERE dt=’20240118′
“””).show(10)
// 子查询
spark.sql(“””
SELECT user_id, total_amount
FROM (
SELECT user_id, SUM(order_amount) AS total_amount
FROM fgedudb.fgedu_order
WHERE dt=’20240118′
GROUP BY user_id
) t
WHERE total_amount > 1000
“””).show(10)
// CTE查询
spark.sql(“””
WITH user_orders AS (
SELECT user_id, SUM(order_amount) AS total_amount
FROM fgedudb.fgedu_order
WHERE dt=’20240118′
GROUP BY user_id
)
SELECT u.user_id, u.user_name, o.total_amount
FROM fgedudb.fgedu_user u
JOIN user_orders o ON u.user_id = o.user_id
WHERE u.dt=’20240118′
“””).show(10)
+——-+———+—+—-+
|user_id|user_name|age|rank|
+——-+———+—+—-+
| 999999| fgedu99| 80| 1|
| 888888| fgedu88| 78| 2|
| 777777| fgedu77| 75| 3|
+——-+———+—+—-+
// 子查询结果
+——-+————-+
|user_id|total_amount|
+——-+————-+
| 100| 5000.00|
| 200| 3000.00|
| 300| 2000.00|
+——-+————-+
// CTE查询结果
+——-+———+————-+
|user_id|user_name|total_amount|
+——-+———+————-+
| 100| fgedu10| 5000.00|
| 200| fgedu20| 3000.00|
| 300| fgedu30| 2000.00|
+——-+———+————-+
3.3 数据源集成
3.3.1 Parquet数据源
val df = spark.read.parquet(“/bigdata/warehouse/fgedu/user.parquet”)
df.show(5)
// 写入Parquet文件
df.write.mode(“overwrite”).parquet(“/bigdata/output/user_parquet”)
// 分区写入
df.write.mode(“overwrite”).partitionBy(“dt”).parquet(“/bigdata/output/user_partitioned”)
// 验证写入
spark.read.parquet(“/bigdata/output/user_parquet”).count()
+——-+———+—+——+
|user_id|user_name|age|gender|
+——-+———+—+——+
| 1| fgedu01| 25| M|
| 2| fgedu02| 30| F|
+——-+———+—+——+
// 写入Parquet
# 写入成功
// 分区写入
# 写入成功
// 验证写入
res0: Long = 1000000
3.3.2 JDBC数据源
val jdbcDF = spark.read
.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/fgedudb”)
.option(“dbtable”, “fgedu_user”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.load()
jdbcDF.show(5)
// 写入MySQL数据
df.write
.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/fgedudb”)
.option(“dbtable”, “fgedu_user_backup”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.mode(“overwrite”)
.save()
// 验证写入
spark.read.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/fgedudb”)
.option(“dbtable”, “fgedu_user_backup”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.load().count()
+——-+———+—+——+
|user_id|user_name|age|gender|
+——-+———+—+——+
| 1| fgedu01| 25| M|
| 2| fgedu02| 30| F|
+——-+———+—+——+
// 写入MySQL
# 写入成功
// 验证写入
res1: Long = 1000000
3.4 UDF开发
3.4.1 注册UDF
import org.apache.spark.sql.functions._
val upperCase = udf((s: String) => s.toUpperCase)
spark.udf.register(“upper_case”, upperCase)
// 使用UDF
df.withColumn(“name_upper”, upperCase($”name”)).show()
// SQL中使用UDF
spark.sql(“SELECT id, upper_case(name) as name_upper FROM users”).show()
// 定义聚合UDF
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
spark.udf.register(“my_avg”, functions.udaf((x: Double) => x))
upperCase: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(…)
// 使用UDF
+—+——-+—+———-+
| id| name|age|name_upper|
+—+——-+—+———-+
| 1|fgedu01| 25| FGEDU01|
| 2|fgedu02| 30| FGEDU02|
+—+——-+—+———-+
// SQL中使用UDF
+—+———-+
| id|name_upper|
+—+———-+
| 1| FGEDU01|
| 2| FGEDU02|
+—+———-+
3.4.2 UDF性能优化
spark.conf.set(“spark.sql.optimizer.replaceExceptWithFilter”, true)
// 使用非确定性UDF
val randomStr = udf(() => scala.util.Random.nextString(10))
// 使用向量UDF
import org.apache.spark.sql.functions._
val concatStr = udf((s1: String, s2: String) => s1 + s2)
// 批量注册UDF
val udfs = Map(
“upper_case” -> upperCase,
“concat_str” -> concatStr
)
udfs.foreach { case (name, f) => spark.udf.register(name, f) }
# 配置成功
// 非确定性UDF
randomStr: org.apache.spark.sql.expressions.UserDefinedFunction = …
// 向量UDF
concatStr: org.apache.spark.sql.expressions.UserDefinedFunction = …
// 批量注册
# 注册成功
Part04-生产案例与实战讲解
4.1 数据清洗案例
数据清洗是ETL的核心环节。更多视频教程www.fgedu.net.cn
# spark_data_clean.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
DT=$(date -d “1 day ago” +%Y%m%d)
echo “=== Spark Data Clean for ${DT} ===”
spark-submit \
–master yarn \
–deploy-mode cluster \
–driver-memory 4g \
–executor-memory 8g \
–executor-cores 4 \
–num-executors 10 \
–class com.fgedu.spark.DataClean \
/bigdata/app/spark/jars/fgedu-spark.jar \
${DT}
echo “=== Data Clean Completed ===”
./spark_data_clean.sh
=== Spark Data Clean for 20240117 ===
24/01/18 21:00:00 INFO spark.SparkContext: Running Spark version 3.3.2
Data Clean Summary:
Input Records: 1000000
Cleaned Records: 950000
Invalid Records: 50000
Duplicate Records: 10000
Null Records: 40000
Processing Time: 15 minutes
=== Data Clean Completed ===
4.2 数据分析案例
数据分析是数据价值挖掘的关键。学习交流加群风哥微信: itpux-com
val userBehavior = spark.sql(“””
SELECT
user_id,
COUNT(*) AS action_count,
COUNT(DISTINCT date(action_time)) AS active_days,
MIN(action_time) AS first_action,
MAX(action_time) AS last_action
FROM fgedudb.fgedu_user_action
WHERE dt BETWEEN ‘20240101’ AND ‘20240118’
GROUP BY user_id
“””)
userBehavior.show(10)
// 用户留存分析
val retention = spark.sql(“””
WITH first_login AS (
SELECT user_id, MIN(date(action_time)) AS first_date
FROM fgedudb.fgedu_user_action
GROUP BY user_id
),
retention_data AS (
SELECT
f.first_date,
COUNT(DISTINCT CASE WHEN date(a.action_time) = f.first_date THEN a.user_id END) AS day0,
COUNT(DISTINCT CASE WHEN date(a.action_time) = date_add(f.first_date, 1) THEN a.user_id END) AS day1,
COUNT(DISTINCT CASE WHEN date(a.action_time) = date_add(f.first_date, 7) THEN a.user_id END) AS day7
FROM first_login f
LEFT JOIN fgedudb.fgedu_user_action a ON f.user_id = a.user_id
GROUP BY f.first_date
)
SELECT first_date, day0, day1, day7, round(day1/day0*100, 2) as retention_rate
FROM retention_data
ORDER BY first_date
“””)
retention.show(10)
+——-+————+———–+——————-+——————-+
|user_id|action_count|active_days| first_action| last_action|
+——-+————+———–+——————-+——————-+
| 1| 100| 18|2024-01-01 10:00:00|2024-01-18 23:59:59|
| 2| 50| 10|2024-01-01 11:00:00|2024-01-18 22:00:00|
| 3| 30| 5|2024-01-05 09:00:00|2024-01-18 21:00:00|
+——-+————+———–+——————-+——————-+
// 用户留存分析结果
+———-+—–+—–+—–+————–+
|first_date| day0| day1| day7|retention_rate|
+———-+—–+—–+—–+————–+
|2024-01-01|50000|25000|10000| 50.00|
|2024-01-02|52000|26000|10500| 50.00|
|2024-01-03|48000|24000| 9600| 50.00|
+———-+—–+—–+—–+————–+
4.3 数据导出案例
4.3.1 导出到MySQL
val result = spark.sql(“””
SELECT user_id, user_name, total_amount
FROM fgedudb.fgedu_user_summary
WHERE dt=’20240118′
“””)
result.write
.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/report_db”)
.option(“dbtable”, “user_summary_report”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.option(“batchsize”, “10000”)
.mode(“overwrite”)
.save()
// 验证导出
spark.read.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/report_db”)
.option(“dbtable”, “user_summary_report”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.load().count()
# 导出成功
// 验证导出
res2: Long = 500000
# 导出记录数: 500000
4.3.2 导出到CSV
val result = spark.sql(“””
SELECT user_id, user_name, total_amount
FROM fgedudb.fgedu_user_summary
WHERE dt=’20240118′
“””)
result.write
.option(“header”, “true”)
.option(“delimiter”, “,”)
.mode(“overwrite”)
.csv(“/bigdata/export/user_summary_csv”)
// 验证导出
spark.read
.option(“header”, “true”)
.csv(“/bigdata/export/user_summary_csv”)
.count()
# 导出成功
// 验证导出
res3: Long = 500000
# 导出记录数: 500000
Part05-风哥经验总结与分享
5.1 Spark SQL最佳实践
在实际生产环境中,Spark SQL使用需要注意以下几点:from bigdata视频:www.itpux.com
1. 使用DataFrame代替RDD
2. 合理设置分区数
3. 使用分区裁剪
4. 避免数据倾斜
5. 使用缓存优化重复查询
5.2 性能优化经验总结
5.2.1 优化建议
– 使用列式存储格式
– 开启向量化执行
– 合理设置并行度
– 使用广播JOIN
– 避免使用UDF
5.2.2 Spark SQL运维脚本
# spark_sql_maintenance.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
echo “=== Spark SQL Maintenance ===”
echo “Date: $(date)”
# 1. 清理缓存
echo “=== Clear Cache ===”
spark-sql -e “CLEAR CACHE;”
# 2. 分析表统计信息
echo “=== Analyze Tables ===”
spark-sql -e “ANALYZE TABLE fgedudb.fgedu_user COMPUTE STATISTICS FOR COLUMNS;”
# 3. 清理临时文件
echo “=== Clean Temp Files ===”
hdfs dfs -rm -r -skipTrash /tmp/spark-* 2>/dev/null
# 4. 检查表健康
echo “=== Check Table Health ===”
spark-sql -e “SHOW TABLES IN fgedudb;” | while read table; do
spark-sql -e “DESCRIBE FORMATTED ${table};”
done
echo “=== Maintenance Completed ===”
./spark_sql_maintenance.sh
=== Spark SQL Maintenance ===
Date: Thu Jan 18 22:00:00 CST 2024
=== Clear Cache ===
# 清理完成
=== Analyze Tables ===
OK
=== Clean Temp Files ===
# 清理完成
=== Check Table Health ===
fgedu_order OK
fgedu_user OK
=== Maintenance Completed ===
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
