本文档风哥主要介绍Kafka数据可靠性实战,包括Kafka数据可靠性机制、Kafka ACK机制配置、Kafka幂等性保证、Kafka事务支持、Kafka数据丢失预防等内容,风哥教程参考Kafka官方文档Reliability、Semantics等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn
Part01-基础概念与理论知识
1.1 Kafka数据可靠性机制
Kafka通过多种机制保证数据可靠性,包括副本机制、ACK机制、ISR机制、幂等性和事务等。学习交流加群风哥微信: itpux-com
- 副本机制:多副本存储,防止数据丢失
- ACK机制:生产者确认机制,确保消息写入
- ISR机制:同步副本集合,保证数据一致性
- 幂等性:防止消息重复写入
- 事务:保证跨分区消息原子性
1.2 Kafka ACK机制详解
Kafka生产者的ACK机制决定了消息写入的可靠性级别:
acks=0
– 生产者不等待Broker确认
– 最高性能,最低可靠性
– 可能丢失消息
acks=1
– 等待Leader确认
– 平衡性能和可靠性
– Leader故障可能丢失消息
acks=all (或-1)
– 等待所有ISR副本确认
– 最高可靠性
– 需要配合min.insync.replicas使用
# ACK配置对比
| acks | 可靠性 | 性能 | 适用场景 |
|——|——–|——|———-|
| 0 | 低 | 高 | 日志收集 |
| 1 | 中 | 中 | 一般业务 |
| all | 高 | 低 | 金融交易 |
1.3 Kafka精确一次语义
Kafka通过幂等性和事务实现精确一次语义:
- 幂等性:保证单个Producer实例的消息不重复
- 事务:保证跨分区的多条消息原子性写入
- 消费者隔离级别:read_committed只读取已提交的消息
Part02-生产环境规划与建议
2.1 数据可靠性规划
数据可靠性规划需要考虑以下因素:
级别1:最高可靠性(金融交易)
– acks=all
– min.insync.replicas=2
– replication.factor=3
– enable.idempotence=true
– enable.transaction=true
– unclean.leader.election.enable=false
级别2:高可靠性(订单系统)
– acks=all
– min.insync.replicas=2
– replication.factor=3
– enable.idempotence=true
级别3:平衡配置(日志系统)
– acks=1
– replication.factor=3
– enable.idempotence=true
级别4:高性能配置(监控指标)
– acks=0
– replication.factor=2
– 允许少量数据丢失
2.2 生产者可靠性配置
生产者可靠性配置建议:
# 确认机制
acks=all
# 重试配置
retries=2147483647 # 最大重试次数
retry.backoff.ms=100 # 重试间隔
# 幂等性
enable.idempotence=true # 启用幂等性
# 顺序保证
max.in.flight.requests.per.connection=1 # 或使用幂等性时可为5
# 超时配置
request.timeout.ms=30000
delivery.timeout.ms=120000
# 事务配置(需要时)
transactional.id=fgedu-tx-1
transaction.timeout.ms=900000
# 完整配置示例
bootstrap.servers=192.168.1.51:9092
acks=all
retries=3
enable.idempotence=true
max.in.flight.requests.per.connection=5
batch.size=16384
linger.ms=5
buffer.memory=33554432
compression.type=lz4
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
2.3 消费者可靠性配置
消费者可靠性配置建议:
# Offset管理
enable.auto.commit=false # 手动提交Offset
# 隔离级别
isolation.level=read_committed # 只读取已提交消息
# 消费配置
max.poll.records=500 # 单次拉取记录数
max.poll.interval.ms=300000 # 最大处理间隔
# 心跳配置
session.timeout.ms=10000
heartbeat.interval.ms=3000
# 完整配置示例
bootstrap.servers=192.168.1.51:9092
group.id=fgedu-consumer-group
enable.auto.commit=false
isolation.level=read_committed
auto.offset.reset=earliest
max.poll.records=500
session.timeout.ms=10000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Part03-生产环境项目实施方案
3.1 ACK配置实战
3.1.1 不同ACK配置测试
$ /bigdata/app/kafka/bin/kafka-topics.sh –create \
–topic fgedu-reliability-test \
–partitions 3 \
–replication-factor 3 \
–config min.insync.replicas=2 \
–bootstrap-server 192.168.1.51:9092
Created topic fgedu-reliability-test.
# 测试acks=0
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
–topic fgedu-reliability-test \
–num-records 100000 \
–record-size 1024 \
–throughput -1 \
–producer-props acks=0 \
–bootstrap-server 192.168.1.51:9092
100000 records sent, 200000.0 records/sec (195.31 MB/sec), 2.50 ms avg latency
# 测试acks=1
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
–topic fgedu-reliability-test \
–num-records 100000 \
–record-size 1024 \
–throughput -1 \
–producer-props acks=1 \
–bootstrap-server 192.168.1.51:9092
100000 records sent, 150000.0 records/sec (146.48 MB/sec), 3.33 ms avg latency
# 测试acks=all
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
–topic fgedu-reliability-test \
–num-records 100000 \
–record-size 1024 \
–throughput -1 \
–producer-props acks=all \
–bootstrap-server 192.168.1.51:9092
100000 records sent, 100000.0 records/sec (97.66 MB/sec), 5.00 ms avg latency
3.2 幂等性配置实战
3.2.1 启用幂等性
$ cat > /tmp/idempotent-producer.properties << 'EOF' bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092 acks=all enable.idempotence=true retries=3 max.in.flight.requests.per.connection=5 batch.size=16384 linger.ms=5 compression.type=lz4 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer EOF # 使用幂等性配置发送消息 $ /bigdata/app/kafka/bin/kafka-console-producer.sh \ --topic fgedu-reliability-test \ --producer.config /tmp/idempotent-producer.properties \ --bootstrap-server 192.168.1.51:9092 >message 1
>message 2
>message 3
# 验证消息无重复
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
–topic fgedu-reliability-test \
–from-beginning \
–bootstrap-server 192.168.1.51:9092
message 1
message 2
message 3
3.2.2 幂等性原理验证
# 1. 每个Producer分配唯一的PID
# 2. 每条消息分配递增的Sequence Number
# 3. Broker端维护
# 查看Producer ID
# 通过JMX指标查看
# kafka.producer:type=producer-metrics,client-id=*
# 幂等性限制
# 1. 只能保证单个Producer实例
# 2. 只能保证单个Partition内不重复
# 3. 重启Producer会分配新的PID
# 如需跨Producer或跨Partition保证
# 需要使用事务
3.3 事务配置实战
3.3.1 事务生产者配置
$ cat > /tmp/transaction-producer.properties << 'EOF' bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092 acks=all enable.idempotence=true transactional.id=fgedu-tx-producer-1 transaction.timeout.ms=900000 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer EOF # 事务消费者配置 $ cat > /tmp/transaction-consumer.properties << 'EOF' bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092 group.id=fgedu-tx-consumer-group isolation.level=read_committed enable.auto.commit=false auto.offset.reset=earliest key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer EOF
3.3.2 事务操作流程
// 1. 初始化事务
producer.initTransactions();
try {
// 2. 开始事务
producer.beginTransaction();
// 3. 发送消息到多个分区
producer.send(new ProducerRecord<>(“fgedu-orders”, “key1”, “value1”));
producer.send(new ProducerRecord<>(“fgedu-orders”, “key2”, “value2”));
producer.send(new ProducerRecord<>(“fgedu-payments”, “key1”, “value1”));
// 4. 发送Offset到事务(消费-处理-生产模式)
producer.sendOffsetsToTransaction(
Collections.singletonMap(
new TopicPartition(“fgedu-input”, 0),
new OffsetAndMetadata(1000)
),
“fgedu-consumer-group”
);
// 5. 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 6. 回滚事务
producer.abortTransaction();
}
# 查看事务状态
$ /bigdata/app/kafka/bin/kafka-transactions.sh \
–bootstrap-server 192.168.1.51:9092 \
–list
TransactionalId: fgedu-tx-producer-1
State: Complete
ProducerId: 1001
Part04-生产案例与实战讲解
4.1 数据丢失问题处理
4.1.1 数据丢失原因分析
1. 生产者配置问题
– acks=0或acks=1
– 未启用重试
– 缓冲区满时丢弃消息
2. Broker配置问题
– min.insync.replicas=1
– unclean.leader.election.enable=true
– 副本数不足
3. 消费者配置问题
– 自动提交Offset但处理失败
– 消费异常后跳过消息
# 排查步骤
# 1. 检查生产者配置
$ grep -E “acks|retries|enable.idempotence” /tmp/producer.properties
acks=all
retries=3
enable.idempotence=true
# 2. 检查Broker配置
$ grep -E “min.insync.replicas|unclean.leader.election” /bigdata/app/kafka/config/server.properties
min.insync.replicas=2
unclean.leader.election.enable=false
# 3. 检查消费者配置
$ grep -E “enable.auto.commit” /tmp/consumer.properties
enable.auto.commit=false
4.1.2 数据丢失预防方案
# 配置文件
acks=all
retries=3
enable.idempotence=true
delivery.timeout.ms=120000
# 添加回调处理
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 记录失败消息
// 重试或存储到死信队列
}
}
});
# Broker预防方案
min.insync.replicas=2
unclean.leader.election.enable=false
replica.lag.time.max.ms=30000
# 消费者预防方案
enable.auto.commit=false
# 处理成功后再提交Offset
consumer.commitSync();
4.2 数据重复问题处理
4.2.1 数据重复原因分析
1. 生产者重试
– 网络超时后重试
– Broker已写入但未返回确认
2. 消费者重复消费
– Offset提交失败
– 消费者Rebalance
– 处理成功但Offset未提交
# 解决方案
# 1. 生产者启用幂等性
enable.idempotence=true
# 2. 消费者实现幂等性消费
# 方案A:使用唯一Key去重
# 方案B:使用数据库唯一约束
# 方案C:使用Redis去重
# 消费者幂等性示例
public void consume(ConsumerRecord
String key = record.key();
String value = record.value();
// 检查是否已处理
if (redis.get(“processed:” + key) != null) {
return; // 跳过已处理的消息
}
// 处理消息
processMessage(value);
// 标记为已处理
redis.set(“processed:” + key, “1”);
// 提交Offset
consumer.commitSync();
}
4.3 数据顺序保证实战
# 1. 单分区顺序
# 相同Key的消息发送到同一分区
# 分区内消息有序
# 2. 生产者配置
enable.idempotence=true
max.in.flight.requests.per.connection=5 # 幂等性启用后可为5
# 3. 如果未启用幂等性
max.in.flight.requests.per.connection=1 # 必须为1
# 4. 消费者顺序消费
# 单线程消费或使用分区锁
# 顺序消费示例
public void consume(ConsumerRecords
for (ConsumerRecord
String key = record.key();
// 按Key加锁,保证相同Key顺序处理
synchronized (getLock(key)) {
processMessage(record.value());
}
}
// 批量提交Offset
consumer.commitSync();
}
Part05-风哥经验总结与分享
5.1 数据可靠性最佳实践
数据可靠性最佳实践建议:
1. 启用acks=all
2. 启用enable.idempotence=true
3. 设置合理的retries
4. 实现发送回调处理
5. 使用事务保证跨分区原子性
# Broker最佳实践
1. min.insync.replicas=2
2. unclean.leader.election.enable=false
3. replication.factor=3
4. 监控ISR状态
# 消费者最佳实践
1. enable.auto.commit=false
2. 处理成功后再提交Offset
3. 实现幂等性消费
4. 合理设置max.poll.records
5.2 可靠性检查清单
可靠性检查清单:
- 生产者acks配置是否正确
- 生产者幂等性是否启用
- Broker min.insync.replicas是否>=2
- Broker unclean.leader.election是否禁用
- 消费者Offset提交方式是否正确
- 消费者是否实现幂等性
- ISR状态是否正常
- 监控告警是否配置
5.3 可靠性验证工具
可靠性验证工具:
- kafka-producer-perf-test.sh:生产者性能测试
- kafka-consumer-perf-test.sh:消费者性能测试
- kafka-verifiable-producer.sh:可验证的生产者
- kafka-verifiable-consumer.sh:可验证的消费者
- Chaos Monkey:故障注入测试
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
