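1. Canal Overview and Environment Planning
Canal is an open-source tool from Alibaba for incremental subscription to and consumption of the MySQL binlog, used to replicate MySQL data changes in near real time. Canal emulates the MySQL replication protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master. The master then pushes binlog events to Canal, which parses them and forwards the changes to downstream storage or applications.
1.1 Canal Version Notes
The current mainline of Canal is the 1.1.x series, and this tutorial uses Canal 1.1.7 throughout. Compared with earlier releases, 1.1.7 improves performance, stability, and functionality, and it supports MySQL 8.0, GTID mode, and HA (high availability).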
$ ls /data/canal/ | grep canal
canal.deployer-1.1.7
# Check the Java version
$ java -version
openjdk version "1.8.0_402"
OpenJDK Runtime Environment (build 1.8.0_402-b06)
OpenJDK 64-Bit Server VM (build 25.402-b06, mixed mode)
# Check the operating system release
$ cat /etc/os-release
NAME="Oracle Linux Server"
VERSION="8.9"
ID="ol"
PRETTY_NAME="Oracle Linux Server 8.9"
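1.2 Environment Planning
The installation in this tutorial uses the following layout:
Canal server:
Hostname: canal01.fgedu.net.cn
IP address: 192.168.1.51
Port: 11111 (Canal Server)
Canal version: 1.1.7
Java version: OpenJDK 1.8
Installation directory: /data/canal
Log directory: /data/canal/logs
MySQL source:
Hostname: mysql01.fgedu.net.cn
IP address: 192.168.1.51
Port: 3306
Database: fgedudb
Data targets:
Kafka: 192.168.1.51:9092
MySQL: 192.168.1.52:3306
Elasticsearch: 192.168.1.51:9200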
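2. Hardware Requirements
Canal's hardware demands are moderate. The command output below shows the reference host used in this tutorial and can serve as a baseline for a production deployment.
2.1 Minimum Hardware Requirements
# Check memory
# free -h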
total used free shared buff/cache available
Mem: 16G 2.5G 12G 0.5G 1.2G 13G
Swap: 8G 0B 8G
# Check disk space
# df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 12G 39G 24% /
/dev/sdb1 200G 20G 180G 10% /data
# Check the number of CPU cores
# nproc
8
# Check the system architecture
# uname -m
x86_64
2.2 Network Requirements
# Check the network interfaces
# ip addr show
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN
inet 127.0.0.1/8 scope host lo
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP
inet 192.168.1.51/24 brd 192.168.1.255 scope global eth0
# Check hostname resolution
# hostname -f
canal01.fgedu.net.cn
# Review /etc/hosts
# cat /etc/hosts
127.0.0.1 localhost
192.168.1.51 canal01.fgedu.net.cn canal01
192.168.1.52 mysql02.fgedu.net.cn mysql02
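3. Operating System Configuration
Canal runs on Linux, and the operating system needs some preparation before installation.
3.1 Disable the Firewall and SELinux
# Check the SELinux status
# getenforce
Disabled
# Disable SELinux if it is still enabled (takes effect after a reboot)
# vi /etc/selinux/config
SELINUX=disabled
# Check the firewall status
# systemctl status firewalld
# Stop the firewall (in production, open only the required ports instead)
# systemctl stop firewalld
# systemctl disable firewalld
# Alternatively, open only the port Canal needs
# firewall-cmd --permanent --add-port=11111/tcp
# firewall-cmd --reload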
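3.2 System Parameter Tuning
# Edit the kernel parameters
# vi /etc/sysctl.conf
# Add the following parameters
fs.file-max = 6815744
vm.swappiness = 10
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 65536
# Apply the settings
# sysctl -p
# Configure file descriptor and process limits
# vi /etc/security/limits.conf
# Add the following entries
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
# Verify the limits (log in again first so the new limits apply)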
# ulimit -n
65536
4. Java Environment Configuration
Canal is written in Java and needs a suitable Java runtime.
4.1 Install OpenJDK 1.8
# Install the JDK packages
# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# Verify the Java version
$ java -version
openjdk version "1.8.0_402"
OpenJDK Runtime Environment (build 1.8.0_402-b06)
OpenJDK 64-Bit Server VM (build 25.402-b06, mixed mode)
# Set the JAVA_HOME environment variable
# vi /etc/profile.d/java.sh
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export PATH=$JAVA_HOME/bin:$PATH
# Load the environment variables
# source /etc/profile.d/java.sh
# Verify JAVA_HOME
$ echo $JAVA_HOME
/usr/lib/jvm/java-1.8.0-openjdk
4.2 Install ZooKeeper
# Download ZooKeeper
# cd /tmp
# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz
# Extract and install
# tar -xzf apache-zookeeper-3.8.3-bin.tar.gz
# mv apache-zookeeper-3.8.3-bin /data/zookeeper
# Configure ZooKeeper
# vi /data/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
clientPort=2181
# Create the data directory
# mkdir -p /data/zookeeper/data
# Set the environment variables
# vi /etc/profile.d/zookeeper.sh
export ZOOKEEPER_HOME=/data/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
# Load the environment variables
# source /etc/profile.d/zookeeper.sh
# Start ZooKeeper
# zkServer.sh start
# Sample output:
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
# Check the ZooKeeper status
# zkServer.sh status
# Sample output:
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
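5. MySQL Source Configuration
Canal requires the binlog to be enabled on the MySQL source, plus an account with replication privileges.
5.1 Configure the MySQL Binlog
# Edit the MySQL configuration
# vi /etc/my.cnf
[mysqld]
# Server ID (must be unique within the replication topology)
server-id = 1
# Enable the binlog
log-bin = mysql-bin
# Binlog format (must be ROW)
binlog_format = ROW
# Binlog row image (FULL recommended)
binlog_row_image = FULL
# Enable GTID (optional, recommended)
gtid_mode = ON
enforce_gtid_consistency = ON
# Binlog retention in days (deprecated in MySQL 8.0, where binlog_expire_logs_seconds = 604800 is the equivalent)
expire_logs_days = 7
# Binlog cache size
binlog_cache_size = 1M
# Maximum binlog cache size
max_binlog_cache_size = 512M
# Maximum size of a single binlog file
max_binlog_size = 500M
# Restart MySQL
# systemctl restart mysqld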
# Verify the binlog configuration
$ mysql -u root -p -e "SHOW VARIABLES LIKE 'binlog_format';"
# Sample output:
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
$ mysql -u root -p -e "SHOW VARIABLES LIKE 'log_bin';"
# Sample output:
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
$ mysql -u root -p -e "SHOW VARIABLES LIKE 'server_id';"
# Sample output:
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| server_id     | 1     |
+---------------+-------+
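The three checks above can be folded into a single pass. The following is a minimal sketch rather than part of the original procedure: it assumes the mysql client is run in batch mode (-N -B), which prints one tab-separated name/value pair per line, and the function name check_binlog_settings is hypothetical.

```shell
#!/bin/bash
# check_binlog_settings (hypothetical helper): read "name<TAB>value" lines on
# stdin and verify the settings Canal depends on. Prints one line per problem
# found and returns non-zero if any check fails.
check_binlog_settings() {
    local name value rc=0 seen_format=0 seen_logbin=0
    while IFS=$'\t' read -r name value; do
        case "$name" in
            binlog_format)
                seen_format=1
                [ "$value" = "ROW" ] || { echo "binlog_format is $value, expected ROW"; rc=1; }
                ;;
            log_bin)
                seen_logbin=1
                [ "$value" = "ON" ] || { echo "log_bin is $value, expected ON"; rc=1; }
                ;;
            server_id)
                [ "$value" != "0" ] || { echo "server_id must be non-zero"; rc=1; }
                ;;
        esac
    done
    # A variable that never appeared in the input counts as a failure too.
    [ "$seen_format" -eq 1 ] || { echo "binlog_format not reported"; rc=1; }
    [ "$seen_logbin" -eq 1 ] || { echo "log_bin not reported"; rc=1; }
    return "$rc"
}
```

One way to feed it is: mysql -u root -p -N -B -e "SHOW VARIABLES WHERE Variable_name IN ('binlog_format','log_bin','server_id');" | check_binlog_settings. An empty output with exit status 0 means all three settings look as this tutorial expects.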
5.2 Create the Canal User
# Log in to MySQL
$ mysql -u root -p
# Create the Canal user
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'fgedu_canal_2024';
# Grant the required privileges
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# Reload the privilege tables
mysql> FLUSH PRIVILEGES;
# Verify the user's privileges
mysql> SHOW GRANTS FOR 'canal'@'%';
# Sample output:
+---------------------------------------------------------------------------+
| Grants for canal@%                                                        |
+---------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' |
+---------------------------------------------------------------------------+
# Create a test database and table
mysql> CREATE DATABASE fgedudb;
mysql> USE fgedudb;
mysql> CREATE TABLE fgedu_employees (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100),
    department VARCHAR(50),
    salary DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
mysql> INSERT INTO fgedu_employees (name, department, salary) VALUES
    ('张三', '技术部', 15000.00),
    ('李四', '销售部', 12000.00),
    ('王五', '财务部', 13000.00);
# Verify the data
mysql> SELECT * FROM fgedu_employees;
# Sample output:
+----+------+------------+----------+---------------------+
| id | name | department | salary   | created_at          |
+----+------+------------+----------+---------------------+
|  1 | 张三 | 技术部     | 15000.00 | 2024-04-05 10:00:00 |
|  2 | 李四 | 销售部     | 12000.00 | 2024-04-05 10:00:00 |
|  3 | 王五 | 财务部     | 13000.00 | 2024-04-05 10:00:00 |
+----+------+------------+----------+---------------------+
6. Canal Installation and Deployment
With the environment prepared, install the Canal service.
6.1 Download Canal
# cd /tmp
# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
# Sample output:
--2024-04-05 10:00:00-- https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
Resolving github.com... 140.82.112.4
Connecting to github.com|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/58958953/abc123?X-Amz-Algorithm=AWS4-HMAC-SHA256 [following]
--2024-04-05 10:00:05-- https://objects.githubusercontent.com/github-production-release-asset-2e65be/58958953/abc123
Length: 56789012 (54M) [application/octet-stream]
Saving to: 'canal.deployer-1.1.7.tar.gz'
canal.deployer-1.1.7.tar.gz 100%[=================================================>] 54.16M 15.6MB/s in 3.5s
2024-04-05 10:00:09 (15.6 MB/s) - 'canal.deployer-1.1.7.tar.gz' saved [56789012/56789012]
# Create the installation directory
# mkdir -p /data/canal
# Extract the archive
# tar -xzf canal.deployer-1.1.7.tar.gz -C /data/canal/
# Verify the installation
$ ls -l /data/canal/
# Sample output:
total 8
drwxr-xr-x 2 root root 4096 Apr 5 10:00 bin
drwxr-xr-x 2 root root 4096 Apr 5 10:00 conf
drwxr-xr-x 2 root root 4096 Apr 5 10:00 lib
drwxr-xr-x 2 root root 4096 Apr 5 10:00 logs
# Confirm the configuration file is in place (canal.id is the server ID, not the Canal version)
$ cat /data/canal/conf/canal.properties | grep canal.id
canal.id = 1
6.2 Configure the Canal Server
# Edit the server configuration
# vi /data/canal/conf/canal.properties
# Canal Server settings
canal.id = 1
canal.ip = 192.168.1.51
canal.port = 11111
canal.zkServers = 127.0.0.1:2181
# Data storage
canal.file.data.dir = /data/canal/data
canal.file.flush.period = 1000
# In-memory event store
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
# Heartbeat detection
canal.instance.detecting.enable = true
canal.instance.detecting.sql = SELECT 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
# Instance Spring configuration (default-instance.xml keeps sync positions in ZooKeeper)
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# Server mode
canal.serverMode = tcp
# To send output to Kafka instead, use the following (Canal 1.1.5 and later read the kafka.* keys)
# canal.serverMode = kafka
# kafka.bootstrap.servers = 192.168.1.51:9092
# kafka.retries = 3
# kafka.batch.size = 16384
# kafka.max.request.size = 1048576
# kafka.linger.ms = 1
# kafka.buffer.memory = 33554432
# kafka.acks = all
# Create the data directory
# mkdir -p /data/canal/data
6.3 Configure the Canal Instance
# Create the instance directory
# mkdir -p /data/canal/conf/fgedudb
# Copy the default configuration
# cp /data/canal/conf/example/instance.properties /data/canal/conf/fgedudb/
# Edit the instance configuration
# vi /data/canal/conf/fgedudb/instance.properties
# MySQL source
canal.instance.master.address = 192.168.1.51:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
# GTID mode
canal.instance.gtidon = true
canal.instance.master.gtid =
# Credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = fgedu_canal_2024
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = fgedudb
# Table filters (comma-separated regular expressions matched against schema.table)
canal.instance.filter.regex = fgedudb\\..*
canal.instance.filter.black.regex = .*\\..*_bak,.*\\.test_.*
# Heartbeat detection
canal.instance.detecting.enable = true
canal.instance.detecting.sql = SELECT 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# Table-structure store (tsdb)
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.dir = /data/canal/conf/fgedudb/h2
canal.instance.tsdb.url = jdbc:h2:/data/canal/conf/fgedudb/h2;CACHE_SIZE=1000;MODE=MYSQL;
# MQ settings (used only when output goes to Kafka)
canal.mq.topic = canal-fgedudb
canal.mq.partition = 0
canal.mq.partitionsNum = 3
canal.mq.partitionHash = .*\\..*:$pk$
# Review the effective configuration
$ cat /data/canal/conf/fgedudb/instance.properties | grep -v "^#" | grep -v "^$"
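Before restarting Canal it can help to try the filter expressions against a few table names. The sketch below only approximates Canal's behaviour: it assumes each comma-separated pattern must match the whole schema.table name, that the black list wins over the white list, and it uses POSIX extended regexes through grep instead of the regex engine Canal uses. Patterns are passed with a single backslash, which is the value left after the properties file turns \\ into \. The function names are hypothetical.

```shell
#!/bin/bash
# canal_filter_match FILTER NAME (hypothetical helper)
# FILTER: comma-separated regexes, written with a single backslash.
# NAME:   a "schema.table" string.
# Returns 0 if any pattern matches the whole name.
canal_filter_match() {
    local filter=$1 name=$2 pattern
    local -a patterns
    # read -r keeps backslashes; splitting into an array avoids glob expansion.
    IFS=',' read -r -a patterns <<< "$filter"
    for pattern in "${patterns[@]}"; do
        if printf '%s\n' "$name" | grep -E -x -q -- "$pattern"; then
            return 0
        fi
    done
    return 1
}

# would_sync WHITE BLACK NAME: a table is kept when it matches the white list
# (filter.regex) and does not match the black list (filter.black.regex).
would_sync() {
    canal_filter_match "$1" "$3" && ! canal_filter_match "$2" "$3"
}
```

For example, would_sync 'fgedudb\..*' '.*\..*_bak,.*\.test_.*' 'fgedudb.fgedu_employees' succeeds, while the same call with 'fgedudb.orders_bak' fails because the name matches the first black-list pattern.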
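7. Canal Parameter Configuration
Canal's parameters have a direct effect on synchronization throughput and stability.
7.1 JVM Parameters
# Edit the startup script
# vi /data/canal/bin/startup.sh
# Adjust JAVA_OPTS (use only one of the two variants below)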
# CMS variant. Java 8 has no permanent generation, so the PermSize options are replaced by their Metaspace equivalents
JAVA_OPTS="-server -Xms2g -Xmx4g -Xmn1g -XX:SurvivorRatio=2 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseGCOverheadLimit -XX:CMSFullGCsBeforeCompaction=0 -XX:ParallelGCThreads=8 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/canal/logs/heap_dump.hprof"
# G1 variant
JAVA_OPTS="-server -Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/canal/logs/heap_dump.hprof"
# Configure the memory parameters
# vi /data/canal/conf/canal.properties
# In-memory event store
canal.instance.memory.buffer.size = 32768
canal.instance.memory.buffer.memunit = 2048
canal.instance.memory.batch.mode = MEMSIZE
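In MEMSIZE mode the in-memory store is capped at roughly buffer.size multiplied by buffer.memunit bytes, and buffer.size is expected to be a power of two. The helpers below are a small sketch for sanity-checking the values used in this tutorial against the JVM heap; the function names are hypothetical.

```shell
#!/bin/bash
# buffer_mib SIZE MEMUNIT (hypothetical helper): upper bound of the in-memory
# event store in MiB, computed as buffer.size * buffer.memunit bytes.
buffer_mib() {
    local size=$1 memunit=$2
    echo $(( size * memunit / 1024 / 1024 ))
}

# is_power_of_two N: succeeds when N is a positive power of two.
is_power_of_two() {
    local n=$1
    [ "$n" -gt 0 ] && [ $(( n & (n - 1) )) -eq 0 ]
}

buffer_mib 16384 1024   # values from section 6.2
buffer_mib 32768 2048   # values from section 7.1
```

The cap applies per instance, so a server hosting several instances should budget the sum of these figures well below -Xmx.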
7.2 Performance Parameters
# Edit the server configuration
# vi /data/canal/conf/canal.properties
# Parallel parsing
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreshold = 256
canal.instance.parser.parallelBufferSize = 256
# Batching
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.buffer.size = 32768
# Network buffers
canal.socket.send.buffer = 16384
canal.socket.receive.buffer = 16384
# Instance configuration
# vi /data/canal/conf/fgedudb/instance.properties
# Parallel parsing
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 8
canal.instance.parser.parallelBufferSize = 512
# Table filter (listing exact tables reduces parsing work)
canal.instance.filter.regex = fgedudb\\.fgedu_employees,fgedudb\\.fgedu_departments,fgedudb\\.fgedu_sales
8. Starting the Canal Service
Once configuration is complete, start the Canal service.
8.1 Start the Canal Server
# cd /data/canal
# sh bin/startup.sh
# Sample output:
cd to /data/canal/bin for workaround relative path
LOG CONFIGURATION : /data/canal/bin/../conf/logback.xml
canal conf : /data/canal/bin/../conf/canal.properties
CLASSPATH : /data/canal/bin/../conf:/data/canal/bin/../lib/*:
cd /data/canal/bin for continue
# Check the process
$ jps | grep Canal
# Sample output:
12345 CanalLauncher
# Check the log
$ tail -100 /data/canal/logs/canal/canal.log
# Sample output:
2024-04-05 10:00:00.000 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2024-04-05 10:00:00.100 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.1.51:11111]
2024-04-05 10:00:00.200 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
# Check the listening port
$ netstat -tlnp | grep 11111
tcp 0 0 192.168.1.51:11111 0.0.0.0:* LISTEN 12345/java
8.2 Check the Instance Status
# Check the instance log
$ tail -100 /data/canal/logs/fgedudb/fgedudb.log
# Sample output:
2024-04-05 10:00:00.000 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2024-04-05 10:00:00.100 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [fgedudb/instance.properties]
2024-04-05 10:00:00.200 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start CannalInstance for 1-fgedudb
2024-04-05 10:00:00.300 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
# Check that the parser connected to MySQL and located a binlog start position
$ tail -100 /data/canal/logs/fgedudb/fgedudb.log | grep -i "start position"
# Sample output:
2024-04-05 10:00:01.000 [destination = fgedudb , address = /192.168.1.51:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2024-04-05 10:00:01.500 [destination = fgedudb , address = /192.168.1.51:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000003,position=154,serverId=1,gtid=,timestamp=1712289600000]
# Check the ZooKeeper registration
$ zkCli.sh -server 127.0.0.1:2181 ls /otter/canal/destinations
# Sample output:
[fgedudb]
9. Canal Functional Testing
After installation, run a functional test to confirm that Canal works end to end.
9.1 Test with Canal Adapter
# Download Canal Adapter
# cd /tmp
# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz
# Extract and install
# mkdir -p /data/canal-adapter
# tar -xzf canal.adapter-1.1.7.tar.gz -C /data/canal-adapter/
# Configure the adapter
# vi /data/canal-adapter/conf/application.yml
server:
  port: 8081
canal.conf:
  mode: tcp
  canalServerHost: 192.168.1.51:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.51:3306/fgedudb?useUnicode=true
      username: canal
      password: fgedu_canal_2024
  canalAdapters:
  - instance: fgedudb
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
# Start the adapter
# cd /data/canal-adapter
# sh bin/startup.sh
# Sample output:
cd to /data/canal-adapter/bin for workaround relative path
LOG CONFIGURATION : /data/canal-adapter/bin/../conf/logback.xml
canal conf : /data/canal-adapter/bin/../conf/application.yml
CLASSPATH : /data/canal-adapter/bin/../conf:/data/canal-adapter/bin/../lib/*:
cd /data/canal-adapter/bin for continue
# Follow the log
$ tail -f /data/canal-adapter/logs/adapter/adapter.log
# Sample output:
2024-04-05 10:00:00.000 [main] INFO c.a.o.canal.adapter.launcher.CanalAdapterApplication - ## start the canal adapter.
2024-04-05 10:00:00.100 [main] INFO c.a.o.canal.adapter.launcher.CanalAdapterApplication - ## the canal adapter is running now ......
9.2 Test Data Synchronization
# Insert a row on the source (the fgedu account is assumed to already exist with write access to fgedudb)
$ mysql -u fgedu -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('测试用户', '测试部', 10000.00);"
# Check the adapter log to confirm the change was captured
$ tail -f /data/canal-adapter/logs/adapter/adapter.log
# Sample output:
2024-04-05 10:05:00.000 [pool-1-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":4,"name":"测试用户","department":"测试部","salary":10000.00,"created_at":"2024-04-05 10:05:00"}],"database":"fgedudb","destination":"fgedudb","es":1712289900000,"groupId":"g1","isDdl":false,"old":null,"sql":"","table":"fgedu_employees","ts":1712289900123,"type":"INSERT"}
# Update the row
$ mysql -u fgedu -p -e "UPDATE fgedudb.fgedu_employees SET salary = 11000.00 WHERE id = 4;"
# Sample output:
2024-04-05 10:06:00.000 [pool-1-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":4,"name":"测试用户","department":"测试部","salary":11000.00,"created_at":"2024-04-05 10:05:00"}],"database":"fgedudb","destination":"fgedudb","es":1712289960000,"groupId":"g1","isDdl":false,"old":[{"salary":10000.00}],"sql":"","table":"fgedu_employees","ts":1712289960123,"type":"UPDATE"}
# Delete the row
$ mysql -u fgedu -p -e "DELETE FROM fgedudb.fgedu_employees WHERE id = 4;"
# Sample output:
2024-04-05 10:07:00.000 [pool-1-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":4,"name":"测试用户","department":"测试部","salary":11000.00,"created_at":"2024-04-05 10:05:00"}],"database":"fgedudb","destination":"fgedudb","es":1712290020000,"groupId":"g1","isDdl":false,"old":null,"sql":"","table":"fgedu_employees","ts":1712290020123,"type":"DELETE"}
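When many changes flow through, reading raw JSON lines becomes tedious. The following sketch condenses each logger line into "TYPE database.table". It assumes the log file uses plain ASCII double quotes and that the keys appear in the order database, table, type, as in the sample lines above; the function name dml_summary is hypothetical, and sed is used so that nothing beyond the base system is required.

```shell
#!/bin/bash
# dml_summary (hypothetical helper): read adapter log lines on stdin and print
# "TYPE database.table" for every line that carries a "DML: {...}" payload.
# Lines without such a payload produce no output.
dml_summary() {
    sed -n 's/.* DML: {.*"database":"\([^"]*\)".*"table":"\([^"]*\)".*"type":"\([A-Z]*\)".*/\3 \1.\2/p'
}
```

A typical use is dml_summary < /data/canal-adapter/logs/adapter/adapter.log | sort | uniq -c, which gives a count per operation type and table.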
9.3 Test Synchronization to MySQL
# Switch the adapter output to the rdb adapter
# vi /data/canal-adapter/conf/application.yml
  canalAdapters:
  - instance: fgedudb
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.cj.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.1.52:3306/fgedudb?useUnicode=true
          jdbc.username: fgedu
          jdbc.password: fgedu_mysql_2024
# Configure the table mapping
# vi /data/canal-adapter/conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
destination: fgedudb
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: fgedudb
  table: fgedu_employees
  targetTable: fgedudb.fgedu_employees
  targetPk:
    id: id
  mapAll: true
  etlCondition:
  commitBatch: 3000
# Restart the adapter
# sh /data/canal-adapter/bin/stop.sh
# sh /data/canal-adapter/bin/startup.sh
# Insert a row on the source MySQL
$ mysql -u fgedu -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('同步测试', '同步部', 12000.00);"
# Check the row on the target MySQL
$ mysql -h 192.168.1.52 -u fgedu -p -e "SELECT * FROM fgedudb.fgedu_employees WHERE name = '同步测试';"
# Sample output:
+----+----------+------------+----------+---------------------+
| id | name     | department | salary   | created_at          |
+----+----------+------------+----------+---------------------+
|  5 | 同步测试 | 同步部     | 12000.00 | 2024-04-05 10:10:00 |
+----+----------+------------+----------+---------------------+
10. Canal Performance Tuning
Tuning Canal covers several areas, including JVM settings, memory buffers, and parallel parsing.
10.1 JVM Tuning
# Edit the startup script
# vi /data/canal/bin/startup.sh
# Use the G1 garbage collector
JAVA_OPTS="-server -Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/canal/logs/heap_dump.hprof -Djava.awt.headless=true -Dfile.encoding=UTF-8"
# Restart Canal
# sh /data/canal/bin/stop.sh
# sh /data/canal/bin/startup.sh
# Monitor JVM memory usage
$ jstat -gc $(pgrep -f CanalLauncher) 1000 5
# Sample output:
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT CGC CGCT GCT
1024.0 1024.0 0.0 512.0 81920.0 40960.0 819200.0 409600.0 51200.0 25600.0 5120.0 2560.0 10 0.500 0 0.000 5 0.250 0.750
10.2 Memory Tuning
# Edit the server configuration
# vi /data/canal/conf/canal.properties
# Enlarge the in-memory event store
canal.instance.memory.buffer.size = 65536
canal.instance.memory.buffer.memunit = 4096
canal.instance.memory.batch.mode = MEMSIZE
# Network buffers
canal.socket.send.buffer = 65536
canal.socket.receive.buffer = 65536
# Parallel parsing
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 1024
# Instance configuration
# vi /data/canal/conf/fgedudb/instance.properties
# Parallel parsing
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 1024
# Restart Canal
# sh /data/canal/bin/stop.sh
# sh /data/canal/bin/startup.sh
10.3 HA (High Availability) Configuration
# Edit the server configuration on each Canal node
# vi /data/canal/conf/canal.properties
# Canal Server 1 (on 192.168.1.51)
canal.id = 1
canal.ip = 192.168.1.51
canal.port = 11111
canal.zkServers = 192.168.1.51:2181,192.168.1.52:2181,192.168.1.53:2181
# Canal Server 2 (on 192.168.1.52)
canal.id = 2
canal.ip = 192.168.1.52
canal.port = 11111
canal.zkServers = 192.168.1.51:2181,192.168.1.52:2181,192.168.1.53:2181
# Instance configuration (identical on both nodes)
# vi /data/canal/conf/fgedudb/instance.properties
# HA requires the ZooKeeper-backed instance definition
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# Automatic failover based on heartbeat detection
canal.instance.detecting.heartbeatHaEnable = true
canal.instance.detecting.retry.threshold = 3
# Check the HA cluster membership
$ zkCli.sh -server 192.168.1.51:2181 ls /otter/canal/destinations/fgedudb/cluster
# Sample output:
[192.168.1.51:11111, 192.168.1.52:11111]
# Check which node currently runs the instance
$ zkCli.sh -server 192.168.1.51:2181 get /otter/canal/destinations/fgedudb/running
# Sample output:
{"active":true,"address":"192.168.1.51:11111"}
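11. Canal Upgrade and Migration
Upgrade Canal with care so that no data is lost and the business keeps running.
11.1 Preparing for the Upgrade
# Check the current configuration
$ cat /data/canal/conf/canal.properties | grep canal.id
canal.id = 1
# Check the current binlog position
$ mysql -u canal -p -e "SHOW MASTER STATUS;"
# Sample output:
+------------------+----------+--------------+------------------+--------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                          |
+------------------+----------+--------------+------------------+--------------------------------------------+
| mysql-bin.000005 |     1234 |              |                  | 1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d:1-100 |
+------------------+----------+--------------+------------------+--------------------------------------------+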
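# Record the current sync position (meta.dat exists only when the instance uses file-instance.xml; with default-instance.xml the position is kept in ZooKeeper)
$ cat /data/canal/conf/fgedudb/meta.dat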
# Back up the configuration files
# cp -r /data/canal/conf /backup/canal_conf_$(date +%Y%m%d)
# Back up the data directory
# tar -czf /backup/canal_data_$(date +%Y%m%d).tar.gz /data/canal/data
# Stop Canal
# sh /data/canal/bin/stop.sh
# Sample output:
Canal is stop.
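The backup commands above and the restore commands in the next step each evaluate $(date +%Y%m%d) on their own, so an upgrade that runs across midnight would look for files under a different name. One way around this, sketched below, is to capture the date stamp once and pass it explicitly. The function name backup_canal is hypothetical, and unlike the tar command above it archives the data directory with a relative path.

```shell
#!/bin/bash
# backup_canal SRC_ROOT DEST_ROOT STAMP (hypothetical helper)
# Copies SRC_ROOT/conf and archives SRC_ROOT/data under DEST_ROOT, naming both
# with the caller-supplied STAMP so backup and restore agree on the file names.
backup_canal() {
    local src=$1 dest=$2 stamp=$3
    mkdir -p "$dest" || return 1
    cp -r "$src/conf" "$dest/canal_conf_$stamp" || return 1
    # -C stores paths relative to SRC_ROOT ("data/..."), so the archive can be
    # extracted under any installation directory.
    tar -czf "$dest/canal_data_$stamp.tar.gz" -C "$src" data || return 1
    echo "$stamp"
}
```

Usage would look like STAMP=$(date +%Y%m%d) followed by backup_canal /data/canal /backup "$STAMP". The data archive is then restored with tar -xzf /backup/canal_data_$STAMP.tar.gz -C /data/canal, reusing the same STAMP value.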
11.2 Performing the Upgrade
# Download the new version
# cd /tmp
# wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz
# Keep the old version as a fallback
# mv /data/canal /data/canal_1.1.7
# Extract the new version
# mkdir -p /data/canal
# tar -xzf canal.deployer-1.1.8.tar.gz -C /data/canal/
# Restore the configuration files
# cp -r /backup/canal_conf_$(date +%Y%m%d)/* /data/canal/conf/
# Restore the data directory
# tar -xzf /backup/canal_data_$(date +%Y%m%d).tar.gz -C /
# Start the new version
# sh /data/canal/bin/startup.sh
# Sample output:
cd to /data/canal/bin for workaround relative path
LOG CONFIGURATION : /data/canal/bin/../conf/logback.xml
canal conf : /data/canal/bin/../conf/canal.properties
CLASSPATH : /data/canal/bin/../conf:/data/canal/bin/../lib/*:
cd /data/canal/bin for continue
11.3 Verifying the Upgrade
# Check the process
$ jps | grep Canal
# Sample output:
12345 CanalLauncher
# Check the log
$ tail -100 /data/canal/logs/canal/canal.log
# Sample output:
2024-04-05 10:00:00.000 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2024-04-05 10:00:00.100 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.1.51:11111]
2024-04-05 10:00:00.200 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
# Test data synchronization
$ mysql -u fgedu -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('升级测试', '升级部', 15000.00);"
# Check the adapter log
$ tail -f /data/canal-adapter/logs/adapter/adapter.log
# Sample output:
2024-04-05 10:00:00.000 [pool-1-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":6,"name":"升级测试","department":"升级部","salary":15000.00,"created_at":"2024-04-05 10:00:00"}],"database":"fgedudb","destination":"fgedudb","es":1712290000000,"groupId":"g1","isDdl":false,"old":null,"sql":"","table":"fgedu_employees","ts":1712290000123,"type":"INSERT"}
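12. Canal Monitoring and Operations
Day-to-day operation of Canal covers service status checks, log management, and performance monitoring.
12.1 Service Status Monitoring
# Check the process
$ jps | grep Canal
# Sample output:
12345 CanalLauncher
# Check the listening port
$ netstat -tlnp | grep 11111
tcp 0 0 192.168.1.51:11111 0.0.0.0:* LISTEN 12345/java
# Check the ZooKeeper registration
$ zkCli.sh -server 127.0.0.1:2181 ls /otter/canal/destinations
# Sample output:
[fgedudb]
# Check the instance status through the Canal Adapter REST endpoint (port 8081)
$ curl http://127.0.0.1:8081/destinations
# Sample output:
[{"destination":"fgedudb","status":"RUNNING"}]
# Check the MySQL connection
$ mysql -u canal -p -h 192.168.1.51 -e "SHOW PROCESSLIST;" | grep canal
# Sample output: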
| 123 | canal | 192.168.1.51:54321 | NULL | Binlog Dump | 3600 | Master has sent all binlog to slave; waiting for more updates | NULL |
12.2 Log Management
# View the server log
$ tail -100 /data/canal/logs/canal/canal.log
# Sample output:
2024-04-05 10:00:00.000 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2024-04-05 10:00:00.100 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.1.51:11111]
# View the instance log
$ tail -100 /data/canal/logs/fgedudb/fgedudb.log
# Sample output:
2024-04-05 10:00:00.000 [destination = fgedudb , address = /192.168.1.51:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully
# View recent errors
$ grep -i error /data/canal/logs/canal/canal.log | tail -20
# Configure log rotation
# vi /etc/logrotate.d/canal
/data/canal/logs/*/*.log {
    daily
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
    # The Canal JVM keeps its log files open, so truncate in place instead of renaming
    copytruncate
}
12.3 Performance Monitoring
# Monitor CPU and memory of the Canal process
$ top -p $(pgrep -f CanalLauncher)
# Sample output:
top - 10:00:00 up 1 day, 2:00, 2 users, load average: 0.50, 0.45, 0.40
Tasks: 1 total, 0 running, 1 sleeping, 0 stopped, 0 zombie
%Cpu(s): 5.0 us, 2.0 sy, 0.0 ni, 92.0 id, 1.0 wa, 0.0 hi, 0.0 si
MiB Mem : 16384.0 total, 12000.0 free, 3500.0 used, 884.0 buff/cache
MiB Swap: 8192.0 total, 8192.0 free, 0.0 used. 12000.0 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12345 root 20 0 8192000 3500000 250000 S 5.0 21.4 0:30.00 java
# Monitor JVM memory usage
$ jstat -gc $(pgrep -f CanalLauncher) 1000 5
# Sample output:
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT CGC CGCT GCT
1024.0 1024.0 0.0 512.0 81920.0 40960.0 819200.0 409600.0 51200.0 25600.0 5120.0 2560.0 10 0.500 0 0.000 5 0.250 0.750
# Count network connections on the Canal port
$ netstat -an | grep 11111 | wc -l
5
# Create a monitoring script
# vi /backup/scripts/canal_monitor.sh
#!/bin/bash
echo "=== Canal Monitor ==="
echo "Date: $(date)"
echo ""
echo "=== Canal Process ==="
jps | grep Canal
echo ""
echo "=== Canal Port ==="
netstat -tlnp | grep 11111
echo ""
echo "=== Memory Usage ==="
free -h
echo ""
echo "=== JVM Memory ==="
# Skip jstat when Canal is not running, otherwise it fails on an empty PID
pid=$(pgrep -f CanalLauncher | head -1)
[ -n "$pid" ] && jstat -gc "$pid" | tail -1
echo ""
echo "=== Recent Errors ==="
grep -i error /data/canal/logs/canal/canal.log | tail -5
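For unattended use (for example from cron) it is often more convenient to get a number than a block of text. The helper below is a sketch that counts ERROR lines in the tail of a log file; the function name and the default window of 1000 lines are assumptions, and unlike the grep -i call in the script above it matches the upper-case log level only.

```shell
#!/bin/bash
# recent_error_count LOGFILE [LINES] (hypothetical helper)
# Prints how many of the last LINES lines (default 1000) of LOGFILE contain
# "ERROR". Prints 0 when the file does not exist.
recent_error_count() {
    local logfile=$1 lines=${2:-1000}
    if [ ! -f "$logfile" ]; then
        echo 0
        return 0
    fi
    # grep -c prints the count itself; "|| true" hides its non-zero exit
    # status when the count is 0.
    tail -n "$lines" "$logfile" | grep -c 'ERROR' || true
}
```

A caller could compare the result of recent_error_count /data/canal/logs/canal/canal.log with a threshold and send an alert when it is exceeded.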
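This article was compiled and published by Fengge Tutorials (风哥教程) for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html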
