本文档风哥主要介绍Spark SQL与DataFrame实战,包括Spark SQL核心概念、DataFrame创建与操作、SQL查询、数据源连接等内容,风哥教程参考Spark官方文档Spark SQL Guide、DataFrame API等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 Spark SQL核心概念
Spark SQL是Spark用于处理结构化数据的模块,提供了DataFrame和Dataset API,以及SQL查询能力。学习交流加群风哥微信: itpux-com
- SparkSession:Spark SQL的入口点,统一了SQLContext和HiveContext
- DataFrame:分布式Row的集合,带有Schema信息
- Dataset:类型安全的分布式对象集合
- Catalyst优化器:查询优化引擎,自动优化SQL执行计划
- Tungsten执行引擎:内存管理和代码生成优化
1.2 DataFrame与Dataset概念
DataFrame与Dataset详解:
1. 分布式数据集合
2. 带有Schema信息(列名、类型)
3. 类似于数据库表或R/Python DataFrame
4. 惰性执行
5. 支持SQL查询
# Dataset特点
1. 类型安全的分布式对象集合
2. Scala/Java强类型API
3. DataFrame = Dataset[Row]
4. 编译时类型检查
5. 性能优化
# DataFrame vs RDD
| 特性 | DataFrame | RDD |
|————|———–|————|
| Schema | 有 | 无 |
| 优化 | Catalyst | 手动 |
| 类型安全 | 弱 | 强 |
| 性能 | 高 | 中 |
| API | SQL/DSL | 函数式 |
# SparkSession创建
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName(“fgedu-spark-sql”)
.master(“spark://192.168.1.60:7077”)
.config(“spark.sql.warehouse.dir”, “hdfs://192.168.1.60:9000/spark-warehouse”)
.enableHiveSupport()
.getOrCreate()
1.3 Spark SQL核心特性
Spark SQL核心特性:
- 统一数据访问:支持多种数据源(JSON、Parquet、ORC、JDBC等)
- 兼容Hive:支持HiveQL查询和Hive UDF
- 标准连接:支持JDBC/ODBC连接
- 自动优化:Catalyst优化器自动优化查询
- 代码生成:Tungsten引擎生成优化的字节码
Part02-生产环境规划与建议
2.1 Schema设计原则
Schema设计原则建议:
1. 选择合适的数据类型
– 整数:IntegerType, LongType
– 浮点:FloatType, DoubleType
– 字符串:StringType
– 日期:DateType, TimestampType
– 布尔:BooleanType
2. 合理命名
– 使用有意义的列名
– 遵循命名规范
– 避免使用保留字
3. 处理空值
– 明确是否允许空值
– 使用Option类型
– 设置默认值
4. 分区设计
– 按日期分区
– 按业务字段分区
– 避免分区过多
# Schema定义示例
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField(“id”, IntegerType, nullable = false),
StructField(“name”, StringType, nullable = true),
StructField(“age”, IntegerType, nullable = true),
StructField(“salary”, DoubleType, nullable = true),
StructField(“hire_date”, DateType, nullable = true)
))
2.2 数据源规划
数据源规划建议:
1. 文件格式
– Parquet: 列式存储,推荐
– ORC: 列式存储,Hive优化
– JSON: 半结构化数据
– CSV: 文本格式
– Avro: 序列化格式
2. 数据库
– MySQL: JDBC连接
– PostgreSQL: JDBC连接
– Oracle: JDBC连接
3. 大数据存储
– HDFS: 分布式文件系统
– HBase: NoSQL数据库
– Hive: 数据仓库
4. 消息队列
– Kafka: 流数据
# 数据源选择建议
– 数据仓库:Parquet + Hive
– 实时处理:Kafka
– 关系数据:JDBC
– 日志分析:JSON
2.3 查询优化规划
查询优化规划建议:
1. 谓词下推
– 过滤条件尽早执行
– 减少数据传输量
2. 列裁剪
– 只读取需要的列
– 减少IO开销
3. 分区裁剪
– 只扫描需要的分区
– 减少数据扫描量
4. Join优化
– 广播小表
– 使用合适的Join类型
– 避免笛卡尔积
5. 聚合优化
– 使用reduceByKey
– 本地聚合减少Shuffle
# 配置优化
spark.sql.autoBroadcastJoinThreshold=10485760 # 广播阈值
spark.sql.shuffle.partitions=200 # Shuffle分区数
spark.sql.adaptive.enabled=true # 自适应查询
Part03-生产环境项目实施方案
3.1 DataFrame创建实战
3.1.1 从JSON文件创建DataFrame
$ /bigdata/app/spark/bin/spark-shell \
–master spark://192.168.1.60:7077 \
–executor-memory 4g
# 创建测试JSON数据
$ cat > /bigdata/spark-data/fgedu-employees.json << 'EOF'
{"id": 1, "name": "fgedu01", "age": 25, "department": "IT", "salary": 10000}
{"id": 2, "name": "fgedu02", "age": 30, "department": "HR", "salary": 8000}
{"id": 3, "name": "fgedu03", "age": 28, "department": "IT", "salary": 12000}
{"id": 4, "name": "fgedu04", "age": 35, "department": "Finance", "salary": 15000}
{"id": 5, "name": "fgedu05", "age": 27, "department": "IT", "salary": 11000}
EOF
$ hdfs dfs -put /bigdata/spark-data/fgedu-employees.json /spark-data/
# 读取JSON文件
scala> val df = spark.read.json(“hdfs://192.168.1.60:9000/spark-data/fgedu-employees.json”)
df: org.apache.spark.sql.DataFrame = [age: bigint, department: string … 3 more fields]
# 查看Schema
scala> df.printSchema()
root
|– age: long (nullable = true)
|– department: string (nullable = true)
|– id: long (nullable = true)
|– name: string (nullable = true)
|– salary: long (nullable = true)
# 查看数据
scala> df.show()
+—+———-+—+——-+——+
|age|department| id| name|salary|
+—+———-+—+——-+——+
| 25| IT| 1|fgedu01| 10000|
| 30| HR| 2|fgedu02| 8000|
| 28| IT| 3|fgedu03| 12000|
| 35| Finance| 4|fgedu04| 15000|
| 27| IT| 5|fgedu05| 11000|
+—+———-+—+——-+——+
3.1.2 从CSV文件创建DataFrame
$ cat > /bigdata/spark-data/fgedu-orders.csv << 'EOF' order_id,customer_id,product_id,quantity,amount,order_date 1001,C001,P001,2,200.00,2026-01-01 1002,C002,P002,1,150.00,2026-01-02 1003,C001,P003,3,450.00,2026-01-03 1004,C003,P001,1,100.00,2026-01-04 1005,C002,P002,2,300.00,2026-01-05 EOF $ hdfs dfs -put /bigdata/spark-data/fgedu-orders.csv /spark-data/ # 读取CSV文件 scala> val ordersDF = spark.read
| .option(“header”, “true”)
| .option(“inferSchema”, “true”)
| .csv(“hdfs://192.168.1.60:9000/spark-data/fgedu-orders.csv”)
ordersDF: org.apache.spark.sql.DataFrame = [order_id: int, customer_id: string … 4 more fields]
scala> ordersDF.printSchema()
root
|– order_id: integer (nullable = true)
|– customer_id: string (nullable = true)
|– product_id: string (nullable = true)
|– quantity: integer (nullable = true)
|– amount: double (nullable = true)
|– order_date: string (nullable = true)
scala> ordersDF.show()
+——–+———–+———-+——–+——+———-+
|order_id|customer_id|product_id|quantity|amount|order_date|
+——–+———–+———-+——–+——+———-+
| 1001| C001| P001| 2| 200.0| 2026-01-01|
| 1002| C002| P002| 1| 150.0| 2026-01-02|
| 1003| C001| P003| 3| 450.0| 2026-01-03|
| 1004| C003| P001| 1| 100.0| 2026-01-04|
| 1005| C002| P002| 2| 300.0| 2026-01-05|
+——–+———–+———-+——–+——+———-+
3.1.3 从RDD创建DataFrame
scala> import spark.implicits._
import spark.implicits._
scala> val rdd = sc.parallelize(Seq(
| (1, “fgedu01”, 25, 10000.0),
| (2, “fgedu02”, 30, 12000.0),
| (3, “fgedu03”, 28, 11000.0)
| ))
rdd: org.apache.spark.rdd.RDD[(Int, String, Int, Double)] = ParallelCollectionRDD[0] at parallelize at
scala> val dfFromRDD = rdd.toDF(“id”, “name”, “age”, “salary”)
dfFromRDD: org.apache.spark.sql.DataFrame = [id: int, name: string … 2 more fields]
scala> dfFromRDD.show()
+—+——-+—+——-+
| id| name|age| salary|
+—+——-+—+——-+
| 1|fgedu01| 25|10000.0|
| 2|fgedu02| 30|12000.0|
| 3|fgedu03| 28|11000.0|
+—+——-+—+——-+
# 使用createDataFrame
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = StructType(Array(
| StructField(“id”, IntegerType, nullable = false),
| StructField(“name”, StringType, nullable = true),
| StructField(“age”, IntegerType, nullable = true),
| StructField(“salary”, DoubleType, nullable = true)
| ))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(salary,DoubleType,true))
scala> val rowRDD = rdd.map(t => Row(t._1, t._2, t._3, t._4))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[2] at map at
scala> val dfWithSchema = spark.createDataFrame(rowRDD, schema)
dfWithSchema: org.apache.spark.sql.DataFrame = [id: int, name: string … 2 more fields]
scala> dfWithSchema.show()
+—+——-+—+——-+
| id| name|age| salary|
+—+——-+—+——-+
| 1|fgedu01| 25|10000.0|
| 2|fgedu02| 30|12000.0|
| 3|fgedu03| 28|11000.0|
+—+——-+—+——-+
3.2 DataFrame操作实战
3.2.1 基本操作
scala> df.select(“name”, “age”, “salary”).show()
+——-+—+——+
| name|age|salary|
+——-+—+——+
|fgedu01| 25| 10000|
|fgedu02| 30| 8000|
|fgedu03| 28| 12000|
|fgedu04| 35| 15000|
|fgedu05| 27| 11000|
+——-+—+——+
# 过滤数据
scala> df.filter($”age” > 28).show()
+—+———-+—+——-+——+
|age|department| id| name|salary|
+—+———-+—+——-+——+
| 30| HR| 2|fgedu02| 8000|
| 35| Finance| 4|fgedu04| 15000|
+—+———-+—+——-+——+
# 分组聚合
scala> df.groupBy(“department”)
| .agg(
| count(“id”).alias(“employee_count”),
| avg(“salary”).alias(“avg_salary”),
| max(“salary”).alias(“max_salary”)
| )
| .show()
+———-+————–+———-+———-+
|department|employee_count|avg_salary|max_salary|
+———-+————–+———-+———-+
| HR| 1| 8000.0| 8000|
| Finance| 1| 15000.0| 15000|
| IT| 3| 11000.0| 12000|
+———-+————–+———-+———-+
# 排序
scala> df.orderBy($”salary”.desc).show()
+—+———-+—+——-+——+
|age|department| id| name|salary|
+—+———-+—+——-+——+
| 35| Finance| 4|fgedu04| 15000|
| 28| IT| 3|fgedu03| 12000|
| 27| IT| 5|fgedu05| 11000|
| 25| IT| 1|fgedu01| 10000|
| 30| HR| 2|fgedu02| 8000|
+—+———-+—+——-+——+
# 添加列
scala> df.withColumn(“bonus”, $”salary” * 0.1).show()
+—+———-+—+——-+——+——+
|age|department| id| name|salary| bonus|
+—+———-+—+——-+——+——+
| 25| IT| 1|fgedu01| 10000|1000.0|
| 30| HR| 2|fgedu02| 8000| 800.0|
| 28| IT| 3|fgedu03| 12000|1200.0|
| 35| Finance| 4|fgedu04| 15000|1500.0|
| 27| IT| 5|fgedu05| 11000|1100.0|
+—+———-+—+——-+——+——+
# 重命名列
scala> df.withColumnRenamed(“department”, “dept”).show()
+—+——-+—+——-+——+
|age| dept| id| name|salary|
+—+——-+—+——-+——+
| 25| IT| 1|fgedu01| 10000|
| 30| HR| 2|fgedu02| 8000|
| 28| IT| 3|fgedu03| 12000|
| 35|Finance| 4|fgedu04| 15000|
| 27| IT| 5|fgedu05| 11000|
+—+——-+—+——-+——+
3.2.2 高级操作
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
# 条件表达式
scala> df.withColumn(“level”,
| when($”salary” >= 12000, “Senior”)
| .when($”salary” >= 10000, “Middle”)
| .otherwise(“Junior”)
| ).show()
+—+———-+—+——-+——+——+
|age|department| id| name|salary| level|
+—+———-+—+——-+——+——+
| 25| IT| 1|fgedu01| 10000|Middle|
| 30| HR| 2|fgedu02| 8000|Junior|
| 28| IT| 3|fgedu03| 12000|Senior|
| 35| Finance| 4|fgedu04| 15000|Senior|
| 27| IT| 5|fgedu05| 11000|Middle|
+—+———-+—+——-+——+——+
# 窗口函数
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> val windowSpec = Window.partitionBy(“department”).orderBy($”salary”.desc)
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7b5a12ae
scala> df.withColumn(“rank”, rank().over(windowSpec)).show()
+—+———-+—+——-+——+—-+
|age|department| id| name|salary|rank|
+—+———-+—+——-+——+—-+
| 35| Finance| 4|fgedu04| 15000| 1|
| 30| HR| 2|fgedu02| 8000| 1|
| 28| IT| 3|fgedu03| 12000| 1|
| 27| IT| 5|fgedu05| 11000| 2|
| 25| IT| 1|fgedu01| 10000| 3|
+—+———-+—+——-+——+—-+
# 去重
scala> df.dropDuplicates(“department”).show()
+—+———-+—+——-+——+
|age|department| id| name|salary|
+—+———-+—+——-+——+
| 30| HR| 2|fgedu02| 8000|
| 35| Finance| 4|fgedu04| 15000|
| 25| IT| 1|fgedu01| 10000|
+—+———-+—+——-+——+
# 空值处理
scala> val dfWithNull = df.withColumn(“bonus”, when($”salary” > 10000, $”salary” * 0.1))
dfWithNull: org.apache.spark.sql.DataFrame = [age: bigint, department: string … 4 more fields]
scala> dfWithNull.na.fill(0, Seq(“bonus”)).show()
+—+———-+—+——-+——+——+
|age|department| id| name|salary| bonus|
+—+———-+—+——-+——+——+
| 25| IT| 1|fgedu01| 10000| 0.0|
| 30| HR| 2|fgedu02| 8000| 0.0|
| 28| IT| 3|fgedu03| 12000|1200.0|
| 35| Finance| 4|fgedu04| 15000|1500.0|
| 27| IT| 5|fgedu05| 11000|1100.0|
+—+———-+—+——-+——+——+
3.3 SQL查询实战
scala> df.createOrReplaceTempView(“employees”)
# 执行SQL查询
scala> spark.sql(“SELECT * FROM employees WHERE salary > 10000”).show()
+—+———-+—+——-+——+
|age|department| id| name|salary|
+—+———-+—+——-+——+
| 28| IT| 3|fgedu03| 12000|
| 35| Finance| 4|fgedu04| 15000|
| 27| IT| 5|fgedu05| 11000|
+—+———-+—+——-+——+
# 分组统计
scala> spark.sql(“””
| SELECT department,
| COUNT(*) as employee_count,
| AVG(salary) as avg_salary
| FROM employees
| GROUP BY department
| ORDER BY avg_salary DESC
| “””).show()
+———-+————–+———-+
|department|employee_count|avg_salary|
+———-+————–+———-+
| Finance| 1| 15000.0|
| IT| 3| 11000.0|
| HR| 1| 8000.0|
+———-+————–+———-+
# 子查询
scala> spark.sql(“””
| SELECT * FROM employees
| WHERE salary > (SELECT AVG(salary) FROM employees)
| “””).show()
+—+———-+—+——-+——+
|age|department| id| name|salary|
+—+———-+—+——-+——+
| 28| IT| 3|fgedu03| 12000|
| 35| Finance| 4|fgedu04| 15000|
+—+———-+—+——-+——+
# 窗口函数
scala> spark.sql(“””
| SELECT *,
| RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
| FROM employees
| “””).show()
+—+———-+—+——-+——+—-+
|age|department| id| name|salary|rank|
+—+———-+—+——-+——+—-+
| 35| Finance| 4|fgedu04| 15000| 1|
| 30| HR| 2|fgedu02| 8000| 1|
| 28| IT| 3|fgedu03| 12000| 1|
| 27| IT| 5|fgedu05| 11000| 2|
| 25| IT| 1|fgedu01| 10000| 3|
+—+———-+—+——-+——+—-+
# 注册全局临时视图
scala> df.createGlobalTempView(“global_employees”)
scala> spark.sql(“SELECT * FROM global_temp.global_employees LIMIT 3”).show()
+—+———-+—+——-+——+
|age|department| id| name|salary|
+—+———-+—+——-+——+
| 25| IT| 1|fgedu01| 10000|
| 30| HR| 2|fgedu02| 8000|
| 28| IT| 3|fgedu03| 12000|
+—+———-+—+——-+——+
Part04-生产案例与实战讲解
4.1 数据分析实战案例
# 创建销售数据
scala> val salesData = Seq(
| (“2026-01-01”, “北京”, “电子产品”, 10000),
| (“2026-01-01”, “上海”, “电子产品”, 15000),
| (“2026-01-01”, “北京”, “服装”, 5000),
| (“2026-01-02”, “上海”, “电子产品”, 12000),
| (“2026-01-02”, “广州”, “服装”, 8000),
| (“2026-01-03”, “北京”, “电子产品”, 20000),
| (“2026-01-03”, “上海”, “服装”, 6000),
| (“2026-01-03”, “广州”, “电子产品”, 9000)
| )
salesData: Seq[(String, String, String, Int)] = List((2026-01-01,北京,电子产品,10000), (2026-01-01,上海,电子产品,15000), (2026-01-01,北京,服装,5000), (2026-01-02,上海,电子产品,12000), (2026-01-02,广州,服装,8000), (2026-01-03,北京,电子产品,20000), (2026-01-03,上海,服装,6000), (2026-01-03,广州,电子产品,9000))
scala> val salesDF = salesData.toDF(“date”, “city”, “category”, “amount”)
salesDF: org.apache.spark.sql.DataFrame = [date: string, city: string … 2 more fields]
scala> salesDF.createOrReplaceTempView(“sales”)
# 按日期统计销售额
scala> spark.sql(“””
| SELECT date, SUM(amount) as total_amount
| FROM sales
| GROUP BY date
| ORDER BY date
| “””).show()
+———-+————+
| date|total_amount|
+———-+————+
|2026-01-01| 30000|
|2026-01-02| 20000|
|2026-01-03| 35000|
+———-+————+
# 按城市和品类统计
scala> spark.sql(“””
| SELECT city, category,
| SUM(amount) as total_amount,
| COUNT(*) as order_count
| FROM sales
| GROUP BY city, category
| ORDER BY total_amount DESC
| “””).show()
+—-+——–+————+———–+
|city|category|total_amount|order_count|
+—-+——–+————+———–+
|北京|电子产品| 30000| 2|
|上海|电子产品| 27000| 2|
|广州|电子产品| 9000| 1|
|广州| 服装| 8000| 1|
|上海| 服装| 6000| 1|
|北京| 服装| 5000| 1|
+—-+——–+————+———–+
# 计算环比增长
scala> spark.sql(“””
| SELECT date,
| SUM(amount) as total_amount,
| LAG(SUM(amount), 1) OVER (ORDER BY date) as prev_amount,
| (SUM(amount) – LAG(SUM(amount), 1) OVER (ORDER BY date)) as growth
| FROM sales
| GROUP BY date
| ORDER BY date
| “””).show()
+———-+————+———–+——-+
| date|total_amount|prev_amount|growth|
+———-+————+———–+——-+
|2026-01-01| 30000| null| null|
|2026-01-02| 20000| 30000|-10000|
|2026-01-03| 35000| 20000| 15000|
+———-+————+———–+——-+
4.2 多表连接分析实战
scala> val customers = Seq(
| (“C001”, “fgedu客户1”, “北京”),
| (“C002”, “fgedu客户2”, “上海”),
| (“C003”, “fgedu客户3”, “广州”)
| ).toDF(“customer_id”, “customer_name”, “city”)
customers: org.apache.spark.sql.DataFrame = [customer_id: string, customer_name: string … 1 more field]
scala> val products = Seq(
| (“P001”, “笔记本电脑”, 5000.0),
| (“P002”, “手机”, 3000.0),
| (“P003”, “平板电脑”, 2000.0)
| ).toDF(“product_id”, “product_name”, “price”)
products: org.apache.spark.sql.DataFrame = [product_id: string, product_name: string … 1 more field]
scala> val orders = Seq(
| (“O001”, “C001”, “P001”, 2),
| (“O002”, “C001”, “P002”, 1),
| (“O003”, “C002”, “P001”, 1),
| (“O004”, “C003”, “P003”, 3)
| ).toDF(“order_id”, “customer_id”, “product_id”, “quantity”)
orders: org.apache.spark.sql.DataFrame = [order_id: string, customer_id: string … 2 more fields]
scala> customers.createOrReplaceTempView(“customers”)
scala> products.createOrReplaceTempView(“products”)
scala> orders.createOrReplaceTempView(“orders”)
# 多表连接查询
scala> spark.sql(“””
| SELECT c.customer_name,
| p.product_name,
| o.quantity,
| p.price * o.quantity as total_amount
| FROM orders o
| JOIN customers c ON o.customer_id = c.customer_id
| JOIN products p ON o.product_id = p.product_id
| ORDER BY total_amount DESC
| “””).show()
+————–+————+——–+————+
|customer_name |product_name|quantity|total_amount|
+————–+————+——–+————+
| fgedu客户1| 笔记本电脑| 2| 10000.0|
| fgedu客户1| 手机| 1| 3000.0|
| fgedu客户2| 笔记本电脑| 1| 5000.0|
| fgedu客户3| 平板电脑| 3| 6000.0|
+————–+————+——–+————+
# 客户消费统计
scala> spark.sql(“””
| SELECT c.customer_name,
| COUNT(o.order_id) as order_count,
| SUM(p.price * o.quantity) as total_amount
| FROM customers c
| LEFT JOIN orders o ON c.customer_id = o.customer_id
| LEFT JOIN products p ON o.product_id = p.product_id
| GROUP BY c.customer_name
| ORDER BY total_amount DESC
| “””).show()
+————–+———–+————+
|customer_name |order_count|total_amount|
+————–+———–+————+
| fgedu客户1| 2| 13000.0|
| fgedu客户3| 1| 6000.0|
| fgedu客户2| 1| 5000.0|
+————–+———–+————+
4.3 常见问题处理
4.3.1 查询性能问题
# 排查步骤
# 1. 查看执行计划
scala> df.queryExecution.logical
# 2. 查看物理执行计划
scala> df.explain(true)
# 解决方案
# 1. 启用自适应查询
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
# 2. 调整Shuffle分区数
spark.sql.shuffle.partitions=200
# 3. 使用广播Join
spark.sql.autoBroadcastJoinThreshold=10485760
# 4. 缓存常用表
scala> df.cache()
4.3.2 数据倾斜问题
# 排查步骤
# 1. 查看数据分布
scala> df.groupBy(“key”).count().orderBy($”count”.desc).show()
# 解决方案
# 1. 使用Salting技术
scala> df.withColumn(“salted_key”,
| concat($”key”, “_”, (rand() * 10).cast(“int”)))
# 2. 广播小表
scala> df1.join(broadcast(df2), “key”)
# 3. 增加分区数
scala> df.repartition(200, $”key”)
Part05-风哥经验总结与分享
5.1 Spark SQL最佳实践
Spark SQL最佳实践建议:
1. 优先使用DataFrame API
2. 使用Parquet格式存储
3. 合理设置分区数
4. 缓存常用数据
5. 使用广播Join优化
6. 启用自适应查询
# SQL优化建议
1. 使用EXPLAIN分析执行计划
2. 避免SELECT *
3. 使用分区裁剪
4. 合理使用索引
5. 避免笛卡尔积
5.2 性能优化建议
性能优化建议:
- 启用自适应查询执行(AQE)
- 使用Parquet列式存储
- 合理设置Shuffle分区数
- 使用广播Join优化小表
- 缓存常用DataFrame
5.3 调试工具推荐
Spark SQL调试工具:
- EXPLAIN:查看执行计划
- Spark UI:查看Stage和Task详情
- SQL Tab:查看SQL执行计划
- Statistics:查看数据统计信息
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
