
Flink CDC Download: Flink CDC Data Synchronization Tool Download Links and Installation Guide

1. Introduction to Flink CDC and Version Overview

Apache Flink CDC (Change Data Capture) is a streaming data integration tool for capturing database change data in real time. Flink CDC supports a wide range of databases, including MySQL, PostgreSQL, Oracle, and MongoDB, and can synchronize change data to various target systems in real time.

Flink CDC is built on Apache Flink and provides an end-to-end real-time data synchronization solution. It supports combined full and incremental synchronization, schema evolution, and exactly-once semantics, making it a strong choice for building real-time data pipelines.

Core features of Flink CDC:

- Real-time capture: captures database change data as it happens
- Multiple sources: supports MySQL, PostgreSQL, Oracle, MongoDB, and more
- Multiple sinks: supports Kafka, Iceberg, Paimon, StarRocks, and more
- Full + incremental: unified full and incremental synchronization
- Schema evolution: automatically propagates table structure changes
- Exactly-once: guarantees data consistency
- Distributed: builds on Flink's distributed processing capabilities
- High availability: supports failure recovery and resumable synchronization
- Ease of use: offers both YAML and SQL configuration styles
- Extensibility: supports custom connector development

Flink CDC architecture components:

Component         Description
Source Connector  Source-side connector that captures change data
Pipeline          Data pipeline that defines the synchronization rules
Transform         Data transformation that reshapes record formats
Sink Connector    Target-side connector that writes data out
Schema Registry   Schema management that handles structure changes
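The components above map directly onto the sections of a pipeline definition file. The skeleton below is a sketch only (the names are placeholders, not from a real deployment) showing where each component is configured:

```yaml
# Sketch of a pipeline definition; all values are placeholders.
source:          # Source Connector: where change data is captured
  type: mysql
sink:            # Sink Connector: where change data is written
  type: kafka
transform: []    # Transform: optional projection/filter rules
route: []        # Route: optional source-to-sink table mapping
pipeline:        # Pipeline: job-level settings such as name and parallelism
  name: example-pipeline
```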

2. Choosing a Flink CDC Version and Download Links

Flink CDC must be paired with a compatible Apache Flink version. At the time of writing, the latest stable release is 3.6.0.

Flink CDC release status:

Version  Release date  Status
3.6.0    2026-03-30    Latest stable release
3.5.0    2025-09-26    Stable release
3.4.0    2025-05-16    Stable release
3.3.0    2025-XX-XX    Legacy support

Highlights of Flink CDC 3.6.0:
- Supports Flink 1.20.x and 2.2.x
- Minimum JDK version raised to 11
- New Oracle Source pipeline connector
- New Apache Hudi Sink pipeline connector
- Schema evolution support for PostgreSQL
- Enhancements to the Transform framework
- Performance optimizations and bug fixes

Official download links:

Flink CDC website: https://flink.apache.org/
Download page: https://flink.apache.org/downloads/
Source repository: https://github.com/apache/flink-cdc
Documentation: https://nightlies.apache.org/flink/flink-cdc-docs-stable/

3. Flink CDC Download Methods in Detail

Method 1: Download the binary package (recommended)

Download Flink CDC 3.6.0 (for Flink 2.2.x):
$ cd /fgeudb/software
$ wget https://archive.apache.org/dist/flink/flink-cdc-3.6.0/flink-cdc-3.6.0-2.2-bin.tar.gz

Sample output:
--2026-04-04 10:00:00--  https://archive.apache.org/dist/flink/flink-cdc-3.6.0/flink-cdc-3.6.0-2.2-bin.tar.gz
Resolving archive.apache.org... 163.172.17.49
Connecting to archive.apache.org|163.172.17.49|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 123456789 (118M) [application/octet-stream]
Saving to: 'flink-cdc-3.6.0-2.2-bin.tar.gz'

flink-cdc-3.6.0-2.2-bin.tar.gz 100%[======================================================================>] 117.74M 25.6MB/s in 4.6s

2026-04-04 10:00:05 (25.6 MB/s) - 'flink-cdc-3.6.0-2.2-bin.tar.gz' saved [123456789/123456789]

Download Flink CDC 3.6.0 (for Flink 1.20.x):
$ wget https://archive.apache.org/dist/flink/flink-cdc-3.6.0/flink-cdc-3.6.0-1.20-bin.tar.gz

Extract and install:
$ tar -zxvf flink-cdc-3.6.0-2.2-bin.tar.gz -C /fgeudb/
$ ln -s /fgeudb/flink-cdc-3.6.0-2.2 /fgeudb/flink-cdc

Method 2: Download pipeline connectors

Download the MySQL pipeline connector:
$ cd /fgeudb/flink-cdc/lib
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.6.0-2.2/flink-cdc-pipeline-connector-mysql-3.6.0-2.2.jar

Download the PostgreSQL pipeline connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-postgres/3.6.0-2.2/flink-cdc-pipeline-connector-postgres-3.6.0-2.2.jar

Download the Kafka pipeline connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/3.6.0-2.2/flink-cdc-pipeline-connector-kafka-3.6.0-2.2.jar

Download the Paimon pipeline connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-paimon/3.6.0-2.2/flink-cdc-pipeline-connector-paimon-3.6.0-2.2.jar

Download the StarRocks pipeline connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.6.0-2.2/flink-cdc-pipeline-connector-starrocks-3.6.0-2.2.jar

Download the Iceberg pipeline connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-iceberg/3.6.0-2.2/flink-cdc-pipeline-connector-iceberg-3.6.0-2.2.jar

Download the Doris pipeline connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.6.0-2.2/flink-cdc-pipeline-connector-doris-3.6.0-2.2.jar

Method 3: Download source connectors

Download the MySQL source connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.6.0/flink-sql-connector-mysql-cdc-3.6.0.jar

Download the PostgreSQL source connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.6.0/flink-sql-connector-postgres-cdc-3.6.0.jar

Download the Oracle source connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-oracle-cdc/3.6.0/flink-sql-connector-oracle-cdc-3.6.0.jar

Download the MongoDB source connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mongodb-cdc/3.6.0/flink-sql-connector-mongodb-cdc-3.6.0.jar

Download the SQL Server source connector:
$ wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-sqlserver-cdc/3.6.0/flink-sql-connector-sqlserver-cdc-3.6.0.jar

Method 4: Maven dependencies

Maven dependency configuration (note the version scheme: pipeline connectors carry the Flink version suffix, while the DataStream connector uses the plain CDC version):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
    <version>3.6.0-2.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.6.0</version>
</dependency>

4. Flink CDC Installation and Deployment

Step 1: Install Apache Flink

Download Apache Flink 2.2.0:
$ cd /fgeudb/software
$ wget https://archive.apache.org/dist/flink/flink-2.2.0/flink-2.2.0-bin-scala_2.12.tgz

Extract and install:
$ tar -zxvf flink-2.2.0-bin-scala_2.12.tgz -C /fgeudb/
$ ln -s /fgeudb/flink-2.2.0 /fgeudb/flink

Configure environment variables:
$ vi /etc/profile.d/flink.sh

export FLINK_HOME=/fgeudb/flink
export PATH=$FLINK_HOME/bin:$PATH

Apply the configuration:
$ source /etc/profile.d/flink.sh

Start the Flink cluster:
$ /fgeudb/flink/bin/start-cluster.sh

Sample output:
Starting cluster.
Starting standalonesession daemon on host fgedu.
Starting taskexecutor daemon on host fgedu.

Verify Flink:
$ /fgeudb/flink/bin/flink --version

Sample output:
Version: 2.2.0, Commit ID: abc123d

Step 2: Install the Java environment

Note: Flink itself requires a JDK, so if Java is not already present, complete this step before starting the cluster in Step 1.

Install JDK 11:
# yum install -y java-11-openjdk java-11-openjdk-devel

Or, on Ubuntu/Debian:
# apt install -y openjdk-11-jdk

Verify the Java version:
$ java -version

Sample output:
openjdk version "11.0.22" 2024-01-16
OpenJDK Runtime Environment (build 11.0.22+7-post-Ubuntu-0ubuntu222.04.1)
OpenJDK 64-Bit Server VM (build 11.0.22+7-post-Ubuntu-0ubuntu222.04.1, mixed mode, sharing)

Set JAVA_HOME:
# vi /etc/profile.d/java.sh

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
export PATH=$JAVA_HOME/bin:$PATH

Step 3: Configure Flink CDC

Copy the connector JARs into Flink's lib directory:
$ cp /fgeudb/flink-cdc/lib/*.jar /fgeudb/flink/lib/

Restart the Flink cluster:
$ /fgeudb/flink/bin/stop-cluster.sh
$ /fgeudb/flink/bin/start-cluster.sh

Create configuration directories:
$ mkdir -p /fgeudb/flink-cdc/conf
$ mkdir -p /fgeudb/flink-cdc/jobs

Step 4: Configure the MySQL source

Enable the MySQL binlog:
$ vi /etc/my.cnf

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
gtid_mode = ON
enforce_gtid_consistency = ON

Restart MySQL:
# systemctl restart mysqld

Verify the binlog:
$ mysql -u root -p -e "SHOW VARIABLES LIKE 'log_bin';"

Sample output:
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

Create a CDC user:
mysql> CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'flinkcdc123';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc'@'%';
mysql> FLUSH PRIVILEGES;

5. Flink CDC Configuration Files in Detail

Pipeline YAML configuration

Create a MySQL-to-Kafka synchronization configuration:
$ vi /fgeudb/flink-cdc/jobs/mysql-to-kafka.yaml

source:
  type: mysql
  hostname: 192.168.1.51
  port: 3306
  username: flinkcdc
  password: flinkcdc123
  tables: fgedu_db.\.*
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: kafka
  topic: cdc-events
  bootstrap-servers: 192.168.1.51:9092
  format: json
  json.timestamp-format.standard: ISO_8601

pipeline:
  name: mysql-to-kafka-pipeline
  parallelism: 2
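One non-obvious detail in the source block: `server-id: 5400-5404` declares a range of five IDs, and the MySQL source needs at least as many IDs in this range as its reading parallelism, because each parallel reader joins MySQL replication with its own server ID. A sketch of the relationship (the numbers are just for illustration):

```yaml
# Illustrative fragment: the range length must cover the source parallelism.
source:
  type: mysql
  server-id: 5400-5404   # 5 IDs available, so up to 5 parallel source readers

pipeline:
  parallelism: 2          # 2 <= 5, so this range is sufficient
```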

Create a MySQL-to-Paimon synchronization configuration:
$ vi /fgeudb/flink-cdc/jobs/mysql-to-paimon.yaml

source:
  type: mysql
  hostname: 192.168.1.51
  port: 3306
  username: flinkcdc
  password: flinkcdc123
  tables: fgedu_db.\.*
  server-id: 5400-5404

sink:
  type: paimon
  path: hdfs://192.168.1.51:8020/paimon/fgedu_db
  warehouse: hdfs://192.168.1.51:8020/paimon

pipeline:
  name: mysql-to-paimon-pipeline
  parallelism: 4

Transform configuration

Create a data transformation configuration:
$ vi /fgeudb/flink-cdc/jobs/mysql-to-kafka-transform.yaml

source:
  type: mysql
  hostname: 192.168.1.51
  port: 3306
  username: flinkcdc
  password: flinkcdc123
  tables: fgedu_db.orders

transform:
  - source-table: fgedu_db.orders
    projection: >
      id,
      customer_id,
      order_amount,
      DATE_FORMAT(order_date, 'yyyy-MM-dd') as order_date,
      UPPER(status) as status
    filter: "order_amount > 100"
    primary-keys: id

sink:
  type: kafka
  topic: orders-cdc
  bootstrap-servers: 192.168.1.51:9092
  format: json

pipeline:
  name: orders-transform-pipeline
  parallelism: 2

Route configuration

Table routing configuration:
route:
  - source-table: fgedu_db.orders
    sink-table: dw.orders_detail
  - source-table: fgedu_db.customers
    sink-table: dw.dim_customers

Schema evolution configuration:
schema:
  evolution:
    enabled: true
    include:
      - add.column
      - alter.column.type
      - drop.column
    exclude:
      - rename.column

6. Flink CDC Data Synchronization in Practice

Step 1: MySQL-to-Kafka synchronization

Create the synchronization job:
$ vi /fgeudb/flink-cdc/jobs/mysql-kafka-sync.yaml

source:
  type: mysql
  hostname: 192.168.1.51
  port: 3306
  username: flinkcdc
  password: flinkcdc123
  tables: fgedu_db.sales_data
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: kafka
  topic: sales-cdc
  bootstrap-servers: 192.168.1.51:9092
  format: json
  json.timestamp-format.standard: ISO_8601
  properties:
    acks: all
    retries: 3

pipeline:
  name: mysql-kafka-sync
  parallelism: 2

Start the synchronization job:
$ /fgeudb/flink-cdc/bin/flink-cdc.sh /fgeudb/flink-cdc/jobs/mysql-kafka-sync.yaml

Sample output:
Pipeline has been submitted to Flink cluster.
Job ID: abc123def456789

Check the job status:
$ /fgeudb/flink/bin/flink list

Sample output:
———————- Running/Restarting Jobs ———————-
abc123def456789 : mysql-kafka-sync (RUNNING)

Step 2: MySQL-to-StarRocks synchronization

Create the synchronization job:
$ vi /fgeudb/flink-cdc/jobs/mysql-starrocks-sync.yaml

source:
  type: mysql
  hostname: 192.168.1.51
  port: 3306
  username: flinkcdc
  password: flinkcdc123
  tables: fgedu_db.\.*
  server-id: 5400-5404

sink:
  type: starrocks
  jdbc-url: jdbc:mysql://192.168.1.51:9030
  load-url: 192.168.1.51:8030
  username: root
  password: starrocks123
  table.create.properties.replication_num: 3
  database.pattern: fgedu_dw
  table.pattern: ${table_name}

pipeline:
  name: mysql-starrocks-sync
  parallelism: 4

Start the synchronization:
$ /fgeudb/flink-cdc/bin/flink-cdc.sh /fgeudb/flink-cdc/jobs/mysql-starrocks-sync.yaml

Step 3: Synchronize using SQL

Create a SQL job:
$ vi /fgeudb/flink-cdc/jobs/mysql-sync.sql

Create the MySQL CDC table:
CREATE TABLE mysql_orders (
  id BIGINT,
  customer_id BIGINT,
  order_amount DECIMAL(18,2),
  order_date TIMESTAMP,
  status STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.1.51',
  'port' = '3306',
  'username' = 'flinkcdc',
  'password' = 'flinkcdc123',
  'database-name' = 'fgedu_db',
  'table-name' = 'orders',
  'server-time-zone' = 'Asia/Shanghai'
);

Create the Kafka target table (upsert-kafka is used here because the CDC source produces an updating changelog, which the append-only kafka connector cannot accept):
CREATE TABLE kafka_orders (
  id BIGINT,
  customer_id BIGINT,
  order_amount DECIMAL(18,2),
  order_date TIMESTAMP,
  status STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders-cdc',
  'properties.bootstrap.servers' = '192.168.1.51:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

Run the synchronization:
INSERT INTO kafka_orders SELECT * FROM mysql_orders;

Start the SQL job:
$ /fgeudb/flink/bin/sql-client.sh -f /fgeudb/flink-cdc/jobs/mysql-sync.sql

7. Installation Verification and Testing

Verify Flink CDC status

Check the Flink cluster status:
$ /fgeudb/flink/bin/flink list

Sample output:
———————- Running/Restarting Jobs ———————-
abc123def456789 : mysql-kafka-sync (RUNNING)

Check job details:
$ /fgeudb/flink/bin/flink info abc123def456789

Open the Flink Web UI:
Point a browser at: http://192.168.1.51:8081

View the CDC job logs:
$ tail -f /fgeudb/flink/log/flink-*-taskexecutor-*.log

Verify data synchronization:
Insert test data into MySQL:
mysql> INSERT INTO fgedu_db.sales_data VALUES (1, 'Product A', 1000.00, NOW());

Consume the data from Kafka:
$ kafka-console-consumer --bootstrap-server 192.168.1.51:9092 --topic sales-cdc --from-beginning

Sample output:
{"id":1,"name":"Product A","sales_amount":1000.00,"sales_date":"2026-04-04 10:15:00","op":"r"}
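The `op` field follows the Debezium-style operation codes used in Flink CDC's JSON output: `r` for a snapshot read, `c` for insert, `u` for update, `d` for delete. A minimal sketch of decoding one event as consumed above (the exact envelope depends on the configured format):

```python
import json

# Debezium-style operation codes, as used in the sample event above.
OP_NAMES = {"r": "snapshot read", "c": "insert", "u": "update", "d": "delete"}

event = json.loads(
    '{"id":1,"name":"Product A","sales_amount":1000.00,'
    '"sales_date":"2026-04-04 10:15:00","op":"r"}'
)
# Classify the change event by its operation code.
print(OP_NAMES.get(event["op"], "unknown"), event["id"])
```

An `op` of `r` indicates the row came from the initial full snapshot rather than from the binlog.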

Functional testing

Test full synchronization:
1. Start the CDC job
2. Verify that the historical data has been synchronized

Test incremental synchronization:
1. Insert new data into MySQL
2. Verify that the data is synchronized to the target in real time

Test schema evolution:
1. Add a new column in MySQL
2. Verify that the schema change is synchronized automatically

Test failure recovery:
1. Stop the CDC job
2. Insert data into MySQL
3. Restart the CDC job
4. Verify that synchronization resumes from where it left off
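The failure-recovery test rests on one idea: the job persists its read position (the binlog offset) in checkpoints, and on restart it resumes from the last committed offset instead of re-reading everything. A toy Python model of that bookkeeping (illustrative only, not Flink code):

```python
# Toy model of offset-based resume: `binlog` is the event log, `checkpoint`
# stores the index of the last event that was durably processed.
def run(binlog: list, checkpoint: int) -> tuple:
    """Process events after the checkpointed offset; return (output, new checkpoint)."""
    processed = binlog[checkpoint + 1:]
    return processed, checkpoint + len(processed)

binlog = ["insert:1", "insert:2", "update:1"]
out1, ckpt = run(binlog, checkpoint=-1)   # first run processes everything
binlog.append("insert:3")                 # the job "stops", new data arrives
out2, ckpt = run(binlog, ckpt)            # restart resumes at the saved offset
print(out1, out2)
```

Only `insert:3` is processed on the second run, which is exactly the behavior step 4 above verifies.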

8. Common Problems and Solutions

Problem 1: Insufficient binlog privileges

Symptom: Access denied for REPLICATION SLAVE

Solution:
1. Grant the correct privileges:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc'@'%';
mysql> FLUSH PRIVILEGES;

2. Verify the privileges:
mysql> SHOW GRANTS FOR 'flinkcdc'@'%';

3. Check the binlog configuration:
mysql> SHOW VARIABLES LIKE 'log_bin';

Problem 2: Connection timeouts

Symptom: Connection timed out

Solution:
1. Check network connectivity:
$ telnet 192.168.1.51 3306

2. Check the firewall:
# firewall-cmd --add-port=3306/tcp --permanent
# firewall-cmd --reload

3. Increase the timeout:
connect.timeout: 60s

Problem 3: Data latency

Symptom: Synchronization latency is too high

Solution:
1. Increase the parallelism:
pipeline:
  parallelism: 8

2. Tune the source configuration:
source:
  snapshot.split.size: 8096
  snapshot.fetch.size: 1024

3. Give Flink more resources:
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4

4. Check the network bandwidth

Flink CDC management commands

Start a pipeline job:
$ /fgeudb/flink-cdc/bin/flink-cdc.sh job.yaml

Start a SQL job:
$ /fgeudb/flink/bin/sql-client.sh -f job.sql

List jobs:
$ /fgeudb/flink/bin/flink list

Cancel a job:
$ /fgeudb/flink/bin/flink cancel job_id

Restore from a savepoint:
$ /fgeudb/flink/bin/flink run -s savepoint_path -d job.jar

Show job details:
$ /fgeudb/flink/bin/flink info job_id

Trigger a savepoint:
$ /fgeudb/flink/bin/flink savepoint job_id

View logs:
$ tail -f /fgeudb/flink/log/flink-*.log

Production environment recommendations:
1. Use the latest stable Flink CDC release (3.6.0)
2. Make sure the MySQL binlog is enabled and the CDC user has the correct privileges
3. Set the parallelism appropriately to improve throughput
4. Configure checkpointing for exactly-once semantics
5. Enable schema evolution to handle table structure changes
6. Set up monitoring and alerting
7. Create savepoints regularly as backups
8. Run a highly available Flink cluster
9. Monitor data latency and synchronization status
10. Plan resources carefully to avoid out-of-memory errors

Compiled and published by Fenge Tutorials; for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
