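1. Debezium Overview and Environment Planning
Debezium is an open-source distributed platform for change data capture (CDC). It is built on Kafka Connect and supports many databases, including MySQL, PostgreSQL, MongoDB, Oracle, and SQL Server. Debezium captures INSERT, UPDATE, and DELETE operations in real time and publishes them to Kafka as change events for downstream systems to consume.
1.1 Debezium Versions
The current major release line is 2.x, and this tutorial uses Debezium 2.7 as its example. Compared with 1.x, the 2.x series requires Java 11 or later and renames several connector properties (for example, database.server.name became topic.prefix), in addition to improvements in performance, stability, and database coverage.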
# Check the installed Debezium connector version
$ curl -s http://192.168.1.51:8083/connector-plugins | jq
[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.7.0.Final"
  }
]
# Check the Java version
$ java -version
openjdk version "11.0.22" 2024-01-16 LTS
OpenJDK Runtime Environment (build 11.0.22+7-LTS)
OpenJDK 64-Bit Server VM (build 11.0.22+7-LTS, mixed mode, sharing)
# Check the operating system release
$ cat /etc/os-release
NAME="Oracle Linux Server"
VERSION="8.9"
ID="ol"
PRETTY_NAME="Oracle Linux Server 8.9"
1.2 Environment Plan
The installation environment is planned as follows:
Hostname: debezium01.fgedu.net.cn
IP address: 192.168.1.51
Port: 8083 (Kafka Connect REST API)
Debezium version: 2.7.0
Kafka version: 3.6.1
Java version: OpenJDK 11
Installation directory: /data/kafka
Log directory: /data/kafka/logs
Kafka cluster:
Node 1: kafka01.fgedu.net.cn (192.168.1.51)
Node 2: kafka02.fgedu.net.cn (192.168.1.52)
Node 3: kafka03.fgedu.net.cn (192.168.1.53)
ZooKeeper ensemble:
Node 1: 192.168.1.51:2181
Node 2: 192.168.1.52:2181
Node 3: 192.168.1.53:2181
Data sources:
MySQL: 192.168.1.51:3306
PostgreSQL: 192.168.1.51:5432
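2. Hardware Requirements
Debezium runs on top of Kafka and has moderate hardware requirements. The checks below show a configuration suitable for production.
2.1 Minimum Hardware Requirements
# Check memory
# free -h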
total used free shared buff/cache available
Mem: 32G 8.5G 21G 2.0G 2.5G 21G
Swap: 16G 0B 16G
# Check disk space
# df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 15G 36G 30% /
/dev/sdb1 500G 50G 450G 10% /data
# Check the number of CPU cores
# nproc
16
# Check the system architecture
# uname -m
x86_64
2.2 Network Configuration Requirements
# Check the network interfaces
# ip addr show
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN
inet 127.0.0.1/8 scope host lo
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP
inet 192.168.1.51/24 brd 192.168.1.255 scope global eth0
# Check the fully qualified hostname
# hostname -f
debezium01.fgedu.net.cn
# Configure /etc/hosts
# cat /etc/hosts
127.0.0.1 localhost
192.168.1.51 debezium01.fgedu.net.cn debezium01
192.168.1.52 debezium02.fgedu.net.cn debezium02
192.168.1.53 debezium03.fgedu.net.cn debezium03
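3. Operating System Configuration
Debezium runs on Linux, and the operating system needs several adjustments before installation.
3.1 Disabling the Firewall and SELinux
# Check the SELinux status
# getenforce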
Disabled
# Disable SELinux if it is still enabled (the change takes effect after a reboot)
# vi /etc/selinux/config
SELINUX=disabled
# Check the firewall status
# systemctl status firewalld
# Stop the firewall (in production, prefer opening only the required ports instead)
# systemctl stop firewalld
# systemctl disable firewalld
# Or open the required ports
# firewall-cmd --permanent --add-port=2181/tcp
# firewall-cmd --permanent --add-port=2888/tcp
# firewall-cmd --permanent --add-port=3888/tcp
# firewall-cmd --permanent --add-port=9092/tcp
# firewall-cmd --permanent --add-port=8083/tcp
# firewall-cmd --reload
3.2 Kernel Parameter Tuning
# vi /etc/sysctl.conf
# Add the following parameters
fs.file-max = 6815744
vm.swappiness = 10
vm.max_map_count = 262144
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 65536
# Apply the settings
# sysctl -p
# Configure file descriptor and process limits
# vi /etc/security/limits.conf
# Add the following entries
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
# Verify (log in again first so that the new limits apply to the session)
# ulimit -n
65536
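The same limit check can be scripted so it runs before each service start. The following is a minimal Python sketch, not part of the original procedure; the function name nofile_ok is hypothetical, and the threshold simply mirrors the 65536 value set in limits.conf above.

```python
import resource

# Threshold taken from the limits.conf entries above (assumption: the
# Kafka and Kafka Connect processes inherit the limits of this session).
REQUIRED_NOFILE = 65536


def nofile_ok(soft_limit: int, required: int = REQUIRED_NOFILE) -> bool:
    """Return True when the soft open-file limit meets the requirement.

    RLIM_INFINITY means "unlimited" and always passes.
    """
    if soft_limit == resource.RLIM_INFINITY:
        return True
    return soft_limit >= required


if __name__ == "__main__":
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    print(f"nofile soft={soft} hard={hard} ok={nofile_ok(soft)}")
```

Running it from the account that starts Kafka shows whether that account actually received the new limit.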
4. Java Environment Configuration
Debezium is written in Java and needs a suitable Java runtime.
4.1 Installing OpenJDK 11
# Install OpenJDK 11
# yum install -y java-11-openjdk java-11-openjdk-devel
# Verify the Java version
$ java -version
openjdk version "11.0.22" 2024-01-16 LTS
OpenJDK Runtime Environment (build 11.0.22+7-LTS)
OpenJDK 64-Bit Server VM (build 11.0.22+7-LTS, mixed mode, sharing)
# Set the JAVA_HOME environment variable
# vi /etc/profile.d/java.sh
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
export PATH=$JAVA_HOME/bin:$PATH
# Load the environment variables
# source /etc/profile.d/java.sh
# Verify JAVA_HOME
$ echo $JAVA_HOME
/usr/lib/jvm/java-11-openjdk
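5. Kafka Cluster Installation
Debezium depends on Kafka and Kafka Connect, so the Kafka cluster must be installed first.
5.1 Installing ZooKeeper
# Download Kafka (ZooKeeper ships inside the Kafka distribution)
# cd /tmp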
# wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
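# Extract and install
# tar -xzf kafka_2.13-3.6.1.tgz
# mv kafka_2.13-3.6.1 /data/kafka
# Configure ZooKeeper
# vi /data/kafka/config/zookeeper.properties
dataDir=/data/zookeeper/data
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
# Quorum timing (required once server.N entries are present)
tickTime=2000
initLimit=10
syncLimit=5
# Allow the four-letter-word commands used for the status check (only srvr is enabled by default)
4lw.commands.whitelist=stat,srvr
# Ensemble members
server.1=192.168.1.51:2888:3888
server.2=192.168.1.52:2888:3888
server.3=192.168.1.53:2888:3888
# Create the data directory
# mkdir -p /data/zookeeper/data
# Create the myid file (use 2 and 3 on the second and third nodes)
# echo "1" > /data/zookeeper/data/myid
# Start ZooKeeper
# /data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties
# Check the ZooKeeper status
$ echo stat | nc localhost 2181
# Sample output: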
Zookeeper version: 3.8.3
Clients:
/127.0.0.1:54321[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/1.5/5
Received: 10
Sent: 9
Connections: 1
Outstanding: 0
Zxid: 0x100000003
Mode: leader
Node count: 5
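Each node's myid must match the N of its own server.N line, which is easy to get wrong when the same steps are repeated on three hosts. The sketch below derives the value from the configuration text. It is an illustration only: the helper name myid_for is hypothetical, and it assumes the server.N entries use plain IP addresses as in the file above.

```python
import re


def myid_for(properties_text: str, local_ip: str) -> int:
    """Return N from the `server.N=<ip>:2888:3888` line that matches local_ip."""
    for line in properties_text.splitlines():
        match = re.match(r"\s*server\.(\d+)\s*=\s*([^:\s]+):", line)
        if match and match.group(2) == local_ip:
            return int(match.group(1))
    raise ValueError(f"{local_ip} is not listed as a server.N entry")


# The ensemble definition from zookeeper.properties above.
sample = """\
server.1=192.168.1.51:2888:3888
server.2=192.168.1.52:2888:3888
server.3=192.168.1.53:2888:3888
"""

print(myid_for(sample, "192.168.1.52"))  # prints 2
```

The returned number is what belongs in /data/zookeeper/data/myid on that host.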
5.2 Configuring the Kafka Broker
# vi /data/kafka/config/server.properties
# Broker settings (broker.id and the listener addresses must be unique on each node)
broker.id=1
listeners=PLAINTEXT://192.168.1.51:9092
advertised.listeners=PLAINTEXT://192.168.1.51:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log (data) directory and topic defaults
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ZooKeeper connection
zookeeper.connect=192.168.1.51:2181,192.168.1.52:2181,192.168.1.53:2181
zookeeper.connection.timeout.ms=18000
# Log retention
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Create the log directory
# mkdir -p /data/kafka/logs
# Start the Kafka broker
# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
# Check the Kafka process
$ jps | grep Kafka
# Sample output:
12345 Kafka
# Create a test topic
$ /data/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server 192.168.1.51:9092 --partitions 3 --replication-factor 3
# Sample output:
Created topic test-topic.
# List topics
$ /data/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.51:9092
# Sample output:
test-topic
5.3 Configuring Kafka Connect
# vi /data/kafka/config/connect-distributed.properties
# Basic settings
bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Offset storage
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
# Config storage (this topic always has a single partition; replicate it like the other internal topics)
config.storage.topic=connect-configs
config.storage.replication.factor=3
# Status storage
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
# Worker settings
offset.flush.interval.ms=10000
# REST listener (rest.port was removed in Kafka 3.0; use listeners instead)
listeners=http://0.0.0.0:8083
rest.advertised.host.name=192.168.1.51
# Plugin path
plugin.path=/data/kafka/plugins
# Create the plugin directory
# mkdir -p /data/kafka/plugins
# Start Kafka Connect
# /data/kafka/bin/connect-distributed.sh -daemon /data/kafka/config/connect-distributed.properties
# Check the Kafka Connect status
$ curl -s http://192.168.1.51:8083/ | jq
# Sample output:
{
  "version": "3.6.1",
  "commit": "5e3c2b738d253ff5",
  "kafka_cluster_id": "abc123-def456-ghi789"
}
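Kafka Connect keeps connector offsets, configurations, and statuses in three internal topics, so a low replication factor on any of them is a single point of failure. The following Python sketch scans a worker properties file for that problem. The helper names are hypothetical, the sample text is illustrative, and the fallback of 3 assumes Kafka Connect's documented default for these settings.

```python
INTERNAL_TOPIC_FACTORS = (
    "offset.storage.replication.factor",
    "config.storage.replication.factor",
    "status.storage.replication.factor",
)


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def weak_internal_topics(text: str, brokers: int = 3) -> dict:
    """Return the internal-topic factors that are lower than the broker count."""
    props = parse_properties(text)
    weak = {}
    for key in INTERNAL_TOPIC_FACTORS:
        factor = int(props.get(key, 3))  # unset keys fall back to Connect's default of 3
        if factor < brokers:
            weak[key] = factor
    return weak


sample = """\
# illustrative worker settings
offset.storage.replication.factor=3
config.storage.replication.factor=1
status.storage.replication.factor=3
"""

print(weak_internal_topics(sample))
```

An empty result means all three internal topics are replicated across the three-broker cluster planned in section 1.2.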
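6. Installing and Deploying Debezium
With the Kafka cluster in place, install the Debezium connectors.
6.1 Downloading the Debezium Connectors
# Download the MySQL connector plugin
# cd /tmp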
# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
# Sample output:
--2024-04-05 10:00:00-- https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
Resolving repo1.maven.org... 199.232.196.209
Connecting to repo1.maven.org|199.232.196.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 45678901 (44M) [application/x-gzip]
Saving to: 'debezium-connector-mysql-2.7.0.Final-plugin.tar.gz'
debezium-connector-mysql-2.7.0.Final-plugin.tar.gz 100%[=================================================>] 43.56M 15.6MB/s in 2.8s
2024-04-05 10:00:03 (15.6 MB/s) - 'debezium-connector-mysql-2.7.0.Final-plugin.tar.gz' saved [45678901/45678901]
# Extract into the plugin directory
# tar -xzf debezium-connector-mysql-2.7.0.Final-plugin.tar.gz -C /data/kafka/plugins/
# Download the PostgreSQL connector
# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.7.0.Final/debezium-connector-postgres-2.7.0.Final-plugin.tar.gz
# tar -xzf debezium-connector-postgres-2.7.0.Final-plugin.tar.gz -C /data/kafka/plugins/
# Verify the installation
$ ls -l /data/kafka/plugins/
# Sample output:
total 0
drwxr-xr-x 2 root root 4096 Apr 5 10:00 debezium-connector-mysql
drwxr-xr-x 2 root root 4096 Apr 5 10:00 debezium-connector-postgres
# List the connector files
$ ls -lh /data/kafka/plugins/debezium-connector-mysql/
# Sample output:
total 45M
-rw-r--r-- 1 root root 12M Apr 5 10:00 debezium-connector-mysql-2.7.0.Final.jar
-rw-r--r-- 1 root root 15M Apr 5 10:00 debezium-core-2.7.0.Final.jar
-rw-r--r-- 1 root root 2.5M Apr 5 10:00 mysql-connector-j-8.2.0.jar
...
6.2 Restarting Kafka Connect
# Stop Kafka Connect (the REST API has no shutdown endpoint, so stop the worker process; the default SIGTERM lets it shut down cleanly)
# kill $(pgrep -f ConnectDistributed)
# Start Kafka Connect again
# /data/kafka/bin/connect-distributed.sh -daemon /data/kafka/config/connect-distributed.properties
# Check the connector plugins
$ curl -s http://192.168.1.51:8083/connector-plugins | jq
# Sample output:
[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.7.0.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "2.7.0.Final"
  }
]
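7. Debezium Parameter Configuration
Debezium's configuration largely determines the performance and stability of data synchronization.
7.1 Configuring the MySQL Source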
# vi /etc/my.cnf
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
gtid_mode = ON
enforce_gtid_consistency = ON
# Restart MySQL
# systemctl restart mysqld
# Create the Debezium user
$ mysql -u root -p
mysql> CREATE USER 'debezium'@'%' IDENTIFIED BY 'fgedu_debezium_2024';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
mysql> FLUSH PRIVILEGES;
# Verify the binlog configuration
mysql> SHOW VARIABLES LIKE 'binlog_format';
# Sample output:
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
# Create a test database and table
mysql> CREATE DATABASE fgedudb;
mysql> USE fgedudb;
mysql> CREATE TABLE fgedu_employees (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100),
department VARCHAR(50),
salary DECIMAL(10,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
mysql> INSERT INTO fgedu_employees (name, department, salary) VALUES
('张三', '技术部', 15000.00),
('李四', '销售部', 12000.00),
('王五', '财务部', 13000.00);
7.2 Creating the MySQL Connector Configuration
# Debezium 2.x uses topic.prefix and schema.history.internal.* in place of the 1.x names database.server.name and database.history.*
# vi /data/kafka/config/mysql-connector.json
{
  "name": "fgedudb-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.1.51",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "fgedu_debezium_2024",
    "database.server.id": "5400",
    "topic.prefix": "fgedudb_server",
    "database.include.list": "fgedudb",
    "table.include.list": "fgedudb.fgedu_employees,fgedudb.fgedu_departments",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.1.51:9092",
    "schema.history.internal.kafka.topic": "schema-changes.fgedudb",
    "include.schema.changes": "true",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "minimal",
    "snapshot.fetch.size": "2000",
    "max.queue.size": "8192",
    "max.batch.size": "2048",
    "poll.interval.ms": "500",
    "heartbeat.interval.ms": "5000",
    "decimal.handling.mode": "string",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "true",
    "database.connectionTimeZone": "Asia/Shanghai"
  }
}
# Register the connector
$ curl -X POST -H "Content-Type: application/json" --data @/data/kafka/config/mysql-connector.json http://192.168.1.51:8083/connectors
# Sample output:
{
  "name": "fgedudb-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.1.51",
    ...
  },
  "tasks": [],
  "type": "source"
}
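Debezium 2.0 renamed several connector properties, and configurations copied from 1.x material still carry the old names. The sketch below flags the renames relevant to this MySQL connector before the file is posted to the REST API. The function name legacy_keys and the sample JSON are illustrative, and the mapping covers only these three properties rather than the full list of 2.x changes.

```python
import json

# 1.x property name -> 2.x replacement (subset relevant to this tutorial).
RENAMED_IN_2X = {
    "database.server.name": "topic.prefix",
    "database.history.kafka.bootstrap.servers":
        "schema.history.internal.kafka.bootstrap.servers",
    "database.history.kafka.topic": "schema.history.internal.kafka.topic",
}


def legacy_keys(connector_json: str) -> dict:
    """Return {old_name: new_name} for every 1.x property found in the config."""
    config = json.loads(connector_json)["config"]
    return {old: new for old, new in RENAMED_IN_2X.items() if old in config}


sample = json.dumps({
    "name": "fgedudb-mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.server.name": "fgedudb_server",
        "schema.history.internal.kafka.topic": "schema-changes.fgedudb",
    },
})

for old, new in legacy_keys(sample).items():
    print(f"rename {old} -> {new}")
```

To check the real file, pass the contents of /data/kafka/config/mysql-connector.json to legacy_keys; an empty result means none of these 1.x names remain.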
8. Starting the Debezium Service
Once the connector is registered, confirm that it is running.
8.1 Checking the Connector Status
# List the registered connectors
$ curl -s http://192.168.1.51:8083/connectors | jq
# Sample output:
[
  "fgedudb-mysql-connector"
]
# Check the connector status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq
# Sample output:
{
  "name": "fgedudb-mysql-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.1.51:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.1.51:8083"
    }
  ],
  "type": "source"
}
# Show the connector configuration
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/config | jq
# Show the connector tasks
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/tasks | jq
# Sample output:
[
  {
    "id": {
      "connector": "fgedudb-mysql-connector",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      ...
    }
  }
]
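The status document returned by the REST API can also be checked programmatically, which is useful because a connector may report RUNNING while one of its tasks has failed. The following is a minimal sketch over the JSON shape shown above. The function name unhealthy_parts is hypothetical, the sample document is illustrative, and a PAUSED connector is reported as not running by design.

```python
import json


def unhealthy_parts(status_json: str) -> list:
    """List every connector or task entry whose state is not RUNNING."""
    status = json.loads(status_json)
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector is {connector_state}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']} is {task['state']}")
    return problems


# Illustrative status document: connector up, its only task failed.
sample = json.dumps({
    "name": "fgedudb-mysql-connector",
    "connector": {"state": "RUNNING", "worker_id": "192.168.1.51:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "192.168.1.51:8083"}],
    "type": "source",
})

print(unhealthy_parts(sample))  # prints ['task 0 is FAILED']
```

Feeding it the output of the status curl command gives an empty list when everything is healthy.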
8.2 Inspecting the Kafka Topics
# List the topics created by the connector
$ /data/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.51:9092 | grep fgedudb
# Sample output:
fgedudb_server
fgedudb_server.fgedudb.fgedu_employees
schema-changes.fgedudb
# Describe the table topic
$ /data/kafka/bin/kafka-topics.sh --describe --topic fgedudb_server.fgedudb.fgedu_employees --bootstrap-server 192.168.1.51:9092
# Sample output:
Topic: fgedudb_server.fgedudb.fgedu_employees PartitionCount: 3 ReplicationFactor: 3
Topic: fgedudb_server.fgedudb.fgedu_employees Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedudb_server.fgedudb.fgedu_employees Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedudb_server.fgedudb.fgedu_employees Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
# Consume the topic to verify the snapshot data
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --from-beginning --max-messages 3
# Sample output (simplified; real Debezium events wrap each row in the before/after/source/op envelope shown in section 9.1, and field encodings depend on decimal.handling.mode and time.precision.mode):
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"department"},{"type":"double","optional":true,"field":"salary"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","field":"created_at"}],"optional":false,"name":"fgedudb_server.fgedudb.fgedu_employees.Value"},"payload":{"id":1,"name":"张三","department":"技术部","salary":15000.0,"created_at":1712289600000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"department"},{"type":"double","optional":true,"field":"salary"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","field":"created_at"}],"optional":false,"name":"fgedudb_server.fgedudb.fgedu_employees.Value"},"payload":{"id":2,"name":"李四","department":"销售部","salary":12000.0,"created_at":1712289600000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"department"},{"type":"double","optional":true,"field":"salary"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","field":"created_at"}],"optional":false,"name":"fgedudb_server.fgedudb.fgedu_employees.Value"},"payload":{"id":3,"name":"王五","department":"财务部","salary":13000.0,"created_at":1712289600000}}
9. Debezium Functional Testing
After installation, run functional tests to confirm that Debezium captures changes correctly.
9.1 Testing INSERT
# Insert a test row (the debezium account has no write privileges, so use an account that does, such as root)
$ mysql -u root -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('测试用户', '测试部', 10000.00);"
# Verify by consuming the topic (start this consumer in a second terminal before running the INSERT; without --from-beginning it reads only new events)
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --max-messages 1
# Sample output (formatted):
{
  "schema": {...},
  "payload": {
    "before": null,
    "after": {
      "id": 4,
      "name": "测试用户",
      "department": "测试部",
      "salary": 10000.0,
      "created_at": 1712289900000
    },
    "source": {
      "version": "2.7.0.Final",
      "connector": "mysql",
      "name": "fgedudb_server",
      "ts_ms": 1712289900000,
      "db": "fgedudb",
      "table": "fgedu_employees",
      "server_id": 1,
      "gtid": "1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d:101",
      "file": "mysql-bin.000005",
      "pos": 1234,
      "row": 0,
      "snapshot": false
    },
    "op": "c",
    "ts_ms": 1712289900123
  }
}
9.2 Testing UPDATE
# Update the test row (run as an account with write privileges)
$ mysql -u root -p -e "UPDATE fgedudb.fgedu_employees SET salary = 11000.00 WHERE id = 4;"
# Verify by consuming the topic (start this consumer in a second terminal before running the UPDATE; without --from-beginning it reads only new events)
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --max-messages 1
# Sample output (formatted):
{
  "schema": {...},
  "payload": {
    "before": {
      "id": 4,
      "name": "测试用户",
      "department": "测试部",
      "salary": 10000.0,
      "created_at": 1712289900000
    },
    "after": {
      "id": 4,
      "name": "测试用户",
      "department": "测试部",
      "salary": 11000.0,
      "created_at": 1712289900000
    },
    "source": {...},
    "op": "u",
    "ts_ms": 1712289960123
  }
}
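9.3 Testing DELETE
# Delete the test row (run as an account with write privileges)
$ mysql -u root -p -e "DELETE FROM fgedudb.fgedu_employees WHERE id = 4;"
# Verify by consuming the topic (start this consumer in a second terminal before running the DELETE; without --from-beginning it reads only new events)
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --max-messages 1
# Sample output (formatted):
{
  "schema": {...},
  "payload": {
    "before": {
      "id": 4,
      "name": "测试用户",
      "department": "测试部",
      "salary": 11000.0,
      "created_at": 1712289900000
    },
    "after": null,
    "source": {...},
    "op": "d",
    "ts_ms": 1712290020123
  }
}
# Note: because tombstones.on.delete is true, the delete event is followed by a tombstone record (same key, null value)
# On a log-compacted topic, the tombstone lets Kafka eventually remove all earlier records for the deleted key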
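A downstream consumer has to tell the three event types apart and must not try to parse a tombstone, whose value is null. The Python sketch below classifies a raw record value using the op codes from the events above (c, u, d, plus r for snapshot reads). It is a hedged illustration: describe_event is a hypothetical helper, and it assumes the JSON converter output shown in this section, with or without the schema wrapper.

```python
import json

OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_event(value) -> str:
    """Summarize one Kafka record value; None represents a tombstone."""
    if value is None:
        return "tombstone"
    event = json.loads(value)
    # With schemas.enable=false there is no schema/payload wrapper.
    payload = event.get("payload", event)
    op = payload["op"]
    # Deletes carry the row in "before"; creates and updates in "after".
    row = payload.get("after") or payload.get("before") or {}
    return f"{OP_NAMES.get(op, op)} id={row.get('id')}"


delete_event = json.dumps({
    "payload": {"before": {"id": 4}, "after": None, "op": "d"}
})

print(describe_event(delete_event))  # prints "delete id=4"
print(describe_event(None))          # prints "tombstone"
```

In a real consumer the same function would be applied to each record's value before any further processing.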
10. Debezium Performance Tuning
Performance tuning covers the connector configuration, Kafka Connect, the Kafka brokers, and the JVM.
10.1 Connector Tuning
# Update the connector configuration (PUT replaces the entire configuration, so every required property must be included)
$ curl -X PUT -H "Content-Type: application/json" --data '{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "192.168.1.51",
  "database.port": "3306",
  "database.user": "debezium",
  "database.password": "fgedu_debezium_2024",
  "database.server.id": "5400",
  "topic.prefix": "fgedudb_server",
  "database.include.list": "fgedudb",
  "schema.history.internal.kafka.bootstrap.servers": "192.168.1.51:9092",
  "schema.history.internal.kafka.topic": "schema-changes.fgedudb",
  "snapshot.mode": "schema_only",
  "max.queue.size": "16384",
  "max.batch.size": "4096",
  "poll.interval.ms": "100",
  "heartbeat.interval.ms": "1000",
  "snapshot.fetch.size": "10000",
  "binlog.buffer.size": "65536",
  "connect.keep.alive": "true",
  "connect.timeout.ms": "30000"
}' http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/config
# Sample output:
{
  "name": "fgedudb-mysql-connector",
  "config": {...},
  "tasks": [...],
  "type": "source"
}
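When raising the queue and batch sizes, keep max.queue.size larger than max.batch.size, as the Debezium documentation requires; otherwise the internal queue cannot hold a full batch. The short sketch below checks that relationship. The function name is hypothetical, and the fallback values are the documented Debezium defaults (8192 and 2048).

```python
def queue_settings_ok(config: dict) -> bool:
    """Return True when max.queue.size is strictly larger than max.batch.size."""
    queue_size = int(config.get("max.queue.size", 8192))   # Debezium default
    batch_size = int(config.get("max.batch.size", 2048))   # Debezium default
    return queue_size > batch_size


tuned = {"max.queue.size": "16384", "max.batch.size": "4096"}
print(queue_settings_ok(tuned))  # prints True
```

The tuned values above keep the same 4:1 ratio as the defaults.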
10.2 Kafka Connect Tuning
# vi /data/kafka/bin/connect-distributed.sh
# Add the JVM options near the top of the script, before its built-in KAFKA_HEAP_OPTS default
export KAFKA_HEAP_OPTS="-Xms4g -Xmx8g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
# Adjust the Kafka Connect worker settings
# vi /data/kafka/config/connect-distributed.properties
# Commit source offsets more frequently
offset.flush.interval.ms=5000
# Producer buffering, batching, and compression (used by source connectors such as Debezium)
producer.buffer.memory=67108864
producer.batch.size=16384
producer.linger.ms=5
producer.compression.type=lz4
# Consumer settings (used by sink connectors only)
consumer.max.poll.records=1000
consumer.fetch.min.bytes=1048576
consumer.fetch.max.wait.ms=500
# Restart Kafka Connect
# kill $(pgrep -f ConnectDistributed)
# /data/kafka/bin/connect-distributed.sh -daemon /data/kafka/config/connect-distributed.properties
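10.3 Kafka Broker Tuning
# vi /data/kafka/config/server.properties
# Increase the network and I/O thread counts
num.network.threads=8
num.io.threads=16
# Increase the socket buffer sizes
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
# Force periodic flushes to disk (optional; Kafka normally relies on replication and the OS page cache, and frequent flushes can reduce throughput)
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# Increase the number of replica fetcher threads
num.replica.fetchers=4
# Increase the default partition count for new topics
num.partitions=6
# Restart the Kafka broker
# /data/kafka/bin/kafka-server-stop.sh
# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties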
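11. Debezium Upgrade and Migration
Upgrade Debezium carefully to protect data integrity and business continuity.
11.1 Pre-Upgrade Preparation
# Check the currently installed version
$ curl -s http://192.168.1.51:8083/connector-plugins | jq '.[] | select(.class | contains("mysql")) | .version'
# Sample output:
"2.6.0.Final"
# Check the connector status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq
# Record the current offsets (the /offsets endpoint is available from Kafka 3.5; /topics lists only topic names)
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/offsets | jq
# Back up the connector configuration
# mkdir -p /backup
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/config > /backup/mysql-connector-config-backup.json
# Pause the connector (optional)
$ curl -X PUT http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/pause
# The request returns HTTP 202 Accepted with an empty body; confirm the state through the status endpoint
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq '.connector.state'
# Sample output:
"PAUSED"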
11.2 Performing the Upgrade
# Stop Kafka Connect
# kill $(pgrep -f ConnectDistributed)
# Back up the old plugin to a directory outside plugin.path, so that Kafka Connect does not load both versions
# mkdir -p /data/kafka/plugins-backup
# mv /data/kafka/plugins/debezium-connector-mysql /data/kafka/plugins-backup/debezium-connector-mysql-2.6.0
# Download the new version
# cd /tmp
# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
# Extract and install
# tar -xzf debezium-connector-mysql-2.7.0.Final-plugin.tar.gz -C /data/kafka/plugins/
# Verify the installation
$ ls -l /data/kafka/plugins/debezium-connector-mysql/
# Start Kafka Connect
# /data/kafka/bin/connect-distributed.sh -daemon /data/kafka/config/connect-distributed.properties
# Verify the new version
$ curl -s http://192.168.1.51:8083/connector-plugins | jq '.[] | select(.class | contains("mysql")) | .version'
# Sample output:
"2.7.0.Final"
# Resume the connector (if it was paused earlier)
$ curl -X PUT http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/resume
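The before-and-after version strings reported by the connector-plugins endpoint can be compared automatically, which guards against restarting Kafka Connect with the old plugin still in place. The following is a minimal sketch with hypothetical helper names; it assumes version strings of the form major.minor.patch.Qualifier, as in the outputs above, and ignores the qualifier.

```python
def parse_version(version: str) -> tuple:
    """Turn '2.7.0.Final' into (2, 7, 0); the qualifier is ignored."""
    major, minor, patch = version.split(".")[:3]
    return int(major), int(minor), int(patch)


def is_upgrade(old: str, new: str) -> bool:
    """Return True when `new` is a strictly later release than `old`."""
    return parse_version(new) > parse_version(old)


print(is_upgrade("2.6.0.Final", "2.7.0.Final"))  # prints True
```

Comparing integer tuples rather than strings keeps the ordering correct once a minor version reaches two digits.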
11.3 Post-Upgrade Verification
# Check the connector status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq
# Sample output:
{
  "name": "fgedudb-mysql-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.1.51:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.1.51:8083"
    }
  ]
}
# Test data synchronization (run as an account with write privileges; the debezium account cannot insert rows)
$ mysql -u root -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('升级测试', '升级部', 15000.00);"
# Verify by consuming the topic (start this consumer in a second terminal before running the INSERT; without --from-beginning it reads only new events)
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --max-messages 1
# Sample output:
{"schema":{...},"payload":{"after":{"id":5,"name":"升级测试","department":"升级部","salary":15000.0,...},"op":"c",...}}
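12. Debezium Monitoring and Operations
Monitoring and operating Debezium covers service status checks, log management, and performance monitoring.
12.1 Service Status Monitoring
# Check the Kafka Connect worker
$ curl -s http://192.168.1.51:8083/ | jq
# List the connectors
$ curl -s http://192.168.1.51:8083/connectors | jq
# Check a specific connector's status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq
# Check a task's status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/tasks/0/status | jq
# Show a task's error trace (the task status is a single object, and trace is present only when the task has failed)
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/tasks/0/status | jq '.trace'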
# Check the connector's committed source offsets
# (source connectors do not read through a Kafka consumer group; their positions are stored in the connect-offsets topic)
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/offsets | jq
12.2 Log Management
# View the Kafka Connect log
$ tail -100 /data/kafka/logs/connect.log
# Sample output:
[2024-04-05 10:00:00,000] INFO Worker started successfully (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2024-04-05 10:00:00,100] INFO Created connector fgedudb-mysql-connector (org.apache.kafka.connect.runtime.Worker)
# View the Debezium connector log entries
$ grep -i debezium /data/kafka/logs/connect.log | tail -20
# Sample output:
[2024-04-05 10:00:00,000] INFO MySQL Connector started successfully (io.debezium.connector.mysql.MySqlConnectorTask)
[2024-04-05 10:00:00,100] INFO Snapshot is using lock 'minimal' (io.debezium.connector.mysql.SnapshotReader)
# View error entries
$ grep -i error /data/kafka/logs/connect.log | tail -20
# Configure the log level (DEBUG is verbose; switch back to INFO after troubleshooting)
# vi /data/kafka/config/connect-log4j.properties
log4j.rootLogger=INFO, stdout, connectAppender
log4j.logger.io.debezium=DEBUG
# Restart Kafka Connect for the change to take effect
12.3 Performance Monitoring
# Monitor CPU and memory usage of the Kafka Connect process
$ top -p $(pgrep -f ConnectDistributed)
# Sample output:
top - 10:00:00 up 1 day, 2:00, 2 users, load average: 1.50, 1.45, 1.40
Tasks: 1 total, 0 running, 1 sleeping, 0 stopped, 0 zombie
%Cpu(s): 10.0 us, 5.0 sy, 0.0 ni, 84.0 id, 1.0 wa, 0.0 hi, 0.0 si
MiB Mem : 32768.0 total, 24000.0 free, 7000.0 used, 1768.0 buff/cache
MiB Swap: 16384.0 total, 16384.0 free, 0.0 used. 24000.0 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12345 root 20 0 8192000 7000000 500000 S 15.0 21.4 1:00.00 java
# Monitor JVM memory and garbage collection
$ jstat -gc $(pgrep -f ConnectDistributed) 1000 5
# Sample output:
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT CGC CGCT GCT
1024.0 1024.0 0.0 512.0 81920.0 40960.0 819200.0 409600.0 51200.0 25600.0 5120.0 2560.0 10 0.500 0 0.000 5 0.250 0.750
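# Check the latest (end) offset of each partition; sampling it repeatedly shows how fast events arrive
$ /data/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --time -1
# Sample output: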
fgedudb_server.fgedudb.fgedu_employees:0:1234
fgedudb_server.fgedudb.fgedu_employees:1:2345
fgedudb_server.fgedudb.fgedu_employees:2:3456
# Create a monitoring script
# mkdir -p /backup/scripts
# vi /backup/scripts/debezium_monitor.sh
#!/bin/bash
echo "=== Debezium Monitor ==="
echo "Date: $(date)"
echo ""
echo "=== Kafka Connect Status ==="
curl -s http://192.168.1.51:8083/ | jq
echo ""
echo "=== Connector Status ==="
curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq
echo ""
echo "=== Memory Usage ==="
free -h
echo ""
echo "=== JVM Memory ==="
jstat -gc $(pgrep -f ConnectDistributed) | tail -1
echo ""
echo "=== Recent Errors ==="
grep -i error /data/kafka/logs/connect.log | tail -5
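The topic:partition:offset lines printed by GetOffsetShell earlier in this section can be turned into a simple throughput figure by comparing two samples. The sketch below does that in Python. The helper names are hypothetical, the second sample is invented for illustration, and the calculation assumes no records were deleted by retention between the two samples.

```python
def parse_offsets(output: str) -> dict:
    """Parse 'topic:partition:offset' lines into {(topic, partition): offset}."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split from the right so the topic name itself is left untouched.
        topic, partition, offset = line.rsplit(":", 2)
        offsets[(topic, int(partition))] = int(offset)
    return offsets


def events_between(earlier: str, later: str) -> int:
    """Count records appended across all partitions between two samples."""
    first, second = parse_offsets(earlier), parse_offsets(later)
    return sum(second[key] - first.get(key, 0) for key in second)


sample_1 = """\
fgedudb_server.fgedudb.fgedu_employees:0:1234
fgedudb_server.fgedudb.fgedu_employees:1:2345
fgedudb_server.fgedudb.fgedu_employees:2:3456
"""
# Hypothetical second sample taken some seconds later.
sample_2 = """\
fgedudb_server.fgedudb.fgedu_employees:0:1244
fgedudb_server.fgedudb.fgedu_employees:1:2345
fgedudb_server.fgedudb.fgedu_employees:2:3466
"""

print(events_between(sample_1, sample_2))  # prints 20
```

Dividing the result by the sampling interval gives events per second for the table topic.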
This article was compiled and published by 风哥教程 for study and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
