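Table of Contents
Part01-Basic Concepts and Theory
1.1 Kafka Architecture Overview
1.2 Message Model and Partitions
1.3 Message Storage Principles
Part02-Production Environment Planning and Recommendations
2.1 Cluster Deployment Planning
2.2 Topic Design Planning
2.3 Performance Optimization Planning
Part03-Production Implementation Plan
3.1 Kafka Installation and Configuration
3.2 Topic Management Operations
3.3 Producing and Consuming Messages
3.4 Cluster Monitoring and Maintenance
Part04-Production Cases and Hands-On Walkthroughs
4.1 Real-Time Data Collection Case
4.2 Log Collection Case
4.3 Performance Tuning Case
Part05-Fengge's Experience Summary and Sharing
5.1 Kafka Best Practices
5.2 Performance Optimization Lessons Learned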
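Part01-Basic Concepts and Theory
1.1 Kafka Architecture Overview
Kafka is a distributed stream-processing platform that provides a high-throughput, durable, horizontally scalable message queue service.
1.2 Message Model and Partitions
Kafka uses a publish-subscribe model in which messages are stored in Topics.
- Broker: a Kafka server node
- Topic: a message subject, the logical category of messages
- Partition: a physical shard of a Topic
- Producer: a client that publishes messages
- Consumer: a client that reads messages
- Consumer Group: a set of consumers that share the partitions of a Topic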
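To make the Topic/Partition relationship concrete: a message that carries a key is routed to one partition by hashing the key modulo the partition count, so messages with the same key stay in order on the same partition. The sketch below illustrates only that idea. Kafka's default partitioner uses a murmur2 hash, while this stand-in uses the `cksum` CRC, and the helper name `partition_for_key` is hypothetical.

```shell
# Illustration only: map a message key to a partition as hash(key) mod N.
# Kafka's default partitioner uses murmur2; cksum (CRC) is a stand-in here,
# so these partition numbers will NOT match what a real producer picks.
partition_for_key() {
  local key=$1 num_partitions=$2
  local crc
  crc=$(printf '%s' "$key" | cksum | awk '{print $1}')
  echo $(( crc % num_partitions ))
}

# The same key always maps to the same partition.
partition_for_key "user-1" 3
partition_for_key "user-1" 3
```

Because the mapping depends on the partition count, adding partitions later changes where new messages for an existing key land.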
1.3 Message Storage Principles
Kafka stores messages in log files on disk and writes them sequentially (append-only).
# Check the Kafka version
kafka-topics.sh --version
# List the Kafka log directory
ls -la /bigdata/kafka-logs/
Sample output:
# Kafka version
3.4.0
# Kafka log directory
total 40
drwxr-xr-x 2 kafka kafka 4096 Jan 19 01:00 fgedu-topic-0
drwxr-xr-x 2 kafka kafka 4096 Jan 19 01:00 fgedu-topic-1
drwxr-xr-x 2 kafka kafka 4096 Jan 19 01:00 __consumer_offsets-0
-rw-r--r-- 1 kafka kafka 4 Jan 19 01:00 meta.properties
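Inside each partition directory such as fgedu-topic-0, the log is split into segment files. Each segment is named after the offset of its first message, zero-padded to 20 digits, with index files of the same base name alongside it. A minimal sketch of that naming rule (the helper name `segment_name` is hypothetical):

```shell
# Print the segment file name for a given base offset (20-digit zero padding).
segment_name() {
  printf '%020d.log\n' "$1"
}

segment_name 0        # first segment of a partition
segment_name 170410   # segment whose first message has offset 170410
```

Since names sort by offset, a broker can locate the segment that holds a given offset by name before consulting the index.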
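Part02-Production Environment Planning and Recommendations
2.1 Cluster Deployment Planning
Plan a Kafka cluster deployment around the number of nodes and the resources given to each node.
- Broker nodes: at least 3, for high availability
- ZooKeeper nodes: 3 or 5 (an odd number, so a majority quorum survives a node failure)
- Disks: store logs on dedicated disks
- Memory: 16 GB or more recommended per Broker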
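For the disk item above, a back-of-the-envelope capacity estimate can help during planning. The sketch below is a rule of thumb, not something taken from this tutorial: it assumes data spreads evenly across Brokers and ignores compression, index files, and free-space headroom. The helper name and the sample figures are hypothetical.

```shell
# Rough per-broker disk estimate in GB.
# Args: daily ingest (GB), retention (days), replication factor, broker count
disk_per_broker_gb() {
  local daily_gb=$1 retention_days=$2 replication=$3 brokers=$4
  local total=$(( daily_gb * retention_days * replication ))
  echo $(( (total + brokers - 1) / brokers ))   # round up
}

# 100 GB/day, 7-day retention, 3 replicas, 3 brokers
disk_per_broker_gb 100 7 3 3
```

In practice, leave generous headroom above this figure so that retention cleanup and partition reassignment never run into a full disk.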
2.2 Topic Design Planning
Topic design, chiefly the partition count and the replication factor, directly affects system performance.
# List Topics
kafka-topics.sh --bootstrap-server fgedu01:9092 --list
# Describe a Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 --describe --topic fgedu-topic
Sample output:
# Topic list
__consumer_offsets
fgedu-topic
fgedu-log
fgedu-event
# Topic details
Topic: fgedu-topic TopicId: xxx PartitionCount: 3 ReplicationFactor: 3
Topic: fgedu-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedu-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
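In the describe output, Replicas lists every Broker assigned to a partition and Isr lists the replicas currently in sync. A partition whose Isr is shorter than its Replicas is under-replicated. The sketch below counts such partitions from describe output read on stdin; the helper name `count_under_replicated` is hypothetical, and the sample lines are made up for illustration.

```shell
# Count partitions whose ISR is smaller than the replica list.
# Reads `kafka-topics.sh --describe` output on stdin.
count_under_replicated() {
  awk '
    {
      replicas = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Replicas:") replicas = $(i + 1)
        if ($i == "Isr:")      isr = $(i + 1)
      }
      if (replicas != "" && split(isr, a, ",") < split(replicas, b, ",")) n++
    }
    END { print n + 0 }
  '
}

# One healthy partition and one that has lost a follower.
printf '%s\n' \
  'Topic: fgedu-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3' \
  'Topic: fgedu-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3' |
  count_under_replicated
```

kafka-topics.sh also offers an --under-replicated-partitions option with --describe, which is usually the simpler choice on a live cluster.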
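2.3 Performance Optimization Planning
Performance optimization has to be approached from several angles. Fengge's tip: a sensible partition count and replication factor are the foundation of performance optimization.
- Set a sensible partition count
- Configure an appropriate replication factor
- Tune the producer batch size
- Optimize disk I/O
- Monitor consumer lag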
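One common way to pick a starting partition count is to divide the target throughput by the throughput a single partition can sustain on the producer side and on the consumer side, then take the larger result. The sketch below encodes that rule of thumb; it is not from this tutorial, the helper name is hypothetical, and the per-partition figures have to come from your own benchmarks.

```shell
# Rule-of-thumb partition count: enough partitions that neither producers
# nor consumers become the bottleneck. All throughputs are integers in MB/s.
estimate_partitions() {
  local target=$1 produce_per_partition=$2 consume_per_partition=$3
  local by_producer=$(( (target + produce_per_partition - 1) / produce_per_partition ))
  local by_consumer=$(( (target + consume_per_partition - 1) / consume_per_partition ))
  if [ "$by_producer" -gt "$by_consumer" ]; then
    echo "$by_producer"
  else
    echo "$by_consumer"
  fi
}

# Target 100 MB/s; one partition sustains 10 MB/s produce, 20 MB/s consume.
estimate_partitions 100 10 20
```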
Part03-Production Implementation Plan
3.1 Kafka Installation and Configuration
3.1.1 Installing Kafka
# Download and extract Kafka
# (superseded releases are moved to https://archive.apache.org/dist/kafka/)
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz -C /bigdata/app/
ln -s /bigdata/app/kafka_2.13-3.4.0 /bigdata/app/kafka
# Set environment variables
export KAFKA_HOME=/bigdata/app/kafka
export PATH=$PATH:$KAFKA_HOME/bin
# Write server.properties
cat > /bigdata/app/kafka/config/server.properties << 'EOF'
broker.id=1
listeners=PLAINTEXT://fgedu01:9092
advertised.listeners=PLAINTEXT://fgedu01:9092
log.dirs=/bigdata/kafka-logs
num.partitions=3
default.replication.factor=3
zookeeper.connect=fgedu01:2181,fgedu02:2181,fgedu03:2181
EOF
# Verify the installation
kafka-topics.sh --version
Sample output:
# Download and extraction complete
# Environment variables configured
# Configuration file written
# Verify the installation
3.4.0
# Kafka installed successfully
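The server.properties above is for the first Broker only. Every Broker in the cluster needs its own unique broker.id and its own listener host name. The sketch below prints the same settings for any Broker; the helper name is hypothetical, and pairing fgedu02 and fgedu03 with IDs 2 and 3 is an assumption inferred from the zookeeper.connect line and the Leader IDs shown earlier.

```shell
# Print a server.properties for one broker; only broker.id and the host differ.
render_server_properties() {
  local id=$1 host=$2
  cat <<EOF
broker.id=${id}
listeners=PLAINTEXT://${host}:9092
advertised.listeners=PLAINTEXT://${host}:9092
log.dirs=/bigdata/kafka-logs
num.partitions=3
default.replication.factor=3
zookeeper.connect=fgedu01:2181,fgedu02:2181,fgedu03:2181
EOF
}

# Preview the file for the second broker.
render_server_properties 2 fgedu02
```

On each node, the output would be redirected to /bigdata/app/kafka/config/server.properties before starting the Broker.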
3.1.2 Starting Kafka
# Start ZooKeeper
zkServer.sh start
# Start Kafka
kafka-server-start.sh -daemon /bigdata/app/kafka/config/server.properties
# Check the processes
jps | grep -E "Kafka|QuorumPeerMain"
# Verify the service
kafka-broker-api-versions.sh --bootstrap-server fgedu01:9092
Sample output:
# Start ZooKeeper
ZooKeeper JMX enabled by default
ZooKeeper starting...
# Start Kafka
# Started successfully
# Processes
12345 QuorumPeerMain
12346 Kafka
# Verify the service
fgedu01:9092 (id: 1 rack: null) -> (
Produce(0): 0 to 9,
Fetch(1): 0 to 12,
...
)
# Kafka started successfully
3.2 Topic Management Operations
3.2.1 Creating Topics
# Create a Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 \
  --create \
  --topic fgedu-user \
  --partitions 3 \
  --replication-factor 3
# Create a Topic with custom settings (7-day retention, 1 GiB segments)
kafka-topics.sh --bootstrap-server fgedu01:9092 \
  --create \
  --topic fgedu-order \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config segment.bytes=1073741824
# List Topics
kafka-topics.sh --bootstrap-server fgedu01:9092 --list
# Describe a Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 --describe --topic fgedu-user
Sample output:
# Create a Topic
Created topic fgedu-user.
# Create a Topic with custom settings
Created topic fgedu-order.
# Topic list
__consumer_offsets
fgedu-user
fgedu-order
# Topic details
Topic: fgedu-user TopicId: xxx PartitionCount: 3 ReplicationFactor: 3
Topic: fgedu-user Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: fgedu-user Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: fgedu-user Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
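The retention.ms and segment.bytes values above are raw milliseconds and bytes, which are easy to mistype. A small sketch for deriving them from friendlier units (both helper names are hypothetical):

```shell
# Convert days to milliseconds for retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Convert GiB to bytes for segment.bytes.
gib_to_bytes() {
  echo $(( $1 * 1024 * 1024 * 1024 ))
}

days_to_ms 7      # value used for retention.ms above
gib_to_bytes 1    # value used for segment.bytes above
```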
3.2.2 Modifying Topics
# Change a Topic setting (3-day retention)
kafka-configs.sh --bootstrap-server fgedu01:9092 \
  --alter \
  --entity-type topics \
  --entity-name fgedu-user \
  --add-config retention.ms=259200000
# Increase the partition count (it can only grow, never shrink)
kafka-topics.sh --bootstrap-server fgedu01:9092 \
  --alter \
  --topic fgedu-user \
  --partitions 6
# Show the Topic configuration
kafka-configs.sh --bootstrap-server fgedu01:9092 \
  --describe \
  --entity-type topics \
  --entity-name fgedu-user
# Delete a Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 --delete --topic fgedu-test
Sample output:
# Change a Topic setting
Completed updating config for topic fgedu-user.
# Increase the partition count
Adding partitions succeeded.
# Topic configuration
Configs: topic=fgedu-user retention.ms=259200000
# Delete a Topic
Topic fgedu-test is marked for deletion.
# Topic management complete
3.3 Producing and Consuming Messages
3.3.1 Producing Messages
# Start the console producer
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user
# Send messages
> {"user_id":1,"name":"fgedu01","age":25}
> {"user_id":2,"name":"fgedu02","age":30}
> {"user_id":3,"name":"fgedu03","age":28}
# Send messages from a file
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user < messages.txt
# Pass producer settings
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user \
  --producer-property acks=all \
  --producer-property batch.size=16384
Sample output:
# Started successfully
# Send messages
# Messages sent
# Send from a file
# Send complete
# Producer settings
# Started successfully
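The messages above are sent without a key. When per-user ordering matters, each line can carry a key so that all records for one user go to the same partition. The sketch below builds keyed lines in the form the console producer accepts when parse.key=true is set; the helper name and the choice of "|" as separator are assumptions made for illustration.

```shell
# Build one keyed line "<key>|<json>" for the console producer.
make_keyed_record() {
  local id=$1 name=$2 age=$3
  printf '%s|{"user_id":%s,"name":"%s","age":%s}\n' "$id" "$id" "$name" "$age"
}

make_keyed_record 1 fgedu01 25

# To send such lines (requires a running cluster):
#   make_keyed_record 1 fgedu01 25 | kafka-console-producer.sh \
#     --bootstrap-server fgedu01:9092 --topic fgedu-user \
#     --property parse.key=true --property "key.separator=|"
```

A separator that never appears in the key keeps the split unambiguous, which is why ":" would be a poor choice next to JSON values.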
3.3.2 Consuming Messages
# Start the console consumer
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user --from-beginning
# Consumed messages
{"user_id":1,"name":"fgedu01","age":25}
{"user_id":2,"name":"fgedu02","age":30}
{"user_id":3,"name":"fgedu03","age":28}
# Consume as a member of a consumer group
# (--from-beginning only takes effect while the group has no committed offsets)
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user \
  --group fgedu-consumer-group --from-beginning
# Print message details (key and timestamp)
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-user \
  --property print.key=true --property print.timestamp=true --from-beginning
# List consumer groups
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --list
Sample output:
# Started successfully
# Consumed messages
{"user_id":1,"name":"fgedu01","age":25}
{"user_id":2,"name":"fgedu02","age":30}
{"user_id":3,"name":"fgedu03","age":28}
Processed a total of 3 messages
# Consumer groups
fgedu-consumer-group
console-consumer-12345
# Describe a consumer group
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --describe --group fgedu-consumer-group
Sample output:
GROUP                TOPIC      PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-consumer-group fgedu-user 0         10             10             0
fgedu-consumer-group fgedu-user 1         10             10             0
fgedu-consumer-group fgedu-user 2         10             10             0
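LAG is LOG-END-OFFSET minus CURRENT-OFFSET for each partition, so the total lag of a group is the sum of that column. The sketch below sums it from describe output read on stdin. The helper name `total_lag` is hypothetical and the sample rows are made up; the column is located by its header so that extra columns do not matter.

```shell
# Sum the LAG column of `kafka-consumer-groups.sh --describe` output (stdin).
# Non-numeric values such as "-" and repeated header lines are skipped.
total_lag() {
  awk '
    col == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

printf '%s\n' \
  'GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG' \
  'fgedu-consumer-group fgedu-user 0 10 12 2' \
  'fgedu-consumer-group fgedu-user 1 10 15 5' \
  'fgedu-consumer-group fgedu-user 2 10 10 0' |
  total_lag
```

On a live cluster, the describe command shown above would be piped into the function instead of the sample rows.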
3.4 Cluster Monitoring and Maintenance
3.4.1 Monitoring Cluster Status
# Check Broker status
kafka-broker-api-versions.sh --bootstrap-server fgedu01:9092 | head -5
# Show the partition distribution of all Topics
kafka-topics.sh --bootstrap-server fgedu01:9092 --describe
# Show consumer group lag
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --describe --all-groups
# Show the cluster configuration
kafka-configs.sh --bootstrap-server fgedu01:9092 --describe --entity-type brokers
Sample output:
# Broker status
fgedu01:9092 (id: 1 rack: null) -> (
Produce(0): 0 to 9,
...
)
# Topic partition distribution
Topic: fgedu-user PartitionCount: 6 ReplicationFactor: 3
Topic: fgedu-order PartitionCount: 6 ReplicationFactor: 3
# Consumer group lag
GROUP                TOPIC      PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-consumer-group fgedu-user 0         100            100            0
fgedu-consumer-group fgedu-user 1         100            100            0
# Cluster configuration
Configs: broker.id=1, listeners=PLAINTEXT://fgedu01:9092, ...
3.4.2 Cluster Maintenance Operations
# Show log directory usage
kafka-log-dirs.sh --bootstrap-server fgedu01:9092 --describe
# Reassign partitions
kafka-reassign-partitions.sh --bootstrap-server fgedu01:9092 \
  --reassignment-json-file reassign.json --execute
# Check reassignment status
kafka-reassign-partitions.sh --bootstrap-server fgedu01:9092 \
  --reassignment-json-file reassign.json --verify
# Delete an obsolete consumer group
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --delete --group old-group
Sample output:
# Log directory usage
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/bigdata/kafka-logs","error":null,"partitions":{...}}]}]}
# Reassign partitions
Current partition replica assignment...
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
# Reassignment status
Reassignment of partition fgedu-user-0 completed successfully.
Reassignment of partition fgedu-user-1 completed successfully.
# Delete an obsolete consumer group
Deletion of requested consumer group 'old-group' was successful.
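The commands above reference a reassign.json file without showing it. The reassignment file is a JSON document with a version field and a partitions array, where each entry names a topic, a partition number, and the target replica list. The sketch below prints a single-partition plan; the helper name and the sample replica list 2,3,1 are hypothetical.

```shell
# Print a reassignment plan that moves one partition to a new replica list.
# Args: topic, partition number, comma-separated broker ids
# (the first id in the list becomes the preferred leader)
make_reassign_json() {
  local topic=$1 partition=$2 replicas=$3
  printf '{"version":1,"partitions":[{"topic":"%s","partition":%s,"replicas":[%s]}]}\n' \
    "$topic" "$partition" "$replicas"
}

make_reassign_json fgedu-user 0 2,3,1
```

Redirecting the output to reassign.json gives a file in the shape that --execute and --verify expect. For plans covering many partitions, the tool's --generate mode can propose an assignment instead.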
Part04-Production Cases and Hands-On Walkthroughs
4.1 Real-Time Data Collection Case
Real-time data collection is one of Kafka's core use cases.
#!/bin/bash
# kafka_data_producer.sh
echo "=== Kafka Data Producer ==="
echo "Date: $(date)"
# Create the Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 --create --topic fgedu-realtime --partitions 6 --replication-factor 3
# Generate test data
for i in {1..1000}; do
  echo "{\"id\":${i},\"timestamp\":\"$(date -Iseconds)\",\"data\":\"test_data_${i}\"}"
done > /tmp/kafka_data.txt
# Send the data
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-realtime < /tmp/kafka_data.txt
# Verify the data (end offset of each partition; --broker-list is deprecated in Kafka 3.x)
kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server fgedu01:9092 --topic fgedu-realtime
echo "=== Producer Completed ==="
# Run the script
./kafka_data_producer.sh
Sample output:
=== Kafka Data Producer ===
Date: Fri Jan 19 02:00:00 CST 2024
Created topic fgedu-realtime.
# Send the data
# Send complete
# Verify the data
fgedu-realtime:0:167
fgedu-realtime:1:167
fgedu-realtime:2:166
fgedu-realtime:3:167
fgedu-realtime:4:167
fgedu-realtime:5:166
=== Producer Completed ===
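GetOffsetShell prints one topic:partition:offset line per partition. On a freshly created Topic, the end offsets should add up to the number of records sent, here 1000. A small sketch that performs that check on the sample output above (the helper name `sum_offsets` is hypothetical):

```shell
# Sum the end offsets printed by GetOffsetShell (lines of topic:partition:offset).
sum_offsets() {
  awk -F: '{ total += $3 } END { print total + 0 }'
}

printf '%s\n' \
  fgedu-realtime:0:167 fgedu-realtime:1:167 fgedu-realtime:2:166 \
  fgedu-realtime:3:167 fgedu-realtime:4:167 fgedu-realtime:5:166 |
  sum_offsets
```

If the Topic already held data before the run, compare the offsets taken before and after sending rather than the absolute total.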
4.2 Log Collection Case
Log collection is a classic Kafka use case.
# Create the log Topic
kafka-topics.sh --bootstrap-server fgedu01:9092 \
  --create --topic fgedu-logs --partitions 12 --replication-factor 3 \
  --config retention.ms=604800000 --config segment.bytes=1073741824
# Configure Flume to ship logs to Kafka
cat > /bigdata/app/flume/conf/kafka_sink.conf << 'EOF'
agent.sources = src1
agent.channels = ch1
agent.sinks = k1
agent.sources.src1.type = exec
agent.sources.src1.command = tail -F /bigdata/logs/app.log
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.topic = fgedu-logs
agent.sinks.k1.kafka.bootstrap.servers = fgedu01:9092
agent.sources.src1.channels = ch1
agent.sinks.k1.channel = ch1
EOF
# Start Flume
flume-ng agent -n agent -c conf -f /bigdata/app/flume/conf/kafka_sink.conf
Sample output:
# Create the log Topic
Created topic fgedu-logs.
# Flume configuration
# Configuration complete
# Start Flume
Info: Sourcing environment configuration script...
Info: Including Hive libraries found via...
# Flume started successfully
4.3 Performance Tuning Case
4.3.1 Producer Tuning
# Run the producer performance test
kafka-producer-perf-test.sh --topic fgedu-perf-test \
  --num-records 1000000 --record-size 1024 --throughput -1 \
  --producer-props bootstrap.servers=fgedu01:9092 acks=all batch.size=16384
# Tuned producer configuration
cat > /bigdata/app/kafka/producer.properties << 'EOF'
bootstrap.servers=fgedu01:9092,fgedu02:9092,fgedu03:9092
acks=all
batch.size=32768
linger.ms=10
buffer.memory=67108864
compression.type=lz4
EOF
# Send with the tuned configuration
kafka-console-producer.sh --bootstrap-server fgedu01:9092 --topic fgedu-perf-test \
  --producer.config /bigdata/app/kafka/producer.properties
Sample output:
# Producer performance test
1000000 records sent, 50000.00 records/sec (48.83 MB/sec), 20.00 ms avg latency, 100.00 ms max latency.
# Tuned producer configuration
# Configuration complete
# Send with the tuned configuration
# Started successfully
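The MB/sec figure in the performance test output follows directly from the record rate and the record size: records/sec times bytes per record, divided by 1024 x 1024. A small sketch that reproduces the 48.83 MB/sec shown above (the helper name `mb_per_sec` is hypothetical):

```shell
# Convert records/sec and record size (bytes) to MB/sec (1 MB = 1048576 bytes).
mb_per_sec() {
  awk -v rps="$1" -v size="$2" 'BEGIN { printf "%.2f\n", rps * size / 1048576 }'
}

mb_per_sec 50000 1024
```

The same arithmetic is handy for comparing runs with different --record-size values, where records/sec alone can be misleading.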
4.3.2 Consumer Tuning
# Run the consumer performance test
kafka-consumer-perf-test.sh --topic fgedu-perf-test \
  --bootstrap-server fgedu01:9092 --messages 1000000
# Tuned consumer configuration
cat > /bigdata/app/kafka/consumer.properties << 'EOF'
bootstrap.servers=fgedu01:9092,fgedu02:9092,fgedu03:9092
group.id=fgedu-consumer-group
fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
enable.auto.commit=false
EOF
# Consume with the tuned configuration
kafka-console-consumer.sh --bootstrap-server fgedu01:9092 --topic fgedu-perf-test \
  --consumer.config /bigdata/app/kafka/consumer.properties
Sample output:
# Consumer performance test
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2024-01-19 02:30:00:000, 2024-01-19 02:31:00:000, 976.56, 16.28, 1000000, 16666.67
# Tuned consumer configuration
# Configuration complete
# Consume with the tuned configuration
# Started successfully
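Part05-Fengge's Experience Summary and Sharing
5.1 Kafka Best Practices
When running Kafka in production, keep the following points in mind:
1. Set a sensible partition count
2. Configure an appropriate replication factor
3. Monitor consumer lag
4. Keep operational monitoring in place
5. Clean up expired data regularly
5.2 Performance Optimization Lessons Learned
5.2.1 Optimization Recommendations
- Send in batches on the producer side
- Optimize Broker disks
- Consume in parallel
- Set a sensible retention period
- Monitor cluster status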
5.2.2 Kafka Maintenance Script
#!/bin/bash
# kafka_maintenance.sh
echo "=== Kafka Maintenance ==="
echo "Date: $(date)"
# 1. Check cluster status (grep keeps one header line per Broker)
echo "=== Check Cluster Status ==="
kafka-broker-api-versions.sh --bootstrap-server fgedu01:9092 | grep "(id:"
# 2. Check Topic status
echo "=== Check Topic Status ==="
kafka-topics.sh --bootstrap-server fgedu01:9092 --list
# 3. Check consumer groups
echo "=== Check Consumer Groups ==="
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --list
# 4. Check consumer lag
echo "=== Check Consumer Lag ==="
kafka-consumer-groups.sh --bootstrap-server fgedu01:9092 --describe --all-groups | grep -v "^$"
echo "=== Maintenance Completed ==="
# Run the script
./kafka_maintenance.sh
Sample output:
=== Kafka Maintenance ===
Date: Fri Jan 19 03:00:00 CST 2024
=== Check Cluster Status ===
fgedu01:9092 (id: 1 rack: null) -> (
fgedu02:9092 (id: 2 rack: null) -> (
fgedu03:9092 (id: 3 rack: null) -> (
=== Check Topic Status ===
__consumer_offsets
fgedu-user
fgedu-order
fgedu-logs
=== Check Consumer Groups ===
fgedu-consumer-group
=== Check Consumer Lag ===
GROUP                TOPIC      PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-consumer-group fgedu-user 0         100            100            0
=== Maintenance Completed ===
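This article was compiled and published by 风哥教程 (Fengge Tutorials) for learning and testing purposes only. Please credit the source when reposting: http://www.fgedu.net.cn/10327.html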
