1. Pulsar消息队列简介
Apache Pulsar是Yahoo开源的云原生分布式消息流平台,集消息、队列、流处理于一体。Pulsar以其存算分离架构、多租户支持和地理复制能力著称,广泛用于金融、物联网、实时数据处理等场景。更多学习教程www.fgedu.net.cn
Pulsar核心特点
存算分离:Broker无状态,存储层可独立扩展
多租户:原生支持多租户和命名空间隔离
地理复制:支持跨数据中心复制
统一消息模型:支持队列和流两种消费模式
持久化存储:基于BookKeeper的持久化保证
2. Pulsar版本说明
Pulsar提供多个版本,用户可根据需求选择。学习交流加群风哥微信: itpux-com
最新版本信息
Pulsar 4.0.3:最新稳定版本
Pulsar 3.3.4:稳定版本,广泛使用
Pulsar 3.2.x:历史稳定版本
Pulsar 2.11.x:历史长期支持版本
版本特性
4.x版本:全新架构,性能优化
3.x版本:稳定可靠,功能完整
2.x版本:经典版本,广泛使用
3. Pulsar下载地址
Pulsar可以从官方网站或Apache镜像获取安装包。
官方下载地址
官方网站:https://pulsar.apache.org/download/
Apache镜像:https://archive.apache.org/dist/pulsar/
文档中心:https://pulsar.apache.org/docs/
下载安装包
# cd /fgeudb/software
# wget https://archive.apache.org/dist/pulsar/pulsar-4.0.3/apache-pulsar-4.0.3-bin.tar.gz
# 输出示例如下:
–2026-04-04 10:00:00– https://archive.apache.org/dist/pulsar/pulsar-4.0.3/apache-pulsar-4.0.3-bin.tar.gz
Resolving archive.apache.org… 135.181.172.217
Connecting to archive.apache.org|135.181.172.217|:443… connected.
HTTP request sent, awaiting response… 200 OK
Length: 262144000 (250M) [application/gzip]
Saving to: ‘apache-pulsar-4.0.3-bin.tar.gz’
apache-pulsar-4.0.3-bin.tar.gz 100%[===================>] 250.00M 25.5MB/s in 9.8s
2026-04-04 10:00:10 (25.5 MB/s) – ‘apache-pulsar-4.0.3-bin.tar.gz’ saved
# 验证下载文件
# ls -lh apache-pulsar-4.0.3-bin.tar.gz
-rw-r–r–. 1 root root 250M Apr 4 10:00 apache-pulsar-4.0.3-bin.tar.gz
4. 环境准备
在安装Pulsar之前,需要准备必要的系统环境和Java运行时。
系统要求
# cat /etc/redhat-release
Red Hat Enterprise Linux Server release 7.9 (Maipo)
# 检查系统架构
# uname -m
x86_64
# 检查内存和磁盘空间
# free -h
total used free shared buff/cache available
Mem: 31G 2.1G 28G 8.5M 1.0G 28G
Swap: 15G 0B 15G
# df -h /fgeudb
Filesystem Size Used Avail Use% Mounted on
/dev/sdb1 500G 20G 480G 4% /fgeudb
# 安装JDK 17或更高版本
# yum install -y java-17-openjdk java-17-openjdk-devel
# 验证Java版本
# java -version
# 输出示例如下:
openjdk version “17.0.8” 2023-07-18 LTS
OpenJDK Runtime Environment 21.9 (build 17.0.8+9-LTS)
OpenJDK 64-Bit Server VM 21.9 (build 17.0.8+9-LTS, mixed mode, sharing)
# 配置JAVA_HOME
# vi /etc/profile.d/java.sh
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
# 使环境变量生效
# source /etc/profile.d/java.sh
# 创建Pulsar用户和组
# groupadd -g 5017 pulsar
# useradd -g pulsar -u 5017 -d /home/pulsar -s /bin/bash pulsar
# 创建安装目录
# mkdir -p /fgeudb/pulsar/{data,logs}
# chown -R pulsar:pulsar /fgeudb/pulsar
5. TAR包安装实战
使用TAR包安装是最常见的方式,以下是详细步骤。学习交流加群风哥QQ113257174
步骤1:解压安装包
# cd /fgeudb/software
# tar -xzf apache-pulsar-4.0.3-bin.tar.gz -C /fgeudb/
# 输出示例如下:
# 重命名目录
# mv /fgeudb/apache-pulsar-4.0.3 /fgeudb/pulsar
# 查看目录结构
# ls -la /fgeudb/pulsar/
total 64
drwxr-xr-x. 8 pulsar pulsar 4096 Apr 4 10:05 .
drwxr-xr-x. 3 root root 4096 Apr 4 10:00 ..
drwxr-xr-x. 2 pulsar pulsar 4096 Apr 4 10:05 bin
drwxr-xr-x. 2 pulsar pulsar 4096 Apr 4 10:05 conf
drwxr-xr-x. 2 pulsar pulsar 4096 Apr 4 10:05 examples
drwxr-xr-x. 2 pulsar pulsar 4096 Apr 4 10:05 lib
drwxr-xr-x. 2 pulsar pulsar 4096 Apr 4 10:05 licenses
-rw-r–r–. 1 pulsar pulsar 5120 Apr 4 10:05 LICENSE
-rw-r–r–. 1 pulsar pulsar 1024 Apr 4 10:05 NOTICE
-rw-r–r–. 1 pulsar pulsar 2048 Apr 4 10:05 README.md
# 设置权限
# chown -R pulsar:pulsar /fgeudb/pulsar
步骤2:配置Pulsar
# vi /fgeudb/pulsar/conf/broker.conf
# Broker基础配置
brokerServicePort=6650
webServicePort=8080
clusterName=standalone
# 存储配置
managedLedgerDefaultEnsembleSize=1
managedLedgerDefaultWriteQuorum=1
managedLedgerDefaultAckQuorum=1
# BookKeeper配置
bookkeeperMetadataServiceUri=metadata-store:zk:localhost:2181/ledgers
# ZooKeeper配置
zookeeperServers=localhost:2181
configurationStoreServers=localhost:2181
# 保存配置文件
# 编辑BookKeeper配置文件
# vi /fgeudb/pulsar/conf/bookkeeper.conf
# BookKeeper配置
metadataServiceUri=metadata-store:zk:localhost:2181/ledgers
bookiePort=3181
journalDirectory=/fgeudb/pulsar/data/bookkeeper/journal
ledgerDirectories=/fgeudb/pulsar/data/bookkeeper/ledgers
# 保存配置文件
# 编辑ZooKeeper配置文件
# vi /fgeudb/pulsar/conf/zookeeper.conf
# ZooKeeper配置
clientPort=2181
dataDir=/fgeudb/pulsar/data/zookeeper
dataLogDir=/fgeudb/pulsar/data/zookeeper/log
# 保存配置文件
步骤3:启动Pulsar Standalone
# su – pulsar
# 启动Pulsar Standalone(包含ZooKeeper、BookKeeper、Broker)
$ cd /fgeudb/pulsar
$ ./bin/pulsar standalone
# 输出示例如下:
10:10:00.000 [main] INFO org.apache.pulsar.PulsarStandaloneStarter – Starting Pulsar standalone
10:10:01.000 [main] INFO org.apache.pulsar.PulsarStandaloneStarter – Starting ZooKeeper
10:10:02.000 [main] INFO org.apache.pulsar.PulsarStandaloneStarter – Starting BookKeeper
10:10:05.000 [main] INFO org.apache.pulsar.PulsarStandaloneStarter – Starting Broker
10:10:10.000 [main] INFO org.apache.pulsar.PulsarStandaloneStarter – Pulsar standalone started successfully
# 检查进程
$ ps -ef | grep pulsar
pulsar 12345 1 5 10:10 pts/0 00:00:10 /usr/lib/jvm/java-17-openjdk/bin/java -cp /fgeudb/pulsar/lib/* …
# 检查端口
$ netstat -tlnp | grep java
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 12345/java
tcp 0 0 0.0.0.0:3181 0.0.0.0:* LISTEN 12345/java
tcp 0 0 0.0.0.0:6650 0.0.0.0:* LISTEN 12345/java
tcp 0 0 0.0.0.0:8080 0.0.0.0:* LISTEN 12345/java
步骤4:配置systemd服务
# vi /etc/systemd/system/pulsar.service
[Unit]
Description=Apache Pulsar
After=network.target
[Service]
Type=simple
User=pulsar
Group=pulsar
Environment=”JAVA_HOME=/usr/lib/jvm/java-17-openjdk”
Environment=”PULSAR_HOME=/fgeudb/pulsar”
ExecStart=/fgeudb/pulsar/bin/pulsar standalone
ExecStop=/fgeudb/pulsar/bin/pulsar-daemon stop standalone
Restart=on-failure
RestartSec=10
LimitNOFILE=65535
[Install]
WantedBy=multi-user.target
# 重新加载systemd
# systemctl daemon-reload
# 启动Pulsar服务
# systemctl start pulsar
# 设置开机自启
# systemctl enable pulsar
# 输出示例如下:
Created symlink from /etc/systemd/system/multi-user.target.wants/pulsar.service to /etc/systemd/system/pulsar.service.
# 检查服务状态
# systemctl status pulsar
● pulsar.service – Apache Pulsar
Loaded: loaded (/etc/systemd/system/pulsar.service; enabled)
Active: active (running) since Fri 2026-04-04 10:15:00 CST; 10s ago
Main PID: 12345 (java)
CGroup: /system.slice/pulsar.service
└─12345 /usr/lib/jvm/java-17-openjdk/bin/java -cp …
6. Docker容器安装实战
使用Docker容器安装Pulsar是最快捷的方式,适合快速测试和开发环境。更多学习教程公众号风哥教程itpux_com
步骤1:拉取Pulsar镜像
# docker pull apachepulsar/pulsar:4.0.3
# 输出示例如下:
4.0.3: Pulling from apachepulsar/pulsar
f1f26f570256: Pull complete
8f8e43ef9c3a: Pull complete
e9d8dca5b8a5: Pull complete
a1b8d4c4a5e6: Pull complete
Digest: sha256:abc123def456…
Status: Downloaded newer image for apachepulsar/pulsar:4.0.3
docker.io/apachepulsar/pulsar:4.0.3
# 查看镜像
# docker images | grep pulsar
apachepulsar/pulsar 4.0.3 abc123def456 2 weeks ago 800MB
步骤2:运行Pulsar容器
# mkdir -p /fgeudb/pulsar/{data,logs}
# 运行Pulsar容器
# docker run -d \
–name pulsar \
-p 6650:6650 \
-p 8080:8080 \
-v /fgeudb/pulsar/data:/pulsar/data \
-v /fgeudb/pulsar/logs:/pulsar/logs \
apachepulsar/pulsar:4.0.3 \
bin/pulsar standalone
# 输出示例如下:
abc123def456789…
# 检查容器状态
# docker ps | grep pulsar
abc123def456 apachepulsar/pulsar:4.0.3 “bin/pulsar standalo…” 10 seconds ago Up 9 seconds 0.0.0.0:6650->6650/tcp, 0.0.0.0:8080->8080/tcp pulsar
# 查看容器日志
# docker logs pulsar
# 输出示例如下:
10:20:00.000 [main] INFO org.apache.pulsar.PulsarStandaloneStarter – Starting Pulsar standalone
10:20:10.000 [main] INFO org.apache.pulsar.PulsarStandaloneStarter – Pulsar standalone started successfully
7. Pulsar配置优化
以下是生产环境的Pulsar配置优化建议。
Broker配置优化
# vi /fgeudb/pulsar/conf/broker.conf
# 基础配置
brokerServicePort=6650
webServicePort=8080
clusterName=production
# 存储配置
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
managedLedgerMaxEntriesPerLedger=50000
managedLedgerMinLedgerRolloverTimeMinutes=10
# 消息配置
defaultRetentionTimeInMinutes=0
defaultRetentionSizeInMB=0
maxMessageSize=5242880
# 性能配置
numHttpServerThreads=8
numIOThreads=8
numExecutorThreadPoolSize=8
# 内存配置
brokerServiceCompactionMonitorIntervalInSeconds=60
brokerServiceCompactionMinThreshold=0.3
# 保存配置文件
BookKeeper配置优化
# vi /fgeudb/pulsar/conf/bookkeeper.conf
# 基础配置
bookiePort=3181
metadataServiceUri=metadata-store:zk:192.168.1.51:2181,192.168.1.52:2181,192.168.1.53:2181/ledgers
# 存储配置
journalDirectory=/fgeudb/pulsar/data/bookkeeper/journal
ledgerDirectories=/fgeudb/pulsar/data/bookkeeper/ledgers
indexDirectories=/fgeudb/pulsar/data/bookkeeper/index
# 性能配置
journalSyncData=true
journalMaxGroupWaitMSec=1
numAddWorkerThreads=4
numReadWorkerThreads=4
numJournalCallbackThreads=4
# 内存配置
journalWriteBufferSizeKB=512
journalRemoveFromPageCache=true
# 保存配置文件
8. 安装验证与测试
安装完成后,需要进行验证测试确保Pulsar正常运行。
使用Pulsar Admin CLI
$ cd /fgeudb/pulsar
$ ./bin/pulsar-admin clusters list
# 输出示例如下:
standalone
# 查看集群信息
$ ./bin/pulsar-admin clusters get standalone
# 输出示例如下:
{
“serviceUrl” : “http://localhost:8080”,
“brokerServiceUrl” : “pulsar://localhost:6650”,
“peerClusterNames” : [ ]
}
# 查看租户列表
$ ./bin/pulsar-admin tenants list
# 输出示例如下:
“public”
“pulsar”
# 查看命名空间列表
$ ./bin/pulsar-admin namespaces list public
# 输出示例如下:
“public/default”
“public/functions”
创建Topic和发送消息
$ ./bin/pulsar-admin topics create persistent://public/default/test-topic
# 查看Topic列表
$ ./bin/pulsar-admin topics list public/default
# 输出示例如下:
“persistent://public/default/test-topic”
# 查看Topic信息
$ ./bin/pulsar-admin topics stats persistent://public/default/test-topic
# 输出示例如下:
{
“msgRateIn” : 0.0,
“msgThroughputIn” : 0.0,
“msgRateOut” : 0.0,
“msgThroughputOut” : 0.0,
“averageMsgSize” : 0.0,
“storageSize” : 0,
“publishers” : [ ],
“subscriptions” : { },
“replication” : { },
“deduplicationStatus” : “Disabled”
}
# 发送测试消息
$ ./bin/pulsar-client produce persistent://public/default/test-topic \
–messages “Hello Pulsar!” \
-n 10
# 输出示例如下:
10:30:00.000 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool – Messages sent successfully: 10
# 订阅并消费消息
$ ./bin/pulsar-client consume persistent://public/default/test-topic \
–subscription-name test-subscription \
–num-messages 10
# 输出示例如下:
10:30:01.000 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool – Messages received: 10
—– got message —–
key:[null], properties:[], content:Hello Pulsar!
REST API测试
$ curl http://192.168.1.51:8080/admin/v2/clusters
# 输出示例如下:
[“standalone”]
# 查看租户列表
$ curl http://192.168.1.51:8080/admin/v2/tenants
# 输出示例如下:
[“public”,”pulsar”]
# 查看命名空间列表
$ curl http://192.168.1.51:8080/admin/v2/namespaces/public
# 输出示例如下:
[“public/default”,”public/functions”]
# 查看Topic统计
$ curl http://192.168.1.51:8080/admin/v2/persistent/public/default/test-topic/stats
# 输出示例如下:
{
“msgRateIn” : 0.0,
“msgThroughputIn” : 0.0,
“msgRateOut” : 0.0,
“msgThroughputOut” : 0.0,
“averageMsgSize” : 0.0,
“storageSize” : 0,
“publishers” : [ ],
“subscriptions” : { },
“replication” : { }
}
使用Pulsar Manager
# docker pull apachepulsar/pulsar-manager:v0.4.0
# 运行Pulsar Manager容器
# docker run -d \
–name pulsar-manager \
-p 9527:9527 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
apachepulsar/pulsar-manager:v0.4.0
# 输出示例如下:
ghi789jkl012mno…
# 浏览器访问Pulsar Manager
# URL: http://192.168.1.51:9527/
# 默认登录信息
# 用户名: pulsar
# 密码: pulsar
# Pulsar Manager功能:
# – 集群管理
# – 租户管理
# – 命名空间管理
# – Topic管理
# – 订阅管理
# – 消息查询
至此,Pulsar消息队列的下载和安装已完成。后续可以根据实际业务需求创建Topic、配置生产者和消费者。
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
