1. SeaTunnel Introduction and Version Notes
SeaTunnel (formerly Waterdrop) is an open-source, high-performance, distributed tool for large-scale data integration, incubated by the Apache Foundation. It ships with a rich set of connectors and supports data synchronization and transformation between many kinds of data sources.
Recent SeaTunnel releases:
SeaTunnel 2.3.8 (2024 release, used throughout this guide)
SeaTunnel 2.3.7 (2024 stable release)
SeaTunnel 2.3.6 (2023 stable release)
SeaTunnel 2.3.5 (2023 stable release)
SeaTunnel 2.3.4 (2023 stable release)
Source connectors supported by SeaTunnel:
- MySQL, PostgreSQL, Oracle, SQL Server
- MongoDB, Elasticsearch, ClickHouse
- Kafka, Pulsar, RocketMQ
- HDFS, HBase, Hive, Iceberg
- S3, OSS, COS
Sink connectors:
- MySQL, PostgreSQL, Oracle, SQL Server
- ClickHouse, Doris, StarRocks
- Elasticsearch, Solr
- Kafka, Pulsar
- HDFS, HBase, Hive
2. Downloading SeaTunnel
SeaTunnel can be obtained in several ways, including the official Apache download, Maven repositories, and Docker images.
Option 1: official download
$ mkdir -p /fgeudb/software/seatunnel
$ cd /fgeudb/software/seatunnel
# Download SeaTunnel 2.3.8
$ wget https://archive.apache.org/dist/seatunnel/2.3.8/apache-seatunnel-2.3.8-bin.tar.gz
# Download the source package (optional)
$ wget https://archive.apache.org/dist/seatunnel/2.3.8/apache-seatunnel-2.3.8-src.tar.gz
# List the downloaded files
$ ls -lh
Sample output:
total 850M
-rw-r--r-- 1 root root 850M Apr 4 10:00 apache-seatunnel-2.3.8-bin.tar.gz
-rw-r--r-- 1 root root 120M Apr 4 10:00 apache-seatunnel-2.3.8-src.tar.gz
Option 2: mirror downloads (mainland China)
# Aliyun mirror
$ wget https://mirrors.aliyun.com/apache/seatunnel/2.3.8/apache-seatunnel-2.3.8-bin.tar.gz
# Huawei Cloud mirror
$ wget https://mirrors.huawei.com/apache/seatunnel/2.3.8/apache-seatunnel-2.3.8-bin.tar.gz
# Tsinghua University mirror
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/seatunnel/2.3.8/apache-seatunnel-2.3.8-bin.tar.gz
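Mirror downloads can occasionally be truncated or corrupted; Apache publishes a `.sha512` file next to each release artifact, so it is worth checking the digest before unpacking. A minimal check in Python (a sketch; the expected digest is whatever the corresponding `.sha512` file contains):

```python
import hashlib

def sha512_of(path: str, chunk_size: int = 1 << 20) -> str:
    """Stream a file through SHA-512 and return the hex digest."""
    h = hashlib.sha512()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()

def verify(path: str, expected_hex: str) -> bool:
    """Compare the local file's digest against the published checksum."""
    return sha512_of(path) == expected_hex.strip().lower()
```

Streaming in chunks keeps memory flat even for the ~850 MB binary tarball.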
Option 3: Docker images
$ docker pull apache/seatunnel:2.3.8
# Pull the Flink-bundled image
$ docker pull apache/seatunnel-flink:2.3.8
# Pull the Spark-bundled image
$ docker pull apache/seatunnel-spark:2.3.8
# List local images
$ docker images | grep seatunnel
Sample output:
apache/seatunnel 2.3.8 abc123def456 7 days ago 1.2GB
apache/seatunnel-flink 2.3.8 def456ghi789 7 days ago 1.8GB
Option 4: Maven dependencies
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-api</artifactId>
    <version>2.3.8</version>
</dependency>
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-core-starter</artifactId>
    <version>2.3.8</version>
</dependency>
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-jdbc</artifactId>
    <version>2.3.8</version>
</dependency>
3. SeaTunnel Installation and Deployment
SeaTunnel can run in local mode, on Flink, or on Spark.
Step 1: unpack the distribution
$ cd /fgeudb
$ tar -zxvf /fgeudb/software/seatunnel/apache-seatunnel-2.3.8-bin.tar.gz
$ mv apache-seatunnel-2.3.8 seatunnel
# Inspect the directory layout
$ ls -la /fgeudb/seatunnel/
Sample output:
total 32
drwxr-xr-x 2 root root 4096 Apr 4 10:00 bin
drwxr-xr-x 2 root root 4096 Apr 4 10:00 config
drwxr-xr-x 3 root root 4096 Apr 4 10:00 connectors
drwxr-xr-x 2 root root 4096 Apr 4 10:00 lib
drwxr-xr-x 2 root root 4096 Apr 4 10:00 logs
drwxr-xr-x 2 root root 4096 Apr 4 10:00 plugins
Step 2: install the connectors
$ cd /fgeudb/seatunnel
# Install the connector plugins
$ sh bin/install-plugin.sh
Sample output:
Installing connectors…
[INFO] ------------------------------------------------------------------------
[INFO] Building Apache SeaTunnel : Connectors V2 2.3.8
[INFO] ------------------------------------------------------------------------
[INFO] Downloading connector: connector-jdbc
[INFO] Downloading connector: connector-kafka
[INFO] Downloading connector: connector-clickhouse
[INFO] Downloading connector: connector-elasticsearch
…
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
# List the installed connectors
$ ls -la connectors/seatunnel/
Sample output:
total 256000
-rw-r--r-- 1 root root 25M Apr 4 10:00 connector-jdbc-2.3.8.jar
-rw-r--r-- 1 root root 15M Apr 4 10:00 connector-kafka-2.3.8.jar
-rw-r--r-- 1 root root 20M Apr 4 10:00 connector-clickhouse-2.3.8.jar
-rw-r--r-- 1 root root 18M Apr 4 10:00 connector-elasticsearch-2.3.8.jar
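Each connector is a single versioned jar, so the install can also be checked programmatically. A small sketch (the `connector-<name>-<x.y.z>.jar` naming convention is inferred from the listing above and is an assumption here):

```python
import re
from pathlib import Path

# Assumes jars follow the connector-<name>-<x.y.z>.jar convention seen above.
JAR_RE = re.compile(r"^(?P<name>connector-[a-z0-9-]+)-(?P<version>\d+\.\d+\.\d+)\.jar$")

def installed_connectors(conn_dir: str) -> dict:
    """Return {connector name: version} for every matching jar in the directory."""
    found = {}
    for jar in Path(conn_dir).glob("*.jar"):
        m = JAR_RE.match(jar.name)
        if m:
            found[m.group("name")] = m.group("version")
    return found
```

This makes it easy to assert that every connector your jobs need is present at the expected version before submitting anything.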
Step 3: configure environment variables
$ vi ~/.bash_profile
export SEATUNNEL_HOME=/fgeudb/seatunnel
export PATH=$SEATUNNEL_HOME/bin:$PATH
# Apply the new environment variables
$ source ~/.bash_profile
# Verify the installation
$ seatunnel.sh --version
Sample output:
SeaTunnel 2.3.8
Git commit hash: abc123def
Build time: 2024-01-15T10:00:00Z
Step 4: configure SeaTunnel
$ vi /fgeudb/seatunnel/config/seatunnel.yaml
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 300000
      timeout: 10000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint
          storage.type: hdfs
          fs.defaultFS: file:///
# Edit the JVM options
$ vi /fgeudb/seatunnel/config/jvm_options
-Xms2g
-Xmx4g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/fgeudb/seatunnel/logs/heapdump.hprof
4. SeaTunnel Configuration in Detail
SeaTunnel jobs are described in HOCON configuration files, which keep pipelines clear and readable. Note that the syntax is strict: a mismatched quote or brace will fail the job at startup.
Configuration file structure
env {
  # Runtime environment
  parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 5000
}
source {
  # Data source
  Jdbc {
    result_table_name = "source_table"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.1.51:3306/fgedu_db"
    user = "root"
    password = "root123"
    query = "SELECT * FROM orders WHERE create_time >= '2026-01-01'"
  }
}
transform {
  # Transformations (optional)
  Filter {
    source_table_name = "source_table"
    result_table_name = "filtered_table"
    fields = ["id", "order_no", "amount", "status"]
  }
}
sink {
  # Data sink
  Jdbc {
    source_table_name = "filtered_table"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.1.52:3306/fgedu_dw"
    user = "root"
    password = "root123"
    table = "orders"
    primary_keys = ["id"]
    batch_size = 1000
  }
}
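The blocks above are wired together purely by table names: each source or transform publishes a `result_table_name`, and downstream blocks subscribe to it with `source_table_name`. A dangling name is one of the most common configuration mistakes; the check below sketches the idea on plain dicts (an illustration of the wiring rule, not SeaTunnel's actual validator):

```python
def check_wiring(sources, transforms, sinks):
    """Return the list of source_table_name values that no earlier
    stage published via result_table_name (empty list = wired OK)."""
    published = {s["result_table_name"] for s in sources}
    dangling = []
    for t in transforms:                      # transforms run after sources
        if t["source_table_name"] not in published:
            dangling.append(t["source_table_name"])
        published.add(t["result_table_name"])
    for s in sinks:                           # sinks consume the final tables
        if s["source_table_name"] not in published:
            dangling.append(s["source_table_name"])
    return dangling
```

Running this over a parsed config catches typos like `filtered_tabel` before the job is ever submitted.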
Environment settings explained
# Parallelism
parallelism = 4               # task parallelism
# Run mode
job.mode = "BATCH"            # BATCH or STREAMING
# Checkpointing
checkpoint.interval = 10000   # checkpoint interval (ms)
checkpoint.timeout = 60000    # checkpoint timeout (ms)
# Retries
retry.times = 3               # retries on failure
retry.interval = 5000         # retry interval (ms)
# Resources
job.resource.cpu = 2          # CPU cores
job.resource.memory = 4096    # memory size (MB)
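The constraints these settings imply can be captured in a few lines. A sketch of that validation logic (illustrative only; SeaTunnel performs its own checks with its own error messages):

```python
VALID_MODES = {"BATCH", "STREAMING"}

def validate_env(env: dict) -> list:
    """Return human-readable problems found in an env {} block."""
    problems = []
    if env.get("job.mode") not in VALID_MODES:
        problems.append("job.mode must be BATCH or STREAMING")
    if env.get("parallelism", 1) < 1:
        problems.append("parallelism must be >= 1")
    if env.get("checkpoint.interval", 1) <= 0:
        problems.append("checkpoint.interval must be positive milliseconds")
    return problems
```

Note that the mode strings are case-sensitive, which is easy to trip over when editing configs by hand.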
5. Hands-On: MySQL Data Synchronization
MySQL-to-MySQL synchronization is one of the most common scenarios.
Step 1: create the job configuration
$ vi /fgeudb/seatunnel/config/mysql_to_mysql.conf
env {
  parallelism = 4
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  Jdbc {
    result_table_name = "orders_source"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.1.51:3306/fgedu_db?useSSL=false&serverTimezone=Asia/Shanghai"
    user = "seatunnel"
    password = "seatunnel123"
    query = """
      SELECT
        id, order_no, user_id, product_id,
        quantity, amount, status,
        create_time, update_time
      FROM orders
      WHERE update_time >= DATE_SUB(NOW(), INTERVAL 1 DAY)
    """
    partition_column = "id"
    partition_num = 4
  }
}
transform {
  Filter {
    source_table_name = "orders_source"
    result_table_name = "orders_filtered"
    fields = ["id", "order_no", "user_id", "product_id", "quantity", "amount", "status", "create_time"]
  }
  Sql {
    source_table_name = "orders_filtered"
    result_table_name = "orders_transformed"
    query = """
      SELECT
        id, order_no, user_id, product_id,
        quantity, amount,
        CASE status
          WHEN 0 THEN 'pending'
          WHEN 1 THEN 'paid'
          WHEN 2 THEN 'shipped'
          WHEN 3 THEN 'completed'
          ELSE 'unknown'
        END as status,
        create_time
      FROM orders_filtered
    """
  }
}
sink {
  Jdbc {
    source_table_name = "orders_transformed"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.1.52:3306/fgedu_dw?useSSL=false&serverTimezone=Asia/Shanghai"
    user = "seatunnel"
    password = "seatunnel123"
    table = "orders"
    primary_keys = ["id"]
    batch_size = 2000
    max_retries = 3
    is_exactly_once = "true"
    xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
  }
}
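With `partition_column` and `partition_num`, the JDBC source turns the query into parallel range scans over the id column. Conceptually the split looks like this (a sketch of the idea; SeaTunnel's actual splitter discovers MIN/MAX itself and may size ranges differently):

```python
def id_ranges(min_id: int, max_id: int, partitions: int):
    """Split [min_id, max_id] into contiguous, non-overlapping ranges,
    one per parallel reader."""
    total = max_id - min_id + 1
    base, extra = divmod(total, partitions)
    ranges, lo = [], min_id
    for i in range(partitions):
        size = base + (1 if i < extra else 0)   # spread the remainder evenly
        ranges.append((lo, lo + size - 1))
        lo += size
    return ranges
```

Each range then becomes an `id BETWEEN lo AND hi` predicate appended to the query, so the four readers never fetch the same row twice.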
Step 2: run the job
$ cd /fgeudb/seatunnel
$ sh bin/seatunnel.sh --config config/mysql_to_mysql.conf
Sample output:
2026-04-04 10:00:00.000 [main] INFO org.apache.seatunnel.core.starter.Seatunnel - Start running SeaTunnel job...
2026-04-04 10:00:00.100 [main] INFO org.apache.seatunnel.engine.server.SeaTunnelServer - Starting SeaTunnel server...
2026-04-04 10:00:01.000 [main] INFO org.apache.seatunnel.engine.server.TaskExecutionService - Task execution service started
2026-04-04 10:00:02.000 [main] INFO org.apache.seatunnel.engine.server.SeaTunnelServer - Job fgedu_mysql_sync_001 submitted
2026-04-04 10:00:05.000 [main] INFO org.apache.seatunnel.core.starter.Seatunnel - Job Status: RUNNING
2026-04-04 10:00:30.000 [main] INFO org.apache.seatunnel.core.starter.Seatunnel - Job Status: FINISHED
2026-04-04 10:00:30.100 [main] INFO org.apache.seatunnel.core.starter.Seatunnel -
***********************************************
Job Statistics
***********************************************
Total Read Count: 100000
Total Write Count: 100000
Total Failed Count: 0
Total Time: 30000ms
***********************************************
# Tail the job log
$ tail -f logs/seatunnel.log
Step 3: configure incremental (CDC) synchronization
$ vi /fgeudb/seatunnel/config/mysql_incremental.conf
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  MySQL-CDC {
    result_table_name = "orders_cdc"
    base-url = "jdbc:mysql://192.168.1.51:3306/fgedu_db"
    username = "seatunnel"
    password = "seatunnel123"
    database-names = ["fgedu_db"]
    table-names = ["fgedu_db.orders", "fgedu_db.users"]
    server-id = 5400
    server-time-zone = "Asia/Shanghai"
  }
}
sink {
  Jdbc {
    source_table_name = "orders_cdc"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.1.52:3306/fgedu_dw"
    user = "seatunnel"
    password = "seatunnel123"
    table = "${table_name}"
    primary_keys = ["id"]
    batch_size = 1000
  }
}
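In STREAMING mode the MySQL-CDC source reads the binlog and emits insert/update/delete change events; the JDBC sink then applies each event by primary key, which is why `primary_keys` is mandatory here. The net effect, sketched on an in-memory table:

```python
def apply_cdc_events(events, key="id"):
    """Replay (operation, row) change events into a dict keyed by the
    primary key: upsert on INSERT/UPDATE, removal on DELETE."""
    table = {}
    for op, row in events:
        if op in ("INSERT", "UPDATE"):
            table[row[key]] = row       # idempotent upsert by primary key
        elif op == "DELETE":
            table.pop(row[key], None)
    return table
```

Because the upsert is idempotent, replaying an event twice after a checkpoint restart leaves the target in the same state.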
6. Hands-On: Kafka Data Synchronization
Kafka is a message queue that often serves as the intermediate layer of a data pipeline.
Step 1: MySQL to Kafka
$ vi /fgeudb/seatunnel/config/mysql_to_kafka.conf
env {
  parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  MySQL-CDC {
    result_table_name = "source_data"
    base-url = "jdbc:mysql://192.168.1.51:3306/fgedu_db"
    username = "seatunnel"
    password = "seatunnel123"
    database-names = ["fgedu_db"]
    table-names = ["fgedu_db.orders"]
    server-id = 5401
    server-time-zone = "Asia/Shanghai"
  }
}
sink {
  Kafka {
    source_table_name = "source_data"
    topic = "fgedu_orders"
    bootstrap.servers = "192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092"
    format = json
    semantics = EXACTLY_ONCE
    partition = 3
    replication.factor = 3
    kafka.request.timeout.ms = 60000
    kafka.max.block.ms = 120000
  }
}
# Run the job
$ sh bin/seatunnel.sh --config config/mysql_to_kafka.conf
Sample output:
2026-04-04 10:00:00.000 [main] INFO org.apache.seatunnel.core.starter.Seatunnel - Start CDC streaming job...
2026-04-04 10:00:01.000 [main] INFO org.apache.seatunnel.connectors.seatunnel.kafka.KafkaSinkWriter - Kafka producer initialized
2026-04-04 10:00:02.000 [main] INFO org.apache.seatunnel.core.starter.Seatunnel - Job Status: RUNNING
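With `format = json`, every change row is serialized into a JSON message. When messages are keyed (for example by the primary key), Kafka assigns a partition from a hash of the key, so all events for one order stay on one partition and keep their order. A simplified sketch (real producers use murmur2 hashing, not this byte sum):

```python
import json

def to_kafka_message(row: dict, key_field: str, partitions: int):
    """Serialize a row for a JSON-format Kafka sink and pick a
    partition from the key, preserving per-key ordering."""
    key = str(row[key_field]).encode()
    payload = json.dumps(row, ensure_ascii=False).encode()
    partition = sum(key) % partitions   # stand-in for murmur2(key) % partitions
    return key, payload, partition
```

Whatever hash is used, the property that matters is determinism: the same key always maps to the same partition.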
Step 2: Kafka to ClickHouse
$ vi /fgeudb/seatunnel/config/kafka_to_clickhouse.conf
env {
  parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source {
  Kafka {
    result_table_name = "kafka_source"
    topics = ["fgedu_orders"]
    bootstrap.servers = "192.168.1.51:9092"
    consumer.group = "seatunnel_clickhouse_group"
    format = json
    schema = """
      {
        "type": "object",
        "properties": {
          "id": {"type": "integer"},
          "order_no": {"type": "string"},
          "user_id": {"type": "integer"},
          "amount": {"type": "number"},
          "status": {"type": "string"},
          "create_time": {"type": "string"}
        }
      }
    """
    start_mode = earliest
  }
}
transform {
  Filter {
    source_table_name = "kafka_source"
    result_table_name = "filtered_data"
    fields = ["id", "order_no", "user_id", "amount", "status", "create_time"]
  }
}
sink {
  Clickhouse {
    source_table_name = "filtered_data"
    host = "192.168.1.51:8123"
    database = "fgedu_dw"
    table = "orders"
    username = "default"
    password = "clickhouse123"
    bulk_size = 10000
    split_mode = true
    sharding_key = "id"
  }
}
# Run the job
$ sh bin/seatunnel.sh --config config/kafka_to_clickhouse.conf
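The schema block tells the Kafka source how to type each JSON field. The check it implies can be sketched in a few lines (a simplified JSON-Schema-style type check, not SeaTunnel's actual deserializer):

```python
import json

# Maps the JSON-Schema-style types declared above to Python types.
TYPE_MAP = {"integer": int, "number": (int, float), "string": str}

def matches_schema(message: str, properties: dict) -> bool:
    """True when every declared field is present with a compatible type."""
    record = json.loads(message)
    return all(
        field in record and isinstance(record[field], TYPE_MAP[spec["type"]])
        for field, spec in properties.items()
    )
```

Messages that fail a check like this (a missing field, or `"id"` sent as a string) are the usual cause of a streaming job stalling on deserialization errors.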
7. SeaTunnel Cluster Deployment
For production, deploy SeaTunnel as a cluster to get high availability and load balancing.
Step 1: cluster configuration
$ vi /fgeudb/seatunnel/config/seatunnel.yaml
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 300000
      timeout: 10000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /seatunnel/checkpoint
          storage.type: hdfs
          fs.defaultFS: hdfs://192.168.1.51:9000

hazelcast:
  cluster-name: seatunnel-cluster
  network:
    join:
      tcp-ip:
        enabled: true
        member-list:
          - 192.168.1.51:5801
          - 192.168.1.52:5801
          - 192.168.1.53:5801
    port:
      auto-increment: false
      port: 5801
# Start the SeaTunnel server on every node
$ sh bin/seatunnel-cluster.sh -d
# Check the cluster status
$ curl http://192.168.1.51:5801/hazelcast/rest/maps/overview
Sample output:
{
  "clusterName": "seatunnel-cluster",
  "members": [
    {"address": "192.168.1.51:5801", "status": "ACTIVE"},
    {"address": "192.168.1.52:5801", "status": "ACTIVE"},
    {"address": "192.168.1.53:5801", "status": "ACTIVE"}
  ],
  "slotCount": 12,
  "usedSlotCount": 2
}
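The overview response lends itself to a simple readiness probe. A sketch that evaluates the parsed JSON (field names follow the sample output above; adapt the check to whatever your endpoint actually returns):

```python
def cluster_healthy(overview: dict, expected_members: int) -> bool:
    """All members ACTIVE, the expected member count, and spare slots left."""
    members = overview.get("members", [])
    return (
        len(members) == expected_members
        and all(m.get("status") == "ACTIVE" for m in members)
        and overview.get("usedSlotCount", 0) < overview.get("slotCount", 0)
    )
```

Wiring a check like this into a cron job or load-balancer health probe catches a lost node long before a job submission fails.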
Step 2: submit jobs to the cluster
$ sh bin/seatunnel.sh --config config/mysql_to_mysql.conf --master 192.168.1.51:5801
# List running jobs
$ curl http://192.168.1.51:5801/hazelcast/rest/maps/running-jobs
Sample output:
[
  {
    "jobId": "fgedu_sync_001",
    "jobName": "mysql_to_mysql",
    "status": "RUNNING",
    "createTime": "2026-04-04T10:00:00",
    "progress": {
      "totalRead": 50000,
      "totalWrite": 48000
    }
  }
]
# Stop a job
$ curl -X POST http://192.168.1.51:5801/hazelcast/rest/maps/stop-job/fgedu_sync_001
Step 3: monitor the cluster
$ vi /fgeudb/seatunnel/config/seatunnel.yaml
seatunnel:
  engine:
    metrics:
      enabled: true
      port: 5802
      reporter: prometheus
# Scrape the Prometheus metrics
$ curl http://192.168.1.51:5802/metrics
Sample output:
# HELP seatunnel_job_count Total job count
# TYPE seatunnel_job_count gauge
seatunnel_job_count{status="running"} 2.0
seatunnel_job_count{status="finished"} 15.0
seatunnel_job_count{status="failed"} 1.0
# HELP seatunnel_slot_count Total slot count
# TYPE seatunnel_slot_count gauge
seatunnel_slot_count{status="total"} 12.0
seatunnel_slot_count{status="used"} 4.0
# HELP seatunnel_record_count Total record count
# TYPE seatunnel_record_count counter
seatunnel_record_count{job="fgedu_sync_001",type="read"} 100000.0
seatunnel_record_count{job="fgedu_sync_001",type="write"} 99500.0
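The exposition format is plain text, one sample per line, so ad-hoc checks are easy to script. A minimal parser for lines like the ones above (it handles only the simple `name{labels} value` shape, not the full Prometheus format):

```python
import re

# name, optional {label="value",...}, numeric value
SAMPLE_RE = re.compile(r'^(\w+)(?:\{(.*)\})?\s+([0-9eE+.\-]+)$')

def parse_metrics(text: str):
    """Parse Prometheus text lines into (name, labels, value) tuples,
    skipping blank lines and # HELP / # TYPE comments."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        m = SAMPLE_RE.match(line)
        if not m:
            continue
        name, raw, value = m.groups()
        labels = {
            k: v.strip('"')
            for k, v in (pair.split("=", 1) for pair in (raw or "").split(",") if pair)
        }
        samples.append((name, labels, float(value)))
    return samples
```

This is enough to assert, for example, that read and write counts stay close during a long-running sync.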
This article was compiled and published by Fengge Tutorials for learning and testing purposes only; when republishing, credit the source: http://www.fgedu.net.cn/10327.html
