1. 首页 > Hadoop教程 > 正文

大数据教程FG067-Spark SQL与Hive集成实战

本文档风哥主要介绍Spark SQL与Hive集成实战,包括Spark与Hive集成配置、Catalog概念、表操作、数据查询等内容,风哥教程参考Spark官方文档Spark SQL Guide、Hive Tables等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 Spark与Hive集成概述

Spark SQL可以与Hive无缝集成,直接访问Hive表和数据,支持HiveQL查询。学习交流加群风哥微信: itpux-com

Spark与Hive集成优势:

  • 统一元数据:共享Hive Metastore
  • 兼容HiveQL:支持Hive SQL语法
  • 访问Hive表:直接读写Hive表
  • 支持UDF:支持Hive UDF
  • 高性能:Spark执行引擎比MapReduce快

1.2 Catalog概念

Spark Catalog详解:

# Catalog概述
Spark Catalog是Spark SQL的元数据管理接口,用于管理数据库、表、函数等元数据。

# Catalog功能
1. 数据库管理
– 列出数据库
– 切换数据库
– 创建/删除数据库

2. 表管理
– 列出表
– 获取表信息
– 创建/删除表

3. 函数管理
– 列出函数
– 注册UDF

4. 缓存管理
– 缓存表
– 清除缓存

# Catalog API示例
scala> spark.catalog.listDatabases().show()
+——-+——————–+——————–+
| name| description| locationUri|
+——-+——————–+——————–+
|default|Default Hive data…|hdfs://192.168.1….|
|fgedu |fgedu database |hdfs://192.168.1….|
+——-+——————–+——————–+

scala> spark.catalog.listTables(“fgedu”).show()
+——-+——–+———–+———+———–+
| name|database|description|tableType|isTemporary|
+——-+——–+———–+———+———–+
|orders | fgedu| NULL| MANAGED| false|
|users | fgedu| NULL| MANAGED| false|
+——-+——–+———–+———+———–+

# Catalog类型
1. HiveCatalog:使用Hive Metastore
2. InMemoryCatalog:内存Catalog(默认)
3. 自定义Catalog:实现Catalog接口

1.3 Metastore配置

Hive Metastore配置详解:

# Metastore配置方式

1. 内嵌Derby模式
– 默认模式
– 单用户
– 仅用于测试

2. 本地MySQL模式
– 本地数据库
– 多用户
– 适合开发环境

3. 远程Metastore模式
– 独立Metastore服务
– 多用户
– 生产环境推荐

# 配置示例
# spark-defaults.conf
spark.sql.warehouse.dir=hdfs://192.168.1.60:9000/user/hive/warehouse
spark.sql.catalogImplementation=hive

# hive-site.xml
javax.jdo.option.ConnectionURL
jdbc:mysql://192.168.1.60:3306/hive_metastore?createDatabaseIfNotExist=true
javax.jdo.option.ConnectionDriverName
com.mysql.cj.jdbc.Driver
javax.jdo.option.ConnectionUserName
hive
javax.jdo.option.ConnectionPassword
fgedu123
hive.metastore.uris
thrift://192.168.1.60:9083

风哥提示:生产环境建议使用远程Metastore模式,将Metastore部署为独立服务,多个Spark应用共享同一Metastore。

Part02-生产环境规划与建议

2.1 数据仓库规划

数据仓库规划建议:

# 数据仓库分层

1. ODS层(原始数据层)
– 存放原始数据
– 数据格式:JSON、CSV、Parquet
– 分区:按日期分区

2. DWD层(明细数据层)
– 清洗后的明细数据
– 数据格式:Parquet、ORC
– 分区:按日期分区

3. DWS层(汇总数据层)
– 汇总统计数据
– 数据格式:Parquet
– 分区:按日期分区

4. ADS层(应用数据层)
– 应用数据
– 数据格式:Parquet
– 分区:按业务分区

# 数据库规划
fgedu_ods # ODS层数据库
fgedu_dwd # DWD层数据库
fgedu_dws # DWS层数据库
fgedu_ads # ADS层数据库

# 表命名规范
ods_业务名_表名
dwd_业务名_表名
dws_业务名_表名
ads_业务名_表名

# 示例
ods_order_detail
dwd_order_detail
dws_order_summary
ads_order_report

2.2 表设计规划

表设计规划建议:

# 表类型选择

1. 管理表
– 数据由Spark管理
– 删除表时数据也被删除
– 适合临时表、中间表

2. 外部表
– 数据由外部管理
– 删除表时数据保留
– 适合原始数据、共享数据

# 存储格式选择
1. Parquet
– 列式存储
– 压缩率高
– 查询性能好
– 推荐

2. ORC
– 列式存储
– Hive优化
– 压缩率高

3. Avro
– 行式存储
– Schema演进
– 适合数据交换

# 分区设计
1. 分区字段
– 日期:dt=2026-04-08
– 小时:hour=12
– 业务:region=beijing

2. 分区粒度
– 日分区:适合大多数场景
– 小时分区:适合实时数据
– 月分区:适合历史数据

# 分桶设计
– 用于抽样查询
– 用于Join优化
– 分桶数:数据量/128MB

2.3 集成架构规划

集成架构规划建议:

# 集成架构

1. 元数据层
– Hive Metastore
– MySQL存储元数据
– 多服务共享

2. 存储层
– HDFS存储数据
– 多格式支持
– 分区存储

3. 计算层
– Spark SQL
– Spark Thrift Server
– 多租户支持

4. 访问层
– JDBC/ODBC
– Spark SQL CLI
– Beeline

# 部署架构
┌─────────────────────────────────────────┐
│ 访问层 │
│ JDBC/ODBC │ Spark SQL CLI │ Beeline│
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│ 计算层 │
│ Spark SQL │ Spark Thrift Server │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│ 元数据层 │
│ Hive Metastore │ MySQL │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│ 存储层 │
│ HDFS │ S3 │ 其他存储 │
└─────────────────────────────────────────┘

生产环境建议:生产环境建议使用独立的Hive Metastore服务,Spark通过Thrift协议连接Metastore,实现元数据共享。学习交流加群风哥QQ113257174

Part03-生产环境项目实施方案

3.1 Hive集成配置

3.1.1 配置Hive支持

# 复制hive-site.xml到Spark配置目录
$ cp /bigdata/app/hive/conf/hive-site.xml /bigdata/app/spark/conf/

# 配置spark-defaults.conf
$ cat >> /bigdata/app/spark/conf/spark-defaults.conf << 'EOF' # Hive支持 spark.sql.warehouse.dir=hdfs://192.168.1.60:9000/user/hive/warehouse spark.sql.catalogImplementation=hive spark.sql.hive.metastore.version=3.1.3 spark.sql.hive.metastore.jars=/bigdata/app/hive/lib/* EOF # 启动Spark Shell(启用Hive支持) $ /bigdata/app/spark/bin/spark-shell \ --master spark://192.168.1.60:7077 \ --conf spark.sql.warehouse.dir=hdfs://192.168.1.60:9000/user/hive/warehouse \ --conf spark.sql.catalogImplementation=hive # 验证Hive支持 scala> spark.sql(“SHOW DATABASES”).show()
+———+
|namespace|
+———+
| default|
| fgedu|
+———+

scala> spark.sql(“USE fgedu”)
scala> spark.sql(“SHOW TABLES”).show()
+——–+———+———–+
|database|tableName|isTemporary|
+——–+———+———–+
| fgedu| orders| false|
| fgedu| users| false|
+——–+———+———–+

3.1.2 配置Spark Thrift Server

# 启动Spark Thrift Server
$ /bigdata/app/spark/sbin/start-thriftserver.sh \
–master spark://192.168.1.60:7077 \
–hiveconf hive.server2.thrift.port=10001 \
–hiveconf hive.server2.thrift.bind.host=192.168.1.60

starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /bigdata/app/spark/logs/spark-spark-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-fgedu-spark-master.out

# 检查Thrift Server状态
$ jps

12345 ThriftServer2
12456 Jps

# 使用Beeline连接
$ /bigdata/app/spark/bin/beeline -u “jdbc:hive2://192.168.1.60:10001”

Connecting to jdbc:hive2://192.168.1.60:10001
Connected to: Spark SQL (version 3.5.1)
Driver: Hive JDBC (version 3.1.3)
Beeline version 3.1.3 by Apache Hive
0: jdbc:hive2://192.168.1.60:10001>

# 执行SQL查询
0: jdbc:hive2://192.168.1.60:10001> SHOW DATABASES;
+———+
| namespace|
+———+
| default |
| fgedu |
+———+
2 rows selected (0.123 seconds)

0: jdbc:hive2://192.168.1.60:10001> USE fgedu;
No rows affected (0.012 seconds)

0: jdbc:hive2://192.168.1.60:10001> SELECT COUNT(*) FROM users;
+——–+
| count(1)|
+——–+
| 1000 |
+——–+
1 row selected (1.234 seconds)

# 停止Thrift Server
$ /bigdata/app/spark/sbin/stop-thriftserver.sh

3.2 表操作实战

3.2.1 创建数据库和表

# 创建数据库
scala> spark.sql(“CREATE DATABASE IF NOT EXISTS fgedu_ods”)
scala> spark.sql(“CREATE DATABASE IF NOT EXISTS fgedu_dwd”)
scala> spark.sql(“CREATE DATABASE IF NOT EXISTS fgedu_dws”)
scala> spark.sql(“CREATE DATABASE IF NOT EXISTS fgedu_ads”)

# 创建管理表
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_ods.ods_users (
| user_id INT,
| user_name STRING,
| age INT,
| city STRING,
| create_time TIMESTAMP
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| COMMENT ‘用户原始数据表’
| “””)
res0: org.apache.spark.sql.DataFrame = []

# 创建外部表
scala> spark.sql(“””
| CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_ods.ods_orders (
| order_id STRING,
| user_id INT,
| product_id STRING,
| quantity INT,
| amount DECIMAL(10,2),
| order_time TIMESTAMP
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| LOCATION ‘hdfs://192.168.1.60:9000/data/ods/orders’
| COMMENT ‘订单原始数据表’
| “””)
res1: org.apache.spark.sql.DataFrame = []

# 创建分桶表
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_dwd.dwd_user_detail (
| user_id INT,
| user_name STRING,
| age INT,
| city STRING,
| total_orders INT,
| total_amount DECIMAL(12,2)
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| CLUSTERED BY (user_id) INTO 10 BUCKETS
| COMMENT ‘用户明细表’
| “””)
res2: org.apache.spark.sql.DataFrame = []

# 查看表结构
scala> spark.sql(“DESCRIBE EXTENDED fgedu_ods.ods_users”).show()
+———-+———-+——-+
| col_name| data_type|comment|
+———-+———-+——-+
| user_id| int| null|
| user_name| string| null|
| age| int| null|
| city| string| null|
|create_time| timestamp| null|
| dt| string| null|
+———-+———-+——-+

3.2.2 数据导入导出

# 插入数据
scala> spark.sql(“””
| INSERT INTO TABLE fgedu_ods.ods_users PARTITION(dt=’2026-04-08′)
| VALUES
| (1, ‘fgedu01’, 25, ‘北京’, ‘2026-04-08 10:00:00’),
| (2, ‘fgedu02’, 30, ‘上海’, ‘2026-04-08 11:00:00’),
| (3, ‘fgedu03’, 28, ‘广州’, ‘2026-04-08 12:00:00’)
| “””)
res3: org.apache.spark.sql.DataFrame = []

# 从DataFrame写入Hive表
scala> val df = Seq(
| (4, “fgedu04”, 35, “深圳”, “2026-04-08 13:00:00”),
| (5, “fgedu05”, 27, “杭州”, “2026-04-08 14:00:00”)
| ).toDF(“user_id”, “user_name”, “age”, “city”, “create_time”)
df: org.apache.spark.sql.DataFrame = [user_id: int, user_name: string … 3 more fields]

scala> df.write.mode(“append”).insertInto(“fgedu_ods.ods_users”)

# 从文件加载数据
scala> spark.sql(“””
| LOAD DATA INPATH ‘hdfs://192.168.1.60:9000/data/users.csv’
| INTO TABLE fgedu_ods.ods_users PARTITION(dt=’2026-04-07′)
| “””)

# 导出数据到文件
scala> spark.sql(“””
| SELECT * FROM fgedu_ods.ods_users WHERE dt=’2026-04-08′
| “””).write
| .mode(“overwrite”)
| .parquet(“hdfs://192.168.1.60:9000/output/users/”)

# 查看分区
scala> spark.sql(“SHOW PARTITIONS fgedu_ods.ods_users”).show()
+———–+
| partition|
+———–+
|dt=2026-04-07|
|dt=2026-04-08|
+———–+

3.3 数据查询实战

# 基本查询
scala> spark.sql(“””
| SELECT user_id, user_name, age, city
| FROM fgedu_ods.ods_users
| WHERE dt=’2026-04-08′ AND age > 25
| ORDER BY age DESC
| “””).show()
+——-+———+—+—-+
|user_id|user_name|age|city|
+——-+———+—+—-+
| 4| fgedu04| 35|深圳|
| 2| fgedu02| 30|上海|
| 3| fgedu03| 28|广州|
+——-+———+—+—-+

# 聚合查询
scala> spark.sql(“””
| SELECT city, COUNT(*) as user_count, AVG(age) as avg_age
| FROM fgedu_ods.ods_users
| WHERE dt=’2026-04-08′
| GROUP BY city
| ORDER BY user_count DESC
| “””).show()
+—-+———-+——-+
|city|user_count|avg_age|
+—-+———-+——-+
|北京| 1| 25.0|
|上海| 1| 30.0|
|广州| 1| 28.0|
|深圳| 1| 35.0|
|杭州| 1| 27.0|
+—-+———-+——-+

# 窗口函数
scala> spark.sql(“””
| SELECT user_id, user_name, age, city,
| ROW_NUMBER() OVER (ORDER BY age DESC) as rank
| FROM fgedu_ods.ods_users
| WHERE dt=’2026-04-08′
| “””).show()
+——-+———+—+—-+—-+
|user_id|user_name|age|city|rank|
+——-+———+—+—-+—-+
| 4| fgedu04| 35|深圳| 1|
| 2| fgedu02| 30|上海| 2|
| 3| fgedu03| 28|广州| 3|
| 5| fgedu05| 27|杭州| 4|
| 1| fgedu01| 25|北京| 5|
+——-+———+—+—-+—-+

# 多表连接
scala> spark.sql(“””
| SELECT u.user_id, u.user_name, COUNT(o.order_id) as order_count
| FROM fgedu_ods.ods_users u
| LEFT JOIN fgedu_ods.ods_orders o ON u.user_id = o.user_id
| WHERE u.dt=’2026-04-08′
| GROUP BY u.user_id, u.user_name
| “””).show()

# 使用DataFrame API查询
scala> spark.table(“fgedu_ods.ods_users”)
| .filter($”dt” === “2026-04-08”)
| .select(“user_id”, “user_name”, “age”, “city”)
| .show()

风哥提示:Spark SQL完全兼容HiveQL语法,可以直接使用Hive SQL进行数据查询。建议使用Parquet格式存储数据,查询性能更好。更多学习教程公众号风哥教程itpux_com

Part04-生产案例与实战讲解

4.1 数据仓库建设案例

# 数据仓库ETL流程

# 1. ODS层:原始数据
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_ods.ods_user_behavior (
| user_id INT,
| item_id STRING,
| behavior_type STRING,
| item_category STRING,
| timestamp STRING
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| “””)
res4: org.apache.spark.sql.DataFrame = []

# 2. DWD层:明细数据
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_dwd.dwd_user_behavior_detail (
| user_id INT,
| item_id STRING,
| behavior_type STRING,
| item_category STRING,
| behavior_time TIMESTAMP,
| hour INT
| )
| USING PARQUET
| PARTITIONED BY (dt STRING, hour INT)
| “””)
res5: org.apache.spark.sql.DataFrame = []

# 3. DWS层:汇总数据
scala> spark.sql(“””
| CREATE TABLE IF NOT EXISTS fgedu_dws.dws_user_behavior_summary (
| user_id INT,
| pv_count BIGINT,
| buy_count BIGINT,
| cart_count BIGINT,
| fav_count BIGINT
| )
| USING PARQUET
| PARTITIONED BY (dt STRING)
| “””)
res6: org.apache.spark.sql.DataFrame = []

# ETL处理
scala> spark.sql(“””
| INSERT INTO TABLE fgedu_dwd.dwd_user_behavior_detail PARTITION(dt=’2026-04-08′, hour)
| SELECT
| user_id,
| item_id,
| behavior_type,
| item_category,
| FROM_UNIXTIME(CAST(timestamp AS BIGINT)) as behavior_time,
| HOUR(FROM_UNIXTIME(CAST(timestamp AS BIGINT))) as hour
| FROM fgedu_ods.ods_user_behavior
| WHERE dt=’2026-04-08′
| “””)
res7: org.apache.spark.sql.DataFrame = []

scala> spark.sql(“””
| INSERT INTO TABLE fgedu_dws.dws_user_behavior_summary PARTITION(dt=’2026-04-08′)
| SELECT
| user_id,
| SUM(CASE WHEN behavior_type=’pv’ THEN 1 ELSE 0 END) as pv_count,
| SUM(CASE WHEN behavior_type=’buy’ THEN 1 ELSE 0 END) as buy_count,
| SUM(CASE WHEN behavior_type=’cart’ THEN 1 ELSE 0 END) as cart_count,
| SUM(CASE WHEN behavior_type=’fav’ THEN 1 ELSE 0 END) as fav_count
| FROM fgedu_dwd.dwd_user_behavior_detail
| WHERE dt=’2026-04-08′
| GROUP BY user_id
| “””)
res8: org.apache.spark.sql.DataFrame = []

4.2 ETL数据处理案例

# 完整ETL流程

# 创建Spark应用
$ cat > /bigdata/spark-apps/FgeduDwdEtl.scala << 'EOF' package com.fgedu.spark import org.apache.spark.sql.SparkSession object FgeduDwdEtl { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("fgedu-dwd-etl") .enableHiveSupport() .getOrCreate() val dt = args(0) // 日期参数 // 1. 读取ODS层数据 val odsDF = spark.table(s"fgedu_ods.ods_user_behavior") .filter($"dt" === dt) // 2. 数据清洗 val cleanedDF = odsDF .filter($"user_id".isNotNull && $"item_id".isNotNull) .filter($"behavior_type".isin("pv", "buy", "cart", "fav")) // 3. 数据转换 val transformedDF = cleanedDF .withColumn("behavior_time", from_unixtime(col("timestamp").cast("long"))) .withColumn("hour", hour(col("behavior_time"))) // 4. 写入DWD层 transformedDF.write .mode("overwrite") .partitionBy("dt", "hour") .insertInto("fgedu_dwd.dwd_user_behavior_detail") spark.stop() } } EOF # 提交ETL任务 $ /bigdata/app/spark/bin/spark-submit \ --master yarn \ --deploy-mode client \ --class com.fgedu.spark.FgeduDwdEtl \ --executor-memory 8g \ --executor-cores 4 \ --num-executors 10 \ /bigdata/spark-apps/fgedu-etl.jar \ 2026-04-08 # 查看ETL结果 scala> spark.sql(“””
| SELECT dt, hour, COUNT(*) as record_count
| FROM fgedu_dwd.dwd_user_behavior_detail
| WHERE dt=’2026-04-08′
| GROUP BY dt, hour
| ORDER BY hour
| “””).show()
+———-+—-+————+
| dt|hour|record_count|
+———-+—-+————+
|2026-04-08| 0| 12345|
|2026-04-08| 1| 10234|
|2026-04-08| 2| 8765|

+———-+—-+————+

4.3 常见问题处理

4.3.1 Metastore连接问题

# 问题现象:无法连接Hive Metastore

# 排查步骤
# 1. 检查Metastore服务
$ netstat -tlnp | grep 9083

# 2. 检查hive-site.xml配置
$ cat /bigdata/app/spark/conf/hive-site.xml

# 3. 检查MySQL连接
$ mysql -h 192.168.1.60 -u hive -p

# 解决方案
# 1. 启动Metastore服务
$ hive –service metastore &

# 2. 检查配置文件 hive.metastore.uris
thrift://192.168.1.60:9083

# 3. 检查网络连接
$ telnet 192.168.1.60 9083

4.3.2 权限问题

# 问题现象:无权限访问表

# 排查步骤
# 1. 检查HDFS权限
$ hdfs dfs -ls /user/hive/warehouse/fgedu.db/

# 2. 检查表权限
scala> spark.sql(“SHOW GRANT ON TABLE fgedu.users”)

# 解决方案
# 1. 修改HDFS权限
$ hdfs dfs -chmod -R 755 /user/hive/warehouse/fgedu.db/

# 2. 授权
scala> spark.sql(“GRANT SELECT ON TABLE fgedu.users TO USER spark”)

# 3. 切换用户
$ export HADOOP_USER_NAME=hive

Part05-风哥经验总结与分享

5.1 集成最佳实践

Spark与Hive集成最佳实践建议:

# 集成最佳实践
1. 使用独立Metastore服务
2. 配置正确的warehouse目录
3. 使用Parquet格式存储
4. 合理设计分区
5. 启用Hive支持

# 表设计最佳实践
1. 使用分区表
2. 选择合适的存储格式
3. 设计合理的表结构
4. 添加表注释

5.2 性能优化建议

性能优化建议:

Spark SQL性能优化建议:

  • 使用Parquet列式存储
  • 合理设计分区
  • 启用分区裁剪
  • 使用缓存加速查询

5.3 工具推荐

集成工具推荐:

  • Spark SQL CLI:命令行工具
  • Beeline:JDBC客户端
  • Spark Thrift Server:JDBC服务
  • Hue:Web界面
风哥提示:Spark与Hive集成可以实现统一的数据仓库平台,Spark提供高性能计算能力,Hive提供元数据管理和SQL兼容性。from bigdata视频:www.itpux.com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息