1. 首页 > Hadoop教程 > 正文

大数据教程FG041-Hive离线数仓开发实战

内容简介:本文详细介绍Hive离线数仓开发的核心方法与生产实战应用。风哥教程参考Hive官方文档Data Warehouse Development、ETL Best Practices等内容,涵盖数据抽取、数据清洗、数据转换、数据加载等ETL全流程开发,结合生产环境实际案例,帮助读者掌握企业级离线数仓开发的实战技能。

目录大纲

Part01-基础概念与理论知识
  1.1 离线数仓开发概述
  1.2 ETL流程设计原则
  1.3 开发规范与标准
Part02-生产环境规划与建议
  2.1 数据抽取策略
  2.2 数据清洗规则
  2.3 调度依赖设计
Part03-生产环境项目实施方案
  3.1 数据抽取开发
  3.2 数据清洗开发
  3.3 数据转换开发
Part04-生产案例与实战讲解
  4.1 订单数据ETL案例
  4.2 用户数据ETL案例
  4.3 商品数据ETL案例
Part05-风哥经验总结与分享
  5.1 离线开发最佳实践
  5.2 常见问题与解决方案
  5.3 生产环境注意事项

Part01-基础概念与理论知识

1.1 离线数仓开发概述

离线数仓开发是大数据平台建设的核心工作,通过ETL流程将业务数据转化为可分析的数据资产。更多视频教程www.fgedu.net.cn

离线数仓特点:

1. 批量处理:定时批量处理数据
2. 延迟性:T+1或更长延迟
3. 高吞吐:处理大规模历史数据
4. 稳定性:稳定的调度执行

开发流程:

需求分析 → 模型设计 → ETL开发 → 测试验证 → 上线部署 → 运维监控

1.2 ETL流程设计原则

ETL流程设计需要遵循以下原则:学习交流加群风哥微信: itpux-com

数据抽取原则:

1. 增量优先:优先使用增量抽取减少数据量
2. 断点续传:支持断点续传,避免重复抽取
3. 数据校验:抽取后进行数据校验

数据清洗原则:

1. 空值处理:合理处理空值
2. 异常值处理:识别和处理异常值
3. 数据标准化:统一数据格式和编码

数据加载原则:

1. 幂等性:支持重复执行
2. 原子性:保证数据一致性
3. 可追溯:记录数据血缘

1.3 开发规范与标准

统一的开发规范是团队协作的基础:

— SQL开发规范
— 1. 使用统一缩进(2空格或4空格)
— 2. 关键字大写
— 3. 每个子查询添加注释
— 4. 使用CTE提高可读性

— 规范示例
WITH user_orders AS (
  — 获取用户订单数据
  SELECT user_id, COUNT(*) AS order_count
  FROM dwd_trade.dwd_trade_order_di
  WHERE dt = ‘${dt}’
  GROUP BY user_id
),
user_profile AS (
  — 获取用户画像数据
  SELECT user_id, user_level
  FROM dwd_user.dwd_user_profile_di
  WHERE dt = ‘${dt}’
)
SELECT
  u.user_id,
  u.user_level,
  o.order_count
FROM user_profile u
LEFT JOIN user_orders o ON u.user_id = o.user_id;

Part02-生产环境规划与建议

2.1 数据抽取策略

数据抽取策略需要根据数据特点选择:风哥提示:合理的抽取策略可以大幅减少数据处理时间。

— 抽取策略选择
— 全量抽取:适用于小表、维度表
— 增量抽取:适用于大表、事实表
— CDC抽取:适用于实时性要求高的场景

— 增量抽取示例(基于时间戳)
SELECT * FROM source_table
WHERE update_time >= ‘${last_extract_time}’
  AND update_time < '${current_extract_time}'; — 增量抽取示例(基于自增ID)
SELECT * FROM source_table
WHERE id > ${last_max_id}
ORDER BY id;

2.2 数据清洗规则

数据清洗规则需要根据业务需求制定:更多学习教程公众号风哥教程itpux_com

— 常用清洗规则

— 1. 空值处理
SELECT
  COALESCE(user_name, ‘未知’) AS user_name,
  NVL(age, 0) AS age
FROM source_table;

— 2. 异常值处理
SELECT
  CASE
    WHEN age < 0 OR age > 150 THEN NULL
    ELSE age
  END AS age
FROM source_table;

— 3. 数据标准化
SELECT
  TRIM(UPPER(user_name)) AS user_name,
  REGEXP_REPLACE(phone, ‘[^0-9]’, ”) AS phone
FROM source_table;

— 4. 去重处理
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY update_time DESC) AS rn
  FROM source_table
) t WHERE rn = 1;

2.3 调度依赖设计

调度依赖设计是保证数据正确性的关键:

— 调度依赖关系示例

— 任务依赖图
ods_sync_task (02:00)
  ├── dwd_clean_task (04:00)
  │    ├── dws_agg_task (06:00)
  │    │    └── ads_report_task (08:00)
  │    └── dws_user_task (06:00)
  └── dim_sync_task (04:00)

— 调度参数配置
— 重试次数: 3
— 重试间隔: 5分钟
— 超时时间: 2小时
— 告警阈值: 失败1次告警

Part03-生产环境项目实施方案

3.1 数据抽取开发

数据抽取是ETL流程的第一步。from bigdata视频:www.itpux.com

3.1.1 Sqoop抽取脚本

#!/bin/bash
# sqoop_extract.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

echo “=== Sqoop数据抽取 ===”
echo “Date: $(date)”

DT=$1
if [ -z “$DT” ]; then
  DT=$(date -d “-1 day” +%Y-%m-%d)
fi

# 抽取订单数据
sqoop import \
  –connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
  –username fgedu \
  –password-file /user/hive/.password \
  –table orders \
  –where “create_time >= ‘${DT} 00:00:00’ AND create_time < '${DT} 23:59:59'" \
  –target-dir /user/hive/warehouse/ods_trade.db/ods_order_di/dt=${DT} \
  –delete-target-dir \
  –fields-terminated-by ‘\t’ \
  –null-string ‘\\N’ \
  –null-non-string ‘\\N’ \
  –compress \
  –compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  –num-mappers 4 \
  –split-by id;

echo “=== 抽取完成 ===”

=== Sqoop数据抽取 ===
Date: Fri Jan 19 02:00:00 CST 2024

# 执行Sqoop抽取
24/01/19 02:00:00 INFO sqoop.ImportTool: Beginning import of orders
24/01/19 02:05:00 INFO mapred.JobClient: Map input records=1250000
24/01/19 02:05:00 INFO mapred.JobClient: Map output records=1250000
24/01/19 02:05:00 INFO sqoop.ImportTool: Transfer complete: 1250000 records.

=== 抽取完成 ===

3.2 数据清洗开发

数据清洗是保证数据质量的关键环节。学习交流加群风哥QQ113257174

— 数据清洗脚本
— 清洗ODS层数据写入DWD层

INSERT OVERWRITE TABLE dwd_trade.dwd_trade_order_di PARTITION(dt=’${dt}’)
SELECT
  — 主键处理
  TRIM(order_id) AS order_id,
  
  — 外键处理
  TRIM(user_id) AS user_id,
  TRIM(product_id) AS product_id,
  
  — 金额处理
  CASE
    WHEN order_amount < 0 THEN 0
    WHEN order_amount > 1000000 THEN NULL
    ELSE ROUND(order_amount, 2)
  END AS order_amount,
  
  — 状态处理
  CASE UPPER(TRIM(order_status))
    WHEN ‘CREATED’ THEN ’10’
    WHEN ‘PAID’ THEN ’20’
    WHEN ‘SHIPPED’ THEN ’30’
    WHEN ‘COMPLETED’ THEN ’40’
    WHEN ‘CANCELLED’ THEN ’50’
    ELSE ’00’
  END AS order_status,
  
  — 时间处理
  FROM_UNIXTIME(UNIX_TIMESTAMP(create_time, ‘yyyy-MM-dd HH:mm:ss’)) AS create_time,
  
  — ETL时间
  CURRENT_TIMESTAMP AS etl_time

FROM ods_trade.ods_trade_order_di
WHERE dt = ‘${dt}’
  AND order_id IS NOT NULL
  AND order_id != ”;

# 数据清洗执行
Query ID = root_20240119040000_xxxx
Total jobs = 1

Loading data to table dwd_trade.dwd_trade_order_di
Table dwd_trade.dwd_trade_order_di stats: [numFiles=1, numRows=1248750, totalSize=45678901]

# 清洗统计
原始数据: 1250000 条
清洗后数据: 1248750 条
过滤数据: 1250 条 (空主键)
异常金额修正: 35 条
状态码转换: 1248750 条

3.3 数据转换开发

数据转换实现业务逻辑处理:

— 数据转换脚本
— DWD层到DWS层的转换

INSERT OVERWRITE TABLE dws_trade.dws_trade_user_order_1d PARTITION(dt=’${dt}’)
SELECT
  user_id,
  user_name,
  user_level,
  
  — 订单统计
  COUNT(*) AS order_count,
  SUM(order_amount) AS order_amount,
  AVG(order_amount) AS avg_order_amount,
  
  — 支付统计
  SUM(CASE WHEN order_status >= ’20’ THEN order_amount ELSE 0 END) AS payment_amount,
  SUM(CASE WHEN order_status >= ’40’ THEN 1 ELSE 0 END) AS completed_count,
  
  — 时间统计
  MIN(create_time) AS first_order_time,
  MAX(create_time) AS last_order_time,
  
  CURRENT_TIMESTAMP AS etl_time

FROM dwd_trade.dwd_trade_order_di o
LEFT JOIN dim_trade.dim_user_df u ON o.user_id = u.user_id
WHERE o.dt = ‘${dt}’
GROUP BY user_id, user_name, user_level;

Part04-生产案例与实战讲解

4.1 订单数据ETL案例

本案例演示订单数据完整的ETL流程。更多视频教程www.fgedu.net.cn

#!/bin/bash
# order_etl_pipeline.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

echo “=== 订单数据ETL流程 ===”
echo “Date: $(date)”

DT=$1
if [ -z “$DT” ]; then
  DT=$(date -d “-1 day” +%Y-%m-%d)
fi

echo “Processing date: $DT”

# Step 1: ODS层数据加载
echo “Step 1: Loading ODS layer…”
hive -e “
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

LOAD DATA INPATH ‘/data/orders/${DT}’
OVERWRITE INTO TABLE ods_trade.ods_trade_order_di
PARTITION(dt=’${DT}’);

# Step 2: DWD层数据清洗
echo “Step 2: Cleaning DWD layer…”
hive -e “
INSERT OVERWRITE TABLE dwd_trade.dwd_trade_order_di PARTITION(dt=’${DT}’)
SELECT
  TRIM(order_id), TRIM(user_id), TRIM(product_id),
  ROUND(order_amount, 2), order_status, create_time,
  CURRENT_TIMESTAMP
FROM ods_trade.ods_trade_order_di
WHERE dt=’${DT}’ AND order_id IS NOT NULL;

# Step 3: DWS层数据汇总
echo “Step 3: Aggregating DWS layer…”
hive -e “
INSERT OVERWRITE TABLE dws_trade.dws_trade_order_summary_1d PARTITION(dt=’${DT}’)
SELECT
  user_id,
  COUNT(*) AS order_count,
  SUM(order_amount) AS total_amount,
  CURRENT_TIMESTAMP
FROM dwd_trade.dwd_trade_order_di
WHERE dt=’${DT}’
GROUP BY user_id;

# Step 4: 数据质量检查
echo “Step 4: Data quality check…”
hive -e “
SELECT ‘ODS’ AS layer, COUNT(*) AS cnt FROM ods_trade.ods_trade_order_di WHERE dt=’${DT}’
UNION ALL
SELECT ‘DWD’ AS layer, COUNT(*) AS cnt FROM dwd_trade.dwd_trade_order_di WHERE dt=’${DT}’
UNION ALL
SELECT ‘DWS’ AS layer, COUNT(*) AS cnt FROM dws_trade.dws_trade_order_summary_1d WHERE dt=’${DT}’;

echo “=== ETL流程完成 ===”

=== 订单数据ETL流程 ===
Date: Fri Jan 19 04:00:00 CST 2024
Processing date: 2024-01-18

Step 1: Loading ODS layer…
OK
Time taken: 12.345 seconds

Step 2: Cleaning DWD layer…
OK
Time taken: 45.678 seconds

Step 3: Aggregating DWS layer…
OK
Time taken: 34.567 seconds

Step 4: Data quality check…
+——-+———+
| layer | cnt |
+——-+———+
| ODS | 1250000 |
| DWD | 1248750 |
| DWS | 356789 |
+——-+———+

=== ETL流程完成 ===

4.2 用户数据ETL案例

用户数据ETL案例演示用户维度数据处理。学习交流加群风哥微信: itpux-com

— 用户数据ETL

— ODS层:用户原始数据
CREATE TABLE ods_user.ods_user_info_df (
  user_id STRING,
  user_name STRING,
  phone STRING,
  email STRING,
  register_time STRING,
  etl_time TIMESTAMP
) PARTITIONED BY (dt STRING)
STORED AS ORC;

— DWD层:用户清洗数据
INSERT OVERWRITE TABLE dwd_user.dwd_user_info_df PARTITION(dt=’${dt}’)
SELECT
  TRIM(user_id) AS user_id,
  TRIM(user_name) AS user_name,
  REGEXP_REPLACE(phone, ‘[^0-9]’, ”) AS phone,
  LOWER(TRIM(email)) AS email,
  FROM_UNIXTIME(UNIX_TIMESTAMP(register_time, ‘yyyy-MM-dd HH:mm:ss’)) AS register_time,
  CURRENT_TIMESTAMP AS etl_time
FROM ods_user.ods_user_info_df
WHERE dt = ‘${dt}’
  AND user_id IS NOT NULL
  AND user_id != ”;

4.3 商品数据ETL案例

商品数据ETL案例演示商品维度数据处理。风哥提示:维度表数据需要保持全量快照。

— 商品维度表ETL

— 维度表全量快照
INSERT OVERWRITE TABLE dim_trade.dim_product_df PARTITION(dt=’${dt}’)
SELECT
  p.product_id,
  p.product_name,
  p.category_id,
  c.category_name,
  p.brand_id,
  b.brand_name,
  p.price,
  p.status,
  CURRENT_TIMESTAMP AS etl_time
FROM ods_product.ods_product_info_df p
LEFT JOIN dim_trade.dim_category_df c ON p.category_id = c.category_id
LEFT JOIN dim_trade.dim_brand_df b ON p.brand_id = b.brand_id
WHERE p.dt = ‘${dt}’;

Part05-风哥经验总结与分享

5.1 离线开发最佳实践

风哥在生产环境中的离线开发经验总结:from bigdata视频:www.itpux.com

1. 开发流程:

需求确认 → 方案设计 → 代码开发 → 单元测试 → 集成测试 → 上线部署

2. 代码质量:

统一编码规范,添加必要注释,进行代码审查

3. 性能优化:

合理使用分区,优化Join策略,控制数据倾斜

4. 数据质量:

建立数据质量检查机制,设置告警阈值

5.2 常见问题与解决方案

问题1:任务执行超时

解决方案:优化SQL,增加并行度,调整超时配置。

— 优化示例
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;
SET mapreduce.job.timeout=7200000;

问题2:数据质量异常

解决方案:建立数据质量检查机制,设置告警。学习交流加群风哥QQ113257174

5.3 生产环境注意事项

1. 任务监控:建立完善的任务监控和告警机制。

2. 数据备份:定期备份关键数据,建立恢复机制。

3. 文档维护:及时更新开发文档,保持文档与代码同步。

风哥提示:离线数仓开发是大数据平台建设的核心工作,需要建立完善的开发规范和质量保障机制。在生产环境中,要注重代码质量、性能优化和数据质量,确保数仓稳定运行和数据准确性。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息