1. 首页 > Hadoop教程 > 正文

大数据教程FG238-Hadoop数据倾斜案例实战

目录大纲

Part01-数据倾斜基础概念与理论知识

1.1 数据倾斜的定义与表现

数据倾斜是指在分布式计算过程中,数据分布不均匀,导致部分节点处理的数据量远大于其他节点,从而成为计算瓶颈的现象。

# 数据倾斜的典型表现
1. 任务执行时间过长
2. 部分Reduce任务卡住
3. 个别节点CPU/内存使用率异常高
4. 任务日志中出现数据分布不均的警告

1.2 数据倾斜的原因分析

数据倾斜的主要原因包括:

# 数据倾斜的常见原因
1. 数据本身分布不均匀(如某些key出现频率过高)
2. 不合理的Shuffle操作(如Join、Group By等)
3. 数据类型不一致导致的哈希分布不均
4. 业务逻辑导致的热点数据
学习交流加群风哥微信: itpux-com

1.3 数据倾斜的影响

数据倾斜会对Hadoop集群造成严重影响:

# 数据倾斜的影响
1. 任务执行时间大幅增加
2. 集群资源利用率低下
3. 容易导致任务失败或超时
4. 影响整个作业的稳定性
风哥提示:数据倾斜是大数据处理中的常见问题,需要在设计阶段就考虑预防措施

Part02-数据倾斜生产环境规划与建议

2.1 数据倾斜的预防措施

# 数据倾斜的预防措施
1. 数据预处理:对热点数据进行预处理或拆分
2. 合理设计key:避免使用可能导致热点的key
3. 增加随机前缀:对key进行哈希或添加随机前缀
4. 使用合适的分区策略:根据数据特点选择分区方式
5. 数据采样:在开发阶段进行数据采样分析
更多视频教程www.fgedu.net.cn

2.2 资源配置优化建议

# 资源配置优化建议
# 1. YARN资源配置
[root@fgedu.net.cn ~]# vi /bigdata/app/hadoop/etc/hadoop/yarn-site.xml yarn.nodemanager.resource.memory-mb
16384
yarn.scheduler.maximum-allocation-mb
16384

# 2. MapReduce配置
[root@fgedu.net.cn ~]# vi /bigdata/app/hadoop/etc/hadoop/mapred-site.xml mapreduce.job.reduce.slowstart.completedmaps
0.8
mapreduce.reduce.shuffle.input.buffer.percent
0.7
更多学习教程公众号风哥教程itpux_com

2.3 监控与预警机制

# 监控与预警机制
# 1. 配置Prometheus监控
[root@fgedu.net.cn ~]# vi /etc/prometheus/prometheus.yml
scrape_configs:
– job_name: ‘hadoop’
static_configs:
– targets: [‘fgedu.net.cn:9100’]

# 2. 配置Grafana告警
# 在Grafana中配置数据倾斜相关告警,如:
# – 单个Reduce任务执行时间超过平均值的2倍
# – 单个节点CPU使用率超过80%且持续5分钟
from bigdata视频:www.itpux.com

Part03-数据倾斜项目实施方案

3.1 Hive数据倾斜解决方案

# Hive数据倾斜解决方案
# 1. 使用Map端聚合
[root@fgedu.net.cn ~]# hive
hive> set hive.map.aggr=true;
hive> set hive.groupby.mapaggr.checkinterval=100000;

# 2. 使用倾斜key处理
hive> set hive.groupby.skewindata=true;

# 3. 示例SQL:处理倾斜key
hive> SELECT
CASE WHEN key = ‘hot_key’ THEN concat(key, ‘_’, rand()) ELSE key END AS new_key,
count(*)
FROM fgedu_table
GROUP BY new_key;
学习交流加群风哥QQ113257174

3.2 Spark数据倾斜解决方案

# Spark数据倾斜解决方案
# 1. 增加随机前缀
[root@fgedu.net.cn ~]# spark-shell
scala> val rdd = sc.textFile(“hdfs://fgedu.net.cn:9000/user/fgedu/data.txt”)
scala> val processedRdd = rdd.map(line => {
val Array(key, value) = line.split(“\\t”)
if (key == “hot_key”) {
(key + “_” + scala.util.Random.nextInt(10), value)
} else {
(key, value)
}
})
scala> val result = processedRdd.groupByKey().mapValues(_.size)

# 2. 使用salting技术
scala> val saltedRdd = rdd.map(line => {
val Array(key, value) = line.split(“\\t”)
(key + “_” + scala.util.Random.nextInt(100), value)
})
风哥提示:Spark数据倾斜处理需要根据具体场景选择合适的方法,没有通用的解决方案

3.3 MapReduce数据倾斜解决方案

# MapReduce数据倾斜解决方案
# 1. 自定义分区器
[root@fgedu.net.cn ~]# vi CustomPartitioner.java
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
String keyStr = key.toString();
if (keyStr.equals(“hot_key”)) {
// 将热点key分散到不同分区
return (keyStr.hashCode() + (int)(Math.random() * 10)) % numPartitions;
} else {
return keyStr.hashCode() % numPartitions;
}
}
}

# 2. 二次排序
# 通过二次排序将相同key的数据分散到不同的Reducer

Part04-数据倾斜生产案例与实战讲解

4.1 案例一:Hive分组聚合数据倾斜

案例背景

某电商平台的用户行为日志表,包含用户ID、商品ID、行为类型等字段。在统计各商品的点击次数时,发现某些热门商品的点击量特别高,导致数据倾斜。

解决方案

# 原始SQL(存在数据倾斜)
hive> SELECT product_id, count(*) as click_count
FROM fgedu.user_behavior
GROUP BY product_id;

# 执行结果(输出日志)

Stage-1 map = 100%, reduce = 0%, Cumulative CPU 10.2 sec
Stage-1 map = 100%, reduce = 10%, Cumulative CPU 15.6 sec
Stage-1 map = 100%, reduce = 20%, Cumulative CPU 20.3 sec
# 卡住,部分Reduce任务处理热点数据

# 优化后SQL
hive> SELECT product_id, sum(click_count) as total_clicks
FROM (
SELECT
CASE WHEN product_id = ‘hot_product_123’ THEN
concat(product_id, ‘_’, rand())
ELSE product_id END AS new_product_id,
count(*) as click_count
FROM fgedu.user_behavior
GROUP BY new_product_id
) t
GROUP BY product_id;

# 执行结果(输出日志)

Stage-1 map = 100%, reduce = 100%, Cumulative CPU 35.2 sec
Stage-2 map = 100%, reduce = 100%, Cumulative CPU 12.5 sec
MapReduce Total cumulative CPU time: 47 seconds 700 msec
OK
product_id total_clicks
hot_product_123 1258000
product_456 23500
product_789 18900

更多视频教程www.fgedu.net.cn

4.2 案例二:Spark Shuffle数据倾斜

案例背景

使用Spark处理用户交易数据,需要根据用户ID进行聚合操作,发现某些活跃用户的交易记录特别多,导致Shuffle阶段数据倾斜。

解决方案

# 原始代码(存在数据倾斜)
[root@fgedu.net.cn ~]# spark-submit –master yarn –deploy-mode cluster \
–class com.fgedu.spark.TransactionAnalysis \
transaction-analysis.jar

# 执行结果(输出日志)

Stage 1:=====================================> (99 + 1) / 100
# 最后一个task执行时间特别长

# 优化后代码
[root@fgedu.net.cn ~]# vi TransactionAnalysis.scala
val spark = SparkSession.builder()
.appName(“TransactionAnalysis”)
.getOrCreate()

import spark.implicits._

val transactions = spark.read.parquet(“hdfs://fgedu.net.cn:9000/user/fgedu/transactions”)

// 处理数据倾斜
val processedData = transactions.rdd.map(row => {
val userId = row.getString(0)
val amount = row.getDouble(1)
// 对热点用户添加随机前缀
val hotUsers = Set(“user_1001”, “user_2002”, “user_3003”)
if (hotUsers.contains(userId)) {
(userId + “_” + scala.util.Random.nextInt(10), amount)
} else {
(userId, amount)
}
})

val result = processedData.reduceByKey(_ + _).map { case (key, value) =>
// 移除随机前缀
val userId = if (key.contains(“_”)) key.split(“_”)(0) else key
(userId, value)
}.reduceByKey(_ + _)

result.saveAsTextFile(“hdfs://fgedu.net.cn:9000/user/fgedu/result”)

# 执行结果(输出日志)

Stage 1:========================================> (100 + 0) / 100
Stage 2:========================================> (100 + 0) / 100
Job completed successfully
Total time taken: 2 minutes 35 seconds
学习交流加群风哥微信: itpux-com

4.3 案例三:MapReduce Join数据倾斜

案例背景

使用MapReduce进行两张表的Join操作,其中一张表的某些key出现频率很高,导致Join操作时数据倾斜。

解决方案

# 1. 使用Map端Join
[root@fgedu.net.cn ~]# hadoop jar /bigdata/app/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.4.jar \
-D mapreduce.job.reduces=10 \
-D mapreduce.map.output.compress=true \
-D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec \
-file mapper.py -mapper “python mapper.py” \
-file reducer.py -reducer “python reducer.py” \
-input /user/fgedu/orders /user/fgedu/customers \
-output /user/fgedu/join_result

# 2. 实现Map端Join的mapper.py
[root@fgedu.net.cn ~]# vi mapper.py
#!/usr/bin/env python
# mapper.py
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: `http://www.fgedu.net.cn`

import sys
import json

# 加载小表到内存
customers = {}
with open(‘customers.txt’, ‘r’) as f:
for line in f:
parts = line.strip().split(‘\t’)
if len(parts) == 2:
customers[parts[0]] = parts[1]

for line in sys.stdin:
line = line.strip()
if not line:
continue
parts = line.split(‘\t’)
if len(parts) >= 2:
if parts[0].startswith(‘order’):
# 订单数据
order_id, customer_id, amount = parts[0], parts[1], parts[2]
if customer_id in customers:
print(‘\t’.join([order_id, customer_id, customers[customer_id], amount]))

# 执行结果(输出日志)

MapReduce Total cumulative CPU time: 1 minute 45 seconds
OK
order_001 customer_1001 Alice 100.50
order_002 customer_1001 Alice 200.75
order_003 customer_2002 Bob 150.25

更多学习教程公众号风哥教程itpux_com

Part05-风哥经验总结与分享

5.1 数据倾斜排查方法论

# 数据倾斜排查步骤
1. 观察任务执行情况:查看各任务的执行时间和资源使用情况
2. 分析数据分布:对输入数据进行采样分析,识别热点key
3. 定位倾斜阶段:确定数据倾斜发生在Map阶段还是Reduce阶段
4. 选择解决方案:根据具体情况选择合适的解决方法
5. 验证解决方案:执行优化后的任务,验证是否解决了数据倾斜
风哥提示:数据倾斜排查需要结合监控工具和日志分析,全面了解任务执行情况

5.2 最佳实践总结

# 数据倾斜最佳实践
1. 设计阶段:合理设计数据模型和key分布
2. 开发阶段:使用合适的技术和方法处理数据倾斜
3. 运行阶段:监控任务执行情况,及时发现数据倾斜
4. 优化阶段:根据实际情况持续优化解决方案
5. 文档化:记录数据倾斜的处理方法和经验
from bigdata视频:www.itpux.com

5.3 常见问题与解决方案

# 常见数据倾斜问题与解决方案
1. 热点key问题:使用随机前缀、加盐技术
2. Join操作倾斜:使用Map端Join、小表广播
3. Group By倾斜:使用Map端聚合、倾斜key单独处理
4. 数据类型不一致:统一数据类型、规范化处理
5. 任务超时:增加超时时间、优化资源配置
学习交流加群风哥QQ113257174

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息