1. 首页 > Hadoop教程 > 正文

大数据教程FG162-Hadoop Flink实时计算最佳实践

本文详细介绍Flink实时计算最佳实践,包括Flink架构、集群部署、作业开发、Checkpoint、性能优化、监控运维等内容,风哥教程参考Flink官方文档,适合大数据实时计算工程师使用。学习交流加群风哥QQ113257174

Part01-基础概念与理论知识

Apache Flink是一个分布式流处理和批处理框架,以高吞吐、低延迟、支持事件时间和精确一次语义著称。更多视频教程www.fgedu.net.cn

Flink核心特性:

  • 批流统一:同一套API处理批和流
  • 事件时间:支持基于事件时间的处理
  • 状态管理:支持大状态管理
  • 精确一次:Exactly-Once语义
  • 高可用:支持JobManager高可用
  • 低延迟:毫秒级延迟

Flink核心组件:

# Flink核心组件
JobManager(JM):
– 作业管理器
– 负责任务调度
– 管理Checkpoint
– 管理资源
– 可配置高可用

TaskManager(TM):
– 任务管理器
– 执行具体任务
– 管理内存
– 数据交换
– 可配置多个

Client:
– 客户端
– 提交作业
– 打包程序
– 提交给JM

# 核心概念
Job:
– 用户提交的作业
– 包含多个Task

Task:
– 执行的任务
– 包含多个Subtask

Subtask:
– 并行执行的子任务
– 最小执行单位

Operator:
– 算子
– 数据处理逻辑
– 如:map、filter、keyBy、window

State:
– 状态
– 中间计算结果
– 支持容错

Checkpoint:
– 检查点
– 状态快照
– 用于故障恢复

Savepoint:
– 保存点
– 手动触发的Checkpoint
– 用于升级和迁移

Flink API层级:

  • DataStream API:流处理核心API
  • DataSet API:批处理核心API
  • Table API:关系型API
  • SQL:标准SQL查询
风哥提示:对于实时计算,推荐使用DataStream API或Table API & SQL。DataStream API更灵活,Table API & SQL更易用。根据具体需求选择。更多学习教程公众号风哥教程itpux_com

Part02-生产环境规划与建议

Flink集群规划要点:

# 服务器规划
JobManager:
– 数量:2-3个(HA模式)
– 配置:8核16GB
– 磁盘:SSD 500GB

TaskManager:
– 数量:3-100个
– 配置:16核32GB-32核128GB
– 磁盘:SSD 2TB-4TB

# 内存配置
JobManager堆内存:4GB-8GB
JobManager堆外内存:2GB-4GB

TaskManager堆内存:8GB-64GB
TaskManager堆外内存:2GB-8GB
Managed Memory:4GB-32GB(用于状态)

# 并行度规划
作业总并行度:
– 小作业:2-10
– 中作业:20-100
– 大作业:100-1000

TaskManager Slot数:
– 建议:2-8个
– 推荐:4个

# 目录规划
安装目录:/bigdata/app/flink
数据目录:/bigdata/fgdata/flink
日志目录:/bigdata/fgdata/logs/flink
Checkpoint目录:hdfs://fgedu-nn:8020/bigdata/fgdata/flink/checkpoints
Savepoint目录:hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints

Flink核心配置:

# flink-conf.yaml配置
jobmanager.rpc.address: fgedu-jm
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 8g
jobmanager.memory.flink.size: 6g

taskmanager.memory.process.size: 32g
taskmanager.memory.flink.size: 28g
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4

parallelism.default: 4

state.backend: rocksdb
state.checkpoints.dir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/checkpoints
state.savepoints.dir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints

execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 600000
execution.checkpointing.min-pause: 5000
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

high-availability: zookeeper
high-availability.storageDir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/ha
high-availability.zookeeper.quorum: fgedu-zk01:2181,fgedu-zk02:2181,fgedu-zk03:2181

rest.address: fgedu-jm
rest.port: 8081

env.log.dir: /bigdata/fgdata/logs/flink

高可用规划要点:

高可用配置:

  • JobManager:至少2个,使用ZK做Leader选举
  • TaskManager:多节点分布,避免单点故障
  • State:Checkpoint存储在HDFS/S3
  • ZooKeeper:至少3个节点的ZK集群

from bigdata视频:www.itpux.com

Part03-生产环境项目实施方案

3.1.1 Flink安装配置

# 1. 下载Flink
cd /bigdata/app
wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar -zxvf flink-1.17.1-bin-scala_2.12.tgz
ln -s flink-1.17.1 flink

# 2. 配置flink-conf.yaml
cat > /bigdata/app/flink/conf/flink-conf.yaml << ‘EOF’
jobmanager.rpc.address: fgedu-jm01
jobmanager.memory.process.size: 8g
taskmanager.memory.process.size: 32g
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4

state.backend: rocksdb
state.checkpoints.dir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/checkpoints
state.savepoints.dir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints

execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 600000

high-availability: zookeeper
high-availability.storageDir: hdfs://fgedu-nn:8020/bigdata/fgdata/flink/ha
high-availability.zookeeper.quorum: fgedu-zk01:2181,fgedu-zk02:2181,fgedu-zk03:2181

rest.port: 8081
env.log.dir: /bigdata/fgdata/logs/flink
EOF

# 3. 配置masters
cat > /bigdata/app/flink/conf/masters << ‘EOF’
fgedu-jm01:8081
fgedu-jm02:8081
EOF

# 4. 配置workers
cat > /bigdata/app/flink/conf/workers << ‘EOF’
fgedu-tm01
fgedu-tm02
fgedu-tm03
fgedu-tm04
EOF

# 5. 创建目录
mkdir -p /bigdata/fgdata/flink
mkdir -p /bigdata/fgdata/logs/flink
hdfs dfs -mkdir -p /bigdata/fgdata/flink/checkpoints
hdfs dfs -mkdir -p /bigdata/fgdata/flink/savepoints
hdfs dfs -mkdir -p /bigdata/fgdata/flink/ha

# 6. 启动Flink集群
/bigdata/app/flink/bin/start-cluster.sh

# 7. 验证集群
# 访问Web UI: http://fgedu-jm01:8081
# 查看TaskManager
/bigdata/app/flink/bin/flink list

3.2.1 DataStream API开发

// pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>

// FgeduFlinkJob.java
package com.fgedu.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FgeduFlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(60000);

KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(“fgedu-kafka01:9092,fgedu-kafka02:9092”)
.setTopics(“fgedu_user_events”)
.setGroupId(“fgedu_flink_group”)
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStream<String> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
“Kafka Source”
);

DataStream<String> result = stream
.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
})
.keyBy(value -> value)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.reduce((v1, v2) -> v1 + “,” + v2);

result.print();

env.execute(“Fgedu Flink Job”);
}
}

# 编译打包
mvn clean package

# 提交作业
/bigdata/app/flink/bin/flink run \
-c com.fgedu.flink.FgeduFlinkJob \
-p 8 \
fgedu-flink-job.jar

# 查看作业
/bigdata/app/flink/bin/flink list

# 取消作业
/bigdata/app/flink/bin/flink cancel <job-id>

# 从Savepoint恢复
/bigdata/app/flink/bin/flink run \
-s hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints/savepoint-xxx \
-c com.fgedu.flink.FgeduFlinkJob \
fgedu-flink-job.jar

3.3.1 Checkpoint配置实战

// Checkpoint配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用Checkpoint
env.enableCheckpointing(60000); // 60秒一次

// Checkpoint模式:EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Checkpoint超时时间
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10分钟

// 最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // 5秒

// 最大并发Checkpoint数
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 外部化Checkpoint
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// State Backend配置
env.setStateBackend(new RocksDBStateBackend(
“hdfs://fgedu-nn:8020/bigdata/fgdata/flink/checkpoints”,
true // 增量Checkpoint
));

// RocksDB配置
RocksDBStateBackend backend = new RocksDBStateBackend(…);
backend.setDbStoragePath(“/bigdata/fgdata/flink/rocksdb”);
backend.setNumberOfTransferThreads(4);
env.setStateBackend(backend);

// 创建Savepoint
// 方式1:命令行
/bigdata/app/flink/bin/flink savepoint <job-id> \
hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints

// 方式2:REST API
curl -X POST http://fgedu-jm:8081/jobs/<job-id>/savepoints \
-d ‘{“target-directory”: “hdfs://fgedu-nn:8020/bigdata/fgdata/flink/savepoints”}’ \
-H “Content-Type: application/json”

风哥提示:Checkpoint是Flink容错的核心。生产环境建议使用RocksDB状态后端,支持大状态和增量Checkpoint。Checkpoint间隔要根据业务需求设置,太频繁影响性能,太稀疏丢失数据多。学习交流加群风哥微信: itpux-com

Part04-生产案例与实战讲解

4.1.1 Kafka + Flink实战

// 1. 创建Kafka Topic
kafka-topics.sh –create –topic fgedu_user_events \
–bootstrap-server fgedu-kafka01:9092 \
–partitions 12 \
–replication-factor 3

// 2. Kafka Source配置
KafkaSource<UserEvent> source = KafkaSource.<UserEvent>builder()
.setBootstrapServers(“fgedu-kafka01:9092,fgedu-kafka02:9092”)
.setTopicPattern(“fgedu_.*”)
.setGroupId(“fgedu_flink_group”)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new UserEventDeserializer())
.setProperty(“commit.offsets.on.checkpoint”, “true”)
.build();

// 3. 事件时间和Watermark
WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// 4. 读取数据
DataStream<UserEvent> events = env.fromSource(source, watermarkStrategy, “Kafka Source”);

// 5. 实时统计
DataStream<EventCount> countStream = events
.keyBy(UserEvent::getEventType)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new CountAggregate(), new CountWindowFunction());

// 6. 输出到Kafka
KafkaSink<EventCount> sink = KafkaSink.<EventCount>builder()
.setBootstrapServers(“fgedu-kafka01:9092,fgedu-kafka02:9092”)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(“fgedu_event_count”)
.setValueSerializationSchema(new EventCountSerializer())
.build()
)
.build();

countStream.sinkTo(sink);

// 7. 提交作业
/bigdata/app/flink/bin/flink run \
-c com.fgedu.flink.KafkaFlinkJob \
-p 12 \
-m yarn-cluster \
-yjm 4096 \
-ytm 8192 \
-ys 4 \
fgedu-flink-kafka.jar

4.2.1 性能优化技巧

# 优化1:合理设置并行度
# 并行度设置:
# – Kafka Source并行度 = Topic分区数
# – 算子并行度根据数据量调整
# – 总并行度 = Slot数 * TaskManager数

# 优化2:内存配置
# TaskManager堆内存:用于执行
# TaskManager Managed内存:用于状态
# 状态大的话增加Managed内存比例

# 优化3:状态后端选择
# HashMapStateBackend:小状态(<1GB) # RocksDBStateBackend:大状态,支持增量 env.setStateBackend(new RocksDBStateBackend(checkpointPath, true)); # 优化4:Checkpoint优化 # 增加Checkpoint间隔 env.enableCheckpointing(300000); // 5分钟 # 使用增量Checkpoint new RocksDBStateBackend(path, true); // 减少Checkpoint超时 env.getCheckpointConfig().setCheckpointTimeout(600000); # 优化5:对象复用 env.getConfig().enableObjectReuse(); # 优化6:算子链 // 自动算子链(默认开启) // map()和filter()会链在一起 DataStream<String> result = stream .map(...) .filter(...) .keyBy(...); // 手动禁用链 env.disableOperatorChaining(); # 优化7:ReBalance // 解决数据倾斜 stream.rebalance().map(...); # 优化8:本地聚合 // 先在本地聚合,再全局聚合 stream .keyBy(...) .window(...) .aggregate(new LocalAgg(), new GlobalAgg());

4.3.1 Flink监控

# 访问Flink Web UI
http://fgedu-jm:8081

# Web UI页面
Overview:
– 作业概览
– TaskManager状态
– 作业状态

Jobs:
– 作业列表
– 作业详情
– 取消作业

TaskManagers:
– TaskManager列表
– 内存使用
– 日志查看

Checkpoints:
– Checkpoint历史
– Checkpoint状态
– Checkpoint大小

# Prometheus监控
# flink-conf.yaml配置
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

# Prometheus配置
scrape_configs:
– job_name: ‘flink’
static_configs:
– targets: [‘fgedu-jm:9250’, ‘fgedu-tm01:9250’, ‘fgedu-tm02:9250’]

# Grafana Dashboard
# 导入Flink Dashboard
# 关键指标:
– 作业运行状态
– Checkpoint完成数
– Checkpoint失败数
– Checkpoint大小
– Task执行时间
– 吞吐量
– 延迟
– 内存使用

# 日志查看
# JobManager日志
tail -f /bigdata/fgdata/logs/flink/flink-*-standalonesession-*.log

# TaskManager日志
tail -f /bigdata/fgdata/logs/flink/flink-*-taskexecutor-*.log

# 常用命令
# 列出作业
flink list

# 列出运行中的作业
flink list -r

# 取消作业
flink cancel <job-id>

# 取消作业并创建Savepoint
flink cancel -s <savepoint-path> <job-id>

# 停止作业(优雅停止)
flink stop <job-id>

# 从Savepoint恢复
flink run -s <savepoint-path> <jar-file>

生产环境建议:Flink Web UI功能强大,建议配置访问权限。同时配置Prometheus+Grafana进行长期监控。Checkpoint要定期检查,确保正常完成。更多视频教程www.fgedu.net.cn

Part05-风哥经验总结与分享

Flink最佳实践:

  • 状态后端:大状态用RocksDB,支持增量Checkpoint
  • Checkpoint:合理设置间隔,确保能在超时前完成
  • 并行度:Source并行度=Kafka分区数,其他根据数据量调整
  • 高可用:配置JobManager HA,State存储在HDFS
  • 监控告警:监控Checkpoint、延迟、吞吐量
  • Savepoint:升级前先创建Savepoint
# 常见问题1:Checkpoint失败
– 检查Checkpoint目录权限
– 检查HDFS空间
– 检查网络连接
– 增加Checkpoint超时时间
– 减少State大小

# 常见问题2:OOM
– 增加TaskManager内存
– 增加Managed内存比例
– 使用RocksDB状态后端
– 优化State大小
– 减少并行度

# 常见问题3:数据倾斜
– 查看Key分布
– 加盐处理
– 两阶段聚合
– 调整并行度

# 常见问题4:延迟高
– 增加并行度
– 优化算子
– 使用本地聚合
– 检查Checkpoint频率
– 优化State访问

# 常见问题5:作业失败
– 查看日志
– 查看异常堆栈
– 检查依赖
– 从Checkpoint恢复
– 调整配置

# Flink运维检查清单
– [ ] JobManager状态
– [ ] TaskManager状态
– [ ] 作业运行状态
– [ ] Checkpoint状态
– [ ] Checkpoint成功率
– [ ] 延迟情况
– [ ] 吞吐量
– [ ] 内存使用
– [ ] State大小
– [ ] 日志检查
– [ ] 告警规则检查

# 日常巡检内容
1. 检查Flink Web UI
2. 检查作业状态
3. 检查Checkpoint状态
4. 检查延迟和吞吐量
5. 查看错误日志
6. 检查资源使用
7. 检查告警
8. 定期创建Savepoint

风哥提示:Flink实时计算生产环境运维要重点关注Checkpoint和状态。确保Checkpoint正常完成,状态大小可控。升级作业前一定要创建Savepoint,确保可以回滚。学习交流加群风哥QQ113257174

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息