目录大纲
- Part01-数据倾斜基础概念与理论知识
- 1.1 数据倾斜的定义与表现
- 1.2 数据倾斜的原因分析
- 1.3 数据倾斜的影响
- Part02-数据倾斜生产环境规划与建议
- 2.1 数据倾斜的预防措施
- 2.2 资源配置优化建议
- 2.3 监控与预警机制
- Part03-数据倾斜项目实施方案
- 3.1 Hive数据倾斜解决方案
- 3.2 Spark数据倾斜解决方案
- 3.3 MapReduce数据倾斜解决方案
- Part04-数据倾斜生产案例与实战讲解
- 4.1 案例一:Hive分组聚合数据倾斜
- 4.2 案例二:Spark Shuffle数据倾斜
- 4.3 案例三:MapReduce Join数据倾斜
- Part05-风哥经验总结与分享
- 5.1 数据倾斜排查方法论
- 5.2 最佳实践总结
- 5.3 常见问题与解决方案
Part01-数据倾斜基础概念与理论知识
1.1 数据倾斜的定义与表现
数据倾斜是指在分布式计算过程中,数据分布不均匀,导致部分节点处理的数据量远大于其他节点,从而成为计算瓶颈的现象。
1. 任务执行时间过长
2. 部分Reduce任务卡住
3. 个别节点CPU/内存使用率异常高
4. 任务日志中出现数据分布不均的警告
1.2 数据倾斜的原因分析
数据倾斜的主要原因包括:
1. 数据本身分布不均匀(如某些key出现频率过高)
2. 不合理的Shuffle操作(如Join、Group By等)
3. 数据类型不一致导致的哈希分布不均
4. 业务逻辑导致的热点数据
学习交流加群风哥微信: itpux-com
1.3 数据倾斜的影响
数据倾斜会对Hadoop集群造成严重影响:
1. 任务执行时间大幅增加
2. 集群资源利用率低下
3. 容易导致任务失败或超时
4. 影响整个作业的稳定性
风哥提示:数据倾斜是大数据处理中的常见问题,需要在设计阶段就考虑预防措施
Part02-数据倾斜生产环境规划与建议
2.1 数据倾斜的预防措施
1. 数据预处理:对热点数据进行预处理或拆分
2. 合理设计key:避免使用可能导致热点的key
3. 增加随机前缀:对key进行哈希或添加随机前缀
4. 使用合适的分区策略:根据数据特点选择分区方式
5. 数据采样:在开发阶段进行数据采样分析
更多视频教程www.fgedu.net.cn
2.2 资源配置优化建议
# 1. YARN资源配置
[root@fgedu.net.cn ~]# vi /bigdata/app/hadoop/etc/hadoop/yarn-site.xml
# 2. MapReduce配置
[root@fgedu.net.cn ~]# vi /bigdata/app/hadoop/etc/hadoop/mapred-site.xml
2.3 监控与预警机制
# 1. 配置Prometheus监控
[root@fgedu.net.cn ~]# vi /etc/prometheus/prometheus.yml
scrape_configs:
– job_name: ‘hadoop’
static_configs:
– targets: [‘fgedu.net.cn:9100’]
# 2. 配置Grafana告警
# 在Grafana中配置数据倾斜相关告警,如:
# – 单个Reduce任务执行时间超过平均值的2倍
# – 单个节点CPU使用率超过80%且持续5分钟
from bigdata视频:www.itpux.com
Part03-数据倾斜项目实施方案
3.1 Hive数据倾斜解决方案
# 1. 使用Map端聚合
[root@fgedu.net.cn ~]# hive
hive> set hive.map.aggr=true;
hive> set hive.groupby.mapaggr.checkinterval=100000;
# 2. 使用倾斜key处理
hive> set hive.groupby.skewindata=true;
# 3. 示例SQL:处理倾斜key
hive> SELECT
CASE WHEN key = ‘hot_key’ THEN concat(key, ‘_’, rand()) ELSE key END AS new_key,
count(*)
FROM fgedu_table
GROUP BY new_key;
学习交流加群风哥QQ113257174
3.2 Spark数据倾斜解决方案
# 1. 增加随机前缀
[root@fgedu.net.cn ~]# spark-shell
scala> val rdd = sc.textFile(“hdfs://fgedu.net.cn:9000/user/fgedu/data.txt”)
scala> val processedRdd = rdd.map(line => {
val Array(key, value) = line.split(“\\t”)
if (key == “hot_key”) {
(key + “_” + scala.util.Random.nextInt(10), value)
} else {
(key, value)
}
})
scala> val result = processedRdd.groupByKey().mapValues(_.size)
# 2. 使用salting技术
scala> val saltedRdd = rdd.map(line => {
val Array(key, value) = line.split(“\\t”)
(key + “_” + scala.util.Random.nextInt(100), value)
})
风哥提示:Spark数据倾斜处理需要根据具体场景选择合适的方法,没有通用的解决方案
3.3 MapReduce数据倾斜解决方案
# 1. 自定义分区器
[root@fgedu.net.cn ~]# vi CustomPartitioner.java
import org.apache.hadoop.mapreduce.Partitioner;
public class CustomPartitioner extends Partitioner
@Override
public int getPartition(Text key, Text value, int numPartitions) {
String keyStr = key.toString();
if (keyStr.equals(“hot_key”)) {
// 将热点key分散到不同分区
return (keyStr.hashCode() + (int)(Math.random() * 10)) % numPartitions;
} else {
return keyStr.hashCode() % numPartitions;
}
}
}
# 2. 二次排序
# 通过二次排序将相同key的数据分散到不同的Reducer
Part04-数据倾斜生产案例与实战讲解
4.1 案例一:Hive分组聚合数据倾斜
案例背景
某电商平台的用户行为日志表,包含用户ID、商品ID、行为类型等字段。在统计各商品的点击次数时,发现某些热门商品的点击量特别高,导致数据倾斜。
解决方案
hive> SELECT product_id, count(*) as click_count
FROM fgedu.user_behavior
GROUP BY product_id;
# 执行结果(输出日志)
…
Stage-1 map = 100%, reduce = 0%, Cumulative CPU 10.2 sec
Stage-1 map = 100%, reduce = 10%, Cumulative CPU 15.6 sec
Stage-1 map = 100%, reduce = 20%, Cumulative CPU 20.3 sec
# 卡住,部分Reduce任务处理热点数据
# 优化后SQL
hive> SELECT product_id, sum(click_count) as total_clicks
FROM (
SELECT
CASE WHEN product_id = ‘hot_product_123’ THEN
concat(product_id, ‘_’, rand())
ELSE product_id END AS new_product_id,
count(*) as click_count
FROM fgedu.user_behavior
GROUP BY new_product_id
) t
GROUP BY product_id;
# 执行结果(输出日志)
…
Stage-1 map = 100%, reduce = 100%, Cumulative CPU 35.2 sec
Stage-2 map = 100%, reduce = 100%, Cumulative CPU 12.5 sec
MapReduce Total cumulative CPU time: 47 seconds 700 msec
OK
product_id total_clicks
hot_product_123 1258000
product_456 23500
product_789 18900
…
更多视频教程www.fgedu.net.cn
4.2 案例二:Spark Shuffle数据倾斜
案例背景
使用Spark处理用户交易数据,需要根据用户ID进行聚合操作,发现某些活跃用户的交易记录特别多,导致Shuffle阶段数据倾斜。
解决方案
[root@fgedu.net.cn ~]# spark-submit –master yarn –deploy-mode cluster \
–class com.fgedu.spark.TransactionAnalysis \
transaction-analysis.jar
# 执行结果(输出日志)
…
Stage 1:=====================================> (99 + 1) / 100
# 最后一个task执行时间特别长
# 优化后代码
[root@fgedu.net.cn ~]# vi TransactionAnalysis.scala
val spark = SparkSession.builder()
.appName(“TransactionAnalysis”)
.getOrCreate()
import spark.implicits._
val transactions = spark.read.parquet(“hdfs://fgedu.net.cn:9000/user/fgedu/transactions”)
// 处理数据倾斜
val processedData = transactions.rdd.map(row => {
val userId = row.getString(0)
val amount = row.getDouble(1)
// 对热点用户添加随机前缀
val hotUsers = Set(“user_1001”, “user_2002”, “user_3003”)
if (hotUsers.contains(userId)) {
(userId + “_” + scala.util.Random.nextInt(10), amount)
} else {
(userId, amount)
}
})
val result = processedData.reduceByKey(_ + _).map { case (key, value) =>
// 移除随机前缀
val userId = if (key.contains(“_”)) key.split(“_”)(0) else key
(userId, value)
}.reduceByKey(_ + _)
result.saveAsTextFile(“hdfs://fgedu.net.cn:9000/user/fgedu/result”)
# 执行结果(输出日志)
…
Stage 1:========================================> (100 + 0) / 100
Stage 2:========================================> (100 + 0) / 100
Job completed successfully
Total time taken: 2 minutes 35 seconds
学习交流加群风哥微信: itpux-com
4.3 案例三:MapReduce Join数据倾斜
案例背景
使用MapReduce进行两张表的Join操作,其中一张表的某些key出现频率很高,导致Join操作时数据倾斜。
解决方案
[root@fgedu.net.cn ~]# hadoop jar /bigdata/app/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.4.jar \
-D mapreduce.job.reduces=10 \
-D mapreduce.map.output.compress=true \
-D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec \
-file mapper.py -mapper “python mapper.py” \
-file reducer.py -reducer “python reducer.py” \
-input /user/fgedu/orders /user/fgedu/customers \
-output /user/fgedu/join_result
# 2. 实现Map端Join的mapper.py
[root@fgedu.net.cn ~]# vi mapper.py
#!/usr/bin/env python
# mapper.py
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: `http://www.fgedu.net.cn`
import sys
import json
# 加载小表到内存
customers = {}
with open(‘customers.txt’, ‘r’) as f:
for line in f:
parts = line.strip().split(‘\t’)
if len(parts) == 2:
customers[parts[0]] = parts[1]
for line in sys.stdin:
line = line.strip()
if not line:
continue
parts = line.split(‘\t’)
if len(parts) >= 2:
if parts[0].startswith(‘order’):
# 订单数据
order_id, customer_id, amount = parts[0], parts[1], parts[2]
if customer_id in customers:
print(‘\t’.join([order_id, customer_id, customers[customer_id], amount]))
# 执行结果(输出日志)
…
MapReduce Total cumulative CPU time: 1 minute 45 seconds
OK
order_001 customer_1001 Alice 100.50
order_002 customer_1001 Alice 200.75
order_003 customer_2002 Bob 150.25
…
更多学习教程公众号风哥教程itpux_com
Part05-风哥经验总结与分享
5.1 数据倾斜排查方法论
1. 观察任务执行情况:查看各任务的执行时间和资源使用情况
2. 分析数据分布:对输入数据进行采样分析,识别热点key
3. 定位倾斜阶段:确定数据倾斜发生在Map阶段还是Reduce阶段
4. 选择解决方案:根据具体情况选择合适的解决方法
5. 验证解决方案:执行优化后的任务,验证是否解决了数据倾斜
风哥提示:数据倾斜排查需要结合监控工具和日志分析,全面了解任务执行情况
5.2 最佳实践总结
1. 设计阶段:合理设计数据模型和key分布
2. 开发阶段:使用合适的技术和方法处理数据倾斜
3. 运行阶段:监控任务执行情况,及时发现数据倾斜
4. 优化阶段:根据实际情况持续优化解决方案
5. 文档化:记录数据倾斜的处理方法和经验
from bigdata视频:www.itpux.com
5.3 常见问题与解决方案
1. 热点key问题:使用随机前缀、加盐技术
2. Join操作倾斜:使用Map端Join、小表广播
3. Group By倾斜:使用Map端聚合、倾斜key单独处理
4. 数据类型不一致:统一数据类型、规范化处理
5. 任务超时:增加超时时间、优化资源配置
学习交流加群风哥QQ113257174
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
