1. 首页 > Hadoop教程 > 正文

大数据教程FG019-MapReduce作业开发与优化实战

内容简介:本文详细介绍MapReduce作业开发与优化实战,包括MapReduce编程模型、作业开发流程、性能优化技巧、常见问题处理等核心内容。风哥教程参考Hadoop官方文档MapReduce Tutorial、MapReduce Programming等内容。

目录大纲

Part01-基础概念与理论知识
  1.1 MapReduce编程模型
  1.2 MapReduce执行流程
  1.3 MapReduce数据流
Part02-生产环境规划与建议
  2.1 作业开发规划
  2.2 资源配置规划
  2.3 性能优化规划
Part03-生产环境项目实施方案
  3.1 MapReduce作业开发
  3.2 作业提交与执行
  3.3 作业性能优化
  3.4 作业监控与调试
Part04-生产案例与实战讲解
  4.1 WordCount作业实战案例
  4.2 数据清洗作业案例
  4.3 大规模数据处理案例
Part05-风哥经验总结与分享
  5.1 MapReduce开发最佳实践
  5.2 性能优化经验总结

Part01-基础概念与理论知识

1.1 MapReduce编程模型

MapReduce是一种分布式计算编程模型,将计算任务分解为Map和Reduce两个阶段。更多视频教程www.fgedu.net.cn Map阶段负责数据映射,Reduce阶段负责数据聚合。

风哥提示:MapReduce的核心思想是分而治之,将大任务分解为小任务并行处理。

1.2 MapReduce执行流程

MapReduce作业执行分为多个阶段。学习交流加群风哥微信: itpux-com

MapReduce执行阶段:
1. Input:读取输入数据
2. Map:执行Map函数
3. Shuffle:数据排序和传输
4. Reduce:执行Reduce函数
5. Output:输出结果

1.3 MapReduce数据流

MapReduce数据流涉及多个数据转换过程。from bigdata视频:www.itpux.com

# 查看MapReduce作业执行流程
hadoop jar /bigdata/app/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount -D mapreduce.job.maps=10 -D mapreduce.job.reduces=5 /input /output

# 作业执行信息
24/01/17 20:00:00 INFO client.RMProxy: Connecting to ResourceManager
24/01/17 20:00:05 INFO mapreduce.Job: Running job: job_1705473600000_0001
24/01/17 20:00:10 INFO mapreduce.Job: map 0% reduce 0%
24/01/17 20:01:00 INFO mapreduce.Job: map 50% reduce 0%
24/01/17 20:02:00 INFO mapreduce.Job: map 100% reduce 0%
24/01/17 20:03:00 INFO mapreduce.Job: map 100% reduce 50%
24/01/17 20:04:00 INFO mapreduce.Job: map 100% reduce 100%
24/01/17 20:04:05 INFO mapreduce.Job: Job job_1705473600000_0001 completed successfully

Part02-生产环境规划与建议

2.1 作业开发规划

MapReduce作业开发需要遵循规范的开发流程。更多学习教程公众号风哥教程itpux_com

作业开发规划建议:
– 设计合理的Map和Reduce逻辑
– 选择合适的数据类型
– 实现自定义Partitioner
– 配置Combiner减少数据传输

2.2 资源配置规划

资源配置需要根据数据量和计算复杂度确定。学习交流加群风哥QQ113257174

# 查看默认资源配置
cat /bigdata/app/hadoop/etc/hadoop/mapred-site.xml | grep -A3 “mapreduce.map.memory”

<!– Map任务内存配置 –>
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>

<!– Reduce任务内存配置 –>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>

<!– JVM堆内存配置 –>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx1638m</value>
</property>

<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx3276m</value>
</property>

2.3 性能优化规划

性能优化需要从多个维度考虑。风哥提示:合理配置Map和Reduce数量是优化的关键。

性能优化要点:
– 合理设置Map和Reduce数量
– 配置Combiner减少Shuffle数据量
– 优化数据序列化
– 压缩中间数据
– 调整内存配置

Part03-生产环境项目实施方案

3.1 MapReduce作业开发

3.1.1 WordCount作业开发

# WordCount Mapper类
cat /bigdata/app/hadoop/share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-*-sources.jar

// WordCount Mapper示例代码
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}

// WordCount Reducer示例代码
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

3.1.2 自定义作业开发

# 创建自定义MapReduce作业
# 编译Java代码
javac -classpath $(hadoop classpath) -d /bigdata/app/classes/ FgeduDataProcessor.java
# 打包JAR文件
jar -cvf /bigdata/app/jars/fgedu-data-processor.jar -C /bigdata/app/classes/ .
# 查看打包结果
ls -la /bigdata/app/jars/fgedu-data-processor.jar

# 编译输出
Note: FgeduDataProcessor.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

# 打包结果
added manifest
adding: FgeduDataProcessor.class(in = 1234) (out= 567)(deflated 54%)
adding: FgeduDataProcessor$FgeduMapper.class(in = 2345) (out= 1234)(deflated 47%)
adding: FgeduDataProcessor$FgeduReducer.class(in = 2345) (out= 1234)(deflated 47%)

# JAR文件
-rw-r–r– 1 fgedu fgedu 5678 Jan 17 20:30 /bigdata/app/jars/fgedu-data-processor.jar

3.2 作业提交与执行

3.2.1 提交MapReduce作业

# 提交MapReduce作业
hadoop jar /bigdata/app/jars/fgedu-data-processor.jar \
com.fgedu.mapreduce.FgeduDataProcessor \
-D mapreduce.job.maps=20 \
-D mapreduce.job.reduces=10 \
-D mapreduce.map.memory.mb=4096 \
-D mapreduce.reduce.memory.mb=8192 \
/bigdata/warehouse/fgedu/input \
/bigdata/warehouse/fgedu/output

# 查看作业状态
yarn application -list -appStates RUNNING

# 作业提交成功
24/01/17 20:35:00 INFO client.RMProxy: Connecting to ResourceManager
24/01/17 20:35:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1705473600000_0002
24/01/17 20:35:10 INFO impl.YarnClientImpl: Submitted application application_1705473600050

# 运行中作业
Total Applications:1
Application-Id Application-Name User State
app_1705473600050 FgeduDataProcessor fgedu RUNNING

3.2.2 监控作业执行

# 查看作业详情
mapred job -status job_1705473600000_0002
# 查看作业计数器
mapred job -counter job_1705473600000_0002

# 作业状态
Job job_1705473600000_0002
Job State: RUNNING
Job Start Time: 1705475700000
Maps: 20 Total, 15 Completed, 5 Running
Reduces: 10 Total, 0 Completed, 0 Running

# 作业计数器
org.apache.hadoop.mapreduce.JobCounter
TOTAL_LAUNCHED_UBERTASKS=0
TOTAL_LAUNCHED_MAPS=20
TOTAL_LAUNCHED_REDUCES=10
MAP_INPUT_RECORDS=1000000
MAP_OUTPUT_RECORDS=5000000
REDUCE_INPUT_RECORDS=5000000
REDUCE_OUTPUT_RECORDS=100000

3.3 作业性能优化

3.3.1 配置Combiner

# 配置Combiner减少Shuffle数据量
hadoop jar /bigdata/app/jars/fgedu-data-processor.jar \
com.fgedu.mapreduce.FgeduDataProcessor \
-D mapreduce.job.combiner.class=com.fgedu.mapreduce.FgeduCombiner \
/bigdata/warehouse/fgedu/input \
/bigdata/warehouse/fgedu/output

# 使用Combiner后的效果
24/01/17 20:45:00 INFO mapreduce.Job: Job job_1705473600000_0003

# 数据量对比
# 未使用Combiner:Map输出 5000000 条记录
# 使用Combiner后:Map输出 500000 条记录
# Shuffle数据量减少 90%

3.3.2 压缩配置

# 配置Map输出压缩
cat /bigdata/app/hadoop/etc/hadoop/mapred-site.xml | grep -A3 “compress”

<!– Map输出压缩 –>
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>

<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

<!– Reduce输出压缩 –>
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>

<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

3.4 作业监控与调试

3.4.1 查看作业日志

# 查看作业日志
yarn logs -applicationId application_1705473600050 > /bigdata/logs/job_1705473600050.log
# 查看特定任务日志
grep “Error” /bigdata/logs/job_1705473600050.log | head -20

# 日志内容
Container: container_1705473600050_0001_01_000001
LogType:stderr
24/01/17 20:35:00 INFO mapred.MapTask: Processing split: hdfs://fgedu01:9000/bigdata/warehouse/fgedu/input/part-00000:0+67108864
24/01/17 20:35:05 INFO mapred.MapTask: Map output record size: 100
24/01/17 20:35:10 INFO mapred.MapTask: Spilling map output

# 错误日志
24/01/17 20:40:00 WARN mapred.LocalJobRunner: job_1705473600000_0002
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:986)

3.4.2 性能分析

# 分析作业性能
mapred job -history /bigdata/warehouse/fgedu/output/_logs/history/job_1705473600000_0002.jhist

# 作业历史分析
Hadoop job: job_1705473600000_0002
Job Name: FgeduDataProcessor
User: fgedu
Submit Time: 1705475700000
Launch Time: 1705475705000
Finish Time: 1705476300000
Total Time: 600000ms (10 minutes)

# 任务统计
Average Map Time: 240000ms
Average Shuffle Time: 120000ms
Average Sort Time: 60000ms
Average Reduce Time: 180000ms

Part04-生产案例与实战讲解

4.1 WordCount作业实战案例

WordCount是MapReduce入门经典案例。更多视频教程www.fgedu.net.cn

#!/bin/bash
# wordcount_example.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

# 准备测试数据
INPUT_DIR=”/bigdata/warehouse/fgedu/wordcount/input”
OUTPUT_DIR=”/bigdata/warehouse/fgedu/wordcount/output”

# 创建输入目录
hdfs dfs -mkdir -p ${INPUT_DIR}

# 上传测试数据
echo “hello world hello hadoop hello mapreduce” | hdfs dfs -put – ${INPUT_DIR}/test.txt
echo “hadoop is good mapreduce is powerful” | hdfs dfs -put – ${INPUT_DIR}/test2.txt

# 运行WordCount
hadoop jar /bigdata/app/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount ${INPUT_DIR} ${OUTPUT_DIR}

# 查看结果
hdfs dfs -cat ${OUTPUT_DIR}/part-r-00000

# WordCount执行
24/01/17 21:00:00 INFO client.RMProxy: Connecting to ResourceManager
24/01/17 21:00:30 INFO mapreduce.Job: Job job_1705473600000_0010 completed successfully

# 统计结果
good 1
hadoop 2
hello 3
is 2
mapreduce 2
powerful 1
world 1

4.2 数据清洗作业案例

数据清洗是ETL流程的重要环节。学习交流加群风哥微信: itpux-com

#!/bin/bash
# data_clean_job.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

# 数据清洗作业
INPUT_DIR=”/bigdata/warehouse/fgedu/raw_data”
OUTPUT_DIR=”/bigdata/warehouse/fgedu/clean_data”

# 运行数据清洗作业
hadoop jar /bigdata/app/jars/fgedu-data-clean.jar \
com.fgedu.etl.DataCleanJob \
-D mapreduce.job.maps=50 \
-D mapreduce.job.reduces=10 \
-D mapreduce.map.memory.mb=4096 \
-D mapreduce.reduce.memory.mb=8192 \
-D mapreduce.job.combiner.class=com.fgedu.etl.DataCleanCombiner \
${INPUT_DIR} ${OUTPUT_DIR}

# 查看清洗结果
hdfs dfs -ls ${OUTPUT_DIR}/

# 数据清洗执行
24/01/17 21:30:00 INFO mapreduce.Job: Running job: job_1705473600000_0020
24/01/17 21:35:00 INFO mapreduce.Job: map 100% reduce 100%
24/01/17 21:35:05 INFO mapreduce.Job: Job job_1705473600000_0020 completed successfully

# 清洗统计
Input Records: 10000000
Output Records: 9500000
Invalid Records: 500000
Clean Rate: 95%

# 输出文件
Found 10 items
-rw-r–r– 3 fgedu fgedu 1073741824 2024-01-17 21:35 /bigdata/warehouse/fgedu/clean_data/part-r-00000
-rw-r–r– 3 fgedu fgedu 1073741824 2024-01-17 21:35 /bigdata/warehouse/fgedu/clean_data/part-r-00001

4.3 大规模数据处理案例

4.3.1 大规模数据排序

# 大规模数据排序作业
hadoop jar /bigdata/app/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar sort \
-D mapreduce.job.maps=100 \
-D mapreduce.job.reduces=50 \
-D mapreduce.map.memory.mb=4096 \
-D mapreduce.reduce.memory.mb=8192 \
/bigdata/warehouse/fgedu/unsorted_data \
/bigdata/warehouse/fgedu/sorted_data

# 查看排序结果
hdfs dfs -cat /bigdata/warehouse/fgedu/sorted_data/part-r-00000 | head -10

# 排序作业执行
24/01/17 22:00:00 INFO mapreduce.Job: Running job: job_1705473600000_0030
24/01/17 22:30:00 INFO mapreduce.Job: map 100% reduce 100%
24/01/17 22:30:05 INFO mapreduce.Job: Job job_1705473600000_0030 completed successfully

# 排序统计
Input Records: 100000000
Output Records: 100000000
Execution Time: 30 minutes

# 排序结果
0000000001
0000000002
0000000003
0000000004
0000000005
0000000006
0000000007
0000000008
0000000009
0000000010

4.3.2 大规模数据聚合

# 大规模数据聚合作业
hadoop jar /bigdata/app/jars/fgedu-aggregation.jar \
com.fgedu.etl.AggregationJob \
-D mapreduce.job.maps=200 \
-D mapreduce.job.reduces=20 \
-D mapreduce.map.memory.mb=8192 \
-D mapreduce.reduce.memory.mb=16384 \
-D mapreduce.job.combiner.class=com.fgedu.etl.AggregationCombiner \
/bigdata/warehouse/fgedu/detail_data \
/bigdata/warehouse/fgedu/summary_data

# 聚合作业执行
24/01/17 23:00:00 INFO mapreduce.Job: Running job: job_1705473600000_0040
24/01/17 23:45:00 INFO mapreduce.Job: map 100% reduce 100%
24/01/17 23:45:05 INFO mapreduce.Job: Job job_1705473600000_0040 completed successfully

# 聚合统计
Input Records: 1000000000
Output Records: 1000000
Aggregation Ratio: 1000:1
Execution Time: 45 minutes

Part05-风哥经验总结与分享

5.1 MapReduce开发最佳实践

在实际生产环境中,MapReduce开发需要注意以下几点:from bigdata视频:www.itpux.com

风哥经验总结:
1. 合理设计Map和Reduce逻辑
2. 使用Combiner减少数据传输
3. 选择合适的数据类型和序列化
4. 配置压缩减少IO开销
5. 根据数据量调整并行度

5.2 性能优化经验总结

5.2.1 性能优化建议

风哥提示:MapReduce性能优化需要从数据倾斜、内存配置、IO优化等多个方面综合考虑。

性能优化注意事项:
– 避免数据倾斜,合理设计Partitioner
– 调整内存配置,避免OOM
– 使用压缩减少Shuffle数据量
– 合理设置Map和Reduce数量
– 监控作业执行,及时发现问题

5.2.2 作业优化脚本

#!/bin/bash
# mr_job_optimizer.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

# MapReduce作业优化脚本
INPUT_SIZE=$(hdfs dfs -du -s $1 | awk ‘{print $1}’)
BLOCK_SIZE=134217728

# 计算Map数量
MAP_NUM=$((${INPUT_SIZE}/${BLOCK_SIZE}))
if [ ${MAP_NUM} -lt 10 ]; then
MAP_NUM=10
fi

# 计算Reduce数量
REDUCE_NUM=$((${MAP_NUM}/10))
if [ ${REDUCE_NUM} -lt 1 ]; then
REDUCE_NUM=1
fi

echo “Recommended Configuration:”
echo “Map Tasks: ${MAP_NUM}”
echo “Reduce Tasks: ${REDUCE_NUM}”
echo “Map Memory: 4096 MB”
echo “Reduce Memory: 8192 MB”

# 优化建议输出
./mr_job_optimizer.sh /bigdata/warehouse/fgedu/input
Recommended Configuration:
Map Tasks: 150
Reduce Tasks: 15
Map Memory: 4096 MB
Reduce Memory: 8192 MB

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息