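This article walks through a hands-on Hadoop data warehouse architecture design, covering layered design, dimensional modeling, fact table design, and performance optimization. It draws on data warehouse modeling theory and is aimed at big data development engineers.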
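Part01-Basic Concepts and Theory
1.1 Data Warehouse Overview
A data warehouse is a subject-oriented, integrated, non-volatile, and time-variant collection of data that supports management decision-making.
- Subject-oriented: data is organized by business subject
- Integrated: data from multiple sources is integrated
- Non-volatile: data is stable once loaded, and history is retained
- Time-variant: historical data is kept so it can be analyzed over time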
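1.2 Data Warehouse Architecture
A typical layered architecture for a big data warehouse:
ODS layer (Operational Data Store):
- Raw data layer
- Landing zone for ingested data
- Keeps the original format
- Retains historical data
DWD layer (Data Warehouse Detail):
- Detail data layer
- Data cleansing
- Data standardization
- Dimension degeneration (data stays at detail grain)
DWS layer (Data Warehouse Service):
- Summary data layer
- Summarized by subject
- Light aggregation
- Shared dimensions
ADS layer (Application Data Store):
- Application data layer
- Application-oriented
- Highly aggregated
- Report data
DIM layer (Dimension):
- Dimension layer
- Shared dimensions
- Slowly changing dimensions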
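1.3 Data Models
Common data models:
- Star schema: one fact table plus dimension tables, simple and intuitive
- Snowflake schema: an extension of the star schema in which the dimension tables are normalized
- Galaxy (fact constellation) schema: several fact tables share dimension tables
- Data Vault: Hub + Link + Satellite, flexible and extensible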
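The following Python sketch illustrates the star schema. It is plain Python rather than HiveQL so that it runs without a cluster. It joins a toy order fact table to a user dimension by key, then sums a measure by a dimension attribute. The rows and the function name `sales_by_city` are hypothetical.

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical dimension table: user_id -> descriptive attributes
dim_user = {
    1: {"city": "Beijing", "user_level": "gold"},
    2: {"city": "Shanghai", "user_level": "new"},
}

# Hypothetical fact table: a foreign key (user_id) plus a numeric measure
fact_order = [
    {"order_id": 101, "user_id": 1, "pay_amount": Decimal("30.00")},
    {"order_id": 102, "user_id": 2, "pay_amount": Decimal("12.50")},
    {"order_id": 103, "user_id": 1, "pay_amount": Decimal("7.50")},
]


def sales_by_city(facts, users):
    """Star join: look up each fact row's dimension row by key, then sum by city."""
    totals = defaultdict(Decimal)  # Decimal() == Decimal("0")
    for row in facts:
        dim = users.get(row["user_id"])
        # Facts whose key is missing from the dimension go to an "UNKNOWN" member
        city = dim["city"] if dim else "UNKNOWN"
        totals[city] += row["pay_amount"]
    return dict(totals)


print(sales_by_city(fact_order, dim_user))
```

In Hive the same pattern is a fact table joined to its dimension tables on keys, followed by a GROUP BY on dimension attributes.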
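Part02-Production Environment Planning and Recommendations
2.1 Data Warehouse Architecture Planning
Key points for planning the data warehouse architecture:
ODS layer:
- Storage format: Parquet/ORC
- Compression: Snappy
- Retention: 6 months to 1 year
- Partitioning: daily partitions
DWD layer:
- Storage format: Parquet/ORC
- Compression: Snappy
- Retention: 1 to 3 years
- Partitioning: daily partitions
DWS layer:
- Storage format: Parquet/ORC
- Compression: Snappy
- Retention: 3 years to permanent
- Partitioning: daily/weekly/monthly partitions
ADS layer:
- Storage format: Parquet/ORC
- Compression: Snappy
- Retention: permanent
- Partitioning: daily/weekly/monthly partitions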
# Directory layout
HDFS directories:
/bigdata/fgdata/warehouse/
├── ods/
│ ├── fgedu_user_log/
│ ├── fgedu_order/
│ └── fgedu_user/
├── dwd/
│ ├── fgedu_user_log_d/
│ ├── fgedu_order_d/
│ └── fgedu_user_d/
├── dws/
│ ├── fgedu_user_1d/
│ ├── fgedu_user_7d/
│ └── fgedu_user_30d/
└── ads/
  ├── fgedu_user_report/
  ├── fgedu_order_report/
  └── fgedu_sales_report/
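With this layout, the HDFS directory of any daily partition can be derived mechanically: Hive's default layout places partition `dt=X` in a `dt=X` subdirectory of the table location. The following is a minimal Python sketch. `partition_path` and its validation rules are hypothetical helpers.

```python
import re

WAREHOUSE_ROOT = "/bigdata/fgdata/warehouse"  # root directory from the plan above
LAYERS = {"ods", "dwd", "dws", "ads"}
DT_FORMAT = re.compile(r"^\d{4}-\d{2}-\d{2}$")  # yyyy-MM-dd


def partition_path(layer: str, table_dir: str, dt: str) -> str:
    """Return the HDFS directory that holds one daily partition of a table."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    if not DT_FORMAT.match(dt):
        raise ValueError(f"dt must be yyyy-MM-dd, got: {dt}")
    # Hive's default layout: <table location>/<partition column>=<value>
    return f"{WAREHOUSE_ROOT}/{layer}/{table_dir}/dt={dt}"


print(partition_path("dwd", "fgedu_user_log_d", "2024-04-08"))
```

`MSCK REPAIR TABLE` expects exactly this `column=value` directory naming when it registers partitions. Landing data in the expected form therefore matters.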
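2.2 Layered Design
Layered design principles:
# ODS layer design principles
1. Keep the original data format
2. Do no cleansing at all
3. Partition by day
4. Retain all historical data (within the layer's retention period)
5. Naming: ods_<business>_<table>
# DWD layer design principles
1. Data cleansing
2. Data standardization
3. Data masking (desensitization)
4. Dimension degeneration
5. Naming: dwd_<business>_<table>_d
# DWS layer design principles
1. Summarize by subject
2. Shared dimensions
3. Light aggregation
4. Reusable
5. Naming: dws_<business>_<subject>_<time grain>
# ADS layer design principles
1. Application-oriented
2. Highly aggregated
3. Carries business logic
4. Report data
5. Naming: ads_<business>_<report>
# DIM layer design principles
1. Shared dimensions
2. Slowly changing dimensions
3. Zipper tables (history-chain tables)
4. Naming: dim_<dimension>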
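A zipper table (a type 2 slowly changing dimension) keeps one row per version of a record, each with a start date and an end date. The current version is marked with an open end date. The Python sketch below merges one day's full snapshot into such a table and answers a point-in-time query. Several details are assumptions for illustration, not rules from this article: the `9999-12-31` open end date, the field names, and the choice to leave keys that disappear from the snapshot open.

```python
from datetime import date, timedelta

OPEN_END = date(9999, 12, 31)  # assumed marker for "current version"


def merge_zipper(history, snapshot, dt):
    """Merge one day's full snapshot into a type-2 zipper table.

    history:  list of dicts {"id", "attrs", "start", "end"}
    snapshot: dict id -> attrs as of dt
    Returns a new list. Rerunning with the same snapshot and dt gives the same result.
    """
    result, current = [], {}
    for row in history:
        if row["end"] == OPEN_END:
            current[row["id"]] = row
        else:
            result.append(dict(row))  # closed versions never change
    for key, row in current.items():
        new_attrs = snapshot.get(key)
        if new_attrs is None or new_attrs == row["attrs"]:
            result.append(dict(row))  # unchanged, or missing from snapshot: stays open
        else:
            # Changed: close the old version on the day before dt, open a new one on dt
            result.append({**row, "end": dt - timedelta(days=1)})
            result.append({"id": key, "attrs": new_attrs, "start": dt, "end": OPEN_END})
    for key, attrs in snapshot.items():
        if key not in current:  # new key: its first version starts on dt
            result.append({"id": key, "attrs": attrs, "start": dt, "end": OPEN_END})
    return result


def as_of(table, day):
    """Point-in-time view: the version of each id that was valid on `day`."""
    return {r["id"]: r["attrs"] for r in table if r["start"] <= day <= r["end"]}


hist = [{"id": 1, "attrs": {"city": "Beijing"}, "start": date(2024, 1, 1), "end": OPEN_END}]
snap = {1: {"city": "Shanghai"}, 2: {"city": "Hangzhou"}}
merged = merge_zipper(hist, snap, date(2024, 4, 8))
print(as_of(merged, date(2024, 4, 7)))
print(as_of(merged, date(2024, 4, 8)))
```

Because closed rows are never rewritten, rerunning a day with the same snapshot is idempotent. This is the property the "rerun" advice in 5.2 relies on.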
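2.3 Development Standards
Development standards:
- Table names: <layer>_<business>_<subject>_<time grain>
- Column names: all lowercase, words separated by underscores
- Partition columns: dt, hour, and so on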
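These conventions are easy to enforce automatically, for example in code review or a CI check. The regexes below are a hypothetical encoding of the rules in 2.2 and 2.3. The allowed time-grain suffixes (a number followed by h, d, w, or m, as in `1d`, `7d`, `30d`) are an assumption.

```python
import re

# Hypothetical encoding of the naming rules in 2.2 and 2.3
TABLE_RULES = {
    "ods": re.compile(r"^ods_[a-z0-9]+(_[a-z0-9]+)+$"),            # ods_<business>_<table>
    "dwd": re.compile(r"^dwd_[a-z0-9]+(_[a-z0-9]+)+_d$"),          # dwd_<business>_<table>_d
    "dws": re.compile(r"^dws_[a-z0-9]+(_[a-z0-9]+)+_\d+[hdwm]$"),  # dws_<business>_<subject>_<grain>
    "ads": re.compile(r"^ads_[a-z0-9]+(_[a-z0-9]+)+$"),            # ads_<business>_<report>
    "dim": re.compile(r"^dim_[a-z0-9]+(_[a-z0-9]+)*$"),            # dim_<dimension>
}
COLUMN_RULE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")  # lowercase, underscore-separated


def valid_table_name(name: str) -> bool:
    """Pick the rule by the layer prefix, then require a full match."""
    rule = TABLE_RULES.get(name.split("_", 1)[0])
    return bool(rule and rule.fullmatch(name))


def valid_column_name(name: str) -> bool:
    return bool(COLUMN_RULE.fullmatch(name))


for t in ["ods_fgedu_user_log", "dwd_fgedu_user_log_d", "dws_fgedu_user_7d", "dwd_fgedu_user_log"]:
    print(t, valid_table_name(t))
```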
Part03-Production Implementation Plan
3.1 ODS Layer Design
3.1.1 Creating ODS Tables
CREATE DATABASE IF NOT EXISTS fgedu_ods;
-- ODS user behavior log table (raw tab-delimited text, so no TBLPROPERTIES: orc.compress has no effect on TEXTFILE tables)
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_ods.ods_fgedu_user_log (
log_time STRING COMMENT 'Log time',
user_id BIGINT COMMENT 'User ID',
event_type STRING COMMENT 'Event type',
page STRING COMMENT 'Page',
item_id BIGINT COMMENT 'Item ID',
duration INT COMMENT 'Dwell time (seconds)',
ip STRING COMMENT 'IP address',
user_agent STRING COMMENT 'User Agent'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim' = '\t',
'line.delim' = '\n'
)
STORED AS TEXTFILE
LOCATION '/bigdata/fgdata/warehouse/ods/fgedu_user_log';
-- ODS order table (raw tab-delimited text, so no TBLPROPERTIES: orc.compress has no effect on TEXTFILE tables)
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_ods.ods_fgedu_order (
order_id BIGINT COMMENT 'Order ID',
user_id BIGINT COMMENT 'User ID',
order_time STRING COMMENT 'Order time',
pay_time STRING COMMENT 'Payment time',
order_amount DECIMAL(18,2) COMMENT 'Order amount',
pay_amount DECIMAL(18,2) COMMENT 'Payment amount',
order_status STRING COMMENT 'Order status',
item_id BIGINT COMMENT 'Item ID',
item_cnt INT COMMENT 'Item quantity'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim' = '\t',
'line.delim' = '\n'
)
STORED AS TEXTFILE
LOCATION '/bigdata/fgdata/warehouse/ods/fgedu_order';
-- ODS user table (raw tab-delimited text, so no TBLPROPERTIES: orc.compress has no effect on TEXTFILE tables)
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_ods.ods_fgedu_user (
user_id BIGINT COMMENT 'User ID',
user_name STRING COMMENT 'User name',
phone STRING COMMENT 'Mobile number',
email STRING COMMENT 'Email',
city STRING COMMENT 'City',
register_time STRING COMMENT 'Registration time',
user_level STRING COMMENT 'User level'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim' = '\t',
'line.delim' = '\n'
)
STORED AS TEXTFILE
LOCATION '/bigdata/fgdata/warehouse/ods/fgedu_user';
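-- Load data into ODS (LOAD DATA INPATH moves, rather than copies, the HDFS files into the partition directory)
LOAD DATA INPATH '/bigdata/fgdata/raw/user_log/2024-04-08'
OVERWRITE INTO TABLE fgedu_ods.ods_fgedu_user_log
PARTITION (dt='2024-04-08');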
3.2 DWD Layer Design
3.2.1 Creating DWD Tables
CREATE DATABASE IF NOT EXISTS fgedu_dwd;
-- DWD user behavior log table (cleansed)
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_dwd.dwd_fgedu_user_log_d (
log_time TIMESTAMP COMMENT 'Log time',
user_id BIGINT COMMENT 'User ID',
event_type STRING COMMENT 'Event type',
page STRING COMMENT 'Page',
item_id BIGINT COMMENT 'Item ID',
duration INT COMMENT 'Dwell time (seconds)',
ip STRING COMMENT 'IP address',
device_id STRING COMMENT 'Device ID',
platform STRING COMMENT 'Platform'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
STORED AS ORC
LOCATION '/bigdata/fgdata/warehouse/dwd/fgedu_user_log_d'
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
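-- Cleanse from ODS into DWD
-- Hive has no built-in TO_TIMESTAMP, so CAST is used. It returns NULL when log_time is not in yyyy-MM-dd HH:mm:ss format
-- device_id takes the first semicolon-separated token of user_agent (assumed to identify the device)
INSERT OVERWRITE TABLE fgedu_dwd.dwd_fgedu_user_log_d
PARTITION (dt='2024-04-08')
SELECT
CAST(log_time AS TIMESTAMP) AS log_time,
user_id,
event_type,
page,
item_id,
duration,
ip,
SPLIT(user_agent, ';')[0] AS device_id,
CASE
WHEN user_agent LIKE '%iPhone%' THEN 'iOS'
WHEN user_agent LIKE '%Android%' THEN 'Android'
ELSE 'Web'
END AS platform
FROM fgedu_ods.ods_fgedu_user_log
WHERE dt = '2024-04-08'
AND user_id IS NOT NULL
AND log_time IS NOT NULL;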
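The cleansing rules in this statement can also be written as a plain function, which makes it easy to unit-test them against sample ODS lines before running the SQL. This Python sketch assumes tab-split fields in the ODS column order. It treats both empty strings and `\N` (the default NULL marker in LazySimpleSerDe text files) as missing. The SQL filters on the raw string; unlike it, this sketch also drops rows whose `log_time` cannot be parsed. `clean_log_row` and `derive_platform` are hypothetical names.

```python
from datetime import datetime
from typing import Optional

NULL_MARKERS = ("", "\\N")  # empty string and the LazySimpleSerDe default NULL marker


def derive_platform(user_agent: str) -> str:
    """Same order as the CASE WHEN: iPhone -> iOS, Android -> Android, else Web."""
    if "iPhone" in user_agent:
        return "iOS"
    if "Android" in user_agent:
        return "Android"
    return "Web"


def clean_log_row(fields: list) -> Optional[dict]:
    """Cleanse one tab-split ODS row, or return None if the row should be filtered out."""
    log_time, user_id, event_type, page, item_id, duration, ip, user_agent = fields
    if user_id in NULL_MARKERS or log_time in NULL_MARKERS:
        return None  # mirrors: user_id IS NOT NULL AND log_time IS NOT NULL
    try:
        ts = datetime.strptime(log_time, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None  # stricter than the SQL, which keeps the row with a NULL time
    return {
        "log_time": ts,
        "user_id": int(user_id),
        "event_type": event_type,
        "page": page,
        "item_id": None if item_id in NULL_MARKERS else int(item_id),
        "duration": None if duration in NULL_MARKERS else int(duration),
        "ip": ip,
        "device_id": user_agent.split(";")[0],
        "platform": derive_platform(user_agent),
    }


line = "2024-04-08 10:00:00\t42\tclick\thome\t7\t15\t1.2.3.4\tdev-1;iPhone OS 17"
print(clean_log_row(line.split("\t")))
```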
-- DWD order table
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_dwd.dwd_fgedu_order_d (
order_id BIGINT COMMENT 'Order ID',
user_id BIGINT COMMENT 'User ID',
order_time TIMESTAMP COMMENT 'Order time',
pay_time TIMESTAMP COMMENT 'Payment time',
order_amount DECIMAL(18,2) COMMENT 'Order amount',
pay_amount DECIMAL(18,2) COMMENT 'Payment amount',
order_status STRING COMMENT 'Order status',
item_id BIGINT COMMENT 'Item ID',
item_cnt INT COMMENT 'Item quantity'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
STORED AS ORC
LOCATION '/bigdata/fgdata/warehouse/dwd/fgedu_order_d'
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
-- DWD user table
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_dwd.dwd_fgedu_user_d (
user_id BIGINT COMMENT 'User ID',
user_name STRING COMMENT 'User name',
phone STRING COMMENT 'Mobile number (masked)',
email STRING COMMENT 'Email (masked)',
city STRING COMMENT 'City',
register_time TIMESTAMP COMMENT 'Registration time',
user_level STRING COMMENT 'User level'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
STORED AS ORC
LOCATION '/bigdata/fgdata/warehouse/dwd/fgedu_user_d'
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
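The DWD user table stores masked mobile numbers and email addresses, but the masking rule itself is not shown. The Python sketch below uses one common convention: keep the first 3 and last 4 digits of an 11-digit mobile number, and only the first character of an email's local part. The rules and function names are assumptions, and malformed values are fully masked rather than passed through. Hive 2.1 and later also ship built-in masking UDFs (for example `mask_show_last_n`), which may be preferable inside SQL.

```python
from typing import Optional


def mask_phone(phone: Optional[str]) -> Optional[str]:
    """Keep the first 3 and last 4 digits of an 11-digit mobile number: 138****5678."""
    if phone is None:
        return None
    if len(phone) != 11 or not phone.isdigit():
        return "*" * len(phone)  # unexpected format: mask everything rather than leak it
    return phone[:3] + "****" + phone[-4:]


def mask_email(email: Optional[str]) -> Optional[str]:
    """Keep the first character of the local part and the whole domain: z***@example.com."""
    if email is None:
        return None
    local, sep, domain = email.partition("@")
    if not sep or not local or not domain:
        return "***"  # malformed address: fully masked
    return local[0] + "***@" + domain


print(mask_phone("13812345678"), mask_email("zhangsan@example.com"))
```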
3.3 DWS and ADS Layer Design
3.3.1 Creating DWS and ADS Tables
CREATE DATABASE IF NOT EXISTS fgedu_dws;
-- DWS daily user behavior summary table (one row per user per day)
-- dt is the partition column, so it must not also appear in the column list
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_dws.dws_fgedu_user_1d (
user_id BIGINT COMMENT 'User ID',
pv BIGINT COMMENT 'Page views',
total_events BIGINT COMMENT 'Total events',
click_cnt BIGINT COMMENT 'Clicks',
order_cnt BIGINT COMMENT 'Orders placed',
pay_cnt BIGINT COMMENT 'Payments',
total_duration BIGINT COMMENT 'Total dwell time (seconds)'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
STORED AS ORC
LOCATION '/bigdata/fgdata/warehouse/dws/fgedu_user_1d'
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
-- Aggregate from DWD into DWS
-- With a static partition spec the SELECT list omits dt
-- A per-user UV (COUNT(DISTINCT user_id) grouped by user_id) is always 1, so the total event count is stored instead
INSERT OVERWRITE TABLE fgedu_dws.dws_fgedu_user_1d
PARTITION (dt='2024-04-08')
SELECT
user_id,
SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) AS pv,
COUNT(1) AS total_events,
SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS click_cnt,
SUM(CASE WHEN event_type = 'order' THEN 1 ELSE 0 END) AS order_cnt,
SUM(CASE WHEN event_type = 'pay' THEN 1 ELSE 0 END) AS pay_cnt,
SUM(duration) AS total_duration
FROM fgedu_dwd.dwd_fgedu_user_log_d
WHERE dt = '2024-04-08'
GROUP BY user_id;
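The `SUM(CASE WHEN ...)` pattern turns event rows into per-user counters. A Python equivalent of that grouping is useful for checking expected values on a small sample. It might look like this; `aggregate_user_1d` and the sample events are hypothetical.

```python
from collections import defaultdict

# event_type value -> counter column, as in the CASE WHEN branches
EVENT_TO_COLUMN = {"page_view": "pv", "click": "click_cnt", "order": "order_cnt", "pay": "pay_cnt"}


def aggregate_user_1d(events):
    """One output row per user_id, like GROUP BY user_id with SUM(CASE WHEN ...)."""
    out = defaultdict(lambda: {"pv": 0, "click_cnt": 0, "order_cnt": 0,
                               "pay_cnt": 0, "total_duration": 0})
    for e in events:
        row = out[e["user_id"]]
        column = EVENT_TO_COLUMN.get(e["event_type"])
        if column:
            row[column] += 1
        # None counts as 0 here (SQL SUM would return NULL only if every value were NULL)
        row["total_duration"] += e["duration"] or 0
    return dict(out)


sample = [
    {"user_id": 1, "event_type": "page_view", "duration": 10},
    {"user_id": 1, "event_type": "click", "duration": None},
    {"user_id": 2, "event_type": "pay", "duration": 5},
]
print(aggregate_user_1d(sample))
```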
-- DWS 7-day user behavior summary table (dt is the partition column only)
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_dws.dws_fgedu_user_7d (
user_id BIGINT COMMENT 'User ID',
pv_7d BIGINT COMMENT '7-day PV',
uv_7d BIGINT COMMENT '7-day UV',
order_cnt_7d BIGINT COMMENT '7-day orders placed',
pay_amt_7d DECIMAL(18,2) COMMENT '7-day payment amount'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
STORED AS ORC
LOCATION '/bigdata/fgdata/warehouse/dws/fgedu_user_7d'
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
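The article does not show how the 7-day table is filled. A common approach is to re-aggregate the last seven daily partitions of `dws_fgedu_user_1d`, for example filtering on `dt BETWEEN date_sub('2024-04-08', 6) AND '2024-04-08'` and grouping by `user_id`. The Python sketch below shows that window logic on in-memory rows. The function names and input shape are assumptions.

```python
from datetime import date, timedelta


def window_dates(end_day: date, days: int = 7) -> list:
    """dt partition values of an N-day window ending on end_day (inclusive), oldest first."""
    return [(end_day - timedelta(days=i)).isoformat() for i in range(days - 1, -1, -1)]


def rollup_7d(daily_rows, end_day: date) -> dict:
    """Sum per-user daily metrics over the 7 partitions ending on end_day.

    daily_rows: iterable of dicts with keys dt (yyyy-MM-dd), user_id, pv, order_cnt
    """
    wanted = set(window_dates(end_day, 7))
    totals = {}
    for r in daily_rows:
        if r["dt"] not in wanted:
            continue  # outside the window: same effect as partition pruning
        t = totals.setdefault(r["user_id"], {"pv_7d": 0, "order_cnt_7d": 0})
        t["pv_7d"] += r["pv"]
        t["order_cnt_7d"] += r["order_cnt"]
    return totals


rows = [
    {"dt": "2024-04-01", "user_id": 1, "pv": 100, "order_cnt": 9},  # before 2024-04-02: excluded
    {"dt": "2024-04-02", "user_id": 1, "pv": 3, "order_cnt": 1},
    {"dt": "2024-04-08", "user_id": 1, "pv": 2, "order_cnt": 0},
]
print(rollup_7d(rows, date(2024, 4, 8)))
```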
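-- Create the ADS database
CREATE DATABASE IF NOT EXISTS fgedu_ads;
-- ADS daily user activity report (dt is the partition column only)
CREATE EXTERNAL TABLE IF NOT EXISTS fgedu_ads.ads_fgedu_user_daily (
dau BIGINT COMMENT 'Daily active users',
new_user_cnt BIGINT COMMENT 'New users',
pv BIGINT COMMENT 'Total PV',
avg_session_duration DECIMAL(10,2) COMMENT 'Average session duration',
order_cnt BIGINT COMMENT 'Orders',
total_pay_amt DECIMAL(18,2) COMMENT 'Total payment amount'
)
PARTITIONED BY (dt STRING COMMENT 'Date')
STORED AS ORC
LOCATION '/bigdata/fgdata/warehouse/ads/fgedu_user_daily'
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
-- Aggregate from DWS into ADS
-- Orders are pre-aggregated per user so the join cannot duplicate user rows and inflate SUM(pv)
-- Date filters for the joined tables sit inside the subqueries, so the LEFT JOIN keeps users without orders
-- user_level comes from the DWD user table, assumed to hold one row per user in each dt partition
INSERT OVERWRITE TABLE fgedu_ads.ads_fgedu_user_daily
PARTITION (dt='2024-04-08')
SELECT
COUNT(DISTINCT u.user_id) AS dau,
SUM(CASE WHEN d.user_level = 'new' THEN 1 ELSE 0 END) AS new_user_cnt,
SUM(u.pv) AS pv,
CAST(AVG(u.total_duration) AS DECIMAL(10,2)) AS avg_session_duration,
SUM(u.order_cnt) AS order_cnt,
SUM(o.pay_amt) AS total_pay_amt
FROM fgedu_dws.dws_fgedu_user_1d u
LEFT JOIN (
  SELECT user_id, SUM(pay_amount) AS pay_amt
  FROM fgedu_dwd.dwd_fgedu_order_d
  WHERE dt = '2024-04-08'
  GROUP BY user_id
) o ON u.user_id = o.user_id
LEFT JOIN (
  SELECT user_id, user_level
  FROM fgedu_dwd.dwd_fgedu_user_d
  WHERE dt = '2024-04-08'
) d ON u.user_id = d.user_id
WHERE u.dt = '2024-04-08';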
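A frequent pitfall in this kind of report is joining per-user summary rows directly to order detail rows. Every user row is then repeated once per matching order, so measures such as SUM(pv) are inflated. Separately, a WHERE filter on the right-hand table of a LEFT JOIN silently turns it into an inner join. The Python sketch below uses made-up rows to compare a naive join with pre-aggregating orders per user first. The function names are hypothetical.

```python
from decimal import Decimal

users_1d = [{"user_id": 1, "pv": 10}, {"user_id": 2, "pv": 4}]
orders = [
    {"order_id": 11, "user_id": 1, "pay_amount": Decimal("5.00")},
    {"order_id": 12, "user_id": 1, "pay_amount": Decimal("7.00")},
]


def naive_join_totals(users, order_rows):
    """LEFT JOIN user rows to raw order rows, then SUM: pv repeats once per order."""
    pv, pay = 0, Decimal("0")
    for u in users:
        matches = [o for o in order_rows if o["user_id"] == u["user_id"]] or [None]
        for o in matches:
            pv += u["pv"]
            if o is not None:
                pay += o["pay_amount"]
    return pv, pay


def preaggregated_totals(users, order_rows):
    """Aggregate orders per user first, so each user row matches at most one row."""
    pay_by_user = {}
    for o in order_rows:
        pay_by_user[o["user_id"]] = pay_by_user.get(o["user_id"], Decimal("0")) + o["pay_amount"]
    pv = sum(u["pv"] for u in users)
    pay = sum((pay_by_user.get(u["user_id"], Decimal("0")) for u in users), Decimal("0"))
    return pv, pay


print(naive_join_totals(users_1d, orders), preaggregated_totals(users_1d, orders))
```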
Part04-Production Cases and Hands-on Walkthrough
4.1 User Behavior Data Warehouse in Practice
4.1.1 The Complete ETL Flow
-- 1. ODS layer: every day in the early morning, load the previous day's data
LOAD DATA INPATH '/bigdata/fgdata/raw/user_log/2024-04-07'
OVERWRITE INTO TABLE fgedu_ods.ods_fgedu_user_log
PARTITION (dt='2024-04-07');
LOAD DATA INPATH '/bigdata/fgdata/raw/order/2024-04-07'
OVERWRITE INTO TABLE fgedu_ods.ods_fgedu_order
PARTITION (dt='2024-04-07');
-- 2. DWD layer: data cleansing (CAST replaces TO_TIMESTAMP, which is not a Hive built-in)
INSERT OVERWRITE TABLE fgedu_dwd.dwd_fgedu_user_log_d
PARTITION (dt='2024-04-07')
SELECT
CAST(log_time AS TIMESTAMP) AS log_time,
user_id,
event_type,
page,
item_id,
duration,
ip,
SPLIT(user_agent, ';')[0] AS device_id,
CASE
WHEN user_agent LIKE '%iPhone%' OR user_agent LIKE '%iPad%' THEN 'iOS'
WHEN user_agent LIKE '%Android%' THEN 'Android'
ELSE 'Web'
END AS platform
FROM fgedu_ods.ods_fgedu_user_log
WHERE dt = '2024-04-07'
AND user_id IS NOT NULL
AND user_id > 0
AND log_time IS NOT NULL;
INSERT OVERWRITE TABLE fgedu_dwd.dwd_fgedu_order_d
PARTITION (dt='2024-04-07')
SELECT
order_id,
user_id,
CAST(order_time AS TIMESTAMP) AS order_time,
CAST(pay_time AS TIMESTAMP) AS pay_time,
order_amount,
pay_amount,
order_status,
item_id,
item_cnt
FROM fgedu_ods.ods_fgedu_order
WHERE dt = '2024-04-07'
AND order_id IS NOT NULL
AND user_id IS NOT NULL;
-- 3. DWS layer: data aggregation (static partition, so dt is not in the SELECT list)
INSERT OVERWRITE TABLE fgedu_dws.dws_fgedu_user_1d
PARTITION (dt='2024-04-07')
SELECT
user_id,
SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) AS pv,
COUNT(1) AS total_events,
SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS click_cnt,
SUM(CASE WHEN event_type = 'order' THEN 1 ELSE 0 END) AS order_cnt,
SUM(CASE WHEN event_type = 'pay' THEN 1 ELSE 0 END) AS pay_cnt,
SUM(duration) AS total_duration
FROM fgedu_dwd.dwd_fgedu_user_log_d
WHERE dt = '2024-04-07'
GROUP BY user_id;
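-- 4. ADS layer: report generation
-- Orders are pre-aggregated per user to avoid join fan-out, and register_time comes from the DWD user table
-- No GROUP BY is needed because every selected expression is an aggregate
INSERT OVERWRITE TABLE fgedu_ads.ads_fgedu_user_daily
PARTITION (dt='2024-04-07')
SELECT
COUNT(DISTINCT u.user_id) AS dau,
SUM(CASE WHEN d.register_time >= CAST('2024-04-07 00:00:00' AS TIMESTAMP) THEN 1 ELSE 0 END) AS new_user_cnt,
SUM(u.pv) AS pv,
CAST(AVG(u.total_duration) AS DECIMAL(10,2)) AS avg_session_duration,
SUM(COALESCE(o.order_cnt, 0)) AS order_cnt,
SUM(o.pay_amt) AS total_pay_amt
FROM fgedu_dws.dws_fgedu_user_1d u
LEFT JOIN (
  SELECT user_id, COUNT(DISTINCT order_id) AS order_cnt, SUM(pay_amount) AS pay_amt
  FROM fgedu_dwd.dwd_fgedu_order_d
  WHERE dt = '2024-04-07'
  GROUP BY user_id
) o ON u.user_id = o.user_id
LEFT JOIN (
  SELECT user_id, register_time
  FROM fgedu_dwd.dwd_fgedu_user_d
  WHERE dt = '2024-04-07'
) d ON u.user_id = d.user_id
WHERE u.dt = '2024-04-07';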
4.2 Performance Optimization in Practice
4.2.1 Data Warehouse Performance Optimization
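-- The examples below assume the current database is fgedu_dwd
USE fgedu_dwd;
-- Optimization 1: use a columnar format such as ORC or Parquet (clauses of a CREATE TABLE statement)
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');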
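-- Optimization 2: partition pruning
-- Always filter on the partition column so only the matching partition directories are scanned
SELECT * FROM dwd_fgedu_user_log_d WHERE dt = '2024-04-08';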
-- Optimization 3: bucketing on user_id (helps bucket map joins and sampling on that column)
CREATE TABLE dwd_fgedu_user_log_d (
…
)
CLUSTERED BY (user_id) INTO 32 BUCKETS
STORED AS ORC;
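-- Optimization 4: merge small files
-- Merge the output files of map-only jobs and of map-reduce jobs
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
-- On Tez the equivalent switch is hive.merge.tezfiles
SET hive.merge.tezfiles=true;
-- Target size of merged files (about 256 MB)
SET hive.merge.size.per.task=256000000;
-- Start a merge when the average output file size is below about 16 MB
SET hive.merge.smallfiles.avgsize=16000000;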
-- Optimization 5: data compression
-- Use Snappy or ZSTD (ZSTD for ORC requires a sufficiently recent Hive/ORC version)
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
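-- Optimization 6: choose a suitable execution engine (set only one of the following)
-- Tez engine
SET hive.execution.engine=tez;
-- Spark engine (Hive on Spark is no longer available in Hive 4.x)
SET hive.execution.engine=spark;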
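-- Optimization 7: set parallelism sensibly
-- A fixed reducer count. Any positive value overrides the size-based estimate below
SET mapreduce.job.reduces=16;
-- Bytes of input per reducer, used when Hive estimates the reducer count itself (mapreduce.job.reduces=-1)
SET hive.exec.reducers.bytes.per.reducer=256000000;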
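When `mapreduce.job.reduces` is left at -1, Hive estimates the reducer count from the input size. The following is a simplified Python model of that rule. It assumes the default `hive.exec.reducers.max` of 1009 and ignores engine-specific adjustments such as Tez auto-parallelism. `estimate_reducers` is a hypothetical name.

```python
import math


def estimate_reducers(total_input_bytes: int,
                      bytes_per_reducer: int = 256_000_000,
                      max_reducers: int = 1009,
                      fixed_reducers: int = -1) -> int:
    """Simplified model of how Hive picks the reducer count.

    fixed_reducers plays the role of mapreduce.job.reduces: a positive value wins outright.
    Otherwise: ceil(input / bytes_per_reducer), at least 1, capped at max_reducers.
    """
    if fixed_reducers > 0:
        return fixed_reducers
    reducers = math.ceil(total_input_bytes / bytes_per_reducer)
    return min(max_reducers, max(1, reducers))


print(estimate_reducers(10 * 1024**3))  # 10 GiB of input
```

With the 256 MB setting above, 10 GiB of input yields 42 reducers. The fixed value of 16 would override that.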
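-- Optimization 8: handling data skew
-- Salting with two-stage aggregation. Stage 1 groups by (user_id, random salt), so a hot key
-- is spread over several reducers. Stage 2 drops the salt and merges the partial counts
-- Alternatively, SET hive.groupby.skewindata=true lets Hive plan a similar two-stage GROUP BY
SELECT user_id, SUM(cnt) AS cnt
FROM (
  SELECT user_id, salt, COUNT(*) AS cnt
  FROM (
    SELECT user_id, CAST(FLOOR(RAND() * 16) AS INT) AS salt
    FROM dwd_fgedu_user_log_d
    WHERE dt = '2024-04-08'
  ) s
  GROUP BY user_id, salt
) t
GROUP BY user_id;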
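Salting is only correct when the salt is stripped in a second aggregation that merges the partial results. This Python simulation checks that a two-stage count over a hot key matches a direct count, while the hot key is spread over several groups. It uses a hypothetical `salted_count` function and a fixed random seed.

```python
import random
from collections import Counter


def salted_count(keys, buckets=16, seed=0):
    """Two-stage COUNT(*) by key using a random salt.

    Stage 1 groups by (key, salt), so one hot key is split across up to `buckets` groups.
    Stage 2 drops the salt and adds the partial counts back together.
    """
    rng = random.Random(seed)  # fixed seed only to make the demo reproducible
    partial = Counter((k, rng.randrange(buckets)) for k in keys)
    final = Counter()
    for (key, _salt), cnt in partial.items():
        final[key] += cnt
    return final, partial


keys = [1] * 1000 + [2, 3, 3]  # user_id 1 is the hot key
final, partial = salted_count(keys)
print(final[1], max(c for (k, _), c in partial.items() if k == 1))
```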
-- Optimization 9: enable the CBO (cost-based optimizer)
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
-- Optimization 10: gather statistics
ANALYZE TABLE dwd_fgedu_user_log_d PARTITION (dt='2024-04-08') COMPUTE STATISTICS;
ANALYZE TABLE dwd_fgedu_user_log_d PARTITION (dt='2024-04-08') COMPUTE STATISTICS FOR COLUMNS user_id, event_type;
4.3 Data Warehouse Operations in Practice
4.3.1 Routine Operations
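USE fgedu_dwd;
-- 1. List partitions
SHOW PARTITIONS dwd_fgedu_user_log_d;
-- 2. Add a partition
ALTER TABLE dwd_fgedu_user_log_d ADD IF NOT EXISTS PARTITION (dt='2024-04-09');
-- 3. Drop a partition (by default, on an EXTERNAL table only the metadata is removed and the HDFS files remain)
ALTER TABLE dwd_fgedu_user_log_d DROP IF EXISTS PARTITION (dt='2024-01-01');
-- 4. Repair partitions (registers partition directories that exist on HDFS but are missing from the metastore)
MSCK REPAIR TABLE dwd_fgedu_user_log_d;
-- 5. Check table size
dfs -du -h /bigdata/fgdata/warehouse/dwd/fgedu_user_log_d;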
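-- 6. Clean up expired partitions
-- Keep roughly the last 90 days by dropping every partition older than a cutoff date
-- (2024-01-01 is only an example. The scheduler should derive the cutoff from the run date)
ALTER TABLE dwd_fgedu_user_log_d DROP PARTITION (dt<'2024-01-01');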
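Hard-coding the cutoff date makes the retention rule drift over time. The sketch below computes the cutoff for a 90-day window from the run date and lists the partitions it would drop. `expired_partitions` is a hypothetical helper, and "keep 90 days including the run date" is an assumed reading of the rule.

```python
from datetime import date, timedelta


def expired_partitions(partitions, run_day: date, keep_days: int = 90):
    """Return (sorted dt values to drop, cutoff).

    Assumes "keep N days" means the N most recent days ending on run_day inclusive,
    so the oldest kept partition is run_day - (N - 1) days.
    """
    cutoff = (run_day - timedelta(days=keep_days - 1)).isoformat()
    # yyyy-MM-dd strings sort chronologically, so plain string comparison is safe
    return sorted(p for p in partitions if p < cutoff), cutoff


to_drop, cutoff = expired_partitions(["2024-01-01", "2024-01-09", "2024-01-10", "2024-04-08"],
                                     date(2024, 4, 8))
print(to_drop)
print(f"ALTER TABLE dwd_fgedu_user_log_d DROP PARTITION (dt<'{cutoff}');")
```

For an EXTERNAL table, the matching HDFS directories still have to be removed separately after the partitions are dropped.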
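-- 7. Back up the metastore (a shell command run outside Hive. Assumes a MySQL-backed metastore database named hive on host fgedu-mysql)
mysqldump -h fgedu-mysql -u hive -p hive > hive_metadata_$(date +%Y%m%d).sql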
-- 8. Synchronize metadata
-- Use the Hive Metastore
-- or manage metadata with Apache Atlas
-- 9. Data quality checks
-- Row count per partition
SELECT dt, COUNT(*)
FROM dwd_fgedu_user_log_d
GROUP BY dt
ORDER BY dt DESC;
-- Count NULL values
SELECT
SUM(CASE WHEN user_id IS NULL THEN 1 ELSE 0 END) AS user_id_null_cnt,
SUM(CASE WHEN log_time IS NULL THEN 1 ELSE 0 END) AS log_time_null_cnt
FROM dwd_fgedu_user_log_d
WHERE dt = '2024-04-08';
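Raw per-partition counts are easier to act on when compared with a baseline. The Python sketch below flags any partition whose row count differs by more than 50% from the average of the previous 7 partitions. Both thresholds and the function name `volume_alerts` are assumptions to tune for your data.

```python
from statistics import mean


def volume_alerts(counts_by_dt, threshold=0.5, history=7):
    """Flag partitions whose row count deviates from the trailing average by more than `threshold`.

    counts_by_dt: dict of dt -> row count, e.g. the output of the per-partition COUNT(*) query
    """
    alerts = []
    days = sorted(counts_by_dt)
    for i, dt in enumerate(days):
        prev = [counts_by_dt[d] for d in days[max(0, i - history):i]]
        if not prev:
            continue  # no baseline yet
        baseline = mean(prev)
        if baseline == 0:
            if counts_by_dt[dt] != 0:
                alerts.append(dt)
            continue
        if abs(counts_by_dt[dt] - baseline) / baseline > threshold:
            alerts.append(dt)
    return alerts


counts = {"2024-04-01": 1000, "2024-04-02": 1100, "2024-04-03": 950,
          "2024-04-04": 0, "2024-04-05": 1020}
print(volume_alerts(counts))
```

An empty partition (0 rows) is always flagged once a nonzero baseline exists. That usually points to a failed load rather than a real drop in traffic.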
-- 10. Data lineage
-- Use Atlas to view data lineage
-- or use in-house tools
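Part05-Fengge's Lessons Learned
5.1 Data Warehouse Best Practices
Data warehouse best practices:
- Layered design: strictly follow the ODS-DWD-DWS-ADS layering
- Naming conventions: one consistent naming convention that is easy to understand
- Partition design: choose sensible partition columns. Daily partitions are the usual choice
- Storage format: use ORC/Parquet columnar storage
- Data compression: enable compression to reduce storage and I/O
- Metadata management: maintain complete, well-managed metadata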
5.2 Handling Common Problems
# Common problem 1: data skew
- Check the key distribution
- Salting
- Two-stage aggregation
- Tune parallelism
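# Common problem 2: slow queries
- Check partition pruning
- Inspect the execution plan (EXPLAIN)
- Use columnar storage
- Gather statistics
- Optimize the SQL
# Common problem 3: too many small files
- Merge small files
- Tune job parameters
- Use CombineHiveInputFormat
- Merge periodically
# Common problem 4: inconsistent data
- Check the rerun logic
- Design jobs to be idempotent
- Check the time window
- Rerun the data
# Common problem 5: missing partitions
- Repair partitions with MSCK
- Check data loading
- Check partition creation
- Monitor partition status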
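5.3 Operations Checklist
- [ ] Partition status is normal
- [ ] Data loads succeeded
- [ ] Data quality checks passed
- [ ] Table sizes are reasonable
- [ ] Small files have been merged
- [ ] Query performance is normal
- [ ] Metadata is backed up
- [ ] Data lineage is complete
- [ ] Permissions are configured correctly
- [ ] Monitoring and alerting work
- [ ] Logs have been checked
- [ ] Alert rules have been checked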
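# Daily inspection items
1. Check partition status
2. Check data load jobs
3. Check data quality
4. Check table size growth
5. Review slow queries
6. Check for small files
7. Check alerts
8. Back up metadata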
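This article was compiled and published by Fengge Tutorials (风哥教程) for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html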
