
Flink CDC Installation and Configuration: Data Synchronization Setup and a Detailed Upgrade/Migration Procedure

1. Flink CDC Overview and Environment Planning

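Flink CDC (Change Data Capture) is a set of Apache Flink connectors that capture change data from databases. It supports MySQL, PostgreSQL, Oracle, MongoDB, SQL Server and other databases. Typical uses include real-time data synchronization, building data lakes, and real-time data warehouses.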

1.1 Flink CDC Versions

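The current major line of Flink CDC is 3.x, and this tutorial uses Flink CDC 3.1 as its example. Compared with 2.x, the 3.x releases improve performance, stability and functionality. They also support more database types and allow more flexible synchronization configuration.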

# Check the Flink version
$ flink --version
Version: 1.18.0, Commit ID: 1a2b3c4d

# Check which Flink CDC connector JARs are installed
$ ls /data/flink/lib/ | grep cdc
flink-connector-mysql-cdc-3.1.0.jar
flink-connector-postgres-cdc-3.1.0.jar

# Check the Java version
$ java -version
openjdk version "11.0.22" 2024-01-16 LTS
OpenJDK Runtime Environment (build 11.0.22+7-LTS)
OpenJDK 64-Bit Server VM (build 11.0.22+7-LTS, mixed mode, sharing)

# Check the operating system release
$ cat /etc/os-release
NAME="Oracle Linux Server"
VERSION="8.9"
ID="ol"
PRETTY_NAME="Oracle Linux Server 8.9"

1.2 Environment Plan

The environment for this installation is planned as follows:

Flink cluster nodes:
Node 1: flink01.fgedu.net.cn (192.168.1.51), JobManager + TaskManager
Node 2: flink02.fgedu.net.cn (192.168.1.52), TaskManager
Node 3: flink03.fgedu.net.cn (192.168.1.53), TaskManager

Flink version: 1.18.0
Flink CDC version: 3.1.0
Java version: OpenJDK 11
Installation directory: /data/flink
Log directory: /data/flink/log

Data sources:
MySQL: 192.168.1.51:3306
PostgreSQL: 192.168.1.51:5432

Data targets:
Kafka: 192.168.1.51:9092
Elasticsearch: 192.168.1.51:9200
HDFS: hdfs://192.168.1.51:8020

2. Hardware Requirements

Flink CDC is fairly demanding on hardware. The following are hardware recommendations for production.

2.1 Minimum Hardware Requirements

# Check the memory size
# free -h
total used free shared buff/cache available
Mem: 64G 8.5G 52G 2.0G 3.5G 53G
Swap: 32G 0B 32G

# Check disk space
# df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 15G 36G 30% /
/dev/sdb1 500G 50G 450G 10% /data

# Check the number of CPU cores
# nproc
32

# Check the system architecture
# uname -m
x86_64

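Recommendations: 16 GB of memory is the minimum and is suitable only for test environments. Production nodes should have 64 GB or more and at least 16 CPU cores. Provide at least 500 GB of disk for checkpoints and logs, and use 10 GbE networking to keep data transfer fast.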
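The thresholds above can be checked on each node with a short script. This is a minimal sketch and not part of the original procedure. The function names check_min_resources and check_this_host are hypothetical. MemTotal in /proc/meminfo reports slightly less than the installed RAM, so the sketch assumes 60 GiB as the practical floor for a 64 GB machine.

```shell
#!/usr/bin/env bash
# Hypothetical helper: compare host resources with the production
# recommendations (64 GB RAM, 16 cores, 500 GB on /data).
# MemTotal is a little below installed RAM, so allow a margin under 64.
MIN_MEM_GB=60
MIN_CORES=16
MIN_DISK_GB=500

# check_min_resources MEM_GB CORES DISK_GB
# Prints one WARN line per unmet threshold, or "OK" if all are met.
check_min_resources() {
  local mem_gb=$1 cores=$2 disk_gb=$3 ok=1
  if [ "$mem_gb" -lt "$MIN_MEM_GB" ]; then echo "WARN memory ${mem_gb}GB < ${MIN_MEM_GB}GB"; ok=0; fi
  if [ "$cores" -lt "$MIN_CORES" ]; then echo "WARN cores ${cores} < ${MIN_CORES}"; ok=0; fi
  if [ "$disk_gb" -lt "$MIN_DISK_GB" ]; then echo "WARN /data ${disk_gb}GB < ${MIN_DISK_GB}GB"; ok=0; fi
  [ "$ok" -eq 1 ] && echo "OK"
  return 0
}

# Collect the live values (GiB, rounded down) and run the check.
check_this_host() {
  local mem_gb cores disk_gb
  mem_gb=$(awk '/^MemTotal:/ {print int($2 / 1024 / 1024)}' /proc/meminfo)
  cores=$(nproc)
  disk_gb=$(df -BG --output=size /data | tail -n 1 | tr -dc '0-9')
  check_min_resources "$mem_gb" "$cores" "$disk_gb"
}
```

On each node, source the script and run check_this_host. Any WARN line points at a resource below the recommendation.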

2.2 Network Requirements

# Check the network configuration
# ip addr show
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN
inet 127.0.0.1/8 scope host lo
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP
inet 192.168.1.51/24 brd 192.168.1.255 scope global eth0

# Check hostname resolution
# hostname -f
flink01.fgedu.net.cn

# Configure /etc/hosts (identically on all nodes)
# cat /etc/hosts
127.0.0.1 localhost
192.168.1.51 flink01.fgedu.net.cn flink01
192.168.1.52 flink02.fgedu.net.cn flink02
192.168.1.53 flink03.fgedu.net.cn flink03

# Configure passwordless SSH login
# ssh-keygen -t rsa
# ssh-copy-id root@flink02.fgedu.net.cn
# ssh-copy-id root@flink03.fgedu.net.cn

3. Operating System Configuration

Flink CDC runs on Linux, and the operating system needs several configuration changes.

3.1 Disable the Firewall and SELinux

# Check the SELinux status
# getenforce
Disabled

# Disable SELinux if it is not already disabled (takes effect after a reboot)
# vi /etc/selinux/config
SELINUX=disabled

# Check the firewall status
# systemctl status firewalld

# Stop the firewall (in production, opening only the required ports is recommended)
# systemctl stop firewalld
# systemctl disable firewalld

# Alternatively, open only the ports Flink needs
# firewall-cmd --permanent --add-port=8081/tcp
# firewall-cmd --permanent --add-port=6123/tcp
# firewall-cmd --permanent --add-port=6124/tcp
# firewall-cmd --reload

3.2 System Parameter Tuning

# Configure kernel parameters
# vi /etc/sysctl.conf

# Add the following parameters
fs.file-max = 6815744
vm.swappiness = 10
vm.max_map_count = 262144
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 65536
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 30

# Apply the configuration
# sysctl -p
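sysctl -p echoes each value as it is applied. The live values are still worth re-checking later, for example after a reboot. This minimal sketch compares a file of key = value lines with the running kernel. check_sysctl and sysctl_get are hypothetical helper names, and the getter is passed as an argument so it can be stubbed in tests.

```shell
#!/usr/bin/env bash
# Read the live value of one kernel parameter.
sysctl_get() { sysctl -n "$1" 2>/dev/null; }

# check_sysctl CONF_FILE [GETTER]
# Compares every "key = value" line in CONF_FILE with the value returned by
# GETTER (default: sysctl_get). Blank lines and '#' comments are skipped.
# Prints one MISMATCH line per difference and returns 1 if any were found.
check_sysctl() {
  local conf=$1 getter=${2:-sysctl_get} key value actual rc=0
  while IFS='=' read -r key value; do
    key=$(echo "$key" | tr -d '[:space:]')
    value=$(echo "$value" | xargs)   # trim surrounding whitespace
    case $key in ''|\#*) continue ;; esac
    actual=$("$getter" "$key")
    if [ "$actual" != "$value" ]; then
      echo "MISMATCH $key expected=$value actual=$actual"
      rc=1
    fi
  done < "$conf"
  return "$rc"
}
```

Running check_sysctl /etc/sysctl.conf prints nothing and returns 0 when every value in the file is in effect.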

# Configure file descriptor limits
# vi /etc/security/limits.conf

# Add the following
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536

# Verify (log in again first; limits.conf applies to new login sessions)
# ulimit -n
65536

4. Java Environment

Flink is written in Java and needs a suitable Java runtime.

4.1 Install OpenJDK 11

# Install OpenJDK 11
# yum install -y java-11-openjdk java-11-openjdk-devel

# Verify the Java version
$ java -version
openjdk version "11.0.22" 2024-01-16 LTS
OpenJDK Runtime Environment (build 11.0.22+7-LTS)
OpenJDK 64-Bit Server VM (build 11.0.22+7-LTS, mixed mode, sharing)

# Set the JAVA_HOME environment variable
# vi /etc/profile.d/java.sh

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
export PATH=$JAVA_HOME/bin:$PATH

# Load the environment variables
# source /etc/profile.d/java.sh

# Verify JAVA_HOME
$ echo $JAVA_HOME
/usr/lib/jvm/java-11-openjdk

4.2 Install ZooKeeper

# Download ZooKeeper (ZooKeeper runs on all three nodes, so repeat these steps on flink02 and flink03)
# cd /tmp
# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz

# Extract and install
# tar -xzf apache-zookeeper-3.8.3-bin.tar.gz
# mv apache-zookeeper-3.8.3-bin /data/zookeeper

# Configure ZooKeeper
# vi /data/zookeeper/conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=flink01.fgedu.net.cn:2888:3888
server.2=flink02.fgedu.net.cn:2888:3888
server.3=flink03.fgedu.net.cn:2888:3888

# Create the data directories
# mkdir -p /data/zookeeper/{data,logs}

# Create the myid file (it must match this host's server.N line: 1 on flink01, 2 on flink02, 3 on flink03)
# echo "1" > /data/zookeeper/data/myid
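Each ZooKeeper node needs a myid that matches its server.N line in zoo.cfg, so the value can be derived rather than typed separately on each host. This is a minimal sketch. zk_myid is a hypothetical helper, and it assumes the server.N=host:2888:3888 format used above.

```shell
#!/usr/bin/env bash
# zk_myid ZOO_CFG HOST
# Prints N from the "server.N=HOST:2888:3888" line whose host equals HOST,
# or nothing if HOST is not listed.
zk_myid() {
  local cfg=$1 host=$2
  # Split on '=' and ':' so $1 is "server.N" and $2 is the host name.
  awk -F'[=:]' -v h="$host" '
    /^server\.[0-9]+=/ && $2 == h { sub(/^server\./, "", $1); print $1; exit }
  ' "$cfg"
}
```

For example, run this on each node: zk_myid /data/zookeeper/conf/zoo.cfg "$(hostname -f)" > /data/zookeeper/data/myid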

# Configure environment variables
# vi /etc/profile.d/zookeeper.sh

export ZOOKEEPER_HOME=/data/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH

# Load the environment variables
# source /etc/profile.d/zookeeper.sh

# Start ZooKeeper (on every node)
# zkServer.sh start

# Sample output:
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

# Check the ZooKeeper status
# zkServer.sh status

# Sample output (one node reports leader, the others follower):
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

5. Flink Cluster Installation

Flink CDC runs on a Flink cluster, so install the Flink cluster first.

5.1 Download and Install Flink

# Download Flink
# cd /tmp
# wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz

# Sample output:
--2024-04-05 10:00:00--  https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
Resolving archive.apache.org... 163.172.17.199
Connecting to archive.apache.org|163.172.17.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 489567890 (467M) [application/x-gzip]
Saving to: 'flink-1.18.0-bin-scala_2.12.tgz'

flink-1.18.0-bin-scala_2.12.tgz 100%[=================================================>] 467.11M 25.6MB/s in 18s

2024-04-05 10:00:18 (25.6 MB/s) - 'flink-1.18.0-bin-scala_2.12.tgz' saved [489567890/489567890]

# Extract and install
# tar -xzf flink-1.18.0-bin-scala_2.12.tgz
# mv flink-1.18.0 /data/flink

# Configure environment variables
# vi /etc/profile.d/flink.sh

export FLINK_HOME=/data/flink
export PATH=$FLINK_HOME/bin:$PATH

# Load the environment variables
# source /etc/profile.d/flink.sh

# Verify the installation
$ flink --version
Version: 1.18.0, Commit ID: 1a2b3c4d

5.2 Configure the Flink Cluster

# Configure Flink
# vi /data/flink/conf/flink-conf.yaml

# JobManager settings
jobmanager.rpc.address: flink01.fgedu.net.cn
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4096m
jobmanager.execution.failover-strategy: region

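# TaskManager settings
# Size TaskManager memory with taskmanager.memory.process.size only. Also setting
# taskmanager.memory.flink.size: 12288m and taskmanager.memory.task.heap.size: 8192m
# over-constrains the memory model (8192m heap + 0.4 x 12288m managed already exceeds
# 12288m), and the TaskManager then fails to start.
taskmanager.memory.process.size: 16384m
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 8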
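The conflict between the three explicit sizes can be checked with simple arithmetic. When both the total Flink memory and the task heap are fixed, managed memory is taken as a fraction of the total, and network memory becomes whatever remains. This is a rough sketch in MB. It assumes the default 128 MB framework heap, 128 MB framework off-heap and no task off-heap memory. network_leftover is a hypothetical helper, and a positive remainder may still be rejected if it falls outside the configured network min/max.

```shell
#!/usr/bin/env bash
# network_leftover FLINK_MB TASK_HEAP_MB MANAGED_FRACTION
# With taskmanager.memory.flink.size and task.heap.size both fixed, network
# memory is what remains after task heap, managed memory (a fraction of the
# Flink total) and the assumed 128 MB framework heap + 128 MB framework
# off-heap. A negative remainder means the settings cannot be satisfied.
network_leftover() {
  awk -v f="$1" -v h="$2" -v mf="$3" 'BEGIN {
    left = f - h - int(f * mf) - 128 - 128
    if (left < 0) printf "CONFLICT network=%d\n", left
    else printf "OK network=%d\n", left
  }'
}
```

For flink.size 12288m, task heap 8192m and a 0.4 managed fraction, network_leftover 12288 8192 0.4 prints CONFLICT network=-1075. This is why sizing through taskmanager.memory.process.size alone is the safer choice.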

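# High availability settings (hdfs:// paths need the Hadoop client classes on Flink's classpath, e.g. export HADOOP_CLASSPATH=$(hadoop classpath) in /etc/profile.d/flink.sh)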
high-availability: zookeeper
high-availability.zookeeper.quorum: flink01.fgedu.net.cn:2181,flink02.fgedu.net.cn:2181,flink03.fgedu.net.cn:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink_cluster
high-availability.storageDir: hdfs://192.168.1.51:8020/flink/ha

# State backend settings
state.backend: rocksdb
state.checkpoints.dir: hdfs://192.168.1.51:8020/flink/checkpoints
state.savepoints.dir: hdfs://192.168.1.51:8020/flink/savepoints
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600000
execution.checkpointing.max-concurrent-checkpoints: 1

# Web UI settings
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
web.submit.enable: true
web.cancel.enable: true

# Parallelism
parallelism.default: 4

# Configure the masters file
# vi /data/flink/conf/masters

flink01.fgedu.net.cn:8081

# Configure the workers file
# vi /data/flink/conf/workers

flink01.fgedu.net.cn
flink02.fgedu.net.cn
flink03.fgedu.net.cn
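Before distributing Flink and starting the cluster, confirm that every host in masters/workers is reachable over passwordless SSH and has Java installed. This is a minimal sketch. read_hosts and check_workers are hypothetical helpers, and the SSH check assumes root login as configured in section 2.2.

```shell
#!/usr/bin/env bash
# read_hosts FILE: print host names from a Flink masters/workers file,
# skipping blank lines and '#' comments and dropping any ":port" suffix.
read_hosts() {
  sed -e 's/#.*//' -e 's/:.*//' -e 's/[[:space:]]//g' "$1" | grep -v '^$'
}

# check_workers FILE: verify passwordless SSH and a working java on each host.
check_workers() {
  local host
  for host in $(read_hosts "$1"); do
    if ssh -o BatchMode=yes -o ConnectTimeout=5 "root@$host" 'java -version' >/dev/null 2>&1; then
      echo "OK   $host"
    else
      echo "FAIL $host"
    fi
  done
}
```

check_workers /data/flink/conf/workers should print OK for all three nodes.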

5.3 Start the Flink Cluster

# Copy Flink to the other nodes
# scp -r /data/flink root@flink02.fgedu.net.cn:/data/
# scp -r /data/flink root@flink03.fgedu.net.cn:/data/

# Copy the environment file
# scp /etc/profile.d/flink.sh root@flink02.fgedu.net.cn:/etc/profile.d/
# scp /etc/profile.d/flink.sh root@flink03.fgedu.net.cn:/etc/profile.d/

# Load the environment variables on each node
# source /etc/profile.d/flink.sh

# Start the Flink cluster (run on flink01)
# start-cluster.sh

# Sample output:
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host flink01.fgedu.net.cn.
Starting taskexecutor daemon on host flink01.fgedu.net.cn.
Starting taskexecutor daemon on host flink02.fgedu.net.cn.
Starting taskexecutor daemon on host flink03.fgedu.net.cn.

# Check the Flink processes
$ jps

# Sample output (on flink01):
12345 StandaloneSessionClusterEntrypoint
12567 TaskManagerRunner
12678 Jps

# Open the Web UI
# In a browser, open http://192.168.1.51:8081

# Check the cluster status
$ flink list

# Sample output:
Waiting for response...
No running jobs.
No scheduled jobs.

6. Flink CDC Connector Installation

With the Flink cluster in place, install the Flink CDC connectors.

6.1 Download the Flink CDC Connectors

# Create a staging directory for the connector JARs
# mkdir -p /data/flink/lib/cdc

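# Download the MySQL CDC connector (Flink CDC 3.1.0 and later are published under the org.apache.flink groupId rather than com.ververica, so verify the exact URL on Maven Central before downloading)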
# cd /data/flink/lib/cdc
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/3.1.0/flink-connector-mysql-cdc-3.1.0.jar

# Sample output:
--2024-04-05 10:00:00--  https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/3.1.0/flink-connector-mysql-cdc-3.1.0.jar
Resolving repo1.maven.org... 199.232.196.209
Connecting to repo1.maven.org|199.232.196.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 45678901 (44M) [application/java-archive]
Saving to: 'flink-connector-mysql-cdc-3.1.0.jar'

flink-connector-mysql-cdc-3.1.0.jar 100%[=================================================>] 43.56M 15.6MB/s in 2.8s

2024-04-05 10:00:03 (15.6 MB/s) - 'flink-connector-mysql-cdc-3.1.0.jar' saved [45678901/45678901]

# Download the PostgreSQL CDC connector
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-postgres-cdc/3.1.0/flink-connector-postgres-cdc-3.1.0.jar

# Download the Oracle CDC connector
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-oracle-cdc/3.1.0/flink-connector-oracle-cdc-3.1.0.jar

# Download the MongoDB CDC connector
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mongodb-cdc/3.1.0/flink-connector-mongodb-cdc-3.1.0.jar

# Copy the JARs into the Flink lib directory
# cp /data/flink/lib/cdc/*.jar /data/flink/lib/

# Verify the installation
$ ls -lh /data/flink/lib/*.jar | grep cdc

# Sample output:
-rw-r--r-- 1 root root 44M Apr 5 10:00 flink-connector-mysql-cdc-3.1.0.jar
-rw-r--r-- 1 root root 38M Apr 5 10:00 flink-connector-postgres-cdc-3.1.0.jar
-rw-r--r-- 1 root root 52M Apr 5 10:00 flink-connector-oracle-cdc-3.1.0.jar
-rw-r--r-- 1 root root 35M Apr 5 10:00 flink-connector-mongodb-cdc-3.1.0.jar

6.2 Install Dependency JARs

# Download the MySQL JDBC driver
# cd /data/flink/lib
# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.33/mysql-connector-java-8.0.33.jar

# Download the PostgreSQL JDBC driver
# wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.1/postgresql-42.7.1.jar

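# Download the Kafka connector (SQL jobs generally need the self-contained flink-sql-connector-kafka JAR, which bundles the Kafka client library; the thin flink-connector-kafka JAR does not)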
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.0.2-1.18/flink-connector-kafka-3.0.2-1.18.jar

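# Download the Elasticsearch connector (for SQL jobs, the self-contained flink-sql-connector-elasticsearch7 JAR is generally the one to use)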
# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch7/3.0.2-1.18/flink-connector-elasticsearch7-3.0.2-1.18.jar

# Verify the installation
$ ls -lh /data/flink/lib/*.jar | head -20

# Sample output:
-rw-r--r-- 1 root root 44M Apr 5 10:00 flink-connector-mysql-cdc-3.1.0.jar
-rw-r--r-- 1 root root 38M Apr 5 10:00 flink-connector-postgres-cdc-3.1.0.jar
-rw-r--r-- 1 root root 2.4M Apr 5 10:00 mysql-connector-java-8.0.33.jar
-rw-r--r-- 1 root root 1.0M Apr 5 10:00 postgresql-42.7.1.jar
-rw-r--r-- 1 root root 15M Apr 5 10:00 flink-connector-kafka-3.0.2-1.18.jar
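Before the lib directory is distributed, a quick check that the expected connector and driver JARs are present can catch a failed download. This is a minimal sketch. missing_jars is a hypothetical helper that matches JAR names by prefix, so adjust the prefixes if you use the flink-sql-connector-* variants.

```shell
#!/usr/bin/env bash
# missing_jars DIR PREFIX...
# For each PREFIX, check that DIR holds at least one JAR whose file name
# starts with it. Prints every prefix without a match; returns 1 if any.
missing_jars() {
  local dir=$1 prefix rc=0
  shift
  for prefix in "$@"; do
    # compgen -G succeeds only if the glob matches at least one file
    if ! compgen -G "$dir/$prefix*.jar" >/dev/null; then
      echo "$prefix"
      rc=1
    fi
  done
  return "$rc"
}
```

missing_jars /data/flink/lib flink-connector-mysql-cdc flink-connector-postgres-cdc mysql-connector-java postgresql flink-connector-kafka prints nothing when every JAR is present.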

# Copy the JARs to the other nodes
# scp /data/flink/lib/*.jar root@flink02.fgedu.net.cn:/data/flink/lib/
# scp /data/flink/lib/*.jar root@flink03.fgedu.net.cn:/data/flink/lib/

# Restart the Flink cluster
# stop-cluster.sh
# start-cluster.sh

7. Flink CDC Parameter Configuration

Flink CDC parameter settings largely determine synchronization performance and stability.

7.1 MySQL CDC Configuration

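# Create a MySQL CDC parameter file (a reference record of the job settings; Flink does not read this file itself, and the SQL job in section 8 passes the equivalent options in its WITH clause)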
# vi /data/flink/conf/mysql-cdc.properties

# MySQL source settings
mysql.hostname = 192.168.1.51
mysql.port = 3306
mysql.username = fgedu
mysql.password = fgedu_mysql_2024
mysql.database = fgedudb
mysql.table = fgedu_employees,fgedu_departments,fgedu_sales
mysql.server.id = 5400
mysql.server.timezone = Asia/Shanghai

# CDC settings
mysql.snapshot.mode = initial
mysql.scan.incremental.snapshot.enabled = true
mysql.scan.incremental.snapshot.chunk.size = 8096
mysql.scan.snapshot.fetch.size = 1024

# Connection settings
mysql.connect.timeout = 30s
mysql.connect.max-retries = 3

# Output settings
sink.type = kafka
sink.topic = mysql-cdc-fgedudb
sink.bootstrap.servers = 192.168.1.51:9092
sink.format = json
sink.json.timestamp-format.standard = ISO-8601

7.2 PostgreSQL CDC Configuration

# Create a PostgreSQL CDC parameter file (a reference record only; Flink does not read it directly)
# vi /data/flink/conf/postgres-cdc.properties

# PostgreSQL source settings
postgres.hostname = 192.168.1.51
postgres.port = 5432
postgres.username = fgedu
postgres.password = fgedu_postgres_2024
postgres.database = fgedudb
postgres.schema = public
postgres.table = fgedu_employees,fgedu_departments,fgedu_sales
postgres.slot.name = flink_cdc_slot

# CDC settings
postgres.decoding.plugin.name = pgoutput
postgres.snapshot.mode = initial

# Connection settings
postgres.connect.timeout = 30s
postgres.connect.max-retries = 3

# Output settings
sink.type = kafka
sink.topic = postgres-cdc-fgedudb
sink.bootstrap.servers = 192.168.1.51:9092
sink.format = json

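Fengge's tip: MySQL CDC requires the binlog to be enabled with binlog_format set to ROW, and PostgreSQL CDC requires wal_level set to logical. Give each CDC job a unique server-id (MySQL) or slot.name (PostgreSQL) to avoid conflicts.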
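This sketch covers the unique server-id in the tip. The mysql-cdc connector's 'server-id' option accepts either a single id or a range such as '5400-5403'. When the source reads in parallel, each reader needs its own id, so the range should cover at least the source parallelism. It must also not overlap with ids used by other jobs or replicas connected to the same MySQL server. Check this reading against the connector documentation for your version. server_id_range and ranges_overlap are hypothetical helpers.

```shell
#!/usr/bin/env bash
# server_id_range START PARALLELISM
# Prints a range with one server id per parallel source reader.
server_id_range() {
  local start=$1 parallelism=$2
  echo "${start}-$((start + parallelism - 1))"
}

# ranges_overlap "A-B" "C-D": returns 0 if the two id ranges share an id.
# A single id such as "5400" is treated as the range 5400-5400.
ranges_overlap() {
  local a1=${1%-*} a2=${1#*-} b1=${2%-*} b2=${2#*-}
  [ "$a1" -le "$b2" ] && [ "$b1" -le "$a2" ]
}
```

For a source running with parallelism 8, server_id_range 5400 8 gives 5400-5407. Set that as 'server-id' = '5400-5407' in the table's WITH clause, and check it with ranges_overlap against the ranges of the other jobs.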

8. Starting the Synchronization Jobs

With the configuration complete, start the Flink CDC synchronization jobs.

8.1 Prepare the Source Database

# Configure the MySQL binlog
# vi /etc/my.cnf

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
gtid_mode = ON
enforce_gtid_consistency = ON

# Restart MySQL
# systemctl restart mysqld

# Verify the binlog configuration
$ mysql -u root -p -e "SHOW VARIABLES LIKE 'binlog_format';"

# Sample output:
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
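The same check can cover all binlog prerequisites at once. This minimal sketch reads the tab-separated output of mysql -N -B. check_binlog_vars is a hypothetical helper, and the expected values (log_bin ON, binlog_format ROW, binlog_row_image FULL) are the ones configured in my.cnf above.

```shell
#!/usr/bin/env bash
# check_binlog_vars: read "name<TAB>value" lines (as printed by mysql -N -B)
# on stdin and report any setting that differs from the CDC prerequisites.
check_binlog_vars() {
  awk -F'\t' '
    BEGIN { want["log_bin"] = "ON"; want["binlog_format"] = "ROW"; want["binlog_row_image"] = "FULL" }
    ($1 in want) {
      seen[$1] = 1
      if (toupper($2) != want[$1]) { print "BAD " $1 "=" $2 " (want " want[$1] ")"; bad = 1 }
    }
    END {
      for (k in want) if (!(k in seen)) { print "MISSING " k; bad = 1 }
      if (!bad) print "OK"
    }'
}
```

Usage: mysql -N -B -u root -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image')" | check_binlog_vars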

# Create the CDC user
$ mysql -u root -p

mysql> CREATE USER 'fgedu'@'%' IDENTIFIED BY 'fgedu_mysql_2024';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'fgedu'@'%';
mysql> FLUSH PRIVILEGES;

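# Create the test table (creating the database first if it does not exist)
mysql> CREATE DATABASE IF NOT EXISTS fgedudb;
mysql> USE fgedudb;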
mysql> CREATE TABLE fgedu_employees (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100),
department VARCHAR(50),
salary DECIMAL(10,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
mysql> INSERT INTO fgedu_employees (name, department, salary) VALUES
('Zhang San', 'Engineering', 15000.00),
('Li Si', 'Sales', 12000.00),
('Wang Wu', 'Finance', 13000.00);

8.2 Create the CDC Synchronization Job

# Create the MySQL CDC synchronization job script
# mkdir -p /data/flink/jobs
# vi /data/flink/jobs/mysql-cdc-sync.sql

-- Name the job (shown by flink list and the Web UI)
SET 'pipeline.name' = 'MySQL CDC Sync Job';

-- Create the MySQL CDC source table
CREATE TABLE mysql_cdc_source (
id INT,
name STRING,
department STRING,
salary DECIMAL(10,2),
created_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.51',
'port' = '3306',
'username' = 'fgedu',
'password' = 'fgedu_mysql_2024',
'database-name' = 'fgedudb',
'table-name' = 'fgedu_employees',
'server-time-zone' = 'Asia/Shanghai',
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '8096'
);

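-- Create the Kafka target table.
-- The CDC source emits updates and deletes, and the plain 'kafka' connector with
-- 'json' format rejects both changelog input and a PRIMARY KEY, so use 'upsert-kafka':
-- records are keyed by id, and a delete is written as a tombstone (null value).
CREATE TABLE kafka_sink (
id INT,
name STRING,
department STRING,
salary DECIMAL(10,2),
created_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'mysql-cdc-fgedudb',
'properties.bootstrap.servers' = '192.168.1.51:9092',
'key.format' = 'json',
'value.format' = 'json',
'value.json.timestamp-format.standard' = 'ISO-8601'
);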

-- Start the synchronization
INSERT INTO kafka_sink SELECT * FROM mysql_cdc_source;

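# Submit the job with the SQL client (flink run expects a JAR application, not a SQL script)
$ sql-client.sh -f /data/flink/jobs/mysql-cdc-sync.sql

# Sample output:
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1a2b3c4d5e6f7g8h9i0j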
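When a script is submitted with sql-client.sh -f, the client reports each submitted INSERT with a "Job ID:" line, and real Flink job IDs are 32 hexadecimal characters. This minimal sketch captures the ID so that later commands (flink list, flink savepoint, the REST API) can use it. extract_job_id is a hypothetical helper.

```shell
#!/usr/bin/env bash
# extract_job_id: read SQL client output on stdin and print the first
# 32-character hexadecimal job ID that follows "Job ID:".
extract_job_id() {
  sed -nE 's/.*Job ID: *([0-9a-f]{32}).*/\1/p' | head -n 1
}
```

Usage: job_id=$(sql-client.sh -f /data/flink/jobs/mysql-cdc-sync.sql | extract_job_id), then for example flink savepoint "$job_id".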

# Check the job status
$ flink list

# Sample output:
Waiting for response...
------------------ Running/Restarting Jobs -------------------
01.04.2024 10:00:00 : job_id : MySQL CDC Sync Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

8.3 Verify Data Synchronization

# Consume from Kafka to verify
$ kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic mysql-cdc-fgedudb --from-beginning

# Sample output:
{"id":1,"name":"Zhang San","department":"Engineering","salary":15000.00,"created_at":"2024-04-05T10:00:00"}
{"id":2,"name":"Li Si","department":"Sales","salary":12000.00,"created_at":"2024-04-05T10:00:00"}
{"id":3,"name":"Wang Wu","department":"Finance","salary":13000.00,"created_at":"2024-04-05T10:00:00"}

# Insert a new row in MySQL
$ mysql -u fgedu -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('Zhao Liu', 'Marketing', 14000.00);"

# Consume from Kafka again to verify incremental synchronization
$ kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic mysql-cdc-fgedudb

# Sample output:
{"id":4,"name":"Zhao Liu","department":"Marketing","salary":14000.00,"created_at":"2024-04-05T10:05:00"}

# Update a row to verify
$ mysql -u fgedu -p -e "UPDATE fgedudb.fgedu_employees SET salary = 16000.00 WHERE id = 1;"

# Sample Kafka output:
{"id":1,"name":"Zhang San","department":"Engineering","salary":16000.00,"created_at":"2024-04-05T10:00:00"}

# Delete a row to verify
$ mysql -u fgedu -p -e "DELETE FROM fgedudb.fgedu_employees WHERE id = 3;"

# Sample Kafka output (with an upsert-kafka sink, a delete is written as a tombstone for key {"id":3}; the console consumer prints its value as null):
null
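If the sink is an upsert-kafka table, the topic is keyed by the primary key and a delete becomes a record with a null value. In that case the current contents of the source table can be rebuilt from the topic by keeping the latest value per key. This is a minimal sketch for comparing the topic with MySQL. compact_upsert is a hypothetical helper that reads key/value pairs separated by a tab.

```shell
#!/usr/bin/env bash
# compact_upsert: read "key<TAB>value" records on stdin (oldest first) and
# print the latest value for each key, dropping keys whose latest value is
# the tombstone "null". Output keeps the order in which keys first appeared.
compact_upsert() {
  awk -F'\t' '
    !($1 in seen) { seen[$1] = 1; order[++n] = $1 }
    { latest[$1] = $2 }
    END {
      for (i = 1; i <= n; i++)
        if (latest[order[i]] != "null") print order[i] "\t" latest[order[i]]
    }'
}
```

Usage: kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic mysql-cdc-fgedudb --from-beginning --timeout-ms 10000 --property print.key=true --property key.separator=$'\t' | compact_upsert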

9. Functional Testing

After installation, run functional tests to confirm that Flink CDC works correctly.

9.1 Full Snapshot Synchronization Test

# Create a test table and insert data
$ mysql -u fgedu -p

mysql> USE fgedudb;
mysql> CREATE TABLE fgedu_test_full (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100),
value INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

mysql> INSERT INTO fgedu_test_full (name, value) VALUES
('test1', 100),
('test2', 200),
('test3', 300),
('test4', 400),
('test5', 500);

# Create the synchronization job
# vi /data/flink/jobs/test-full-sync.sql

-- Name the job so it is easy to find in flink list
SET 'pipeline.name' = 'test-full-sync';

-- VALUE is a reserved keyword in Flink SQL, so the column name is quoted with backticks
CREATE TABLE test_full_source (
id INT,
name STRING,
`value` INT,
created_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.51',
'port' = '3306',
'username' = 'fgedu',
'password' = 'fgedu_mysql_2024',
'database-name' = 'fgedudb',
'table-name' = 'fgedu_test_full'
);

CREATE TABLE test_full_sink (
id INT,
name STRING,
`value` INT,
created_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'print',
'print-identifier' = 'test_full_sink'
);

INSERT INTO test_full_sink SELECT * FROM test_full_source;

# Submit the job with the SQL client
$ sql-client.sh -f /data/flink/jobs/test-full-sync.sql

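# Check the job output (the print connector writes to the TaskManager's .out file, not the .log file; check the node running the sink)
$ tail -f /data/flink/log/flink-*-taskexecutor-*.out | grep test_full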

# Sample output:
test_full_sink: +I[1, test1, 100, 2024-04-05T10:00:00]
test_full_sink: +I[2, test2, 200, 2024-04-05T10:00:00]
test_full_sink: +I[3, test3, 300, 2024-04-05T10:00:00]
test_full_sink: +I[4, test4, 400, 2024-04-05T10:00:00]
test_full_sink: +I[5, test5, 500, 2024-04-05T10:00:00]

9.2 Incremental Synchronization Test

# Insert a new row
mysql> INSERT INTO fgedu_test_full (name, value) VALUES ('test6', 600);

# Check the job output (TaskManager .out file)
$ tail -f /data/flink/log/flink-*-taskexecutor-*.out | grep test_full

# Sample output:
test_full_sink: +I[6, test6, 600, 2024-04-05T10:05:00]

# Update a row
mysql> UPDATE fgedu_test_full SET value = 150 WHERE id = 1;

# Sample output (an update appears as -U with the old row followed by +U with the new row):
test_full_sink: -U[1, test1, 100, 2024-04-05T10:00:00]
test_full_sink: +U[1, test1, 150, 2024-04-05T10:00:00]

# Delete a row
mysql> DELETE FROM fgedu_test_full WHERE id = 5;

# Sample output:
test_full_sink: -D[5, test5, 500, 2024-04-05T10:00:00]

9.3 Resume-from-Savepoint Test

# Check the current job status
$ flink list

# Sample output:
------------------ Running/Restarting Jobs -------------------
01.04.2024 10:00:00 : abc123def456 : test-full-sync (RUNNING)
--------------------------------------------------------------

# Create a savepoint
$ flink savepoint abc123def456 /data/flink/savepoints/test-full-sync

# Sample output:
Triggering savepoint for job abc123def456.
Waiting for response...
Savepoint completed. Path: file:///data/flink/savepoints/test-full-sync/savepoint-abc123-1234567890
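The savepoint path has to be passed back unchanged when the job is restored, so it is safer to capture it than to copy it by hand. This is a minimal sketch based on the "Savepoint completed. Path:" line shown above. extract_savepoint_path is a hypothetical helper.

```shell
#!/usr/bin/env bash
# extract_savepoint_path: read "flink savepoint" output on stdin and print
# the path reported on the "Savepoint completed. Path: ..." line.
extract_savepoint_path() {
  sed -nE 's/^Savepoint completed\. Path: (.*)$/\1/p' | head -n 1
}
```

Usage: sp=$(flink savepoint abc123def456 /data/flink/savepoints/test-full-sync | extract_savepoint_path), then use "$sp" as the restore path.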

# Stop the job
$ flink stop abc123def456

# Sample output:
Suspending job abc123def456 with a savepoint.

# Insert rows in MySQL while the job is stopped
mysql> INSERT INTO fgedu_test_full (name, value) VALUES ('test7', 700);
mysql> INSERT INTO fgedu_test_full (name, value) VALUES ('test8', 800);

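# Restore the job from the savepoint. The SQL client takes the savepoint path from the
# 'execution.savepoint.path' setting, so prepend that setting to a copy of the job script.
$ (echo "SET 'execution.savepoint.path' = '/data/flink/savepoints/test-full-sync/savepoint-abc123-1234567890';"; cat /data/flink/jobs/test-full-sync.sql) > /tmp/test-full-sync-restore.sql
$ sql-client.sh -f /tmp/test-full-sync-restore.sql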

# Sample output:
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xyz789abc123

# Check the job output to verify synchronization (TaskManager .out file)
$ tail -f /data/flink/log/flink-*-taskexecutor-*.out | grep test_full

# Sample output (the rows inserted while the job was stopped are picked up):
test_full_sink: +I[7, test7, 700, 2024-04-05T10:10:00]
test_full_sink: +I[8, test8, 800, 2024-04-05T10:10:00]

10. Performance Tuning

Flink CDC performance tuning covers parallelism, memory, checkpoints and other areas.

10.1 Parallelism Tuning

# Edit the Flink configuration
# vi /data/flink/conf/flink-conf.yaml

# Increase the TaskManager slots
taskmanager.numberOfTaskSlots: 16

# Increase the default parallelism
parallelism.default: 8

# Set the parallelism in SQL
# vi /data/flink/jobs/mysql-cdc-sync-optimized.sql

-- Job settings (the parallelism key is parallelism.default)
SET 'parallelism.default' = '8';
SET 'execution.checkpointing.interval' = '30s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout' = '10min';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';

-- Create the source table (parallel snapshot reading)
CREATE TABLE mysql_cdc_source (
id INT,
name STRING,
department STRING,
salary DECIMAL(10,2),
created_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.51',
'port' = '3306',
'username' = 'fgedu',
'password' = 'fgedu_mysql_2024',
'database-name' = 'fgedudb',
'table-name' = 'fgedu_employees',
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '8096',
'scan.snapshot.fetch.size' = '2048'
);

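-- Create the target table. upsert-kafka keys each record by the primary key, so all
-- changes to the same id go to the same Kafka partition and stay in order.
CREATE TABLE kafka_sink (
id INT,
name STRING,
department STRING,
salary DECIMAL(10,2),
created_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'mysql-cdc-fgedudb',
'properties.bootstrap.servers' = '192.168.1.51:9092',
'key.format' = 'json',
'value.format' = 'json'
);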

-- Start the synchronization
INSERT INTO kafka_sink SELECT * FROM mysql_cdc_source;

10.2 Memory Tuning

# Edit the TaskManager memory settings
# vi /data/flink/conf/flink-conf.yaml

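# TaskManager memory: set the total process size and let Flink derive the components.
# Also setting flink.size: 24576m and task.heap.size: 16384m over-constrains the model
# (16384m heap + 0.4 x 24576m managed + 2048m network already exceeds 24576m).
taskmanager.memory.process.size: 32768m
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.15
taskmanager.memory.network.max: 2048m
taskmanager.memory.network.min: 512m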
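Once Flink derives the components, a 32768m process size is split roughly as follows. The sketch estimates the split under stated assumptions: 256 MB JVM metaspace, JVM overhead of 10% of the process size clamped to [192 MB, 1 GB], 128 MB each of framework heap and framework off-heap, no task off-heap memory, and network memory clamped to the configured min/max. Check these defaults against the memory configuration documentation for your Flink version. tm_breakdown is a hypothetical helper, and Flink itself rounds in bytes, so real values differ slightly.

```shell
#!/usr/bin/env bash
# tm_breakdown PROCESS_MB MANAGED_FRACTION NETWORK_FRACTION NETWORK_MIN_MB NETWORK_MAX_MB
# Rough estimate (MB) of how taskmanager.memory.process.size is divided.
# Assumed defaults: 256 MB metaspace, JVM overhead = 10% of the process size
# clamped to [192, 1024] MB, 128 MB framework heap, 128 MB framework
# off-heap, no task off-heap memory.
tm_breakdown() {
  awk -v p="$1" -v mf="$2" -v nf="$3" -v nmin="$4" -v nmax="$5" 'BEGIN {
    overhead = p * 0.1
    if (overhead < 192) overhead = 192
    if (overhead > 1024) overhead = 1024
    flink = p - 256 - int(overhead)          # total Flink memory
    managed = int(flink * mf)                # managed memory (RocksDB state)
    network = flink * nf                     # network buffers, clamped to min/max
    if (network < nmin) network = nmin
    if (network > nmax) network = nmax
    network = int(network)
    heap = flink - 128 - 128 - managed - network
    printf "flink=%d managed=%d network=%d task_heap=%d\n", flink, managed, network, heap
  }'
}
```

With the settings above, tm_breakdown 32768 0.4 0.15 512 2048 prints flink=31488 managed=12595 network=2048 task_heap=16589. The task heap therefore ends up close to the 16384m that was previously set by hand.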

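# JVM options (JDK 11 uses unified GC logging; the old -XX:+PrintGCDateStamps flag no longer
# exists and would prevent the TaskManager JVM from starting)
env.java.opts.taskmanager: "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc*:file=/data/flink/log/taskmanager-gc.log:time,uptime"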

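# RocksDB state backend settings
# (with the default state.backend.rocksdb.memory.managed: true, RocksDB sizes its block cache
# and write buffers from managed memory, so the two size options below may not take effect)
state.backend.rocksdb.localdir: /data/flink/rocksdb
state.backend.rocksdb.block.cache-size: 512m
state.backend.rocksdb.writebuffer.size: 64m
state.backend.rocksdb.write-batch-size: 2m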

# Restart the Flink cluster
# stop-cluster.sh
# start-cluster.sh

10.3 Checkpoint Tuning

# Configure checkpoint parameters
# vi /data/flink/conf/flink-conf.yaml

# Checkpoint settings
execution.checkpointing.interval: 30000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600000
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 500
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

# State backend settings
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://192.168.1.51:8020/flink/checkpoints
state.savepoints.dir: hdfs://192.168.1.51:8020/flink/savepoints

# Local recovery
state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /data/flink/local-recovery

# Restart strategy
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
restart-strategy.fixed-delay.delay: 30s

# Checkpoint settings in SQL
SET 'execution.checkpointing.interval' = '30s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout' = '10min';
SET 'execution.checkpointing.min-pause' = '500ms';
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';

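Production recommendations:
- Set parallelism to 1 to 2 times the number of CPU cores.
- Use the RocksDB state backend for large state.
- Enable incremental checkpoints to shorten checkpoint duration.
- Choose a checkpoint interval that does not hurt throughput.
- Store checkpoints on HDFS for reliability.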

11. Upgrade and Migration

Upgrade Flink CDC carefully to keep data safe and the business running.

11.1 Pre-upgrade Preparation

# Check the current version
$ flink --version
Version: 1.17.0, Commit ID: 1a2b3c4d

# List the running jobs
$ flink list

# Sample output:
------------------ Running/Restarting Jobs -------------------
01.04.2024 10:00:00 : abc123def456 : MySQL CDC Sync Job (RUNNING)
--------------------------------------------------------------

# Create a savepoint
$ flink savepoint abc123def456 /data/flink/savepoints/upgrade-backup

# Sample output:
Triggering savepoint for job abc123def456.
Waiting for response...
Savepoint completed. Path: file:///data/flink/savepoints/upgrade-backup/savepoint-abc123-1234567890

# Stop all jobs
$ flink stop abc123def456

# Back up the configuration files
# cp -r /data/flink/conf /backup/flink_conf_$(date +%Y%m%d)

# Back up the lib directory
# tar -czf /backup/flink_lib_$(date +%Y%m%d).tar.gz /data/flink/lib

# Stop the Flink cluster
# stop-cluster.sh

11.2 Perform the Upgrade

# Download the new Flink version
# cd /tmp
# wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz

# Extract the archive
# tar -xzf flink-1.18.0-bin-scala_2.12.tgz

# Keep the old version for rollback
# mv /data/flink /data/flink_1.17.0

# Install the new version
# mv flink-1.18.0 /data/flink

# Restore the configuration files
# cp -r /backup/flink_conf_$(date +%Y%m%d)/* /data/flink/conf/

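# Restore third-party JARs only. Extracting the lib backup over the new installation would
# mix the Flink 1.17 core JARs (flink-dist, flink-table-*) into 1.18. Copy only the JDBC drivers,
# and download the Flink-version-specific connectors (for example the Kafka *-1.18 build) again.
# cp /data/flink_1.17.0/lib/mysql-connector-java-*.jar /data/flink_1.17.0/lib/postgresql-*.jar /data/flink/lib/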
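Do not restore the old lib directory wholesale, because that would mix Flink 1.17 core JARs into the 1.18 install. Instead, list the JARs that were added to the old installation by hand. One way is to compare the old lib directory with the freshly unpacked 1.18 one after stripping version numbers from the file names. This is a minimal sketch. jar_base and third_party_jars are hypothetical helpers, and the version-stripping rule (remove everything from the first "-<digit>") is a heuristic.

```shell
#!/usr/bin/env bash
# jar_base: strip the version suffix from JAR names read on stdin,
# e.g. "flink-dist-1.17.0.jar" -> "flink-dist".
jar_base() { sed -E 's/-[0-9][^/]*\.jar$//'; }

# third_party_jars OLD_LIB NEW_LIB
# Prints the JARs in OLD_LIB whose version-less name does not occur in
# NEW_LIB, i.e. JARs added to the old installation by hand.
third_party_jars() {
  local old_dir=$1 new_dir=$2 new_bases f base
  new_bases=$(ls "$new_dir" | grep '\.jar$' | jar_base | sort -u)
  for f in "$old_dir"/*.jar; do
    [ -e "$f" ] || continue
    base=$(basename "$f" | jar_base)
    if ! grep -qxF "$base" <<<"$new_bases"; then
      basename "$f"
    fi
  done
}
```

Run third_party_jars /data/flink_1.17.0/lib /data/flink/lib before copying anything into the new lib directory. Old CDC 2.x connector JARs show up in the list too, and they are the ones to replace with the 3.x versions.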

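# Download the new Flink CDC connectors (remove any 2.x connector JARs first; 3.1.0 artifacts are published under the org.apache.flink groupId, so verify these URLs on Maven Central)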
# cd /data/flink/lib
# rm -f flink-connector-*-cdc-2.*.jar
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/3.1.0/flink-connector-mysql-cdc-3.1.0.jar
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-postgres-cdc/3.1.0/flink-connector-postgres-cdc-3.1.0.jar

# Verify the version
$ flink --version
Version: 1.18.0, Commit ID: 1a2b3c4d

# Start the Flink cluster
# start-cluster.sh

11.3 Post-upgrade Verification

# Check the Flink cluster processes
$ jps

# Sample output:
12345 StandaloneSessionClusterEntrypoint
12567 TaskManagerRunner

# Verify in the Web UI
# In a browser, open http://192.168.1.51:8081

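# Restore the job from the savepoint (prepend the savepoint path to a copy of the job script)
$ (echo "SET 'execution.savepoint.path' = '/data/flink/savepoints/upgrade-backup/savepoint-abc123-1234567890';"; cat /data/flink/jobs/mysql-cdc-sync.sql) > /tmp/mysql-cdc-sync-restore.sql
$ sql-client.sh -f /tmp/mysql-cdc-sync-restore.sql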

# Sample output:
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xyz789abc123

# Verify data synchronization
$ kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic mysql-cdc-fgedudb --from-beginning --max-messages 10

# Sample output:
{"id":1,"name":"Zhang San","department":"Engineering","salary":16000.00,"created_at":"2024-04-05T10:00:00"}
{"id":2,"name":"Li Si","department":"Sales","salary":12000.00,"created_at":"2024-04-05T10:00:00"}

# Insert test data in MySQL
mysql> INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('Upgrade Test', 'QA', 10000.00);

# Verify incremental synchronization
$ kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic mysql-cdc-fgedudb

# Sample output:
{"id":9,"name":"Upgrade Test","department":"QA","salary":10000.00,"created_at":"2024-04-05T10:30:00"}

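12. Monitoring and Operations

Flink CDC monitoring and operations cover service status monitoring, log management and performance monitoring.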

12.1 Service Status Monitoring

# Check the Flink job status
$ flink list

# Sample output:
Waiting for response...
------------------ Running/Restarting Jobs -------------------
01.04.2024 10:00:00 : abc123def456 : MySQL CDC Sync Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

# Check the TaskManager status
$ curl http://192.168.1.51:8081/taskmanagers

# Sample output:
{"taskmanagers":[{"id":"container_12345","path":"akka.tcp://flink@flink01.fgedu.net.cn:6122/user/taskmanager_0","dataPort":6125,"timeSinceLastHeartbeat":1712289600000,"slotsNumber":8,"freeSlots":4,"hardware":{"cpuCores":32,"physicalMemory":68719476736,"freeMemory":53687091200,"managedMemory":17179869184}}]}

# Check the job details
$ curl http://192.168.1.51:8081/jobs/abc123def456

# Sample output:
{"jid":"abc123def456","name":"MySQL CDC Sync Job","state":"RUNNING","start-time":1712289600000,"end-time":-1,"duration":3600000,"vertices":[{"id":"vertex1","name":"Source: MySQL CDC Source","parallelism":8,"status":"RUNNING"}]}

# Check checkpoint statistics
$ curl http://192.168.1.51:8081/jobs/abc123def456/checkpoints

# Sample output:
{"counts":{"restored":0,"total":120,"in_progress":1,"completed":118,"failed":1},"summary":{"state_size":{"min":1024000,"max":2048000,"avg":1536000},"duration":{"min":500,"max":2000,"avg":1000}}}
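A rising share of failed checkpoints is often the first sign of trouble, so the counts above lend themselves to a simple alert. This minimal sketch pulls the completed and failed counts out of the checkpoints JSON with grep, so jq is not needed, and reports the failure percentage. json_count and checkpoint_health are hypothetical helpers, and the sketch assumes the first numeric "completed"/"failed" fields are the ones in the counts section, as in the sample above.

```shell
#!/usr/bin/env bash
# json_count KEY: print the first integer value of "KEY":N in the JSON on stdin.
json_count() { grep -oE "\"$1\":[0-9]+" | head -n 1 | cut -d: -f2; }

# checkpoint_health JSON
# Summarises the counts section of GET /jobs/<jobid>/checkpoints as
# completed/failed totals and the failure percentage.
checkpoint_health() {
  local completed failed
  completed=$(printf '%s' "$1" | json_count completed)
  failed=$(printf '%s' "$1" | json_count failed)
  awk -v c="${completed:-0}" -v f="${failed:-0}" 'BEGIN {
    total = c + f
    pct = (total > 0) ? f * 100 / total : 0
    printf "completed=%d failed=%d failure_pct=%.1f\n", c, f, pct
  }'
}
```

Usage: checkpoint_health "$(curl -s http://192.168.1.51:8081/jobs/abc123def456/checkpoints)". For the sample output above this prints completed=118 failed=1 failure_pct=0.8.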

12.2 Log Management

# View the JobManager log
$ tail -100 /data/flink/log/flink-*-standalonesession-*.log

# Sample output:
2024-04-05 10:00:00,000 INFO org.apache.flink.runtime.jobmaster.JobMaster - Received job submission (abc123def456).
2024-04-05 10:00:00,100 INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job 'MySQL CDC Sync Job' (abc123def456).
2024-04-05 10:00:00,200 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job MySQL CDC Sync Job (abc123def456) switched from state CREATED to RUNNING.

# View the TaskManager log
$ tail -100 /data/flink/log/flink-*-taskexecutor-*.log

# Sample output:
2024-04-05 10:00:00,000 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task MySQL CDC Source.
2024-04-05 10:00:00,100 INFO org.apache.flink.streaming.connectors.cdc.MySQLSource - MySQL CDC source started.
2024-04-05 10:00:00,200 INFO org.apache.flink.streaming.connectors.cdc.MySQLSource - Snapshot phase completed, switching to binlog phase.

# View recent errors
$ grep -i error /data/flink/log/flink-*.log | tail -20

# Configure log rotation
# vi /etc/logrotate.d/flink

/data/flink/log/*.log {
daily
rotate 30
compress
delaycompress
missingok
notifempty
# Flink keeps its log files open, so truncate them in place instead of moving them
copytruncate
}

12.3 Performance Monitoring

# Monitor resource usage of the Java processes
$ top -p $(pgrep -d',' java)

# Sample output:
top - 10:00:00 up 1 day, 2:00, 2 users, load average: 2.50, 2.45, 2.40
Tasks: 3 total, 0 running, 3 sleeping, 0 stopped, 0 zombie
%Cpu(s): 15.0 us, 5.0 sy, 0.0 ni, 79.0 id, 1.0 wa, 0.0 hi, 0.0 si
MiB Mem : 65536.0 total, 52000.0 free, 12000.0 used, 1536.0 buff/cache
MiB Swap: 32768.0 total, 32768.0 free, 0.0 used. 52000.0 avail Mem

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12345 root 20 0 8192000 4000000 250000 S 25.0 6.1 0:30.00 java
12567 root 20 0 16384000 8000000 500000 S 30.0 12.2 1:00.00 java

# Monitor JVM memory and GC statistics
$ jstat -gc $(pgrep -f StandaloneSessionClusterEntrypoint) 1000 5

# Sample output:
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT CGC CGCT GCT
1024.0 1024.0 0.0 512.0 81920.0 40960.0 819200.0 409600.0 51200.0 25600.0 5120.0 2560.0 10 0.500 0 0.000 5 0.250 0.750

# Monitor network traffic
$ iftop -i eth0 -n

# Create a monitoring script
# vi /backup/scripts/flink_cdc_monitor.sh

#!/bin/bash

echo "=== Flink CDC Monitor ==="
echo "Date: $(date)"
echo ""

echo "=== Flink Cluster Status ==="
curl -s http://192.168.1.51:8081/overview
echo ""

echo "=== Running Jobs ==="
flink list
echo ""

echo "=== Memory Usage ==="
free -h
echo ""

echo "=== JVM Memory ==="
jstat -gc $(pgrep -f StandaloneSessionClusterEntrypoint) | tail -1
echo ""

echo "=== Recent Errors ==="
grep -i error /data/flink/log/flink-*.log | tail -5

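Production recommendations:
- Check the Flink cluster and job status regularly.
- Monitor system resources and scale out in time.
- Configure log rotation so the disk does not fill up.
- Set up a complete alerting mechanism.
- Back up checkpoints and savepoints regularly.
- Monitor synchronization latency to keep data up to date.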

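This article was compiled and published by Fengge Tutorials (风哥教程) for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html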
