1. 首页 > Hadoop教程 > 正文

大数据教程FG029-Spark SQL与DataFrame开发实战

内容简介:本文详细介绍Spark SQL与DataFrame开发实战,包括DataFrame操作、SQL查询、数据源集成、性能优化等核心内容。风哥教程参考Spark官方文档SQL Programming Guide、Performance Tuning等内容。

目录大纲

Part01-基础概念与理论知识
  1.1 Spark SQL概述
  1.2 DataFrame与Dataset
  1.3 Catalyst优化器
Part02-生产环境规划与建议
  2.1 数据源规划
  2.2 Schema设计规划
  2.3 性能优化规划
Part03-生产环境项目实施方案
  3.1 DataFrame基础操作
  3.2 SQL查询开发
  3.3 数据源集成
  3.4 UDF开发
Part04-生产案例与实战讲解
  4.1 数据清洗案例
  4.2 数据分析案例
  4.3 数据导出案例
Part05-风哥经验总结与分享
  5.1 Spark SQL最佳实践
  5.2 性能优化经验总结

Part01-基础概念与理论知识

1.1 Spark SQL概述

Spark SQL是Spark的结构化数据处理模块。更多视频教程www.fgedu.net.cn 提供SQL查询接口和DataFrame API。

风哥提示:Spark SQL比RDD更高效,推荐使用DataFrame和Dataset进行数据处理。

1.2 DataFrame与Dataset

DataFrame是带有Schema的分布式数据集合。学习交流加群风哥微信: itpux-com

DataFrame特点:
– 带有Schema的分布式数据集合
– 支持SQL查询
– 自动优化执行计划
– 支持多种数据源

1.3 Catalyst优化器

Catalyst是Spark SQL的查询优化器。from bigdata视频:www.itpux.com

# 查看执行计划
spark-sql –master yarn -e “EXPLAIN EXTENDED SELECT * FROM fgedudb.fgedu_user WHERE dt=’20240118′;”

# 执行计划
== Parsed Logical Plan ==
‘Project [*]
+- ‘Filter (‘dt = 20240118)
+- ‘UnresolvedRelation [fgedudb, fgedu_user], [], false

== Analyzed Logical Plan ==
user_id: bigint, user_name: string, age: int, gender: string, dt: string
Project [user_id#0L, user_name#1, age#2, gender#3, dt#4]
+- Filter (dt#4 = 20240118)
+- Relation fgedudb.fgedu_user[user_id#0L,user_name#1,age#2,gender#3,dt#4] parquet

== Optimized Logical Plan ==
Project [user_id#0L, user_name#1, age#2, gender#3, dt#4]
+- Filter (isnotnull(dt#4) AND (dt#4 = 20240118))
+- Relation fgedudb.fgedu_user[user_id#0L,user_name#1,age#2,gender#3,dt#4] parquet

Part02-生产环境规划与建议

2.1 数据源规划

Spark SQL支持多种数据源。更多学习教程公众号风哥教程itpux_com

数据源支持:
– Parquet:列式存储,适合分析
– ORC:列式存储,压缩比高
– JSON:半结构化数据
– CSV:文本格式
– JDBC:关系型数据库

2.2 Schema设计规划

Schema设计影响查询性能。学习交流加群风哥QQ113257174

# 查看表Schema
spark-sql –master yarn -e “DESCRIBE FORMATTED fgedudb.fgedu_user;”

# 查看分区信息
spark-sql –master yarn -e “SHOW PARTITIONS fgedudb.fgedu_user;”

# 表Schema
# col_name data_type comment
user_id bigint
user_name string
age int
gender string
dt string

# Detailed Table Information
Database: fgedudb
Table: fgedu_user
Provider: parquet

# 分区信息
dt=20240117
dt=20240118
Time taken: 0.5 seconds

2.3 性能优化规划

性能优化需要从多个角度考虑。风哥提示:合理的分区和文件大小是性能优化的基础。

性能优化要点:
– 合理设置分区数
– 控制文件大小
– 使用列式存储
– 开启向量化执行
– 使用分区裁剪

Part03-生产环境项目实施方案

3.1 DataFrame基础操作

3.1.1 创建DataFrame

# 启动spark-shell
spark-shell –master yarn –deploy-mode client

// 从Seq创建DataFrame
import spark.implicits._
val df = Seq((1, “fgedu01”, 25), (2, “fgedu02”, 30)).toDF(“id”, “name”, “age”)
df.show()

// 从RDD创建DataFrame
val rdd = sc.parallelize(Seq((1, “fgedu01”, 25), (2, “fgedu02”, 30)))
val dfFromRdd = rdd.toDF(“id”, “name”, “age”)
dfFromRdd.show()

// 从文件创建DataFrame
val dfFromFile = spark.read.parquet(“/bigdata/warehouse/fgedu/user.parquet”)
dfFromFile.show(5)

// 从表创建DataFrame
val dfFromTable = spark.table(“fgedudb.fgedu_user”)
dfFromTable.show(5)

# spark-shell启动
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ ‘_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.2
/_/
Spark context available as ‘sc’
Spark session available as ‘spark’.

// Seq创建DataFrame
df: org.apache.spark.sql.DataFrame = [id: int, name: string … 1 more field]
+—+——-+—+
| id| name|age|
+—+——-+—+
| 1|fgedu01| 25|
| 2|fgedu02| 30|
+—+——-+—+

// RDD创建DataFrame
dfFromRdd: org.apache.spark.sql.DataFrame = [id: int, name: string … 1 more field]
+—+——-+—+
| id| name|age|
+—+——-+—+
| 1|fgedu01| 25|
| 2|fgedu02| 30|
+—+——-+—+

// 文件创建DataFrame
dfFromFile: org.apache.spark.sql.DataFrame = [user_id: bigint, user_name: string … 3 more fields]
+——-+———+—+——+———-+
|user_id|user_name|age|gender|create_time|
+——-+———+—+——+———-+
| 1| fgedu01| 25| M|2024-01-18|
| 2| fgedu02| 30| F|2024-01-18|
+——-+———+—+——+———-+

// 表创建DataFrame
dfFromTable: org.apache.spark.sql.DataFrame = [user_id: bigint, user_name: string … 4 more fields]
+——-+———+—+——+—+
|user_id|user_name|age|gender| dt|
+——-+———+—+——+—+
| 1| fgedu01| 25| M|…|
| 2| fgedu02| 30| F|…|
+——-+———+—+——+—+

3.1.2 DataFrame查询操作

// select操作
val selected = df.select(“id”, “name”)
selected.show()

// filter操作
val filtered = df.filter($”age” > 25)
filtered.show()

// groupBy操作
val grouped = df.groupBy(“age”).count()
grouped.show()

// orderBy操作
val ordered = df.orderBy($”age”.desc)
ordered.show()

// join操作
val df1 = Seq((1, “fgedu01”), (2, “fgedu02”)).toDF(“id”, “name”)
val df2 = Seq((1, 100), (2, 200)).toDF(“id”, “amount”)
val joined = df1.join(df2, “id”)
joined.show()

// select结果
+—+——-+
| id| name|
+—+——-+
| 1|fgedu01|
| 2|fgedu02|
+—+——-+

// filter结果
+—+——-+—+
| id| name|age|
+—+——-+—+
| 2|fgedu02| 30|
+—+——-+—+

// groupBy结果
+—+—–+
|age|count|
+—+—–+
| 25| 1|
| 30| 1|
+—+—–+

// orderBy结果
+—+——-+—+
| id| name|age|
+—+——-+—+
| 2|fgedu02| 30|
| 1|fgedu01| 25|
+—+——-+—+

// join结果
+—+——-+——+
| id| name|amount|
+—+——-+——+
| 1|fgedu01| 100|
| 2|fgedu02| 200|
+—+——-+——+

3.2 SQL查询开发

3.2.1 创建临时视图

// 创建临时视图
df.createOrReplaceTempView(“users”)

// 执行SQL查询
val result = spark.sql(“SELECT * FROM users WHERE age > 25”)
result.show()

// 创建全局临时视图
df.createGlobalTempView(“global_users”)

// 查询全局临时视图
val globalResult = spark.sql(“SELECT * FROM global_temp.global_users”)
globalResult.show()

// 创建临时视图
// 创建成功

// SQL查询结果
+—+——-+—+
| id| name|age|
+—+——-+—+
| 2|fgedu02| 30|
+—+——-+—+

// 创建全局临时视图
// 创建成功

// 全局临时视图查询
+—+——-+—+
| id| name|age|
+—+——-+—+
| 1|fgedu01| 25|
| 2|fgedu02| 30|
+—+——-+—+

3.2.2 复杂SQL查询

// 窗口函数
spark.sql(“””
SELECT user_id, user_name, age,
ROW_NUMBER() OVER (PARTITION BY gender ORDER BY age DESC) AS rank
FROM fgedudb.fgedu_user
WHERE dt=’20240118′
“””).show(10)

// 子查询
spark.sql(“””
SELECT user_id, total_amount
FROM (
SELECT user_id, SUM(order_amount) AS total_amount
FROM fgedudb.fgedu_order
WHERE dt=’20240118′
GROUP BY user_id
) t
WHERE total_amount > 1000
“””).show(10)

// CTE查询
spark.sql(“””
WITH user_orders AS (
SELECT user_id, SUM(order_amount) AS total_amount
FROM fgedudb.fgedu_order
WHERE dt=’20240118′
GROUP BY user_id
)
SELECT u.user_id, u.user_name, o.total_amount
FROM fgedudb.fgedu_user u
JOIN user_orders o ON u.user_id = o.user_id
WHERE u.dt=’20240118′
“””).show(10)

// 窗口函数结果
+——-+———+—+—-+
|user_id|user_name|age|rank|
+——-+———+—+—-+
| 999999| fgedu99| 80| 1|
| 888888| fgedu88| 78| 2|
| 777777| fgedu77| 75| 3|
+——-+———+—+—-+

// 子查询结果
+——-+————-+
|user_id|total_amount|
+——-+————-+
| 100| 5000.00|
| 200| 3000.00|
| 300| 2000.00|
+——-+————-+

// CTE查询结果
+——-+———+————-+
|user_id|user_name|total_amount|
+——-+———+————-+
| 100| fgedu10| 5000.00|
| 200| fgedu20| 3000.00|
| 300| fgedu30| 2000.00|
+——-+———+————-+

3.3 数据源集成

3.3.1 Parquet数据源

// 读取Parquet文件
val df = spark.read.parquet(“/bigdata/warehouse/fgedu/user.parquet”)
df.show(5)

// 写入Parquet文件
df.write.mode(“overwrite”).parquet(“/bigdata/output/user_parquet”)

// 分区写入
df.write.mode(“overwrite”).partitionBy(“dt”).parquet(“/bigdata/output/user_partitioned”)

// 验证写入
spark.read.parquet(“/bigdata/output/user_parquet”).count()

// 读取Parquet
+——-+———+—+——+
|user_id|user_name|age|gender|
+——-+———+—+——+
| 1| fgedu01| 25| M|
| 2| fgedu02| 30| F|
+——-+———+—+——+

// 写入Parquet
# 写入成功

// 分区写入
# 写入成功

// 验证写入
res0: Long = 1000000

3.3.2 JDBC数据源

// 读取MySQL数据
val jdbcDF = spark.read
.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/fgedudb”)
.option(“dbtable”, “fgedu_user”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.load()
jdbcDF.show(5)

// 写入MySQL数据
df.write
.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/fgedudb”)
.option(“dbtable”, “fgedu_user_backup”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.mode(“overwrite”)
.save()

// 验证写入
spark.read.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/fgedudb”)
.option(“dbtable”, “fgedu_user_backup”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.load().count()

// 读取MySQL
+——-+———+—+——+
|user_id|user_name|age|gender|
+——-+———+—+——+
| 1| fgedu01| 25| M|
| 2| fgedu02| 30| F|
+——-+———+—+——+

// 写入MySQL
# 写入成功

// 验证写入
res1: Long = 1000000

3.4 UDF开发

3.4.1 注册UDF

// 定义Scala UDF
import org.apache.spark.sql.functions._

val upperCase = udf((s: String) => s.toUpperCase)
spark.udf.register(“upper_case”, upperCase)

// 使用UDF
df.withColumn(“name_upper”, upperCase($”name”)).show()

// SQL中使用UDF
spark.sql(“SELECT id, upper_case(name) as name_upper FROM users”).show()

// 定义聚合UDF
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

spark.udf.register(“my_avg”, functions.udaf((x: Double) => x))

// 定义UDF
upperCase: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(…)

// 使用UDF
+—+——-+—+———-+
| id| name|age|name_upper|
+—+——-+—+———-+
| 1|fgedu01| 25| FGEDU01|
| 2|fgedu02| 30| FGEDU02|
+—+——-+—+———-+

// SQL中使用UDF
+—+———-+
| id|name_upper|
+—+———-+
| 1| FGEDU01|
| 2| FGEDU02|
+—+———-+

3.4.2 UDF性能优化

// 开启UDF优化
spark.conf.set(“spark.sql.optimizer.replaceExceptWithFilter”, true)

// 使用非确定性UDF
val randomStr = udf(() => scala.util.Random.nextString(10))

// 使用向量UDF
import org.apache.spark.sql.functions._
val concatStr = udf((s1: String, s2: String) => s1 + s2)

// 批量注册UDF
val udfs = Map(
“upper_case” -> upperCase,
“concat_str” -> concatStr
)
udfs.foreach { case (name, f) => spark.udf.register(name, f) }

// UDF优化配置
# 配置成功

// 非确定性UDF
randomStr: org.apache.spark.sql.expressions.UserDefinedFunction = …

// 向量UDF
concatStr: org.apache.spark.sql.expressions.UserDefinedFunction = …

// 批量注册
# 注册成功

Part04-生产案例与实战讲解

4.1 数据清洗案例

数据清洗是ETL的核心环节。更多视频教程www.fgedu.net.cn

#!/bin/bash
# spark_data_clean.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

DT=$(date -d “1 day ago” +%Y%m%d)
echo “=== Spark Data Clean for ${DT} ===”

spark-submit \
–master yarn \
–deploy-mode cluster \
–driver-memory 4g \
–executor-memory 8g \
–executor-cores 4 \
–num-executors 10 \
–class com.fgedu.spark.DataClean \
/bigdata/app/spark/jars/fgedu-spark.jar \
${DT}

echo “=== Data Clean Completed ===”

# 数据清洗执行
./spark_data_clean.sh
=== Spark Data Clean for 20240117 ===

24/01/18 21:00:00 INFO spark.SparkContext: Running Spark version 3.3.2

Data Clean Summary:
Input Records: 1000000
Cleaned Records: 950000
Invalid Records: 50000
Duplicate Records: 10000
Null Records: 40000

Processing Time: 15 minutes

=== Data Clean Completed ===

4.2 数据分析案例

数据分析是数据价值挖掘的关键。学习交流加群风哥微信: itpux-com

// 用户行为分析
val userBehavior = spark.sql(“””
SELECT
user_id,
COUNT(*) AS action_count,
COUNT(DISTINCT date(action_time)) AS active_days,
MIN(action_time) AS first_action,
MAX(action_time) AS last_action
FROM fgedudb.fgedu_user_action
WHERE dt BETWEEN ‘20240101’ AND ‘20240118’
GROUP BY user_id
“””)
userBehavior.show(10)

// 用户留存分析
val retention = spark.sql(“””
WITH first_login AS (
SELECT user_id, MIN(date(action_time)) AS first_date
FROM fgedudb.fgedu_user_action
GROUP BY user_id
),
retention_data AS (
SELECT
f.first_date,
COUNT(DISTINCT CASE WHEN date(a.action_time) = f.first_date THEN a.user_id END) AS day0,
COUNT(DISTINCT CASE WHEN date(a.action_time) = date_add(f.first_date, 1) THEN a.user_id END) AS day1,
COUNT(DISTINCT CASE WHEN date(a.action_time) = date_add(f.first_date, 7) THEN a.user_id END) AS day7
FROM first_login f
LEFT JOIN fgedudb.fgedu_user_action a ON f.user_id = a.user_id
GROUP BY f.first_date
)
SELECT first_date, day0, day1, day7, round(day1/day0*100, 2) as retention_rate
FROM retention_data
ORDER BY first_date
“””)
retention.show(10)

// 用户行为分析结果
+——-+————+———–+——————-+——————-+
|user_id|action_count|active_days| first_action| last_action|
+——-+————+———–+——————-+——————-+
| 1| 100| 18|2024-01-01 10:00:00|2024-01-18 23:59:59|
| 2| 50| 10|2024-01-01 11:00:00|2024-01-18 22:00:00|
| 3| 30| 5|2024-01-05 09:00:00|2024-01-18 21:00:00|
+——-+————+———–+——————-+——————-+

// 用户留存分析结果
+———-+—–+—–+—–+————–+
|first_date| day0| day1| day7|retention_rate|
+———-+—–+—–+—–+————–+
|2024-01-01|50000|25000|10000| 50.00|
|2024-01-02|52000|26000|10500| 50.00|
|2024-01-03|48000|24000| 9600| 50.00|
+———-+—–+—–+—–+————–+

4.3 数据导出案例

4.3.1 导出到MySQL

// 导出到MySQL
val result = spark.sql(“””
SELECT user_id, user_name, total_amount
FROM fgedudb.fgedu_user_summary
WHERE dt=’20240118′
“””)

result.write
.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/report_db”)
.option(“dbtable”, “user_summary_report”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.option(“batchsize”, “10000”)
.mode(“overwrite”)
.save()

// 验证导出
spark.read.format(“jdbc”)
.option(“url”, “jdbc:mysql://fgedu01:3306/report_db”)
.option(“dbtable”, “user_summary_report”)
.option(“user”, “fgedu”)
.option(“password”, “fgedu123”)
.load().count()

// 导出执行
# 导出成功

// 验证导出
res2: Long = 500000
# 导出记录数: 500000

4.3.2 导出到CSV

// 导出到CSV
val result = spark.sql(“””
SELECT user_id, user_name, total_amount
FROM fgedudb.fgedu_user_summary
WHERE dt=’20240118′
“””)

result.write
.option(“header”, “true”)
.option(“delimiter”, “,”)
.mode(“overwrite”)
.csv(“/bigdata/export/user_summary_csv”)

// 验证导出
spark.read
.option(“header”, “true”)
.csv(“/bigdata/export/user_summary_csv”)
.count()

// 导出执行
# 导出成功

// 验证导出
res3: Long = 500000
# 导出记录数: 500000

Part05-风哥经验总结与分享

5.1 Spark SQL最佳实践

在实际生产环境中,Spark SQL使用需要注意以下几点:from bigdata视频:www.itpux.com

风哥经验总结:
1. 使用DataFrame代替RDD
2. 合理设置分区数
3. 使用分区裁剪
4. 避免数据倾斜
5. 使用缓存优化重复查询

5.2 性能优化经验总结

5.2.1 优化建议

风哥提示:Spark SQL性能优化需要从数据源、查询、执行三个层面考虑。

性能优化要点:
– 使用列式存储格式
– 开启向量化执行
– 合理设置并行度
– 使用广播JOIN
– 避免使用UDF

5.2.2 Spark SQL运维脚本

#!/bin/bash
# spark_sql_maintenance.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

echo “=== Spark SQL Maintenance ===”
echo “Date: $(date)”

# 1. 清理缓存
echo “=== Clear Cache ===”
spark-sql -e “CLEAR CACHE;”

# 2. 分析表统计信息
echo “=== Analyze Tables ===”
spark-sql -e “ANALYZE TABLE fgedudb.fgedu_user COMPUTE STATISTICS FOR COLUMNS;”

# 3. 清理临时文件
echo “=== Clean Temp Files ===”
hdfs dfs -rm -r -skipTrash /tmp/spark-* 2>/dev/null

# 4. 检查表健康
echo “=== Check Table Health ===”
spark-sql -e “SHOW TABLES IN fgedudb;” | while read table; do
spark-sql -e “DESCRIBE FORMATTED ${table};”
done

echo “=== Maintenance Completed ===”

# 维护执行
./spark_sql_maintenance.sh
=== Spark SQL Maintenance ===
Date: Thu Jan 18 22:00:00 CST 2024

=== Clear Cache ===
# 清理完成

=== Analyze Tables ===
OK

=== Clean Temp Files ===
# 清理完成

=== Check Table Health ===
fgedu_order OK
fgedu_user OK

=== Maintenance Completed ===

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息