内容简介:本文详细介绍Sqoop与MySQL数据同步的核心方法与生产实战应用。风哥教程参考Sqoop官方文档MySQL Connector、Import Export等内容,涵盖MySQL数据导入HDFS/Hive、Hive数据导出MySQL、全量增量同步等核心功能,结合生产环境实际案例,帮助读者掌握Sqoop与MySQL数据同步的实战技能。
目录大纲
Part01-基础概念与理论知识
1.1 Sqoop与MySQL集成概述
1.2 数据同步模式分类
1.3 连接器配置原理
Part02-生产环境规划与建议
2.1 同步策略规划
2.2 性能优化建议
2.3 容错处理方案
Part03-生产环境项目实施方案
3.1 MySQL导入HDFS实战
3.2 MySQL导入Hive实战
3.3 Hive导出MySQL实战
Part04-生产案例与实战讲解
4.1 订单数据同步案例
4.2 用户数据同步案例
4.3 定时同步调度案例
Part05-风哥经验总结与分享
5.1 数据同步最佳实践
5.2 常见问题与解决方案
5.3 生产环境注意事项
Part01-基础概念与理论知识
1.1 Sqoop与MySQL集成概述
Sqoop是Apache顶级项目,专为Hadoop与关系数据库之间的数据传输设计。更多视频教程www.fgedu.net.cn
Sqoop与MySQL集成优势:
1. 高效批量数据传输
2. 支持全量和增量同步
3. 自动化的MapReduce任务
4. 支持数据压缩和格式转换
适用场景:
1. 数据仓库ODS层数据导入
2. 业务数据备份到Hadoop
3. 分析结果回流业务系统
4. 数据迁移和同步
1.2 数据同步模式分类
Sqoop支持多种数据同步模式:学习交流加群风哥微信: itpux-com
— 1. 全量导入
— 导入表中所有数据
sqoop import –table orders –target-dir /data/orders
— 2. 增量导入
— 基于时间戳或自增ID
sqoop import –table orders \
–incremental append \
–check-column update_time \
–last-value “2024-01-18 00:00:00”
— 3. 条件导入
— 基于WHERE条件过滤
sqoop import –table orders \
–where “create_time >= ‘2024-01-01′”
— 4. 查询导入
— 基于自定义SQL查询
sqoop import \
–query “SELECT * FROM orders WHERE \$CONDITIONS” \
–split-by id
1.3 连接器配置原理
MySQL连接器配置是Sqoop与MySQL通信的基础:
–connect: JDBC连接URL
–username: 数据库用户名
–password: 数据库密码
–driver: JDBC驱动类
— 连接URL格式
jdbc:mysql://hostname:port/database
— 完整连接示例
sqoop import \
–connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false&useUnicode=true&characterEncoding=UTF-8” \
–username fgedu \
–password fgedu123 \
–table orders
Part02-生产环境规划与建议
2.1 同步策略规划
同步策略需要根据数据特点选择。风哥提示:合理的同步策略可以大幅提升数据传输效率。
— 小表: 全量导入
— 大表: 增量导入
— 维度表: 每日全量覆盖
— 事实表: 增量追加
— 并行度设置
— 小表: 1-4个Mapper
— 中表: 4-8个Mapper
— 大表: 8-16个Mapper
— 分割列选择
— 选择主键或索引列
— 数据分布均匀
— 避免数据倾斜
2.2 性能优化建议
性能优化可以提升数据传输效率:更多学习教程公众号风哥教程itpux_com
— 增加并行度
–num-mappers 8
— 启用压缩
–compress
–compression-codec org.apache.hadoop.io.compress.SnappyCodec
— 批量插入
–batch
–direct
— 优化JDBC
–connect “jdbc:mysql://host/db?useSSL=false&rewriteBatchedStatements=true”
2.3 容错处理方案
容错处理确保数据同步可靠性:
— 重试机制
–fetch-size 1000
— 临时目录
–temporary-root-dir /tmp/sqoop_temp
— 错误处理
–null-string ‘\\N’
–null-non-string ‘\\N’
— 数据验证
–check-column id
–incremental append
Part03-生产环境项目实施方案
3.1 MySQL导入HDFS实战
MySQL导入HDFS是Sqoop的基本功能。from bigdata视频:www.itpux.com
sqoop import \
–connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
–username fgedu \
–password fgedu123 \
–table orders \
–target-dir /data/ods/orders \
–delete-target-dir \
–fields-terminated-by ‘\t’ \
–lines-terminated-by ‘\n’ \
–null-string ‘\\N’ \
–null-non-string ‘\\N’ \
–compress \
–compression-codec org.apache.hadoop.io.compress.SnappyCodec \
–num-mappers 4 \
–split-by id;
24/01/19 02:00:00 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
24/01/19 02:00:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/01/19 02:00:01 INFO mapreduce.Job: The url to track the job: http://fgedu01:8088/
24/01/19 02:05:00 INFO mapreduce.Job: Job job_1705651200000_0001 completed successfully
24/01/19 02:05:00 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=0
HDFS: Number of bytes written=45678901
Map-Reduce Framework
Map input records=1250000
Map output records=1250000
# 导入成功
Total records: 1250000
Total size: 45678901 bytes
3.2 MySQL导入Hive实战
MySQL直接导入Hive简化了数据导入流程。学习交流加群风哥QQ113257174
sqoop import \
–connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
–username fgedu \
–password fgedu123 \
–table orders \
–hive-import \
–hive-database ods_trade \
–hive-table ods_trade_order_di \
–hive-partition-key dt \
–hive-partition-value “2024-01-19” \
–fields-terminated-by ‘\t’ \
–null-string ‘\\N’ \
–null-non-string ‘\\N’ \
–num-mappers 4 \
–split-by id;
— 导入并创建Hive表
sqoop import \
–connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
–username fgedu \
–password fgedu123 \
–table orders \
–hive-import \
–hive-create-hive-table \
–hive-database ods_trade \
–hive-table ods_trade_order_new;
3.3 Hive导出MySQL实战
Hive数据导出到MySQL实现数据回流:
sqoop export \
–connect “jdbc:mysql://fgedu01:3306/report_db?useSSL=false” \
–username fgedu \
–password fgedu123 \
–table report_daily_sales \
–export-dir /user/hive/warehouse/ads_trade.db/ads_daily_sales \
–input-fields-terminated-by ‘\t’ \
–input-lines-terminated-by ‘\n’ \
–input-null-string ‘\\N’ \
–input-null-non-string ‘\\N’ \
–num-mappers 4 \
–batch;
— 更新模式导出
sqoop export \
–connect “jdbc:mysql://fgedu01:3306/report_db?useSSL=false” \
–username fgedu \
–password fgedu123 \
–table report_daily_sales \
–export-dir /user/hive/warehouse/ads_trade.db/ads_daily_sales \
–update-key report_date \
–update-mode allowinsert \
–input-fields-terminated-by ‘\t’;
Part04-生产案例与实战讲解
4.1 订单数据同步案例
本案例演示订单数据的完整同步流程。更多视频教程www.fgedu.net.cn
# order_sync.sh
# from:www.itpux.com.qq113257174.wx:itpux-com
# web: http://www.fgedu.net.cn
echo “=== 订单数据同步 ===”
echo “Date: $(date)”
DT=$1
if [ -z “$DT” ]; then
DT=$(date -d “-1 day” +%Y-%m-%d)
fi
echo “Sync date: $DT”
# 同步订单数据
sqoop import \
–connect “jdbc:mysql://fgedu01:3306/trade_db?useSSL=false” \
–username fgedu \
–password-file /user/hive/.password \
–table orders \
–where “create_time >= ‘${DT} 00:00:00’ AND create_time < '${DT} 23:59:59'" \
–target-dir /user/hive/warehouse/ods_trade.db/ods_trade_order_di/dt=${DT} \
–delete-target-dir \
–fields-terminated-by ‘\t’ \
–compress \
–compression-codec org.apache.hadoop.io.compress.SnappyCodec \
–num-mappers 4 \
–split-by id;
# 修复分区
hive -e “MSCK REPAIR TABLE ods_trade.ods_trade_order_di;”
# 数据验证
hive -e “SELECT COUNT(*) FROM ods_trade.ods_trade_order_di WHERE dt=’${DT}’;”
echo “=== 同步完成 ===”
Date: Fri Jan 19 02:00:00 CST 2024
Sync date: 2024-01-18
# 执行Sqoop导入
24/01/19 02:00:00 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
24/01/19 02:05:00 INFO mapreduce.Job: Job completed successfully
# 修复分区
Partitions recovered: 1
OK
# 数据验证
+———+
| _c0 |
+———+
| 1250000 |
+———+
=== 同步完成 ===
4.2 用户数据同步案例
用户数据同步案例演示增量同步。学习交流加群风哥微信: itpux-com
— 创建Sqoop Job
sqoop job –create fgedu_user_sync \
— import \
–connect “jdbc:mysql://fgedu01:3306/user_db?useSSL=false” \
–username fgedu \
–password-file /user/hive/.password \
–table users \
–incremental lastmodified \
–check-column update_time \
–last-value “2024-01-01 00:00:00” \
–target-dir /user/hive/warehouse/ods_user.db/ods_user_info_df \
–merge-key user_id \
–fields-terminated-by ‘\t’ \
–num-mappers 4;
— 执行Job
sqoop job –exec fgedu_user_sync;
— 查看Job状态
sqoop job –show fgedu_user_sync;
4.3 定时同步调度案例
定时同步调度案例演示自动化同步。风哥提示:定时调度是生产环境数据同步的标准做法。
# 编辑定时任务
crontab -e
# 每天凌晨2点执行订单同步
0 2 * * * /bigdata/scripts/order_sync.sh >> /bigdata/logs/order_sync.log 2>&1
# 每天凌晨3点执行用户同步
0 3 * * * /bigdata/scripts/user_sync.sh >> /bigdata/logs/user_sync.log 2>&1
# 每小时执行增量同步
0 * * * * /bigdata/scripts/incremental_sync.sh >> /bigdata/logs/incremental_sync.log 2>&1
Part05-风哥经验总结与分享
5.1 数据同步最佳实践
风哥在生产环境中的数据同步经验总结:from bigdata视频:www.itpux.com
1. 同步策略:
根据数据特点选择全量或增量同步
2. 性能优化:
合理设置并行度和压缩方式
3. 容错处理:
建立重试机制和数据验证
5.2 常见问题与解决方案
问题1:连接超时
解决方案:检查网络连接,增加超时时间。
–connect “jdbc:mysql://host/db?connectTimeout=60000&socketTimeout=60000”
问题2:数据类型不匹配
解决方案:使用–map-column-java参数指定类型映射。学习交流加群风哥QQ113257174
5.3 生产环境注意事项
1. 密码安全:使用–password-file存储密码,避免明文。
2. 资源控制:合理设置并行度,避免对源系统造成压力。
3. 监控告警:建立同步任务监控,及时发现和处理问题。
风哥提示:Sqoop是Hadoop与关系数据库数据同步的重要工具。在生产环境中,要根据数据特点选择合适的同步策略,建立完善的调度和监控机制,确保数据同步的稳定性和可靠性。
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
