1. 首页 > Hadoop教程 > 正文

大数据教程FG218-Hadoop数据去重与清洗实战

本文档详细介绍Hadoop生态系统中的数据去重与清洗技术,包括数据去重的概念、方法、工具和实战应用。风哥教程参考bigdata官方文档Data Cleaning、Data Deduplication等内容。

目录大纲

Part01-基础概念与理论知识

1.1 数据去重基本概念

数据去重是指识别并删除数据集中的重复记录,确保数据的唯一性。在大数据环境中,数据去重可以减少存储开销、提高查询性能、保证数据分析结果的准确性。

1.2 数据清洗基本概念

数据清洗是指对数据进行预处理,包括去除重复数据、处理缺失值、纠正错误数据、标准化数据格式等,以提高数据质量。

1.3 去重与清洗的重要性

数据去重与清洗的重要性体现在:

  • 提高数据质量:确保数据的准确性和一致性
  • 减少存储成本:去除重复数据,节省存储空间
  • 提高处理效率:减少数据量,加速数据处理
  • 保证分析结果:避免重复数据导致的分析偏差

Part02-生产环境规划与建议

2.1 硬件与网络要求

数据去重与清洗对硬件和网络的要求:

  • 存储:充足的存储空间,用于临时数据处理
  • CPU:多核处理器,支持并行计算
  • 内存:充足的内存,加速数据处理
  • 网络:高带宽网络,支持数据传输

2.2 软件环境配置

生产环境建议配置:

# 操作系统配置
$ cat /etc/redhat-release
Oracle Linux Server release 9.3

# JDK版本
$ java -version
openjdk version “11.0.17” 2022-10-18 LTS

# Hadoop版本
$ hadoop version
Hadoop 3.3.4

# Spark版本
$ spark-submit –version
Version 3.2.1

# Hive版本
$ hive –version
Hive 3.1.3

2.3 去重与清洗策略选择

不同场景下的去重与清洗策略:

  • 实时去重:使用流处理框架如Kafka Streams、Spark Streaming
  • 批量去重:使用MapReduce、Spark批处理
  • 增量去重:只处理新增数据,减少处理量
  • 全量去重:定期对全量数据进行去重

Part03-生产环境项目实施方案

3.1 基于MapReduce的去重实现

MapReduce是Hadoop中最基础的分布式计算框架,可以用于实现数据去重。

3.1.1 MapReduce去重代码

// Mapper类
public class DeduplicationMapper extends Mapper {
private Text outKey = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 直接将每行数据作为key输出
outKey.set(value);
context.write(outKey, NullWritable.get());
}
}

// Reducer类
public class DeduplicationReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// 对于相同的key,只输出一次
context.write(key, NullWritable.get());
}
}

// 主类
public class DeduplicationJob {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, “Deduplication”);
job.setJarByClass(DeduplicationJob.class);
job.setMapperClass(DeduplicationMapper.class);
job.setReducerClass(DeduplicationReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

3.2 基于Spark的去重实现

Spark提供了更高效的数据处理能力,适合处理大规模数据去重。

3.2.1 Spark去重代码

// Scala代码
import org.apache.spark.sql.SparkSession

object Deduplication {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(“Deduplication”)
.master(“local[*]”)
.getOrCreate()

// 读取数据
val data = spark.read.text(“hdfs://192.168.1.101:9000/input/data.txt”)

// 去重
val deduplicated = data.distinct()

// 保存结果
deduplicated.write.text(“hdfs://192.168.1.101:9000/output/deduplicated”)

spark.stop()
}
}

3.3 基于Hive的去重实现

Hive提供了SQL接口,可以方便地实现数据去重。

3.3.1 Hive去重SQL

— 创建原始表
CREATE TABLE fgedu_original_data (
id STRING,
name STRING,
age INT,
address STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’;

— 加载数据
LOAD DATA INPATH ‘/input/data.txt’ INTO TABLE fgedu_original_data;

— 去重并创建新表
CREATE TABLE fgedu_deduplicated_data AS
SELECT DISTINCT * FROM fgedu_original_data;

— 查看去重结果
SELECT COUNT(*) FROM fgedu_original_data;
SELECT COUNT(*) FROM fgedu_deduplicated_data;

Part04-生产案例与实战讲解

4.1 HDFS文件去重实战

使用Hadoop命令和MapReduce实现HDFS文件去重。

4.1.1 准备测试数据

$ cat > test.txt << EOF 1,John,25,New York 2,Mary,30,London 3,John,25,New York 4,Tom,35,Paris 2,Mary,30,London EOF
$ hdfs dfs -mkdir -p /input

$ hdfs dfs -put test.txt /input/

$ hdfs dfs -cat /input/test.txt

1,John,25,New York
2,Mary,30,London
3,John,25,New York
4,Tom,35,Paris
2,Mary,30,London

4.1.2 使用MapReduce去重

$ hadoop jar deduplication.jar DeduplicationJob /input/test.txt /output/deduplicated

22/07/25 12:00:00 INFO client.RMProxy: Connecting to ResourceManager at fgedu.net.cn/192.168.1.101:8032
22/07/25 12:00:01 INFO input.FileInputFormat: Total input files to process : 1
22/07/25 12:00:02 INFO mapreduce.JobSubmitter: number of splits:1
22/07/25 12:00:03 INFO mapreduce.Job: Running job: job_1658740800000_0001
22/07/25 12:00:08 INFO mapreduce.Job: Job job_1658740800000_0001 completed successfully
22/07/25 12:00:08 INFO mapreduce.Job: Counters: 49
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=100
Map output materialized bytes=100
Input split bytes=100
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=100
Reduce input records=5
Reduce output records=3
Spilled Records=10
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=10
Total committed heap usage (bytes)=268435456
File Input Format Counters
Bytes Read=100
File Output Format Counters
Bytes Written=60

4.1.3 查看去重结果

$ hdfs dfs -cat /output/deduplicated/part-r-00000

1,John,25,New York
2,Mary,30,London
4,Tom,35,Paris

4.2 HBase数据去重实战

使用HBase的CheckAndPut操作实现数据去重。

4.2.1 HBase表创建

$ hbase shell

hbase(main):001:0> create ‘fgedu_users’, ‘cf’
0 row(s) in 1.2340 seconds

=> Hbase::Table – fgedu_users

4.2.2 批量导入数据并去重

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.FileReader;

public class HBaseDeduplication {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(“hbase.zookeeper.quorum”, “192.168.1.101:2181”);

try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(“fgedu_users”));
BufferedReader reader = new BufferedReader(new FileReader(“test.txt”))) {

String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(“,”);
if (parts.length >= 4) {
String id = parts[0];
String name = parts[1];
String age = parts[2];
String address = parts[3];

Put put = new Put(Bytes.toBytes(id));
put.addColumn(Bytes.toBytes(“cf”), Bytes.toBytes(“name”), Bytes.toBytes(name));
put.addColumn(Bytes.toBytes(“cf”), Bytes.toBytes(“age”), Bytes.toBytes(age));
put.addColumn(Bytes.toBytes(“cf”), Bytes.toBytes(“address”), Bytes.toBytes(address));

// 使用CheckAndPut确保幂等性,避免重复插入
boolean success = table.checkAndPut(
Bytes.toBytes(id),
Bytes.toBytes(“cf”),
Bytes.toBytes(“name”),
null, // 只有当列不存在时才插入
put
);

System.out.println(“Inserted ” + id + “: ” + success);
}
}
}
}
}

4.2.3 验证去重结果

$ hbase shell

hbase(main):001:0> scan ‘fgedu_users’
ROW COLUMN+CELL
1 column=cf:address, timestamp=1627284000000, value=New York
1 column=cf:age, timestamp=1627284000000, value=25
1 column=cf:name, timestamp=1627284000000, value=John
2 column=cf:address, timestamp=1627284000000, value=London
2 column=cf:age, timestamp=1627284000000, value=30
2 column=cf:name, timestamp=1627284000000, value=Mary
4 column=cf:address, timestamp=1627284000000, value=Paris
4 column=cf:age, timestamp=1627284000000, value=35
4 column=cf:name, timestamp=1627284000000, value=Tom
3 row(s) in 0.0450 seconds

4.3 Hive数据清洗实战

使用Hive SQL实现数据清洗,包括去重、处理空值、标准化数据格式等。

4.3.1 创建测试表

$ hive

hive> CREATE TABLE fgedu_raw_data (
> id STRING,
> name STRING,
> age STRING,
> address STRING,
> email STRING
> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’;
OK
Time taken: 0.567 seconds

hive> LOAD DATA LOCAL INPATH ‘/home/fgedu/test_data.txt’ INTO TABLE fgedu_raw_data;
Loading data to table default.fgedu_raw_data
OK
Time taken: 0.345 seconds

4.3.2 查看原始数据

$ hive -e “SELECT * FROM fgedu_raw_data LIMIT 10;”

1 John 25 New York john@example.com
2 Mary 30 London mary@example.com
3 John 25 New York john@example.com
4 Tom 35 Paris tom@example.com
5 NULL 28 Tokyo null
6 Alice NULL Beijing alice@example.com
7 Bob 40 NULL bob@example.com
8 Mary 30 London mary@example.com
9 NULL NULL NULL null
10 Eve 32 Berlin eve@example.com

4.3.3 数据清洗与去重

$ hive -e ”
CREATE TABLE fgedu_cleaned_data AS
SELECT
COALESCE(id, CONCAT(‘GEN_’, ROW_NUMBER() OVER (ORDER BY name))) AS id,
COALESCE(name, ‘Unknown’) AS name,
CAST(COALESCE(age, ‘0’) AS INT) AS age,
COALESCE(address, ‘Unknown’) AS address,
LOWER(COALESCE(email, ‘unknown@example.com’)) AS email
FROM (
SELECT DISTINCT * FROM fgedu_raw_data
) t;

SELECT * FROM fgedu_cleaned_data LIMIT 10;

1 John 25 New York john@example.com
2 Mary 30 London mary@example.com
4 Tom 35 Paris tom@example.com
5 Unknown 28 Tokyo unknown@example.com
6 Alice 0 Beijing alice@example.com
7 Bob 40 Unknown bob@example.com
10 Eve 32 Berlin eve@example.com
GEN_1 Unknown 0 Unknown unknown@example.com

Part05-风哥经验总结与分享

5.1 去重与清洗性能优化

数据去重与清洗的性能优化建议:

  • 使用合适的分区策略:根据数据特点选择分区方式,提高并行处理能力
  • 使用压缩:对输入和输出数据进行压缩,减少网络传输和存储开销
  • 优化内存配置:根据数据量调整MapReduce/Spark的内存配置
  • 使用 bloom filter:在去重前使用bloom filter快速过滤重复数据
  • 增量处理:只处理新增数据,减少处理量

5.2 常见问题与解决方案

常见问题及解决方法:

  • 内存不足:增加内存配置,或使用外部存储
  • 处理速度慢:增加并行度,优化算法
  • 数据倾斜:使用随机前缀,均衡数据分布
  • 去重不彻底:检查去重键的选择,确保唯一性
  • 清洗规则复杂:使用规则引擎,提高可维护性

5.3 最佳实践建议

数据去重与清洗最佳实践:

  • 建立数据质量标准:定义数据质量指标,定期评估
  • 自动化处理:编写脚本自动执行去重和清洗操作
  • 监控与告警:监控数据质量,及时发现问题
  • 版本控制:保存清洗前后的数据,便于回溯
  • 持续优化:根据业务需求和数据特点,不断优化去重和清洗策略

风哥提示:数据去重与清洗是大数据处理的基础环节,直接影响后续分析结果的准确性。建议在系统设计初期就考虑数据质量问题,建立完善的数据治理体系。

本文档风哥教程参考bigdata官方文档Data Cleaning、Data Deduplication等内容,结合生产环境实际经验编写。更多视频教程www.fgedu.net.cn

学习交流加群风哥微信: itpux-com,学习交流加群风哥QQ113257174

更多学习教程公众号风哥教程itpux_com,from bigdata视频:www.itpux.com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息