内容简介:本文详细介绍Hive离线数仓开发的核心方法与生产实战应用。风哥教程参考Hive官方文档Data Warehouse Development、ETL Best Practices等内容,涵盖数据抽取、数据清洗、数据转换、数据加载等ETL全流程开发,结合生产环境实际案例,帮助读者掌握企业级离线数仓开发的实战技能。
目录大纲
Part01-基础概念与理论知识
1.1 离线数仓开发概述
1.2 ETL流程设计原则
1.3 开发规范与标准
Part02-生产环境规划与建议
2.1 数据抽取策略
2.2 数据清洗规则
2.3 调度依赖设计
Part03-生产环境项目实施方案
3.1 数据抽取开发
3.2 数据清洗开发
3.3 数据转换开发
Part04-生产案例与实战讲解
4.1 订单数据ETL案例
4.2 用户数据ETL案例
4.3 商品数据ETL案例
Part05-风哥经验总结与分享
5.1 离线开发最佳实践
5.2 常见问题与解决方案
5.3 生产环境注意事项
Part01-基础概念与理论知识
1.1 离线数仓开发概述
离线数仓开发是大数据平台建设的核心工作,通过ETL流程将业务数据转化为可分析的数据资产。更多视频教程www.fgedu.net.cn
离线数仓特点:
1. 批量处理:定时批量处理数据
2. 延迟性:T+1或更长延迟
3. 高吞吐:处理大规模历史数据
4. 稳定性:稳定的调度执行
开发流程:
需求分析 → 模型设计 → ETL开发 → 测试验证 → 上线部署 → 运维监控
1.2 ETL流程设计原则
ETL流程设计需要遵循以下原则:学习交流加群风哥微信: itpux-com
数据抽取原则:
1. 增量优先:优先使用增量抽取减少数据量
2. 断点续传:支持断点续传,避免重复抽取
3. 数据校验:抽取后进行数据校验
数据清洗原则:
1. 空值处理:合理处理空值
2. 异常值处理:识别和处理异常值
3. 数据标准化:统一数据格式和编码
数据加载原则:
1. 幂等性:支持重复执行
2. 原子性:保证数据一致性
3. 可追溯:记录数据血缘
1.3 开发规范与标准
统一的开发规范是团队协作的基础:
— 1. 使用统一缩进(2空格或4空格)
— 2. 关键字大写
— 3. 每个子查询添加注释
— 4. 使用CTE提高可读性
— 规范示例
WITH user_orders AS (
— 获取用户订单数据
SELECT user_id, COUNT(*) AS order_count
FROM dwd_trade.dwd_trade_order_di
WHERE dt = ‘${dt}’
GROUP BY user_id
),
user_profile AS (
— 获取用户画像数据
SELECT user_id, user_level
FROM dwd_user.dwd_user_profile_di
WHERE dt = ‘${dt}’
)
SELECT
u.user_id,
u.user_level,
o.order_count
FROM user_profile u
LEFT JOIN user_orders o ON u.user_id = o.user_id;
Part02-生产环境规划与建议
2.1 数据抽取策略
数据抽取策略需要根据数据特点选择:风哥提示:合理的抽取策略可以大幅减少数据处理时间。
— 全量抽取:适用于小表、维度表
— 增量抽取:适用于大表、事实表
— CDC抽取:适用于实时性要求高的场景
— 增量抽取示例(基于时间戳)
SELECT * FROM source_table
WHERE update_time >= ‘${last_extract_time}’
AND update_time < '${current_extract_time}';
— 增量抽取示例(基于自增ID)
SELECT * FROM source_table
WHERE id > ${last_max_id}
ORDER BY id;
2.2 数据清洗规则
数据清洗规则需要根据业务需求制定:更多学习教程公众号风哥教程itpux_com
— 1. 空值处理
SELECT
COALESCE(user_name, ‘未知’) AS user_name,
NVL(age, 0) AS age
FROM source_table;
— 2. 异常值处理
SELECT
CASE
WHEN age < 0 OR age > 150 THEN NULL
ELSE age
END AS age
FROM source_table;
— 3. 数据标准化
SELECT
TRIM(UPPER(user_name)) AS user_name,
REGEXP_REPLACE(phone, ‘[^0-9]’, ”) AS phone
FROM source_table;
— 4. 去重处理
SELECT * FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY update_time DESC) AS rn
FROM source_table
) t WHERE rn = 1;
2.3 调度依赖设计
调度依赖设计是保证数据正确性的关键:
— 任务依赖图
ods_sync_task (02:00)
├── dwd_clean_task (04:00)
│ ├── dws_agg_task (06:00)
│ │ └── ads_report_task (08:00)
│ └── dws_user_task (06:00)
└── dim_sync_task (04:00)
— 调度参数配置
— 重试次数: 3
— 重试间隔: 5分钟
— 超时时间: 2小时
— 告警阈值: 失败1次告警
Part03-生产环境项目实施方案
3.1 数据抽取开发
数据抽取是ETL流程的第一步。from bigdata视频:www.itpux.com
3.1.1 Sqoop抽取脚本
# sqoop_extract.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
echo “=== Sqoop数据抽取 ===”
echo “Date: $(date)”
DT=$1
if [ -z “$DT” ]; then
DT=$(date -d “-1 day” +%Y-%m-%d)
fi
# 抽取订单数据
sqoop import \
–connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
–username fgedu \
–password-file /user/hive/.password \
–table orders \
–where “create_time >= ‘${DT} 00:00:00’ AND create_time < '${DT} 23:59:59'" \
–target-dir /user/hive/warehouse/ods_trade.db/ods_order_di/dt=${DT} \
–delete-target-dir \
–fields-terminated-by ‘\t’ \
–null-string ‘\\N’ \
–null-non-string ‘\\N’ \
–compress \
–compression-codec org.apache.hadoop.io.compress.SnappyCodec \
–num-mappers 4 \
–split-by id;
echo “=== 抽取完成 ===”
Date: Fri Jan 19 02:00:00 CST 2024
# 执行Sqoop抽取
24/01/19 02:00:00 INFO sqoop.ImportTool: Beginning import of orders
24/01/19 02:05:00 INFO mapred.JobClient: Map input records=1250000
24/01/19 02:05:00 INFO mapred.JobClient: Map output records=1250000
24/01/19 02:05:00 INFO sqoop.ImportTool: Transfer complete: 1250000 records.
=== 抽取完成 ===
3.2 数据清洗开发
数据清洗是保证数据质量的关键环节。学习交流加群风哥QQ113257174
— 清洗ODS层数据写入DWD层
INSERT OVERWRITE TABLE dwd_trade.dwd_trade_order_di PARTITION(dt=’${dt}’)
SELECT
— 主键处理
TRIM(order_id) AS order_id,
— 外键处理
TRIM(user_id) AS user_id,
TRIM(product_id) AS product_id,
— 金额处理
CASE
WHEN order_amount < 0 THEN 0
WHEN order_amount > 1000000 THEN NULL
ELSE ROUND(order_amount, 2)
END AS order_amount,
— 状态处理
CASE UPPER(TRIM(order_status))
WHEN ‘CREATED’ THEN ’10’
WHEN ‘PAID’ THEN ’20’
WHEN ‘SHIPPED’ THEN ’30’
WHEN ‘COMPLETED’ THEN ’40’
WHEN ‘CANCELLED’ THEN ’50’
ELSE ’00’
END AS order_status,
— 时间处理
FROM_UNIXTIME(UNIX_TIMESTAMP(create_time, ‘yyyy-MM-dd HH:mm:ss’)) AS create_time,
— ETL时间
CURRENT_TIMESTAMP AS etl_time
FROM ods_trade.ods_trade_order_di
WHERE dt = ‘${dt}’
AND order_id IS NOT NULL
AND order_id != ”;
Query ID = root_20240119040000_xxxx
Total jobs = 1
Loading data to table dwd_trade.dwd_trade_order_di
Table dwd_trade.dwd_trade_order_di stats: [numFiles=1, numRows=1248750, totalSize=45678901]
# 清洗统计
原始数据: 1250000 条
清洗后数据: 1248750 条
过滤数据: 1250 条 (空主键)
异常金额修正: 35 条
状态码转换: 1248750 条
3.3 数据转换开发
数据转换实现业务逻辑处理:
— DWD层到DWS层的转换
INSERT OVERWRITE TABLE dws_trade.dws_trade_user_order_1d PARTITION(dt=’${dt}’)
SELECT
user_id,
user_name,
user_level,
— 订单统计
COUNT(*) AS order_count,
SUM(order_amount) AS order_amount,
AVG(order_amount) AS avg_order_amount,
— 支付统计
SUM(CASE WHEN order_status >= ’20’ THEN order_amount ELSE 0 END) AS payment_amount,
SUM(CASE WHEN order_status >= ’40’ THEN 1 ELSE 0 END) AS completed_count,
— 时间统计
MIN(create_time) AS first_order_time,
MAX(create_time) AS last_order_time,
CURRENT_TIMESTAMP AS etl_time
FROM dwd_trade.dwd_trade_order_di o
LEFT JOIN dim_trade.dim_user_df u ON o.user_id = u.user_id
WHERE o.dt = ‘${dt}’
GROUP BY user_id, user_name, user_level;
Part04-生产案例与实战讲解
4.1 订单数据ETL案例
本案例演示订单数据完整的ETL流程。更多视频教程www.fgedu.net.cn
# order_etl_pipeline.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
echo “=== 订单数据ETL流程 ===”
echo “Date: $(date)”
DT=$1
if [ -z “$DT” ]; then
DT=$(date -d “-1 day” +%Y-%m-%d)
fi
echo “Processing date: $DT”
# Step 1: ODS层数据加载
echo “Step 1: Loading ODS layer…”
hive -e “
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
LOAD DATA INPATH ‘/data/orders/${DT}’
OVERWRITE INTO TABLE ods_trade.ods_trade_order_di
PARTITION(dt=’${DT}’);
“
# Step 2: DWD层数据清洗
echo “Step 2: Cleaning DWD layer…”
hive -e “
INSERT OVERWRITE TABLE dwd_trade.dwd_trade_order_di PARTITION(dt=’${DT}’)
SELECT
TRIM(order_id), TRIM(user_id), TRIM(product_id),
ROUND(order_amount, 2), order_status, create_time,
CURRENT_TIMESTAMP
FROM ods_trade.ods_trade_order_di
WHERE dt=’${DT}’ AND order_id IS NOT NULL;
“
# Step 3: DWS层数据汇总
echo “Step 3: Aggregating DWS layer…”
hive -e “
INSERT OVERWRITE TABLE dws_trade.dws_trade_order_summary_1d PARTITION(dt=’${DT}’)
SELECT
user_id,
COUNT(*) AS order_count,
SUM(order_amount) AS total_amount,
CURRENT_TIMESTAMP
FROM dwd_trade.dwd_trade_order_di
WHERE dt=’${DT}’
GROUP BY user_id;
“
# Step 4: 数据质量检查
echo “Step 4: Data quality check…”
hive -e “
SELECT ‘ODS’ AS layer, COUNT(*) AS cnt FROM ods_trade.ods_trade_order_di WHERE dt=’${DT}’
UNION ALL
SELECT ‘DWD’ AS layer, COUNT(*) AS cnt FROM dwd_trade.dwd_trade_order_di WHERE dt=’${DT}’
UNION ALL
SELECT ‘DWS’ AS layer, COUNT(*) AS cnt FROM dws_trade.dws_trade_order_summary_1d WHERE dt=’${DT}’;
“
echo “=== ETL流程完成 ===”
Date: Fri Jan 19 04:00:00 CST 2024
Processing date: 2024-01-18
Step 1: Loading ODS layer…
OK
Time taken: 12.345 seconds
Step 2: Cleaning DWD layer…
OK
Time taken: 45.678 seconds
Step 3: Aggregating DWS layer…
OK
Time taken: 34.567 seconds
Step 4: Data quality check…
+——-+———+
| layer | cnt |
+——-+———+
| ODS | 1250000 |
| DWD | 1248750 |
| DWS | 356789 |
+——-+———+
=== ETL流程完成 ===
4.2 用户数据ETL案例
用户数据ETL案例演示用户维度数据处理。学习交流加群风哥微信: itpux-com
— ODS层:用户原始数据
CREATE TABLE ods_user.ods_user_info_df (
user_id STRING,
user_name STRING,
phone STRING,
email STRING,
register_time STRING,
etl_time TIMESTAMP
) PARTITIONED BY (dt STRING)
STORED AS ORC;
— DWD层:用户清洗数据
INSERT OVERWRITE TABLE dwd_user.dwd_user_info_df PARTITION(dt=’${dt}’)
SELECT
TRIM(user_id) AS user_id,
TRIM(user_name) AS user_name,
REGEXP_REPLACE(phone, ‘[^0-9]’, ”) AS phone,
LOWER(TRIM(email)) AS email,
FROM_UNIXTIME(UNIX_TIMESTAMP(register_time, ‘yyyy-MM-dd HH:mm:ss’)) AS register_time,
CURRENT_TIMESTAMP AS etl_time
FROM ods_user.ods_user_info_df
WHERE dt = ‘${dt}’
AND user_id IS NOT NULL
AND user_id != ”;
4.3 商品数据ETL案例
商品数据ETL案例演示商品维度数据处理。风哥提示:维度表数据需要保持全量快照。
— 维度表全量快照
INSERT OVERWRITE TABLE dim_trade.dim_product_df PARTITION(dt=’${dt}’)
SELECT
p.product_id,
p.product_name,
p.category_id,
c.category_name,
p.brand_id,
b.brand_name,
p.price,
p.status,
CURRENT_TIMESTAMP AS etl_time
FROM ods_product.ods_product_info_df p
LEFT JOIN dim_trade.dim_category_df c ON p.category_id = c.category_id
LEFT JOIN dim_trade.dim_brand_df b ON p.brand_id = b.brand_id
WHERE p.dt = ‘${dt}’;
Part05-风哥经验总结与分享
5.1 离线开发最佳实践
风哥在生产环境中的离线开发经验总结:from bigdata视频:www.itpux.com
1. 开发流程:
需求确认 → 方案设计 → 代码开发 → 单元测试 → 集成测试 → 上线部署
2. 代码质量:
统一编码规范,添加必要注释,进行代码审查
3. 性能优化:
合理使用分区,优化Join策略,控制数据倾斜
4. 数据质量:
建立数据质量检查机制,设置告警阈值
5.2 常见问题与解决方案
问题1:任务执行超时
解决方案:优化SQL,增加并行度,调整超时配置。
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;
SET mapreduce.job.timeout=7200000;
问题2:数据质量异常
解决方案:建立数据质量检查机制,设置告警。学习交流加群风哥QQ113257174
5.3 生产环境注意事项
1. 任务监控:建立完善的任务监控和告警机制。
2. 数据备份:定期备份关键数据,建立恢复机制。
3. 文档维护:及时更新开发文档,保持文档与代码同步。
风哥提示:离线数仓开发是大数据平台建设的核心工作,需要建立完善的开发规范和质量保障机制。在生产环境中,要注重代码质量、性能优化和数据质量,确保数仓稳定运行和数据准确性。
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
