Apache RocketMQ是由阿里巴巴开源的可支撑万亿级数据洪峰的分布式消息和流计算平台,于2016年捐赠给Apache Software Foundation,2017年9月25日成为Apache 顶级项目。由于其高稳定性、低延时、高吞吐量等特点,被大规模应用于金融、互联网、物流公司的核心交易支付、实时位置追踪、大数据分析等场景,同时也被电力、交通、汽车、零售等十几个行业的数万家企业广泛使用,是企业数字化转型的核心基础性软件。
RocketMQ的特性:
- 消息模式包括发布/订阅,请求/答复和流式传输
- 财务级交易消息
- 基于DLedger的内置容错和高可用性配置选项
- 各种跨语言客户端,例如Java,C / C ++,Python,Go
- 可插拔的传输协议,例如TCP,SSL,AIO
- 内置消息跟踪功能,还支持开放式跟踪
- 多功能的大数据和流生态系统集成
- 按时间或偏移量追溯消息
- 可靠的FIFO和严格有序的消息传递在同一队列中
- 高效的推拉消费模型
- 单个队列中的百万级消息累积容量
- 多种消息传递协议,例如JMS和OpenMessaging
- 灵活的分布式横向扩展部署架构
- 快如闪电的批量消息交换系统
- 各种消息过滤器机制,例如SQL和Tag
- 用于隔离测试和云隔离群集的Docker映像
- 功能丰富的管理仪表板,用于配置,指标和监视
- 认证与授权
- 免费的开源连接器,适用于源和接收器
RocketMQ角色介绍
RocketMQ由四部分构成:Producer、Consumer、Broker和NameServer
启动顺序:NameServer->Broker
为了消除单点故障,增加可靠性或增大吞吐量,可以在多台机器上部署多个nameserver和broker,并且为每个broker部署1个或多个slave,rocketmq架构图如图1.1所示。

Topic & message queue:一个分布式消息队列中间件部署好以后,可以给很多个业务提供服务,同一个业务也有不同类型的消息要投递,这些不同类型的消息以不同的 Topic 名称来区分。所以发送和接收消息前,先创建topic,针对某个 Topic 发送和接收消息。有了 Topic 以后,还需要解决性能问题 。 如果一个Topic 要发送和接收的数据量非常大, 需要能支持增加并行处理的机器来提高处理速度,这时候一个 Topic 可以根据需求设置一个或多个 Message Queue, Message Queue 类似分区或 Partition 。Topic有了多个 Message Queue 后,消息可以并行地向各个Message Queue 发送,消费者也可以并行地从多个 Message Queue 读取消息并消费 。
名称服集群务 NameServer cluster
NameServer服务提供了轻量级的服务发现和路由。每个NameServer服务记录完整的路由信息,提供一致的读写服务,支持快速存储扩展
代理服务集群 Broker Cluster
Broker通过提供轻量级主题和队列机制来处理消息存储。它们支持Push和Pull模型,包含容错机制(2个副本或3个副本),提供了极强的峰值处理里能力和按照时间顺序存储数以百万记的消息存储能力,此外,代理提供了灾难恢复、丰富的度量统计和警报机制,这些都是在传统的消息传递系统中缺乏的
生产者集群 Producer Cluster
produce支持分布式部署,分布式的produce通过broker集群提供的各种负载均衡策略将消息发送到broker集群中。发送过程支持快速失败是低延迟的。
消费者集群 Consumer Cluster
消费者也支持在推送或者拉取模式下分布式部署,它还支持集群消费和消息广播。提供实时的消息订阅机制,能够满足大多数消费者的需求。RocketMQ的网站为感兴趣的用户提供了一个简单的快速入门指南。
名称服务NameServer
NameServer是一个功能齐全的服务器,主要包括两个功能:
⊙broker 管理,nameserver 接受来自broker集群的注册信息并提供心跳来检测他们是否可用。
⊙路由管理,每一个nameserver都持有关于broker集群和队列的全部路由信息,用来向客户端提供查询。
我们知道 ,rocketMQ客户端(生产者/消费者)会从nameserver查询队列的路由信息,客户端是如何知道nameserver的地址的呢?
有四种方式能够让客户端获取到nameserver的地址:
⊙通过程序,像这样producer.setNamesrvAddr(“ip:port”)
⊙java 配置项,这么用rocketmq.namesrv.addr
⊙环境变量 NAMESRV_ADDR
⊙HTTP 端点
代理服务 broker server
broker server负责消息的存储传递,消息查询,保证高可用等等。
像下图所示,broker server有一些非常重要的子模块:
⊙remoting(远程) 模块,broker的入口,处理从客户端发起的请求。
⊙client manager(客户端管理),管理各个客户端(生产者/消费者)还有维护消费者主题订阅。
⊙store(存储服务),提供简单的api来在磁盘保持或者查询消息。
⊙HA 高可用服务,提供主从broker的数据同步。
⊙index(索引服务),为消息建立索引提供消息快速查询。
集群部署方式
单Master模式
只有一个 Master节点
优点:配置简单,方便部署
缺点:这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
多Master模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。多 Master 多 Slave 模式,异步复制
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响
多Master多Slave模式(异步复制)
每个 Master 配置一个 Slave,有多对Master-Slave, HA,采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点: Master 宕机,磁盘损坏情况,会丢失少量消息。
多Master多Slave模式(同步双写)
每个 Master 配置一个 Slave,有多对Master-Slave, HA采用同步双写方式,主备都写成功,向应用返回成功。
优点:数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
RocketMQ双主双从异步集群部署
1、预装环境:
Linux / Unix / Mac;
64位JDK 1.8+;
Maven 3.2.x;
Git;
2、集群规划
这里演示使用两台来做,生产可使用四台。
IP | 服务名 | 端口 |
10.0.0.6 | rocketmq-nameserver-1,Broker-a-master,Broker-b-slave | 9876;10911;10921 |
10.0.0.16 | rocketmq-nameserver-2,Broker-b-master,Broker-a-slave | 9876;10911;10921 |
3、下载rocketmq二进制包
cd /opt wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip unzip rocketmq-all-4.7.0-bin-release.zip && mv rocketmq-all-4.7.0-bin-release rocketmq
4、创建存储路径
rocketmq-nameserver-1: mkdir -p /data/rocketmq/store/{rootdir-a-m,commitlog-a-m,rootdir-b-s,commitlog-b-s} rocketmq-nameserver-2: mkdir -p /data/rocketmq/store/{rootdir-b-m,commitlog-b-m,rootdir-a-s,commitlog-a-s}
10.0.0.16 rocketmq-nameserver-2
修改/etc/profile,加入
export ROCKETMQ_HOME=/opt/rocketmq export PATH=$ROCKETMQ_HOME/bin:$PATH source /etc/profile
6、修改配置文件
双主双从异步复制,默认的rocketmq已经为我们配置了相应配置目录
cd /opt/rocketmq/conf/2m-2s-async
操作:10.0.0.6 rocketmq-nameserver-1 角色:broker-a-master & broker-b-slave
vim broker-a.properties
#所属集群名字 brokerClusterName=rocket-mq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=rocketmq-nameserver-1 #brokerId 0 表示 Master,>0 表示 Slave brokerId=0 # Broker 对外服务的监听端口 listenPort=10911 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876 # 删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=72 #Broker role有3种:SYNC MASTER、ASYNC MASTER、SLAVE。关键词SYNC和ASYNC表示Master和Slave之间同步消息的机制, SYNC即同步更新,指当Slave和Master消息同步完成后,再返回发送成功的状态。ASYNC即异步更新,master与slave有短暂消息延迟,毫秒级。 brokerRole=ASYNC_MASTER # 刷盘方式 ASYNC_FLUSH 异步刷盘 flushDiskType=ASYNC_FLUSH #存储路径 storePathRootDir=/data/rocketmq/store/rootdir-a-m storePathCommitLog=/data/rocketmq/store/commitlog-a-m # 是否允许 Broker 自动创建Topic autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组 autoCreateSubscriptionGroup=true
若多网卡环境可指定ip
brokerIP1=10.0.0.6
vim broker-b-s.properties
brokerClusterName=rocket-mq-cluster brokerName=rocketmq-nameserver-2 listenPort=10921 namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876 brokerId=1 deleteWhen=04 fileReservedTime=72 brokerRole=SLAVE storePathRootDir=/data/rocketmq/store/rootdir-b-s storePathCommitLog=/data/rocketmq/store/commitlog-b-s autoCreateTopicEnable=true autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
操作 :10.0.0.16 rocketmq-nameserver-2 角色:broker-b-master & broker-a-slave
vim broker-b.properties
brokerClusterName=sns-rocket-mq-cluster brokerName=rocketmq-nameserver-2 brokerId=0 listenPort=10911 namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876 deleteWhen=04 fileReservedTime=72 brokerRole=ASYNC_MASTER storePathRootDir=/data/rocketmq/store/rootdir-b-m storePathCommitLog=/data/rocketmq/store/commitlog-b-m autoCreateTopicEnable=true autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
vim broker-a-s.properties
brokerClusterName=rocket-mq-cluster brokerName=rocketmq-nameserver-1 listenPort=10921 namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876 brokerId=1 deleteWhen=04 fileReservedTime=72 brokerRole=SLAVE storePathRootDir=/data/rocketmq/store/rootdir-a-s storePathCommitLog=/data/rocketmq/store/commitlog-a-s autoCreateTopicEnable=true autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
修改日志配置文件(两台一样)
mkdir -p /opt/rocketmq/logs cd /opt/rocketmq/conf sed -i 's#${user.home}#/opt/rocketmq#g' *.xml
7、修改启动脚本参数
继续国际惯例,修改之前先备份,两台服务器相同操作
vim /opt/rocketmq/bin/runbroker.sh
调一下JVM,包括nameserver 和 broker。限于自己机器的配置,参数调小一下。但Rocketmq最少的堆是1g,否则无法启动。两台机器执行相同的操作。
JAVA_OPT=”${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g”
8 、服务启动
要先启动namerserver,再启broker,两台机器执行相同的操作
两台启动namerserver
nohup sh /opt/rocketmq/bin/mqnamesrv > /data/rocketmq/store/mqnamesrv.log 2>&1 &
10.0.0.6 rocketmq-nameserver-1 上启动broker_a_master和broker_b_slave
nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-a.properties > /data/rocketmq/store/broker-a-m.log 2>&1 & nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-b-s.properties > /data/rocketmq/store/broker-b-s.log 2>&1 &
10.0.0.16 rocketmq-nameserver-2 上启动broker_b_master和broker_a_slave
nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-b.properties > /data/rocketmq/store/broker-b-m.log 2>&1 & nohup sh /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-a-s.properties > /data/rocketmq/store/broker-a-s.log 2>&1 &
都启动完毕之后可以jps看一下
[[email protected] ~]# jps 76609 Jps 75925 BrokerStartup 119063 rocketmq-console-ng-1.0.1.jar 77467 BrokerStartup 71407 NamesrvStartup
9、服务关闭
关闭nameserver: /opt/rocketmq/bin/mqshutdown namesrv 关闭broker: /opt/rocketmq/bin/mqshutdown broker
10、部署rocketmq-console
随便在其中一台部署即可
git clone https://github.com/apache/rocketmq-externals cd rocketmq-externals/rocketmq-console/ mvn clean package -Dmaven.test.skip=true
你会发现编译报错,需要修改pom.xml文件注释掉maven-checkstyle-plugin这个插件
<!–
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<configuration>
<excludes>src/main/resources</excludes>
<configLocation>style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
–>
启动服务
nohup java -jar /opt/rocketmq-console-ng-1.0.1.jar --server.port=8080 '--rocketmq.config.namesrvAddr=10.0.0.6:9876;10.0.0.16:9876' > /dev/null 2>&1 &
访问web控制台:http://ip:8080