本文档详细介绍Hadoop生态系统中的数据去重与清洗技术,包括数据去重的概念、方法、工具和实战应用。风哥教程参考bigdata官方文档Data Cleaning、Data Deduplication等内容。
目录大纲
- Part01-基础概念与理论知识
- 1.1 数据去重基本概念
- 1.2 数据清洗基本概念
- 1.3 去重与清洗的重要性
- Part02-生产环境规划与建议
- 2.1 硬件与网络要求
- 2.2 软件环境配置
- 2.3 去重与清洗策略选择
- Part03-生产环境项目实施方案
- 3.1 基于MapReduce的去重实现
- 3.2 基于Spark的去重实现
- 3.3 基于Hive的去重实现
- Part04-生产案例与实战讲解
- 4.1 HDFS文件去重实战
- 4.2 HBase数据去重实战
- 4.3 Hive数据清洗实战
- Part05-风哥经验总结与分享
- 5.1 去重与清洗性能优化
- 5.2 常见问题与解决方案
- 5.3 最佳实践建议
Part01-基础概念与理论知识
1.1 数据去重基本概念
数据去重是指识别并删除数据集中的重复记录,确保数据的唯一性。在大数据环境中,数据去重可以减少存储开销、提高查询性能、保证数据分析结果的准确性。
1.2 数据清洗基本概念
数据清洗是指对数据进行预处理,包括去除重复数据、处理缺失值、纠正错误数据、标准化数据格式等,以提高数据质量。
1.3 去重与清洗的重要性
数据去重与清洗的重要性体现在:
- 提高数据质量:确保数据的准确性和一致性
- 减少存储成本:去除重复数据,节省存储空间
- 提高处理效率:减少数据量,加速数据处理
- 保证分析结果:避免重复数据导致的分析偏差
Part02-生产环境规划与建议
2.1 硬件与网络要求
数据去重与清洗对硬件和网络的要求:
- 存储:充足的存储空间,用于临时数据处理
- CPU:多核处理器,支持并行计算
- 内存:充足的内存,加速数据处理
- 网络:高带宽网络,支持数据传输
2.2 软件环境配置
生产环境建议配置:
$ cat /etc/redhat-release
Oracle Linux Server release 9.3
# JDK版本
$ java -version
openjdk version “11.0.17” 2022-10-18 LTS
# Hadoop版本
$ hadoop version
Hadoop 3.3.4
# Spark版本
$ spark-submit –version
Version 3.2.1
# Hive版本
$ hive –version
Hive 3.1.3
2.3 去重与清洗策略选择
不同场景下的去重与清洗策略:
- 实时去重:使用流处理框架如Kafka Streams、Spark Streaming
- 批量去重:使用MapReduce、Spark批处理
- 增量去重:只处理新增数据,减少处理量
- 全量去重:定期对全量数据进行去重
Part03-生产环境项目实施方案
3.1 基于MapReduce的去重实现
MapReduce是Hadoop中最基础的分布式计算框架,可以用于实现数据去重。
3.1.1 MapReduce去重代码
public class DeduplicationMapper extends Mapper
private Text outKey = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 直接将每行数据作为key输出
outKey.set(value);
context.write(outKey, NullWritable.get());
}
}
// Reducer类
public class DeduplicationReducer extends Reducer
@Override
protected void reduce(Text key, Iterable
// 对于相同的key,只输出一次
context.write(key, NullWritable.get());
}
}
// 主类
public class DeduplicationJob {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, “Deduplication”);
job.setJarByClass(DeduplicationJob.class);
job.setMapperClass(DeduplicationMapper.class);
job.setReducerClass(DeduplicationReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.2 基于Spark的去重实现
Spark提供了更高效的数据处理能力,适合处理大规模数据去重。
3.2.1 Spark去重代码
import org.apache.spark.sql.SparkSession
object Deduplication {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(“Deduplication”)
.master(“local[*]”)
.getOrCreate()
// 读取数据
val data = spark.read.text(“hdfs://192.168.1.101:9000/input/data.txt”)
// 去重
val deduplicated = data.distinct()
// 保存结果
deduplicated.write.text(“hdfs://192.168.1.101:9000/output/deduplicated”)
spark.stop()
}
}
3.3 基于Hive的去重实现
Hive提供了SQL接口,可以方便地实现数据去重。
3.3.1 Hive去重SQL
CREATE TABLE fgedu_original_data (
id STRING,
name STRING,
age INT,
address STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’;
— 加载数据
LOAD DATA INPATH ‘/input/data.txt’ INTO TABLE fgedu_original_data;
— 去重并创建新表
CREATE TABLE fgedu_deduplicated_data AS
SELECT DISTINCT * FROM fgedu_original_data;
— 查看去重结果
SELECT COUNT(*) FROM fgedu_original_data;
SELECT COUNT(*) FROM fgedu_deduplicated_data;
Part04-生产案例与实战讲解
4.1 HDFS文件去重实战
使用Hadoop命令和MapReduce实现HDFS文件去重。
4.1.1 准备测试数据
2,Mary,30,London
3,John,25,New York
4,Tom,35,Paris
2,Mary,30,London
4.1.2 使用MapReduce去重
22/07/25 12:00:01 INFO input.FileInputFormat: Total input files to process : 1
22/07/25 12:00:02 INFO mapreduce.JobSubmitter: number of splits:1
22/07/25 12:00:03 INFO mapreduce.Job: Running job: job_1658740800000_0001
22/07/25 12:00:08 INFO mapreduce.Job: Job job_1658740800000_0001 completed successfully
22/07/25 12:00:08 INFO mapreduce.Job: Counters: 49
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=100
Map output materialized bytes=100
Input split bytes=100
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=100
Reduce input records=5
Reduce output records=3
Spilled Records=10
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=10
Total committed heap usage (bytes)=268435456
File Input Format Counters
Bytes Read=100
File Output Format Counters
Bytes Written=60
4.1.3 查看去重结果
2,Mary,30,London
4,Tom,35,Paris
4.2 HBase数据去重实战
使用HBase的CheckAndPut操作实现数据去重。
4.2.1 HBase表创建
0 row(s) in 1.2340 seconds
=> Hbase::Table – fgedu_users
4.2.2 批量导入数据并去重
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.BufferedReader;
import java.io.FileReader;
public class HBaseDeduplication {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(“hbase.zookeeper.quorum”, “192.168.1.101:2181”);
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(“fgedu_users”));
BufferedReader reader = new BufferedReader(new FileReader(“test.txt”))) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(“,”);
if (parts.length >= 4) {
String id = parts[0];
String name = parts[1];
String age = parts[2];
String address = parts[3];
Put put = new Put(Bytes.toBytes(id));
put.addColumn(Bytes.toBytes(“cf”), Bytes.toBytes(“name”), Bytes.toBytes(name));
put.addColumn(Bytes.toBytes(“cf”), Bytes.toBytes(“age”), Bytes.toBytes(age));
put.addColumn(Bytes.toBytes(“cf”), Bytes.toBytes(“address”), Bytes.toBytes(address));
// 使用CheckAndPut确保幂等性,避免重复插入
boolean success = table.checkAndPut(
Bytes.toBytes(id),
Bytes.toBytes(“cf”),
Bytes.toBytes(“name”),
null, // 只有当列不存在时才插入
put
);
System.out.println(“Inserted ” + id + “: ” + success);
}
}
}
}
}
4.2.3 验证去重结果
ROW COLUMN+CELL
1 column=cf:address, timestamp=1627284000000, value=New York
1 column=cf:age, timestamp=1627284000000, value=25
1 column=cf:name, timestamp=1627284000000, value=John
2 column=cf:address, timestamp=1627284000000, value=London
2 column=cf:age, timestamp=1627284000000, value=30
2 column=cf:name, timestamp=1627284000000, value=Mary
4 column=cf:address, timestamp=1627284000000, value=Paris
4 column=cf:age, timestamp=1627284000000, value=35
4 column=cf:name, timestamp=1627284000000, value=Tom
3 row(s) in 0.0450 seconds
4.3 Hive数据清洗实战
使用Hive SQL实现数据清洗,包括去重、处理空值、标准化数据格式等。
4.3.1 创建测试表
> id STRING,
> name STRING,
> age STRING,
> address STRING,
> email STRING
> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’;
OK
Time taken: 0.567 seconds
hive> LOAD DATA LOCAL INPATH ‘/home/fgedu/test_data.txt’ INTO TABLE fgedu_raw_data;
Loading data to table default.fgedu_raw_data
OK
Time taken: 0.345 seconds
4.3.2 查看原始数据
2 Mary 30 London mary@example.com
3 John 25 New York john@example.com
4 Tom 35 Paris tom@example.com
5 NULL 28 Tokyo null
6 Alice NULL Beijing alice@example.com
7 Bob 40 NULL bob@example.com
8 Mary 30 London mary@example.com
9 NULL NULL NULL null
10 Eve 32 Berlin eve@example.com
4.3.3 数据清洗与去重
CREATE TABLE fgedu_cleaned_data AS
SELECT
COALESCE(id, CONCAT(‘GEN_’, ROW_NUMBER() OVER (ORDER BY name))) AS id,
COALESCE(name, ‘Unknown’) AS name,
CAST(COALESCE(age, ‘0’) AS INT) AS age,
COALESCE(address, ‘Unknown’) AS address,
LOWER(COALESCE(email, ‘unknown@example.com’)) AS email
FROM (
SELECT DISTINCT * FROM fgedu_raw_data
) t;
SELECT * FROM fgedu_cleaned_data LIMIT 10;
“
2 Mary 30 London mary@example.com
4 Tom 35 Paris tom@example.com
5 Unknown 28 Tokyo unknown@example.com
6 Alice 0 Beijing alice@example.com
7 Bob 40 Unknown bob@example.com
10 Eve 32 Berlin eve@example.com
GEN_1 Unknown 0 Unknown unknown@example.com
Part05-风哥经验总结与分享
5.1 去重与清洗性能优化
数据去重与清洗的性能优化建议:
- 使用合适的分区策略:根据数据特点选择分区方式,提高并行处理能力
- 使用压缩:对输入和输出数据进行压缩,减少网络传输和存储开销
- 优化内存配置:根据数据量调整MapReduce/Spark的内存配置
- 使用 bloom filter:在去重前使用bloom filter快速过滤重复数据
- 增量处理:只处理新增数据,减少处理量
5.2 常见问题与解决方案
常见问题及解决方法:
- 内存不足:增加内存配置,或使用外部存储
- 处理速度慢:增加并行度,优化算法
- 数据倾斜:使用随机前缀,均衡数据分布
- 去重不彻底:检查去重键的选择,确保唯一性
- 清洗规则复杂:使用规则引擎,提高可维护性
5.3 最佳实践建议
数据去重与清洗最佳实践:
- 建立数据质量标准:定义数据质量指标,定期评估
- 自动化处理:编写脚本自动执行去重和清洗操作
- 监控与告警:监控数据质量,及时发现问题
- 版本控制:保存清洗前后的数据,便于回溯
- 持续优化:根据业务需求和数据特点,不断优化去重和清洗策略
风哥提示:数据去重与清洗是大数据处理的基础环节,直接影响后续分析结果的准确性。建议在系统设计初期就考虑数据质量问题,建立完善的数据治理体系。
本文档风哥教程参考bigdata官方文档Data Cleaning、Data Deduplication等内容,结合生产环境实际经验编写。更多视频教程www.fgedu.net.cn
学习交流加群风哥微信: itpux-com,学习交流加群风哥QQ113257174
更多学习教程公众号风哥教程itpux_com,from bigdata视频:www.itpux.com
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
