
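Big Data Tutorial FG031-Kafka Distributed Message Queue in Practice

Summary: This article walks through Kafka as a distributed message queue in practice, covering Kafka architecture, cluster deployment, message production and consumption, and performance optimization. The tutorial draws on the Introduction and Configuration sections of the official Kafka documentation.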

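Table of Contents

Part01-Basic Concepts and Theory
  1.1 Kafka Architecture Overview
  1.2 Message Model and Partitions
  1.3 Message Storage Principles
Part02-Production Environment Planning and Recommendations
  2.1 Cluster Deployment Planning
  2.2 Topic Design Planning
  2.3 Performance Optimization Planning
Part03-Production Environment Implementation Plan
  3.1 Kafka Installation and Configuration
  3.2 Topic Management Operations
  3.3 Message Production and Consumption
  3.4 Cluster Monitoring and Maintenance
Part04-Production Cases and Hands-On Walkthroughs
  4.1 Real-Time Data Collection Case
  4.2 Log Collection Case
  4.3 Performance Tuning Case
Part05-Fengge's Lessons Learned
  5.1 Kafka Best Practices
  5.2 Performance Optimization Lessons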

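Part01-Basic Concepts and Theory

1.1 Kafka Architecture Overview

Kafka is a distributed stream-processing platform that provides a high-throughput, persistent, horizontally scalable message queue service.

Fengge's tip: Kafka suits large-scale data processing scenarios such as log collection, real-time data streams, and event-driven architectures.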

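1.2 Message Model and Partitions

Kafka uses a publish-subscribe model, and messages are stored in Topics.

Core Kafka concepts:
– Broker: a Kafka server node
– Topic: a message subject, the logical category that messages belong to
– Partition: a physical shard of a Topic; ordering is guaranteed only within a single Partition
– Producer: a client that publishes messages
– Consumer: a client that reads messages
– Consumer Group: a set of Consumers that divide the Partitions of a Topic among themselves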
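
The relationship between message keys and Partitions can be sketched as follows. This is a simplified illustration, not Kafka's actual algorithm: Kafka's default partitioner hashes the serialized key with murmur2, while this sketch substitutes `cksum` so that it runs in a plain shell. The function name `partition_for_key` and the key `user-1` are hypothetical.

```shell
#!/bin/bash
# Simplified stand-in for keyed partitioning: hash(key) mod partition count.
# ASSUMPTION: cksum replaces Kafka's murmur2 hash, for illustration only.
partition_for_key() {
  local key="$1" partitions="$2"
  local hash
  hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
  echo $(( hash % partitions ))
}

# The same key always maps to the same partition, which keeps per-key ordering.
partition_for_key "user-1" 3
partition_for_key "user-1" 3
```

Because the mapping depends on the partition count, adding Partitions to a Topic later generally changes which Partition a given key lands in.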

1.3 Message Storage Principles

Kafka stores messages in log files on disk and only ever appends to them, so writes are sequential.

# Check the Kafka version
kafka-topics.sh --version

# List the Kafka log directory
ls -la /bigdata/kafka-logs/

# Kafka version
3.4.0

# Kafka log directory
total 40
drwxr-xr-x 2 kafka kafka 4096 Jan 19 01:00 fgedu-topic-0
drwxr-xr-x 2 kafka kafka 4096 Jan 19 01:00 fgedu-topic-1
drwxr-xr-x 2 kafka kafka 4096 Jan 19 01:00 __consumer_offsets-0
-rw-r--r-- 1 kafka kafka 4 Jan 19 01:00 meta.properties
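
Inside each partition directory (for example fgedu-topic-0), Kafka splits the log into segments, and each segment's files are named after the offset of its first message, zero-padded to 20 digits. The sketch below only reproduces that naming scheme; the helper name `segment_files` and the offset 368769 are made up for illustration.

```shell
#!/bin/bash
# Print the file names used for a log segment that starts at a given base offset.
segment_files() {
  local base_offset="$1"
  local prefix
  prefix=$(printf '%020d' "$base_offset")
  echo "${prefix}.log"        # message data, appended sequentially
  echo "${prefix}.index"      # offset -> file position index
  echo "${prefix}.timeindex"  # timestamp -> offset index
}

segment_files 0
segment_files 368769
```

Naming segments by base offset lets a Broker locate the segment that holds a given offset by comparing file names, without scanning the data.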

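Part02-Production Environment Planning and Recommendations

2.1 Cluster Deployment Planning

A Kafka cluster deployment has to account for the number of nodes and the resources given to each.

Cluster deployment recommendations:
– Broker nodes: at least 3, to ensure high availability
– ZooKeeper nodes: 3 or 5
– Disks: store the Kafka logs on dedicated disks
– Memory: 16 GB or more per Broker is recommended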
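
The recommendation of 3 or 5 ZooKeeper nodes follows from majority quorum: an ensemble stays available only while more than half of its nodes are up. A minimal sketch of that arithmetic (the function name `tolerated_failures` is hypothetical):

```shell
#!/bin/bash
# Number of node failures an ensemble of N nodes can survive
# while a strict majority is still running.
tolerated_failures() {
  local nodes="$1"
  echo $(( (nodes - 1) / 2 ))
}

for n in 3 4 5; do
  echo "ensemble=$n tolerated_failures=$(tolerated_failures "$n")"
done
```

A 4-node ensemble tolerates no more failures than a 3-node one, which is why odd sizes are the usual choice.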

2.2 Topic Design Planning

Topic design (partition count, replication factor, retention) directly affects system performance.

# List Topics
kafka-topics.sh --bootstrap-server fgedu01:9092 --list

# Show Topic details
kafka-topics.sh --bootstrap-server fgedu01:9092 --describe --topic fgedu-topic

# Topic list
__consumer_offsets
fgedu-topic
fgedu-log
fgedu-event

# Topic details
Topic: fgedu-topic TopicId: xxx PartitionCount: 3 ReplicationFactor: 3
Topic: fgedu-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedu-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
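
In the details above, Replicas lists every Broker that holds a copy of the Partition and Isr lists the copies currently in sync; a Partition is under-replicated when Isr is shorter than Replicas. The sketch below flags such Partitions from saved `--describe` output. The helper name `find_under_replicated` and the degraded sample line are made up for illustration, and the parsing assumes the column layout shown in this tutorial. `kafka-topics.sh --describe --under-replicated-partitions` performs the same check against a live cluster.

```shell
#!/bin/bash
# Flag partitions whose ISR is smaller than their replica list.
# Reads `kafka-topics.sh --describe` output on stdin.
find_under_replicated() {
  awk '/Partition:/ {
    for (i = 1; i <= NF; i++) {
      if ($i == "Partition:") part = $(i + 1)
      if ($i == "Replicas:")  nrep = split($(i + 1), r, ",")
      if ($i == "Isr:")       nisr = split($(i + 1), s, ",")
    }
    if (nisr < nrep) print $2 "-" part
  }'
}

# Sample input (made up): partition 1 has lost Broker 3 from its ISR.
find_under_replicated <<'EOF'
Topic: fgedu-topic TopicId: xxx PartitionCount: 3 ReplicationFactor: 3
Topic: fgedu-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,1
EOF
```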

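2.3 Performance Optimization Planning

Performance optimization has to be approached from several angles. Fengge's tip: a sensible partition count and replication factor are the foundation of performance optimization.

Key points for performance optimization:
– Set a sensible partition count
– Configure an appropriate replication factor
– Tune the producer batch size
– Optimize disk I/O
– Monitor consumer lag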

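Part03-Production Environment Implementation Plan

3.1 Kafka Installation and Configuration

3.1.1 Installing Kafka

# Download Kafka (older releases may only be available from archive.apache.org/dist/kafka/)
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz -C /bigdata/app/
ln -s /bigdata/app/kafka_2.13-3.4.0 /bigdata/app/kafka

# Set environment variables
export KAFKA_HOME=/bigdata/app/kafka
export PATH=$PATH:$KAFKA_HOME/bin

# Write server.properties (broker.id and the listener host must be unique on each Broker)
cat > /bigdata/app/kafka/config/server.properties << 'EOF'
broker.id=1
listeners=PLAINTEXT://fgedu01:9092
advertised.listeners=PLAINTEXT://fgedu01:9092
log.dirs=/bigdata/kafka-logs
num.partitions=3
default.replication.factor=3
zookeeper.connect=fgedu01:2181,fgedu02:2181,fgedu03:2181
EOF

# Verify the installation
kafka-topics.sh --version

# Download and extract
# Done

# Environment variables
# Configured

# Configuration file
# Configured

# Verify the installation
3.4.0
# Kafka installed successfully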

3.1.2 Starting Kafka

# Start ZooKeeper
zkServer.sh start

# Start Kafka
kafka-server-start.sh -daemon /bigdata/app/kafka/config/server.properties

# Check the processes
jps | grep -E "Kafka|QuorumPeerMain"

# Verify the service
kafka-broker-api-versions.sh --bootstrap-server fgedu01:9092

# Start ZooKeeper
ZooKeeper JMX enabled by default
ZooKeeper starting…

# Start Kafka
# Started successfully

# Processes
12345 QuorumPeerMain
12346 Kafka

# Verify the service
fgedu01:9092 (id: 1 rack: null) -> (
Produce(0): 0 to 9,
Fetch(1): 0 to 12,

)
# Kafka started successfully

3.2 Topic Management Operations

3.2.1 Creating a Topic

# Create a Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 \
--create \
--topic fgedu-user \
--partitions 3 \
--replication-factor 3

# Create a Topic with custom settings (7-day retention, 1 GiB segments)
kafka-topics.sh --bootstrap-server fgedu01:9092 \
--create \
--topic fgedu-order \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config segment.bytes=1073741824

# List Topics
kafka-topics.sh --bootstrap-server fgedu01:9092 --list

# Show Topic details
kafka-topics.sh --bootstrap-server fgedu01:9092 --describe --topic fgedu-user

# Create a Topic
Created topic fgedu-user.

# Create a Topic with custom settings
Created topic fgedu-order.

# Topic list
__consumer_offsets
fgedu-user
fgedu-order

# Topic details
Topic: fgedu-user TopicId: xxx PartitionCount: 3 ReplicationFactor: 3
Topic: fgedu-user Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-user Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedu-user Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
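
The value retention.ms=604800000 used for fgedu-order is 7 days expressed in milliseconds. A small helper makes such values less error-prone to derive; the function name `days_to_ms` is hypothetical.

```shell
#!/bin/bash
# Convert a retention period in days to the millisecond value expected by retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

echo "retention.ms=$(days_to_ms 7)"   # prints retention.ms=604800000
```

The result can be passed straight to the `--config` option, for example `--config retention.ms=$(days_to_ms 7)`.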

3.2.2 Modifying a Topic

# Change a Topic setting (retention reduced to 3 days)
kafka-configs.sh --bootstrap-server fgedu01:9092 \
--alter \
--entity-type topics \
--entity-name fgedu-user \
--add-config retention.ms=259200000

# Increase the partition count (it can only be increased, never decreased)
kafka-topics.sh --bootstrap-server fgedu01:9092 \
--alter \
--topic fgedu-user \
--partitions 6

# Show the Topic configuration
kafka-configs.sh --bootstrap-server fgedu01:9092 \
--describe \
--entity-type topics \
--entity-name fgedu-user

# Delete a Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 --delete --topic fgedu-test

# Change a Topic setting
Completed updating config for topic fgedu-user.

# Increase the partition count
Adding partitions succeeded.

# Topic configuration
Configs: topic=fgedu-user retention.ms=259200000

# Delete a Topic
Topic fgedu-test is marked for deletion.
# Topic management complete

3.3 Message Production and Consumption

3.3.1 Producing Messages

# Start the console producer
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user

# Send messages
> {"user_id":1,"name":"fgedu01","age":25}
> {"user_id":2,"name":"fgedu02","age":30}
> {"user_id":3,"name":"fgedu03","age":28}

# Send messages from a file (one message per line)
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user < messages.txt

# Start the producer with custom settings
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user \
--producer-property acks=all \
--producer-property batch.size=16384

# Console producer
# Started successfully

# Send messages
# Messages sent

# Send from a file
# Sending complete

# Producer settings
# Started successfully

3.3.2 Consuming Messages

# Start the console consumer
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user --from-beginning

# Consumed messages
{"user_id":1,"name":"fgedu01","age":25}
{"user_id":2,"name":"fgedu02","age":30}
{"user_id":3,"name":"fgedu03","age":28}

# Consume as a named consumer group
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user \
--group fgedu-consumer-group --from-beginning

# Print message keys and timestamps
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user \
--property print.key=true --property print.timestamp=true --from-beginning

# List consumer groups
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --list

# Console consumer
# Started successfully

# Consumed messages
{"user_id":1,"name":"fgedu01","age":25}
{"user_id":2,"name":"fgedu02","age":30}
{"user_id":3,"name":"fgedu03","age":28}
Processed a total of 3 messages

# Consumer groups
fgedu-consumer-group
console-consumer-12345

# Consumer group details
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --describe --group fgedu-consumer-group

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-consumer-group fgedu-user 0 10 10 0
fgedu-consumer-group fgedu-user 1 10 10 0
fgedu-consumer-group fgedu-user 2 10 10 0
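
LAG is the difference between LOG-END-OFFSET and CURRENT-OFFSET for each Partition, so a group's total backlog is the sum over its Partitions. A minimal sketch that totals lag per group from saved `--describe` output follows. It assumes the six-column layout shown above (real output may carry extra columns such as CONSUMER-ID after LAG); the helper name `total_lag` and the non-zero sample offsets are made up.

```shell
#!/bin/bash
# Sum lag per consumer group, recomputed as LOG-END-OFFSET minus CURRENT-OFFSET.
# Reads `kafka-consumer-groups.sh --describe` output on stdin.
total_lag() {
  awk '$1 != "GROUP" && NF >= 6 { lag[$1] += $5 - $4 }
       END { for (g in lag) print g, lag[g] }'
}

# Sample input (made up): partitions 1 and 2 are behind by 2 and 5 messages.
total_lag <<'EOF'
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-consumer-group fgedu-user 0 10 10 0
fgedu-consumer-group fgedu-user 1 8 10 2
fgedu-consumer-group fgedu-user 2 5 10 5
EOF
```

A total that keeps growing between runs suggests the group consumes more slowly than producers write.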

3.4 Cluster Monitoring and Maintenance

3.4.1 Monitoring Cluster Status

# List Brokers
kafka-broker-api-versions.sh --bootstrap-server fgedu01:9092 | head -5

# Show partition distribution for all Topics
kafka-topics.sh --bootstrap-server fgedu01:9092 --describe

# Show consumer group lag
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --describe --all-groups

# Show Broker configuration
kafka-configs.sh --bootstrap-server fgedu01:9092 --describe --entity-type brokers

# Broker list
fgedu01:9092 (id: 1 rack: null) -> (
Produce(0): 0 to 9,

)

# Topic partition distribution
Topic: fgedu-user PartitionCount: 6 ReplicationFactor: 3
Topic: fgedu-order PartitionCount: 6 ReplicationFactor: 3

# Consumer group lag
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-consumer-group fgedu-user 0 100 100 0
fgedu-consumer-group fgedu-user 1 100 100 0

# Broker configuration
Configs: broker.id=1, listeners=PLAINTEXT://fgedu01:9092, …

3.4.2 Cluster Maintenance Operations

# Show log directories
kafka-log-dirs.sh --bootstrap-server fgedu01:9092 --describe

# Reassign partitions
kafka-reassign-partitions.sh --bootstrap-server fgedu01:9092 \
--reassignment-json-file reassign.json --execute

# Check reassignment status
kafka-reassign-partitions.sh --bootstrap-server fgedu01:9092 \
--reassignment-json-file reassign.json --verify

# Delete an unused consumer group
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --delete --group old-group

# Log directories
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/bigdata/kafka-logs","error":null,"partitions":{…}}]}]}

# Reassign partitions
Current partition replica assignment…
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

# Reassignment status
Reassignment of partition fgedu-user-0 completed successfully.
Reassignment of partition fgedu-user-1 completed successfully.

# Delete an unused consumer group
Deletion of requested consumer group 'old-group' was successful.
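
The reassign.json file passed to `--reassignment-json-file` is not shown above. A minimal sketch of what such a plan could look like follows; the target replica lists are illustrative only and would need to reflect the real Broker IDs and the desired layout. The helper name `write_reassign_json` is hypothetical.

```shell
#!/bin/bash
# Write a minimal partition reassignment plan to the given path.
# ASSUMPTION: the replica lists below are examples, not a recommended layout.
write_reassign_json() {
  local out="${1:-reassign.json}"
  cat > "$out" <<'EOF'
{
  "version": 1,
  "partitions": [
    {"topic": "fgedu-user", "partition": 0, "replicas": [2, 3, 1]},
    {"topic": "fgedu-user", "partition": 1, "replicas": [3, 1, 2]}
  ]
}
EOF
}

plan_file="$(mktemp)"
write_reassign_json "$plan_file"
cat "$plan_file"
```

The first Broker in each replicas list is the preferred leader for that Partition, so the ordering matters as well as the membership.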

Part04-Production Cases and Hands-On Walkthroughs

4.1 Real-Time Data Collection Case

Real-time data collection is one of Kafka's core use cases.

#!/bin/bash
# kafka_data_producer.sh

echo "=== Kafka Data Producer ==="
echo "Date: $(date)"

# Create the Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 --create --topic fgedu-realtime --partitions 6 --replication-factor 3

# Generate 1000 JSON test records, one per line
for i in {1..1000}; do
echo "{\"id\":${i},\"timestamp\":\"$(date -Iseconds)\",\"data\":\"test_data_${i}\"}"
done > /tmp/kafka_data.txt

# Send the data
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-realtime < /tmp/kafka_data.txt

# Verify the data by printing the end offset of each partition
kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server fgedu01:9092 --topic fgedu-realtime

echo "=== Producer Completed ==="

# Run the producer script
./kafka_data_producer.sh
=== Kafka Data Producer ===
Date: Fri Jan 19 02:00:00 CST 2024

Created topic fgedu-realtime.

# Send the data
# Sending complete

# Verify the data
fgedu-realtime:0:167
fgedu-realtime:1:167
fgedu-realtime:2:166
fgedu-realtime:3:167
fgedu-realtime:4:167
fgedu-realtime:5:166

=== Producer Completed ===
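
For a freshly created Topic, the end offsets printed per Partition should add up to the number of records sent. A minimal sketch of that check, fed with the sample output above (the helper name `sum_offsets` is hypothetical):

```shell
#!/bin/bash
# Add up per-partition end offsets given as topic:partition:offset lines on stdin.
sum_offsets() {
  awk -F: '{ total += $3 } END { print total + 0 }'
}

sum_offsets <<'EOF'
fgedu-realtime:0:167
fgedu-realtime:1:167
fgedu-realtime:2:166
fgedu-realtime:3:167
fgedu-realtime:4:167
fgedu-realtime:5:166
EOF
# prints 1000
```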

4.2 Log Collection Case

Log collection is a classic Kafka use case.

# Create the log Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 \
--create --topic fgedu-logs --partitions 12 --replication-factor 3 \
--config retention.ms=604800000 --config segment.bytes=1073741824

# Configure Flume to ship logs to Kafka
cat > /bigdata/app/flume/conf/kafka_sink.conf << 'EOF'
agent.sources = src1
agent.channels = ch1
agent.sinks = k1

agent.sources.src1.type = exec
agent.sources.src1.command = tail -F /bigdata/logs/app.log

agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000

agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.topic = fgedu-logs
agent.sinks.k1.kafka.bootstrap.servers = fgedu01:9092

agent.sources.src1.channels = ch1
agent.sinks.k1.channel = ch1
EOF

# Start Flume
flume-ng agent -n agent -c conf -f /bigdata/app/flume/conf/kafka_sink.conf

# Create the log Topic
Created topic fgedu-logs.

# Flume configuration
# Configured

# Start Flume
Info: Sourcing environment configuration script…
Info: Including Hive libraries found via…
# Flume started successfully

4.3 Performance Tuning Case

4.3.1 Producer Optimization

# Producer performance test
kafka-producer-perf-test.sh --topic fgedu-perf-test \
--num-records 1000000 --record-size 1024 --throughput -1 \
--producer-props bootstrap.servers=fgedu01:9092 acks=all batch.size=16384

# Tuned producer settings: larger batches, a short linger, and compression
cat > /bigdata/app/kafka/producer.properties << 'EOF'
bootstrap.servers=fgedu01:9092,fgedu02:9092,fgedu03:9092
acks=all
batch.size=32768
linger.ms=10
buffer.memory=67108864
compression.type=lz4
EOF

# Send using the tuned settings
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-perf-test \
--producer.config /bigdata/app/kafka/producer.properties

# Producer performance test
1000000 records sent, 50000.00 records/sec (48.83 MB/sec), 20.00 ms avg latency, 100.00 ms max latency.

# Tuned producer settings
# Configured

# Using the tuned settings
# Started successfully
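
The MB/sec figure in the performance test result follows from the record rate and the record size. A minimal sketch of that conversion, taking 1 MB as 1024 * 1024 bytes (which matches the sample output above); the helper name `mb_per_sec` is hypothetical.

```shell
#!/bin/bash
# Convert a record rate and a record size in bytes into MB/sec (1 MB = 1024 * 1024 bytes).
mb_per_sec() {
  awk -v rps="$1" -v size="$2" 'BEGIN { printf "%.2f\n", rps * size / (1024 * 1024) }'
}

mb_per_sec 50000 1024   # prints 48.83
```

Comparing this figure with the available network and disk bandwidth helps tell whether the producer settings or the hardware is the limiting factor.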

4.3.2 Consumer Optimization

# Consumer performance test
kafka-consumer-perf-test.sh --topic fgedu-perf-test \
--bootstrap-server fgedu01:9092 --messages 1000000

# Tuned consumer settings: larger fetches and manual offset commits
cat > /bigdata/app/kafka/consumer.properties << 'EOF'
bootstrap.servers=fgedu01:9092,fgedu02:9092,fgedu03:9092
group.id=fgedu-consumer-group
fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
enable.auto.commit=false
EOF

# Consume using the tuned settings
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-perf-test \
--consumer.config /bigdata/app/kafka/consumer.properties

# Consumer performance test
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2024-01-19 02:30:00:000, 2024-01-19 02:31:00:000, 976.56, 16.28, 1000000, 16666.67

# Tuned consumer settings
# Configured

# Using the tuned settings
# Started successfully

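Part05-Fengge's Lessons Learned

5.1 Kafka Best Practices

When running Kafka in a real production environment, keep the following points in mind:

Fengge's lessons learned:
1. Set a sensible partition count
2. Configure an appropriate replication factor
3. Monitor consumer lag
4. Keep operational monitoring in place
5. Clean up expired data regularly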

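5.2 Performance Optimization Lessons

5.2.1 Optimization Recommendations

Fengge's tip: Kafka performance optimization has to be considered at three levels: producer, Broker, and consumer.

Key points for performance optimization:
– Send in batches on the producer side
– Optimize Broker disks
– Consume in parallel on the consumer side
– Set a sensible retention period
– Monitor cluster status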

5.2.2 Kafka Maintenance Script

#!/bin/bash
# kafka_maintenance.sh

echo "=== Kafka Maintenance ==="
echo "Date: $(date)"

# 1. Check cluster status (keep only the per-Broker header lines)
echo "=== Check Cluster Status ==="
kafka-broker-api-versions.sh --bootstrap-server fgedu01:9092 | grep "(id:"

# 2. Check Topic status
echo "=== Check Topic Status ==="
kafka-topics.sh --bootstrap-server fgedu01:9092 --list

# 3. Check consumer groups
echo "=== Check Consumer Groups ==="
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --list

# 4. Check consumer lag (blank lines removed)
echo "=== Check Consumer Lag ==="
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --describe --all-groups | grep -v "^$"

echo "=== Maintenance Completed ==="

# Run the maintenance script
./kafka_maintenance.sh
=== Kafka Maintenance ===
Date: Fri Jan 19 03:00:00 CST 2024

=== Check Cluster Status ===
fgedu01:9092 (id: 1 rack: null) -> (
fgedu02:9092 (id: 2 rack: null) -> (
fgedu03:9092 (id: 3 rack: null) -> (

=== Check Topic Status ===
__consumer_offsets
fgedu-user
fgedu-order
fgedu-logs

=== Check Consumer Groups ===
fgedu-consumer-group

=== Check Consumer Lag ===
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-consumer-group fgedu-user 0 100 100 0

=== Maintenance Completed ===

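This article was compiled and published by Fengge Tutorials (风哥教程) for learning and testing purposes only. Please cite the source when reposting: http://www.fgedu.net.cn/10327.html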
