This document introduces hands-on Kafka producer and consumer work, covering producer configuration and development, consumer configuration and development, consumer group management, and message reliability guarantees. The material draws on the official Kafka documentation (Producer Configuration, Consumer Configuration). It is intended for big-data development and operations staff to use for learning and testing; verify everything independently before applying it to a production environment. More video tutorials: www.fgedu.net.cn
Part01-Basic Concepts and Theory
1.1 Kafka Producer Core Concepts
The Kafka producer sends messages to the Kafka cluster. It serializes message objects into byte arrays via serializers, chooses the target partition via the partitioner, and finally sends the data to a broker over the network.
- Serializers: serialize the key and value into byte arrays
- Partitioner: decides which partition a message is sent to
- Record accumulator: buffers messages so they can be sent in batches for better throughput
- Sender thread: drains batches from the accumulator and sends them to the brokers
1.2 Kafka Consumer Core Concepts
The Kafka consumer pulls messages from the Kafka cluster for processing. Consumers subscribe to topics or partitions, and the consumer-group model provides load balancing:
1. Consumer groups
- A group of consumers jointly consumes a topic
- Each partition is consumed by at most one consumer within a group
- This spreads the consumption load across the group
2. Consumer coordinator
- Manages consumer group membership
- Handles partition assignment and rebalancing
3. Consumption progress (offsets)
- Records how far each consumer has read
- Supports both automatic and manual commits
4. Consumption modes
- Oldest messages: from-beginning
- Newest messages: latest
- A specific offset: seek
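The three modes map directly onto console-consumer flags. A quick sketch (the topic name and broker address are illustrative and match the examples used throughout this document; these commands need a running cluster, so they are shown for reference only):

```shell
# Oldest messages: read from the beginning of the log
kafka-console-consumer.sh --topic fgedu-orders --from-beginning \
  --bootstrap-server 192.168.1.51:9092

# Newest messages: the default behavior, start from the current end of the log
kafka-console-consumer.sh --topic fgedu-orders \
  --bootstrap-server 192.168.1.51:9092

# Specific offset: --offset requires --partition; seek() is the Java API equivalent
kafka-console-consumer.sh --topic fgedu-orders --partition 0 --offset 500 \
  --bootstrap-server 192.168.1.51:9092
```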
1.3 Kafka Consumer Group Mechanics
Consumer groups are Kafka's core mechanism for both broadcast and load-balanced consumption:
- Load-balancing mode: consumers in the same group share the consumption work
- Broadcast mode: different consumer groups consume the same topic independently
- Rebalancing: partitions are reassigned when consumers join or leave the group
Part02-Production Environment Planning and Recommendations
2.1 Producer Parameter Planning
Recommended core producer parameters:
# Reliability parameters
acks=all # wait for acknowledgment from all in-sync replicas
retries=3 # number of send retries
max.in.flight.requests.per.connection=1 # preserve ordering (with idempotence enabled, up to 5 is also order-safe)
enable.idempotence=true # enable the idempotent producer
# Performance parameters
batch.size=16384 # batch size in bytes
linger.ms=5 # time to wait for more records before sending a batch
buffer.memory=33554432 # total producer buffer memory in bytes
compression.type=lz4 # compression codec
# Timeout parameters
request.timeout.ms=30000 # per-request timeout
delivery.timeout.ms=120000 # upper bound on total send time, including retries
# Serialization
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
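A quick arithmetic check on the timeout settings above: since Kafka 2.1 the producer validates that delivery.timeout.ms is at least linger.ms + request.timeout.ms and refuses to start otherwise. A minimal shell sketch of that check, using the recommended values:

```shell
# Validate the relationship the producer enforces at startup:
# delivery.timeout.ms >= linger.ms + request.timeout.ms
linger_ms=5
request_timeout_ms=30000
delivery_timeout_ms=120000
if [ "$delivery_timeout_ms" -ge $((linger_ms + request_timeout_ms)) ]; then
  echo "timeout configuration OK"       # → timeout configuration OK
else
  echo "invalid: delivery.timeout.ms < linger.ms + request.timeout.ms"
fi
```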
2.2 Consumer Parameter Planning
Recommended core consumer parameters:
# Consumer group
group.id=fgedu-consumer-group # consumer group ID
# Offset management
enable.auto.commit=false # commit offsets manually
auto.commit.interval.ms=5000 # auto-commit interval (only relevant when auto-commit is enabled)
auto.offset.reset=earliest # start from the earliest offset when none is committed
# Fetch settings
fetch.min.bytes=1 # minimum bytes per fetch request
fetch.max.bytes=52428800 # maximum bytes per fetch request
max.poll.records=500 # maximum records returned per poll
max.partition.fetch.bytes=1048576 # maximum bytes fetched per partition
# Heartbeat settings
session.timeout.ms=10000 # session timeout
heartbeat.interval.ms=3000 # heartbeat interval
max.poll.interval.ms=300000 # maximum time between polls before the consumer is considered failed
# Deserialization
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
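One relationship worth checking in the heartbeat settings above: the Kafka documentation recommends keeping heartbeat.interval.ms at no more than one third of session.timeout.ms, so that several heartbeats can be missed before the broker declares the consumer dead. A shell sketch of the check, using the recommended values:

```shell
# Rule of thumb: heartbeat.interval.ms <= session.timeout.ms / 3
session_timeout_ms=10000
heartbeat_interval_ms=3000
if [ $((heartbeat_interval_ms * 3)) -le "$session_timeout_ms" ]; then
  echo "heartbeat configuration OK"     # → heartbeat configuration OK
else
  echo "warning: heartbeat.interval.ms > session.timeout.ms / 3"
fi
```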
2.3 Data Reliability Planning
Recommended reliability profiles:
- Highest reliability: acks=all + min.insync.replicas=2 + retries=3 + enable.idempotence=true
- Balanced: acks=1 + retries=3 + batch.size=32768
- Highest throughput: acks=0 + batch.size=65536 + compression.type=snappy
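Note that the "highest reliability" profile splits across two places: the acks/retries/idempotence settings belong to the producer, while min.insync.replicas is a broker or per-topic setting. A sketch of where each line goes, using the values recommended above:

```properties
# producer.properties (client side)
acks=all
retries=3
enable.idempotence=true
max.in.flight.requests.per.connection=1

# server.properties or per-topic config (broker side)
min.insync.replicas=2
unclean.leader.election.enable=false
```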
Part03-Production Implementation
3.1 Producer Development in Practice
3.1.1 Console Producer
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-orders \
--bootstrap-server 192.168.1.51:9092 \
--property parse.key=true \
--property key.separator=:
>order001:{"orderId":"order001","amount":100.00,"status":"created"}
>order002:{"orderId":"order002","amount":200.00,"status":"created"}
>order003:{"orderId":"order003","amount":300.00,"status":"created"}
# Producer with extra properties
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-orders \
--bootstrap-server 192.168.1.51:9092 \
--producer-property acks=all \
--producer-property compression.type=lz4
3.1.2 Producer Configuration File
$ cat > /tmp/producer.properties << 'EOF'
bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092
acks=all
retries=3
batch.size=16384
linger.ms=5
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
compression.type=lz4
enable.idempotence=true
EOF
# Start a producer using the configuration file
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
--topic fgedu-orders \
--producer.config /tmp/producer.properties \
--bootstrap-server 192.168.1.51:9092
3.2 Consumer Development in Practice
3.2.1 Console Consumer
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
--topic fgedu-orders \
--from-beginning \
--bootstrap-server 192.168.1.51:9092
{"orderId":"order001","amount":100.00,"status":"created"}
{"orderId":"order002","amount":200.00,"status":"created"}
{"orderId":"order003","amount":300.00,"status":"created"}
# Show key and partition information
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
--topic fgedu-orders \
--from-beginning \
--property print.key=true \
--property print.partition=true \
--bootstrap-server 192.168.1.51:9092
Partition:0 Key:order001 Value:{"orderId":"order001","amount":100.00}
Partition:1 Key:order002 Value:{"orderId":"order002","amount":200.00}
Partition:2 Key:order003 Value:{"orderId":"order003","amount":300.00}
# Specify a consumer group
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
--topic fgedu-orders \
--group fgedu-order-consumer-group \
--bootstrap-server 192.168.1.51:9092
3.2.2 Consumer Configuration File
$ cat > /tmp/consumer.properties << 'EOF'
bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092
group.id=fgedu-consumer-group
enable.auto.commit=false
auto.offset.reset=earliest
max.poll.records=500
session.timeout.ms=10000
heartbeat.interval.ms=3000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
EOF
# Start a consumer using the configuration file
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
--topic fgedu-orders \
--consumer.config /tmp/consumer.properties \
--bootstrap-server 192.168.1.51:9092
3.3 Consumer Group Management in Practice
3.3.1 Viewing Consumer Groups
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --list \
--bootstrap-server 192.168.1.51:9092
fgedu-order-consumer-group
fgedu-log-consumer-group
console-consumer-12345
# Describe a consumer group
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --describe \
--group fgedu-order-consumer-group \
--bootstrap-server 192.168.1.51:9092
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
fgedu-order-consumer-group fgedu-orders 0 1000 1000 0 consumer-1-abc123-456-1 /192.168.1.51 consumer-1
fgedu-order-consumer-group fgedu-orders 1 800 800 0 consumer-2-def456-789-2 /192.168.1.52 consumer-2
fgedu-order-consumer-group fgedu-orders 2 1200 1200 0 consumer-3-ghi789-012-3 /192.168.1.53 consumer-3
# View consumer group members
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --describe \
--group fgedu-order-consumer-group \
--members \
--verbose \
--bootstrap-server 192.168.1.51:9092
GROUP CONSUMER-ID HOST CLIENT-ID PARTITIONS
fgedu-order-consumer-group consumer-1-abc123-456-1 /192.168.1.51 consumer-1 fgedu-orders-0
fgedu-order-consumer-group consumer-2-def456-789-2 /192.168.1.52 consumer-2 fgedu-orders-1
fgedu-order-consumer-group consumer-3-ghi789-012-3 /192.168.1.53 consumer-3 fgedu-orders-2
3.3.2 Resetting Consumer Group Offsets
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --reset-offsets \
--group fgedu-order-consumer-group \
--topic fgedu-orders \
--to-earliest \
--execute \
--bootstrap-server 192.168.1.51:9092
GROUP TOPIC PARTITION NEW-OFFSET
fgedu-order-consumer-group fgedu-orders 0 0
fgedu-order-consumer-group fgedu-orders 1 0
fgedu-order-consumer-group fgedu-orders 2 0
# Reset to the latest offsets
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --reset-offsets \
--group fgedu-order-consumer-group \
--topic fgedu-orders \
--to-latest \
--execute \
--bootstrap-server 192.168.1.51:9092
# Reset to a specific offset
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --reset-offsets \
--group fgedu-order-consumer-group \
--topic fgedu-orders \
--to-offset 500 \
--execute \
--bootstrap-server 192.168.1.51:9092
# Reset offsets by timestamp
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --reset-offsets \
--group fgedu-order-consumer-group \
--topic fgedu-orders \
--to-datetime 2026-04-01T00:00:00.000 \
--execute \
--bootstrap-server 192.168.1.51:9092
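A safety habit worth adding to the reset commands above: kafka-consumer-groups.sh previews a reset when run with --dry-run (also the behavior when neither --execute nor --export is given), printing the would-be NEW-OFFSET values without changing anything. Note that the group must have no active members for --execute to succeed. A sketch (requires a running cluster):

```shell
# Preview the reset first; nothing changes until --execute is passed
kafka-consumer-groups.sh --reset-offsets \
  --group fgedu-order-consumer-group \
  --topic fgedu-orders \
  --to-earliest \
  --dry-run \
  --bootstrap-server 192.168.1.51:9092
```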
Part04-Production Cases and Hands-On Walkthroughs
4.1 Producer Batching Optimization
4.1.1 Batch Send Configuration
$ cat > /tmp/batch-producer.properties << 'EOF'
bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092
acks=1
batch.size=65536
linger.ms=20
buffer.memory=67108864
compression.type=lz4
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
EOF
# Performance test (bootstrap.servers is supplied by the config file)
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
--topic fgedu-perf-test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer.config /tmp/batch-producer.properties
1000000 records sent, 125000.0 records/sec (122.07 MB/sec), 8.00 ms avg latency, 250.00 ms max latency, 5 ms 50th, 15 ms 95th, 100 ms 99th, 200 ms 99.9th.
4.1.2 Compression Comparison
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
--topic fgedu-perf-test \
--num-records 100000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=192.168.1.51:9092 compression.type=none
100000 records sent, 50000.0 records/sec (48.83 MB/sec)
# LZ4 compression test
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
--topic fgedu-perf-test \
--num-records 100000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=192.168.1.51:9092 compression.type=lz4
100000 records sent, 80000.0 records/sec (78.13 MB/sec)
# Snappy compression test
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
--topic fgedu-perf-test \
--num-records 100000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=192.168.1.51:9092 compression.type=snappy
100000 records sent, 75000.0 records/sec (73.24 MB/sec)
4.2 Consumer Offset Management
4.2.1 Offset Commit Strategies
# Automatic commit (consumer properties)
enable.auto.commit=true
auto.commit.interval.ms=5000
# Manual synchronous commit (Java consumer API)
consumer.commitSync();
# Manual asynchronous commit
consumer.commitAsync();
# Manually commit a specific offset
consumer.commitSync(Collections.singletonMap(
    new TopicPartition("fgedu-orders", 0),
    new OffsetAndMetadata(1000)
));
# Inspecting consumer offsets from the command line
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --describe \
--group fgedu-order-consumer-group \
--bootstrap-server 192.168.1.51:9092
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-order-consumer-group fgedu-orders 0 5000 10000 5000
fgedu-order-consumer-group fgedu-orders 1 4500 9000 4500
fgedu-order-consumer-group fgedu-orders 2 6000 12000 6000
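The LAG column above is the key health signal. As a small sketch, the per-partition lag can be summed with awk; the rows below mirror the example output above and are illustrative only:

```shell
# Sum the LAG column (field 6) of describe output rows
describe_rows='fgedu-order-consumer-group fgedu-orders 0 5000 10000 5000
fgedu-order-consumer-group fgedu-orders 1 4500 9000 4500
fgedu-order-consumer-group fgedu-orders 2 6000 12000 6000'
echo "$describe_rows" | awk '{ sum += $6 } END { print "total lag:", sum }'
# → total lag: 15500
```

In practice, pipe the real kafka-consumer-groups.sh --describe output through the same awk filter, skipping the header line first (e.g. with `tail -n +2`).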
4.2.2 Consumer Performance Testing
$ /bigdata/app/kafka/bin/kafka-consumer-perf-test.sh \
--topic fgedu-perf-test \
--messages 1000000 \
--threads 3 \
--bootstrap-server 192.168.1.51:9092
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2026-04-08 10:30:00:000, 2026-04-08 10:30:30:000, 976.56, 32.55, 1000000, 33333.33, 500, 29500, 33.11, 33898.31
4.3 Common Issues and Fixes
4.3.1 Consumer Rebalance Issues
# Troubleshooting steps:
# 1. Check the consumer group state
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --describe \
--group fgedu-order-consumer-group \
--state \
--bootstrap-server 192.168.1.51:9092
GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE MEMBERS
fgedu-order-consumer-group 192.168.1.51:9092 (1) range Stable 3
# 2. Check the heartbeat settings
session.timeout.ms=10000
heartbeat.interval.ms=3000
max.poll.interval.ms=300000
# 3. Remedies
# - Increase session.timeout.ms
# - Increase max.poll.interval.ms
# - Reduce the number of records fetched per poll
# - Optimize the per-record processing logic
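As a sketch, the remedies above translate into consumer overrides like the following; the values are illustrative starting points, not universal recommendations, and should be tuned against how long your processing actually takes:

```properties
# Give slow consumers more headroom before a rebalance is triggered
session.timeout.ms=30000
heartbeat.interval.ms=10000
max.poll.interval.ms=600000
# Fetch fewer records per poll so each poll() cycle finishes sooner
max.poll.records=100
```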
4.3.2 Message Loss Issues
# Troubleshooting steps:
# 1. Check the producer configuration
acks=all
retries=3
enable.idempotence=true
# 2. Check the broker configuration
min.insync.replicas=2
unclean.leader.election.enable=false
# 3. Check the consumer configuration
enable.auto.commit=false
# 4. Remedies
# - Set acks=all on the producer
# - Enable idempotence for exactly-once writes per partition
# - Commit offsets manually on the consumer, after processing
# - Ensure the ISR replica count stays >= 2
Part05-Experience Summary and Sharing
5.1 Producer and Consumer Best Practices
Producer best practices:
1. Use acks=all to guarantee data durability
2. Enable enable.idempotence to prevent duplicate writes
3. Tune batch.size and linger.ms together
4. Use compression to reduce network traffic
5. Implement a custom partitioner when ordering by key matters
# Consumer best practices
1. Disable auto-commit and commit offsets manually
2. Set max.poll.records appropriately
3. Commit offsets only after messages are fully processed
4. Make the consumption logic idempotent
5. Monitor consumer lag (LAG)
5.2 Configuration Checklist
Configuration checklist:
- Is the producer acks setting correct?
- Is the producer retry count reasonable?
- Is the consumer group ID unique per application?
- Is the offset commit strategy correct?
- Are the consumer heartbeat settings reasonable?
- Is the batch size tuned?
- Is the compression type appropriate?
5.3 Recommended Monitoring Tools
Producer and consumer monitoring tools:
- kafka-consumer-groups.sh: inspect consumer group state and lag
- Burrow: consumer lag monitoring
- JMX Exporter: export JMX metrics
- Kafka Manager: visual monitoring and management
- Prometheus + Grafana: metrics collection and visualization
This document was compiled and published by Fengge Tutorials for learning and testing only; when reposting, credit the source: http://www.fgedu.net.cn/10327.html
