
Sqoop Installation and Configuration: Data Synchronization Setup, Upgrade, and Migration in Detail

1. Sqoop Overview and Environment Planning

Sqoop is an open-source, top-level Apache project used mainly to transfer data between Hadoop (HDFS/Hive) and relational databases (MySQL, Oracle, PostgreSQL, and others). It can import data from a relational database into HDFS, and export data from HDFS back into a relational database.

1.1 Sqoop Versions

Sqoop has two major lines: Sqoop 1 (the 1.4.x series) and Sqoop 2 (the 1.99.x series). This tutorial uses Sqoop 1.4.7; Sqoop 1 is the most widely deployed in production and the more stable of the two.

# Check the Sqoop version
$ sqoop version
Sqoop 1.4.7
git commit id 2328972b08c61337d3c1d5197e4d1c7c0a5c6c7a
Compiled by root on Mon Apr 1 10:00:00 UTC 2024

# Check the Java version
$ java -version
openjdk version "1.8.0_402"
OpenJDK Runtime Environment (build 1.8.0_402-b06)
OpenJDK 64-Bit Server VM (build 25.402-b06, mixed mode)

# Check the Hadoop version
$ hadoop version
Hadoop 3.3.6
Source code repository https://github.com/apache/hadoop.git -r 1234567890abcdef
Compiled by root on 2024-01-15T10:00Z

1.2 Environment Plan

The environment for this installation is planned as follows:

Sqoop server:
sqoop01.fgedu.net.cn (192.168.1.51)

Sqoop version: 1.4.7
Hadoop version: 3.3.6
Hive version: 3.1.3
Java version: OpenJDK 1.8.0
Installation directory: /data/sqoop
Configuration directory: /data/sqoop/conf

Data sources:
MySQL: 192.168.1.51:3306/fgedudb
Oracle: 192.168.1.52:1521/fgedudb
PostgreSQL: 192.168.1.53:5432/fgedudb

Data targets:
HDFS: hdfs://192.168.1.51:8020
Hive: thrift://192.168.1.51:9083
HBase: 192.168.1.51:16010

2. Hardware Requirements

As a data-transfer tool, Sqoop has relatively modest hardware requirements, but transfer throughput should be factored into sizing.

2.1 Physical Host Requirements

# Check memory size
# free -h
total used free shared buff/cache available
Mem: 32G 4.2G 24G 256M 3.8G 27G
Swap: 16G 0B 16G

# Check disk space
# df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 12G 39G 24% /
/dev/sdb1 500G 50G 451G 10% /data
/dev/sdc1 200G 20G 181G 10% /backup

# Check the number of CPU cores
# nproc
16

# Check the system architecture
# uname -m
x86_64

Production recommendations: at least 8 GB of RAM for test environments and 16 GB or more in production. Plan disk space according to the transfer volume, with at least 200 GB recommended. Eight or more CPU cores are recommended to support parallel transfers.

2.2 vSphere Virtual Machine Requirements

Virtual machine configuration:
- vCPU: 8 cores
- Memory: 16 GB
- Disks: 50 GB system disk + 500 GB data disk
- Network: VMXNET3 NIC, gigabit network
- Storage: SSD-backed storage is recommended for better I/O performance

Resource pool configuration:
- CPU reservation: 4 GHz
- Memory reservation: 8 GB
- Memory limit: 16 GB
- CPU shares: Normal
- Memory shares: Normal

2.3 Cloud Host Requirements

Cloud instance specifications (Alibaba Cloud / Tencent Cloud / Huawei Cloud):
- Instance type: ecs.g6.2xlarge or equivalent
- vCPU: 8 cores
- Memory: 32 GB
- System disk: 100 GB standard cloud disk
- Data disk: 500 GB SSD cloud disk
- Network bandwidth: 10 Mbps or higher

Storage configuration:
- OSS object storage: staging area for imported and exported data
- NAS file storage: shared configuration files
- Cloud disk snapshots: periodic backups of configuration and data

3. Operating System Preparation

Before installing Sqoop, the operating system needs some required configuration and tuning.

3.1 Operating System Version Check

# Check the OS release
# cat /etc/os-release
NAME="Oracle Linux Server"
VERSION="8.9"
ID="ol"
PRETTY_NAME="Oracle Linux Server 8.9"

# Check the kernel version
# uname -r
5.4.17-2136.302.7.2.el8uek.x86_64

# Check SELinux status
# getenforce
Disabled

# Check firewall status
# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemon
Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
Active: inactive (dead)

3.2 Kernel Parameter Tuning

# Edit sysctl.conf
# vi /etc/sysctl.conf

# Add the following kernel parameters
fs.file-max = 6815744
kernel.sem = 250 32000 100 128
kernel.shmmni = 4096
kernel.shmall = 4294967296
kernel.shmmax = 68719476736
net.ipv4.ip_local_port_range = 9000 65500
net.core.rmem_default = 262144
net.core.rmem_max = 4194304
net.core.wmem_default = 262144
net.core.wmem_max = 1048576
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 30
net.ipv4.tcp_keepalive_time = 1200
net.ipv4.tcp_max_syn_backlog = 8192
net.core.somaxconn = 1024
vm.swappiness = 10
vm.dirty_ratio = 15
vm.dirty_background_ratio = 5

# Apply the kernel parameters
# sysctl -p

# Verify the settings
# sysctl -a | grep fs.file-max
fs.file-max = 6815744

3.3 User Resource Limits

# Configure user resource limits
# vi /etc/security/limits.conf

# Add the following lines
* soft nproc 65535
* hard nproc 65535
* soft nofile 65535
* hard nofile 65535
* soft stack 10240
* hard stack 32768
hadoop soft memlock unlimited
hadoop hard memlock unlimited

# Verify the limits
# ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 63499
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 65535
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 10240
cpu time (seconds, -t) unlimited
max user processes (-u) 65535
virtual memory (kbytes, -v) unlimited

3.4 Java Installation

# Install OpenJDK 1.8
# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

# Configure Java environment variables
# vi /etc/profile.d/java.sh

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

# Load the environment variables
# source /etc/profile.d/java.sh

# Verify the Java installation
# java -version
openjdk version "1.8.0_402"
OpenJDK Runtime Environment (build 1.8.0_402-b06)
OpenJDK 64-Bit Server VM (build 25.402-b06, mixed mode)

# Verify JAVA_HOME
# echo $JAVA_HOME
/usr/lib/jvm/java-1.8.0-openjdk

3.5 Hadoop Installation

Sqoop depends on a working Hadoop environment, which must be installed and configured in advance.

# Download Hadoop
# cd /tmp
# wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz

# Extract and install
# tar -xzf hadoop-3.3.6.tar.gz
# mv hadoop-3.3.6 /data/hadoop

# Configure Hadoop environment variables
# vi /etc/profile.d/hadoop.sh

export HADOOP_HOME=/data/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

# Load the environment variables
# source /etc/profile.d/hadoop.sh

# Verify the Hadoop installation
# hadoop version
Hadoop 3.3.6
Source code repository https://github.com/apache/hadoop.git -r abc123
Compiled by root on 2024-01-15T10:00Z

# Verify that HDFS is reachable
# hdfs dfs -ls /
Found 3 items
drwxr-xr-x - root supergroup 0 2024-04-05 10:00 /tmp
drwxr-xr-x - root supergroup 0 2024-04-05 10:00 /user
drwxr-xr-x - root supergroup 0 2024-04-05 10:00 /data

4. Sqoop Installation and Configuration

With the environment prepared, proceed to install Sqoop.

4.1 Download the Sqoop Package

# Create the installation directories
# mkdir -p /data/sqoop
# mkdir -p /data/sqoop/lib

# Download Sqoop
# cd /tmp
# wget https://archive.apache.org/dist/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz

# Extract and install
# tar -xzf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
# mv sqoop-1.4.7.bin__hadoop-2.6.0/* /data/sqoop/

# Inspect the installation directory
# ls -la /data/sqoop/
total 24
drwxr-xr-x 2 root root 4096 Apr 5 10:00 bin
drwxr-xr-x 2 root root 4096 Apr 5 10:00 conf
drwxr-xr-x 5 root root 4096 Apr 5 10:00 docs
drwxr-xr-x 2 root root 4096 Apr 5 10:00 lib
-rw-r--r-- 1 root root 8196 Apr 5 10:00 README.txt
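Before extracting, it is good practice to verify the downloaded tarball against the .sha512 digest published alongside it on archive.apache.org. A minimal helper, as a sketch (the function name is illustrative, not a Sqoop or Hadoop tool):

```shell
#!/bin/sh
# Verify a downloaded file against an expected SHA-512 digest.
# Succeeds (exit 0) when the digest matches, fails otherwise.
verify_sha512() {
  file=$1
  expected=$2
  actual=$(sha512sum "$file" | awk '{print $1}')
  [ "$actual" = "$expected" ]
}
```

Pass the tarball path and the digest string copied from the corresponding .sha512 file; only extract the archive when the check succeeds.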

4.2 Configure Sqoop Environment Variables

# Configure the Sqoop environment variables
# vi /etc/profile.d/sqoop.sh

export SQOOP_HOME=/data/sqoop
export PATH=$SQOOP_HOME/bin:$PATH

# Load the environment variables
# source /etc/profile.d/sqoop.sh

# Verify the environment variable
# echo $SQOOP_HOME
/data/sqoop

4.3 Configure sqoop-env.sh

# Copy the configuration template
# cd /data/sqoop/conf
# cp sqoop-env-template.sh sqoop-env.sh

# Edit the configuration file
# vi sqoop-env.sh

# Set the Hadoop-related paths
export HADOOP_COMMON_HOME=/data/hadoop
export HADOOP_MAPRED_HOME=/data/hadoop
export HBASE_HOME=/data/hbase
export HIVE_HOME=/data/hive
export ZOOCFGDIR=/data/zookeeper/conf
export ZOOKEEPER_HOME=/data/zookeeper

# Verify the configuration
# cat /data/sqoop/conf/sqoop-env.sh | grep -v "^#" | grep -v "^$"
export HADOOP_COMMON_HOME=/data/hadoop
export HADOOP_MAPRED_HOME=/data/hadoop
export HBASE_HOME=/data/hbase
export HIVE_HOME=/data/hive
export ZOOCFGDIR=/data/zookeeper/conf
export ZOOKEEPER_HOME=/data/zookeeper
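The warnings Sqoop prints at startup (shown in section 4.6) come from component homes that are unset or point at directories that do not exist. A quick pre-flight check over the paths configured above can be sketched as follows (the helper name is illustrative):

```shell
#!/bin/sh
# Report sqoop-env.sh home directories that do not exist on disk.
# Missing optional homes (HBase, ZooKeeper) only disable those
# features; the Hadoop homes are required for Sqoop to run at all.
check_homes() {
  missing=0
  for dir in "$@"; do
    if [ ! -d "$dir" ]; then
      echo "WARN: $dir does not exist"
      missing=$((missing + 1))
    fi
  done
  return "$missing"
}
```

Example: `check_homes /data/hadoop /data/hbase /data/hive /data/zookeeper` prints a warning per missing directory and returns the number missing.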

4.4 Install the MySQL JDBC Driver

# Download the MySQL JDBC driver
# cd /tmp
# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

# Copy it into the Sqoop lib directory
# cp mysql-connector-java-8.0.28.jar /data/sqoop/lib/

# Verify the driver installation
# ls -la /data/sqoop/lib/
total 2528
-rw-r--r-- 1 root root 2580752 Apr 5 10:00 mysql-connector-java-8.0.28.jar

4.5 Install the Oracle JDBC Driver

# Download the Oracle JDBC driver (requires an Oracle account),
# or copy it from an existing Oracle installation
# cp /data/oracle/product/19c/db_1/jdbc/lib/ojdbc8.jar /data/sqoop/lib/

# Verify the driver installation
# ls -la /data/sqoop/lib/ | grep ojdbc
-rw-r--r-- 1 root root 4567890 Apr 5 10:00 ojdbc8.jar

4.6 Verify the Sqoop Installation

# Verify the Sqoop installation
# sqoop version
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
Sqoop 1.4.7
git commit id 2328972b08c61337d3c1d5197e4d1c7c0a5c6c7a
Compiled by root on Mon Apr 1 10:00:00 UTC 2024

# Show Sqoop help
# sqoop help
usage: sqoop COMMAND [ARGS]
Available commands:
codegen Generate code to interact with database records
create-hcatalog-table Define a HCatalog table structure based on a database table
eval Evaluate a SQL statement and display the results
export Export an HDFS directory to a database table
help List available commands
import Import a table from a database to HDFS
import-all-tables Import tables from a database to HDFS
import-mainframe Import datasets from a mainframe server to HDFS
job Work with saved jobs
list-databases List available databases on a server
list-tables List available tables in a database
merge Merge results of incremental imports
metastore Run a standalone Sqoop metastore
version Display version information

Tip: the warnings above appear because HCatalog, Accumulo, and ZooKeeper are not installed. These are optional components and do not affect Sqoop's core functionality.

5. Sqoop Configuration Tuning

To improve Sqoop's performance and stability, a few configuration optimizations are recommended.

5.1 Configure sqoop-site.xml

# Create sqoop-site.xml
# vi /data/sqoop/conf/sqoop-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>sqoop.metastore.client.record.password</name>
<value>true</value>
<description>Whether to store passwords in the metastore</description>
</property>
<property>
<name>sqoop.metastore.client.autoconnect.url</name>
<value>jdbc:derby:;databaseName=/data/sqoop/metastore;create=true</value>
<description>Metastore database connection URL</description>
</property>
<property>
<name>sqoop.metastore.client.autoconnect.username</name>
<value>sqoop</value>
<description>Metastore database username</description>
</property>
<property>
<name>sqoop.metastore.client.autoconnect.password</name>
<value>sqoop123</value>
<description>Metastore database password</description>
</property>
<property>
<name>sqoop.metastore.client.enable.autoconnect</name>
<value>true</value>
<description>Enable automatic connection to the metastore</description>
</property>
</configuration>

5.2 Configure Logging

# Edit the log4j configuration
# vi /data/sqoop/conf/log4j.properties

# Set the log level
log4j.rootLogger=INFO, file, console

# File appender configuration
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/data/sqoop/logs/sqoop.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Console appender configuration
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Create the log directory
# mkdir -p /data/sqoop/logs

5.3 Configure Hadoop-related Parameters

# Add the Sqoop jar to the Hadoop classpath
# cp /data/sqoop/sqoop-1.4.7.jar /data/hadoop/share/hadoop/common/

# Configure Hadoop's mapred-site.xml for Sqoop jobs
# vi /data/hadoop/etc/hadoop/mapred-site.xml

<property>
<name>mapreduce.job.queuename</name>
<value>default</value>
</property>
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx1638m</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx3276m</value>
</property>
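The -Xmx values above follow the common rule of thumb of sizing the JVM heap at roughly 80% of the YARN container allocation (2048 MB → 1638m, 4096 MB → 3276m). The arithmetic can be sketched as:

```shell
#!/bin/sh
# Compute a JVM heap size in MB (for -Xmx) as ~80% of the YARN
# container allocation, matching the mapred-site.xml values above.
heap_for_container() {
  container_mb=$1
  echo $((container_mb * 80 / 100))
}
```

For example, `heap_for_container 2048` prints 1638, the value used for mapreduce.map.java.opts.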

6. Sqoop Integration with MySQL

Integrating Sqoop with MySQL is one of the most common scenarios. The configuration and usage are described in detail below.

6.1 Test the MySQL Connection

# Test the MySQL connection
# sqoop list-databases --connect jdbc:mysql://192.168.1.51:3306/ --username root --password fgedu123
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
24/04/05 10:00:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
fgedudb
mysql
performance_schema
sys

# List the tables in a database
# sqoop list-tables --connect jdbc:mysql://192.168.1.51:3306/fgedudb --username root --password fgedu123
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
24/04/05 10:00:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
fgedu_users
fgedu_orders
fgedu_products
fgedu_transactions

6.2 Import Data from MySQL into HDFS

# Import a single table into HDFS
# sqoop import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_users \
--target-dir /data/mysql/fgedu_users \
--delete-target-dir \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--null-string '\\N' \
--null-non-string '\\N' \
-m 4

# Sample output:
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
24/04/05 10:00:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:00:01 INFO tool.CodeGenTool: Beginning code generation
24/04/05 10:00:02 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `fgedu_users` AS t LIMIT 1
24/04/05 10:00:03 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /data/hadoop
24/04/05 10:00:10 INFO mapreduce.ImportJobBase: Beginning import of fgedu_users
24/04/05 10:00:15 INFO impl.YarnClientImpl: Submitted application application_1712300400000_0001
24/04/05 10:01:30 INFO mapreduce.ImportJobBase: Transferred 125.5 MB in 80.25 seconds (1.56 MB/sec)
24/04/05 10:01:30 INFO mapreduce.ImportJobBase: Retrieved 1000000 records.

# Verify the imported data
# hdfs dfs -ls /data/mysql/fgedu_users
Found 5 items
-rw-r--r-- 1 root supergroup 0 2024-04-05 10:01 /data/mysql/fgedu_users/_SUCCESS
-rw-r--r-- 1 root supergroup 32565432 2024-04-05 10:01 /data/mysql/fgedu_users/part-m-00000
-rw-r--r-- 1 root supergroup 32568912 2024-04-05 10:01 /data/mysql/fgedu_users/part-m-00001
-rw-r--r-- 1 root supergroup 32562345 2024-04-05 10:01 /data/mysql/fgedu_users/part-m-00002
-rw-r--r-- 1 root supergroup 32565123 2024-04-05 10:01 /data/mysql/fgedu_users/part-m-00003

# Inspect the data
# hdfs dfs -cat /data/mysql/fgedu_users/part-m-00000 | head -5
1 zhangsan zhangsan@example.com 13800138001 2024-01-01 10:00:00
2 lisi lisi@example.com 13800138002 2024-01-02 11:00:00
3 wangwu wangwu@example.com 13800138003 2024-01-03 12:00:00
4 zhaoliu zhaoliu@example.com 13800138004 2024-01-04 13:00:00
5 sunqi sunqi@example.com 13800138005 2024-01-05 14:00:00
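A common post-import check is reconciling the record count Sqoop reports ("Retrieved 1000000 records.") with a COUNT(*) on the source table. The comparison step can be sketched as a small helper; obtaining the two counts (from mysql and from the Sqoop log) is left to the surrounding script:

```shell
#!/bin/sh
# Compare the source-table row count with the imported record count.
# Exits 0 when they match, 1 when they differ.
counts_match() {
  src=$1
  imported=$2
  if [ "$src" -eq "$imported" ]; then
    echo "OK: $imported records imported"
  else
    echo "MISMATCH: source=$src imported=$imported"
    return 1
  fi
}
```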

6.3 Import Data from MySQL into Hive

# Import data into a Hive table
# sqoop import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_orders \
--hive-import \
--hive-database fgedudb \
--hive-table fgedu_orders \
--create-hive-table \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--null-string '\\N' \
--null-non-string '\\N' \
-m 4

# Sample output:
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
24/04/05 10:02:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:02:01 INFO hive.HiveImport: Loading uploaded data into Hive
24/04/05 10:02:30 INFO hive.HiveImport: Table fgedudb.fgedu_orders stats: [numFiles=4, totalSize=524288000]
24/04/05 10:02:30 INFO hive.HiveImport: Hive import complete.

# Verify the Hive table
# hive -e "SELECT COUNT(*) FROM fgedudb.fgedu_orders;"
OK
5000000
Time taken: 15.234 seconds, Fetched: 1 row(s)

# Inspect the table schema
# hive -e "DESCRIBE fgedudb.fgedu_orders;"
OK
order_id int None
user_id int None
product_id int None
order_amount decimal(10,2) None
order_status string None
create_time timestamp None
Time taken: 0.523 seconds, Fetched: 6 row(s)

7. Sqoop Integration with Oracle

Integrating Sqoop with Oracle requires the Oracle JDBC driver. Configuration and usage are described in detail below.

7.1 Test the Oracle Connection

# Test the Oracle connection
# sqoop list-databases --connect jdbc:oracle:thin:@192.168.1.52:1521:fgedudb --username system --password oracle123
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
24/04/05 10:03:00 INFO manager.OracleManager: Preparing to use a Oracle resultset.
SYS
SYSTEM
FGEDUDB
OUTLN

# List the tables in the schema
# sqoop list-tables --connect jdbc:oracle:thin:@192.168.1.52:1521:fgedudb --username fgedudb --password fgedu123
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
24/04/05 10:03:10 INFO manager.OracleManager: Preparing to use a Oracle resultset.
FGEDU_EMPLOYEES
FGEDU_DEPARTMENTS
FGEDU_SALARIES
FGEDU_ATTENDANCE

7.2 Import Data from Oracle into HDFS

# Import an Oracle table into HDFS
# sqoop import \
--connect jdbc:oracle:thin:@192.168.1.52:1521:fgedudb \
--username fgedudb \
--password fgedu123 \
--table FGEDU_EMPLOYEES \
--target-dir /data/oracle/fgedu_employees \
--delete-target-dir \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--null-string '\\N' \
--null-non-string '\\N' \
-m 4

# Sample output:
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
24/04/05 10:04:00 INFO manager.OracleManager: Preparing to use a Oracle resultset.
24/04/05 10:04:01 INFO tool.CodeGenTool: Beginning code generation
24/04/05 10:04:02 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM FGEDU_EMPLOYEES t WHERE 1=0
24/04/05 10:04:10 INFO mapreduce.ImportJobBase: Beginning import of FGEDU_EMPLOYEES
24/04/05 10:05:30 INFO mapreduce.ImportJobBase: Transferred 85.6 MB in 80.25 seconds (1.07 MB/sec)
24/04/05 10:05:30 INFO mapreduce.ImportJobBase: Retrieved 500000 records.

# Verify the imported data
# hdfs dfs -ls /data/oracle/fgedu_employees
Found 5 items
-rw-r--r-- 1 root supergroup 0 2024-04-05 10:05 /data/oracle/fgedu_employees/_SUCCESS
-rw-r--r-- 1 root supergroup 22567890 2024-04-05 10:05 /data/oracle/fgedu_employees/part-m-00000
-rw-r--r-- 1 root supergroup 22568912 2024-04-05 10:05 /data/oracle/fgedu_employees/part-m-00001
-rw-r--r-- 1 root supergroup 22562345 2024-04-05 10:05 /data/oracle/fgedu_employees/part-m-00002
-rw-r--r-- 1 root supergroup 22565123 2024-04-05 10:05 /data/oracle/fgedu_employees/part-m-00003

7.3 Import Data Using a Query

# Import data using a SQL query
# sqoop import \
--connect jdbc:oracle:thin:@192.168.1.52:1521:fgedudb \
--username fgedudb \
--password fgedu123 \
--query "SELECT e.employee_id, e.employee_name, d.department_name, s.salary
FROM FGEDU_EMPLOYEES e
JOIN FGEDU_DEPARTMENTS d ON e.department_id = d.department_id
JOIN FGEDU_SALARIES s ON e.employee_id = s.employee_id
WHERE \$CONDITIONS" \
--target-dir /data/oracle/employee_salary \
--delete-target-dir \
--split-by e.employee_id \
--fields-terminated-by '\t' \
-m 4

# Sample output:
24/04/05 10:06:00 INFO manager.OracleManager: Preparing to use a Oracle resultset.
24/04/05 10:06:01 INFO tool.CodeGenTool: Beginning code generation
24/04/05 10:06:10 INFO mapreduce.ImportJobBase: Beginning query import.
24/04/05 10:07:30 INFO mapreduce.ImportJobBase: Transferred 65.2 MB in 80.25 seconds (0.81 MB/sec)
24/04/05 10:07:30 INFO mapreduce.ImportJobBase: Retrieved 450000 records.

# Verify the data
# hdfs dfs -cat /data/oracle/employee_salary/part-m-00000 | head -5
1001 张三 技术部 15000.00
1002 李四 销售部 12000.00
1003 王五 财务部 13000.00
1004 赵六 人事部 11000.00
1005 孙七 技术部 16000.00

8. Sqoop Import and Export in Practice

This section covers Sqoop import and export operations as used in production environments.

8.1 Incremental Imports

# Incremental import based on an auto-increment ID
# sqoop import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_transactions \
--target-dir /data/mysql/fgedu_transactions \
--incremental append \
--check-column transaction_id \
--last-value 1000000 \
--fields-terminated-by '\t' \
-m 4

# Sample output:
24/04/05 10:08:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:08:01 INFO tool.ImportTool: Incremental import based on column transaction_id
24/04/05 10:08:02 INFO tool.ImportTool: Lower bound value: 1000000
24/04/05 10:08:02 INFO tool.ImportTool: Upper bound value: 1500000
24/04/05 10:09:30 INFO mapreduce.ImportJobBase: Transferred 256.8 MB in 88.25 seconds (2.91 MB/sec)
24/04/05 10:09:30 INFO mapreduce.ImportJobBase: Retrieved 500000 records.
24/04/05 10:09:30 INFO tool.ImportTool: Saving incremental import state to the metastore

# Incremental import based on a timestamp
# sqoop import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_transactions \
--target-dir /data/mysql/fgedu_transactions \
--incremental lastmodified \
--check-column update_time \
--last-value "2024-04-04 00:00:00" \
--merge-key transaction_id \
--fields-terminated-by '\t' \
-m 4

# Sample output:
24/04/05 10:10:00 INFO tool.ImportTool: Incremental import based on column update_time
24/04/05 10:10:01 INFO tool.ImportTool: Lower bound value: 2024-04-04 00:00:00
24/04/05 10:10:01 INFO tool.ImportTool: Upper bound value: 2024-04-05 10:10:01
24/04/05 10:11:30 INFO mapreduce.ImportJobBase: Transferred 128.4 MB in 88.25 seconds (1.45 MB/sec)
24/04/05 10:11:30 INFO mapreduce.ImportJobBase: Retrieved 250000 records.
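When incremental imports are scripted without a saved job or metastore, the --last-value has to be persisted between runs. A minimal sketch, assuming a simple state file (the path and helper names are illustrative, not part of Sqoop):

```shell
#!/bin/sh
# Persist the incremental --last-value between runs in a plain state
# file, so each run only pulls rows newer than the previous one.
STATE_FILE=${STATE_FILE:-/data/sqoop/state/last_value}

read_last_value() {
  # Default to 0 before the first successful run.
  if [ -f "$STATE_FILE" ]; then
    cat "$STATE_FILE"
  else
    echo 0
  fi
}

save_last_value() {
  echo "$1" > "$STATE_FILE"
}
```

A wrapper script would pass `--last-value "$(read_last_value)"` to sqoop import and, after a successful run, call `save_last_value` with the upper bound Sqoop logs.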

8.2 Export Data to MySQL

# Export data from HDFS to MySQL
# First create the target table
# mysql -h 192.168.1.51 -u root -pfgedu123 -e "
USE fgedudb;
CREATE TABLE fgedu_users_export (
user_id INT PRIMARY KEY,
user_name VARCHAR(100),
email VARCHAR(100),
phone VARCHAR(20),
create_time DATETIME
);"

# Run the export
# sqoop export \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_users_export \
--export-dir /data/mysql/fgedu_users \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--input-null-string '\\N' \
--input-null-non-string '\\N' \
-m 4

# Sample output:
24/04/05 10:12:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:12:01 INFO mapreduce.ExportJobBase: Beginning export of fgedu_users_export
24/04/05 10:12:10 INFO impl.YarnClientImpl: Submitted application application_1712300400000_0002
24/04/05 10:13:30 INFO mapreduce.ExportJobBase: Exported 1000000 records.

# Verify the exported data
# mysql -h 192.168.1.51 -u root -pfgedu123 -e "SELECT COUNT(*) FROM fgedudb.fgedu_users_export;"
+----------+
| COUNT(*) |
+----------+
| 1000000 |
+----------+

8.3 Create a Sqoop Job

# Create a saved job
# sqoop job --create import_users \
-- import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_users \
--target-dir /data/mysql/fgedu_users \
--incremental append \
--check-column user_id \
--last-value 0 \
--fields-terminated-by '\t' \
-m 4

# Sample output:
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
24/04/05 10:14:00 INFO tool.JobTool: Saved job 'import_users'

# List the saved jobs
# sqoop job --list
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
Available jobs:
import_users

# Show job details
# sqoop job --show import_users
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
Job: import_users
Tool: import
Options:
-------------------------------
connect = jdbc:mysql://192.168.1.51:3306/fgedudb
username = root
password = fgedu123
table = fgedu_users
target-dir = /data/mysql/fgedu_users
incremental = append
check-column = user_id
last-value = 0
fields-terminated-by = \t
num-mappers = 4

# Run the job
# sqoop job --exec import_users
Warning: /data/sqoop/../hcatalog does not exist! HCatalog jobs will fail.
Warning: /data/sqoop/../accumulo does not exist! Accumulo imports will fail.
Warning: /data/sqoop/../zookeeper does not exist! Accumulo imports will fail.
24/04/05 10:15:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:15:01 INFO tool.ImportTool: Incremental import based on column user_id
24/04/05 10:15:02 INFO tool.ImportTool: Lower bound value: 0
24/04/05 10:15:02 INFO tool.ImportTool: Upper bound value: 1000000
24/04/05 10:16:30 INFO mapreduce.ImportJobBase: Transferred 125.5 MB in 88.25 seconds (1.42 MB/sec)
24/04/05 10:16:30 INFO mapreduce.ImportJobBase: Retrieved 1000000 records.

9. Sqoop Performance Tuning

In production, Sqoop should be tuned to improve data-transfer throughput.

9.1 Parallelism Tuning

# Adjust the number of map tasks to the data volume and cluster resources:
# small tables (under 1 million rows): m = 1-4
# medium tables (1-10 million rows): m = 4-8
# large tables (over 10 million rows): m = 8-16

# Example: import with 8 parallel map tasks
# sqoop import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_large_table \
--target-dir /data/mysql/fgedu_large_table \
--delete-target-dir \
--fields-terminated-by '\t' \
--split-by id \
-m 8

# Sample output:
24/04/05 10:17:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:17:01 INFO mapreduce.ImportJobBase: Beginning import of fgedu_large_table
24/04/05 10:20:30 INFO mapreduce.ImportJobBase: Transferred 1.5 GB in 208.25 seconds (7.38 MB/sec)
24/04/05 10:20:30 INFO mapreduce.ImportJobBase: Retrieved 10000000 records.
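The sizing guidance above can be expressed as a small helper that picks an -m value from an estimated row count (the thresholds are the rough rules of thumb from this section, not hard limits):

```shell
#!/bin/sh
# Suggest a Sqoop mapper count (-m) from an estimated row count,
# following the parallelism guidance in section 9.1.
suggest_mappers() {
  rows=$1
  if [ "$rows" -lt 1000000 ]; then
    echo 4
  elif [ "$rows" -lt 10000000 ]; then
    echo 8
  else
    echo 16
  fi
}
```

A wrapper script could feed the result straight into the import command, e.g. `-m "$(suggest_mappers "$row_count")"`.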

9.2 Bulk Import Optimization

# Use direct mode for faster bulk imports
# sqoop import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_products \
--target-dir /data/mysql/fgedu_products \
--delete-target-dir \
--fields-terminated-by '\t' \
--direct \
-m 4

# Sample output:
24/04/05 10:21:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:21:01 INFO manager.DirectMySQLManager: Beginning MySQL fast import.
24/04/05 10:22:30 INFO mapreduce.ImportJobBase: Transferred 512.0 MB in 88.25 seconds (5.80 MB/sec)
24/04/05 10:22:30 INFO mapreduce.ImportJobBase: Retrieved 2000000 records.

# Note: --direct only works with MySQL and PostgreSQL; it uses the database's native bulk export tools

9.3 Compression

# Use compression to reduce storage space and network transfer
# sqoop import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_transactions \
--target-dir /data/mysql/fgedu_transactions \
--delete-target-dir \
--fields-terminated-by '\t' \
--compress \
--compression-codec org.apache.hadoop.io.compress.GzipCodec \
-m 4

# Sample output:
24/04/05 10:23:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:23:01 INFO mapreduce.ImportJobBase: Beginning import of fgedu_transactions
24/04/05 10:25:30 INFO mapreduce.ImportJobBase: Transferred 256.0 MB in 148.25 seconds (1.73 MB/sec)
24/04/05 10:25:30 INFO mapreduce.ImportJobBase: Retrieved 5000000 records.

# Verify the compressed files
# hdfs dfs -ls /data/mysql/fgedu_transactions
Found 5 items
-rw-r--r-- 1 root supergroup 0 2024-04-05 10:25 /data/mysql/fgedu_transactions/_SUCCESS
-rw-r--r-- 1 root supergroup 64000000 2024-04-05 10:25 /data/mysql/fgedu_transactions/part-m-00000.gz
-rw-r--r-- 1 root supergroup 64123456 2024-04-05 10:25 /data/mysql/fgedu_transactions/part-m-00001.gz
-rw-r--r-- 1 root supergroup 63987654 2024-04-05 10:25 /data/mysql/fgedu_transactions/part-m-00002.gz
-rw-r--r-- 1 root supergroup 64098765 2024-04-05 10:25 /data/mysql/fgedu_transactions/part-m-00003.gz

9.4 Database Connection Tuning

# Configure database connection parameters
# sqoop import \
--connect "jdbc:mysql://192.168.1.51:3306/fgedudb?useSSL=false&useUnicode=true&characterEncoding=utf8&autoReconnect=true&maxReconnects=10" \
--username root \
--password fgedu123 \
--table fgedu_orders \
--target-dir /data/mysql/fgedu_orders \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 4

# Sample output:
24/04/05 10:26:00 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
24/04/05 10:26:01 INFO mapreduce.ImportJobBase: Beginning import of fgedu_orders
24/04/05 10:28:30 INFO mapreduce.ImportJobBase: Transferred 1.2 GB in 148.25 seconds (8.30 MB/sec)
24/04/05 10:28:30 INFO mapreduce.ImportJobBase: Retrieved 8000000 records.

Production recommendations: choose the degree of parallelism to match the data source, prefer --direct mode for MySQL imports, enable compression for large transfers, and tune connection parameters to avoid connection timeouts.

10. Sqoop Upgrade and Migration

This section covers Sqoop version upgrades and data migration.

10.1 Sqoop Version Upgrade

# Back up the current Sqoop configuration
# cp -r /data/sqoop/conf /backup/sqoop_conf_$(date +%Y%m%d)

# Back up the metastore data
# cp -r /data/sqoop/metastore /backup/sqoop_metastore_$(date +%Y%m%d)

# Download the new Sqoop release
# cd /tmp
# wget https://archive.apache.org/dist/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz

# Extract the new release
# tar -xzf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz

# Stop the current Sqoop service
# (stop all running Sqoop jobs first)

# Swap the installation directory
# mv /data/sqoop /data/sqoop_old
# mv sqoop-1.4.7.bin__hadoop-2.6.0 /data/sqoop

# Restore the configuration files
# cp -r /backup/sqoop_conf_$(date +%Y%m%d)/* /data/sqoop/conf/

# Restore the metastore data
# cp -r /backup/sqoop_metastore_$(date +%Y%m%d) /data/sqoop/metastore

# Copy the JDBC drivers
# cp /data/sqoop_old/lib/*.jar /data/sqoop/lib/

# Verify the upgrade
# sqoop version
Sqoop 1.4.7
git commit id 2328972b08c61337d3c1d5197e4d1c7c0a5c6c7a
Compiled by root on Mon Apr 1 10:00:00 UTC 2024

# Test connectivity
# sqoop list-databases --connect jdbc:mysql://192.168.1.51:3306/ --username root --password fgedu123
information_schema
fgedudb
mysql
performance_schema
sys

10.2 Sqoop Configuration Migration

# Export the Sqoop job list
# sqoop job --list > /backup/sqoop_jobs_list.txt

# Export each job's details
# for job in $(sqoop job --list | grep -v "Warning" | grep -v "Available"); do
echo "=== $job ===" >> /backup/sqoop_jobs_detail.txt
sqoop job --show $job >> /backup/sqoop_jobs_detail.txt
done

# Sample output:
=== import_users ===
Job: import_users
Tool: import
Options:
-------------------------------
connect = jdbc:mysql://192.168.1.51:3306/fgedudb
username = root
password = fgedu123
table = fgedu_users
target-dir = /data/mysql/fgedu_users
incremental = append
check-column = user_id
last-value = 1000000
fields-terminated-by = \t
num-mappers = 4
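When moving a job to a new server, the current last-value has to be carried over. Extracting it from the `sqoop job --show` dump can be sketched as follows (the helper name is illustrative):

```shell
#!/bin/sh
# Print the saved last-value from `sqoop job --show` output on stdin,
# so it can be fed to --last-value when re-creating the job elsewhere.
extract_last_value() {
  awk -F'=' '/^last-value/ { gsub(/^[ \t]+|[ \t]+$/, "", $2); print $2 }'
}
```

Usage: `sqoop job --show import_users | extract_last_value`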

# Re-create the job on the new server
# sqoop job --create import_users \
-- import \
--connect jdbc:mysql://192.168.1.51:3306/fgedudb \
--username root \
--password fgedu123 \
--table fgedu_users \
--target-dir /data/mysql/fgedu_users \
--incremental append \
--check-column user_id \
--last-value 1000000 \
--fields-terminated-by '\t' \
-m 4

10.3 Sqoop Backup and Recovery

# Back up the Sqoop installation directory
# tar -czf /backup/sqoop_backup_$(date +%Y%m%d).tar.gz -C /data sqoop

# Back up the configuration files
# tar -czf /backup/sqoop_conf_$(date +%Y%m%d).tar.gz -C /data/sqoop conf

# Back up the metastore database
# tar -czf /backup/sqoop_metastore_$(date +%Y%m%d).tar.gz -C /data/sqoop metastore

# Restore Sqoop
# (stop the Sqoop service first)
# rm -rf /data/sqoop
# tar -xzf /backup/sqoop_backup_$(date +%Y%m%d).tar.gz -C /data

# Verify the restore
# sqoop version
Sqoop 1.4.7

# Verify the job list
# sqoop job --list
Available jobs:
import_users
import_orders
export_products

10.4 Sqoop Monitoring Script

# Create the Sqoop monitoring script
# vi /data/sqoop/scripts/sqoop_monitor.sh

#!/bin/bash
SQOOP_HOME=/data/sqoop
LOG_FILE=/data/sqoop/logs/sqoop_monitor.log
ALERT_EMAIL=admin@fgedu.net.cn

check_sqoop_jobs() {
  echo "$(date): Checking Sqoop jobs..." >> $LOG_FILE
  jobs=$($SQOOP_HOME/bin/sqoop job --list 2>/dev/null | grep -v "Warning" | grep -v "Available")
  for job in $jobs; do
    echo "$(date): Job $job exists" >> $LOG_FILE
  done
}

check_database_connection() {
  echo "$(date): Checking database connections..." >> $LOG_FILE
  $SQOOP_HOME/bin/sqoop list-databases \
    --connect jdbc:mysql://192.168.1.51:3306/ \
    --username root \
    --password fgedu123 > /dev/null 2>&1
  if [ $? -eq 0 ]; then
    echo "$(date): MySQL connection OK" >> $LOG_FILE
  else
    echo "$(date): MySQL connection FAILED" >> $LOG_FILE
    echo "Sqoop MySQL connection failed" | mail -s "Sqoop Alert" $ALERT_EMAIL
  fi
}

check_hdfs_space() {
  echo "$(date): Checking HDFS space..." >> $LOG_FILE
  hdfs dfs -df -h / >> $LOG_FILE
}

main() {
  check_sqoop_jobs
  check_database_connection
  check_hdfs_space
}

main

# Make the script executable
# chmod +x /data/sqoop/scripts/sqoop_monitor.sh

# Add a cron job
# crontab -e
0 */1 * * * /data/sqoop/scripts/sqoop_monitor.sh

Production recommendations: back up the Sqoop configuration and metastore data regularly, ideally with a full backup once a day. Always take a full backup before upgrading and rehearse the upgrade in a test environment first. Run the monitoring script hourly so problems are caught and handled promptly.

With the steps above, Sqoop installation and configuration, performance tuning, and upgrade/migration are complete. As a key data-transfer tool in the Hadoop ecosystem, Sqoop efficiently exchanges data between relational databases and Hadoop, making it a core component of data integration on big-data platforms.

This article was compiled and published by the Fenge tutorial team for learning and testing purposes only; when republishing, credit the source: http://www.fgedu.net.cn/10327.html
