1. 首页 > Hadoop教程 > 正文

大数据教程FG053-Kafka生产者消费者实战

本文档风哥主要介绍Kafka生产者消费者实战,包括Kafka生产者配置与开发、Kafka消费者配置与开发、Kafka消费者组管理、Kafka消息可靠性保证等内容,风哥教程参考Kafka官方文档Producer Configuration、Consumer Configuration等内容,适合大数据开发运维人员在学习和测试中使用,如果要应用于生产环境则需要自行确认。更多视频教程www.fgedu.net.cn

Part01-基础概念与理论知识

1.1 Kafka生产者核心概念

Kafka生产者负责将消息发送到Kafka集群。生产者通过序列化器将消息对象序列化为字节数组,通过分区器确定消息发送到哪个分区,最终通过网络发送到Broker。学习交流加群风哥微信: itpux-com

Kafka生产者核心组件:

  • 序列化器:将Key和Value序列化为字节数组
  • 分区器:决定消息发送到哪个分区
  • 消息累加器:缓存消息,批量发送提高性能
  • Sender线程:从累加器获取消息并发送到Broker

1.2 Kafka消费者核心概念

Kafka消费者从Kafka集群拉取消息进行处理。消费者通过订阅Topic或分区来消费消息,支持消费者组模式实现负载均衡:

# 消费者核心概念

1. 消费者组
– 一组消费者共同消费一个Topic
– 每个分区只能被组内一个消费者消费
– 实现消息的负载均衡

2. 消费者协调器
– 管理消费者组成员
– 负责分区分配和再均衡

3. 消费进度
– 记录消费者消费到哪个位置
– 支持自动提交和手动提交

4. 消费模式
– 最旧消息:from-beginning
– 最新消息:latest
– 指定Offset:seek

1.3 Kafka消费者组机制

消费者组是Kafka实现消息广播和负载均衡的核心机制:

  • 负载均衡模式:同一消费者组内的消费者分担消费任务
  • 广播模式:不同消费者组独立消费同一Topic
  • 再均衡机制:消费者加入或离开时重新分配分区
风哥提示:消费者组中消费者数量不应超过分区数量,多余的消费者会空闲。建议消费者数量等于或小于分区数量。

Part02-生产环境规划与建议

2.1 生产者参数配置规划

生产者核心参数配置建议:

# 生产者核心参数

# 可靠性参数
acks=all # 等待所有ISR副本确认
retries=3 # 重试次数
max.in.flight.requests.per.connection=1 # 保证顺序
enable.idempotence=true # 启用幂等性

# 性能参数
batch.size=16384 # 批量大小
linger.ms=5 # 等待时间
buffer.memory=33554432 # 缓冲区大小
compression.type=lz4 # 压缩类型

# 超时参数
request.timeout.ms=30000 # 请求超时
delivery.timeout.ms=120000 # 发送超时

# 序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

2.2 消费者参数配置规划

消费者核心参数配置建议:

# 消费者核心参数

# 消费组配置
group.id=fgedu-consumer-group # 消费者组ID

# Offset管理
enable.auto.commit=false # 手动提交Offset
auto.commit.interval.ms=5000 # 自动提交间隔
auto.offset.reset=earliest # 无Offset时从最早开始

# 拉取配置
fetch.min.bytes=1 # 最小拉取字节数
fetch.max.bytes=52428800 # 最大拉取字节数
max.poll.records=500 # 单次拉取最大记录数
max.partition.fetch.bytes=1048576 # 单分区最大拉取字节数

# 心跳配置
session.timeout.ms=10000 # 会话超时时间
heartbeat.interval.ms=3000 # 心跳间隔
max.poll.interval.ms=300000 # 最大处理间隔

# 反序列化
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

2.3 数据可靠性配置规划

数据可靠性配置建议:

数据可靠性配置组合:

  • 最高可靠性:acks=all + min.insync.replicas=2 + retries=3 + enable.idempotence=true
  • 平衡配置:acks=1 + retries=3 + batch.size=32768
  • 最高性能:acks=0 + batch.size=65536 + compression.type=snappy

Part03-生产环境项目实施方案

3.1 生产者开发实战

3.1.1 命令行生产者

# 启动控制台生产者
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
–topic fgedu-orders \
–bootstrap-server 192.168.1.51:9092 \
–property parse.key=true \
–property key.separator=:

>order001:{“orderId”:”order001″,”amount”:100.00,”status”:”created”}
>order002:{“orderId”:”order002″,”amount”:200.00,”status”:”created”}
>order003:{“orderId”:”order003″,”amount”:300.00,”status”:”created”}

# 带属性的生产者
$ /bigdata/app/kafka/bin/kafka-console-producer.sh \
–topic fgedu-orders \
–bootstrap-server 192.168.1.51:9092 \
–producer-property acks=all \
–producer-property compression.type=lz4

3.1.2 Java生产者示例

# 创建生产者配置文件
$ cat > /tmp/producer.properties << 'EOF' bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092 acks=all retries=3 batch.size=16384 linger.ms=5 buffer.memory=33554432 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer compression.type=lz4 enable.idempotence=true EOF # 使用配置文件启动生产者 $ /bigdata/app/kafka/bin/kafka-console-producer.sh \ --topic fgedu-orders \ --producer.config /tmp/producer.properties \ --bootstrap-server 192.168.1.51:9092

3.2 消费者开发实战

3.2.1 命令行消费者

# 启动控制台消费者(从最早开始)
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
–topic fgedu-orders \
–from-beginning \
–bootstrap-server 192.168.1.51:9092

{“orderId”:”order001″,”amount”:100.00,”status”:”created”}
{“orderId”:”order002″,”amount”:200.00,”status”:”created”}
{“orderId”:”order003″,”amount”:300.00,”status”:”created”}

# 显示Key和分区信息
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
–topic fgedu-orders \
–from-beginning \
–property print.key=true \
–property print.partition=true \
–bootstrap-server 192.168.1.51:9092

Partition:0 Key:order001 Value:{“orderId”:”order001″,”amount”:100.00}
Partition:1 Key:order002 Value:{“orderId”:”order002″,”amount”:200.00}
Partition:2 Key:order003 Value:{“orderId”:”order003″,”amount”:300.00}

# 指定消费者组
$ /bigdata/app/kafka/bin/kafka-console-consumer.sh \
–topic fgedu-orders \
–group fgedu-order-consumer-group \
–bootstrap-server 192.168.1.51:9092

3.2.2 消费者配置文件

# 创建消费者配置文件
$ cat > /tmp/consumer.properties << 'EOF' bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092 group.id=fgedu-consumer-group enable.auto.commit=false auto.offset.reset=earliest max.poll.records=500 session.timeout.ms=10000 heartbeat.interval.ms=3000 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer EOF # 使用配置文件启动消费者 $ /bigdata/app/kafka/bin/kafka-console-consumer.sh \ --topic fgedu-orders \ --consumer.config /tmp/consumer.properties \ --bootstrap-server 192.168.1.51:9092
风哥提示:生产环境建议关闭自动提交Offset,使用手动提交确保消息处理成功后再提交,避免消息丢失或重复消费。学习交流加群风哥QQ113257174

3.3 消费者组管理实战

3.3.1 查看消费者组

# 列出所有消费者组
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –list \
–bootstrap-server 192.168.1.51:9092

fgedu-order-consumer-group
fgedu-log-consumer-group
console-consumer-12345

# 查看消费者组详情
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –describe \
–group fgedu-order-consumer-group \
–bootstrap-server 192.168.1.51:9092

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
fgedu-order-consumer-group fgedu-orders 0 1000 1000 0 consumer-1-abc123-456-1 /192.168.1.51 consumer-1
fgedu-order-consumer-group fgedu-orders 1 800 800 0 consumer-2-def456-789-2 /192.168.1.52 consumer-2
fgedu-order-consumer-group fgedu-orders 2 1200 1200 0 consumer-3-ghi789-012-3 /192.168.1.53 consumer-3

# 查看消费者组成员
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –describe \
–group fgedu-order-consumer-group \
–members \
–verbose \
–bootstrap-server 192.168.1.51:9092

GROUP CONSUMER-ID HOST CLIENT-ID PARTITIONS
fgedu-order-consumer-group consumer-1-abc123-456-1 /192.168.1.51 consumer-1 fgedu-orders-0
fgedu-order-consumer-group consumer-2-def456-789-2 /192.168.1.52 consumer-2 fgedu-orders-1
fgedu-order-consumer-group consumer-3-ghi789-012-3 /192.168.1.53 consumer-3 fgedu-orders-2

3.3.2 重置消费者组Offset

# 重置到最早位置
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –reset-offsets \
–group fgedu-order-consumer-group \
–topic fgedu-orders \
–to-earliest \
–execute \
–bootstrap-server 192.168.1.51:9092

GROUP TOPIC PARTITION NEW-OFFSET
fgedu-order-consumer-group fgedu-orders 0 0
fgedu-order-consumer-group fgedu-orders 1 0
fgedu-order-consumer-group fgedu-orders 2 0

# 重置到最新位置
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –reset-offsets \
–group fgedu-order-consumer-group \
–topic fgedu-orders \
–to-latest \
–execute \
–bootstrap-server 192.168.1.51:9092

# 重置到指定Offset
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –reset-offsets \
–group fgedu-order-consumer-group \
–topic fgedu-orders \
–to-offset 500 \
–execute \
–bootstrap-server 192.168.1.51:9092

# 按时间重置Offset
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –reset-offsets \
–group fgedu-order-consumer-group \
–topic fgedu-orders \
–to-datetime 2026-04-01T00:00:00.000 \
–execute \
–bootstrap-server 192.168.1.51:9092

Part04-生产案例与实战讲解

4.1 生产者批量发送优化

4.1.1 批量发送配置

# 批量发送优化配置
$ cat > /tmp/batch-producer.properties << 'EOF' bootstrap.servers=192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092 acks=1 batch.size=65536 linger.ms=20 buffer.memory=67108864 compression.type=lz4 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer EOF # 性能测试 $ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \ --topic fgedu-perf-test \ --num-records 1000000 \ --record-size 1024 \ --throughput -1 \ --producer.config /tmp/batch-producer.properties \ --bootstrap-server 192.168.1.51:9092 1000000 records sent, 125000.0 records/sec (122.07 MB/sec), 8.00 ms avg latency, 250.00 ms max latency, 5 ms 50th, 15 ms 95th, 100 ms 99th, 200 ms 99.9th.

4.1.2 压缩配置对比

# 无压缩测试
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
–topic fgedu-perf-test \
–num-records 100000 \
–record-size 1024 \
–throughput -1 \
–producer-props compression.type=none \
–bootstrap-server 192.168.1.51:9092

100000 records sent, 50000.0 records/sec (48.83 MB/sec)

# LZ4压缩测试
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
–topic fgedu-perf-test \
–num-records 100000 \
–record-size 1024 \
–throughput -1 \
–producer-props compression.type=lz4 \
–bootstrap-server 192.168.1.51:9092

100000 records sent, 80000.0 records/sec (78.13 MB/sec)

# Snappy压缩测试
$ /bigdata/app/kafka/bin/kafka-producer-perf-test.sh \
–topic fgedu-perf-test \
–num-records 100000 \
–record-size 1024 \
–throughput -1 \
–producer-props compression.type=snappy \
–bootstrap-server 192.168.1.51:9092

100000 records sent, 75000.0 records/sec (73.24 MB/sec)

4.2 消费者Offset管理

4.2.1 Offset提交策略

# 自动提交配置
enable.auto.commit=true
auto.commit.interval.ms=5000

# 手动同步提交
consumer.commitSync()

# 手动异步提交
consumer.commitAsync()

# 手动提交指定Offset
consumer.commitSync(Collections.singletonMap(
new TopicPartition(“fgedu-orders”, 0),
new OffsetAndMetadata(1000)
))

# 消费者Offset管理脚本
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –describe \
–group fgedu-order-consumer-group \
–bootstrap-server 192.168.1.51:9092

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-order-consumer-group fgedu-orders 0 5000 10000 5000
fgedu-order-consumer-group fgedu-orders 1 4500 9000 4500
fgedu-order-consumer-group fgedu-orders 2 6000 12000 6000

4.2.2 消费者性能测试

# 消费者性能测试
$ /bigdata/app/kafka/bin/kafka-consumer-perf-test.sh \
–topic fgedu-perf-test \
–messages 1000000 \
–threads 3 \
–bootstrap-server 192.168.1.51:9092

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2026-04-08 10:30:00:000, 2026-04-08 10:30:30:000, 976.56, 32.55, 1000000, 33333.33, 500, 29500, 33.11, 33898.31

4.3 常见问题处理

4.3.1 消费者Rebalance问题

# 问题现象:频繁Rebalance导致消费中断
# 排查步骤:

# 1. 查看消费者组状态
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh –describe \
–group fgedu-order-consumer-group \
–state \
–bootstrap-server 192.168.1.51:9092

GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE MEMBERS
fgedu-order-consumer-group 192.168.1.51:9092 (1) range Stable 3

# 2. 检查心跳配置
session.timeout.ms=10000
heartbeat.interval.ms=3000
max.poll.interval.ms=300000

# 3. 解决方案
# – 增加session.timeout.ms
# – 增加max.poll.interval.ms
# – 减少单次拉取记录数
# – 优化消费处理逻辑

4.3.2 消息丢失问题

# 问题现象:消息发送后未持久化
# 排查步骤:

# 1. 检查生产者配置
acks=all
retries=3
enable.idempotence=true

# 2. 检查Broker配置
min.insync.replicas=2
unclean.leader.election.enable=false

# 3. 检查消费者配置
enable.auto.commit=false

# 4. 解决方案
# – 生产者启用acks=all
# – 启用幂等性保证
# – 消费者手动提交Offset
# – 确保ISR副本数>=2

Part05-风哥经验总结与分享

5.1 生产者消费者最佳实践

生产者消费者最佳实践建议:

# 生产者最佳实践
1. 启用acks=all保证数据可靠性
2. 启用enable.idempotence防止重复
3. 合理设置batch.size和linger.ms
4. 使用压缩减少网络传输
5. 实现自定义分区器保证顺序

# 消费者最佳实践
1. 关闭自动提交,使用手动提交
2. 合理设置max.poll.records
3. 处理完消息后再提交Offset
4. 实现幂等性消费逻辑
5. 监控消费延迟(LAG)

5.2 配置检查清单

配置检查清单:

生产者消费者配置检查清单:

  • 生产者acks配置是否正确
  • 生产者重试次数是否合理
  • 消费者组ID是否唯一
  • 消费者Offset提交方式是否正确
  • 消费者心跳配置是否合理
  • 批量大小是否优化
  • 压缩类型是否选择合适

5.3 监控工具推荐

生产者消费者监控工具:

  • kafka-consumer-groups.sh:查看消费者组状态和延迟
  • Burrow:消费者延迟监控工具
  • JMX Exporter:导出JMX指标
  • Kafka Manager:可视化监控管理
  • Prometheus + Grafana:指标采集和可视化
风哥提示:生产环境建议监控消费者延迟(LAG),及时发现消费积压问题。当LAG持续增长时,需要增加消费者数量或优化消费逻辑。更多学习教程公众号风哥教程itpux_com

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息