This article covers Kafka best practices and tuning in detail, including Kafka architecture, cluster deployment, producer optimization, consumer optimization, and performance tuning. It draws on the official Kafka documentation and is intended for big data engineers.
Part01-基础概念与理论知识
1.1 Kafka核心概念
Kafka是一个分布式事件流平台,用于高性能数据管道、流分析、数据集成和关键任务应用。更多视频教程www.fgedu.net.cn
- Broker: a Kafka server node
- Topic: a named stream that categorizes messages
- Partition: a shard of a topic; enables parallelism
- Replica: a copy of a partition, guaranteeing high availability
- Producer: a client that publishes messages
- Consumer: a client that reads messages
- Consumer Group: a set of consumers that share consumption of a topic
- Offset: a consumer's position within a partition
1.2 Kafka Architecture
Core components of the Kafka architecture:
Broker:
– Stores messages
– Serves client requests
– Can be scaled out to many nodes
ZooKeeper/KRaft:
– Metadata management
– Controller election
– Configuration management
Producer:
– Sends messages
– Balances load across partitions
– Retries failed sends
Consumer:
– Consumes messages
– Manages offsets
– Balances load within a group
1.3 Message Flow
How a message moves through Kafka:
- Produce: the Producer picks a partition (by key hash, or round-robin/sticky when there is no key) and sends the message
- Store: the Broker appends the message to the partition log on disk and replicates it to follower replicas
- Consume: the Consumer pulls messages and commits its offset
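The partition-selection step can be sketched in a few lines of Java. This is a simplified illustration, not Kafka's actual code: the real DefaultPartitioner hashes the serialized key bytes with murmur2, while this sketch uses `String.hashCode()` just to show the idea that the same key always lands on the same partition:

```java
public class PartitionSketch {
    // Simplified keyed partition selection (illustrative only; Kafka uses murmur2).
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the hash is non-negative, then take the modulo.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user_42", 12);
        int p2 = partitionFor("user_42", 12);
        System.out.println(p1 == p2);           // prints true: same key, same partition
        System.out.println(p1 >= 0 && p1 < 12); // prints true: result is a valid partition
    }
}
```

Because partition assignment is deterministic per key, all messages for one key stay in one partition and are therefore consumed in order.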
Part02 - Production Environment Planning and Recommendations
2.1 Kafka Cluster Planning
Key points for Kafka cluster planning:
Broker nodes:
– Count: 3-100
– Specs: 16 cores/32 GB up to 32 cores/128 GB
– Disks: SSD/NVMe, multiple data disks
– Network: 10 GbE NICs
# Directory layout
Install directory: /bigdata/app/kafka
Data directories: /bigdata/fgdata/kafka/data1,/bigdata/fgdata/kafka/data2,/bigdata/fgdata/kafka/data3
Log directory: /bigdata/fgdata/logs/kafka
# Replica settings
Topic replication factor:
– Production: 3 replicas
– Test: 2 replicas
– Minimum in-sync replicas: 2
# Partition planning
Partition count:
– Size according to target throughput
– Typical range: 10-1000 per topic
– partitions ≈ target throughput / per-partition throughput
# Example
Target throughput: 100 MB/s
Per-partition throughput: 5 MB/s
Partitions ≈ 100 / 5 = 20
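The sizing rule above can be written as a one-line helper (the class and method names here are illustrative); rounding up ensures the topic still meets the target when the division is not exact:

```java
public class PartitionSizing {
    // partitions ≈ target throughput / per-partition throughput, rounded up.
    static int partitionsFor(double targetMBps, double perPartitionMBps) {
        return (int) Math.ceil(targetMBps / perPartitionMBps);
    }

    public static void main(String[] args) {
        System.out.println(partitionsFor(100, 5)); // prints 20, matching the example above
        System.out.println(partitionsFor(100, 7)); // prints 15 (100/7 ≈ 14.3, rounded up)
    }
}
```

Leave some headroom on top of this figure, since increasing partitions later redistributes keys and breaks per-key ordering for existing data.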
2.2 Core Kafka Configuration
Core broker configuration (server.properties):
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://fgedu-kafka01:9092
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/bigdata/fgdata/kafka/data1,/bigdata/fgdata/kafka/data2,/bigdata/fgdata/kafka/data3
num.partitions=12
default.replication.factor=3
min.insync.replicas=2
log.retention.hours=168
log.retention.bytes=107374182400
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=fgedu-zk01:2181,fgedu-zk02:2181,fgedu-zk03:2181/kafka
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=3000
auto.create.topics.enable=false
delete.topic.enable=true
2.3 Topic Design Guidelines
Topic design guidelines:
- Naming: business.scene.environment, e.g., fgedu.user.event.prod
- Partitions: sized by throughput, typically 10-1000
- Replication: 3 replicas in production
- Message size: preferably < 1 MB, never more than 10 MB
- Retention: per business needs, typically 7-30 days
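The naming convention can be enforced in deployment tooling with a simple check. The regex below is an illustrative sketch of the `business.scene.environment` pattern described above (the allowed environment suffixes are an assumption for this sketch), not a rule Kafka itself imposes:

```java
import java.util.regex.Pattern;

public class TopicNameCheck {
    // business.scene[.more].environment in lowercase; prod/test/dev suffixes assumed.
    static final Pattern NAME =
        Pattern.compile("^[a-z0-9_]+(\\.[a-z0-9_]+)+\\.(prod|test|dev)$");

    static boolean isValid(String topic) {
        return NAME.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("fgedu.user.event.prod")); // prints true
        System.out.println(isValid("RandomTopic"));           // prints false
    }
}
```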
Part03 - Production Implementation
3.1 Kafka Cluster Installation and Deployment
3.1.1 Kafka Installation and Configuration
# 1. Download and extract Kafka
cd /bigdata/app
wget https://archive.apache.org/dist/kafka/3.4.1/kafka_2.13-3.4.1.tgz
tar -zxvf kafka_2.13-3.4.1.tgz
ln -s kafka_2.13-3.4.1 kafka
# 2. Configure server.properties
cat > /bigdata/app/kafka/config/server.properties << 'EOF'
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://fgedu-kafka01:9092
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/bigdata/fgdata/kafka/data1,/bigdata/fgdata/kafka/data2,/bigdata/fgdata/kafka/data3
num.partitions=12
default.replication.factor=3
min.insync.replicas=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=fgedu-zk01:2181,fgedu-zk02:2181,fgedu-zk03:2181/kafka
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=3000
auto.create.topics.enable=false
delete.topic.enable=true
EOF
# 3. Create directories
mkdir -p /bigdata/fgdata/kafka/data1
mkdir -p /bigdata/fgdata/kafka/data2
mkdir -p /bigdata/fgdata/kafka/data3
mkdir -p /bigdata/fgdata/logs/kafka
# 4. Set broker.id
# Each node needs a unique broker.id:
# fgedu-kafka01: broker.id=0
# fgedu-kafka02: broker.id=1
# fgedu-kafka03: broker.id=2
# 5. Configure environment variables
cat >> /etc/profile << 'EOF'
export KAFKA_HOME=/bigdata/app/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile
# 6. Start Kafka
cd /bigdata/app/kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 7. Verify Kafka
jps | grep Kafka
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 8. Create a topic
bin/kafka-topics.sh --create \
--topic fgedu_user_events \
--bootstrap-server fgedu-kafka01:9092 \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2
# 9. Describe the topic
bin/kafka-topics.sh --describe \
--topic fgedu_user_events \
--bootstrap-server fgedu-kafka01:9092
3.2 Security Configuration
3.2.1 SASL Authentication
# 1. Add to server.properties
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# Required for the ACL step below: enable the ACL authorizer and exempt admin
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
# 2. Create the JAAS config
cat > /bigdata/app/kafka/config/kafka_server_jaas.conf << 'EOF'
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="fgedu123"
user_admin="fgedu123"
user_producer="producer123"
user_consumer="consumer123";
};
EOF
# 3. Point the broker at the JAAS file via KAFKA_OPTS before starting it
# (set this in the shell or a wrapper script; lines appended after the exec
# at the end of kafka-server-start.sh would never run)
export KAFKA_OPTS="-Djava.security.auth.login.config=/bigdata/app/kafka/config/kafka_server_jaas.conf"
# 4. Configure ACLs
# Create the topic
bin/kafka-topics.sh --create \
--topic fgedu_user_events \
--bootstrap-server fgedu-kafka01:9092 \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2
# Grant the producer write access
# (on newer Kafka versions, prefer --bootstrap-server with an admin client config)
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=fgedu-zk01:2181/kafka \
--add \
--allow-principal User:producer \
--operation Write \
--topic fgedu_user_events
# Grant the consumer read access
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=fgedu-zk01:2181/kafka \
--add \
--allow-principal User:consumer \
--operation Read \
--topic fgedu_user_events \
--group fgedu_consumer_group
# 5. Producer client settings
# producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer123";
# 6. Consumer client settings
# consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="consumer123";
3.3 Monitoring Setup
3.3.1 Prometheus + Grafana Monitoring
# 1. Download the JMX exporter agent
cd /bigdata/app
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.19.0/jmx_prometheus_javaagent-0.19.0.jar
# 2. Configure the JMX exporter
cat > /bigdata/app/kafka/config/kafka_jmx_exporter.yaml << 'EOF'
lowercaseOutputName: true
rules:
  - pattern: kafka.server<type=(.+), name=(.+)><>Value
    name: kafka_server_$1_$2
  - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+)><>Count
    name: kafka_server_$1_$2_count
    labels:
      clientId: "$3"
      topic: "$4"
EOF
# 3. Attach the agent via KAFKA_OPTS before starting the broker
# (as with the JAAS setting, appending an export after the exec line
# in kafka-server-start.sh would never run)
export KAFKA_OPTS="$KAFKA_OPTS -javaagent:/bigdata/app/jmx_prometheus_javaagent-0.19.0.jar=9500:/bigdata/app/kafka/config/kafka_jmx_exporter.yaml"
# 4. Restart Kafka
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties
# 5. Add a Prometheus scrape job
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['fgedu-kafka01:9500', 'fgedu-kafka02:9500', 'fgedu-kafka03:9500']
# 6. Grafana dashboard
# Import a Kafka dashboard
# Key metrics:
– Messages in/out rate
– Partition replica status
– ISR status
– Request latency
– Message size distribution
– Broker status
Part04 - Production Cases and Hands-on Practice
4.1 Producer Optimization
4.1.1 Producer Configuration Tuning
Properties props = new Properties();
props.put("bootstrap.servers", "fgedu-kafka01:9092,fgedu-kafka02:9092,fgedu-kafka03:9092");
props.put("acks", "all"); // wait for all in-sync replicas to acknowledge
props.put("retries", 3); // retry failed sends up to 3 times
props.put("batch.size", 16384); // 16 KB batches
props.put("linger.ms", 10); // wait up to 10 ms to fill a batch
props.put("buffer.memory", 33554432); // 32 MB send buffer
props.put("compression.type", "snappy"); // Snappy compression
props.put("max.in.flight.requests.per.connection", 5); // up to 5 in-flight requests (enable idempotence to preserve ordering with retries)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Send messages asynchronously
for (int i = 0; i < 1000000; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(
        "fgedu_user_events",
        "key_" + i,
        "value_" + i
    );
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
    });
}
// Flush pending sends and close the producer
producer.close();
// Parameter notes
acks:
– 0: no acknowledgment; fastest, but data may be lost
– 1: leader acknowledgment only; balances performance and durability
– all/-1: all in-sync replicas acknowledge; most durable
batch.size:
– 16-32 KB; too small hurts throughput, too large adds latency
linger.ms:
– 0-100 ms; allows batching for higher throughput
compression.type:
– none: no compression
– gzip: best ratio, highest CPU cost
– snappy: balanced ratio and CPU
– lz4: fastest
– zstd: newest; strong ratio at moderate CPU cost
retries:
– 0-10; 3-5 recommended
4.2 Consumer Optimization
4.2.1 Consumer Configuration Tuning
Properties props = new Properties();
props.put("bootstrap.servers", "fgedu-kafka01:9092,fgedu-kafka02:9092,fgedu-kafka03:9092");
props.put("group.id", "fgedu_consumer_group");
props.put("enable.auto.commit", "false"); // commit offsets manually
props.put("auto.commit.interval.ms", "1000"); // ignored while auto commit is disabled
props.put("session.timeout.ms", 30000);
props.put("max.poll.records", 500); // at most 500 records per poll
props.put("max.poll.interval.ms", 300000); // 5 minutes
props.put("fetch.min.bytes", 1024); // at least 1 KB per fetch
props.put("fetch.max.wait.ms", 500); // wait at most 500 ms
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
// Create the consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic
consumer.subscribe(Arrays.asList("fgedu_user_events"));
// Poll and process messages
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
        }
        consumer.commitSync(); // commit offsets synchronously
    }
} finally {
    consumer.close();
}
// Parameter notes
enable.auto.commit:
– true: automatic commits; simple, but messages can be lost or redelivered
– false: manual commits; more reliable, more code
max.poll.records:
– 10-1000, tuned to processing speed
auto.offset.reset:
– earliest: start from the beginning
– latest: start from the newest messages
– none: fail if no committed offset exists
fetch.min.bytes:
– 1-102400; larger values batch fetches
4.3 Performance Tuning
4.3.1 Tuning Techniques
# Optimization 1: add partitions to increase parallelism
bin/kafka-topics.sh --alter \
--topic fgedu_user_events \
--bootstrap-server fgedu-kafka01:9092 \
--partitions 24
# Optimization 2: replica settings
# min.insync.replicas=2 with acks=all
# balances performance and durability
# Optimization 3: log segment size
# 1-2 GB segments reduce the number of segment files
log.segment.bytes=1073741824
# Optimization 4: replica fetch threads
num.replica.fetchers=4
# Optimization 5: network and I/O threads
num.network.threads=8
num.io.threads=16
# Optimization 6: garbage collection
# Use G1GC
export KAFKA_HEAP_OPTS="-Xms8G -Xmx8G -XX:+UseG1GC -XX:MaxGCPauseMillis=20"
# Optimization 7: disks
# Spread data across multiple directories
log.dirs=/bigdata/fgdata/kafka/data1,/bigdata/fgdata/kafka/data2,/bigdata/fgdata/kafka/data3
# Optimization 8: OS tuning
# Network
echo 'net.core.somaxconn = 1024' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_max_syn_backlog = 8192' >> /etc/sysctl.conf
echo 'vm.swappiness = 1' >> /etc/sysctl.conf
sysctl -p
# File descriptors
echo '* soft nofile 100000' >> /etc/security/limits.conf
echo '* hard nofile 100000' >> /etc/security/limits.conf
Part05 - Experience Summary and Takeaways
5.1 Kafka Production Best Practices
Kafka production best practices:
- Partition design: choose a sensible partition count and keep single partitions from growing too large
- Replication: 3 replicas with min.insync.replicas=2
- Message size: keep messages small, ideally < 1 MB
- Compression: enable compression to reduce network and storage usage
- Monitoring and alerting: track key metrics and alert promptly
- High availability: multiple brokers, spread across racks
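To see why enabling compression pays off for the repetitive, text-heavy payloads Kafka often carries, here is a minimal sketch using the JDK's built-in GZIP (Kafka itself compresses whole record batches with whatever codec `compression.type` selects, e.g. snappy or zstd; the helper below is purely illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionDemo {
    // Gzip a byte array in memory; wraps IOException so callers stay simple.
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        // A batch of similar JSON-ish events, as a producer batch often looks.
        StringBuilder batch = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            batch.append("{\"user\":\"user_").append(i).append("\",\"event\":\"click\"}\n");
        }
        byte[] raw = batch.toString().getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);
        System.out.println(raw.length > compressed.length); // prints true: repetitive data shrinks
    }
}
```

Batch-level compression is why `batch.size` and `linger.ms` interact with `compression.type`: larger batches give the codec more redundancy to exploit.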
5.2 Common Issues and Fixes
# Issue 1: message loss
– Check the producer's acks setting
– Check min.insync.replicas
– Check replica sync status
– Enable retries
# Issue 2: duplicate messages
– Check consumer idempotency
– Make processing idempotent
– Use transactions
# Issue 3: ISR shrinkage
– Check the network
– Check disk I/O
– Increase replica fetcher threads
– Tune timeout settings
# Issue 4: slow consumption
– Add consumers
– Add partitions
– Optimize processing logic
– Process in batches
# Issue 5: frequent leader elections
– Check ZooKeeper/KRaft
– Check the network
– Check broker health
– Tune timeout settings
5.3 Operations Checklist
– [ ] Broker status
– [ ] Partition status
– [ ] Replica status
– [ ] ISR status
– [ ] Messages-in rate
– [ ] Messages-out rate
– [ ] Message latency
– [ ] Disk space
– [ ] Leader distribution
– [ ] Network connections
– [ ] Alert rules
– [ ] Logs
# Daily inspection routine
1. Check broker status
2. Check partition and replica status
3. Check ISR status
4. Check message in/out rates
5. Check disk space
6. Check leader distribution
7. Review error logs
8. Review monitoring alerts
Compiled and published by Fengge Tutorials for learning and testing use only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html
