1. 首页 > 软件安装教程 > 正文

RocketMQ安装配置-RocketMQ消息队列安装配置_升级迁移详细过程

1. RocketMQ概述与环境规划

RocketMQ是阿里巴巴开源的分布式消息中间件,具有高吞吐量、高可用性、低延迟等特点。它广泛应用于电商、金融、物流等行业,支持顺序消息、事务消息、延迟消息等多种消息类型。更多学习教程www.fgedu.net.cn

1.1 RocketMQ版本说明

RocketMQ目前主要版本为5.3,本教程以RocketMQ 5.3为例进行详细讲解。

# 查看RocketMQ版本
$ cat /opt/rocketmq/rocketmq-broker/logs/rocketmqlogs/broker.log | grep “RocketMQ”
RocketMQ Broker 5.3.0 started successfully

# 查看NameServer状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876

# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version
DefaultCluster fgedudb01 0 192.168.1.51:10911 V5_3_0

1.2 环境规划

本次安装环境规划如下:

主机名:fgedudb01.fgedu.net.cn
IP地址:192.168.1.51
NameServer端口:9876
Broker端口:10911
Broker HA端口:10912
安装目录:/opt/rocketmq
数据目录:/data/rocketmq/store
日志目录:/data/rocketmq/logs
配置目录:/opt/rocketmq/conf

Java环境:
JDK版本:OpenJDK 17
JAVA_HOME:/usr/lib/jvm/java-17
JVM堆大小:4GB

1.3 RocketMQ核心特性

主要特点:
1. 高吞吐量:单机支持万级消息吞吐
2. 低延迟:毫秒级消息投递延迟
3. 高可用:支持主从同步和异步复制
4. 消息可靠性:支持消息轨迹追踪
5. 顺序消息:支持全局和分区顺序消息
6. 事务消息:支持分布式事务
7. 延迟消息:支持任意时间延迟
8. 消息过滤:支持Tag和SQL92过滤

2. 硬件环境要求与检查

在安装RocketMQ之前,需要对服务器硬件环境进行全面检查。学习交流加群风哥微信: itpux-com

2.1 最低硬件要求

最低配置:
CPU:4核心
内存:8GB
磁盘:50GB

推荐配置(生产环境):
CPU:8核心以上
内存:16GB以上
磁盘:200GB以上SSD

高吞吐量配置:
CPU:16核心以上
内存:32GB以上
磁盘:500GB以上SSD

2.2 Java环境检查

# 检查JDK版本
$ java -version
openjdk version “17.0.9” 9.0.4
OpenJDK Runtime Environment (Temurin-17.0.9+9) (build 17.0.9+9)
OpenJDK 64-Bit Server VM (build 17.0.9+9, mixed mode, sharing)

# 检查JAVA_HOME
$ echo $JAVA_HOME
/usr/lib/jvm/java-17

# 查看Java安装路径
$ which java
/usr/bin/java

2.3 系统环境检查

# 检查操作系统版本
# cat /etc/redhat-release
Red Hat Enterprise Linux release 8.8 (Ootpa)

# 检查内存信息
# free -h
total used free shared buff/cache available
Mem: 31Gi 2.0Gi 28Gi 256Mi 1.0Gi 29Gi
Swap: 7Gi 0B 7Gi

# 检查磁盘空间
# df -h
文件系统 容量 已用 可用 已用% 挂载点
/dev/mapper/vg_system-lv_root 100G 5.0G 95G 5% /
/dev/mapper/vg_data-lv_data 500G 50G 450G 10% /data

2.4 内核参数配置

# 配置内核参数
# vi /etc/sysctl.d/99-rocketmq.conf

# 添加以下参数
# 文件描述符限制
fs.file-max = 65535

# 网络参数
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.ip_local_port_range = 1024 65535

# 虚拟内存
vm.swappiness = 1
vm.dirty_ratio = 40
vm.dirty_background_ratio = 10

# 使内核参数生效
# sysctl -p /etc/sysctl.d/99-rocketmq.conf

# 配置用户限制
# vi /etc/security/limits.conf

# 添加以下配置
rocketmq soft nofile 65535
rocketmq hard nofile 65535
rocketmq soft nproc 65535
rocketmq hard nproc 65535

3. RocketMQ安装步骤

本节详细介绍RocketMQ 5.3的安装过程。学习交流加群风哥QQ113257174

3.1 创建用户和目录

# 创建rocketmq用户
# groupadd -g 1006 rocketmq
# useradd -u 1006 -g rocketmq -d /opt/rocketmq -s /bin/bash rocketmq
# passwd rocketmq

# 创建目录
# mkdir -p /opt/rocketmq
# mkdir -p /data/rocketmq/{store,logs,commitlog,consumequeue,index}

# 设置目录权限
# chown -R rocketmq:rocketmq /opt/rocketmq
# chown -R rocketmq:rocketmq /data/rocketmq

3.2 下载并安装RocketMQ

# 切换到安装目录
# cd /usr/local/src

# 下载RocketMQ 5.3
# wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip

# 解压安装
# unzip -q rocketmq-all-5.3.0-bin-release.zip -d /opt/

# 重命名目录
# mv /opt/rocketmq-all-5.3.0-bin-release /opt/rocketmq

# 设置目录权限
# chown -R rocketmq:rocketmq /opt/rocketmq

# 验证安装
$ ls -la /opt/rocketmq/
总用量 0
drwxr-xr-x. 2 rocketmq rocketty 4096 4月 4 10:00 benchmark
drwxr-xr-x. 2 rocketmq rocketmq 4096 4月 4 10:00 bin
drwxr-xr-x. 2 rocketmq rocketmq 4096 4月 4 10:00 conf
drwxr-xr-x. 2 rocketmq rocketmq 4096 4月 4 10:00 lib

3.3 配置环境变量

# 配置rocketmq用户环境变量
$ vi ~/.bash_profile

# 添加以下内容
export ROCKETMQ_HOME=/opt/rocketmq
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH

# 使配置生效
$ source ~/.bash_profile

# 验证环境变量
$ echo $ROCKETMQ_HOME
/opt/rocketmq

3.4 配置NameServer

# 编辑NameServer启动脚本
$ vi /opt/rocketmq/bin/runserver.sh

# 修改JVM参数
JAVA_OPT=”${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m”
JAVA_OPT=”${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=200″

# 启动NameServer
$ nohup sh /opt/rocketmq/bin/mqnamesrv > /data/rocketmq/logs/namesrv.log 2>&1 &

# 查看启动日志
$ tail -f /data/rocketmq/logs/namesrv.log

# 输出示例:
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

# 检查端口
$ netstat -tlnp | grep java
tcp6 0 0 :::9876 :::* LISTEN 12345/java

3.5 配置Broker

# 创建Broker配置文件
$ vi /opt/rocketmq/conf/broker.conf

# 添加以下配置
brokerClusterName = DefaultCluster
brokerName = fgedudb01
brokerId = 0

# NameServer地址
namesrvAddr = 192.168.1.51:9876

# Broker监听地址
brokerIP1 = 192.168.1.51
listenPort = 10911

# 数据存储路径
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index

# 日志路径
storePathDelayOffset = /data/rocketmq/store/delayOffset
storePathCheckPoint = /data/rocketmq/store/checkpoint
abortFile = /data/rocketmq/store/abort

# 刷盘方式
flushDiskType = ASYNC_FLUSH

# 主从复制方式
brokerRole = ASYNC_MASTER

# 消息大小限制
maxMessageSize = 4194304

# 编辑Broker启动脚本
$ vi /opt/rocketmq/bin/runbroker.sh

# 修改JVM参数
JAVA_OPT=”${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g”
JAVA_OPT=”${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=200″

# 启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &

# 查看启动日志
$ tail -f /data/rocketmq/logs/broker.log

# 输出示例:
The broker[fgedudb01, 192.168.1.51:10911] boot success. serializeType=JSON and name server is 192.168.1.51:9876

# 检查端口
$ netstat -tlnp | grep java
tcp6 0 0 :::9876 :::* LISTEN 12345/java
tcp6 0 0 :::10911 :::* LISTEN 12346/java
tcp6 0 0 :::10912 :::* LISTEN 12346/java

3.6 配置防火墙

# 配置防火墙
# firewall-cmd –permanent –add-port=9876/tcp
success
# firewall-cmd –permanent –add-port=10911/tcp
success
# firewall-cmd –permanent –add-port=10912/tcp
success
# firewall-cmd –reload
success

# 查看集群状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876

# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version #InTPS #OutTPS
DefaultCluster fgedudb01 0 192.168.1.51:10911 V5_3_0 0.00 0.00

风哥提示:RocketMQ由NameServer和Broker两个核心组件组成。NameServer负责路由注册,Broker负责消息存储和转发。生产环境建议部署多个NameServer实现高可用。

4. RocketMQ参数配置

RocketMQ参数配置是性能优化的关键步骤,直接影响系统性能。更多学习教程公众号风哥教程itpux_com

4.1 Broker核心配置

# 编辑Broker配置文件
$ vi /opt/rocketmq/conf/broker.conf

# 集群配置
brokerClusterName = FgeDuCluster
brokerName = fgedudb01
brokerId = 0

# 网络配置
namesrvAddr = 192.168.1.51:9876
brokerIP1 = 192.168.1.51
listenPort = 10911

# 存储配置
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index

# 刷盘配置
flushDiskType = ASYNC_FLUSH
flushCommitLogTimed = true
flushIntervalCommitLog = 500

# 复制配置
brokerRole = ASYNC_MASTER
syncFlushTimeout = 5000
slaveReadEnable = true

# 消息配置
maxMessageSize = 4194304
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true

# 性能配置
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
useReentrantLockWhenPutMessage = false

4.2 内存映射配置

# 编辑Broker配置
$ vi /opt/rocketmq/conf/broker.conf

# 内存映射配置
mappedFileSizeCommitLog = 1073741824
mappedFileSizeConsumeQueue = 300000
mappedFileSizeConsumeQueueExt = 4800

# 页缓存配置
accessMessageInMemoryMaxRatio = 40
messageIndexEnable = true
maxHashSlotNum = 2000000
maxIndexNum = 20000000

# 删除过期文件配置
deleteWhen = 04
fileReservedTime = 48
destroyMapedFileIntervalForcibly = 120000
redeleteHangedFileInterval = 120000

4.3 线程池配置

# 编辑Broker配置
$ vi /opt/rocketmq/conf/broker.conf

# 发送消息线程池
sendMessageThreadPoolNums = 32
waitTimeMillsInSendQueue = 200

# 拉取消息线程池
pullMessageThreadPoolNums = 32
waitTimeMillsInPullQueue = 200

# 查询消息线程池
queryMessageThreadPoolNums = 8

# 管理线程池
adminBrokerThreadPoolNums = 16

# 客户端管理线程池
clientManageThreadPoolNums = 32

# 消费者管理线程池
consumerManageThreadPoolNums = 32

生产环境建议:根据服务器硬件配置合理调整内存映射大小和线程池参数。建议使用SSD存储以提高IO性能。

5. 主题与消息管理

RocketMQ提供了丰富的主题和消息管理功能,本节介绍常用的操作方法。from:www.itpux.com

5.1 创建主题

# 创建主题
$ sh /opt/rocketmq/bin/mqadmin updateTopic -n 192.168.1.51:9876 -c DefaultCluster -t fgedu_topic

# 输出示例:
create topic to 192.168.1.51:10911 success.
TopicConfig [topicName=fgedu_topic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]

# 查看主题列表
$ sh /opt/rocketmq/bin/mqadmin topicList -n 192.168.1.51:9876

# 输出示例:
fgedu_topic
RMQ_SYS_TRANS_HALF_TOPIC
BenchmarkTest
OFFSET_MOVED_EVENT
TBW102
SELF_TEST_TOPIC
DefaultCluster_REPLY_TOPIC
SCHEDULE_TOPIC_XXXX

# 查看主题详情
$ sh /opt/rocketmq/bin/mqadmin topicStatus -n 192.168.1.51:9876 -t fgedu_topic

# 输出示例:
#Broker Name #QID #Min Offset #Max Offset #InTPS #OutTPS
fgedudb01 0 0 0 0.00 0.00
fgedudb01 1 0 0 0.00 0.00
fgedudb01 2 0 0 0.00 0.00

5.2 发送消息

# 使用命令行发送消息
$ sh /opt/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

# 输出示例:
SendResult [sendStatus=SEND_OK, msgId=C0A8013300002A9F0000000000000000, offsetMsgId=C0A8013300002A9F0000000000000000, messageQueue=MessageQueue [topic=fgedu_topic, brokerName=fgedudb01, queueId=0], queueOffset=0]

# 使用Java发送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(“fgedu_producer_group”);
producer.setNamesrvAddr(“192.168.1.51:9876”);
producer.start();

Message msg = new Message(“fgedu_topic”, “TagA”, “Hello RocketMQ”.getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);

producer.shutdown();
}
}

5.3 消费消息

# 使用命令行消费消息
$ sh /opt/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

# 输出示例:
ConsumeMessageThread_please_rename_unique_group_name_4_1 receive new Messages: [MessageExt [brokerName=fgedudb01, queueId=0, storeSize=203, queueOffset=0, sysFlag=0, bornTimestamp=1712205600000, bornHost=/192.168.1.51:12345, storeTimestamp=1712205600000, storeHost=/192.168.1.51:10911, msgId=C0A8013300002A9F0000000000000000, commitLogOffset=0, bodyCRC=1234567890, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=’fgedu_topic’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=OrderID001, CONSUME_START_TIME=1712205600000}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId=’null’}]]

# 使用Java消费消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“fgedu_consumer_group”);
consumer.setNamesrvAddr(“192.168.1.51:9876”);
consumer.subscribe(“fgedu_topic”, “*”);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

6. 集群配置

RocketMQ支持多种集群部署模式,本节介绍常用的集群配置方法。更多学习教程www.fgedu.net.cn

6.1 主从集群配置

# Master节点配置 (fgedudb01)
$ vi /opt/rocketmq/conf/broker.conf

brokerClusterName = FgeDuCluster
brokerName = broker-a
brokerId = 0
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876
brokerIP1 = 192.168.1.51
listenPort = 10911
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# Slave节点配置 (fgedudb02)
$ vi /opt/rocketmq/conf/broker.conf

brokerClusterName = FgeDuCluster
brokerName = broker-a
brokerId = 1
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876
brokerIP1 = 192.168.1.52
listenPort = 10911
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH

# 启动Master
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &

# 启动Slave
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &

# 查看集群状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876

# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version
FgeDuCluster broker-a 0 192.168.1.51:10911 V5_3_0
FgeDuCluster broker-a 1 192.168.1.52:10911 V5_3_0

6.2 多Master集群配置

# 节点1配置 (fgedudb01)
$ vi /opt/rocketmq/conf/broker-a.conf

brokerClusterName = FgeDuCluster
brokerName = broker-a
brokerId = 0
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876
brokerIP1 = 192.168.1.51
listenPort = 10911
brokerRole = ASYNC_MASTER

# 节点2配置 (fgedudb02)
$ vi /opt/rocketmq/conf/broker-b.conf

brokerClusterName = FgeDuCluster
brokerName = broker-b
brokerId = 0
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876
brokerIP1 = 192.168.1.52
listenPort = 10911
brokerRole = ASYNC_MASTER

# 查看集群状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876

# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version
FgeDuCluster broker-a 0 192.168.1.51:10911 V5_3_0
FgeDuCluster broker-b 0 192.168.1.52:10911 V5_3_0

风哥提示:RocketMQ集群支持多种部署模式。生产环境建议使用多Master-Slave模式,实现高可用和数据冗余。

7. 安全配置

RocketMQ提供了安全认证机制,本节介绍常用的安全配置方法。学习交流加群风哥微信: itpux-com

7.1 配置ACL

# 创建ACL配置文件
$ vi /opt/rocketmq/conf/plain_acl.yml

# 添加以下配置
globalWhiteRemoteAddresses:
– 192.168.1.*
– 127.0.0.1

accounts:
– accessKey: admin
secretKey: fgedu123456
whiteRemoteAddress:
admin: true

– accessKey: fgedu_producer
secretKey: fgedu123456
whiteRemoteAddress:
topicPerms:
– fgedu_topic=Publish

– accessKey: fgedu_consumer
secretKey: fgedu123456
whiteRemoteAddress:
topicPerms:
– fgedu_topic=Subscribe
groupPerms:
– fgedu_consumer_group=Subscribe

# 修改Broker配置启用ACL
$ vi /opt/rocketmq/conf/broker.conf

aclEnable = true

# 重启Broker
$ sh /opt/rocketmq/bin/mqshutdown broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &

7.2 配置SSL/TLS

# 生成证书
$ keytool -genkeypair -alias rocketmq -keyalg RSA -keysize 2048 -keystore /opt/rocketmq/conf/rocketmq.keystore -validity 365 -storepass fgedu123 -keypass fgedu123 -dname “CN=fgedudb01.fgedu.net.cn,OU=IT,O=FGedu,L=BJ,ST=BJ,C=CN”

# 修改Broker配置
$ vi /opt/rocketmq/conf/broker.conf

# SSL配置
tlsEnable = true
tlsKeyStorePath = /opt/rocketmq/conf/rocketmq.keystore
tlsKeyStorePassword = fgedu123
tlsKeyPassword = fgedu123

# 重启Broker
$ sh /opt/rocketmq/bin/mqshutdown broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &

生产环境建议:生产环境建议启用ACL认证,为不同应用分配独立的AccessKey和SecretKey。

8. 监控与日志

RocketMQ提供了完善的监控和日志功能,本节介绍常用的监控配置方法。更多学习教程公众号风哥教程itpux_com

8.1 启用控制台

# 下载RocketMQ Dashboard
# wget https://github.com/apache/rocketmq-dashboard/releases/download/2.0.0/rocketmq-dashboard-2.0.0.jar

# 启动Dashboard
$ java -jar rocketmq-dashboard-2.0.0.jar –server.port=8080 –rocketmq.config.namesrvAddr=192.168.1.51:9876 &

# 访问控制台
# 浏览器访问: http://192.168.1.51:8080

# 查看集群信息
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876

# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version #InTPS #OutTPS
DefaultCluster fgedudb01 0 192.168.1.51:10911 V5_3_0 100.00 100.00

8.2 查看日志

# 查看NameServer日志
$ tail -f /data/rocketmq/logs/namesrv.log

# 输出示例:
2026-04-04 10:00:00 INFO NameServerStartup – The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
2026-04-04 10:00:01 INFO RouteInfoManager – registerBroker, 192.168.1.51:10911

# 查看Broker日志
$ tail -f /data/rocketmq/logs/broker.log

# 输出示例:
2026-04-04 10:00:00 INFO BrokerController – The broker[fgedudb01, 192.168.1.51:10911] boot success. serializeType=JSON
2026-04-04 10:00:01 INFO BrokerController – Register broker to name server 192.168.1.51:9876 OK

8.3 性能监控

# 查看Broker统计信息
$ sh /opt/rocketmq/bin/mqadmin brokerStatus -n 192.168.1.51:9876 -b fgedudb01

# 输出示例:
bootTimestamp: 1712205600000
brokerVersion: 5.3.0
brokerVersionDesc: V5_3_0
commitLogDiskRatio: 0.05
commitLogMaxOffset: 1073741824
commitLogMinOffset: 0
consumeQueueDiskRatio: 0.05
dispatchMaxBuffer: 0
getFoundTps: 100.00
getMessageEntireTimeMax: 10
getMissTps: 0.00
getTotalTps: 100.00
putMessageAverageSize: 1024
putMessageDistributeTimeMap: {<=0ms:100, 0~10ms:0, 10~50ms:0} putMessageEntireTimeMax: 5 putMessageSizeTotal: 102400 putMessageTimesTotal: 100 putTps: 100.00 # 查看主题统计 $ sh /opt/rocketmq/bin/mqadmin topicStatus -n 192.168.1.51:9876 -t fgedu_topic

风哥提示:生产环境建议部署RocketMQ Dashboard进行可视化监控。定期检查消息积压和消费延迟情况。

9. 升级与迁移

RocketMQ升级和迁移是运维工作中的重要环节,需要仔细规划和执行。from:www.itpux.com

9.1 版本升级

# 检查当前版本
$ cat /opt/rocketmq/logs/broker.log | grep “RocketMQ”
RocketMQ Broker 5.3.0 started successfully

# 备份配置
$ cp -r /opt/rocketmq/conf /backup/rocketmq/conf_$(date +%Y%m%d)

# 停止Broker
$ sh /opt/rocketmq/bin/mqshutdown broker

# 停止NameServer
$ sh /opt/rocketmq/bin/mqshutdown namesrv

# 备份旧版本
$ mv /opt/rocketmq /opt/rocketmq-5.3.0.bak

# 下载新版本
# wget https://archive.apache.org/dist/rocketmq/5.4.0/rocketmq-all-5.4.0-bin-release.zip

# 解压安装
# unzip -q rocketmq-all-5.4.0-bin-release.zip -d /opt/
# mv /opt/rocketmq-all-5.4.0-bin-release /opt/rocketmq

# 恢复配置
$ cp -r /backup/rocketmq/conf_$(date +%Y%m%d)/* /opt/rocketmq/conf/

# 启动NameServer
$ nohup sh /opt/rocketmq/bin/mqnamesrv > /data/rocketmq/logs/namesrv.log 2>&1 &

# 启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &

# 验证版本
$ cat /data/rocketmq/logs/broker.log | grep “RocketMQ”
RocketMQ Broker 5.4.0 started successfully

9.2 数据迁移

# 停止旧Broker
$ sh /opt/rocketmq/bin/mqshutdown broker

# 复制数据目录
$ scp -r /data/rocketmq/store new-server:/data/rocketmq/

# 在新服务器启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &

# 验证数据
$ sh /opt/rocketmq/bin/mqadmin topicStatus -n 192.168.1.52:9876 -t fgedu_topic

# 输出示例:
#Broker Name #QID #Min Offset #Max Offset #InTPS #OutTPS
fgedudb02 0 0 1000 0.00 0.00

生产环境建议:升级前必须进行完整备份。建议先在测试环境验证兼容性,确保消息不丢失。

10. 生产环境实战案例

本节提供一个完整的生产环境配置案例,帮助读者更好地理解RocketMQ的实际应用。更多学习教程www.fgedu.net.cn

10.1 生产环境完整配置

# 生产环境Broker完整配置
$ vi /opt/rocketmq/conf/broker.conf

# 集群配置
brokerClusterName = FgeDuCluster
brokerName = broker-a
brokerId = 0

# 网络配置
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876;192.168.1.53:9876
brokerIP1 = 192.168.1.51
listenPort = 10911

# 存储配置
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
mappedFileSizeCommitLog = 1073741824
mappedFileSizeConsumeQueue = 300000

# 刷盘配置
flushDiskType = SYNC_FLUSH
flushCommitLogTimed = true
flushIntervalCommitLog = 500

# 复制配置
brokerRole = SYNC_MASTER
syncFlushTimeout = 5000
slaveReadEnable = true

# 性能配置
sendMessageThreadPoolNums = 32
pullMessageThreadPoolNums = 32
useReentrantLockWhenPutMessage = false

# 消息配置
maxMessageSize = 4194304
autoCreateTopicEnable = false
autoCreateSubscriptionGroup = false

# ACL配置
aclEnable = true

# 创建systemd服务
$ vi /etc/systemd/system/rocketmq-namesrv.service

[Unit]
Description=RocketMQ NameServer
After=network.target

[Service]
Type=simple
User=rocketmq
Group=rocketmq
ExecStart=/opt/rocketmq/bin/mqnamesrv
ExecStop=/opt/rocketmq/bin/mqshutdown namesrv
Restart=on-failure

[Install]
WantedBy=multi-user.target

$ vi /etc/systemd/system/rocketmq-broker.service

[Unit]
Description=RocketMQ Broker
After=network.target

[Service]
Type=simple
User=rocketmq
Group=rocketmq
ExecStart=/opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf
ExecStop=/opt/rocketmq/bin/mqshutdown broker
Restart=on-failure

[Install]
WantedBy=multi-user.target

# 启用服务
# systemctl enable rocketmq-namesrv
# systemctl enable rocketmq-broker
# systemctl start rocketmq-namesrv
# systemctl start rocketmq-broker

10.2 高可用集群部署

# 三节点集群部署

# NameServer节点
# 192.168.1.51:9876
# 192.168.1.52:9876
# 192.168.1.53:9876

# Broker Master节点
# broker-a: 192.168.1.51:10911
# broker-b: 192.168.1.52:10911
# broker-c: 192.168.1.53:10911

# Broker Slave节点
# broker-a-slave: 192.168.1.52:10921
# broker-b-slave: 192.168.1.53:10921
# broker-c-slave: 192.168.1.51:10921

# 在所有节点启动NameServer
$ nohup sh /opt/rocketmq/bin/mqnamesrv > /data/rocketmq/logs/namesrv.log 2>&1 &

# 在Master节点启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker-master.conf > /data/rocketmq/logs/broker.log 2>&1 &

# 在Slave节点启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker-slave.conf > /data/rocketmq/logs/broker.log 2>&1 &

# 查看集群状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876

10.3 性能调优实战

# 操作系统优化
# vi /etc/sysctl.d/99-rocketmq.conf

# 网络优化
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 30

# 虚拟内存
vm.swappiness = 1
vm.dirty_ratio = 40
vm.dirty_background_ratio = 10

# 使配置生效
# sysctl -p /etc/sysctl.d/99-rocketmq.conf

# JVM优化
$ vi /opt/rocketmq/bin/runbroker.sh

JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g”
JAVA_OPT=”${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=200″
JAVA_OPT=”${JAVA_OPT} -XX:InitiatingHeapOccupancyPercent=35″
JAVA_OPT=”${JAVA_OPT} -XX:+ParallelRefProcEnabled”
JAVA_OPT=”${JAVA_OPT} -XX:+UseStringDeduplication”

# 性能测试
$ sh /opt/rocketmq/benchmark/tools.sh org.apache.rocketmq.example.benchmark.Producer -n 192.168.1.51:9876 -t fgedu_benchmark -w 16

# 输出示例:
Send TPS: 50000
Max RT: 100ms
Avg RT: 2ms

风哥提示:RocketMQ作为高性能消息中间件,在电商和金融领域应用广泛。建议根据业务场景选择合适的刷盘方式和复制模式。

本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

联系我们

在线咨询:点击这里给我发消息

微信号:itpux-com

工作日:9:30-18:30,节假日休息