
Debezium Installation and Configuration: Data Synchronization Setup, Upgrade, and Migration in Detail

1. Debezium Overview and Environment Planning

Debezium is an open-source distributed platform for capturing database changes. It is built on Kafka Connect and supports many databases, including MySQL, PostgreSQL, MongoDB, Oracle, and SQL Server. With Debezium you can capture INSERT, UPDATE, and DELETE operations in real time and publish the change events to Kafka for downstream consumers.
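As a sketch of what those consumers see: each change event is a JSON envelope whose payload carries before/after row images and an op code (c=create, u=update, d=delete). The event below is hypothetical and trimmed to the fields discussed here; a shell consumer can pull out the op code even without jq:

```shell
# Hypothetical, abbreviated Debezium change event (field names follow the
# event envelope: op is c=create, u=update, d=delete)
event='{"payload":{"op":"u","before":{"id":1,"salary":12000},"after":{"id":1,"salary":15000}}}'

# Extract the single-letter operation code with sed (no jq dependency)
op=$(echo "$event" | sed -n 's/.*"op":"\([a-z]\)".*/\1/p')
echo "operation=$op"
```

In a real pipeline the event would come from a Kafka consumer rather than a shell variable.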

1.1 Debezium Versions

The current mainline is the Debezium 2.x series; this tutorial uses Debezium 2.7. Compared with 1.x, the 2.x releases bring significant improvements in performance, stability, and functionality, and support more database types and more flexible configuration options.

# List installed connector plugins and their versions
$ curl -s http://192.168.1.51:8083/connector-plugins | jq
[
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "2.7.0.Final"
}
]

# Check the Java version
$ java -version
openjdk version "11.0.22" 2024-01-16 LTS
OpenJDK Runtime Environment (build 11.0.22+7-LTS)
OpenJDK 64-Bit Server VM (build 11.0.22+7-LTS, mixed mode, sharing)

# Check the operating system
$ cat /etc/os-release
NAME="Oracle Linux Server"
VERSION="8.9"
ID="ol"
PRETTY_NAME="Oracle Linux Server 8.9"

1.2 Environment Planning

The environment for this installation is planned as follows:

Debezium server:
Hostname: debezium01.fgedu.net.cn
IP address: 192.168.1.51
Port: 8083 (Kafka Connect REST API)

Debezium version: 2.7.0
Kafka version: 3.6.1
Java version: OpenJDK 11
Installation directory: /data/kafka
Log directory: /data/kafka/logs

Kafka cluster:
Node 1: kafka01.fgedu.net.cn (192.168.1.51)
Node 2: kafka02.fgedu.net.cn (192.168.1.52)
Node 3: kafka03.fgedu.net.cn (192.168.1.53)

ZooKeeper cluster:
Node 1: 192.168.1.51:2181
Node 2: 192.168.1.52:2181
Node 3: 192.168.1.53:2181

Data sources:
MySQL: 192.168.1.51:3306
PostgreSQL: 192.168.1.51:5432

2. Hardware Requirements

Debezium runs on top of Kafka and has moderate hardware demands. The following are recommended configurations for production environments.

2.1 Minimum Hardware Requirements

# Check memory
# free -h
total used free shared buff/cache available
Mem: 32G 8.5G 21G 2.0G 2.5G 21G
Swap: 16G 0B 16G

# Check disk space
# df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 15G 36G 30% /
/dev/sdb1 500G 50G 450G 10% /data

# Check the number of CPU cores
# nproc
16

# Check the system architecture
# uname -m
x86_64

Recommendations: at least 8 GB of memory for test environments and 16-32 GB for production; 8 or more CPU cores; 500 GB or more of disk space for Kafka logs and Debezium offsets; gigabit networking or better.
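A quick back-of-the-envelope estimate helps sanity-check the 500 GB figure. The inputs below are assumptions for illustration only (2 GB of change events per day, the 7-day retention configured later in server.properties, 3-way replication):

```shell
# Rough Kafka disk sizing: daily_change_MB * retention_days * replication_factor
daily_change_mb=2048      # assumed average volume of change events per day, in MB
retention_days=7          # matches log.retention.hours=168
replication_factor=3      # matches the replication factor used for the topics

needed_gb=$(( daily_change_mb * retention_days * replication_factor / 1024 ))
echo "estimated_disk_gb=${needed_gb}"
```

Add generous headroom on top of the estimate, since snapshots and schema-history topics consume space too.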

2.2 Network Requirements

# Check the network configuration
# ip addr show
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN
inet 127.0.0.1/8 scope host lo
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP
inet 192.168.1.51/24 brd 192.168.1.255 scope global eth0

# Check hostname resolution
# hostname -f
debezium01.fgedu.net.cn

# Configure /etc/hosts
# cat /etc/hosts
127.0.0.1 localhost
192.168.1.51 debezium01.fgedu.net.cn debezium01
192.168.1.52 debezium02.fgedu.net.cn debezium02
192.168.1.53 debezium03.fgedu.net.cn debezium03

3. Operating System Configuration

Debezium runs on Linux, which requires a series of system-level settings.

3.1 Disable the Firewall and SELinux

# Check SELinux status
# getenforce
Disabled

# Disable SELinux (if not already disabled)
# vi /etc/selinux/config
SELINUX=disabled

# Check firewall status
# systemctl status firewalld

# Disable the firewall (in production, prefer opening only the required ports)
# systemctl stop firewalld
# systemctl disable firewalld

# Alternatively, open the required ports
# firewall-cmd --permanent --add-port=2181/tcp
# firewall-cmd --permanent --add-port=2888/tcp
# firewall-cmd --permanent --add-port=3888/tcp
# firewall-cmd --permanent --add-port=9092/tcp
# firewall-cmd --permanent --add-port=8083/tcp
# firewall-cmd --reload

3.2 System Parameter Tuning

# Configure kernel parameters
# vi /etc/sysctl.conf

# Add the following parameters
fs.file-max = 6815744
vm.swappiness = 10
vm.max_map_count = 262144
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 65536

# Apply the settings
# sysctl -p

# Configure file descriptor limits
# vi /etc/security/limits.conf

# Add the following lines
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536

# Verify
# ulimit -n
65536

4. Java Environment Setup

Debezium is written in Java and requires a suitable Java runtime.

4.1 Install OpenJDK 11

# Install OpenJDK 11
# yum install -y java-11-openjdk java-11-openjdk-devel

# Verify the Java version
$ java -version
openjdk version "11.0.22" 2024-01-16 LTS
OpenJDK Runtime Environment (build 11.0.22+7-LTS)
OpenJDK 64-Bit Server VM (build 11.0.22+7-LTS, mixed mode, sharing)

# Configure the JAVA_HOME environment variable
# vi /etc/profile.d/java.sh

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
export PATH=$JAVA_HOME/bin:$PATH

# Load the environment variables
# source /etc/profile.d/java.sh

# Verify JAVA_HOME
$ echo $JAVA_HOME
/usr/lib/jvm/java-11-openjdk

5. Kafka Cluster Installation

Debezium runs on Kafka and Kafka Connect, so the Kafka cluster must be installed first.

5.1 Install ZooKeeper

# Download Kafka (which bundles ZooKeeper)
# cd /tmp
# wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz

# Extract and install
# tar -xzf kafka_2.13-3.6.1.tgz
# mv kafka_2.13-3.6.1 /data/kafka

# Configure ZooKeeper
# vi /data/kafka/config/zookeeper.properties

dataDir=/data/zookeeper/data
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

# Cluster settings
server.1=192.168.1.51:2888:3888
server.2=192.168.1.52:2888:3888
server.3=192.168.1.53:2888:3888

# Create the data directory
# mkdir -p /data/zookeeper/data

# Create the myid file
# echo "1" > /data/zookeeper/data/myid

# Start ZooKeeper
# /data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties

# Check ZooKeeper status
$ echo stat | nc localhost 2181

# Sample output:
Zookeeper version: 3.8.3
Clients:
/127.0.0.1:54321[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/1.5/5
Received: 10
Sent: 9
Connections: 1
Outstanding: 0
Zxid: 0x100000003
Mode: leader
Node count: 5
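When scripting health checks across the three nodes, the Mode line of the stat output is what distinguishes the leader from followers. A minimal sketch that parses a captured stat response (rather than querying a live node) with awk:

```shell
# Captured output of `echo stat | nc <host> 2181`, abbreviated for illustration
stat_output='Zookeeper version: 3.8.3
Mode: leader
Node count: 5'

# Pull the value of the "Mode:" line; expect leader/follower/standalone
mode=$(printf '%s\n' "$stat_output" | awk -F': ' '/^Mode/ {print $2}')
echo "zk_mode=$mode"
```

In a live check you would loop over the three node addresses and run the `nc` command for each.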

5.2 Configure the Kafka Broker

# Configure the Kafka broker
# vi /data/kafka/config/server.properties

# Broker settings
broker.id=1
listeners=PLAINTEXT://192.168.1.51:9092
advertised.listeners=PLAINTEXT://192.168.1.51:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log settings
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# ZooKeeper settings
zookeeper.connect=192.168.1.51:2181,192.168.1.52:2181,192.168.1.53:2181
zookeeper.connection.timeout.ms=18000

# Log retention settings
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# Create the log directory
# mkdir -p /data/kafka/logs

# Start the Kafka broker
# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties

# Check the Kafka process
$ jps | grep Kafka

# Sample output:
12345 Kafka

# Create a test topic
$ /data/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server 192.168.1.51:9092 --partitions 3 --replication-factor 3

# Sample output:
Created topic test-topic.

# List topics
$ /data/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.51:9092

# Sample output:
test-topic

5.3 Configure Kafka Connect

# Configure Kafka Connect (distributed mode)
# vi /data/kafka/config/connect-distributed.properties

# Basic settings
bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Offset storage settings
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

# Config storage settings (use replication factor 3, like the other internal topics)
config.storage.topic=connect-configs
config.storage.replication.factor=3

# Status storage settings
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3

# Worker REST settings
offset.flush.interval.ms=10000
rest.port=8083
rest.advertised.host.name=192.168.1.51

# Plugin path
plugin.path=/data/kafka/plugins

# Create the plugin directory
# mkdir -p /data/kafka/plugins

# Start Kafka Connect
# /data/kafka/bin/connect-distributed.sh -daemon /data/kafka/config/connect-distributed.properties

# Check Kafka Connect status
$ curl -s http://192.168.1.51:8083/ | jq

# Sample output:
{
"version": "3.6.1",
"commit": "5e3c2b738d253ff5",
"kafka_cluster_id": "abc123-def456-ghi789"
}
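If jq is not installed on a monitoring host, the worker version can still be extracted from the REST root response with sed. A sketch against a captured response (the JSON here is illustrative):

```shell
# Captured response from `curl -s http://<worker>:8083/` (illustrative values)
resp='{"version":"3.6.1","commit":"5e3c2b738d253ff5","kafka_cluster_id":"abc123"}'

# Extract the "version" field without jq
version=$(echo "$resp" | sed -n 's/.*"version":"\([^"]*\)".*/\1/p')
echo "connect_version=$version"
```

This kind of text extraction is brittle against reformatted JSON; prefer jq where it is available.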

6. Debezium Installation and Deployment

With the Kafka cluster in place, install the Debezium connectors.

6.1 Download the Debezium Connectors

# Download the Debezium MySQL connector
# cd /tmp
# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz

# Sample output:
--2024-04-05 10:00:00-- https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
Resolving repo1.maven.org... 199.232.196.209
Connecting to repo1.maven.org|199.232.196.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 45678901 (44M) [application/x-gzip]
Saving to: 'debezium-connector-mysql-2.7.0.Final-plugin.tar.gz'

debezium-connector-mysql-2.7.0.Final-plugin.tar.gz 100%[=================================================>] 43.56M 15.6MB/s in 2.8s

2024-04-05 10:00:03 (15.6 MB/s) - 'debezium-connector-mysql-2.7.0.Final-plugin.tar.gz' saved [45678901/45678901]

# Extract into the plugin directory
# tar -xzf debezium-connector-mysql-2.7.0.Final-plugin.tar.gz -C /data/kafka/plugins/

# Download the PostgreSQL connector
# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.7.0.Final/debezium-connector-postgres-2.7.0.Final-plugin.tar.gz
# tar -xzf debezium-connector-postgres-2.7.0.Final-plugin.tar.gz -C /data/kafka/plugins/

# Verify the installation
$ ls -l /data/kafka/plugins/

# Sample output:
total 0
drwxr-xr-x 2 root root 4096 Apr 5 10:00 debezium-connector-mysql
drwxr-xr-x 2 root root 4096 Apr 5 10:00 debezium-connector-postgres

# List the connector files
$ ls -lh /data/kafka/plugins/debezium-connector-mysql/

# Sample output:
total 45M
-rw-r--r-- 1 root root 12M Apr 5 10:00 debezium-connector-mysql-2.7.0.Final.jar
-rw-r--r-- 1 root root 15M Apr 5 10:00 debezium-core-2.7.0.Final.jar
-rw-r--r-- 1 root root 2.5M Apr 5 10:00 mysql-connector-j-8.2.0.jar

6.2 Restart Kafka Connect

# Stop Kafka Connect (there is no REST shutdown endpoint; stop the worker process)
# kill $(pgrep -f ConnectDistributed)

# Start Kafka Connect again
# /data/kafka/bin/connect-distributed.sh -daemon /data/kafka/config/connect-distributed.properties

# Check the connector plugins
$ curl -s http://192.168.1.51:8083/connector-plugins | jq

# Sample output:
[
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "2.7.0.Final"
},
{
"class": "io.debezium.connector.postgresql.PostgresConnector",
"type": "source",
"version": "2.7.0.Final"
}
]

7. Debezium Parameter Configuration

Debezium's parameters are critical to synchronization performance and stability.

7.1 MySQL Source Configuration

# Enable the MySQL binlog
# vi /etc/my.cnf

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
gtid_mode = ON
enforce_gtid_consistency = ON

# Restart MySQL
# systemctl restart mysqld

# Create the Debezium user
$ mysql -u root -p

mysql> CREATE USER 'debezium'@'%' IDENTIFIED BY 'fgedu_debezium_2024';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
mysql> FLUSH PRIVILEGES;

# Verify the binlog settings
mysql> SHOW VARIABLES LIKE 'binlog_format';

# Sample output:
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

# Create a test database and table
mysql> CREATE DATABASE fgedudb;
mysql> USE fgedudb;
mysql> CREATE TABLE fgedu_employees (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100),
department VARCHAR(50),
salary DECIMAL(10,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
mysql> INSERT INTO fgedu_employees (name, department, salary) VALUES
('张三', '技术部', 15000.00),
('李四', '销售部', 12000.00),
('王五', '财务部', 13000.00);

7.2 Create the MySQL Connector Configuration

# Create the connector configuration file
# (Debezium 2.x uses topic.prefix and schema.history.internal.kafka.*,
#  which replace the 1.x database.server.name and database.history.kafka.* properties)
# vi /data/kafka/config/mysql-connector.json

{
"name": "fgedudb-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.1.51",
"database.port": "3306",
"database.user": "debezium",
"database.password": "fgedu_debezium_2024",
"database.server.id": "5400",
"topic.prefix": "fgedudb_server",
"database.include.list": "fgedudb",
"table.include.list": "fgedudb.fgedu_employees,fgedudb.fgedu_departments",
"schema.history.internal.kafka.bootstrap.servers": "192.168.1.51:9092",
"schema.history.internal.kafka.topic": "schema-changes.fgedudb",
"include.schema.changes": "true",
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.fetch.size": "2000",
"max.queue.size": "8192",
"max.batch.size": "2048",
"poll.interval.ms": "500",
"heartbeat.interval.ms": "5000",
"decimal.handling.mode": "string",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"database.connectionTimeZone": "Asia/Shanghai"
}
}

# Register the connector
$ curl -X POST -H "Content-Type: application/json" --data @/data/kafka/config/mysql-connector.json http://192.168.1.51:8083/connectors

# Sample output:
{
"name": "fgedudb-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.1.51",
…
},
"tasks": [],
"type": "source"
}

Tip: MySQL must have the binlog enabled with ROW format; GTID mode is recommended to ensure data consistency; snapshot.mode=initial takes a full snapshot on first startup; max.queue.size and max.batch.size trade throughput against latency. Note that Debezium 2.x requires topic.prefix in place of the 1.x database.server.name property, and the schema history settings moved to the schema.history.internal.kafka.* namespace.
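A lightweight pre-flight check before POSTing a connector configuration can catch missing required keys early. This sketch greps a captured config string for a few keys a Debezium 2.x MySQL connector needs; the JSON shown is an abbreviated stand-in, not a complete config:

```shell
# Abbreviated stand-in for the connector "config" object
config='{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"192.168.1.51","topic.prefix":"fgedudb_server"}'

# Check that each required key is present in the JSON text
missing=0
for key in connector.class database.hostname topic.prefix; do
  echo "$config" | grep -q "\"$key\"" || { echo "missing: $key"; missing=1; }
done
[ "$missing" -eq 0 ] && echo "config_ok"
```

Kafka Connect also offers a proper server-side validation endpoint (PUT /connector-plugins/<plugin>/config/validate) that checks types and defaults, which is preferable when the worker is reachable.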

8. Starting the Debezium Service

With configuration complete, start the Debezium connector.

8.1 Check Connector Status

# List all connectors
$ curl -s http://192.168.1.51:8083/connectors | jq

# Sample output:
[
"fgedudb-mysql-connector"
]

# Check connector status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq

# Sample output:
{
"name": "fgedudb-mysql-connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.1.51:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.1.51:8083"
}
],
"type": "source"
}

# View the connector configuration
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/config | jq

# View the connector tasks
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/tasks | jq

# Sample output:
[
{
"id": {
"connector": "fgedudb-mysql-connector",
"task": 0
},
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
…
}
}
]

8.2 Inspect the Kafka Topics

# List the topics created by Debezium
$ /data/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.1.51:9092 | grep fgedudb

# Sample output:
fgedudb_server
fgedudb_server.fgedudb.fgedu_employees
schema-changes.fgedudb

# Describe a topic
$ /data/kafka/bin/kafka-topics.sh --describe --topic fgedudb_server.fgedudb.fgedu_employees --bootstrap-server 192.168.1.51:9092

# Sample output:
Topic: fgedudb_server.fgedudb.fgedu_employees PartitionCount: 3 ReplicationFactor: 3
Topic: fgedudb_server.fgedudb.fgedu_employees Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedudb_server.fgedudb.fgedu_employees Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedudb_server.fgedudb.fgedu_employees Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

# Consume the topic to verify the data
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --from-beginning --max-messages 3

# Sample output:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"department"},{"type":"double","optional":true,"field":"salary"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","field":"created_at"}],"optional":false,"name":"fgedudb_server.fgedudb.fgedu_employees.Value"},"payload":{"id":1,"name":"张三","department":"技术部","salary":15000.0,"created_at":1712289600000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"department"},{"type":"double","optional":true,"field":"salary"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","field":"created_at"}],"optional":false,"name":"fgedudb_server.fgedudb.fgedu_employees.Value"},"payload":{"id":2,"name":"李四","department":"销售部","salary":12000.0,"created_at":1712289600000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","field":"created_at"}],"optional":false,"name":"fgedudb_server.fgedudb.fgedu_employees.Value"},"payload":{"id":3,"name":"王五","department":"财务部","salary":13000.0,"created_at":1712289600000}}

9. Debezium Functional Testing

After installation, run functional tests to verify that Debezium works correctly.

9.1 Test INSERT

# Insert a row in MySQL (use a user with DML privileges; the debezium account only has replication grants)
$ mysql -u root -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('测试用户', '测试部', 10000.00);"

# Consume the Kafka topic to verify
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --from-beginning --max-messages 1

# Sample output (pretty-printed):
{
"schema": {…},
"payload": {
"before": null,
"after": {
"id": 4,
"name": "测试用户",
"department": "测试部",
"salary": 10000.0,
"created_at": 1712289900000
},
"source": {
"version": "2.7.0.Final",
"connector": "mysql",
"name": "fgedudb_server",
"ts_ms": 1712289900000,
"db": "fgedudb",
"table": "fgedu_employees",
"server_id": 1,
"gtid": "1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d:101",
"file": "mysql-bin.000005",
"pos": 1234,
"row": 0,
"snapshot": false
},
"op": "c",
"ts_ms": 1712289900123
}
}

9.2 Test UPDATE

# Update a row in MySQL
$ mysql -u root -p -e "UPDATE fgedudb.fgedu_employees SET salary = 11000.00 WHERE id = 4;"

# Consume the Kafka topic to verify
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --from-beginning --max-messages 1

# Sample output (pretty-printed):
{
"schema": {…},
"payload": {
"before": {
"id": 4,
"name": "测试用户",
"department": "测试部",
"salary": 10000.0,
"created_at": 1712289900000
},
"after": {
"id": 4,
"name": "测试用户",
"department": "测试部",
"salary": 11000.0,
"created_at": 1712289900000
},
"source": {…},
"op": "u",
"ts_ms": 1712289960123
}
}

9.3 Test DELETE

# Delete a row in MySQL
$ mysql -u root -p -e "DELETE FROM fgedudb.fgedu_employees WHERE id = 4;"

# Consume the Kafka topic to verify
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --from-beginning --max-messages 1

# Sample output (pretty-printed):
{
"schema": {…},
"payload": {
"before": {
"id": 4,
"name": "测试用户",
"department": "测试部",
"salary": 11000.0,
"created_at": 1712289900000
},
"after": null,
"source": {…},
"op": "d",
"ts_ms": 1712290020123
}
}

# Note: after a DELETE, Debezium also emits a tombstone message (null value),
# which Kafka log compaction uses to remove the deleted key from compacted topics
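The effect of compaction plus tombstones can be simulated locally: keep only the latest record per key, then drop keys whose latest record is a tombstone. A toy sketch with key:value lines standing in for Kafka records (null marks a tombstone):

```shell
# Toy log: key:value per line; key 1 is updated, key 2 ends with a tombstone
log='1:v1
2:v1
1:v2
2:null'

# Keep the latest record per key, then discard tombstoned keys
compacted=$(printf '%s\n' "$log" \
  | awk -F: '{latest[$1]=$0} END {for (k in latest) print latest[k]}' \
  | grep -v ':null$' | sort)
echo "$compacted"
```

Real compaction runs lazily on closed log segments, so tombstones remain visible to consumers for a while before they are cleaned up.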

10. Debezium Performance Tuning

Tuning Debezium involves several layers: connector settings, Kafka settings, and the JVM.

10.1 Connector Tuning

# Update the connector configuration
$ curl -X PUT -H "Content-Type: application/json" --data '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.1.51",
"database.port": "3306",
"database.user": "debezium",
"database.password": "fgedu_debezium_2024",
"database.server.id": "5400",
"topic.prefix": "fgedudb_server",
"database.include.list": "fgedudb",
"schema.history.internal.kafka.bootstrap.servers": "192.168.1.51:9092",
"schema.history.internal.kafka.topic": "schema-changes.fgedudb",
"snapshot.mode": "schema_only",
"max.queue.size": "16384",
"max.batch.size": "4096",
"poll.interval.ms": "100",
"heartbeat.interval.ms": "1000",
"snapshot.fetch.size": "10000",
"binlog.buffer.size": "65536",
"database.connection.pool.size": "10",
"database.tcpKeepAlive": "true",
"database.connect.timeout.ms": "30000",
"database.socket.timeout.ms": "30000"
}' http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/config

# Sample output:
{
"name": "fgedudb-mysql-connector",
"config": {…},
"tasks": […],
"type": "source"
}

10.2 Kafka Connect Tuning

# Configure Kafka Connect JVM options
# vi /data/kafka/bin/connect-distributed.sh

# Add the JVM options
export KAFKA_HEAP_OPTS="-Xms4g -Xmx8g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"

# Configure Kafka Connect worker parameters
# vi /data/kafka/config/connect-distributed.properties

# Flush offsets more frequently
offset.flush.interval.ms=5000

# Increase producer buffering
producer.buffer.memory=67108864
producer.batch.size=16384
producer.linger.ms=5
producer.compression.type=lz4

consumer.max.poll.records=1000
consumer.fetch.min.bytes=1048576
consumer.fetch.max.wait.ms=500

# Restart Kafka Connect
# kill $(pgrep -f ConnectDistributed)
# /data/kafka/bin/connect-distributed.sh -daemon /data/kafka/config/connect-distributed.properties

10.3 Kafka Broker Tuning

# Configure Kafka broker parameters
# vi /data/kafka/config/server.properties

# Increase network and I/O thread counts
num.network.threads=8
num.io.threads=16

# Increase socket buffer sizes
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

# Tune log flush parameters
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# Increase replica fetcher threads
num.replica.fetchers=4

# Increase the default partition count
num.partitions=6

# Restart the Kafka broker
# /data/kafka/bin/kafka-server-stop.sh
# /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties

Production recommendations: raise max.queue.size and max.batch.size for higher throughput; use the G1 garbage collector for better JVM behavior; set a reasonable snapshot.fetch.size to speed up the initial snapshot; tune binlog.buffer.size to reduce network I/O; and enable heartbeats to detect connection problems.
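The relationship between poll.interval.ms and max.batch.size gives a rough ceiling on steady-state event throughput. Using the tuned values from the example above (assumptions, and ignoring snapshot and network overhead):

```shell
# Rough throughput ceiling: events per poll * polls per second
max_batch_size=4096        # max.batch.size from the tuned connector config
poll_interval_ms=100       # poll.interval.ms from the tuned connector config

polls_per_sec=$(( 1000 / poll_interval_ms ))
events_per_sec=$(( max_batch_size * polls_per_sec ))
echo "theoretical_max_events_per_sec=$events_per_sec"
```

If observed throughput sits far below this ceiling while lag grows, the bottleneck is usually elsewhere (MySQL binlog reads, serialization, or the Kafka producer), not the batch settings.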

11. Debezium Upgrade and Migration

Upgrading Debezium must be done carefully to protect data safety and business continuity.

11.1 Pre-upgrade Preparation

# Check the current version
$ curl -s http://192.168.1.51:8083/connector-plugins | jq '.[] | select(.class | contains("mysql")) | .version'

# Sample output:
"2.6.0.Final"

# Check connector status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq

# Record the topics the connector currently uses
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/topics | jq

# Back up the connector configuration
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/config > /backup/mysql-connector-config-backup.json

# Pause the connector (optional; the pause request returns 202 with an empty body)
$ curl -X PUT http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/pause

# Verify that it is paused
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq '.connector.state'

# Sample output:
"PAUSED"

11.2 Performing the Upgrade

# Stop Kafka Connect
# kill $(pgrep -f ConnectDistributed)

# Back up the old connector plugin
# mv /data/kafka/plugins/debezium-connector-mysql /data/kafka/plugins/debezium-connector-mysql-2.6.0

# Download the new version
# cd /tmp
# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz

# Extract and install
# tar -xzf debezium-connector-mysql-2.7.0.Final-plugin.tar.gz -C /data/kafka/plugins/

# Verify the installation
$ ls -l /data/kafka/plugins/debezium-connector-mysql/

# Start Kafka Connect
# /data/kafka/bin/connect-distributed.sh -daemon /data/kafka/config/connect-distributed.properties

# Verify the new version
$ curl -s http://192.168.1.51:8083/connector-plugins | jq '.[] | select(.class | contains("mysql")) | .version'

# Sample output:
"2.7.0.Final"

# Resume the connector (if it was paused earlier)
$ curl -X PUT http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/resume

11.3 Post-upgrade Verification

# Check connector status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq

# Sample output:
{
"name": "fgedudb-mysql-connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.1.51:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.1.51:8083"
}
]
}

# Test data synchronization
$ mysql -u root -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('升级测试', '升级部', 15000.00);"

# Consume from Kafka to verify
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --from-beginning --max-messages 1

# Sample output:
{"schema":{…},"payload":{"after":{"id":5,"name":"升级测试","department":"升级部","salary":15000.0,…},"op":"c",…}}

12. Debezium Monitoring and Operations

Operating Debezium includes monitoring service status, managing logs, and tracking performance.

12.1 Service Status Monitoring

# Check Kafka Connect status
$ curl -s http://192.168.1.51:8083/ | jq

# List connectors
$ curl -s http://192.168.1.51:8083/connectors | jq

# Check a specific connector
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq

# Check the connector task status
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/tasks/0/status | jq

# Check for connector errors (the task stack trace, if any)
$ curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq '.tasks[].trace'

# Check the Kafka consumer groups
$ /data/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.51:9092 --list | grep connect

# Sample output:
connect-cluster

# Describe the consumer group
$ /data/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.51:9092 --group connect-cluster --describe

# Sample output:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
connect-cluster connect-offsets 0 123 123 0
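For alerting, the LAG column of the describe output is usually summed per group. A sketch that totals lag from captured describe output (the two rows below are illustrative):

```shell
# Captured data rows from `kafka-consumer-groups.sh --describe` (header stripped)
describe='connect-cluster connect-offsets 0 123 130 7
connect-cluster connect-offsets 1 200 200 0'

# LAG is the sixth whitespace-separated column; sum it across partitions
total_lag=$(printf '%s\n' "$describe" | awk '{sum += $6} END {print sum}')
echo "total_lag=$total_lag"
```

In a cron-driven check, pipe the live describe output through the same awk and alert when the total exceeds a threshold.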

12.2 Log Management

# View the Kafka Connect log
$ tail -100 /data/kafka/logs/connect.log

# Sample output:
[2024-04-05 10:00:00,000] INFO Worker started successfully (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2024-04-05 10:00:00,100] INFO Created connector fgedudb-mysql-connector (org.apache.kafka.connect.runtime.Worker)

# View Debezium connector log entries
$ grep -i debezium /data/kafka/logs/connect.log | tail -20

# Sample output:
[2024-04-05 10:00:00,000] INFO MySQL Connector started successfully (io.debezium.connector.mysql.MySqlConnectorTask)
[2024-04-05 10:00:00,100] INFO Snapshot is using lock 'minimal' (io.debezium.connector.mysql.SnapshotReader)

# View error entries
$ grep -i error /data/kafka/logs/connect.log | tail -20

# Adjust log levels
# vi /data/kafka/config/connect-log4j.properties

log4j.rootLogger=INFO, stdout, connectAppender
log4j.logger.io.debezium=DEBUG

# Restart Kafka Connect for the change to take effect

12.3 Performance Monitoring

# Monitor system resource usage
$ top -p $(pgrep -f ConnectDistributed)

# Sample output:
top - 10:00:00 up 1 day, 2:00, 2 users, load average: 1.50, 1.45, 1.40
Tasks: 1 total, 0 running, 1 sleeping, 0 stopped, 0 zombie
%Cpu(s): 10.0 us, 5.0 sy, 0.0 ni, 84.0 id, 1.0 wa, 0.0 hi, 0.0 si
MiB Mem : 32768.0 total, 24000.0 free, 7000.0 used, 1768.0 buff/cache
MiB Swap: 16384.0 total, 16384.0 free, 0.0 used. 24000.0 avail Mem

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12345 root 20 0 8192000 7000000 500000 S 15.0 21.4 1:00.00 java

# Monitor JVM memory usage
$ jstat -gc $(pgrep -f ConnectDistributed) 1000 5

# Sample output:
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT CGC CGCT GCT
1024.0 1024.0 0.0 512.0 81920.0 40960.0 819200.0 409600.0 51200.0 25600.0 5120.0 2560.0 10 0.500 0 0.000 5 0.250 0.750

# Check topic end offsets (compare against consumer offsets to compute lag)
$ /data/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server 192.168.1.51:9092 --topic fgedudb_server.fgedudb.fgedu_employees --time -1

# Sample output:
fgedudb_server.fgedudb.fgedu_employees:0:1234
fgedudb_server.fgedudb.fgedu_employees:1:2345
fgedudb_server.fgedudb.fgedu_employees:2:3456
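The per-partition end offsets can be summed to track total topic growth between two samples. A sketch over captured GetOffsetShell output (values illustrative):

```shell
# Captured topic:partition:end_offset lines from GetOffsetShell
offsets='fgedudb_server.fgedudb.fgedu_employees:0:1234
fgedudb_server.fgedudb.fgedu_employees:1:2345
fgedudb_server.fgedudb.fgedu_employees:2:3456'

# Sum the third colon-separated field across partitions
total=$(printf '%s\n' "$offsets" | awk -F: '{sum += $3} END {print sum}')
echo "total_end_offset=$total"
```

Sampling this total periodically and diffing consecutive values gives an events-per-interval rate for the topic.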

# Create a monitoring script
# vi /backup/scripts/debezium_monitor.sh

#!/bin/bash

echo "=== Debezium Monitor ==="
echo "Date: $(date)"
echo ""

echo "=== Kafka Connect Status ==="
curl -s http://192.168.1.51:8083/ | jq
echo ""

echo "=== Connector Status ==="
curl -s http://192.168.1.51:8083/connectors/fgedudb-mysql-connector/status | jq
echo ""

echo "=== Memory Usage ==="
free -h
echo ""

echo "=== JVM Memory ==="
jstat -gc $(pgrep -f ConnectDistributed) | tail -1
echo ""

echo "=== Recent Errors ==="
grep -i error /data/kafka/logs/connect.log | tail -5

Production recommendations: regularly check Kafka Connect and connector status; monitor resource usage and scale out in time; configure log rotation to avoid running out of disk space; build a solid alerting pipeline; watch Kafka consumer lag to keep data fresh; and back up connector configurations and offsets regularly.

This article was compiled and published by Fengge Tutorials for learning and testing purposes only. When reprinting, credit the source: http://www.fgedu.net.cn/10327.html
