1. SeaTunnel Overview and Environment Planning
SeaTunnel (formerly Waterdrop) is an open-source, high-performance, distributed data integration tool developed under the Apache Software Foundation. It supports data synchronization across many data sources, including MySQL, PostgreSQL, Oracle, MongoDB, Elasticsearch, Kafka, HDFS, and Hive, and provides unified batch and streaming processing.
1.1 SeaTunnel Version Notes
The current major SeaTunnel release line is 2.3.x; this tutorial uses SeaTunnel 2.3.5 throughout. Compared with earlier releases, 2.3.x brings significant improvements in performance, stability, and functionality, with more connectors and more flexible configuration options.
$ cat /data/seatunnel/version
2.3.5
# Check the Java version
$ java -version
openjdk version "11.0.22" 2024-01-16 LTS
OpenJDK Runtime Environment (build 11.0.22+7-LTS)
OpenJDK 64-Bit Server VM (build 11.0.22+7-LTS, mixed mode, sharing)
# Check OS information
$ cat /etc/os-release
NAME="Oracle Linux Server"
VERSION="8.9"
ID="ol"
PRETTY_NAME="Oracle Linux Server 8.9"
1.2 Environment Planning
The environment for this installation is planned as follows:
Node 1: seatunnel01.fgedu.net.cn (192.168.1.51) - Master + Worker
Node 2: seatunnel02.fgedu.net.cn (192.168.1.52) - Worker
Node 3: seatunnel03.fgedu.net.cn (192.168.1.53) - Worker
SeaTunnel version: 2.3.5
Java version: OpenJDK 11
Hadoop version: 3.3.6
Installation directory: /data/seatunnel
Log directory: /data/seatunnel/logs
Data sources:
MySQL: 192.168.1.51:3306
PostgreSQL: 192.168.1.51:5432
Kafka: 192.168.1.51:9092
Data targets:
Elasticsearch: 192.168.1.51:9200
HDFS: hdfs://192.168.1.51:8020
Hive: thrift://192.168.1.51:9083
2. Hardware Requirements
SeaTunnel has moderate hardware requirements. The following are recommended configurations for a production environment.
2.1 Minimum Hardware Requirements
# free -h
total used free shared buff/cache available
Mem: 32G 8.5G 21G 2.0G 2.5G 21G
Swap: 16G 0B 16G
# Check disk space
# df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 15G 36G 30% /
/dev/sdb1 500G 50G 450G 10% /data
# Check the CPU core count
# nproc
16
# Check the system architecture
# uname -m
x86_64
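The checks above can be wrapped in a small preflight script and run on every node before installation. The sketch below is illustrative, based on the 16-core / 32 GB baseline shown in this section; the thresholds and the /data mount are assumptions to adjust to your own sizing.

```shell
#!/bin/sh
# check_min NAME ACTUAL REQUIRED: report whether a measured value meets a minimum
check_min() {
  name=$1; actual=$2; required=$3
  if [ "$actual" -ge "$required" ] 2>/dev/null; then
    echo "$name OK ($actual >= $required)"
  else
    echo "$name FAIL ($actual < $required)"
  fi
}

check_min "cpu_cores" "$(nproc)" 8
check_min "mem_gb"    "$(free -g | awk '/^Mem:/{print $2}')" 16
check_min "data_gb"   "$(df -BG --output=avail /data 2>/dev/null | tail -1 | tr -dc 0-9)" 100
```

Any FAIL line means the node does not meet the planned minimums and should be resized before proceeding.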
2.2 Network Configuration Requirements
# ip addr show
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN
inet 127.0.0.1/8 scope host lo
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP
inet 192.168.1.51/24 brd 192.168.1.255 scope global eth0
# Check hostname resolution
# hostname -f
seatunnel01.fgedu.net.cn
# Configure /etc/hosts
# cat /etc/hosts
127.0.0.1 localhost
192.168.1.51 seatunnel01.fgedu.net.cn seatunnel01
192.168.1.52 seatunnel02.fgedu.net.cn seatunnel02
192.168.1.53 seatunnel03.fgedu.net.cn seatunnel03
# Configure passwordless SSH login
# ssh-keygen -t rsa
# ssh-copy-id root@seatunnel02.fgedu.net.cn
# ssh-copy-id root@seatunnel03.fgedu.net.cn
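After distributing the keys, it is worth verifying name resolution and passwordless login to every node in one pass. A minimal sketch (node names are the ones planned above; BatchMode makes ssh fail fast instead of prompting for a password):

```shell
#!/bin/sh
# check_node NODE: verify that passwordless SSH to NODE works without prompting
check_node() {
  if ssh -o BatchMode=yes -o ConnectTimeout=5 "root@$1" true 2>/dev/null; then
    echo "$1: ssh OK"
  else
    echo "$1: ssh FAIL"
  fi
}

for node in seatunnel01.fgedu.net.cn seatunnel02.fgedu.net.cn seatunnel03.fgedu.net.cn; do
  check_node "$node"
done
```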
3. Operating System Configuration
SeaTunnel runs on Linux, and a number of operating system settings need to be configured first.
3.1 Disable the Firewall and SELinux
# getenforce
Disabled
# Disable SELinux (if not already disabled)
# vi /etc/selinux/config
SELINUX=disabled
# Check the firewall status
# systemctl status firewalld
# Disable the firewall (in production, prefer opening only the required ports)
# systemctl stop firewalld
# systemctl disable firewalld
# Alternatively, open the port SeaTunnel requires
# firewall-cmd --permanent --add-port=5801/tcp
# firewall-cmd --reload
3.2 Kernel Parameter Tuning
# vi /etc/sysctl.conf
# Add the following parameters
fs.file-max = 6815744
vm.swappiness = 10
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 65536
# Apply the settings
# sysctl -p
# Configure file descriptor limits
# vi /etc/security/limits.conf
# Add the following lines
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
# Verify the limits (in a new login session)
# ulimit -n
65536
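To confirm the kernel parameters and limits actually took effect (a common pitfall is forgetting that limits.conf only applies to new sessions), a quick verification sketch; the expected values are the ones set above:

```shell
#!/bin/sh
# expect_eq NAME ACTUAL EXPECTED: compare a live setting against the desired value
expect_eq() {
  if [ "$2" = "$3" ]; then
    echo "$1 OK ($2)"
  else
    echo "$1 MISMATCH (got '$2', want '$3')"
  fi
}

expect_eq "vm.swappiness"      "$(sysctl -n vm.swappiness 2>/dev/null)"      "10"
expect_eq "net.core.somaxconn" "$(sysctl -n net.core.somaxconn 2>/dev/null)" "32768"
expect_eq "nofile (ulimit -n)" "$(ulimit -n)"                                "65536"
```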
4. Java Environment Setup
SeaTunnel is written in Java and requires a suitable Java runtime.
4.1 Install OpenJDK 11
# yum install -y java-11-openjdk java-11-openjdk-devel
# Verify the Java version
$ java -version
openjdk version "11.0.22" 2024-01-16 LTS
OpenJDK Runtime Environment (build 11.0.22+7-LTS)
OpenJDK 64-Bit Server VM (build 11.0.22+7-LTS, mixed mode, sharing)
# Set the JAVA_HOME environment variable
# vi /etc/profile.d/java.sh
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
export PATH=$JAVA_HOME/bin:$PATH
# Apply the environment variables
# source /etc/profile.d/java.sh
# Verify JAVA_HOME
$ echo $JAVA_HOME
/usr/lib/jvm/java-11-openjdk
4.2 Install Hadoop
# cd /tmp
# wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz
# Extract and install
# tar -xzf hadoop-3.3.6.tar.gz
# mv hadoop-3.3.6 /data/hadoop
# Configure environment variables
# vi /etc/profile.d/hadoop.sh
export HADOOP_HOME=/data/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
# Apply the environment variables
# source /etc/profile.d/hadoop.sh
# Configure Hadoop
# vi /data/hadoop/etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://192.168.1.51:8020</value>
  </property>
</configuration>
# vi /data/hadoop/etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/data/hadoop/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/data/hadoop/datanode</value>
  </property>
</configuration>
# Verify the Hadoop installation
$ hadoop version
# Sample output:
Hadoop 3.3.6
Source code repository https://github.com/apache/hadoop.git -r abc123def456
Compiled by root on 2024-01-15T10:00Z
Compiled with protoc 3.7.1
From source with checksum abc123
This command was run using /data/hadoop/share/hadoop/common/hadoop-common-3.3.6.jar
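Note that the checkpoint storage configured later (section 5.3) points at this HDFS instance, so HDFS must be formatted and running (hdfs namenode -format, then start-dfs.sh) before SeaTunnel streaming jobs can checkpoint. The helper below runs each preparation step and reports failures without aborting; it assumes a running HDFS and the paths used in this tutorial.

```shell
#!/bin/sh
# run_step CMD...: run one preparation command, report ok/failed, keep going
run_step() {
  if "$@" >/dev/null 2>&1; then
    echo "ok: $*"
  else
    echo "failed: $*"
  fi
}

# Create the directory that SeaTunnel's HDFS checkpoint storage expects
run_step hdfs dfs -mkdir -p /seatunnel/checkpoint
run_step hdfs dfs -ls /seatunnel
```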
5. SeaTunnel Installation and Deployment
With the environment prepared, install the SeaTunnel service itself.
5.1 Download SeaTunnel
# cd /tmp
# wget https://archive.apache.org/dist/seatunnel/2.3.5/apache-seatunnel-2.3.5-bin.tar.gz
# Sample output:
--2024-04-05 10:00:00-- https://archive.apache.org/dist/seatunnel/2.3.5/apache-seatunnel-2.3.5-bin.tar.gz
Resolving archive.apache.org... 163.172.17.199
Connecting to archive.apache.org|163.172.17.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 456789012 (436M) [application/x-gzip]
Saving to: 'apache-seatunnel-2.3.5-bin.tar.gz'
apache-seatunnel-2.3.5-bin.tar.gz 100%[=================================================>] 435.56M 25.6MB/s in 17s
2024-04-05 10:00:17 (25.6 MB/s) - 'apache-seatunnel-2.3.5-bin.tar.gz' saved [456789012/456789012]
# Extract and install
# tar -xzf apache-seatunnel-2.3.5-bin.tar.gz
# mv apache-seatunnel-2.3.5-bin /data/seatunnel
# Configure environment variables
# vi /etc/profile.d/seatunnel.sh
export SEATUNNEL_HOME=/data/seatunnel
export PATH=$SEATUNNEL_HOME/bin:$PATH
# Apply the environment variables
# source /etc/profile.d/seatunnel.sh
# Verify the installation
$ ls -l /data/seatunnel/
# Sample output:
total 12
drwxr-xr-x 2 root root 4096 Apr 5 10:00 bin
drwxr-xr-x 2 root root 4096 Apr 5 10:00 config
drwxr-xr-x 2 root root 4096 Apr 5 10:00 connectors
drwxr-xr-x 2 root root 4096 Apr 5 10:00 lib
drwxr-xr-x 2 root root 4096 Apr 5 10:00 logs
5.2 Install Connectors
# cd /data/seatunnel
# Install the JDBC connector (covers MySQL)
$ ./bin/install-plugin.sh connector-jdbc
# Sample output:
[INFO] Installing connector-jdbc...
[INFO] Downloading connector-jdbc-2.3.5.jar...
[INFO] connector-jdbc installed successfully.
# Install the Kafka connector
$ ./bin/install-plugin.sh connector-kafka
# Install the Elasticsearch connector
$ ./bin/install-plugin.sh connector-elasticsearch
# Install the Hive connector
$ ./bin/install-plugin.sh connector-hive
# Install all connectors (optional)
$ ./bin/install-plugin.sh
# List installed connectors
$ ls -l /data/seatunnel/connectors/
# Sample output:
total 0
drwxr-xr-x 2 root root 4096 Apr 5 10:00 connector-jdbc
drwxr-xr-x 2 root root 4096 Apr 5 10:00 connector-kafka
drwxr-xr-x 2 root root 4096 Apr 5 10:00 connector-elasticsearch
drwxr-xr-x 2 root root 4096 Apr 5 10:00 connector-hive
# Inspect the connector JAR files
$ ls -lh /data/seatunnel/connectors/connector-jdbc/
# Sample output:
total 15M
-rw-r--r-- 1 root root 15M Apr 5 10:00 connector-jdbc-2.3.5.jar
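install-plugin.sh downloads jars from Maven repositories, and a failed or partial download can leave an empty connector directory behind. A quick sanity check that every connector directory actually contains a jar (paths as used in this tutorial):

```shell
#!/bin/sh
# verify_connector DIR: report whether the connector directory contains a jar
verify_connector() {
  if ls "$1"/*.jar >/dev/null 2>&1; then
    echo "$(basename "$1"): OK"
  else
    echo "$(basename "$1"): NO JAR FOUND"
  fi
}

for d in /data/seatunnel/connectors/connector-*; do
  [ -d "$d" ] && verify_connector "$d" || true
done
```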
5.3 Configure the SeaTunnel Cluster
# vi /data/seatunnel/config/seatunnel.yaml
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 2
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /seatunnel/checkpoint
          storage.type: hdfs
          fs.defaultFS: hdfs://192.168.1.51:8020
# Configure the Master node
# vi /data/seatunnel/config/hazelcast-master.yaml
hazelcast:
  cluster-name: seatunnel
  network:
    join:
      tcp-ip:
        enabled: true
        member-list:
          - seatunnel01.fgedu.net.cn:5801
    port:
      auto-increment: false
      port: 5801
  properties:
    hazelcast.logging.type: log4j2
# Configure the Worker nodes
# vi /data/seatunnel/config/hazelcast-worker.yaml
hazelcast:
  cluster-name: seatunnel
  network:
    join:
      tcp-ip:
        enabled: true
        member-list:
          - seatunnel01.fgedu.net.cn:5801
    port:
      auto-increment: true
      port-count: 100
      port: 5801
  properties:
    hazelcast.logging.type: log4j2
# Configure JVM options
# vi /data/seatunnel/config/jvm_options
-Xms2g
-Xmx4g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/data/seatunnel/logs/heap_dump.hprof
6. SeaTunnel Parameter Configuration
SeaTunnel's parameter configuration is critical to synchronization performance and stability.
6.1 Create Synchronization Job Configurations
# vi /data/seatunnel/config/mysql-to-kafka.conf
env {
  parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  MySQL-CDC {
    result_table_name = "fgedu_employees"
    server-id = 5400
    username = "fgedu"
    password = "fgedu_mysql_2024"
    database-name = "fgedudb"
    table-name = "fgedudb.fgedu_employees"
    base-url = "jdbc:mysql://192.168.1.51:3306/fgedudb?useSSL=false"
  }
}
transform {
}
sink {
  Kafka {
    topic = "seatunnel-fgedudb-employees"
    bootstrap.servers = "192.168.1.51:9092"
    format = json
    semantic = EXACTLY_ONCE
  }
}
# Create a MySQL-to-Elasticsearch sync job
# vi /data/seatunnel/config/mysql-to-es.conf
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
  Jdbc {
    result_table_name = "fgedu_employees"
    url = "jdbc:mysql://192.168.1.51:3306/fgedudb?useSSL=false"
    username = "fgedu"
    password = "fgedu_mysql_2024"
    query = "SELECT * FROM fgedu_employees"
    partition_column = "id"
    partition_num = 4
  }
}
transform {
}
sink {
  Elasticsearch {
    hosts = ["192.168.1.51:9200"]
    index = "fgedu_employees"
    index_type = "_doc"
    primary_keys = ["id"]
  }
}
# Create a MySQL-to-Hive sync job
# vi /data/seatunnel/config/mysql-to-hive.conf
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
  Jdbc {
    result_table_name = "fgedu_employees"
    url = "jdbc:mysql://192.168.1.51:3306/fgedudb?useSSL=false"
    username = "fgedu"
    password = "fgedu_mysql_2024"
    query = "SELECT * FROM fgedu_employees"
    partition_column = "id"
    partition_num = 4
  }
}
transform {
}
sink {
  Hive {
    table_name = "fgedudb.fgedu_employees"
    metastore_uri = "thrift://192.168.1.51:9083"
    hdfs_site_path = "/data/hadoop/etc/hadoop/hdfs-site.xml"
    hive_site_path = "/data/hive/conf/hive-site.xml"
  }
}
7. Starting the SeaTunnel Service
With configuration complete, start the SeaTunnel cluster.
7.1 Start the SeaTunnel Cluster
# cd /data/seatunnel
# ./bin/seatunnel-cluster.sh -d
# Sample output:
[INFO] Starting SeaTunnel Master…
[INFO] SeaTunnel Master started successfully.
# Check the process
$ jps | grep SeaTunnel
# Sample output:
12345 SeaTunnelServer
# Check the port
$ netstat -tlnp | grep 5801
tcp 0 0 0.0.0.0:5801 0.0.0.0:* LISTEN 12345/java
# Copy the installation to the Worker nodes, then start it there
# scp -r /data/seatunnel root@seatunnel02.fgedu.net.cn:/data/
# scp -r /data/seatunnel root@seatunnel03.fgedu.net.cn:/data/
# Start SeaTunnel on each Worker node
# cd /data/seatunnel
# ./bin/seatunnel-cluster.sh -d
# Check the cluster status
$ curl http://localhost:5801/hazelcast/rest/maps/cluster-details
# Sample output:
{
  "members": [
    {"address": "seatunnel01.fgedu.net.cn:5801", "role": "master"},
    {"address": "seatunnel02.fgedu.net.cn:5801", "role": "worker"},
    {"address": "seatunnel03.fgedu.net.cn:5801", "role": "worker"}
  ]
}
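Rather than eyeballing the JSON, a deployment script can count the members and compare against the planned cluster size. A crude grep-based sketch (it assumes the REST output shape shown above; a real deployment might use jq instead):

```shell
#!/bin/sh
# member_count: count "address" entries in cluster-details JSON read from stdin
member_count() {
  grep -o '"address"' | wc -l
}

expected=3
actual=$(curl -s http://localhost:5801/hazelcast/rest/maps/cluster-details 2>/dev/null | member_count)
if [ "${actual:-0}" -eq "$expected" ] 2>/dev/null; then
  echo "cluster complete ($actual members)"
else
  echo "cluster incomplete (${actual:-0}/$expected)"
fi
```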
7.2 Submit Synchronization Jobs
$ ./bin/seatunnel.sh --config config/mysql-to-kafka.conf
# Sample output:
[INFO] Submitting job: mysql-to-kafka
[INFO] Job submitted successfully. Job ID: job_20240405100000_0001
[INFO] Job status: RUNNING
# Submit the MySQL-to-Elasticsearch job
$ ./bin/seatunnel.sh --config config/mysql-to-es.conf
# Sample output:
[INFO] Submitting job: mysql-to-es
[INFO] Job submitted successfully. Job ID: job_20240405100000_0002
[INFO] Job status: FINISHED
# List running jobs
$ curl http://localhost:5801/hazelcast/rest/maps/running-jobs
# Sample output:
[
  {
    "jobId": "job_20240405100000_0001",
    "jobName": "mysql-to-kafka",
    "status": "RUNNING",
    "submitTime": "2024-04-05 10:00:00"
  }
]
# View job details
$ curl http://localhost:5801/hazelcast/rest/maps/job-detail/job_20240405100000_0001
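For batch jobs driven from scripts, it is useful to block until the job reaches a terminal state. A polling sketch against the job-status endpoint used above (the endpoint shape and status strings are the ones shown in this tutorial; adjust host and port as needed):

```shell
#!/bin/sh
# wait_for_job JOB_ID [MAX_TRIES]: poll job-status until a terminal state or timeout
wait_for_job() {
  job_id=$1
  max_tries=${2:-30}
  i=0
  while [ "$i" -lt "$max_tries" ]; do
    status=$(curl -s "http://localhost:5801/hazelcast/rest/maps/job-status/$job_id" 2>/dev/null)
    case "$status" in
      *FINISHED*|*FAILED*|*CANCELED*) echo "$status"; return 0 ;;
    esac
    sleep 2
    i=$((i + 1))
  done
  echo "TIMEOUT"
  return 1
}
# Usage: wait_for_job job_20240405100000_0002
```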
8. SeaTunnel Functional Testing
After installation, run functional tests to verify that SeaTunnel works correctly.
8.1 Test MySQL-to-Kafka Synchronization
$ mysql -u fgedu -p -e "CREATE DATABASE IF NOT EXISTS fgedudb;"
$ mysql -u fgedu -p -e "USE fgedudb; CREATE TABLE fgedu_employees (id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(100), department VARCHAR(50), salary DECIMAL(10,2));"
$ mysql -u fgedu -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('张三', '技术部', 15000.00);"
# Submit the sync job
$ ./bin/seatunnel.sh --config config/mysql-to-kafka.conf
# Consume from Kafka to verify the data
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic seatunnel-fgedudb-employees --from-beginning --max-messages 1
# Sample output:
{"id":1,"name":"张三","department":"技术部","salary":15000.00}
# Insert a new row to verify incremental synchronization
$ mysql -u fgedu -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('李四', '销售部', 12000.00);"
# Consume from Kafka again to verify
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic seatunnel-fgedudb-employees --from-beginning --max-messages 2
# Sample output:
{"id":1,"name":"张三","department":"技术部","salary":15000.00}
{"id":2,"name":"李四","department":"销售部","salary":12000.00}
8.2 Test MySQL-to-Elasticsearch Synchronization
$ ./bin/seatunnel.sh --config config/mysql-to-es.conf
# Sample output:
[INFO] Submitting job: mysql-to-es
[INFO] Job submitted successfully. Job ID: job_20240405100000_0003
[INFO] Job status: FINISHED
# Query Elasticsearch to verify the data
$ curl -X GET "http://192.168.1.51:9200/fgedu_employees/_search?pretty"
# Sample output:
{
  "took" : 5,
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "hits" : [
      {
        "_id" : "1",
        "_source" : {
          "id" : 1,
          "name" : "张三",
          "department" : "技术部",
          "salary" : 15000.00
        }
      },
      {
        "_id" : "2",
        "_source" : {
          "id" : 2,
          "name" : "李四",
          "department" : "销售部",
          "salary" : 12000.00
        }
      }
    ]
  }
}
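Checking documents one by one does not scale; for larger tables, compare row counts between source and target instead. The sketch below is illustrative, assuming the credentials, hosts, and index name used throughout this tutorial:

```shell
#!/bin/sh
# compare_counts SRC DST: verify source and target row counts match
compare_counts() {
  if [ "$1" = "$2" ] && [ -n "$1" ]; then
    echo "MATCH: $1 rows"
  else
    echo "MISMATCH: source='$1' target='$2'"
  fi
}

src=$(mysql -u fgedu -pfgedu_mysql_2024 -N -e \
  "SELECT COUNT(*) FROM fgedudb.fgedu_employees" 2>/dev/null)
dst=$(curl -s "http://192.168.1.51:9200/fgedu_employees/_count" 2>/dev/null \
  | grep -o '"count":[0-9]*' | cut -d: -f2)
compare_counts "$src" "$dst"
```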
8.3 Test MySQL-to-Hive Synchronization
$ ./bin/seatunnel.sh --config config/mysql-to-hive.conf
# Sample output:
[INFO] Submitting job: mysql-to-hive
[INFO] Job submitted successfully. Job ID: job_20240405100000_0004
[INFO] Job status: FINISHED
# Query Hive to verify the data
$ hive -e "SELECT * FROM fgedudb.fgedu_employees;"
# Sample output:
OK
1 张三 技术部 15000.00
2 李四 销售部 12000.00
Time taken: 2.5 seconds
9. SeaTunnel Performance Tuning
Performance tuning covers several areas, including parallelism, memory, and checkpoint configuration.
9.1 Parallelism Tuning
# vi /data/seatunnel/config/mysql-to-kafka.conf
env {
  parallelism = 16
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  MySQL-CDC {
    result_table_name = "fgedu_employees"
    server-id = 5400
    username = "fgedu"
    password = "fgedu_mysql_2024"
    database-name = "fgedudb"
    table-name = "fgedudb.fgedu_employees"
    base-url = "jdbc:mysql://192.168.1.51:3306/fgedudb?useSSL=false"
    snapshot.split.size = 8096
    snapshot.fetch.size = 2048
  }
}
sink {
  Kafka {
    topic = "seatunnel-fgedudb-employees"
    bootstrap.servers = "192.168.1.51:9092"
    format = json
    semantic = EXACTLY_ONCE
    partition_key = "id"
  }
}
# Resubmit the job
$ ./bin/seatunnel.sh --config config/mysql-to-kafka.conf
9.2 Memory Tuning
# vi /data/seatunnel/config/jvm_options
-Xms4g
-Xmx8g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:InitiatingHeapOccupancyPercent=35
-XX:+DisableExplicitGC
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/data/seatunnel/logs/heap_dump.hprof
# Configure SeaTunnel engine parameters
# vi /data/seatunnel/config/seatunnel.yaml
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 2
    queue-type: blockingqueue
    queue-size: 10000
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /seatunnel/checkpoint
          storage.type: hdfs
          fs.defaultFS: hdfs://192.168.1.51:8020
# Restart the SeaTunnel cluster
# ./bin/stop-seatunnel-cluster.sh
# ./bin/seatunnel-cluster.sh -d
9.3 Checkpoint Tuning
# vi /data/seatunnel/config/seatunnel.yaml
seatunnel:
  engine:
    checkpoint:
      interval: 30000
      timeout: 120000
      max-concurrent-checkpoints: 1
      min-pause-between-checkpoints: 500
      tolerable-failure-number: 3
      storage:
        type: hdfs
        max-retained: 5
        plugin-config:
          namespace: /seatunnel/checkpoint
          storage.type: hdfs
          fs.defaultFS: hdfs://192.168.1.51:8020
# Set checkpoint options in the job configuration
env {
  parallelism = 8
  job.mode = "STREAMING"
  checkpoint.interval = 30000
  checkpoint.timeout = 120000
  checkpoint.min-pause = 500
}
10. SeaTunnel Upgrade and Migration
A SeaTunnel upgrade should be performed carefully to ensure data safety and business continuity.
10.1 Pre-upgrade Preparation
$ cat /data/seatunnel/version
2.3.4
# List running jobs
$ curl http://localhost:5801/hazelcast/rest/maps/running-jobs
# Back up the configuration files
# cp -r /data/seatunnel/config /backup/seatunnel_config_$(date +%Y%m%d)
# Back up the connectors
# tar -czf /backup/seatunnel_connectors_$(date +%Y%m%d).tar.gz /data/seatunnel/connectors
# Stop the SeaTunnel cluster
# ./bin/stop-seatunnel-cluster.sh
# Sample output:
[INFO] Stopping SeaTunnel cluster...
[INFO] SeaTunnel cluster stopped successfully.
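The two backup steps can be combined into one helper that also verifies each archive is readable after writing it; a backup that cannot be listed is not a backup. A sketch using the same date-stamped naming as above:

```shell
#!/bin/sh
# backup_path SRC DEST_DIR: tar SRC into DEST_DIR with a date stamp, then verify
backup_path() {
  src=$1
  dest_dir=$2
  out="$dest_dir/$(basename "$src")_$(date +%Y%m%d).tar.gz"
  mkdir -p "$dest_dir" || return 1
  tar -czf "$out" -C "$(dirname "$src")" "$(basename "$src")" 2>/dev/null || return 1
  tar -tzf "$out" >/dev/null 2>&1 || return 1   # verify the archive is readable
  echo "$out"
}

backup_path /data/seatunnel/config /backup || echo "backup failed"
backup_path /data/seatunnel/connectors /backup || echo "backup failed"
```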
10.2 Perform the Upgrade
# cd /tmp
# wget https://archive.apache.org/dist/seatunnel/2.3.5/apache-seatunnel-2.3.5-bin.tar.gz
# Back up the old version
# mv /data/seatunnel /data/seatunnel_2.3.4
# Extract the new version
# tar -xzf apache-seatunnel-2.3.5-bin.tar.gz
# mv apache-seatunnel-2.3.5-bin /data/seatunnel
# Restore the configuration files
# cp -r /backup/seatunnel_config_$(date +%Y%m%d)/* /data/seatunnel/config/
# Restore the connectors
# tar -xzf /backup/seatunnel_connectors_$(date +%Y%m%d).tar.gz -C /
# Verify the version
$ cat /data/seatunnel/version
2.3.5
# Start the new version
# ./bin/seatunnel-cluster.sh -d
# Sample output:
[INFO] Starting SeaTunnel Master…
[INFO] SeaTunnel Master started successfully.
10.3 Post-upgrade Verification
$ curl http://localhost:5801/hazelcast/rest/maps/cluster-details
# Sample output:
{
  "members": [
    {"address": "seatunnel01.fgedu.net.cn:5801", "role": "master"}
  ]
}
# Resubmit the jobs
$ ./bin/seatunnel.sh --config config/mysql-to-kafka.conf
# Sample output:
[INFO] Submitting job: mysql-to-kafka
[INFO] Job submitted successfully. Job ID: job_20240405100000_0005
[INFO] Job status: RUNNING
# Test data synchronization
$ mysql -u fgedu -p -e "INSERT INTO fgedudb.fgedu_employees (name, department, salary) VALUES ('升级测试', '升级部', 20000.00);"
# Consume from Kafka to verify (reading from the beginning returns all three rows)
$ /data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.51:9092 --topic seatunnel-fgedudb-employees --from-beginning --max-messages 3
# Sample output:
{"id":1,"name":"张三","department":"技术部","salary":15000.00}
{"id":2,"name":"李四","department":"销售部","salary":12000.00}
{"id":3,"name":"升级测试","department":"升级部","salary":20000.00}
11. SeaTunnel Monitoring and Operations
SeaTunnel monitoring and operations cover service status, log management, and performance monitoring.
11.1 Service Status Monitoring
$ jps | grep SeaTunnel
# Sample output:
12345 SeaTunnelServer
# Check the port
$ netstat -tlnp | grep 5801
tcp 0 0 0.0.0.0:5801 0.0.0.0:* LISTEN 12345/java
# Check the cluster status
$ curl http://localhost:5801/hazelcast/rest/maps/cluster-details
# Check running jobs
$ curl http://localhost:5801/hazelcast/rest/maps/running-jobs
# Check job details
$ curl http://localhost:5801/hazelcast/rest/maps/job-detail/job_20240405100000_0001
# Check job status
$ curl http://localhost:5801/hazelcast/rest/maps/job-status/job_20240405100000_0001
11.2 Log Management
$ tail -100 /data/seatunnel/logs/seatunnel.log
# Sample output:
[2024-04-05 10:00:00,000] INFO org.apache.seatunnel.engine.server.SeaTunnelServer - SeaTunnel Server started
[2024-04-05 10:00:00,100] INFO org.apache.seatunnel.engine.server.master.JobMaster - Job job_20240405100000_0001 submitted
[2024-04-05 10:00:00,200] INFO org.apache.seatunnel.engine.server.master.JobMaster - Job job_20240405100000_0001 started
# View a specific job's log
$ tail -100 /data/seatunnel/logs/job_20240405100000_0001.log
# Search the log for errors
$ grep -i error /data/seatunnel/logs/seatunnel.log | tail -20
# Configure log rotation
# vi /etc/logrotate.d/seatunnel
/data/seatunnel/logs/*.log {
    daily
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
    create 0644 root root
}
11.3 Performance Monitoring
$ top -p $(pgrep -f SeaTunnelServer)
# Sample output:
top - 10:00:00 up 1 day, 2:00, 2 users, load average: 1.50, 1.45, 1.40
Tasks: 1 total, 0 running, 1 sleeping, 0 stopped, 0 zombie
%Cpu(s): 10.0 us, 5.0 sy, 0.0 ni, 84.0 id, 1.0 wa, 0.0 hi, 0.0 si
MiB Mem : 32768.0 total, 24000.0 free, 7000.0 used, 1768.0 buff/cache
MiB Swap: 16384.0 total, 16384.0 free, 0.0 used. 24000.0 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
12345 root 20 0 8192000 7000000 500000 S 15.0 21.4 1:00.00 java
# Monitor JVM memory usage
$ jstat -gc $(pgrep -f SeaTunnelServer) 1000 5
# Sample output:
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT CGC CGCT GCT
1024.0 1024.0 0.0 512.0 81920.0 40960.0 819200.0 409600.0 51200.0 25600.0 5120.0 2560.0 10 0.500 0 0.000 5 0.250 0.750
# Create a monitoring script
# vi /backup/scripts/seatunnel_monitor.sh
#!/bin/bash
echo "=== SeaTunnel Monitor ==="
echo "Date: $(date)"
echo ""
echo "=== Cluster Status ==="
curl -s http://localhost:5801/hazelcast/rest/maps/cluster-details
echo ""
echo "=== Running Jobs ==="
curl -s http://localhost:5801/hazelcast/rest/maps/running-jobs
echo ""
echo "=== Memory Usage ==="
free -h
echo ""
echo "=== JVM Memory ==="
jstat -gc $(pgrep -f SeaTunnelServer) | tail -1
echo ""
echo "=== Recent Errors ==="
grep -i error /data/seatunnel/logs/seatunnel.log | tail -5
Compiled and published by fgedu.net.cn for learning and testing purposes only; when reposting, cite the source: http://www.fgedu.net.cn/10327.html
