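Table of Contents
Part01-Basic Concepts and Theory
1.1 The MapReduce Programming Model
1.2 MapReduce Execution Flow
1.3 MapReduce Data Flow
Part02-Production Planning and Recommendations
2.1 Job Development Planning
2.2 Resource Configuration Planning
2.3 Performance Optimization Planning
Part03-Production Implementation
3.1 MapReduce Job Development
3.2 Job Submission and Execution
3.3 Job Performance Optimization
3.4 Job Monitoring and Debugging
Part04-Production Cases and Hands-On Walkthroughs
4.1 WordCount Job Case Study
4.2 Data Cleaning Job Case Study
4.3 Large-Scale Data Processing Case Study
Part05-Fengge's Lessons Learned
5.1 MapReduce Development Best Practices
5.2 Performance Optimization Summary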
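Part01-Basic Concepts and Theory
1.1 The MapReduce Programming Model
MapReduce is a distributed computing programming model that splits a computation into two phases, Map and Reduce. The Map phase transforms each input record into intermediate key-value pairs, and the Reduce phase aggregates all values that share the same key.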
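To make the two phases concrete, here is a minimal in-memory sketch in plain Java with no Hadoop dependency. The class name MapReduceModel and its method names are hypothetical, chosen only for illustration; a real job would extend Hadoop's Mapper and Reducer classes instead.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration of the Map and Reduce phases (no Hadoop needed).
class MapReduceModel {

    // Map phase: turn one input line into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(token, 1));
            }
        }
        return pairs;
    }

    // Reduce phase: fold all values that share one key into a single sum.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Runs map on every line, groups the values by key, then reduces each group.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b a", "b c"))); // prints {a=2, b=2, c=1}
    }
}
```

The grouping step in run() stands in for the work the framework does between the two phases; user code normally supplies only map and reduce.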
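1.2 MapReduce Execution Flow
A MapReduce job runs in five stages.
1. Input: read the input data
2. Map: run the Map function
3. Shuffle: sort the map output and transfer it to the reducers
4. Reduce: run the Reduce function
5. Output: write the results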
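The Shuffle stage is the least visible of the five, so the following plain-Java sketch imitates it in memory: each map output pair is routed to one reducer, and every reducer sees its keys in sorted order with the values grouped per key. The class ShuffleSketch is a hypothetical name. The partition arithmetic mirrors Hadoop's default HashPartitioner, but String.hashCode() stands in for the hash of a Hadoop Text key, so the actual assignments on a cluster would differ.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory sketch of the Shuffle stage: partition, sort, and group.
class ShuffleSketch {

    // Same arithmetic as the default hash partitioning: non-negative hash modulo reducer count.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Routes each (key, value) pair to one reducer; a TreeMap keeps each reducer's keys sorted.
    static List<TreeMap<String, List<Integer>>> shuffle(
            List<Map.Entry<String, Integer>> mapOutput, int numReducers) {
        List<TreeMap<String, List<Integer>>> reducerInputs = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            reducerInputs.add(new TreeMap<>());
        }
        for (Map.Entry<String, Integer> pair : mapOutput) {
            TreeMap<String, List<Integer>> target =
                    reducerInputs.get(partitionFor(pair.getKey(), numReducers));
            target.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return reducerInputs;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String word : "hello world hello hadoop".split(" ")) {
            mapOutput.add(new AbstractMap.SimpleEntry<>(word, 1));
        }
        List<TreeMap<String, List<Integer>>> inputs = shuffle(mapOutput, 2);
        for (int i = 0; i < inputs.size(); i++) {
            System.out.println("reducer " + i + " receives " + inputs.get(i));
        }
    }
}
```

Because the partition depends only on the key, all values for one key always arrive at the same reducer, which is what makes per-key aggregation in the Reduce stage possible.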
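1.3 MapReduce Data Flow
Data passes through several transformations as it moves through a MapReduce job: input splits become map output, which is shuffled into reducer input and finally written as job output. The bundled wordcount example reports this progress on the client. Note that mapreduce.job.maps is only a hint; the actual number of map tasks is determined by the number of input splits.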
hadoop jar /bigdata/app/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount -D mapreduce.job.maps=10 -D mapreduce.job.reduces=5 /input /output
24/01/17 20:00:00 INFO client.RMProxy: Connecting to ResourceManager
24/01/17 20:00:05 INFO mapreduce.Job: Running job: job_1705473600000_0001
24/01/17 20:00:10 INFO mapreduce.Job: map 0% reduce 0%
24/01/17 20:01:00 INFO mapreduce.Job: map 50% reduce 0%
24/01/17 20:02:00 INFO mapreduce.Job: map 100% reduce 0%
24/01/17 20:03:00 INFO mapreduce.Job: map 100% reduce 50%
24/01/17 20:04:00 INFO mapreduce.Job: map 100% reduce 100%
24/01/17 20:04:05 INFO mapreduce.Job: Job job_1705473600000_0001 completed successfully
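Part02-Production Planning and Recommendations
2.1 Job Development Planning
MapReduce job development should follow a disciplined process.
– Design sound Map and Reduce logic
– Choose appropriate data types
– Implement a custom Partitioner where the default hash partitioning does not fit
– Configure a Combiner to reduce data transfer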
2.2 Resource Configuration Planning
Size the resources according to data volume and computational complexity.
cat /bigdata/app/hadoop/etc/hadoop/mapred-site.xml | grep -A3 "mapreduce.map.memory"
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<!-- Reduce task memory -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>
<!-- JVM heap size -->
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx1638m</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx3276m</value>
</property>
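The heap values in the settings above (1638 MB and 3276 MB) are 80% of the corresponding container sizes (2048 MB and 4096 MB). The remainder is headroom for non-heap JVM memory such as thread stacks and native buffers; if the process outgrows its container, YARN kills it. The sketch below reproduces that arithmetic. The class HeapSizing and the 80% ratio are assumptions inferred from the numbers, not something the configuration itself states.

```java
// Hypothetical helper that derives a -Xmx value from a container size.
class HeapSizing {

    // Assumed heap share of the container, in percent; 80 reproduces the values above.
    static final int HEAP_PERCENT = 80;

    static String javaOpts(int containerMb) {
        int heapMb = containerMb * HEAP_PERCENT / 100; // integer division rounds down
        return "-Xmx" + heapMb + "m";
    }

    public static void main(String[] args) {
        System.out.println("map:    " + javaOpts(2048)); // prints "map:    -Xmx1638m"
        System.out.println("reduce: " + javaOpts(4096)); // prints "reduce: -Xmx3276m"
    }
}
```

When a job overrides mapreduce.map.memory.mb or mapreduce.reduce.memory.mb on the command line, the matching java.opts value should be raised with it, otherwise the larger container is paid for but the heap stays at the old size.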
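2.3 Performance Optimization Planning
Performance tuning spans several dimensions. Tip from Fengge: setting sensible Map and Reduce task counts is the key to optimization.
– Set sensible Map and Reduce task counts
– Configure a Combiner to reduce Shuffle volume
– Optimize data serialization
– Compress intermediate data
– Tune memory settings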
Part03-Production Implementation
3.1 MapReduce Job Development
3.1.1 Developing the WordCount Job
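// WordCount Mapper sample code (WordCountMapper.java)
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    // Emits (word, 1) for every whitespace-separated token in the input line.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}
// WordCount Reducer sample code (WordCountReducer.java)
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();
    // Sums the counts received for one word and emits (word, total).
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}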
3.1.2 Custom Job Development
# Compile the Java code
javac -classpath "$(hadoop classpath)" -d /bigdata/app/classes/ FgeduDataProcessor.java
# Package the JAR file
jar -cvf /bigdata/app/jars/fgedu-data-processor.jar -C /bigdata/app/classes/ .
# Inspect the packaged JAR
ls -la /bigdata/app/jars/fgedu-data-processor.jar
# Compiler output
Note: FgeduDataProcessor.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
# Packaging output
added manifest
adding: FgeduDataProcessor.class(in = 1234) (out= 567)(deflated 54%)
adding: FgeduDataProcessor$FgeduMapper.class(in = 2345) (out= 1234)(deflated 47%)
adding: FgeduDataProcessor$FgeduReducer.class(in = 2345) (out= 1234)(deflated 47%)
# JAR file
-rw-r--r-- 1 fgedu fgedu 5678 Jan 17 20:30 /bigdata/app/jars/fgedu-data-processor.jar
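3.2 Job Submission and Execution
3.2.1 Submitting a MapReduce Job
The -D generic options in the command below take effect only when the driver class parses them, which is typically done by implementing Tool and launching the job through ToolRunner.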
hadoop jar /bigdata/app/jars/fgedu-data-processor.jar \
com.fgedu.mapreduce.FgeduDataProcessor \
-D mapreduce.job.maps=20 \
-D mapreduce.job.reduces=10 \
-D mapreduce.map.memory.mb=4096 \
-D mapreduce.reduce.memory.mb=8192 \
/bigdata/warehouse/fgedu/input \
/bigdata/warehouse/fgedu/output
# Check the application state
yarn application -list -appStates RUNNING
24/01/17 20:35:00 INFO client.RMProxy: Connecting to ResourceManager
24/01/17 20:35:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1705473600000_0002
24/01/17 20:35:10 INFO impl.YarnClientImpl: Submitted application application_1705473600050
# Running applications
Total Applications:1
Application-Id Application-Name User State
app_1705473600050 FgeduDataProcessor fgedu RUNNING
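3.2.2 Monitoring Job Execution
# Check job status (the status report also lists the job's counters)
mapred job -status job_1705473600000_0002
# Read a single counter (-counter takes a group name and a counter name)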
mapred job -counter job_1705473600000_0002 org.apache.hadoop.mapreduce.JobCounter TOTAL_LAUNCHED_MAPS
Job job_1705473600000_0002
Job State: RUNNING
Job Start Time: 1705475700000
Maps: 20 Total, 15 Completed, 5 Running
Reduces: 10 Total, 0 Completed, 0 Running
# Job counters
org.apache.hadoop.mapreduce.JobCounter
TOTAL_LAUNCHED_UBERTASKS=0
TOTAL_LAUNCHED_MAPS=20
TOTAL_LAUNCHED_REDUCES=10
MAP_INPUT_RECORDS=1000000
MAP_OUTPUT_RECORDS=5000000
REDUCE_INPUT_RECORDS=5000000
REDUCE_OUTPUT_RECORDS=100000
3.3 Job Performance Optimization
3.3.1 Configuring a Combiner
hadoop jar /bigdata/app/jars/fgedu-data-processor.jar \
com.fgedu.mapreduce.FgeduDataProcessor \
-D mapreduce.job.combine.class=com.fgedu.mapreduce.FgeduCombiner \
/bigdata/warehouse/fgedu/input \
/bigdata/warehouse/fgedu/output
24/01/17 20:45:00 INFO mapreduce.Job: Job job_1705473600000_0003
# Data volume comparison
# Without a Combiner: 5000000 records leave the map side
# With a Combiner: 500000 records leave the map side
# Shuffle volume drops by 90%
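The reduction comes from pre-aggregating on the map side: instead of shuffling one record per occurrence, each map task shuffles one record per distinct key. The following plain-Java sketch imitates that for a single map task. CombinerSketch is a hypothetical name and the code has no Hadoop dependency. A combiner is only safe when the reduce operation is associative and commutative (sum, max, and similar), because Hadoop may run it zero, one, or several times.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of a combiner's effect on one map task's output.
class CombinerSketch {

    // Map output without a combiner: one (word, 1) record per token; only the keys are kept here.
    static List<String> mapOutputKeys(List<String> lines) {
        List<String> keys = new ArrayList<>();
        for (String line : lines) {
            for (String token : line.trim().split("\\s+")) {
                if (!token.isEmpty()) {
                    keys.add(token);
                }
            }
        }
        return keys;
    }

    // Combiner: sum the 1s per word locally, leaving one record per distinct word.
    static Map<String, Integer> combine(List<String> keys) {
        Map<String, Integer> combined = new TreeMap<>();
        for (String key : keys) {
            combined.merge(key, 1, Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<String> keys = mapOutputKeys(Arrays.asList("hello world hello hadoop hello mapreduce"));
        Map<String, Integer> combined = combine(keys);
        System.out.println("records without combiner: " + keys.size());     // 6
        System.out.println("records with combiner:    " + combined.size()); // 4
        System.out.println(combined); // {hadoop=1, hello=3, mapreduce=1, world=1}
    }
}
```

The saving grows with key repetition inside a single map task, so a job whose keys are almost all distinct gains little from a combiner.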
3.3.2 Compression Settings
cat /bigdata/app/hadoop/etc/hadoop/mapred-site.xml | grep -A3 "compress"
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<!-- Reduce output compression -->
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>
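Compression trades CPU time for fewer bytes written to disk and sent over the network. The configuration above uses a fast codec (Snappy) for the short-lived map output and a stronger one (Gzip) for the final output; keep in mind that gzip files are not splittable when a later job reads them. The sketch below only illustrates the size effect on repetitive map-style records. It uses the JDK's GZIPOutputStream as a stand-in for Hadoop's codec classes, and CompressionSketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo: how much repetitive key/value text shrinks under gzip.
class CompressionSketch {

    // Builds fake map output: the same "word<TAB>1" record repeated.
    static byte[] sampleMapOutput(int records) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < records; i++) {
            sb.append("hello\t1\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Compresses a byte array in memory with the JDK gzip implementation.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        byte[] raw = sampleMapOutput(1000);
        byte[] compressed = gzip(raw);
        System.out.println("raw bytes:        " + raw.length);
        System.out.println("compressed bytes: " + compressed.length);
    }
}
```

Real map output is less repetitive than this sample, so the ratio on a cluster will be lower; measuring it on a sample of production data is the only reliable guide.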
3.4 Job Monitoring and Debugging
3.4.1 Viewing Job Logs
yarn logs -applicationId application_1705473600050 > /bigdata/logs/job_1705473600050.log
# Search the collected log for errors
grep "Error" /bigdata/logs/job_1705473600050.log | head -20
Container: container_1705473600050_0001_01_000001
LogType:stderr
24/01/17 20:35:00 INFO mapred.MapTask: Processing split: hdfs://fgedu01:9000/bigdata/warehouse/fgedu/input/part-00000:0+67108864
24/01/17 20:35:05 INFO mapred.MapTask: Map output record size: 100
24/01/17 20:35:10 INFO mapred.MapTask: Spilling map output
…
# Error log
24/01/17 20:40:00 WARN mapred.LocalJobRunner: job_1705473600000_0002
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:986)
3.4.2 Performance Analysis
mapred job -history /bigdata/warehouse/fgedu/output/_logs/history/job_1705473600000_0002.jhist
Hadoop job: job_1705473600000_0002
Job Name: FgeduDataProcessor
User: fgedu
Submit Time: 1705475700000
Launch Time: 1705475705000
Finish Time: 1705476300000
Total Time: 600000ms (10 minutes)
# Task statistics
Average Map Time: 240000ms
Average Shuffle Time: 120000ms
Average Sort Time: 60000ms
Average Reduce Time: 180000ms
Part04-Production Cases and Hands-On Walkthroughs
4.1 WordCount Job Case Study
WordCount is the classic introductory MapReduce example.
# wordcount_example.sh
# Prepare test data
INPUT_DIR="/bigdata/warehouse/fgedu/wordcount/input"
OUTPUT_DIR="/bigdata/warehouse/fgedu/wordcount/output"
# Create the input directory
hdfs dfs -mkdir -p "${INPUT_DIR}"
# Upload test data
echo "hello world hello hadoop hello mapreduce" | hdfs dfs -put - "${INPUT_DIR}/test.txt"
echo "hadoop is good mapreduce is powerful" | hdfs dfs -put - "${INPUT_DIR}/test2.txt"
# Run WordCount (the output directory must not already exist)
hadoop jar /bigdata/app/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount "${INPUT_DIR}" "${OUTPUT_DIR}"
# View the results
hdfs dfs -cat "${OUTPUT_DIR}/part-r-00000"
24/01/17 21:00:00 INFO client.RMProxy: Connecting to ResourceManager
24/01/17 21:00:30 INFO mapreduce.Job: Job job_1705473600000_0010 completed successfully
# Word counts
good 1
hadoop 2
hello 3
is 2
mapreduce 2
powerful 1
world 1
4.2 Data Cleaning Job Case Study
Data cleaning is an important step in an ETL pipeline.
# data_clean_job.sh
# Data cleaning job
INPUT_DIR="/bigdata/warehouse/fgedu/raw_data"
OUTPUT_DIR="/bigdata/warehouse/fgedu/clean_data"
# Run the data cleaning job
hadoop jar /bigdata/app/jars/fgedu-data-clean.jar \
com.fgedu.etl.DataCleanJob \
-D mapreduce.job.maps=50 \
-D mapreduce.job.reduces=10 \
-D mapreduce.map.memory.mb=4096 \
-D mapreduce.reduce.memory.mb=8192 \
-D mapreduce.job.combine.class=com.fgedu.etl.DataCleanCombiner \
"${INPUT_DIR}" "${OUTPUT_DIR}"
# List the cleaned output
hdfs dfs -ls "${OUTPUT_DIR}/"
24/01/17 21:30:00 INFO mapreduce.Job: Running job: job_1705473600000_0020
24/01/17 21:35:00 INFO mapreduce.Job: map 100% reduce 100%
24/01/17 21:35:05 INFO mapreduce.Job: Job job_1705473600000_0020 completed successfully
# Cleaning statistics
Input Records: 10000000
Output Records: 9500000
Invalid Records: 500000
Clean Rate: 95%
# Output files
Found 10 items
-rw-r--r-- 3 fgedu fgedu 1073741824 2024-01-17 21:35 /bigdata/warehouse/fgedu/clean_data/part-r-00000
-rw-r--r-- 3 fgedu fgedu 1073741824 2024-01-17 21:35 /bigdata/warehouse/fgedu/clean_data/part-r-00001
…
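The source does not show what DataCleanJob checks, so the following is only a sketch of the kind of per-record validation a cleaning mapper might perform, together with the clean-rate arithmetic from the statistics above. Everything in it is an assumption made for illustration: the class name DataCleanSketch, the comma delimiter, and the three-field schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical record validation of the kind a data cleaning mapper might apply.
class DataCleanSketch {

    // Assumed schema for illustration only: id,name,age
    static final int EXPECTED_FIELDS = 3;

    // A record is valid when it has the expected field count and no empty field.
    static boolean isValid(String line) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != EXPECTED_FIELDS) {
            return false;
        }
        for (String field : fields) {
            if (field.trim().isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Keeps only the valid records, as the mapper's output would.
    static List<String> clean(List<String> lines) {
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            if (isValid(line)) {
                kept.add(line);
            }
        }
        return kept;
    }

    // Clean rate as a percentage of input records.
    static double cleanRatePercent(long inputRecords, long outputRecords) {
        return 100.0 * outputRecords / inputRecords;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("1,alice,20", "2,,30", "3,bob", "4,carol,41");
        List<String> kept = clean(raw);
        System.out.println(kept); // prints [1,alice,20, 4,carol,41]
        System.out.println(cleanRatePercent(raw.size(), kept.size()) + "%"); // prints 50.0%
        System.out.println(cleanRatePercent(10_000_000L, 9_500_000L) + "%"); // prints 95.0%
    }
}
```

In a real mapper the rejected records would usually be tallied with a job counter rather than silently dropped, which is how a figure such as Invalid Records can be reported at the end of the job.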
4.3 Large-Scale Data Processing Case Study
4.3.1 Large-Scale Sort
hadoop jar /bigdata/app/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar sort \
-D mapreduce.job.maps=100 \
-D mapreduce.job.reduces=50 \
-D mapreduce.map.memory.mb=4096 \
-D mapreduce.reduce.memory.mb=8192 \
/bigdata/warehouse/fgedu/unsorted_data \
/bigdata/warehouse/fgedu/sorted_data
# View the sorted output
hdfs dfs -cat /bigdata/warehouse/fgedu/sorted_data/part-r-00000 | head -10
24/01/17 22:00:00 INFO mapreduce.Job: Running job: job_1705473600000_0030
24/01/17 22:30:00 INFO mapreduce.Job: map 100% reduce 100%
24/01/17 22:30:05 INFO mapreduce.Job: Job job_1705473600000_0030 completed successfully
# Sort statistics
Input Records: 100000000
Output Records: 100000000
Execution Time: 30 minutes
# Sorted output
0000000001
0000000002
0000000003
0000000004
0000000005
0000000006
0000000007
0000000008
0000000009
0000000010
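Each reducer writes its own sorted part file. For the part files to form one globally sorted sequence when read in order, keys have to be range-partitioned so that every key sent to reducer i is smaller than every key sent to reducer i+1; Hadoop ships TotalOrderPartitioner for this purpose. The plain-Java sketch below shows only the lookup idea. RangePartitionSketch and the split points are made up for illustration, and the split points must be distinct and sorted in ascending order.

```java
import java.util.Arrays;

// Hypothetical range partitioner lookup: maps a key to a reducer using sorted split points.
class RangePartitionSketch {

    // With n split points there are n + 1 reducers; keys below the first split go to reducer 0.
    static int partitionFor(long key, long[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // For an absent key, binarySearch returns -(insertionPoint) - 1.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        long[] splitPoints = {100L, 200L}; // three reducers: <100, 100..199, >=200
        for (long key : new long[] {50L, 100L, 150L, 250L}) {
            System.out.println(key + " -> reducer " + partitionFor(key, splitPoints));
        }
        // prints:
        // 50 -> reducer 0
        // 100 -> reducer 1
        // 150 -> reducer 1
        // 250 -> reducer 2
    }
}
```

In practice the split points are derived by sampling the input so that each range receives a similar number of records.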
4.3.2 Large-Scale Aggregation
hadoop jar /bigdata/app/jars/fgedu-aggregation.jar \
com.fgedu.etl.AggregationJob \
-D mapreduce.job.maps=200 \
-D mapreduce.job.reduces=20 \
-D mapreduce.map.memory.mb=8192 \
-D mapreduce.reduce.memory.mb=16384 \
-D mapreduce.job.combine.class=com.fgedu.etl.AggregationCombiner \
/bigdata/warehouse/fgedu/detail_data \
/bigdata/warehouse/fgedu/summary_data
24/01/17 23:00:00 INFO mapreduce.Job: Running job: job_1705473600000_0040
24/01/17 23:45:00 INFO mapreduce.Job: map 100% reduce 100%
24/01/17 23:45:05 INFO mapreduce.Job: Job job_1705473600000_0040 completed successfully
# Aggregation statistics
Input Records: 1000000000
Output Records: 1000000
Aggregation Ratio: 1000:1
Execution Time: 45 minutes
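Part05-Fengge's Lessons Learned
5.1 MapReduce Development Best Practices
In production, keep the following points in mind when developing MapReduce jobs:
1. Design sound Map and Reduce logic
2. Use a Combiner to reduce data transfer
3. Choose appropriate data types and serialization
4. Enable compression to reduce I/O overhead
5. Adjust parallelism to the data volume
5.2 Performance Optimization Summary
5.2.1 Optimization Recommendations
– Avoid data skew by designing the Partitioner carefully
– Tune memory settings to avoid OOM errors
– Use compression to reduce Shuffle volume
– Set sensible Map and Reduce task counts
– Monitor job execution to catch problems early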
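Data skew can be checked before a job runs by counting how many records each reducer would receive for a sample of keys. The plain-Java sketch below does that and reports the ratio between the busiest reducer and the average. SkewSketch is a hypothetical name, and String.hashCode() stands in for the hash of the real key type, so treat the result as an estimate.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical skew check: how unevenly would hash partitioning spread these keys?
class SkewSketch {

    // Counts records per reducer using non-negative hash modulo reducer count.
    static int[] recordsPerReducer(List<String> keys, int numReducers) {
        int[] counts = new int[numReducers];
        for (String key : keys) {
            counts[(key.hashCode() & Integer.MAX_VALUE) % numReducers]++;
        }
        return counts;
    }

    // Busiest reducer's load divided by the average load; 1.0 means perfectly even.
    static double skewRatio(int[] counts) {
        long total = 0;
        int max = 0;
        for (int count : counts) {
            total += count;
            max = Math.max(max, count);
        }
        if (total == 0) {
            return 1.0; // no records, nothing to skew
        }
        return max * (double) counts.length / total;
    }

    public static void main(String[] args) {
        // One hot key: every record lands on the same reducer.
        int[] counts = recordsPerReducer(Arrays.asList("a", "a", "a", "a"), 2);
        System.out.println(skewRatio(counts)); // prints 2.0
    }
}
```

A ratio well above 1.0 means one reducer will finish long after the others; a custom Partitioner or a different key design is then worth considering.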
5.2.2 Job Tuning Script
#!/bin/bash
# mr_job_optimizer.sh
# Suggests MapReduce task counts from the size of the input path given as $1
INPUT_SIZE=$(hdfs dfs -du -s "$1" | awk '{print $1}')
BLOCK_SIZE=134217728  # 128 MB
# Map tasks: one per block, rounded up, with a floor of 10
MAP_NUM=$(( (INPUT_SIZE + BLOCK_SIZE - 1) / BLOCK_SIZE ))
if [ "${MAP_NUM}" -lt 10 ]; then
    MAP_NUM=10
fi
# Reduce tasks: one per 10 map tasks, at least 1
REDUCE_NUM=$(( MAP_NUM / 10 ))
if [ "${REDUCE_NUM}" -lt 1 ]; then
    REDUCE_NUM=1
fi
echo "Recommended Configuration:"
echo "Map Tasks: ${MAP_NUM}"
echo "Reduce Tasks: ${REDUCE_NUM}"
echo "Map Memory: 4096 MB"
echo "Reduce Memory: 8192 MB"
./mr_job_optimizer.sh /bigdata/warehouse/fgedu/input
Recommended Configuration:
Map Tasks: 150
Reduce Tasks: 15
Map Memory: 4096 MB
Reduce Memory: 8192 MB
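This article was compiled and published by Fengge Tutorials (风哥教程) for learning and testing purposes only. When reposting, cite the source: http://www.fgedu.net.cn/10327.html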
