1. RocketMQ概述与环境规划
RocketMQ是阿里巴巴开源的分布式消息中间件,具有高吞吐量、高可用性、低延迟等特点。它广泛应用于电商、金融、物流等行业,支持顺序消息、事务消息、延迟消息等多种消息类型。更多学习教程www.fgedu.net.cn
1.1 RocketMQ版本说明
RocketMQ目前主要版本为5.3,本教程以RocketMQ 5.3为例进行详细讲解。
$ cat /opt/rocketmq/rocketmq-broker/logs/rocketmqlogs/broker.log | grep “RocketMQ”
RocketMQ Broker 5.3.0 started successfully
# 查看NameServer状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876
# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version
DefaultCluster fgedudb01 0 192.168.1.51:10911 V5_3_0
1.2 环境规划
本次安装环境规划如下:
IP地址:192.168.1.51
NameServer端口:9876
Broker端口:10911
Broker HA端口:10912
安装目录:/opt/rocketmq
数据目录:/data/rocketmq/store
日志目录:/data/rocketmq/logs
配置目录:/opt/rocketmq/conf
Java环境:
JDK版本:OpenJDK 17
JAVA_HOME:/usr/lib/jvm/java-17
JVM堆大小:4GB
1.3 RocketMQ核心特性
1. 高吞吐量:单机支持万级消息吞吐
2. 低延迟:毫秒级消息投递延迟
3. 高可用:支持主从同步和异步复制
4. 消息可靠性:支持消息轨迹追踪
5. 顺序消息:支持全局和分区顺序消息
6. 事务消息:支持分布式事务
7. 延迟消息:支持任意时间延迟
8. 消息过滤:支持Tag和SQL92过滤
2. 硬件环境要求与检查
在安装RocketMQ之前,需要对服务器硬件环境进行全面检查。学习交流加群风哥微信: itpux-com
2.1 最低硬件要求
CPU:4核心
内存:8GB
磁盘:50GB
推荐配置(生产环境):
CPU:8核心以上
内存:16GB以上
磁盘:200GB以上SSD
高吞吐量配置:
CPU:16核心以上
内存:32GB以上
磁盘:500GB以上SSD
2.2 Java环境检查
$ java -version
openjdk version “17.0.9” 9.0.4
OpenJDK Runtime Environment (Temurin-17.0.9+9) (build 17.0.9+9)
OpenJDK 64-Bit Server VM (build 17.0.9+9, mixed mode, sharing)
# 检查JAVA_HOME
$ echo $JAVA_HOME
/usr/lib/jvm/java-17
# 查看Java安装路径
$ which java
/usr/bin/java
2.3 系统环境检查
# cat /etc/redhat-release
Red Hat Enterprise Linux release 8.8 (Ootpa)
# 检查内存信息
# free -h
total used free shared buff/cache available
Mem: 31Gi 2.0Gi 28Gi 256Mi 1.0Gi 29Gi
Swap: 7Gi 0B 7Gi
# 检查磁盘空间
# df -h
文件系统 容量 已用 可用 已用% 挂载点
/dev/mapper/vg_system-lv_root 100G 5.0G 95G 5% /
/dev/mapper/vg_data-lv_data 500G 50G 450G 10% /data
2.4 内核参数配置
# vi /etc/sysctl.d/99-rocketmq.conf
# 添加以下参数
# 文件描述符限制
fs.file-max = 65535
# 网络参数
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.ip_local_port_range = 1024 65535
# 虚拟内存
vm.swappiness = 1
vm.dirty_ratio = 40
vm.dirty_background_ratio = 10
# 使内核参数生效
# sysctl -p /etc/sysctl.d/99-rocketmq.conf
# 配置用户限制
# vi /etc/security/limits.conf
# 添加以下配置
rocketmq soft nofile 65535
rocketmq hard nofile 65535
rocketmq soft nproc 65535
rocketmq hard nproc 65535
3. RocketMQ安装步骤
本节详细介绍RocketMQ 5.3的安装过程。学习交流加群风哥QQ113257174
3.1 创建用户和目录
# groupadd -g 1006 rocketmq
# useradd -u 1006 -g rocketmq -d /opt/rocketmq -s /bin/bash rocketmq
# passwd rocketmq
# 创建目录
# mkdir -p /opt/rocketmq
# mkdir -p /data/rocketmq/{store,logs,commitlog,consumequeue,index}
# 设置目录权限
# chown -R rocketmq:rocketmq /opt/rocketmq
# chown -R rocketmq:rocketmq /data/rocketmq
3.2 下载并安装RocketMQ
# cd /usr/local/src
# 下载RocketMQ 5.3
# wget https://archive.apache.org/dist/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip
# 解压安装
# unzip -q rocketmq-all-5.3.0-bin-release.zip -d /opt/
# 重命名目录
# mv /opt/rocketmq-all-5.3.0-bin-release /opt/rocketmq
# 设置目录权限
# chown -R rocketmq:rocketmq /opt/rocketmq
# 验证安装
$ ls -la /opt/rocketmq/
总用量 0
drwxr-xr-x. 2 rocketmq rocketty 4096 4月 4 10:00 benchmark
drwxr-xr-x. 2 rocketmq rocketmq 4096 4月 4 10:00 bin
drwxr-xr-x. 2 rocketmq rocketmq 4096 4月 4 10:00 conf
drwxr-xr-x. 2 rocketmq rocketmq 4096 4月 4 10:00 lib
3.3 配置环境变量
$ vi ~/.bash_profile
# 添加以下内容
export ROCKETMQ_HOME=/opt/rocketmq
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH
# 使配置生效
$ source ~/.bash_profile
# 验证环境变量
$ echo $ROCKETMQ_HOME
/opt/rocketmq
3.4 配置NameServer
$ vi /opt/rocketmq/bin/runserver.sh
# 修改JVM参数
JAVA_OPT=”${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m”
JAVA_OPT=”${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=200″
# 启动NameServer
$ nohup sh /opt/rocketmq/bin/mqnamesrv > /data/rocketmq/logs/namesrv.log 2>&1 &
# 查看启动日志
$ tail -f /data/rocketmq/logs/namesrv.log
# 输出示例:
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
# 检查端口
$ netstat -tlnp | grep java
tcp6 0 0 :::9876 :::* LISTEN 12345/java
3.5 配置Broker
$ vi /opt/rocketmq/conf/broker.conf
# 添加以下配置
brokerClusterName = DefaultCluster
brokerName = fgedudb01
brokerId = 0
# NameServer地址
namesrvAddr = 192.168.1.51:9876
# Broker监听地址
brokerIP1 = 192.168.1.51
listenPort = 10911
# 数据存储路径
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
# 日志路径
storePathDelayOffset = /data/rocketmq/store/delayOffset
storePathCheckPoint = /data/rocketmq/store/checkpoint
abortFile = /data/rocketmq/store/abort
# 刷盘方式
flushDiskType = ASYNC_FLUSH
# 主从复制方式
brokerRole = ASYNC_MASTER
# 消息大小限制
maxMessageSize = 4194304
# 编辑Broker启动脚本
$ vi /opt/rocketmq/bin/runbroker.sh
# 修改JVM参数
JAVA_OPT=”${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g”
JAVA_OPT=”${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=200″
# 启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &
# 查看启动日志
$ tail -f /data/rocketmq/logs/broker.log
# 输出示例:
The broker[fgedudb01, 192.168.1.51:10911] boot success. serializeType=JSON and name server is 192.168.1.51:9876
# 检查端口
$ netstat -tlnp | grep java
tcp6 0 0 :::9876 :::* LISTEN 12345/java
tcp6 0 0 :::10911 :::* LISTEN 12346/java
tcp6 0 0 :::10912 :::* LISTEN 12346/java
3.6 配置防火墙
# firewall-cmd –permanent –add-port=9876/tcp
success
# firewall-cmd –permanent –add-port=10911/tcp
success
# firewall-cmd –permanent –add-port=10912/tcp
success
# firewall-cmd –reload
success
# 查看集群状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876
# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version #InTPS #OutTPS
DefaultCluster fgedudb01 0 192.168.1.51:10911 V5_3_0 0.00 0.00
4. RocketMQ参数配置
RocketMQ参数配置是性能优化的关键步骤,直接影响系统性能。更多学习教程公众号风哥教程itpux_com
4.1 Broker核心配置
$ vi /opt/rocketmq/conf/broker.conf
# 集群配置
brokerClusterName = FgeDuCluster
brokerName = fgedudb01
brokerId = 0
# 网络配置
namesrvAddr = 192.168.1.51:9876
brokerIP1 = 192.168.1.51
listenPort = 10911
# 存储配置
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
# 刷盘配置
flushDiskType = ASYNC_FLUSH
flushCommitLogTimed = true
flushIntervalCommitLog = 500
# 复制配置
brokerRole = ASYNC_MASTER
syncFlushTimeout = 5000
slaveReadEnable = true
# 消息配置
maxMessageSize = 4194304
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
# 性能配置
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
useReentrantLockWhenPutMessage = false
4.2 内存映射配置
$ vi /opt/rocketmq/conf/broker.conf
# 内存映射配置
mappedFileSizeCommitLog = 1073741824
mappedFileSizeConsumeQueue = 300000
mappedFileSizeConsumeQueueExt = 4800
# 页缓存配置
accessMessageInMemoryMaxRatio = 40
messageIndexEnable = true
maxHashSlotNum = 2000000
maxIndexNum = 20000000
# 删除过期文件配置
deleteWhen = 04
fileReservedTime = 48
destroyMapedFileIntervalForcibly = 120000
redeleteHangedFileInterval = 120000
4.3 线程池配置
$ vi /opt/rocketmq/conf/broker.conf
# 发送消息线程池
sendMessageThreadPoolNums = 32
waitTimeMillsInSendQueue = 200
# 拉取消息线程池
pullMessageThreadPoolNums = 32
waitTimeMillsInPullQueue = 200
# 查询消息线程池
queryMessageThreadPoolNums = 8
# 管理线程池
adminBrokerThreadPoolNums = 16
# 客户端管理线程池
clientManageThreadPoolNums = 32
# 消费者管理线程池
consumerManageThreadPoolNums = 32
5. 主题与消息管理
RocketMQ提供了丰富的主题和消息管理功能,本节介绍常用的操作方法。from:www.itpux.com
5.1 创建主题
$ sh /opt/rocketmq/bin/mqadmin updateTopic -n 192.168.1.51:9876 -c DefaultCluster -t fgedu_topic
# 输出示例:
create topic to 192.168.1.51:10911 success.
TopicConfig [topicName=fgedu_topic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
# 查看主题列表
$ sh /opt/rocketmq/bin/mqadmin topicList -n 192.168.1.51:9876
# 输出示例:
fgedu_topic
RMQ_SYS_TRANS_HALF_TOPIC
BenchmarkTest
OFFSET_MOVED_EVENT
TBW102
SELF_TEST_TOPIC
DefaultCluster_REPLY_TOPIC
SCHEDULE_TOPIC_XXXX
# 查看主题详情
$ sh /opt/rocketmq/bin/mqadmin topicStatus -n 192.168.1.51:9876 -t fgedu_topic
# 输出示例:
#Broker Name #QID #Min Offset #Max Offset #InTPS #OutTPS
fgedudb01 0 0 0 0.00 0.00
fgedudb01 1 0 0 0.00 0.00
fgedudb01 2 0 0 0.00 0.00
5.2 发送消息
$ sh /opt/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 输出示例:
SendResult [sendStatus=SEND_OK, msgId=C0A8013300002A9F0000000000000000, offsetMsgId=C0A8013300002A9F0000000000000000, messageQueue=MessageQueue [topic=fgedu_topic, brokerName=fgedudb01, queueId=0], queueOffset=0]
# 使用Java发送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(“fgedu_producer_group”);
producer.setNamesrvAddr(“192.168.1.51:9876”);
producer.start();
Message msg = new Message(“fgedu_topic”, “TagA”, “Hello RocketMQ”.getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
producer.shutdown();
}
}
5.3 消费消息
$ sh /opt/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 输出示例:
ConsumeMessageThread_please_rename_unique_group_name_4_1 receive new Messages: [MessageExt [brokerName=fgedudb01, queueId=0, storeSize=203, queueOffset=0, sysFlag=0, bornTimestamp=1712205600000, bornHost=/192.168.1.51:12345, storeTimestamp=1712205600000, storeHost=/192.168.1.51:10911, msgId=C0A8013300002A9F0000000000000000, commitLogOffset=0, bodyCRC=1234567890, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=’fgedu_topic’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=OrderID001, CONSUME_START_TIME=1712205600000}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId=’null’}]]
# 使用Java消费消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“fgedu_consumer_group”);
consumer.setNamesrvAddr(“192.168.1.51:9876”);
consumer.subscribe(“fgedu_topic”, “*”);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
6. 集群配置
RocketMQ支持多种集群部署模式,本节介绍常用的集群配置方法。更多学习教程www.fgedu.net.cn
6.1 主从集群配置
$ vi /opt/rocketmq/conf/broker.conf
brokerClusterName = FgeDuCluster
brokerName = broker-a
brokerId = 0
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876
brokerIP1 = 192.168.1.51
listenPort = 10911
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Slave节点配置 (fgedudb02)
$ vi /opt/rocketmq/conf/broker.conf
brokerClusterName = FgeDuCluster
brokerName = broker-a
brokerId = 1
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876
brokerIP1 = 192.168.1.52
listenPort = 10911
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
# 启动Master
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &
# 启动Slave
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &
# 查看集群状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876
# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version
FgeDuCluster broker-a 0 192.168.1.51:10911 V5_3_0
FgeDuCluster broker-a 1 192.168.1.52:10911 V5_3_0
6.2 多Master集群配置
$ vi /opt/rocketmq/conf/broker-a.conf
brokerClusterName = FgeDuCluster
brokerName = broker-a
brokerId = 0
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876
brokerIP1 = 192.168.1.51
listenPort = 10911
brokerRole = ASYNC_MASTER
# 节点2配置 (fgedudb02)
$ vi /opt/rocketmq/conf/broker-b.conf
brokerClusterName = FgeDuCluster
brokerName = broker-b
brokerId = 0
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876
brokerIP1 = 192.168.1.52
listenPort = 10911
brokerRole = ASYNC_MASTER
# 查看集群状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876
# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version
FgeDuCluster broker-a 0 192.168.1.51:10911 V5_3_0
FgeDuCluster broker-b 0 192.168.1.52:10911 V5_3_0
7. 安全配置
RocketMQ提供了安全认证机制,本节介绍常用的安全配置方法。学习交流加群风哥微信: itpux-com
7.1 配置ACL
$ vi /opt/rocketmq/conf/plain_acl.yml
# 添加以下配置
globalWhiteRemoteAddresses:
– 192.168.1.*
– 127.0.0.1
accounts:
– accessKey: admin
secretKey: fgedu123456
whiteRemoteAddress:
admin: true
– accessKey: fgedu_producer
secretKey: fgedu123456
whiteRemoteAddress:
topicPerms:
– fgedu_topic=Publish
– accessKey: fgedu_consumer
secretKey: fgedu123456
whiteRemoteAddress:
topicPerms:
– fgedu_topic=Subscribe
groupPerms:
– fgedu_consumer_group=Subscribe
# 修改Broker配置启用ACL
$ vi /opt/rocketmq/conf/broker.conf
aclEnable = true
# 重启Broker
$ sh /opt/rocketmq/bin/mqshutdown broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &
7.2 配置SSL/TLS
$ keytool -genkeypair -alias rocketmq -keyalg RSA -keysize 2048 -keystore /opt/rocketmq/conf/rocketmq.keystore -validity 365 -storepass fgedu123 -keypass fgedu123 -dname “CN=fgedudb01.fgedu.net.cn,OU=IT,O=FGedu,L=BJ,ST=BJ,C=CN”
# 修改Broker配置
$ vi /opt/rocketmq/conf/broker.conf
# SSL配置
tlsEnable = true
tlsKeyStorePath = /opt/rocketmq/conf/rocketmq.keystore
tlsKeyStorePassword = fgedu123
tlsKeyPassword = fgedu123
# 重启Broker
$ sh /opt/rocketmq/bin/mqshutdown broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &
8. 监控与日志
RocketMQ提供了完善的监控和日志功能,本节介绍常用的监控配置方法。更多学习教程公众号风哥教程itpux_com
8.1 启用控制台
# wget https://github.com/apache/rocketmq-dashboard/releases/download/2.0.0/rocketmq-dashboard-2.0.0.jar
# 启动Dashboard
$ java -jar rocketmq-dashboard-2.0.0.jar –server.port=8080 –rocketmq.config.namesrvAddr=192.168.1.51:9876 &
# 访问控制台
# 浏览器访问: http://192.168.1.51:8080
# 查看集群信息
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876
# 输出示例:
#Cluster Name #Broker Name #BrokerId #Addr #Version #InTPS #OutTPS
DefaultCluster fgedudb01 0 192.168.1.51:10911 V5_3_0 100.00 100.00
8.2 查看日志
$ tail -f /data/rocketmq/logs/namesrv.log
# 输出示例:
2026-04-04 10:00:00 INFO NameServerStartup – The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
2026-04-04 10:00:01 INFO RouteInfoManager – registerBroker, 192.168.1.51:10911
# 查看Broker日志
$ tail -f /data/rocketmq/logs/broker.log
# 输出示例:
2026-04-04 10:00:00 INFO BrokerController – The broker[fgedudb01, 192.168.1.51:10911] boot success. serializeType=JSON
2026-04-04 10:00:01 INFO BrokerController – Register broker to name server 192.168.1.51:9876 OK
8.3 性能监控
$ sh /opt/rocketmq/bin/mqadmin brokerStatus -n 192.168.1.51:9876 -b fgedudb01
# 输出示例:
bootTimestamp: 1712205600000
brokerVersion: 5.3.0
brokerVersionDesc: V5_3_0
commitLogDiskRatio: 0.05
commitLogMaxOffset: 1073741824
commitLogMinOffset: 0
consumeQueueDiskRatio: 0.05
dispatchMaxBuffer: 0
getFoundTps: 100.00
getMessageEntireTimeMax: 10
getMissTps: 0.00
getTotalTps: 100.00
putMessageAverageSize: 1024
putMessageDistributeTimeMap: {<=0ms:100, 0~10ms:0, 10~50ms:0}
putMessageEntireTimeMax: 5
putMessageSizeTotal: 102400
putMessageTimesTotal: 100
putTps: 100.00
# 查看主题统计
$ sh /opt/rocketmq/bin/mqadmin topicStatus -n 192.168.1.51:9876 -t fgedu_topic
9. 升级与迁移
RocketMQ升级和迁移是运维工作中的重要环节,需要仔细规划和执行。from:www.itpux.com
9.1 版本升级
$ cat /opt/rocketmq/logs/broker.log | grep “RocketMQ”
RocketMQ Broker 5.3.0 started successfully
# 备份配置
$ cp -r /opt/rocketmq/conf /backup/rocketmq/conf_$(date +%Y%m%d)
# 停止Broker
$ sh /opt/rocketmq/bin/mqshutdown broker
# 停止NameServer
$ sh /opt/rocketmq/bin/mqshutdown namesrv
# 备份旧版本
$ mv /opt/rocketmq /opt/rocketmq-5.3.0.bak
# 下载新版本
# wget https://archive.apache.org/dist/rocketmq/5.4.0/rocketmq-all-5.4.0-bin-release.zip
# 解压安装
# unzip -q rocketmq-all-5.4.0-bin-release.zip -d /opt/
# mv /opt/rocketmq-all-5.4.0-bin-release /opt/rocketmq
# 恢复配置
$ cp -r /backup/rocketmq/conf_$(date +%Y%m%d)/* /opt/rocketmq/conf/
# 启动NameServer
$ nohup sh /opt/rocketmq/bin/mqnamesrv > /data/rocketmq/logs/namesrv.log 2>&1 &
# 启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &
# 验证版本
$ cat /data/rocketmq/logs/broker.log | grep “RocketMQ”
RocketMQ Broker 5.4.0 started successfully
9.2 数据迁移
$ sh /opt/rocketmq/bin/mqshutdown broker
# 复制数据目录
$ scp -r /data/rocketmq/store new-server:/data/rocketmq/
# 在新服务器启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf > /data/rocketmq/logs/broker.log 2>&1 &
# 验证数据
$ sh /opt/rocketmq/bin/mqadmin topicStatus -n 192.168.1.52:9876 -t fgedu_topic
# 输出示例:
#Broker Name #QID #Min Offset #Max Offset #InTPS #OutTPS
fgedudb02 0 0 1000 0.00 0.00
10. 生产环境实战案例
本节提供一个完整的生产环境配置案例,帮助读者更好地理解RocketMQ的实际应用。更多学习教程www.fgedu.net.cn
10.1 生产环境完整配置
$ vi /opt/rocketmq/conf/broker.conf
# 集群配置
brokerClusterName = FgeDuCluster
brokerName = broker-a
brokerId = 0
# 网络配置
namesrvAddr = 192.168.1.51:9876;192.168.1.52:9876;192.168.1.53:9876
brokerIP1 = 192.168.1.51
listenPort = 10911
# 存储配置
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
mappedFileSizeCommitLog = 1073741824
mappedFileSizeConsumeQueue = 300000
# 刷盘配置
flushDiskType = SYNC_FLUSH
flushCommitLogTimed = true
flushIntervalCommitLog = 500
# 复制配置
brokerRole = SYNC_MASTER
syncFlushTimeout = 5000
slaveReadEnable = true
# 性能配置
sendMessageThreadPoolNums = 32
pullMessageThreadPoolNums = 32
useReentrantLockWhenPutMessage = false
# 消息配置
maxMessageSize = 4194304
autoCreateTopicEnable = false
autoCreateSubscriptionGroup = false
# ACL配置
aclEnable = true
# 创建systemd服务
$ vi /etc/systemd/system/rocketmq-namesrv.service
[Unit]
Description=RocketMQ NameServer
After=network.target
[Service]
Type=simple
User=rocketmq
Group=rocketmq
ExecStart=/opt/rocketmq/bin/mqnamesrv
ExecStop=/opt/rocketmq/bin/mqshutdown namesrv
Restart=on-failure
[Install]
WantedBy=multi-user.target
$ vi /etc/systemd/system/rocketmq-broker.service
[Unit]
Description=RocketMQ Broker
After=network.target
[Service]
Type=simple
User=rocketmq
Group=rocketmq
ExecStart=/opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf
ExecStop=/opt/rocketmq/bin/mqshutdown broker
Restart=on-failure
[Install]
WantedBy=multi-user.target
# 启用服务
# systemctl enable rocketmq-namesrv
# systemctl enable rocketmq-broker
# systemctl start rocketmq-namesrv
# systemctl start rocketmq-broker
10.2 高可用集群部署
# NameServer节点
# 192.168.1.51:9876
# 192.168.1.52:9876
# 192.168.1.53:9876
# Broker Master节点
# broker-a: 192.168.1.51:10911
# broker-b: 192.168.1.52:10911
# broker-c: 192.168.1.53:10911
# Broker Slave节点
# broker-a-slave: 192.168.1.52:10921
# broker-b-slave: 192.168.1.53:10921
# broker-c-slave: 192.168.1.51:10921
# 在所有节点启动NameServer
$ nohup sh /opt/rocketmq/bin/mqnamesrv > /data/rocketmq/logs/namesrv.log 2>&1 &
# 在Master节点启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker-master.conf > /data/rocketmq/logs/broker.log 2>&1 &
# 在Slave节点启动Broker
$ nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker-slave.conf > /data/rocketmq/logs/broker.log 2>&1 &
# 查看集群状态
$ sh /opt/rocketmq/bin/mqadmin clusterList -n 192.168.1.51:9876
10.3 性能调优实战
# vi /etc/sysctl.d/99-rocketmq.conf
# 网络优化
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 30
# 虚拟内存
vm.swappiness = 1
vm.dirty_ratio = 40
vm.dirty_background_ratio = 10
# 使配置生效
# sysctl -p /etc/sysctl.d/99-rocketmq.conf
# JVM优化
$ vi /opt/rocketmq/bin/runbroker.sh
JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g”
JAVA_OPT=”${JAVA_OPT} -XX:+UseG1GC -XX:MaxGCPauseMillis=200″
JAVA_OPT=”${JAVA_OPT} -XX:InitiatingHeapOccupancyPercent=35″
JAVA_OPT=”${JAVA_OPT} -XX:+ParallelRefProcEnabled”
JAVA_OPT=”${JAVA_OPT} -XX:+UseStringDeduplication”
# 性能测试
$ sh /opt/rocketmq/benchmark/tools.sh org.apache.rocketmq.example.benchmark.Producer -n 192.168.1.51:9876 -t fgedu_benchmark -w 16
# 输出示例:
Send TPS: 50000
Max RT: 100ms
Avg RT: 2ms
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
