本文档风哥主要介绍Spark SQL与Hive集成实战,包括Spark与Hive集成配置、Catalog概念、表操作、数据查询等内容,风哥教程参考Spark官方文档Spark SQL Guide、Hive Tables等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 Spark与Hive集成概述
Spark SQL可以与Hive无缝集成,直接访问Hive表和数据,支持HiveQL查询。学习交流加群风哥微信: itpux-com
- 统一元数据:共享Hive Metastore
- 兼容HiveQL:支持Hive SQL语法
- 访问Hive表:直接读写Hive表
- 支持UDF:支持Hive UDF
- 高性能:Spark执行引擎比MapReduce快
1.2 Catalog概念
Spark Catalog详解:
Spark Catalog是Spark SQL的元数据管理接口,用于管理数据库、表、函数等元数据。
# Catalog功能
1. 数据库管理
– 列出数据库
– 切换数据库
– 创建/删除数据库
2. 表管理
– 列出表
– 获取表信息
– 创建/删除表
3. 函数管理
– 列出函数
– 注册UDF
4. 缓存管理
– 缓存表
– 清除缓存
# Catalog API示例
scala> spark.catalog.listDatabases().show()
+——-+——————–+——————–+
| name| description| locationUri|
+——-+——————–+——————–+
|default|Default Hive data…|hdfs://192.168.1….|
|fgedu |fgedu database |hdfs://192.168.1….|
+——-+——————–+——————–+
scala> spark.catalog.listTables(“fgedu”).show()
+——-+——–+———–+———+———–+
| name|database|description|tableType|isTemporary|
+——-+——–+———–+———+———–+
|orders | fgedu| NULL| MANAGED| false|
|users | fgedu| NULL| MANAGED| false|
+——-+——–+———–+———+———–+
# Catalog类型
1. HiveCatalog:使用Hive Metastore
2. InMemoryCatalog:内存Catalog(默认)
3. 自定义Catalog:实现Catalog接口
1.3 Metastore配置
Hive Metastore配置详解:
1. 内嵌Derby模式
– 默认模式
– 单用户
– 仅用于测试
2. 本地MySQL模式
– 本地数据库
– 多用户
– 适合开发环境
3. 远程Metastore模式
– 独立Metastore服务
– 多用户
– 生产环境推荐
# 配置示例
# spark-defaults.conf
spark.sql.warehouse.dir=hdfs://192.168.1.60:9000/user/hive/warehouse
spark.sql.catalogImplementation=hive
# hive-site.xml
Part02-生产环境规划与建议
2.1 数据仓库规划
数据仓库规划建议:
1. ODS层(原始数据层)
– 存放原始数据
– 数据格式:JSON、CSV、Parquet
– 分区:按日期分区
2. DWD层(明细数据层)
– 清洗后的明细数据
– 数据格式:Parquet、ORC
– 分区:按日期分区
3. DWS层(汇总数据层)
– 汇总统计数据
– 数据格式:Parquet
– 分区:按日期分区
4. ADS层(应用数据层)
– 应用数据
– 数据格式:Parquet
– 分区:按业务分区
# 数据库规划
fgedu_ods # ODS层数据库
fgedu_dwd # DWD层数据库
fgedu_dws # DWS层数据库
fgedu_ads # ADS层数据库
# 表命名规范
ods_业务名_表名
dwd_业务名_表名
dws_业务名_表名
ads_业务名_表名
# 示例
ods_order_detail
dwd_order_detail
dws_order_summary
ads_order_report
2.2 表设计规划
表设计规划建议:
1. 管理表
– 数据由Spark管理
– 删除表时数据也被删除
– 适合临时表、中间表
2. 外部表
– 数据由外部管理
– 删除表时数据保留
– 适合原始数据、共享数据
# 存储格式选择
1. Parquet
– 列式存储
– 压缩率高
– 查询性能好
– 推荐
2. ORC
– 列式存储
– Hive优化
– 压缩率高
3. Avro
– 行式存储
– Schema演进
– 适合数据交换
# 分区设计
1. 分区字段
– 日期:dt=2026-04-08
– 小时:hour=12
– 业务:region=beijing
2. 分区粒度
– 日分区:适合大多数场景
– 小时分区:适合实时数据
– 月分区:适合历史数据
# 分桶设计
– 用于抽样查询
– 用于Join优化
– 分桶数:数据量/128MB
2.3 集成架构规划
集成架构规划建议:
1. 元数据层
– Hive Metastore
– MySQL存储元数据
– 多服务共享
2. 存储层
– HDFS存储数据
– 多格式支持
– 分区存储
3. 计算层
– Spark SQL
– Spark Thrift Server
– 多租户支持
4. 访问层
– JDBC/ODBC
– Spark SQL CLI
– Beeline
# 部署架构
┌─────────────────────────────────────────┐
│ 访问层 │
│ JDBC/ODBC │ Spark SQL CLI │ Beeline│
└─────────────────────────────────────────┘
│
┌─────────────────────────────────────────┐
│ 计算层 │
│ Spark SQL │ Spark Thrift Server │
└─────────────────────────────────────────┘
│
┌─────────────────────────────────────────┐
│ 元数据层 │
│ Hive Metastore │ MySQL │
└─────────────────────────────────────────┘
│
┌─────────────────────────────────────────┐
│ 存储层 │
│ HDFS │ S3 │ 其他存储 │
└─────────────────────────────────────────┘
Part03-生产环境项目实施方案
3.1 Hive集成配置
3.1.1 配置Hive支持
$ cp /bigdata/app/hive/conf/hive-site.xml /bigdata/app/spark/conf/
# 配置spark-defaults.conf
$ cat >> /bigdata/app/spark/conf/spark-defaults.conf << 'EOF'
# Hive支持
spark.sql.warehouse.dir=hdfs://192.168.1.60:9000/user/hive/warehouse
spark.sql.catalogImplementation=hive
spark.sql.hive.metastore.version=3.1.3
spark.sql.hive.metastore.jars=/bigdata/app/hive/lib/*
EOF
# 启动Spark Shell(启用Hive支持)
$ /bigdata/app/spark/bin/spark-shell \
--master spark://192.168.1.60:7077 \
--conf spark.sql.warehouse.dir=hdfs://192.168.1.60:9000/user/hive/warehouse \
--conf spark.sql.catalogImplementation=hive
# 验证Hive支持
scala> spark.sql(“SHOW DATABASES”).show()
+———+
|namespace|
+———+
| default|
| fgedu|
+———+
scala> spark.sql(“USE fgedu”)
scala> spark.sql(“SHOW TABLES”).show()
+——–+———+———–+
|database|tableName|isTemporary|
+——–+———+———–+
| fgedu| orders| false|
| fgedu| users| false|
+——–+———+———–+
3.1.2 配置Spark Thrift Server
$ /bigdata/app/spark/sbin/start-thriftserver.sh \
–master spark://192.168.1.60:7077 \
–hiveconf hive.server2.thrift.port=10001 \
–hiveconf hive.server2.thrift.bind.host=192.168.1.60
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /bigdata/app/spark/logs/spark-spark-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-fgedu-spark-master.out
# 检查Thrift Server状态
$ jps
12345 ThriftServer2
12456 Jps
# 使用Beeline连接
$ /bigdata/app/spark/bin/beeline -u “jdbc:hive2://192.168.1.60:10001”
Connecting to jdbc:hive2://192.168.1.60:10001
Connected to: Spark SQL (version 3.5.1)
Driver: Hive JDBC (version 3.1.3)
Beeline version 3.1.3 by Apache Hive
0: jdbc:hive2://192.168.1.60:10001>
# 执行SQL查询
0: jdbc:hive2://192.168.1.60:10001> SHOW DATABASES;
+———+
| namespace|
+———+
| default |
| fgedu |
+———+
2 rows selected (0.123 seconds)
0: jdbc:hive2://192.168.1.60:10001> USE fgedu;
No rows affected (0.012 seconds)
0: jdbc:hive2://192.168.1.60:10001> SELECT COUNT(*) FROM users;
+——–+
| count(1)|
+——–+
| 1000 |
+——–+
1 row selected (1.234 seconds)
# 停止Thrift Server
$ /bigdata/app/spark/sbin/stop-thriftserver.sh
3.2 表操作实战
3.2.1 创建数据库和表
scala> spark.sql(“CREATE DATABASE IF NOT EXISTS fgedu_ods”)
scala> spark.sql(“CREATE DATABASE IF NOT EXISTS fgedu_dwd”)
scala> spark.sql(“CREATE DATABASE IF NOT EXISTS fgedu_dws”)
scala> spark.sql(“CREATE DATABASE IF NOT EXISTS fgedu_ads”)
# 创建管理表
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_ods.ods_users (
| user_id INT,
| user_name STRING,
| age INT,
| city STRING,
| create_time TIMESTAMP
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| COMMENT ‘用户原始数据表’
| “””)
res0: org.apache.spark.sql.DataFrame = []
# 创建外部表
scala> spark.sql(“””
| CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_ods.ods_orders (
| order_id STRING,
| user_id INT,
| product_id STRING,
| quantity INT,
| amount DECIMAL(10,2),
| order_time TIMESTAMP
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| LOCATION ‘hdfs://192.168.1.60:9000/data/ods/orders’
| COMMENT ‘订单原始数据表’
| “””)
res1: org.apache.spark.sql.DataFrame = []
# 创建分桶表
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_dwd.dwd_user_detail (
| user_id INT,
| user_name STRING,
| age INT,
| city STRING,
| total_orders INT,
| total_amount DECIMAL(12,2)
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| CLUSTERED BY (user_id) INTO 10 BUCKETS
| COMMENT ‘用户明细表’
| “””)
res2: org.apache.spark.sql.DataFrame = []
# 查看表结构
scala> spark.sql(“DESCRIBE EXTENDED fgedu_ods.ods_users”).show()
+———-+———-+——-+
| col_name| data_type|comment|
+———-+———-+——-+
| user_id| int| null|
| user_name| string| null|
| age| int| null|
| city| string| null|
|create_time| timestamp| null|
| dt| string| null|
+———-+———-+——-+
3.2.2 数据导入导出
scala> spark.sql(“””
| INSERT INTO TABLE fgedu_ods.ods_users PARTITION(dt=’2026-04-08′)
| VALUES
| (1, ‘fgedu01’, 25, ‘北京’, ‘2026-04-08 10:00:00’),
| (2, ‘fgedu02’, 30, ‘上海’, ‘2026-04-08 11:00:00’),
| (3, ‘fgedu03’, 28, ‘广州’, ‘2026-04-08 12:00:00’)
| “””)
res3: org.apache.spark.sql.DataFrame = []
# 从DataFrame写入Hive表
scala> val df = Seq(
| (4, “fgedu04”, 35, “深圳”, “2026-04-08 13:00:00”),
| (5, “fgedu05”, 27, “杭州”, “2026-04-08 14:00:00”)
| ).toDF(“user_id”, “user_name”, “age”, “city”, “create_time”)
df: org.apache.spark.sql.DataFrame = [user_id: int, user_name: string … 3 more fields]
scala> df.write.mode(“append”).insertInto(“fgedu_ods.ods_users”)
# 从文件加载数据
scala> spark.sql(“””
| LOAD DATA INPATH ‘hdfs://192.168.1.60:9000/data/users.csv’
| INTO TABLE fgedu_ods.ods_users PARTITION(dt=’2026-04-07′)
| “””)
# 导出数据到文件
scala> spark.sql(“””
| SELECT * FROM fgedu_ods.ods_users WHERE dt=’2026-04-08′
| “””).write
| .mode(“overwrite”)
| .parquet(“hdfs://192.168.1.60:9000/output/users/”)
# 查看分区
scala> spark.sql(“SHOW PARTITIONS fgedu_ods.ods_users”).show()
+———–+
| partition|
+———–+
|dt=2026-04-07|
|dt=2026-04-08|
+———–+
3.3 数据查询实战
scala> spark.sql(“””
| SELECT user_id, user_name, age, city
| FROM fgedu_ods.ods_users
| WHERE dt=’2026-04-08′ AND age > 25
| ORDER BY age DESC
| “””).show()
+——-+———+—+—-+
|user_id|user_name|age|city|
+——-+———+—+—-+
| 4| fgedu04| 35|深圳|
| 2| fgedu02| 30|上海|
| 3| fgedu03| 28|广州|
+——-+———+—+—-+
# 聚合查询
scala> spark.sql(“””
| SELECT city, COUNT(*) as user_count, AVG(age) as avg_age
| FROM fgedu_ods.ods_users
| WHERE dt=’2026-04-08′
| GROUP BY city
| ORDER BY user_count DESC
| “””).show()
+—-+———-+——-+
|city|user_count|avg_age|
+—-+———-+——-+
|北京| 1| 25.0|
|上海| 1| 30.0|
|广州| 1| 28.0|
|深圳| 1| 35.0|
|杭州| 1| 27.0|
+—-+———-+——-+
# 窗口函数
scala> spark.sql(“””
| SELECT user_id, user_name, age, city,
| ROW_NUMBER() OVER (ORDER BY age DESC) as rank
| FROM fgedu_ods.ods_users
| WHERE dt=’2026-04-08′
| “””).show()
+——-+———+—+—-+—-+
|user_id|user_name|age|city|rank|
+——-+———+—+—-+—-+
| 4| fgedu04| 35|深圳| 1|
| 2| fgedu02| 30|上海| 2|
| 3| fgedu03| 28|广州| 3|
| 5| fgedu05| 27|杭州| 4|
| 1| fgedu01| 25|北京| 5|
+——-+———+—+—-+—-+
# 多表连接
scala> spark.sql(“””
| SELECT u.user_id, u.user_name, COUNT(o.order_id) as order_count
| FROM fgedu_ods.ods_users u
| LEFT JOIN fgedu_ods.ods_orders o ON u.user_id = o.user_id
| WHERE u.dt=’2026-04-08′
| GROUP BY u.user_id, u.user_name
| “””).show()
# 使用DataFrame API查询
scala> spark.table(“fgedu_ods.ods_users”)
| .filter($”dt” === “2026-04-08”)
| .select(“user_id”, “user_name”, “age”, “city”)
| .show()
Part04-生产案例与实战讲解
4.1 数据仓库建设案例
# 1. ODS层:原始数据
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_ods.ods_user_behavior (
| user_id INT,
| item_id STRING,
| behavior_type STRING,
| item_category STRING,
| timestamp STRING
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| “””)
res4: org.apache.spark.sql.DataFrame = []
# 2. DWD层:明细数据
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_dwd.dwd_user_behavior_detail (
| user_id INT,
| item_id STRING,
| behavior_type STRING,
| item_category STRING,
| behavior_time TIMESTAMP,
| hour INT
| )
| USING PARQUET
| PARTITIONED BY (dt STRING, hour INT)
| “””)
res5: org.apache.spark.sql.DataFrame = []
# 3. DWS层:汇总数据
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_dws.dws_user_behavior_summary (
| user_id INT,
| pv_count BIGINT,
| buy_count BIGINT,
| cart_count BIGINT,
| fav_count BIGINT
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| “””)
res6: org.apache.spark.sql.DataFrame = []
# ETL处理
scala> spark.sql(“””
| INSERT INTO TABLE fgedu_dwd.dwd_user_behavior_detail PARTITION(dt=’2026-04-08′, hour)
| SELECT
| user_id,
| item_id,
| behavior_type,
| item_category,
| FROM_UNIXTIME(CAST(timestamp AS BIGINT)) as behavior_time,
| HOUR(FROM_UNIXTIME(CAST(timestamp AS BIGINT))) as hour
| FROM fgedu_ods.ods_user_behavior
| WHERE dt=’2026-04-08′
| “””)
res7: org.apache.spark.sql.DataFrame = []
scala> spark.sql(“””
| INSERT INTO TABLE fgedu_dws.dws_user_behavior_summary PARTITION(dt=’2026-04-08′)
| SELECT
| user_id,
| SUM(CASE WHEN behavior_type=’pv’ THEN 1 ELSE 0 END) as pv_count,
| SUM(CASE WHEN behavior_type=’buy’ THEN 1 ELSE 0 END) as buy_count,
| SUM(CASE WHEN behavior_type=’cart’ THEN 1 ELSE 0 END) as cart_count,
| SUM(CASE WHEN behavior_type=’fav’ THEN 1 ELSE 0 END) as fav_count
| FROM fgedu_dwd.dwd_user_behavior_detail
| WHERE dt=’2026-04-08′
| GROUP BY user_id
| “””)
res8: org.apache.spark.sql.DataFrame = []
4.2 ETL数据处理案例
# 创建Spark应用
$ cat > /bigdata/spark-apps/FgeduDwdEtl.scala << 'EOF'
package com.fgedu.spark
import org.apache.spark.sql.SparkSession
object FgeduDwdEtl {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("fgedu-dwd-etl")
.enableHiveSupport()
.getOrCreate()
val dt = args(0) // 日期参数
// 1. 读取ODS层数据
val odsDF = spark.table(s"fgedu_ods.ods_user_behavior")
.filter($"dt" === dt)
// 2. 数据清洗
val cleanedDF = odsDF
.filter($"user_id".isNotNull && $"item_id".isNotNull)
.filter($"behavior_type".isin("pv", "buy", "cart", "fav"))
// 3. 数据转换
val transformedDF = cleanedDF
.withColumn("behavior_time",
from_unixtime(col("timestamp").cast("long")))
.withColumn("hour",
hour(col("behavior_time")))
// 4. 写入DWD层
transformedDF.write
.mode("overwrite")
.partitionBy("dt", "hour")
.insertInto("fgedu_dwd.dwd_user_behavior_detail")
spark.stop()
}
}
EOF
# 提交ETL任务
$ /bigdata/app/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.fgedu.spark.FgeduDwdEtl \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
/bigdata/spark-apps/fgedu-etl.jar \
2026-04-08
# 查看ETL结果
scala> spark.sql(“””
| SELECT dt, hour, COUNT(*) as record_count
| FROM fgedu_dwd.dwd_user_behavior_detail
| WHERE dt=’2026-04-08′
| GROUP BY dt, hour
| ORDER BY hour
| “””).show()
+———-+—-+————+
| dt|hour|record_count|
+———-+—-+————+
|2026-04-08| 0| 12345|
|2026-04-08| 1| 10234|
|2026-04-08| 2| 8765|
…
+———-+—-+————+
4.3 常见问题处理
4.3.1 Metastore连接问题
# 排查步骤
# 1. 检查Metastore服务
$ netstat -tlnp | grep 9083
# 2. 检查hive-site.xml配置
$ cat /bigdata/app/spark/conf/hive-site.xml
# 3. 检查MySQL连接
$ mysql -h 192.168.1.60 -u hive -p
# 解决方案
# 1. 启动Metastore服务
$ hive –service metastore &
# 2. 检查配置文件
# 3. 检查网络连接
$ telnet 192.168.1.60 9083
4.3.2 权限问题
# 排查步骤
# 1. 检查HDFS权限
$ hdfs dfs -ls /user/hive/warehouse/fgedu.db/
# 2. 检查表权限
scala> spark.sql(“SHOW GRANT ON TABLE fgedu.users”)
# 解决方案
# 1. 修改HDFS权限
$ hdfs dfs -chmod -R 755 /user/hive/warehouse/fgedu.db/
# 2. 授权
scala> spark.sql(“GRANT SELECT ON TABLE fgedu.users TO USER spark”)
# 3. 切换用户
$ export HADOOP_USER_NAME=hive
Part05-风哥经验总结与分享
5.1 集成最佳实践
Spark与Hive集成最佳实践建议:
1. 使用独立Metastore服务
2. 配置正确的warehouse目录
3. 使用Parquet格式存储
4. 合理设计分区
5. 启用Hive支持
# 表设计最佳实践
1. 使用分区表
2. 选择合适的存储格式
3. 设计合理的表结构
4. 添加表注释
5.2 性能优化建议
性能优化建议:
- 使用Parquet列式存储
- 合理设计分区
- 启用分区裁剪
- 使用缓存加速查询
5.3 工具推荐
集成工具推荐:
- Spark SQL CLI:命令行工具
- Beeline:JDBC客户端
- Spark Thrift Server:JDBC服务
- Hue:Web界面
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
