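1. Debezium Overview and Versions
Debezium is an open-source distributed platform for change data capture (CDC). It is built on top of Apache Kafka and provides a set of connectors that monitor various database systems for data changes and stream the resulting change events to Kafka.
Recent Debezium releases:
Debezium 2.7.0 (2024 stable release, used throughout this guide)
Debezium 2.6.0 (2024 stable release)
Debezium 2.5.0 (2023 stable release)
Debezium 2.4.0 (2023 stable release)
Debezium 1.9.0 (2022, last 1.x release line)
Connectors supported by Debezium:
MySQL Connector - supports MySQL (deployed in section 4)
PostgreSQL Connector - supports PostgreSQL 10+
Oracle Connector - supports Oracle 11g+ (requires an Oracle license)
SQL Server Connector - supports SQL Server 2016+
MongoDB Connector - supports MongoDB 4.0+
Db2 Connector - supports Db2 11.5+
Cassandra Connector - supports Cassandra 3.x/4.x
Vitess Connector - supports Vitess
Spanner Connector - supports Google Cloud Spanner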
2. Downloading Debezium
Debezium can be obtained in several ways: direct download from Maven Central, Confluent Hub, Docker images, or Maven dependencies.
Option 1: Official download (Maven Central)
# Create the download directory
$ mkdir -p /fgeudb/software/debezium
$ cd /fgeudb/software/debezium
# Download the Debezium MySQL Connector 2.7.0
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
# Download the Debezium PostgreSQL Connector 2.7.0
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.7.0.Final/debezium-connector-postgres-2.7.0.Final-plugin.tar.gz
# Download the Debezium Oracle Connector 2.7.0
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.7.0.Final/debezium-connector-oracle-2.7.0.Final-plugin.tar.gz
# Download the Debezium SQL Server Connector 2.7.0
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.7.0.Final/debezium-connector-sqlserver-2.7.0.Final-plugin.tar.gz
# List the downloaded files
$ ls -lh
Sample output:
total 120M
-rw-r--r-- 1 root root 35M Apr 4 10:00 debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
-rw-r--r-- 1 root root 28M Apr 4 10:00 debezium-connector-postgres-2.7.0.Final-plugin.tar.gz
-rw-r--r-- 1 root root 32M Apr 4 10:00 debezium-connector-oracle-2.7.0.Final-plugin.tar.gz
-rw-r--r-- 1 root root 25M Apr 4 10:00 debezium-connector-sqlserver-2.7.0.Final-plugin.tar.gz
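The download URLs above all follow the same Maven Central layout, so they can be generated rather than typed by hand. The following is a minimal sketch; the function name `debezium_plugin_url` is a hypothetical helper, and the URL pattern is inferred from the four wget commands above rather than from any official API.

```python
# Base path taken from the wget commands in this section.
MAVEN_BASE = "https://repo1.maven.org/maven2/io/debezium"


def debezium_plugin_url(connector, version):
    """Build the Maven Central URL of a Debezium connector plugin archive.

    `connector` is the short artifact suffix, e.g. "mysql" or "postgres".
    """
    artifact = f"debezium-connector-{connector}"
    return f"{MAVEN_BASE}/{artifact}/{version}/{artifact}-{version}-plugin.tar.gz"


if __name__ == "__main__":
    for name in ("mysql", "postgres", "oracle", "sqlserver"):
        print(debezium_plugin_url(name, "2.7.0.Final"))
```

Changing the version string in one place keeps all connector archives on the same release, which matters because the connectors are deployed into a single Kafka Connect plugin directory.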
Option 2: Confluent Hub
# Install with the Confluent Hub client
$ confluent-hub install debezium/debezium-connector-mysql:2.7.0
# Or download the archive for an offline installation
$ wget https://www.confluent.io/hub/debezium/debezium-connector-mysql/versions/2.7.0/debezium-connector-mysql-2.7.0.zip
# Extract into the Kafka Connect plugin directory
$ unzip debezium-connector-mysql-2.7.0.zip -d /fgeudb/kafka/plugins/
Option 3: Docker images
# Pull the Debezium Server image
$ docker pull debezium/server:2.7
# Pull the Kafka Connect image with the connectors preinstalled
$ docker pull debezium/connect:2.7
# Pull the example image that bundles MySQL
$ docker pull debezium/example-mysql:2.7
# List the images
$ docker images | grep debezium
Sample output:
debezium/server 2.7 abc123def456 7 days ago 580MB
debezium/connect 2.7 def456ghi789 7 days ago 750MB
debezium/example-mysql 2.7 ghi789jkl012 7 days ago 520MB
Option 4: Maven dependencies
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>2.7.0.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>2.7.0.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>2.7.0.Final</version>
</dependency>
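3. Configuring Kafka Connect
Debezium runs on top of the Kafka Connect framework, so a Kafka Connect environment must be set up first.
Step 1: Install Kafka
# Download Kafka
$ cd /fgeudb/software
$ wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
# Extract and install
$ tar -zxvf kafka_2.13-3.7.0.tgz
$ mv kafka_2.13-3.7.0 /fgeudb/kafka
# Configure the Kafka broker in KRaft mode (set the keys below and keep the file's other defaults)
$ vi /fgeudb/kafka/config/kraft/server.properties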
node.id=1
controller.quorum.voters=1@192.168.1.51:9093
listeners=PLAINTEXT://192.168.1.51:9092,CONTROLLER://192.168.1.51:9093
advertised.listeners=PLAINTEXT://192.168.1.51:9092
log.dirs=/fgeudb/kafka/data
num.partitions=3
default.replication.factor=1
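# Format the Kafka storage directory (the cluster ID must be a Kafka-style base64 UUID, so generate it with random-uuid rather than uuidgen)
$ /fgeudb/kafka/bin/kafka-storage.sh format -t $(/fgeudb/kafka/bin/kafka-storage.sh random-uuid) -c /fgeudb/kafka/config/kraft/server.properties
Sample output:
Formatting /fgeudb/kafka/data with metadata.version 3.7-IV4.
# Start Kafka
$ /fgeudb/kafka/bin/kafka-server-start.sh -daemon /fgeudb/kafka/config/kraft/server.properties
# Check that Kafka is running
$ jps | grep Kafka
Sample output:
12345 Kafka
Step 2: Configure Kafka Connect
# Create the plugin directory
$ mkdir -p /fgeudb/kafka/plugins
# Extract the Debezium connector into the plugin directory (repeat for each connector archive you need)
$ cd /fgeudb/kafka/plugins
$ tar -zxvf /fgeudb/software/debezium/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
# Configure Kafka Connect
$ vi /fgeudb/kafka/config/connect-distributed.properties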
bootstrap.servers=192.168.1.51:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
plugin.path=/fgeudb/kafka/plugins
rest.advertised.host.name=192.168.1.51
rest.advertised.port=8083
# Start Kafka Connect
$ /fgeudb/kafka/bin/connect-distributed.sh -daemon /fgeudb/kafka/config/connect-distributed.properties
# Check that Kafka Connect is responding
$ curl -s http://192.168.1.51:8083/ | jq
Sample output:
{
  "version": "3.7.0",
  "commit": "abc123def",
  "kafka_cluster_id": "xyz789"
}
Step 3: Verify the connector plugins
# List the plugins that Kafka Connect has loaded
$ curl -s http://192.168.1.51:8083/connector-plugins | jq
Sample output:
[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.7.0.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "2.7.0.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.7.0"
  }
]
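A plugin that fails to load is simply absent from this list, so it is worth checking the response programmatically before registering connectors. The sketch below assumes the response body has already been fetched (for example with the curl command above); `missing_plugins` is a hypothetical helper name.

```python
import json


def missing_plugins(plugins_json, required):
    """Return the required connector classes absent from a /connector-plugins response."""
    installed = {entry["class"] for entry in json.loads(plugins_json)}
    return [cls for cls in required if cls not in installed]


if __name__ == "__main__":
    # Trimmed-down response containing only the MySQL connector.
    response = json.dumps([
        {"class": "io.debezium.connector.mysql.MySqlConnector",
         "type": "source", "version": "2.7.0.Final"},
    ])
    wanted = [
        "io.debezium.connector.mysql.MySqlConnector",
        "io.debezium.connector.postgresql.PostgresConnector",
    ]
    print(missing_plugins(response, wanted))
```

An empty result means every required class was loaded; any class that is returned usually points to an archive that was not extracted under plugin.path.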
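4. Deploying the MySQL Connector
The MySQL Connector is the most widely used Debezium connector and captures change events from MySQL databases. Note: MySQL must have the binlog enabled and set to ROW format.
Step 1: Configure the MySQL server
# Edit the MySQL configuration file
$ vi /etc/my.cnf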
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
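# expire_logs_days is deprecated in MySQL 8.0; on 8.0 and later prefer binlog_expire_logs_seconds = 604800
expire_logs_days = 7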
gtid_mode = ON
enforce_gtid_consistency = ON
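# Restart MySQL
$ systemctl restart mysqld
# Create the Debezium user
$ mysql -uroot -p
mysql> CREATE USER 'debezium'@'%' IDENTIFIED BY 'debezium123';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
mysql> FLUSH PRIVILEGES;
# Verify the grants
mysql> SHOW GRANTS FOR 'debezium'@'%';
Sample output:
+------------------------------------------------------------------------------------------------------+
| Grants for debezium@%                                                                                |
+------------------------------------------------------------------------------------------------------+
| GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%' |
+------------------------------------------------------------------------------------------------------+
Step 2: Create the MySQL connector
# Create a working directory and write the connector definition
$ mkdir -p /fgeudb/debezium
$ vi /fgeudb/debezium/mysql-connector.json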
{
  "name": "fgedu-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.1.51",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "debezium123",
    "database.server.id": "184054",
    "topic.prefix": "fgedu_mysql",
    "database.include.list": "fgedu_db",
    "table.include.list": "fgedu_db.orders,fgedu_db.users",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.1.51:9092",
    "schema.history.internal.kafka.topic": "schema-changes.fgedu",
    "include.schema.changes": "true",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "minimal",
    "decimal.handling.mode": "double",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "true",
    "message.key.columns": "fgedu_db.orders:id;fgedu_db.users:id"
  }
}
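Kafka Connect accepts a connector definition even when a key is misspelled, and the mistake only surfaces once the task starts. A small pre-flight check can catch the most common problems. The sketch below is an assumption-laden illustration: `REQUIRED_KEYS` lists the keys this guide's MySQL example relies on (not an official list), and `check_mysql_connector` is a hypothetical helper. It also flags `database.history.*` keys, which Debezium 2.x replaced with `schema.history.internal.*`.

```python
# Keys this guide's MySQL example depends on (illustrative, not exhaustive).
REQUIRED_KEYS = (
    "connector.class",
    "database.hostname",
    "database.user",
    "database.server.id",
    "topic.prefix",
    "schema.history.internal.kafka.bootstrap.servers",
    "schema.history.internal.kafka.topic",
)


def check_mysql_connector(definition):
    """Return a list of human-readable problems found in a connector definition."""
    problems = []
    if not definition.get("name"):
        problems.append("missing connector name")
    config = definition.get("config", {})
    for key in REQUIRED_KEYS:
        if not config.get(key):
            problems.append(f"missing config key: {key}")
    for key in config:
        # Debezium 1.x names that 2.x no longer uses.
        if key.startswith("database.history."):
            problems.append(f"pre-2.0 key, rename to schema.history.internal.*: {key}")
    return problems


if __name__ == "__main__":
    import json
    import sys

    # Usage: python check_connector.py /fgeudb/debezium/mysql-connector.json
    with open(sys.argv[1], encoding="utf-8") as handle:
        for problem in check_mysql_connector(json.load(handle)):
            print(problem)
```

An empty result does not guarantee that the connector will start; it only rules out the omissions and renamed keys checked here.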
# Register the connector
$ curl -X POST -H "Content-Type: application/json" --data @/fgeudb/debezium/mysql-connector.json http://192.168.1.51:8083/connectors
Sample output:
{
  "name": "fgedu-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.1.51",
    …
  },
  "tasks": [],
  "type": "source"
}
Step 3: Verify the connector status
$ curl -s http://192.168.1.51:8083/connectors/fgedu-mysql-connector/status | jq
Sample output:
{
  "name": "fgedu-mysql-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.1.51:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.1.51:8083"
    }
  ],
  "type": "source"
}
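A connector can report RUNNING while one of its tasks has FAILED, so both levels of the status document need checking. The sketch below works on a status response that has already been fetched; `unhealthy_parts` is a hypothetical helper name, and the field names are those shown in the sample output above.

```python
import json


def unhealthy_parts(status_json):
    """List the connector and task entries whose state is not RUNNING."""
    status = json.loads(status_json)
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector: {connector_state}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


if __name__ == "__main__":
    sample = json.dumps({
        "name": "fgedu-mysql-connector",
        "connector": {"state": "RUNNING", "worker_id": "192.168.1.51:8083"},
        "tasks": [{"id": 0, "state": "FAILED", "worker_id": "192.168.1.51:8083"}],
        "type": "source",
    })
    print(unhealthy_parts(sample))
```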
# List the topics that were created
$ /fgeudb/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.51:9092 --list | grep fgedu
Sample output:
fgedu_mysql
fgedu_mysql.fgedu_db.orders
fgedu_mysql.fgedu_db.users
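The topic names in this listing follow from the connector settings: the schema change topic is named after topic.prefix, and each captured table gets a topic named prefix.database.table. The sketch below derives the expected names so they can be compared with the listing. It assumes table.include.list contains literal table names rather than regular expressions, and `expected_topics` is a hypothetical helper.

```python
def expected_topics(topic_prefix, table_include_list, include_schema_changes=True):
    """Derive the topic names a MySQL connector is expected to write to.

    Assumes every entry in table_include_list is a literal `database.table` name.
    """
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    topics = [topic_prefix] if include_schema_changes else []
    topics += [f"{topic_prefix}.{table}" for table in tables]
    return topics


if __name__ == "__main__":
    for topic in expected_topics("fgedu_mysql", "fgedu_db.orders,fgedu_db.users"):
        print(topic)
```

A table topic only appears once the connector has produced at least one event for that table, so a missing topic for an empty table is not necessarily an error.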
Step 4: Consume change events
$ /fgeudb/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic fgedu_mysql.fgedu_db.orders --from-beginning
Sample output:
{
  "schema": {…},
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "order_no": "ORD20260404001",
      "user_id": 100,
      "amount": 999.99,
      "status": "pending",
      "create_time": "2026-04-04T10:00:00Z"
    },
    "source": {
      "version": "2.7.0.Final",
      "connector": "mysql",
      "name": "fgedu_mysql",
      "ts_ms": 1712217600000,
      "db": "fgedu_db",
      "table": "orders",
      "server_id": 1,
      "gtid": "3a4e7b00-1234-5678-90ab-cdef12345678:100",
      "file": "mysql-bin.000001",
      "pos": 1234,
      "row": 0
    },
    "op": "c",
    "ts_ms": 1712217600123
  }
}
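Each event carries the row state before and after the change, plus an `op` code: `c` for create, `u` for update, `d` for delete, and `r` for a row read during the snapshot. The sketch below shows one way a consumer might reduce an event to the fields it needs. `summarize_event` is a hypothetical helper, and the input is assumed to be one JSON message value as printed by the console consumer.

```python
import json

# Operation codes used in the Debezium event envelope.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_event(raw):
    """Reduce one change event to its operation, table, and row images.

    Returns None for a tombstone (a null message value emitted after a delete).
    """
    event = json.loads(raw)
    if event is None:
        return None
    # With schemas.enable=false the envelope is the top-level object.
    payload = event.get("payload", event)
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f"{source['db']}.{source['table']}",
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


if __name__ == "__main__":
    sample = json.dumps({
        "payload": {
            "before": None,
            "after": {"id": 1, "status": "pending"},
            "source": {"db": "fgedu_db", "table": "orders"},
            "op": "c",
        }
    })
    print(summarize_event(sample))
```

For an insert, `before` is null and `after` holds the new row; for a delete the reverse applies, which is why both images are kept in the summary.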
5. Deploying the PostgreSQL Connector
The PostgreSQL Connector captures data changes through PostgreSQL's logical decoding feature.
Step 1: Configure the PostgreSQL server
# Edit postgresql.conf
$ vi /fgeudb/postgresql/data/postgresql.conf
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
wal_sender_timeout = 60s
# Edit pg_hba.conf
$ vi /fgeudb/postgresql/data/pg_hba.conf
host replication debezium 192.168.1.0/24 md5
host all debezium 192.168.1.0/24 md5
# Restart PostgreSQL
$ systemctl restart postgresql
# Create the Debezium user, database, and schema
$ psql -U postgres
postgres=# CREATE USER debezium WITH REPLICATION PASSWORD 'debezium123';
postgres=# CREATE DATABASE fgedu_db;
postgres=# \c fgedu_db
fgedu_db=# CREATE SCHEMA fgedu;
fgedu_db=# GRANT ALL ON SCHEMA fgedu TO debezium;
fgedu_db=# ALTER USER debezium SET search_path TO fgedu, public;
# Create a logical replication slot with the pgoutput plugin (built into PostgreSQL 10+)
fgedu_db=# SELECT * FROM pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
Sample output:
   slot_name   |    lsn
---------------+-----------
 debezium_slot | 0/16B99D0
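PostgreSQL restricts replication slot names to lowercase letters, digits, and underscores, with a maximum of 63 characters, and slot creation fails for anything else. When slot names are generated per environment, a quick check avoids that failure. The sketch below encodes the rule; `is_valid_slot_name` is a hypothetical helper.

```python
import re

# Lowercase letters, digits, and underscores only; at most 63 characters.
_SLOT_NAME = re.compile(r"[a-z0-9_]{1,63}")


def is_valid_slot_name(name):
    """Return True if `name` is acceptable as a PostgreSQL replication slot name."""
    return _SLOT_NAME.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ("debezium_slot", "Debezium-Slot"):
        print(candidate, is_valid_slot_name(candidate))
```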
Step 2: Create the PostgreSQL connector
$ vi /fgeudb/debezium/postgres-connector.json
{
  "name": "fgedu-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "192.168.1.51",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "debezium123",
    "database.dbname": "fgedu_db",
    "topic.prefix": "fgedu_pg",
    "schema.include.list": "fgedu",
    "table.include.list": "fgedu.orders,fgedu.users",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "publication.autocreate.mode": "filtered",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "double",
    "hstore.handling.mode": "json",
    "interval.handling.mode": "numeric",
    "database.tcpKeepAlive": "true"
  }
}
# Register the connector
$ curl -X POST -H "Content-Type: application/json" --data @/fgeudb/debezium/postgres-connector.json http://192.168.1.51:8083/connectors
Sample output:
{
  "name": "fgedu-postgres-connector",
  "config": {…},
  "tasks": [],
  "type": "source"
}
6. Deploying the Oracle Connector
The Oracle Connector captures change events from Oracle databases and relies on either Oracle LogMiner or XStream.
Step 1: Configure the Oracle database
$ sqlplus / as sysdba
# Enable archive log mode
SQL> SHUTDOWN IMMEDIATE;
SQL> STARTUP MOUNT;
SQL> ALTER DATABASE ARCHIVELOG;
SQL> ALTER DATABASE OPEN;
# Enable supplemental logging
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
SQL> ALTER SYSTEM SWITCH LOGFILE;
# Create the Debezium user (the grants below are a subset; check the Debezium Oracle connector documentation for the full privilege list for your Oracle version)
SQL> CREATE USER debezium IDENTIFIED BY debezium123;
SQL> GRANT CONNECT, RESOURCE TO debezium;
SQL> GRANT SELECT ANY TABLE TO debezium;
SQL> GRANT SELECT ANY TRANSACTION TO debezium;
SQL> GRANT EXECUTE ON SYS.DBMS_LOGMNR TO debezium;
SQL> GRANT SELECT ON V_$LOGMNR_CONTENTS TO debezium;
SQL> GRANT SELECT ON V_$DATABASE TO debezium;
SQL> GRANT SELECT ON V_$LOG TO debezium;
SQL> GRANT SELECT ON V_$LOGFILE TO debezium;
SQL> GRANT SELECT ON V_$ARCHIVED_LOG TO debezium;
# Verify the configuration
SQL> SELECT NAME, LOG_MODE FROM V$DATABASE;
Sample output:
NAME      LOG_MODE
--------- ------------
          ARCHIVELOG
Step 2: Create the Oracle connector
# Note: database.dbname and schema.include.list are blank in this listing, and the table.include.list entries lack a schema prefix; set them for your environment
$ vi /fgeudb/debezium/oracle-connector.json
{
  "name": "fgedu-oracle-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "192.168.1.51",
    "database.port": "1521",
    "database.user": "debezium",
    "database.password": "debezium123",
    "database.dbname": "",
    "topic.prefix": "fgedu_oracle",
    "schema.include.list": "",
    "table.include.list": ".ORDERS,.USERS",
    "database.connection.adapter": "logminer",
    "log.mining.strategy": "online_catalog",
    "log.mining.archive.log.hours": "24",
    "log.mining.archive.log.only.mode": "false",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "double",
    "lob.enabled": "false",
    "unified.value.schemas": "true"
  }
}
# Register the connector
$ curl -X POST -H "Content-Type: application/json" --data @/fgeudb/debezium/oracle-connector.json http://192.168.1.51:8083/connectors
7. Monitoring and Operating Debezium
Debezium exposes monitoring metrics over JMX, and connectors are managed through the Kafka Connect REST API.
Step 1: Check connector status
# List the registered connectors
$ curl -s http://192.168.1.51:8083/connectors | jq
Sample output:
[
  "fgedu-mysql-connector",
  "fgedu-postgres-connector",
  "fgedu-oracle-connector"
]
# Show the connector configuration
$ curl -s http://192.168.1.51:8083/connectors/fgedu-mysql-connector/config | jq
# List the connector's tasks (this endpoint returns each task's ID and configuration; task state is reported by the /status endpoint)
$ curl -s http://192.168.1.51:8083/connectors/fgedu-mysql-connector/tasks | jq
Sample output:
[
  {
    "id": {
      "connector": "fgedu-mysql-connector",
      "task": 0
    },
    "config": {…}
  }
]
Step 2: Manage connectors
# Pause the connector
$ curl -X PUT http://192.168.1.51:8083/connectors/fgedu-mysql-connector/pause
# Resume the connector
$ curl -X PUT http://192.168.1.51:8083/connectors/fgedu-mysql-connector/resume
# Restart the connector
$ curl -X POST http://192.168.1.51:8083/connectors/fgedu-mysql-connector/restart
# Restart a specific task
$ curl -X POST http://192.168.1.51:8083/connectors/fgedu-mysql-connector/tasks/0/restart
# Delete the connector
$ curl -X DELETE http://192.168.1.51:8083/connectors/fgedu-mysql-connector
# Update the connector configuration (the request body is the flat configuration map, without the "name"/"config" wrapper)
$ curl -X PUT -H "Content-Type: application/json" --data '{…}' http://192.168.1.51:8083/connectors/fgedu-mysql-connector/config
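The management calls above differ only in HTTP method and path, so they are straightforward to script. The sketch below builds the requests with Python's standard library without sending them; `build_request` and `build_config_update` are hypothetical helper names, and the base URL is the worker address used in this guide. Pass the returned object to `urllib.request.urlopen` to execute it.

```python
import json
import urllib.request

CONNECT_URL = "http://192.168.1.51:8083"  # worker address used in this guide

# Action name -> (HTTP method, path template), mirroring the curl commands above.
ACTIONS = {
    "pause": ("PUT", "/connectors/{name}/pause"),
    "resume": ("PUT", "/connectors/{name}/resume"),
    "restart": ("POST", "/connectors/{name}/restart"),
    "delete": ("DELETE", "/connectors/{name}"),
}


def build_request(action, name, base_url=CONNECT_URL):
    """Build (but do not send) the request for a connector management action."""
    method, path = ACTIONS[action]
    return urllib.request.Request(base_url + path.format(name=name), method=method)


def build_config_update(name, config, base_url=CONNECT_URL):
    """Build the PUT request that replaces a connector's configuration.

    `config` is the flat key/value map, not the {"name": ..., "config": ...} wrapper.
    """
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    request = build_request("pause", "fgedu-mysql-connector")
    print(request.get_method(), request.full_url)
```

Keeping request construction separate from sending makes the actions easy to log or dry-run before they touch a production worker.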
Step 3: Monitor JMX metrics
# Enable JMX (the Kafka start scripts read JMX_PORT), stop the running worker, then start Kafka Connect again
$ export JMX_PORT=9999
$ /fgeudb/kafka/bin/connect-distributed.sh -daemon /fgeudb/kafka/config/connect-distributed.properties
# Connect with JConsole or VisualVM
# JMX URL: service:jmx:rmi:///jndi/rmi://192.168.1.51:9999/jmxrmi
# Key metrics
# debezium.mysql:type=connector-metrics,context=streaming,server=fgedu_mysql
# - TotalNumberOfEventsSeen
# - NumberOfErroneousEvents
# - MilliSecondsSinceLastEvent
# - SourceEventPosition
# - Connected
Step 4: Configure Prometheus monitoring
# Download the Prometheus JMX exporter agent
$ cd /fgeudb/software
$ wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.20.0/jmx_prometheus_javaagent-0.20.0.jar
# Set the Kafka Connect startup options, then restart the worker
$ export KAFKA_OPTS="-javaagent:/fgeudb/software/jmx_prometheus_javaagent-0.20.0.jar=8084:/fgeudb/config/jmx_exporter.yaml"
# Create jmx_exporter.yaml (in agent mode the exporter reads the local JVM, so hostPort must not be set)
$ vi /fgeudb/config/jmx_exporter.yaml
startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
# Query the Prometheus metrics
$ curl http://192.168.1.51:8084/metrics | grep debezium
Sample output (exact metric names and labels depend on the exporter rules in use):
debezium_metrics_total_number_of_events_seen{context="source",server="fgedu_mysql"} 12345.0
debezium_metrics_number_of_erroneous_events{context="source",server="fgedu_mysql"} 0.0
debezium_metrics_connected{context="source",server="fgedu_mysql"} 1.0
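For a quick health check without a full Prometheus server, the metrics text can be parsed directly. The sketch below handles simple `name{labels} value` lines like the sample output above. It is a simplified parser, not a complete implementation of the Prometheus exposition format (it skips lines with timestamps and does not handle escaped characters in label values), and `parse_metrics` is a hypothetical helper.

```python
import re

_LINE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>[^}]*)\})?\s+(?P<value>\S+)$'
)
_LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_metrics(text):
    """Parse simple metric lines into (name, labels, value) tuples."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = _LINE.match(line)
        if match:
            labels = dict(_LABEL.findall(match.group("labels") or ""))
            samples.append((match.group("name"), labels, float(match.group("value"))))
    return samples


if __name__ == "__main__":
    sample = 'debezium_metrics_connected{context="source",server="fgedu_mysql"} 1.0'
    for name, labels, value in parse_metrics(sample):
        print(name, labels, value)
```

A monitoring script could, for example, alert when the connected metric for a given server label drops to 0.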
Compiled and published by 风哥教程 (Fengge Tutorials) for learning and testing purposes only. When reposting, cite the source: http://www.fgedu.net.cn/10327.html
