This article is a detailed walkthrough of Apache Flink real-time computing best practices, covering Flink architecture, cluster deployment, job development, Checkpoint configuration, performance tuning, and monitoring/operations. It follows the official Flink documentation and is aimed at big-data real-time computing engineers.
Part01 - Basic Concepts and Theory
1.1 Flink Real-Time Computing Overview
Apache Flink is a distributed stream- and batch-processing framework known for high throughput, low latency, event-time support, and exactly-once semantics.
- Unified batch and streaming: one set of APIs for both batch and stream processing
- Event time: processing based on event timestamps, not arrival time
- State management: supports very large state
- Exactly-once: Exactly-Once processing semantics
- High availability: supports JobManager high availability
- Low latency: millisecond-level latency
1.2 Flink Architecture and Core Concepts
Core Flink components:
JobManager (JM):
- The job manager
- Schedules tasks
- Coordinates Checkpoints
- Manages resources
- Can be configured for high availability
TaskManager (TM):
- The task manager
- Executes the actual tasks
- Manages memory
- Exchanges data between tasks
- Multiple instances can be deployed
Client:
- The client
- Submits jobs
- Packages the program
- Hands the job to the JM
# Core Concepts
Job:
- A job submitted by the user
- Consists of multiple Tasks
Task:
- A unit of execution
- Consists of multiple Subtasks
Subtask:
- A parallel instance of a Task
- The smallest unit of execution
Operator:
- An operator
- Encapsulates the data-processing logic
- e.g. map, filter, keyBy, window
State:
- State
- Intermediate computation results
- Fault-tolerant via snapshots
Checkpoint:
- A checkpoint
- An automatic snapshot of state
- Used for failure recovery
Savepoint:
- A savepoint
- A manually triggered Checkpoint
- Used for upgrades and migration
1.3 Flink API Overview
Flink API layers:
- DataStream API: the core streaming API
- DataSet API: the batch API (legacy; batch workloads are moving to DataStream/Table API)
- Table API: a relational API
- SQL: standard SQL queries
Part02 - Production Environment Planning and Recommendations
2.1 Flink Cluster Planning
Key points for Flink cluster planning:
JobManager:
- Count: 2-3 (HA mode)
- Spec: 8 cores / 16 GB
- Disk: 500 GB SSD
TaskManager:
- Count: 3-100
- Spec: 16 cores / 32 GB up to 32 cores / 128 GB
- Disk: 2-4 TB SSD
# Memory Configuration
JobManager heap: 4-8 GB
JobManager off-heap: 2-4 GB
TaskManager heap: 8-64 GB
TaskManager off-heap: 2-8 GB
Managed Memory: 4-32 GB (used for state)
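To see how these figures relate, the sketch below walks through a simplified version of Flink's TaskManager memory split. The 0.4 managed fraction matches the configuration in section 2.2; the metaspace and overhead values are common defaults used here as illustrative assumptions, not the complete official memory model.

```java
// MemoryPlan.java - simplified sketch of the TaskManager memory breakdown.
public class MemoryPlan {
    public static void main(String[] args) {
        long processSizeMb = 32 * 1024;  // taskmanager.memory.process.size: 32g
        long metaspaceMb = 256;          // assumed JVM metaspace default
        // Assumed JVM overhead: ~10% of process size, clamped to [192 MB, 1 GB]
        long overheadMb = Math.min(Math.max((long) (processSizeMb * 0.1), 192), 1024);

        long flinkSizeMb = processSizeMb - metaspaceMb - overheadMb;
        long managedMb = (long) (flinkSizeMb * 0.4);  // taskmanager.memory.managed.fraction: 0.4
        long networkMb = (long) (flinkSizeMb * 0.1);  // assumed network buffer fraction
        long heapMb = flinkSizeMb - managedMb - networkMb;

        System.out.printf("flink=%dMB managed=%dMB network=%dMB heap~=%dMB%n",
                flinkSizeMb, managedMb, networkMb, heapMb);
    }
}
```

The point of the exercise: of a 32 GB process, only roughly half ends up as heap; large-state jobs live mostly in managed memory.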
# Parallelism Planning
Total job parallelism:
- Small jobs: 2-10
- Medium jobs: 20-100
- Large jobs: 100-1000
Slots per TaskManager:
- Suggested range: 2-8
- Recommended: 4
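The slot count drives cluster sizing: total slots must cover the job's maximum parallelism. A quick ceiling-division sketch (numbers are illustrative):

```java
// SlotPlan.java - back-of-the-envelope sizing: how many TaskManagers are
// needed so that total slots cover the job parallelism.
public class SlotPlan {
    static int taskManagersNeeded(int jobParallelism, int slotsPerTm) {
        return (jobParallelism + slotsPerTm - 1) / slotsPerTm; // ceiling division
    }

    public static void main(String[] args) {
        // With the recommended 4 slots per TaskManager:
        System.out.println(taskManagersNeeded(100, 4)); // 25 TMs for parallelism 100
        System.out.println(taskManagersNeeded(10, 4));  // 3 TMs for parallelism 10
    }
}
```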
# Directory Layout
Install directory: /bigdata/app/flink
Data directory: /bigdata/fgdata/flink
Log directory: /bigdata/fgdata/logs/flink
Checkpoint directory: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/checkpoints
Savepoint directory: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints
2.2 Flink Core Configuration
Core Flink configuration (flink-conf.yaml):
jobmanager.rpc.address: fgedu-jm
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 8g
jobmanager.memory.flink.size: 6g
taskmanager.memory.process.size: 32g
taskmanager.memory.flink.size: 28g
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
state.backend: rocksdb
state.checkpoints.dir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/checkpoints
state.savepoints.dir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints
execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 600000
execution.checkpointing.min-pause: 5000
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
high-availability: zookeeper
high-availability.storageDir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/ha
high-availability.zookeeper.quorum: fgedu-zk01:2181,fgedu-zk02:2181,fgedu-zk03:2181
rest.address: fgedu-jm
rest.port: 8081
env.log.dir: /bigdata/fgdata/logs/flink
2.3 High Availability Planning
Key points for HA planning:
- JobManager: at least 2, with ZooKeeper for leader election
- TaskManager: spread across nodes to avoid single points of failure
- State: store Checkpoints on HDFS/S3
- ZooKeeper: a quorum of at least 3 ZK nodes
Part03 - Production Implementation
3.1 Flink Cluster Installation and Deployment
3.1.1 Flink Installation and Configuration
# 1. Download and unpack Flink
cd /bigdata/app
wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar -zxvf flink-1.17.1-bin-scala_2.12.tgz
ln -s flink-1.17.1 flink
# 2. Configure flink-conf.yaml
cat > /bigdata/app/flink/conf/flink-conf.yaml << 'EOF'
jobmanager.rpc.address: fgedu-jm01
jobmanager.memory.process.size: 8g
taskmanager.memory.process.size: 32g
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
state.backend: rocksdb
state.checkpoints.dir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/checkpoints
state.savepoints.dir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints
execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 600000
high-availability: zookeeper
high-availability.storageDir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/ha
high-availability.zookeeper.quorum: fgedu-zk01:2181,fgedu-zk02:2181,fgedu-zk03:2181
rest.port: 8081
env.log.dir: /bigdata/fgdata/logs/flink
EOF
# 3. Configure masters
cat > /bigdata/app/flink/conf/masters << 'EOF'
fgedu-jm01:8081
fgedu-jm02:8081
EOF
# 4. Configure workers
cat > /bigdata/app/flink/conf/workers << 'EOF'
fgedu-tm01
fgedu-tm02
fgedu-tm03
fgedu-tm04
EOF
# 5. Create local and HDFS directories
mkdir -p /bigdata/fgdata/flink
mkdir -p /bigdata/fgdata/logs/flink
hdfs dfs -mkdir -p /bigdata/fgdata/flink/checkpoints
hdfs dfs -mkdir -p /bigdata/fgdata/flink/savepoints
hdfs dfs -mkdir -p /bigdata/fgdata/flink/ha
# 6. Start the Flink cluster
/bigdata/app/flink/bin/start-cluster.sh
# 7. Verify the cluster
# Open the Web UI at http://fgedu-jm01:8081 to check the TaskManagers
# List jobs from the command line:
/bigdata/app/flink/bin/flink list
3.2 Flink Job Development and Submission
3.2.1 DataStream API Development
Maven dependencies (pom.xml):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>
// FgeduFlinkJob.java
package com.fgedu.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class FgeduFlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("fgedu-kafka01:9092,fgedu-kafka02:9092")
.setTopics("fgedu_user_events")
.setGroupId("fgedu_flink_group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"Kafka Source"
);
DataStream<String> result = stream
.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
})
.keyBy(value -> value)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.reduce((v1, v2) -> v1 + "," + v2);
result.print();
env.execute("Fgedu Flink Job");
}
}
# Compile and package
mvn clean package
# Submit the job
/bigdata/app/flink/bin/flink run \
-c com.fgedu.flink.FgeduFlinkJob \
-p 8 \
fgedu-flink-job.jar
# List jobs
/bigdata/app/flink/bin/flink list
# Cancel a job
/bigdata/app/flink/bin/flink cancel <job-id>
# Restore from a Savepoint
/bigdata/app/flink/bin/flink run \
-s hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints/savepoint-xxx \
-c com.fgedu.flink.FgeduFlinkJob \
fgedu-flink-job.jar
3.3 Checkpoint Configuration
3.3.1 Checkpoint Configuration in Practice
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing
env.enableCheckpointing(60000); // every 60 seconds
// Checkpoint mode: EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 minutes
// Minimum pause between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // 5 seconds
// Maximum concurrent checkpoints
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Externalized checkpoints
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// State backend configuration
// (RocksDBStateBackend is deprecated in recent Flink; EmbeddedRocksDBStateBackend
// plus setCheckpointStorage is the newer API.)
env.setStateBackend(new RocksDBStateBackend(
"hdfs://fgedu-nn:8020/bigdata/fgdata/flink/checkpoints",
true // incremental checkpoints
));
// RocksDB tuning
RocksDBStateBackend backend = new RocksDBStateBackend(...);
backend.setDbStoragePath("/bigdata/fgdata/flink/rocksdb");
backend.setNumberOfTransferThreads(4);
env.setStateBackend(backend);
# Creating a Savepoint
# Option 1: command line
/bigdata/app/flink/bin/flink savepoint <job-id> \
hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints
# Option 2: REST API
curl -X POST http://fgedu-jm:8081/jobs/<job-id>/savepoints \
-d '{"target-directory": "hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints"}' \
-H "Content-Type: application/json"
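The interval, timeout, and min-pause settings above interact: a checkpoint schedule only works if the timeout comfortably exceeds the interval and the minimum pause is shorter than the interval. A small consistency check (plain Java, independent of Flink):

```java
// CheckpointSanity.java - sketch of simple consistency checks on the
// checkpoint settings used above (interval 60s, timeout 600s, min-pause 5s).
public class CheckpointSanity {
    static boolean sane(long intervalMs, long timeoutMs, long minPauseMs) {
        // Timeout should exceed the interval, and the minimum pause must be
        // shorter than the interval, or checkpoints cannot keep their cadence.
        return timeoutMs >= intervalMs && minPauseMs < intervalMs;
    }

    public static void main(String[] args) {
        System.out.println(sane(60_000, 600_000, 5_000)); // the settings above
        System.out.println(sane(60_000, 30_000, 5_000));  // timeout shorter than interval
    }
}
```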
Part04 - Production Cases and Hands-On Walkthroughs
4.1 Kafka + Flink Real-Time Computing in Practice
4.1.1 Kafka + Flink Walkthrough
# 1. Create the Kafka topic
kafka-topics.sh --create --topic fgedu_user_events \
--bootstrap-server fgedu-kafka01:9092 \
--partitions 12 \
--replication-factor 3
// 2. Kafka Source configuration
KafkaSource<UserEvent> source = KafkaSource.<UserEvent>builder()
.setBootstrapServers("fgedu-kafka01:9092,fgedu-kafka02:9092")
.setTopicPattern(Pattern.compile("fgedu_.*"))
.setGroupId("fgedu_flink_group")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new UserEventDeserializer())
.setProperty("commit.offsets.on.checkpoint", "true")
.build();
// 3. Event time and watermarks
WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
// 4. Read the stream
DataStream<UserEvent> events = env.fromSource(source, watermarkStrategy, "Kafka Source");
// 5. Real-time aggregation
DataStream<EventCount> countStream = events
.keyBy(UserEvent::getEventType)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new CountAggregate(), new CountWindowFunction());
// 6. Write results back to Kafka
KafkaSink<EventCount> sink = KafkaSink.<EventCount>builder()
.setBootstrapServers("fgedu-kafka01:9092,fgedu-kafka02:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("fgedu_event_count")
.setValueSerializationSchema(new EventCountSerializer())
.build()
)
.build();
countStream.sinkTo(sink);
# 7. Submit the job to YARN
/bigdata/app/flink/bin/flink run \
-c com.fgedu.flink.KafkaFlinkJob \
-p 12 \
-m yarn-cluster \
-yjm 4096 \
-ytm 8192 \
-ys 4 \
fgedu-flink-kafka.jar
4.2 Performance Tuning in Practice
4.2.1 Performance Tuning Techniques
# Optimization 1: parallelism
# - Kafka Source parallelism = number of topic partitions
# - Tune operator parallelism to the data volume
# - Total available slots = slots per TaskManager * number of TaskManagers
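The rules above reduce to simple arithmetic. A sketch with hypothetical numbers matching the Kafka example in 4.1 (12 partitions, 4 slots per TaskManager):

```java
// ParallelismPlan.java - align source parallelism with the Kafka partition
// count and confirm the job fits the slot budget. Numbers are illustrative.
public class ParallelismPlan {
    public static void main(String[] args) {
        int kafkaPartitions = 12;
        int slotsPerTm = 4;
        int taskManagers = 5;

        int sourceParallelism = kafkaPartitions;     // one source subtask per partition
        int totalSlots = slotsPerTm * taskManagers;  // the slot budget

        // More source subtasks than partitions would sit idle; fewer means
        // some subtasks read several partitions each.
        System.out.println(sourceParallelism + " <= " + totalSlots + ": "
                + (sourceParallelism <= totalSlots));
    }
}
```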
# Optimization 2: memory configuration
# TaskManager heap: used for task execution
# TaskManager managed memory: used for state
# For large state, increase the managed-memory fraction
# Optimization 3: state backend choice
# HashMapStateBackend: small state (< 1 GB), kept on the heap
# RocksDBStateBackend: large state, supports incremental checkpoints
env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
# Optimization 4: checkpoint tuning
# Lengthen the checkpoint interval
env.enableCheckpointing(300000); // every 5 minutes
# Use incremental checkpoints
new RocksDBStateBackend(path, true);
// Set a checkpoint timeout generous enough for the state size
env.getCheckpointConfig().setCheckpointTimeout(600000);
# Optimization 5: object reuse
env.getConfig().enableObjectReuse();
# Optimization 6: operator chaining
// Automatic chaining (enabled by default):
// map() and filter() below are chained into one task
DataStream<String> result = stream
.map(...)
.filter(...)
.keyBy(...);
// Disable chaining globally if needed
env.disableOperatorChaining();
# Optimization 7: rebalance
// Even out records across subtasks (helps with skewed source partitions)
stream.rebalance().map(...);
# Optimization 8: local aggregation
// Pre-aggregate locally, then aggregate globally
stream
.keyBy(...)
.window(...)
.aggregate(new LocalAgg(), new GlobalAgg());
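The idea behind local-then-global aggregation can be shown in plain Java, without Flink: each "subtask" pre-counts its own slice, and only the small per-key partial counts reach the global merge (the two-subtask setup and event names are illustrative assumptions):

```java
import java.util.HashMap;
import java.util.Map;

// TwoPhaseAgg.java - sketch of "aggregate locally, then globally".
public class TwoPhaseAgg {
    static Map<String, Integer> localCount(String[] slice) {
        Map<String, Integer> counts = new HashMap<>();
        for (String key : slice) counts.merge(key, 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        // Two "subtasks", each counting its own partition of the stream.
        Map<String, Integer> local1 = localCount(new String[]{"click", "click", "view"});
        Map<String, Integer> local2 = localCount(new String[]{"click", "view", "view"});

        // Global phase: merge the partial counts - far fewer records
        // cross the network than in a single global aggregation.
        Map<String, Integer> global = new HashMap<>(local1);
        local2.forEach((k, v) -> global.merge(k, v, Integer::sum));

        System.out.println(global.get("click")); // 3
        System.out.println(global.get("view"));  // 3
    }
}
```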
4.3 Monitoring and Operations in Practice
4.3.1 Monitoring Flink
# Flink Web UI
http://fgedu-jm:8081
# Web UI pages:
Overview:
- Job overview
- TaskManager status
- Job status
Jobs:
- Job list
- Job details
- Cancel jobs
TaskManagers:
- TaskManager list
- Memory usage
- Log access
Checkpoints:
- Checkpoint history
- Checkpoint status
- Checkpoint size
# Prometheus monitoring
# flink-conf.yaml settings
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260
# Prometheus scrape configuration
scrape_configs:
- job_name: 'flink'
  static_configs:
  - targets: ['fgedu-jm:9250', 'fgedu-tm01:9250', 'fgedu-tm02:9250']
# Grafana Dashboard
# Import a Flink dashboard
# Key metrics:
- Job running state
- Completed checkpoints
- Failed checkpoints
- Checkpoint size
- Task execution time
- Throughput
- Latency
- Memory usage
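As an illustration, a Prometheus alerting rule on checkpoint failures might look like the sketch below. The metric name follows Flink's usual Prometheus naming, but treat it as an assumption and verify it against your reporter's actual output (e.g. at http://fgedu-jm:9250/metrics) before relying on it:

```yaml
groups:
- name: flink-alerts
  rules:
  - alert: FlinkCheckpointFailures
    # Fires when a job has recorded new failed checkpoints in the last 10m.
    expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 0
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Flink job {{ $labels.job_name }} has failing checkpoints"
```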
# Viewing logs
# JobManager log
tail -f /bigdata/fgdata/logs/flink/flink-*-standalonesession-*.log
# TaskManager log
tail -f /bigdata/fgdata/logs/flink/flink-*-taskexecutor-*.log
# Common commands
# List jobs
flink list
# List running jobs only
flink list -r
# Cancel a job
flink cancel <job-id>
# Cancel a job and take a Savepoint (deprecated; prefer flink stop)
flink cancel -s <savepoint-path> <job-id>
# Stop a job gracefully (drains the pipeline and takes a Savepoint)
flink stop <job-id>
# Restore from a Savepoint
flink run -s <savepoint-path> <jar-file>
Part05 - Fengge's Experience and Takeaways
5.1 Flink Best Practices
Flink best practices:
- State backend: use RocksDB for large state; it supports incremental Checkpoints
- Checkpoint: choose an interval that lets checkpoints finish well before the timeout
- Parallelism: Source parallelism = Kafka partition count; tune the rest to data volume
- High availability: configure JobManager HA and store state on HDFS
- Monitoring and alerting: watch Checkpoints, latency, and throughput
- Savepoint: always take a Savepoint before upgrading
5.2 Troubleshooting Common Problems
# Common problem 1: Checkpoint failures
- Check permissions on the Checkpoint directory
- Check HDFS free space
- Check network connectivity
- Increase the Checkpoint timeout
- Reduce the state size
# Common problem 2: OOM
- Increase TaskManager memory
- Increase the managed-memory fraction
- Switch to the RocksDB state backend
- Optimize the state size
- Reduce the parallelism
# Common problem 3: data skew
- Inspect the key distribution
- Salt hot keys
- Use two-stage aggregation
- Adjust the parallelism
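The salting trick can be sketched in plain Java: append a salt to spread one hot key over N buckets, aggregate per salted key, then strip the salt and combine in a second stage. The round-robin salt and bucket count here are illustrative (a real job would typically use a random or hashed salt):

```java
import java.util.HashMap;
import java.util.Map;

// SaltedKeys.java - sketch of two-stage aggregation with key salting.
public class SaltedKeys {
    public static void main(String[] args) {
        int buckets = 4;
        String[] events = {"hot", "hot", "hot", "hot", "hot", "cold"};

        // Stage 1: aggregate by salted key, spreading "hot" across buckets.
        Map<String, Integer> salted = new HashMap<>();
        for (int i = 0; i < events.length; i++) {
            salted.merge(events[i] + "#" + (i % buckets), 1, Integer::sum);
        }

        // Stage 2: strip the salt and combine the partial results.
        Map<String, Integer> merged = new HashMap<>();
        salted.forEach((k, v) ->
                merged.merge(k.substring(0, k.indexOf('#')), v, Integer::sum));

        System.out.println(merged.get("hot"));  // 5
        System.out.println(merged.get("cold")); // 1
    }
}
```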
# Common problem 4: high latency
- Increase the parallelism
- Optimize operators
- Use local pre-aggregation
- Check the Checkpoint frequency
- Optimize state access
# Common problem 5: job failures
- Check the logs
- Check the exception stack trace
- Check dependencies
- Restore from a Checkpoint
- Adjust the configuration
5.3 Operations Checklist
- [ ] JobManager status
- [ ] TaskManager status
- [ ] Job running status
- [ ] Checkpoint status
- [ ] Checkpoint success rate
- [ ] Latency
- [ ] Throughput
- [ ] Memory usage
- [ ] State size
- [ ] Log review
- [ ] Alert-rule review
# Daily Inspection Items
1. Check the Flink Web UI
2. Check job status
3. Check Checkpoint status
4. Check latency and throughput
5. Review error logs
6. Check resource usage
7. Check alerts
8. Take Savepoints regularly
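Items 1-2 can be automated against the JobManager's REST API (GET /jobs/overview). The sketch below runs the check on a canned sample response, since the real endpoint is not reachable from here; in practice you would fetch the JSON with e.g. `curl -s http://fgedu-jm:8081/jobs/overview` and parse it properly instead of substring matching:

```java
// JobCheck.java - sketch of an automated job-status check on the response
// shape of Flink's GET /jobs/overview endpoint (sample data, not a live call).
public class JobCheck {
    public static void main(String[] args) {
        String response = "{\"jobs\":[{\"jid\":\"a1\",\"state\":\"RUNNING\"},"
                + "{\"jid\":\"b2\",\"state\":\"FAILED\"}]}";
        // Naive check - a real script would use a JSON parser.
        boolean hasFailed = response.contains("\"state\":\"FAILED\"");
        System.out.println(hasFailed ? "ALERT: at least one job is FAILED" : "OK");
    }
}
```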
Compiled and published by Fengge Tutorials, for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
