OceanBase Tutorial FG193 - OceanBase and Flink Real-Time Computing Integration in Practice
Outline
Part01-Basic Concepts and Theory
1.1 Flink Overview
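Flink is a distributed stream-processing framework for processing both unbounded and bounded data streams. Its main characteristics are:
- Stream processing: supports real-time stream processing as well as batch processing
- Low latency: processing latency can be as low as milliseconds
- High throughput: supports high-throughput data processing
- Fault tolerance: built-in state management and fault-tolerance mechanisms
- Rich APIs: APIs for Java, Scala, Python and other languages
- Ecosystem: a wide range of connectors and extension libraries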
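1.2 Advantages of Integrating OceanBase with Flink
Integrating OceanBase with Flink offers the following advantages:
- Real-time processing: OceanBase data can be processed and analyzed in real time
- Data consistency: helps keep data processing consistent and reliable
- Scalability: supports large-scale data processing and analysis
- Low latency: millisecond-level processing latency
- Flexible processing logic: supports complex data processing logic
- Rich connectors: supports many source and target systems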
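1.3 Integration Architecture
The OceanBase-Flink integration architecture consists of:
- Data source: OceanBase acts as the data source and supplies the input data
- Flink processing: Flink acts as the processing engine and processes the data in real time
- Data output: processed data is written to target systems such as OceanBase, Kafka or Elasticsearch
- Monitoring and management: monitor the running state and performance of Flink jobs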
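Part02-Pre-Integration Preparation
2.1 Environment Preparation
Prepare the following environment before integration:
- OceanBase environment: make sure the OceanBase database is running normally
- Flink environment: install and configure a Flink cluster
- Network: make sure the Flink nodes and OceanBase can reach each other over the network
- Java environment: make sure the Java version meets the requirements (Java 8 or later is recommended; note that the code examples in this article use Java text blocks (`"""`), which require JDK 15 or later to compile)
Example: checking the environment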
obclient -h192.168.1.1 -P2881 -ufgedu -pfgedu123 -Dfgedudb -e "SHOW CLUSTER STATUS;"
+-------+--------+--------------+---------------------+---------------------+
| Zone  | Status | Leader Count | Leader Change Time  | Checksum Time       |
+-------+--------+--------------+---------------------+---------------------+
| zone1 | ACTIVE | 100          | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 |
| zone2 | ACTIVE | 100          | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 |
| zone3 | ACTIVE | 100          | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 |
+-------+--------+--------------+---------------------+---------------------+
./bin/flink list
No running jobs.
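You can also check network reachability from the Flink nodes to OceanBase without a database client. The following is a minimal sketch that assumes only the JDK. It opens a plain TCP connection to the OceanBase port, which is 192.168.1.1:2881 in this article's examples. A `true` result shows that the port is reachable, but it says nothing about credentials. The class name `PortCheck` is hypothetical. Run the check on each Flink node, because the TaskManagers are the processes that actually open the JDBC connections.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal sketch: checks whether a TCP connection to host:port can be opened.
// A successful connect only proves network reachability, not valid credentials.
class PortCheck {
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // refused, timed out, unknown host, ...
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults taken from this article's examples (OceanBase at 192.168.1.1:2881)
        String host = args.length > 0 ? args[0] : "192.168.1.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2881;
        System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 3000));
    }
}
```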
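2.2 Dependency Preparation
Prepare the following dependencies before integration:
- OceanBase JDBC driver: lets Flink connect to OceanBase
- Flink JDBC connector: connects Flink to relational databases (its JAR must also be placed in /opt/flink/lib or packaged into the job JAR)
- Flink Kafka connector: needed if you integrate with Kafka
- Flink Elasticsearch connector: needed if you integrate with Elasticsearch
Example: preparing the dependencies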
wget https://github.com/oceanbase/obconnector-jdbc/releases/download/1.1.0/oceanbase-client-1.1.0.jar
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/… [following]
--2024-01-01 00:00:01--  https://objects.githubusercontent.com/…
Resolving objects.githubusercontent.com (objects.githubusercontent.com)... 185.199.108.133
Connecting to objects.githubusercontent.com (objects.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2048000 (2.0M) [application/java-archive]
Saving to: ‘oceanbase-client-1.1.0.jar’
oceanbase-client-1.1.0.jar 100%[=================================================>] 1.95M 1.00MB/s in 1.9s
2024-01-01 00:00:03 (1.00 MB/s) - ‘oceanbase-client-1.1.0.jar’ saved [2048000/2048000]
cp oceanbase-client-1.1.0.jar /opt/flink/lib/
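Before you submit a job, it can help to confirm that the driver class can actually be loaded. The following is a minimal sketch that uses only the JDK. The driver class name `com.oceanbase.jdbc.Driver` is an assumption based on recent `oceanbase-client` releases, and older releases may use a different package, so check the documentation of the driver version you downloaded. `DriverCheck` is a hypothetical name.

```java
// Minimal sketch: verifies that a JDBC driver class can be loaded from the classpath.
// The default class name is an assumption; pass the real one as the first argument if it differs.
class DriverCheck {
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only locate the class, do not run its static initializers
            Class.forName(className, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String driver = args.length > 0 ? args[0] : "com.oceanbase.jdbc.Driver";
        System.out.println(driver + " on classpath = " + isOnClasspath(driver));
    }
}
```

For example, after `javac DriverCheck.java`, running `java -cp .:/opt/flink/lib/oceanbase-client-1.1.0.jar DriverCheck` should print `true` if the JAR contains that class.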
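2.3 Configuration Preparation
Prepare the following configuration before integration:
- OceanBase: make sure the OceanBase connection settings are correct
- Flink: configure Flink execution parameters such as parallelism and memory
- Connectors: configure the parameters of the Flink JDBC connector
Example: configuring Flink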
vim /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1600m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
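The slot settings above limit how much parallelism a job can get. Here is a simplified model that assumes Flink's default slot sharing and a cluster running only this job. Under those assumptions, a job needs as many slots as its highest operator parallelism, and the cluster offers `TaskManagers × taskmanager.numberOfTaskSlots` slots. The sketch below only does that arithmetic. `SlotCapacity` is a hypothetical helper, and the TaskManager count is an assumption because flink-conf.yaml does not set it.

```java
// Simplified model, assuming default slot sharing and no other jobs occupying slots:
// a job fits if its highest operator parallelism does not exceed the total slot count.
class SlotCapacity {
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    static boolean fits(int maxOperatorParallelism, int taskManagers, int slotsPerTaskManager) {
        return maxOperatorParallelism <= totalSlots(taskManagers, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        // taskmanager.numberOfTaskSlots: 2 from flink-conf.yaml; one TaskManager is an assumption
        int taskManagers = 1;
        int slotsPerTaskManager = 2;
        System.out.println("total slots = " + totalSlots(taskManagers, slotsPerTaskManager));
        System.out.println("parallelism 1 fits: " + fits(1, taskManagers, slotsPerTaskManager));
        System.out.println("parallelism 4 fits: " + fits(4, taskManagers, slotsPerTaskManager));
    }
}
```

With one TaskManager and 2 slots, a parallelism of 1 or 2 fits. A parallelism of 4 cannot be scheduled until more slots become available.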
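Part03-Integration Process
3.1 Data Source Configuration
Configure the data source:
- OceanBase connection: connection information such as the URL, username and password
- Table information: the table and columns to read
- Read mode: for example batch or streaming
Example: configuring the data source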
// Flink Java code (Table API / SQL)
package com.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class OceanBaseFlinkIntegration {
    public static void main(String[] args) throws Exception {
        // Create a Table API environment in streaming mode.
        // executeSql() submits the job itself, so no StreamExecutionEnvironment/env.execute() is needed;
        // env.execute() would fail here because no DataStream operators are defined.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register the OceanBase table through the JDBC connector.
        // `value` is a reserved word in Flink SQL and must be back-quoted.
        String createTableSql = """
            CREATE TABLE source_table (
              id INT,
              name STRING,
              `value` INT
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.1:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_table'
            )
            """;
        tEnv.executeSql(createTableSql);

        // Run the query and print the rows on the client
        TableResult result = tEnv.executeSql("SELECT * FROM source_table");
        result.print();
    }
}
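The DDL string above hard-codes the connection options, and the later examples repeat it almost word for word. One way to keep it in one place is to generate it. The following sketch is a hypothetical helper (`JdbcDdlBuilder` is not part of Flink). It back-quotes column names, which also handles reserved words such as `value`, and it escapes single quotes in option values. Reading credentials from the `OB_USER` and `OB_PASSWORD` environment variables is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: renders a Flink SQL "CREATE TABLE ... WITH (...)" statement.
class JdbcDdlBuilder {
    // Back-quote an identifier so reserved words such as `value` are accepted.
    // Identifiers containing a backtick are rejected rather than escaped, to keep the sketch simple.
    static String quote(String identifier) {
        if (identifier.contains("`")) {
            throw new IllegalArgumentException("unsupported identifier: " + identifier);
        }
        return "`" + identifier + "`";
    }

    // columns: column name -> Flink SQL type; options: connector option -> value (insertion order kept)
    static String createTable(String tableName, Map<String, String> columns, Map<String, String> options) {
        String cols = columns.entrySet().stream()
                .map(e -> "  " + quote(e.getKey()) + " " + e.getValue())
                .collect(Collectors.joining(",\n"));
        String opts = options.entrySet().stream()
                // SQL string literals escape a single quote by doubling it
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + quote(tableName) + " (\n" + cols + "\n) WITH (\n" + opts + "\n)";
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "INT");
        columns.put("name", "STRING");
        columns.put("value", "INT");

        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "jdbc");
        options.put("url", "jdbc:oceanbase://192.168.1.1:2881/fgedudb");
        // Assumed environment variable names, so credentials are not hard-coded in the source
        options.put("username", System.getenv().getOrDefault("OB_USER", "fgedu"));
        options.put("password", System.getenv().getOrDefault("OB_PASSWORD", ""));
        options.put("table-name", "fgedu_table");

        // The resulting string can be passed to tEnv.executeSql(...)
        System.out.println(createTable("source_table", columns, options));
    }
}
```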
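3.2 Data Processing
Data processing includes:
- Transformation: transform and process the data
- Filtering: filter out data that is not needed
- Aggregation: aggregate the data
- Joins: join data from different sources
Example: data processing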
// Flink Java code
package com.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class OceanBaseFlinkProcessing {
    public static void main(String[] args) throws Exception {
        // Create a Table API environment in streaming mode; executeSql() submits the jobs
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register the OceanBase source table (`value` is a reserved word and must be back-quoted)
        String createTableSql = """
            CREATE TABLE source_table (
              id INT,
              name STRING,
              `value` INT
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.1:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_table'
            )
            """;
        tEnv.executeSql(createTableSql);

        // Data processing: define the aggregation as a temporary view.
        // A CREATE TABLE ... AS SELECT would need its own WITH (...) connector options,
        // because every Flink table must be backed by a connector.
        String processSql = """
            CREATE TEMPORARY VIEW result_view AS
            SELECT
              name,
              SUM(`value`) AS total_value
            FROM source_table
            GROUP BY name
            """;
        tEnv.executeSql(processSql);

        // Query the view; in streaming mode the printed result is a changelog (+I, -U, +U rows)
        TableResult result = tEnv.executeSql("SELECT * FROM result_view");
        result.print();
    }
}
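In streaming mode, the `GROUP BY name` / `SUM(value)` query above does not produce a single final result. Each new input row updates a running sum, and Flink emits that update as a changelog. The following sketch is a simplified plain-Java model of this behavior that uses Flink's row-kind notation (`+I` insert, `-U` update-before, `+U` update-after). It ignores optimizations such as mini-batching and is not Flink's implementation. `RunningSumChangelog` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a streaming GROUP BY name / SUM(value): the first row of a key emits +I,
// every later row retracts the old sum (-U) and emits the new one (+U).
class RunningSumChangelog {
    static List<String> process(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new HashMap<>();
        List<String> changelog = new ArrayList<>();
        for (Map.Entry<String, Integer> r : records) {
            String name = r.getKey();
            Integer old = sums.get(name);
            if (old == null) {
                sums.put(name, r.getValue());
                changelog.add("+I[" + name + ", " + r.getValue() + "]");
            } else {
                int updated = old + r.getValue();
                sums.put(name, updated);
                changelog.add("-U[" + name + ", " + old + "]");
                changelog.add("+U[" + name + ", " + updated + "]");
            }
        }
        return changelog;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        process(input).forEach(System.out::println);
    }
}
```

For the input (a, 1), (b, 2), (a, 3), the model prints `+I[a, 1]`, `+I[b, 2]`, `-U[a, 1]`, `+U[a, 4]`. The second row for `a` retracts the old sum and emits the new one.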
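3.3 Data Output
Data output options:
- OceanBase: write the processed data back to OceanBase
- Kafka: write the processed data to Kafka
- Elasticsearch: write the processed data to Elasticsearch
- File system: write the processed data to a file system
Example: data output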
// Flink Java code
package com.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OceanBaseFlinkOutput {
    public static void main(String[] args) throws Exception {
        // Create a Table API environment in streaming mode; executeSql() submits the job
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // OceanBase connection (source table); `value` is a reserved word and must be back-quoted
        String createSourceTableSql = """
            CREATE TABLE source_table (
              id INT,
              name STRING,
              `value` INT
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.1:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_table'
            )
            """;
        tEnv.executeSql(createSourceTableSql);

        // OceanBase connection (sink table). The GROUP BY result is updated continuously, so the
        // sink needs a primary key to run in upsert mode; fgedu_result should have a primary or
        // unique key on name as well.
        String createSinkTableSql = """
            CREATE TABLE sink_table (
              name STRING,
              total_value INT,
              PRIMARY KEY (name) NOT ENFORCED
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.1:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_result'
            )
            """;
        tEnv.executeSql(createSinkTableSql);

        // Processing and output: INSERT INTO submits the job to the cluster
        String processSql = """
            INSERT INTO sink_table
            SELECT
              name,
              SUM(`value`) AS total_value
            FROM source_table
            GROUP BY name
            """;
        tEnv.executeSql(processSql);
    }
}
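Because the aggregation result written to `sink_table` keeps changing, the sink must overwrite earlier results for the same key instead of appending new rows. A primary key on the sink table makes this possible, because the JDBC connector then writes in upsert mode. The following sketch is a simplified plain-Java model of applying a changelog to a table keyed by `name`. It is not connector code, and `UpsertSinkModel` and its `Change` type are hypothetical. The model ignores update-before rows, because an upsert by key does not need them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of an upsert sink keyed by a primary key (not the JDBC connector's code).
class UpsertSinkModel {
    // One changelog row: kind is "+I", "-U", "+U" or "-D"
    static final class Change {
        final String kind;
        final String key;
        final int value;

        Change(String kind, String key, int value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    static Map<String, Integer> apply(List<Change> changes) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case "+I", "+U" -> table.put(c.key, c.value); // insert or overwrite the row for this key
                case "-D" -> table.remove(c.key);             // delete the row for this key
                case "-U" -> { }                              // not needed when writing by key
                default -> throw new IllegalArgumentException("unknown row kind: " + c.kind);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changes = List.of(
                new Change("+I", "a", 1), new Change("+I", "b", 2),
                new Change("-U", "a", 1), new Change("+U", "a", 4));
        System.out.println(apply(changes));
    }
}
```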
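3.4 Job Submission and Monitoring
Job submission and monitoring cover:
- Job submission: submit the Flink job to the cluster
- Job monitoring: monitor the job's running state and performance
- Job tuning: tune the job based on the monitoring results
Example: submitting the job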
mvn clean package
[INFO] ------------------------------------------------------------------------
[INFO] Building oceanbase-flink-integration 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:3.1.0:clean (default-clean) @ oceanbase-flink-integration ---
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ oceanbase-flink-integration ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/user/oceanbase-flink-integration/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.1:compile (default-compile) @ oceanbase-flink-integration ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /home/user/oceanbase-flink-integration/target/classes
[INFO]
[INFO] --- maven-jar-plugin:3.2.0:jar (default-jar) @ oceanbase-flink-integration ---
[INFO] Building jar: /home/user/oceanbase-flink-integration/target/oceanbase-flink-integration-1.0-SNAPSHOT.jar
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.000 s
[INFO] Finished at: 2024-01-01T00:00:00Z
[INFO] ------------------------------------------------------------------------
./bin/flink run -c com.example.OceanBaseFlinkIntegration /home/user/oceanbase-flink-integration/target/oceanbase-flink-integration-1.0-SNAPSHOT.jar
Job has been submitted with JobID 1234567890abcdef1234567890abcdef
./bin/flink list
------------------ Running/Restarting Jobs -------------------
1234567890abcdef1234567890abcdef : OceanBase Flink Integration
--------------------------------------------------------------
No scheduled jobs.
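Besides `./bin/flink list`, you can read job status from the JobManager's REST API, which the web UI also uses. The following is a minimal sketch that assumes JDK 11+ `java.net.http` and the default REST port 8081 on `localhost`. It calls `GET /jobs/overview` and returns the raw JSON, in which each job entry includes fields such as its id, name and state. `FlinkJobsOverview` is a hypothetical name, and the sketch does not parse the JSON so that it stays dependency-free.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Minimal sketch: fetches the job overview from the Flink REST API (GET /jobs/overview).
class FlinkJobsOverview {
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_1_1)
            .connectTimeout(Duration.ofSeconds(5))
            .build();

    // Returns the raw JSON body; fails on any status other than 200
    static String fetch(String restBaseUrl) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(restBaseUrl + "/jobs/overview"))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        try {
            HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() != 200) {
                throw new IllegalStateException("unexpected HTTP status " + response.statusCode());
            }
            return response.body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while querying the Flink REST API", e);
        }
    }

    public static void main(String[] args) {
        // The JobManager address is an assumption; pass your own as the first argument
        String base = args.length > 0 ? args[0] : "http://localhost:8081";
        System.out.println(fetch(base));
    }
}
```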
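Part04-Practical Cases
4.1 Real-Time Data Synchronization
Case: real-time data synchronization
Requirement: synchronize data from OceanBase to another OceanBase instance in real time.
Solution: use the Flink JDBC connector to synchronize the data. Note that the JDBC connector's scan source reads a one-off, bounded snapshot of the table and does not pick up later changes. For continuous change capture, you need a CDC source such as the OceanBase CDC connector from the Flink CDC project.
Implementation steps:
- Configure the connections to the source and target databases
- Create the source and target tables
- Write the synchronization logic
- Submit the job and monitor it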
// Flink Java code
package com.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OceanBaseDataSync {
    public static void main(String[] args) throws Exception {
        // Create a Table API environment in streaming mode; executeSql() submits the job
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Source database (OceanBase); `value` is a reserved word and must be back-quoted.
        // The lookup.* options only take effect when this table is used as a lookup (dimension) table.
        String createSourceTableSql = """
            CREATE TABLE source_table (
              id INT,
              name STRING,
              `value` INT,
              ts TIMESTAMP(3),
              WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.1:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_table',
              'scan.fetch-size' = '1000',
              'lookup.cache.max-rows' = '10000',
              'lookup.cache.ttl' = '10min'
            )
            """;
        tEnv.executeSql(createSourceTableSql);

        // Target database (OceanBase): rows are flushed when 1000 are buffered or every 5 seconds,
        // and a failed flush is retried up to 3 times
        String createSinkTableSql = """
            CREATE TABLE sink_table (
              id INT,
              name STRING,
              `value` INT,
              ts TIMESTAMP(3)
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.2:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_table_sync',
              'sink.buffer-flush.max-rows' = '1000',
              'sink.buffer-flush.interval' = '5s',
              'sink.max-retries' = '3'
            )
            """;
        tEnv.executeSql(createSinkTableSql);

        // Data synchronization: INSERT INTO submits the job to the cluster
        String syncSql = """
            INSERT INTO sink_table
            SELECT * FROM source_table
            """;
        tEnv.executeSql(syncSql);
    }
}
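The sink options above control batching. Rows are buffered and written when `sink.buffer-flush.max-rows` rows have accumulated or when the `sink.buffer-flush.interval` timer fires, whichever comes first. The following sketch is a simplified plain-Java model of that policy, and it rests on several assumptions: a fixed-rate timer driven by explicit `tick` calls, no checkpoint-triggered flushes, and no retries. `BufferFlushModel` is hypothetical and is not the connector's code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a buffered sink: flush on row count or on a fixed-rate timer.
class BufferFlushModel {
    private final int maxRows;
    private final long intervalMillis;
    private long nextTimerMillis;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();

    BufferFlushModel(int maxRows, long intervalMillis, long startMillis) {
        if (maxRows <= 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("maxRows and intervalMillis must be positive");
        }
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
        this.nextTimerMillis = startMillis + intervalMillis;
    }

    // Buffer a row and flush as soon as the row limit is reached (like sink.buffer-flush.max-rows)
    void add(String row) {
        buffer.add(row);
        if (buffer.size() >= maxRows) {
            flush();
        }
    }

    // Advance the clock; the timer fires once per elapsed interval (like sink.buffer-flush.interval)
    void tick(long nowMillis) {
        while (nowMillis >= nextTimerMillis) {
            if (!buffer.isEmpty()) {
                flush();
            }
            nextTimerMillis += intervalMillis;
        }
    }

    // Stands in for one batched write to the database
    private void flush() {
        flushedBatches.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    List<List<String>> flushedBatches() {
        return flushedBatches;
    }

    int buffered() {
        return buffer.size();
    }
}
```

With `maxRows = 3` and a 5000 ms interval, three rows are written as one batch immediately. A fourth row waits until the timer fires at 5000 ms, and a timer tick with an empty buffer writes nothing.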
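4.2 Real-Time Data Processing
Case: real-time data processing
Requirement: process data in OceanBase in real time and compute real-time statistics.
Solution: use Flink's stream processing capabilities to process the data in real time.
Implementation steps:
- Configure the data source (OceanBase)
- Write the processing logic
- Configure the data output (OceanBase)
- Submit the job and monitor it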
// Flink Java code
package com.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OceanBaseRealTimeProcessing {
    public static void main(String[] args) throws Exception {
        // Create a Table API environment in streaming mode; executeSql() submits the job
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Data source (OceanBase); `value` is a reserved word and must be back-quoted
        String createSourceTableSql = """
            CREATE TABLE source_table (
              id INT,
              name STRING,
              `value` INT,
              ts TIMESTAMP(3),
              WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.1:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_table'
            )
            """;
        tEnv.executeSql(createSourceTableSql);

        // Data output (OceanBase); window aggregates are append-only, so no primary key is required
        String createSinkTableSql = """
            CREATE TABLE result_table (
              window_end TIMESTAMP(3),
              name STRING,
              total_value INT,
              avg_value DOUBLE
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.1:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_realtime_result'
            )
            """;
        tEnv.executeSql(createSinkTableSql);

        // Real-time processing: 1-minute tumbling event-time windows per name.
        // AVG over an INT column returns INT in Flink SQL, so cast to DOUBLE to keep the fraction.
        String processSql = """
            INSERT INTO result_table
            SELECT
              TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end,
              name,
              SUM(`value`) AS total_value,
              AVG(CAST(`value` AS DOUBLE)) AS avg_value
            FROM source_table
            GROUP BY
              TUMBLE(ts, INTERVAL '1' MINUTE),
              name
            """;
        tEnv.executeSql(processSql);
    }
}
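The window and watermark clauses above reduce to simple arithmetic on epoch milliseconds. The following sketch is a simplified model that assumes a zero window offset, with windows aligned to the epoch. In this model, a row belongs to the window that starts at `ts - floorMod(ts, size)`, and `TUMBLE_END` is the exclusive end `start + size`. The watermark is the largest `ts` seen so far minus 5 seconds, and a window fires once the watermark reaches its last millisecond (`end - 1`). `TumbleModel` is a hypothetical name. The JDBC source here is a bounded scan, so any remaining windows also fire when the scan finishes, because Flink emits a final maximum watermark at the end of a bounded input.

```java
// Simplified model of event-time tumbling windows with a bounded-out-of-orderness watermark
// (timestamps as epoch milliseconds, zero window offset).
class TumbleModel {
    // Start of the window containing ts; floorMod keeps negative timestamps correct
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Exclusive window end, which is what TUMBLE_END returns
    static long windowEnd(long ts, long size) {
        return windowStart(ts, size) + size;
    }

    // WATERMARK FOR ts AS ts - INTERVAL '5' SECOND, applied to the largest ts seen so far
    static long watermark(long maxSeenTs, long delayMillis) {
        return maxSeenTs - delayMillis;
    }

    // A window [start, end) fires once the watermark reaches its last millisecond
    static boolean fired(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L; // INTERVAL '1' MINUTE
        long ts = 90_000L;   // 00:01:30 after the epoch
        System.out.println("window = [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        System.out.println("fired at max ts 124999: " + fired(windowEnd(ts, size), watermark(124_999L, 5_000L)));
    }
}
```

For example, a row at 90,000 ms lands in the window [60,000, 120,000). That window fires only after the job has seen a row with a `ts` of at least 124,999 ms.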
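4.3 Real-Time Data Analysis
Case: real-time data analysis
Requirement: analyze data in OceanBase in real time and generate real-time reports.
Solution: use Flink's stream processing capabilities to analyze the data in real time.
Implementation steps:
- Configure the data source (OceanBase)
- Write the analysis logic
- Configure the data output (Elasticsearch)
- Submit the job and monitor it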
// Flink Java code
package com.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OceanBaseRealTimeAnalysis {
    public static void main(String[] args) throws Exception {
        // Create a Table API environment in streaming mode; executeSql() submits the job
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Data source (OceanBase); `value` is a reserved word and must be back-quoted
        String createSourceTableSql = """
            CREATE TABLE source_table (
              id INT,
              name STRING,
              `value` INT,
              ts TIMESTAMP(3),
              WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:oceanbase://192.168.1.1:2881/fgedudb',
              'username' = 'fgedu',
              'password' = 'fgedu123',
              'table-name' = 'fgedu_table'
            )
            """;
        tEnv.executeSql(createSourceTableSql);

        // Data output (Elasticsearch); with a primary key the sink upserts documents whose ids are
        // the key values joined by 'document-id.key-delimiter'
        String createSinkTableSql = """
            CREATE TABLE es_table (
              window_end TIMESTAMP(3),
              name STRING,
              total_value INT,
              avg_value DOUBLE,
              PRIMARY KEY (window_end, name) NOT ENFORCED
            ) WITH (
              'connector' = 'elasticsearch-7',
              'hosts' = 'http://192.168.1.3:9200',
              'index' = 'oceanbase_analysis',
              'document-id.key-delimiter' = '$',
              'sink.bulk-flush.max-actions' = '1000',
              'sink.bulk-flush.max-size' = '2mb',
              'sink.bulk-flush.interval' = '5s',
              'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
              'sink.bulk-flush.backoff.max-retries' = '3',
              'sink.bulk-flush.backoff.delay' = '1s'
            )
            """;
        tEnv.executeSql(createSinkTableSql);

        // Real-time analysis: 1-minute tumbling windows per name
        // (AVG over INT returns INT in Flink SQL, hence the cast to DOUBLE)
        String analysisSql = """
            INSERT INTO es_table
            SELECT
              TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end,
              name,
              SUM(`value`) AS total_value,
              AVG(CAST(`value` AS DOUBLE)) AS avg_value
            FROM source_table
            GROUP BY
              TUMBLE(ts, INTERVAL '1' MINUTE),
              name
            """;
        tEnv.executeSql(analysisSql);
    }
}
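When `PRIMARY KEY (window_end, name)` is declared, the Elasticsearch sink runs in upsert mode. It builds each document id from the primary-key values joined by `document-id.key-delimiter`, which is `$` here. Writing the same window result again therefore replaces one document instead of adding a duplicate. The following sketch models only that idea in plain Java. `EsDocIdModel` is hypothetical. The sketch does not model the text format that the connector uses for the `TIMESTAMP` key, so the timestamp string in the example is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model: document id = primary-key values joined by a delimiter, and indexing by id
// creates or replaces a document, so repeated writes of the same key stay idempotent.
class EsDocIdModel {
    static String documentId(String delimiter, List<String> keyValues) {
        return String.join(delimiter, keyValues);
    }

    static void index(Map<String, String> index, String id, String sourceJson) {
        index.put(id, sourceJson); // create or replace
    }

    public static void main(String[] args) {
        Map<String, String> index = new HashMap<>();
        // The timestamp text is an assumed rendering of window_end
        String id = documentId("$", List.of("2024-01-01 00:01:00", "alice"));
        index(index, id, "{\"total_value\":10}");
        index(index, id, "{\"total_value\":12}"); // a retried or updated write hits the same document
        System.out.println(id + " -> " + index.get(id) + " (documents: " + index.size() + ")");
    }
}
```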
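Part05-Fengge's Experience Summary and Tips
5.1 Integration Best Practices
Best practices for integrating OceanBase with Flink:
- Configure connection parameters sensibly: set parameters such as fetch-size and caching according to the actual workload
- Tune parallelism: choose a parallelism that matches the data volume and cluster resources
- Use watermarks: in stream processing, use watermarks to handle out-of-order data
- Configure checkpoints: enable checkpointing (for example with execution.checkpointing.interval) at an appropriate interval to ensure fault tolerance
- Monitor job status: monitor the job's running state and performance in real time
- Optimize processing logic: optimize the data processing logic for the business requirements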
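5.2 Performance Optimization
Ways to optimize performance:
- Increase parallelism: raise the parallelism appropriately to speed up processing
- Tune connection parameters: tune JDBC parameters such as fetch-size and batch size
- Use batch processing: use batch execution mode for large bulk data volumes
- Optimize data structures: reduce the amount of data transferred
- Use caching: use caches to avoid repeated computation
- Adjust resources: adjust Flink's resource configuration to match the actual workload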
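5.3 Handling Common Problems
Common problems and what to check:
- Connection failures:
  - Check network connectivity
  - Check the OceanBase service status
  - Check that the connection parameters are correct
- Data inconsistency:
  - Check the data processing logic
  - Check the watermark configuration
  - Check the checkpoint configuration
- Performance problems:
  - Tune parallelism
  - Tune connection parameters
  - Adjust the resource configuration
- Job failures:
  - Check the error logs
  - Check the data format
  - Check dependency versions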
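For the "check the connection parameters" step, it can help to extract the host, port and database from the JDBC URL and compare them with the actual OceanBase service address. The following is a minimal JDK-only sketch. It handles only the simple single-host form used in this article (`jdbc:oceanbase://192.168.1.1:2881/fgedudb`). `JdbcUrlInfo` is a hypothetical helper.

```java
import java.net.URI;

// Minimal sketch: splits a single-host JDBC URL into scheme, host, port and database.
class JdbcUrlInfo {
    final String scheme;
    final String host;
    final int port;
    final String database;

    private JdbcUrlInfo(String scheme, String host, int port, String database) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
        this.database = database;
    }

    static JdbcUrlInfo parse(String jdbcUrl) {
        if (!jdbcUrl.startsWith("jdbc:")) {
            throw new IllegalArgumentException("not a JDBC URL: " + jdbcUrl);
        }
        // Drop the "jdbc:" prefix so the remainder parses as an ordinary URI
        URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("missing host or port: " + jdbcUrl);
        }
        String path = uri.getPath(); // e.g. "/fgedudb"; query parameters are excluded
        String database = (path == null || path.length() <= 1) ? "" : path.substring(1);
        return new JdbcUrlInfo(uri.getScheme(), uri.getHost(), uri.getPort(), database);
    }

    public static void main(String[] args) {
        JdbcUrlInfo info = parse("jdbc:oceanbase://192.168.1.1:2881/fgedudb");
        System.out.println(info.scheme + " " + info.host + ":" + info.port + " db=" + info.database);
    }
}
```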
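Fengge's tip: Integrating OceanBase with Flink is an important way to implement real-time data processing, and it requires sensible configuration and tuning.
This article was compiled and published by Fengge Tutorials for learning and testing purposes only; please credit the source when reposting: http://www.fgedu.net.cn/10327.html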
