This document (from the Fenge tutorial series) is a hands-on guide to Kafka monitoring and management, covering the Kafka monitoring architecture, core monitoring metrics, JMX monitoring configuration, Prometheus integration, and alert configuration. It draws on the Monitoring and Operations sections of the official Kafka documentation and is intended for big-data development and operations engineers in learning and test environments; validate everything yourself before applying it to production.
Part01-Basic Concepts and Theory
1.1 Kafka Monitoring Architecture
The Kafka monitoring stack consists of four core components: metrics collection, data storage, visualization, and alert notification.
- Metrics collection: JMX Exporter, Kafka Exporter
- Data storage: Prometheus, InfluxDB
- Visualization: Grafana, Kafka Manager
- Alert notification: Alertmanager, DingTalk, email
1.2 Core Kafka Monitoring Metrics
Core Kafka metrics by category:
1. Throughput metrics
  - BytesInPerSec: inbound byte rate
  - BytesOutPerSec: outbound byte rate
  - MessagesInPerSec: inbound message rate
2. Request metrics
  - TotalTimeMs: total request processing time
  - RequestQueueSize: request queue size
  - ResponseQueueSize: response queue size
3. Replica metrics
  - UnderReplicatedPartitions: number of under-replicated partitions
  - IsrShrinksPerSec: ISR shrink rate
  - LeaderElectionsPerSec: leader election rate
# Topic-level metrics
1. Message metrics
  - MessagesInPerSec: inbound message rate
  - BytesInPerSec: inbound byte rate
2. Partition metrics
  - PartitionCount: number of partitions
  - ReplicationFactor: replication factor
# Consumer-level metrics
1. Consumer lag
  - ConsumerLag: consumer lag
  - ConsumerOffset: committed consumer offset
  - LogEndOffset: log end offset
2. Consumption rate
  - MessagesConsumedPerSec: messages consumed per second
  - BytesConsumedPerSec: bytes consumed per second
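Lag is a derived quantity: per partition, ConsumerLag = LogEndOffset - ConsumerOffset. As a quick sketch, the per-partition lag can be recomputed from `kafka-consumer-groups.sh --describe` output with awk; the sample table below is made up and stands in for real output.

```shell
# Recompute per-partition lag as LOG-END-OFFSET minus CURRENT-OFFSET.
# SAMPLE is illustrative data, standing in for `kafka-consumer-groups.sh --describe` output.
SAMPLE='GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
g1 orders 0 10000 15000 5000
g1 orders 1 8000 12000 4000'

compute_lag() {
  # Skip the header line, then print topic, partition, and end-offset minus current-offset
  awk 'NR>1 {print $2, $3, $5-$4}'
}

echo "$SAMPLE" | compute_lag
# prints:
# orders 0 5000
# orders 1 4000
```

The same pipeline works on live output; only the field positions of CURRENT-OFFSET and LOG-END-OFFSET matter.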
1.3 Kafka Alerting
Key design points for Kafka alerting:
- Severity levels: critical, warning, info
- Alert rules: static threshold-based rules and dynamic trend-based rules
- Alert channels: email, DingTalk, SMS, webhook
- Alert suppression: avoid duplicate alerts and alert storms
Part02-Production Environment Planning and Recommendations
2.1 Monitoring System Planning
Monitoring system planning recommendations:
Option 1: Prometheus + Grafana
  - Metrics collection: JMX Exporter
  - Data storage: Prometheus
  - Visualization: Grafana
  - Alerting: Alertmanager
Option 2: Kafka Manager
  - Cluster management: visual operations
  - Monitoring: basic metrics
  - Alerting: simple alerts
Option 3: Kafka Eagle
  - Monitoring: comprehensive coverage
  - Visualization: rich dashboards
  - Alerting: multi-channel alerts
# Recommended setups
  - Production: Prometheus + Grafana + Kafka Manager
  - Development: Kafka Eagle
2.2 Metrics Planning
Core metrics to plan for:
1. Broker level
  - UnderReplicatedPartitions > 0
  - OfflinePartitionsCount > 0
  - ActiveControllerCount != 1
2. Topic level
  - Message throughput
  - Partition count changes
  - Replica status
3. Consumer level
  - ConsumerLag > threshold
  - Abnormal consumption rate
  - Consumer group state
4. System level
  - CPU usage > 80%
  - Memory usage > 85%
  - Disk usage > 85%
  - Abnormal network I/O
# Recommended alert thresholds
| Metric | Warning threshold | Critical threshold |
|--------|-------------------|--------------------|
| ConsumerLag | 10000 | 100000 |
| UnderReplicatedPartitions | 1 | 5 |
| Disk usage | 80% | 90% |
| CPU usage | 70% | 90% |
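These thresholds can be applied mechanically. The sketch below is illustrative (the `classify` helper is made up): it maps an integer metric reading against the table's warning/critical levels, here using the disk-usage row (warn 80, crit 90).

```shell
# Classify an integer metric reading against warning/critical thresholds.
# Prints one of: ok, warning, critical.
classify() {
  local value=$1 warn=$2 crit=$3
  if [ "$value" -ge "$crit" ]; then
    echo critical
  elif [ "$value" -ge "$warn" ]; then
    echo warning
  else
    echo ok
  fi
}

classify 85 80 90   # disk usage at 85% with warn=80, crit=90 -> prints: warning
```

Pass bare integers (e.g. 85, not "85%"); strip units before calling.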
2.3 Alerting Strategy Planning
Alerting strategy recommendations:
P0 - Critical
  - Cluster unavailable
  - Risk of data loss
  - Requires immediate action
P1 - High
  - Service degraded
  - Severe performance drop
  - Requires prompt action
P2 - Medium
  - Potential risk
  - Minor performance drop
  - Requires attention
P3 - Low
  - Informational
  - For awareness only
# Alert channel mapping
P0: phone + SMS + DingTalk
P1: SMS + DingTalk
P2: DingTalk
P3: email
# Alert suppression rules
  - Do not resend the same alert within 5 minutes
  - Send a recovery notification once the alert clears
  - Escalate unacknowledged alerts
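The 5-minute suppression rule above can be prototyped with one timestamp file per alert name. This is only a sketch (the `should_send` helper and state directory are made up); in a real deployment Alertmanager's `repeat_interval` and grouping handle this.

```shell
# Suppress repeats of the same alert within a time window (seconds).
# State: one file per alert name, containing the last-sent epoch timestamp.
STATE_DIR=$(mktemp -d)
WINDOW=300   # 5 minutes

should_send() {
  local alert=$1 now last
  now=$(date +%s)
  last=$(cat "$STATE_DIR/$alert" 2>/dev/null || echo 0)
  if [ $((now - last)) -ge "$WINDOW" ]; then
    echo "$now" > "$STATE_DIR/$alert"   # record send time
    return 0                            # send
  fi
  return 1                              # suppress
}

should_send KafkaBrokerDown && echo "sent"        # first occurrence -> prints: sent
should_send KafkaBrokerDown || echo "suppressed"  # immediate repeat -> prints: suppressed
```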
Part03-Production Implementation
3.1 JMX Monitoring in Practice
3.1.1 Enable JMX
$ vi /bigdata/app/kafka/bin/kafka-server-start.sh
# Add the following
if [ "x$KAFKA_JMX_OPTS" = "x" ]; then
  export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
    -Dcom.sun.management.jmxremote.port=9999 \
    -Dcom.sun.management.jmxremote.rmi.port=9999 \
    -Dcom.sun.management.jmxremote.local.only=false \
    -Dcom.sun.management.jmxremote.authenticate=false \
    -Dcom.sun.management.jmxremote.ssl=false \
    -Djava.rmi.server.hostname=192.168.1.51"
fi
# Restart Kafka
$ systemctl restart kafka
# Verify the JMX port
$ netstat -tlnp | grep 9999
tcp6 0 0 :::9999 :::* LISTEN 12345/java
# Connect with JConsole
$ jconsole 192.168.1.51:9999
3.1.2 Configure the JMX Exporter
$ wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.20.0/jmx_prometheus_javaagent-0.20.0.jar
$ mv jmx_prometheus_javaagent-0.20.0.jar /bigdata/app/kafka/libs/
# Create the exporter config (the MBean patterns below were truncated in the original;
# they are reconstructed here in the style of the standard jmx_exporter Kafka sample config)
$ cat > /bigdata/app/kafka/config/kafka-jmx.yml << 'EOF'
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  - pattern: kafka.server<type=(.+), name=(.+), topic=(.+)><>Value
    name: kafka_server_$1_$2
    labels:
      topic: "$3"
    type: GAUGE
  - pattern: kafka.server<type=(.+), name=(.+)><>Value
    name: kafka_server_$1_$2
    type: GAUGE
  - pattern: kafka.server<type=(.+), name=(.+)><>OneMinuteRate
    name: kafka_server_$1_$2_one_minute_rate
    type: GAUGE
  - pattern: kafka.network<type=(.+), name=(.+)><>Value
    name: kafka_network_$1_$2
    type: GAUGE
EOF
# Edit kafka-server-start.sh
$ vi /bigdata/app/kafka/bin/kafka-server-start.sh
# Attach the JMX Exporter javaagent on port 7071
export KAFKA_OPTS="-javaagent:/bigdata/app/kafka/libs/jmx_prometheus_javaagent-0.20.0.jar=7071:/bigdata/app/kafka/config/kafka-jmx.yml"
# Restart Kafka
$ systemctl restart kafka
# Verify the JMX Exporter (names are lowercase because of lowercaseOutputName: true)
$ curl http://192.168.1.51:7071/metrics | head -20
# HELP kafka_server_brokertopicmetrics_bytesinpersec_one_minute_rate
# TYPE kafka_server_brokertopicmetrics_bytesinpersec_one_minute_rate gauge
kafka_server_brokertopicmetrics_bytesinpersec_one_minute_rate 12345.67
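Individual values can be pulled out of a scrape for ad-hoc checks. A small sketch: the `METRICS` variable below is made-up sample text standing in for a live `curl` response, and the exact metric names depend on your mapping rules.

```shell
# Extract one metric's value from Prometheus text-format exposition output.
# METRICS stands in for `curl -s http://192.168.1.51:7071/metrics`.
METRICS='# TYPE kafka_server_brokertopicmetrics_bytesinpersec_one_minute_rate gauge
kafka_server_brokertopicmetrics_bytesinpersec_one_minute_rate 12345.67
kafka_server_brokertopicmetrics_messagesinpersec_one_minute_rate 321.0'

metric_value() {
  # $1: metric name; prints its value; comment lines never match the exact name
  awk -v m="$1" '$1 == m {print $2}'
}

echo "$METRICS" | metric_value kafka_server_brokertopicmetrics_bytesinpersec_one_minute_rate
# prints: 12345.67
```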
3.2 Prometheus Monitoring in Practice
3.2.1 Install and Configure Prometheus
$ wget https://github.com/prometheus/prometheus/releases/download/v2.45.0/prometheus-2.45.0.linux-amd64.tar.gz
$ tar -zxvf prometheus-2.45.0.linux-amd64.tar.gz
$ mv prometheus-2.45.0.linux-amd64 /bigdata/app/prometheus
# Configure Prometheus
$ cat > /bigdata/app/prometheus/prometheus.yml << 'EOF'
global:
  scrape_interval: 15s
  evaluation_interval: 15s
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets:
          - '192.168.1.51:7071'
          - '192.168.1.52:7071'
          - '192.168.1.53:7071'
        labels:
          cluster: 'fgedu-kafka-cluster'
  - job_name: 'zookeeper'
    static_configs:
      - targets:
          - '192.168.1.51:7072'
          - '192.168.1.52:7072'
          - '192.168.1.53:7072'
EOF
# Start Prometheus
$ /bigdata/app/prometheus/prometheus --config.file=/bigdata/app/prometheus/prometheus.yml &
# Open the Prometheus UI
# http://192.168.1.51:9090
3.2.2 Configure Grafana
$ yum install -y grafana
# Start Grafana
$ systemctl start grafana-server
$ systemctl enable grafana-server
# Open Grafana
# http://192.168.1.51:3000
# Default credentials: admin/admin
# Add the Prometheus data source
# Configuration -> Data Sources -> Add data source -> Prometheus
# URL: http://localhost:9090
# Import Kafka dashboards
# Dashboard ID: 7589 (Kafka Exporter Overview)
# Dashboard ID: 11962 (Kafka JMX Metrics)
3.3 Alerting in Practice
3.3.1 Prometheus Alert Rules
$ cat > /bigdata/app/prometheus/kafka_alerts.yml << 'EOF'
groups:
  - name: kafka_alerts
    rules:
      # Broker down
      - alert: KafkaBrokerDown
        expr: up{job="kafka"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker down"
          description: "Kafka broker {{ $labels.instance }} has been down for more than 1 minute"
      # Under-replicated partitions
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka has under-replicated partitions"
          description: "Broker {{ $labels.instance }} has {{ $value }} under-replicated partitions"
      # Consumer lag
      - alert: KafkaConsumerLag
        expr: kafka_consumer_group_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag too high"
          description: "Consumer group {{ $labels.group }} lag is {{ $value }}"
      # Disk space
      - alert: KafkaDiskSpaceLow
        expr: node_filesystem_avail_bytes{mountpoint="/bigdata"} / node_filesystem_size_bytes{mountpoint="/bigdata"} < 0.15
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka disk space low"
          description: "Broker {{ $labels.instance }} disk usage is above 85%"
EOF
# Reference the rule file in prometheus.yml
$ vi /bigdata/app/prometheus/prometheus.yml
rule_files:
  - /bigdata/app/prometheus/kafka_alerts.yml
# Reload Prometheus (it was started in the background above, not via systemd,
# so send SIGHUP to make it reload its configuration)
$ kill -HUP $(pgrep prometheus)
3.3.2 Configure Alertmanager
$ wget https://github.com/prometheus/alertmanager/releases/download/v0.25.0/alertmanager-0.25.0.linux-amd64.tar.gz
$ tar -zxvf alertmanager-0.25.0.linux-amd64.tar.gz
$ mv alertmanager-0.25.0.linux-amd64 /bigdata/app/alertmanager
# Configure Alertmanager
$ cat > /bigdata/app/alertmanager/alertmanager.yml << 'EOF'
global:
  resolve_timeout: 5m
route:
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 1h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'critical'
    - match:
        severity: warning
      receiver: 'warning'
receivers:
  - name: 'default'
    webhook_configs:
      - url: 'http://192.168.1.100:8080/alert'
  - name: 'critical'
    webhook_configs:
      - url: 'http://192.168.1.100:8080/alert/critical'
  - name: 'warning'
    webhook_configs:
      - url: 'http://192.168.1.100:8080/alert/warning'
EOF
# Start Alertmanager
$ /bigdata/app/alertmanager/alertmanager --config.file=/bigdata/app/alertmanager/alertmanager.yml &
# Point Prometheus at Alertmanager
$ vi /bigdata/app/prometheus/prometheus.yml
alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - 192.168.1.51:9093
Part04-Production Cases and Walkthroughs
4.1 Consumer Lag Monitoring in Practice
4.1.1 Monitoring from the Command Line
$ /bigdata/app/kafka/bin/kafka-consumer-groups.sh --describe \
  --group fgedu-order-consumer-group \
  --bootstrap-server 192.168.1.51:9092
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fgedu-order-consumer-group fgedu-orders 0 10000 15000 5000
fgedu-order-consumer-group fgedu-orders 1 8000 12000 4000
fgedu-order-consumer-group fgedu-orders 2 12000 18000 6000
# Monitoring script
#!/bin/bash
# lag_monitor.sh - warn when any consumer group's total lag exceeds a threshold
KAFKA_HOME=/bigdata/app/kafka
BOOTSTRAP_SERVER=192.168.1.51:9092
THRESHOLD=10000
while true; do
  echo "=== Consumer Lag Monitor ==="
  echo "Time: $(date)"
  # List all consumer groups (GROUPS is a read-only variable in bash, so use another name)
  CONSUMER_GROUPS=$($KAFKA_HOME/bin/kafka-consumer-groups.sh --list --bootstrap-server $BOOTSTRAP_SERVER)
  for GROUP in $CONSUMER_GROUPS; do
    # Sum the LAG column (6th field) across all partitions of the group
    LAG=$($KAFKA_HOME/bin/kafka-consumer-groups.sh --describe \
      --group $GROUP \
      --bootstrap-server $BOOTSTRAP_SERVER | \
      awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum+=$6} END {print sum+0}')
    if [ "$LAG" -gt "$THRESHOLD" ]; then
      echo "WARNING: Group $GROUP lag is $LAG (threshold: $THRESHOLD)"
    fi
  done
  sleep 60
done
4.1.2 Monitoring with Burrow
$ git clone https://github.com/linkedin/Burrow.git
$ cd Burrow
$ go build -o burrow ./cmd/burrow
# Configure Burrow
$ cat > /bigdata/app/burrow/burrow.toml << 'EOF'
[general]
logdir = "/bigdata/app/burrow/logs"
logconfig = "/bigdata/app/burrow/logging.cfg"
[zookeeper]
servers = ["192.168.1.51:2181", "192.168.1.52:2181", "192.168.1.53:2181"]
timeout = 6
[client-profile.kafka]
client-id = "burrow"
kafka-version = "3.6.1"
[cluster.kafka]
class-name = "kafka"
client-profile = "kafka"
servers = ["192.168.1.51:9092", "192.168.1.52:9092", "192.168.1.53:9092"]
[consumer.kafka]
class-name = "kafka"
cluster = "kafka"
servers = ["192.168.1.51:9092", "192.168.1.52:9092", "192.168.1.53:9092"]
[httpserver.default]
address = ":8000"
EOF
# Start Burrow
$ /bigdata/app/burrow/burrow --config-dir /bigdata/app/burrow &
# Check consumer group status
$ curl http://localhost:8000/v3/kafka/kafka/consumer/fgedu-order-consumer-group/status
{
"error": false,
"message": "consumer group status returned",
"status": {
"cluster": "kafka",
"group": "fgedu-order-consumer-group",
"status": "OK",
"complete": 1,
"maxlag": 5000,
"totallag": 15000
}
}
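For quick scripting against Burrow's HTTP API, the lag figure can be pulled from the JSON without extra tooling. A sketch using sed on a captured response (the `RESPONSE` text is a condensed stand-in for the output above; `jq` is the better choice where available):

```shell
# Extract "totallag" from a Burrow consumer status response.
# RESPONSE stands in for `curl -s http://localhost:8000/v3/kafka/kafka/consumer/<group>/status`.
RESPONSE='{"error": false, "status": {"group": "fgedu-order-consumer-group", "status": "OK", "maxlag": 5000, "totallag": 15000}}'

total_lag() {
  # Print the first integer following a "totallag" key
  sed -n 's/.*"totallag":[[:space:]]*\([0-9]*\).*/\1/p'
}

echo "$RESPONSE" | total_lag
# prints: 15000
```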
4.2 Cluster Health Monitoring in Practice
4.2.1 Cluster Health Check Script
#!/bin/bash
# cluster_health_check.sh
KAFKA_HOME=/bigdata/app/kafka
BOOTSTRAP_SERVER=192.168.1.51:9092
echo "=== Kafka Cluster Health Check ==="
echo "Time: $(date)"
echo ""
# 1. Broker status
echo "1. Broker Status:"
$KAFKA_HOME/bin/kafka-broker-api-versions.sh --bootstrap-server $BOOTSTRAP_SERVER | head -5
# 2. Controller (this cluster runs with ZooKeeper, so read the /controller znode)
echo ""
echo "2. Controller Status:"
CONTROLLER=$($KAFKA_HOME/bin/zookeeper-shell.sh 192.168.1.51:2181 get /controller 2>/dev/null | \
  grep -o '"brokerid":[0-9]*' | cut -d: -f2)
echo "Controller: Broker ${CONTROLLER:-unknown}"
# 3. Topic status
echo ""
echo "3. Topic Status:"
TOPICS=$($KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER)
echo "Total Topics: $(echo $TOPICS | wc -w)"
# 4. Under-replicated partitions: report topics where the ISR is smaller than the replica set
echo ""
echo "4. Under-Replicated Partitions:"
$KAFKA_HOME/bin/kafka-topics.sh --describe --bootstrap-server $BOOTSTRAP_SERVER | \
  grep "Isr:" | \
  awk '{
    # Fields: Topic: <name> Partition: <n> Leader: <id> Replicas: <list> Isr: <list>
    split($10, isr, ",")
    split($8, replicas, ",")
    if (length(isr) < length(replicas)) print $2
  }' | sort -u
# 5. Disk space
echo ""
echo "5. Disk Space:"
df -h /bigdata | tail -1
# Run the check
$ chmod +x /bigdata/scripts/cluster_health_check.sh
$ /bigdata/scripts/cluster_health_check.sh
=== Kafka Cluster Health Check ===
Time: Mon Apr 8 10:30:00 CST 2026
1. Broker Status:
192.168.1.51:9092 (id: 1 rack: null) -> (Produce(0): 0 to 9, …)
2. Controller Status:
Controller: Broker 1
3. Topic Status:
Total Topics: 5
4. Under-Replicated Partitions:
5. Disk Space:
/dev/sdb1 500G 150G 350G 30% /bigdata
4.3 Troubleshooting Common Monitoring Issues
4.3.1 Missing Monitoring Data
# Diagnosis steps
# 1. Check that the JMX Exporter responds
$ curl http://192.168.1.51:7071/metrics | head
# 2. Check that Prometheus is scraping the target
$ curl http://localhost:9090/api/v1/targets
# 3. Check network connectivity
$ telnet 192.168.1.51 7071
# Fixes
# 1. Restart the JMX Exporter (i.e. restart Kafka, since the exporter runs as a javaagent)
$ systemctl restart kafka
# 2. Check the firewall (a permanent rule takes effect only after a reload)
$ firewall-cmd --list-ports
$ firewall-cmd --add-port=7071/tcp --permanent
$ firewall-cmd --reload
# 3. Validate the Prometheus configuration
$ /bigdata/app/prometheus/promtool check config /bigdata/app/prometheus/prometheus.yml
Part05-Experience Summary and Recommendations
5.1 Monitoring Best Practices
Kafka monitoring best practices:
1. Build a complete monitoring stack
  - Metrics collection: JMX Exporter
  - Data storage: Prometheus
  - Visualization: Grafana
  - Alerting: Alertmanager
2. Monitor the core metrics
  - Broker status and throughput
  - Partition and replica status
  - Consumer lag
  - System resource usage
3. Configure sensible alerts
  - Set reasonable thresholds
  - Use tiered severity levels
  - Avoid alert storms
4. Inspect regularly
  - Review alerts daily
  - Review trends weekly
  - Review capacity monthly
5.2 Monitoring Checklist
Monitoring checklist:
- JMX enabled
- Prometheus scraping correctly
- Grafana dashboards working
- Alert rules configured
- Alert channels working
- Consumer lag monitored
- Cluster health monitored
5.3 Recommended Monitoring Tools
Kafka monitoring tools:
- Prometheus + Grafana: standard monitoring stack
- Kafka Manager: cluster management UI
- Kafka Eagle: all-in-one monitoring platform
- Burrow: consumer lag monitoring
- JMX Exporter: JMX metrics export
