1. 首页 > Hadoop教程 > 正文

大数据教程FG046-Sqoop数据同步MySQL实战

内容简介:本文详细介绍Sqoop与MySQL数据同步的核心方法与生产实战应用。风哥教程参考Sqoop官方文档MySQL Connector、Import Export等内容,涵盖MySQL数据导入HDFS/Hive、Hive数据导出MySQL、全量增量同步等核心功能,结合生产环境实际案例,帮助读者掌握Sqoop与MySQL数据同步的实战技能。

目录大纲

Part01-基础概念与理论知识
  1.1 Sqoop与MySQL集成概述
  1.2 数据同步模式分类
  1.3 连接器配置原理
Part02-生产环境规划与建议
  2.1 同步策略规划
  2.2 性能优化建议
  2.3 容错处理方案
Part03-生产环境项目实施方案
  3.1 MySQL导入HDFS实战
  3.2 MySQL导入Hive实战
  3.3 Hive导出MySQL实战
Part04-生产案例与实战讲解
  4.1 订单数据同步案例
  4.2 用户数据同步案例
  4.3 定时同步调度案例
Part05-风哥经验总结与分享
  5.1 数据同步最佳实践
  5.2 常见问题与解决方案
  5.3 生产环境注意事项

Part01-基础概念与理论知识

1.1 Sqoop与MySQL集成概述

Sqoop是Apache顶级项目,专为Hadoop与关系数据库之间的数据传输设计。更多视频教程www.fgedu.net.cn

Sqoop与MySQL集成优势:

1. 高效批量数据传输
2. 支持全量和增量同步
3. 自动化的MapReduce任务
4. 支持数据压缩和格式转换

适用场景:

1. 数据仓库ODS层数据导入
2. 业务数据备份到Hadoop
3. 分析结果回流业务系统
4. 数据迁移和同步

1.2 数据同步模式分类

Sqoop支持多种数据同步模式:学习交流加群风哥微信: itpux-com

— 同步模式分类
— 1. 全量导入
— 导入表中所有数据
sqoop import –table orders –target-dir /data/orders

— 2. 增量导入
— 基于时间戳或自增ID
sqoop import –table orders \
  –incremental append \
  –check-column update_time \
  –last-value “2024-01-18 00:00:00”

— 3. 条件导入
— 基于WHERE条件过滤
sqoop import –table orders \
  –where “create_time >= ‘2024-01-01′”

— 4. 查询导入
— 基于自定义SQL查询
sqoop import \
  –query “SELECT * FROM orders WHERE \$CONDITIONS” \
  –split-by id

1.3 连接器配置原理

MySQL连接器配置是Sqoop与MySQL通信的基础:

— MySQL连接参数
–connect: JDBC连接URL
–username: 数据库用户名
–password: 数据库密码
–driver: JDBC驱动类

— 连接URL格式
jdbc:mysql://hostname:port/database

— 完整连接示例
sqoop import \
  –connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false&useUnicode=true&characterEncoding=UTF-8” \
  –username fgedu \
  –password fgedu123 \
  –table orders

Part02-生产环境规划与建议

2.1 同步策略规划

同步策略需要根据数据特点选择。风哥提示:合理的同步策略可以大幅提升数据传输效率。

— 同步策略选择
— 小表: 全量导入
— 大表: 增量导入
— 维度表: 每日全量覆盖
— 事实表: 增量追加

— 并行度设置
— 小表: 1-4个Mapper
— 中表: 4-8个Mapper
— 大表: 8-16个Mapper

— 分割列选择
— 选择主键或索引列
— 数据分布均匀
— 避免数据倾斜

2.2 性能优化建议

性能优化可以提升数据传输效率:更多学习教程公众号风哥教程itpux_com

— 性能优化参数

— 增加并行度
–num-mappers 8

— 启用压缩
–compress
–compression-codec org.apache.hadoop.io.compress.SnappyCodec

— 批量插入
–batch
–direct

— 优化JDBC
–connect “jdbc:mysql://host/db?useSSL=false&rewriteBatchedStatements=true”

2.3 容错处理方案

容错处理确保数据同步可靠性:

— 容错处理参数

— 重试机制
–fetch-size 1000

— 临时目录
–temporary-root-dir /tmp/sqoop_temp

— 错误处理
–null-string ‘\\N’
–null-non-string ‘\\N’

— 数据验证
–check-column id
–incremental append

Part03-生产环境项目实施方案

3.1 MySQL导入HDFS实战

MySQL导入HDFS是Sqoop的基本功能。from bigdata视频:www.itpux.com

— 基本导入命令
sqoop import \
  –connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
  –username fgedu \
  –password fgedu123 \
  –table orders \
  –target-dir /data/ods/orders \
  –delete-target-dir \
  –fields-terminated-by ‘\t’ \
  –lines-terminated-by ‘\n’ \
  –null-string ‘\\N’ \
  –null-non-string ‘\\N’ \
  –compress \
  –compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  –num-mappers 4 \
  –split-by id;

# 导入执行日志
24/01/19 02:00:00 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
24/01/19 02:00:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/01/19 02:00:01 INFO mapreduce.Job: The url to track the job: http://fgedu01:8088/
24/01/19 02:05:00 INFO mapreduce.Job: Job job_1705651200000_0001 completed successfully
24/01/19 02:05:00 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=0
        HDFS: Number of bytes written=45678901
    Map-Reduce Framework
        Map input records=1250000
        Map output records=1250000

# 导入成功
Total records: 1250000
Total size: 45678901 bytes

3.2 MySQL导入Hive实战

MySQL直接导入Hive简化了数据导入流程。学习交流加群风哥QQ113257174

— 导入到Hive表
sqoop import \
  –connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
  –username fgedu \
  –password fgedu123 \
  –table orders \
  –hive-import \
  –hive-database ods_trade \
  –hive-table ods_trade_order_di \
  –hive-partition-key dt \
  –hive-partition-value “2024-01-19” \
  –fields-terminated-by ‘\t’ \
  –null-string ‘\\N’ \
  –null-non-string ‘\\N’ \
  –num-mappers 4 \
  –split-by id;

— 导入并创建Hive表
sqoop import \
  –connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
  –username fgedu \
  –password fgedu123 \
  –table orders \
  –hive-import \
  –hive-create-hive-table \
  –hive-database ods_trade \
  –hive-table ods_trade_order_new;

3.3 Hive导出MySQL实战

Hive数据导出到MySQL实现数据回流:

— Hive导出到MySQL
sqoop export \
  –connect “jdbc:mysql://fgedu01:3306/report_db?useSSL=false” \
  –username fgedu \
  –password fgedu123 \
  –table report_daily_sales \
  –export-dir /user/hive/warehouse/ads_trade.db/ads_daily_sales \
  –input-fields-terminated-by ‘\t’ \
  –input-lines-terminated-by ‘\n’ \
  –input-null-string ‘\\N’ \
  –input-null-non-string ‘\\N’ \
  –num-mappers 4 \
  –batch;

— 更新模式导出
sqoop export \
  –connect “jdbc:mysql://fgedu01:3306/report_db?useSSL=false” \
  –username fgedu \
  –password fgedu123 \
  –table report_daily_sales \
  –export-dir /user/hive/warehouse/ads_trade.db/ads_daily_sales \
  –update-key report_date \
  –update-mode allowinsert \
  –input-fields-terminated-by ‘\t’;

Part04-生产案例与实战讲解

4.1 订单数据同步案例

本案例演示订单数据的完整同步流程。更多视频教程www.fgedu.net.cn

#!/bin/bash
# order_sync.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn

echo “=== 订单数据同步 ===”
echo “Date: $(date)”

DT=$1
if [ -z “$DT” ]; then
  DT=$(date -d “-1 day” +%Y-%m-%d)
fi

echo “Sync date: $DT”

# 同步订单数据
sqoop import \
  –connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
  –username fgedu \
  –password-file /user/hive/.password \
  –table orders \
  –where “create_time >= ‘${DT} 00:00:00’ AND create_time < '${DT} 23:59:59'" \
  –target-dir /user/hive/warehouse/ods_trade.db/ods_trade_order_di/dt=${DT} \
  –delete-target-dir \
  –fields-terminated-by ‘\t’ \
  –compress \
  –compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  –num-mappers 4 \
  –split-by id;

# 修复分区
hive -e “MSCK REPAIR TABLE ods_trade.ods_trade_order_di;”

# 数据验证
hive -e “SELECT COUNT(*) FROM ods_trade.ods_trade_order_di WHERE dt=’${DT}’;”

echo “=== 同步完成 ===”

=== 订单数据同步 ===
Date: Fri Jan 19 02:00:00 CST 2024
Sync date: 2024-01-18

# 执行Sqoop导入
24/01/19 02:00:00 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
24/01/19 02:05:00 INFO mapreduce.Job: Job completed successfully

# 修复分区
Partitions recovered: 1
OK

# 数据验证
+———+
| _c0 |
+———+
| 1250000 |
+———+

=== 同步完成 ===

4.2 用户数据同步案例

用户数据同步案例演示增量同步。学习交流加群风哥微信: itpux-com

— 用户数据增量同步

— 创建Sqoop Job
sqoop job –create fgedu_user_sync \
  — import \
  –connect “jdbc:mysql://fgedu01:3306/user_db?useSSL=false” \
  –username fgedu \
  –password-file /user/hive/.password \
  –table users \
  –incremental lastmodified \
  –check-column update_time \
  –last-value “2024-01-01 00:00:00” \
  –target-dir /user/hive/warehouse/ods_user.db/ods_user_info_df \
  –merge-key user_id \
  –fields-terminated-by ‘\t’ \
  –num-mappers 4;

— 执行Job
sqoop job –exec fgedu_user_sync;

— 查看Job状态
sqoop job –show fgedu_user_sync;

4.3 定时同步调度案例

定时同步调度案例演示自动化同步。风哥提示:定时调度是生产环境数据同步的标准做法。

— Crontab定时任务配置

# 编辑定时任务
crontab -e

# 每天凌晨2点执行订单同步
0 2 * * * /bigdata/scripts/order_sync.sh >> /bigdata/logs/order_sync.log 2>&1

# 每天凌晨3点执行用户同步
0 3 * * * /bigdata/scripts/user_sync.sh >> /bigdata/logs/user_sync.log 2>&1

# 每小时执行增量同步
0 * * * * /bigdata/scripts/incremental_sync.sh >> /bigdata/logs/incremental_sync.log 2>&1

Part05-风哥经验总结与分享

5.1 数据同步最佳实践

风哥在生产环境中的数据同步经验总结:from bigdata视频:www.itpux.com

1. 同步策略:

根据数据特点选择全量或增量同步

2. 性能优化:

合理设置并行度和压缩方式

3. 容错处理:

建立重试机制和数据验证

5.2 常见问题与解决方案

问题1:连接超时

解决方案:检查网络连接,增加超时时间。

— 增加超时时间
–connect “jdbc:mysql://host/db?connectTimeout=60000&socketTimeout=60000”

问题2:数据类型不匹配

解决方案:使用–map-column-java参数指定类型映射。学习交流加群风哥QQ113257174

5.3 生产环境注意事项

1. 密码安全:使用–password-file存储密码,避免明文。

2. 资源控制:合理设置并行度,避免对源系统造成压力。

3. 监控告警:建立同步任务监控,及时发现和处理问题。

风哥提示:Sqoop是Hadoop与关系数据库数据同步的重要工具。在生产环境中,要根据数据特点选择合适的同步策略,建立完善的调度和监控机制,确保数据同步的稳定性和可靠性。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息