ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线(消息中间件)。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

java消息服务:

不同系统之间的信息交换,是我们开发中比较常见的场景,比如系统A要把数据发送给系统B,这个问题我们应该如何去处理? 1999年,原来的SUN公司领衔提出了一种面向消息的中间件服务–JMS规范(标准);常用的几种信息交互技术(httpClient、hessian、dubbo、jms、webservice 五种).

JMS概述:

JMS即Java消息服务(Java Message Service的简称),是Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者返回后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。JMS只是Java EE中定义的一组标准API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。

ActiveMQ概述:

我们知道JMS只是消息服务的一组规范和接口,并没有具体的实现,而ActiveMQ就是JMS规范的具体实现;它是Apache下的一个项目,采用Java语言开发;是一款非常流行的开源消息服务器.

ActiveMQ与JMS关系:

我们知道,JMS只是定义了一组有关消息传送的规范和标准,并没有真正实现,也就说JMS只是定义了一组接口而已;就像JDBC抽象了关系数据库访问、JPA抽象了对象与关系数据库映射、JNDI抽象了命名目录服务访问一样,JMS具体的实现由不同的消息中间件厂商提供,比如Apache ActiveMQ就是JMS规范的具体实现,Apache ActiveMQ才是一个消息服务系统,而JMS不是。

特点:

1
2
3
4
5
6
7
1、支持多种语言编写客户端 
2、对spring的支持,很容易和spring整合
3、支持多种传输协议:TCP,SSL,NIO,UDP等
4、支持AJAX
消息形式:
1、点对点(queue)
2、一对多(topic)

ActiveMQ集群

在生产环境中,ActiveMQ作为消息中间件,保证消息的安全、稳定和可靠必不可少,高可用集群有两个两种模式:Master-Slave和broker Cluster。

集群部署架构主要有三种模式,共享文件系统主从、JDBC主从和复制LevelDB存储(另外Pure Master-Slave 这种shared nothing 架构在5.8版本之后已经不支持了,主要原因是Master挂了之后再起来无法自动融入集群,需要停掉MQ重新启动才可以,本次讨论略过)。

Master-Slave

1. 共享文件系统主从(Shared File System Master Slave)

通过共享文件系统实现主从节点,解决单点问题。

Shared File System Master Slave

多个broker争夺共享文件锁,取到文件锁的成为master节点。默认格式为kahadb。

1
2
3
<persistenceAdapter>
<levelDB directory="/sharedFileSystem/sharedBrokerData"/></persistenceAdapter>
</persistenceAdapter>

2. JDBC主从(JDBC Master Slave)

通过数据库的持久化功能实现ActivMQ的主从架构。

JDBC Master Slave

原理与文件系统类似,通过竞争数据库锁,成为master节点。相比文件系统模式,稳定性更高,但是性能会较差。

3. 复制LevelDB存储(Replicated LevelDB Store)

Replicated LevelDB Store是通过Zookeeper来进行节点角色的调度和选择,通过LevelDB来实现消息的持久化,官方用Very fast来描述这种方案,在5.9+版本可以使用。推荐在已经存在Zookeeper集群的环境下使用。

Replicated LevelDB Store由于Zookeeper节点选举机制的需求,至少需要3个节点进行Broker Master节点的选举。每个Broker实例将消息数据保存在本地,本质上是Shared nothing的方式,它们之间并不共享任何数据。

当Broker启动时,它首先向zookeeper注册自己的信息(brokerName,消息日志的版本戳等),如果此时group中没有其他broker实例,并阻塞初始化过程,等到足够多的broker加入group;当brokers的数量达到“replicas的多数派"时,开始选举,选举将会根据“消息日志的版本戳”、“权重"的大小决定,即“版本戳”越大(数据最新)、权重越高的broker优先成为master,其他broker作为slave并跟随master。当一个broker成为master时,它会向zookeer注册自己的sync地址信息;此后slaves首先根据sync地址与master建立链接,并同步消息文件(download)。当足够多的slave数据同步结束后,master将初始化transportConnector,此后Client将可以与master进行数据交互。Activemq.xml配置范例如下:

1
2
3
4
5
6
7
8
9
10
11
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-group" brokerId="amq01" dataDirectory="${activemq.data}">  
<persistenceAdapter>
<replicatedLevelDB directory="${activemq.data}"
replicas="3"
bind="tcp://0.0.0.0:62621"
zkAddress="concar-zk01:2181"
zkSessionTmeout="30s"
zkPath="/activemq/leveldb-stores"
hostname="concar-amq01" />
</persistenceAdapter>
</broker>

Master-slaves集群中,所有的broker必须具有相同的brokerName,它作为group域来限定集群的成员,brokerId可以不同,它仅作为描述信息。“replicas”参数非常重要,默认为3,表示消息最多可以备份在几个broker实例上,同是只有当“replicas/2 + 1”个broker存活时(包含master),集群才有效,才会选举master和备份消息,此值必须>=2。Client发送给Master的持久化消息(包括ACK和事务),master首先在本地保存,然后立即同步(sync)给选定的(replicas/2)个slaves,只有当这些节点也同步成功后,此消息的交互才算结束;对于剩下的replicas个节点,master采用异步的方式(async)转发。这种设计要求,可以保证集群中消息的可靠性,只有当(replicas/2 + 1)个节点物理故障,才会有丢失消息的风险。通常replicas为3,这要求开发者需要至少部署3个broker实例。如果replicas过大,会严重影响master的吞吐能力,因为它在sync消息的过程中会消耗太多的时间。

如果集群故障,在重启broker实例时,建议首先查看每个broker中查看LevelDB日志文件的版本戳(文件名为16进制数字),并优先启动版本戳较大的实例。(因为replicas多数派的约束,随机重启也不会有太大的问题)。但是不得随意调小replicas的值,如果你确实需要修改,那就首先关闭集群,一定优先启动版本戳最大的broker。

尽管集群对zookeeper的操作并不是很多,但是我们还是希望不要接入负载过高的zookeeper集群,以免给消息服务引入不稳定因素。通常zookeeper集群至少需要3个实例,才能保证zookeeper本身的高可用性。

其中bind属性表示当此broker实例成为master时,开启一个socket地址,此后slave可以通过此地址与其同步数据。

我们还需要为Replicated LevelDB配置zookeeper的地址和path,其中path下用来存储group中所有broker的注册信息,此值在group中所有的broker上都要一样。“hostname”用来描述当前机器的核心信息,通常为机器IP。如果你在本机构建伪分布式,那么需要在系统hosts文件中使用转义。

对于Client端而言,仍然需要使用failover协议,而且协议中需要包含group中所有broker的链接地址。和其他模式一样,对于非持久化消息仍然只会保存在master上,当master失效后,消息将会丢失。

4. Broker Cluster

Master-slave架构主要是解决单个broker服务的高可用问题,Broker Cluster的目的是为了实现实现不同broker之间的分布式和复制均衡。Broker-Cluster部署方式中,各个broker通过网络互相连接,并共享queue,提供2种部署方式:static Broker-Cluster和Dynamic Broker-Cluster。

  • static Broker-Cluster
1
2
3
<networkConnectors> 
<networkConnector uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkConnectors>
  • Dynamic Broker-Cluster

安装ActiveMQ集群

我们通常采用Activemq的JDBC架构实现高可用,Activemq的JDBC架构实施简便,易于管理,当主节点宕机时,备节点会获取数据库锁,成为主节点,保证服务的高可用,同时客户端采用failover协议去连接ActiveMQ消息服务

JDBC集群模式下,当节点启动时,会尝试获取database的表级排他锁,获取到排他锁的节点为Master节点,其他节点为Slave节点,同时消息的持久化也采用DB方式进行存储,需要注意的是:“Shared storage”模式下只会共享持久化类型消息,对于非持久化消息将不能从从中收益,它们不会在Slaves中备份。通常,我们在“Shared storage”模式中,额外的配置一个插件,强制将所有消息转换成持久化类型,这样所有的消息都可以在故障恢复之后不会丢失。JDBC Store相对于文件系统来而言,通常认为是低效的,尽管数据的可见性较好,但是database的扩容能力非常的弱,无法良好的适应在高并发、大数据情况下(严格来说,单组M-S架构是无法支持大数据的),况且ActiveMQ的消息通常存储时间较短,频繁的写入,频繁的删除,都是性能的影响点。我们通常在研究activeMQ的存储原理时使用JDBC Store,或者在中小型应用中对数据一致性(可靠性,可见性)要求较高的环境中使用:比如订单系统中交易流程支撑系统等。

1. 安装ActiveMQ

主机名 IP 角色 安装服务 端口
centos-vm2 192.168.134.112 - MySQL 3306
centos-vm3 192.168.134.113 master openjdk/activemq 8161,61616
centos-vm4 192.168.134.114 slave openjdk/activemq 8161,61616

软件版本

  • ActiveMQ版本:5.15.8
  • jre/jdk:>= 1.7
  • mysql: 5.5.60

端口解析:

  • 8161:ActiveMQ Web控制台端口
  • 61616:ActiveMQ服务端口

1.1 Java安装(略,见cassandra文档)

假定java家目录/opt/java/jdk1.8.0_202

1.2 下载安装ActiveMQ

二进制包下载地址

解压安装

1
tar -zxvf apache-activemq-5.15.8-bin.tar.gz -C /opt/

2. 配置ActiveMQ

2.1 配置/opt/apache-activemq-5.15.8/conf/activemq.xml

  • 修改brokerName
1
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="centos-vm3" dataDirectory="${activemq.data}" useJmx="false">
  • 注释掉原,添加MySQL的JDBC数据持久化接口
1
2
3
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds" createTablesOnStartup="true" useDatabaseLock="true"/>
</persistenceAdapter>

并在后添加MySQL连接配置,id与persistenceAdapter内的dataSource="#id"要对应

1
2
3
4
5
6
7
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://centos-vm2:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="active@mq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

MySQL创建activemq对应的用户及数据库

1
2
3
CREATE USER activemq@'%' IDENTIFIED BY 'active@mq';
CREATE DATABASE activemq;
GRANT ALL on activemq.* to activemq@'%';

2.2 配置/opt/apache-activemq-5.15.8/conf/jetty.xml及jetty-realm.properties

修改atcivemq master节点web监听端口

1
2
3
4
5
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>

修改默认登录角色及密码

1
2
admin: active@mq, admin
user: userpassword, user

3. 安装mysql-connector-java.jar

下载地址

解压后,将目录内的mysql-connector-java-5.1.47-bin.jar拷贝至/opt/apache-activemq-5.15.8/lib/目录下即可

4. 启动ActiveMQ

1
/opt/apache-activemq-5.15.8/bin/linux-x86-64/activemq start

支持停止及重启

1
/opt/apache-activemq-5.15.8/bin/linux-x86-64/activemq stop/restart

5. 添加systemd支持

/usr/lib/systemd/system/activemq.service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Unit]
Description=activemq message queue
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/opt/java/jdk1.8.0_202

ExecStart=/opt/apache-activemq-5.15.8/bin/linux-x86-64/activemq start
ExecReload=/opt/apache-activemq-5.15.8/bin/linux-x86-64/activemq restart
ExecStop=/opt/apache-activemq-5.15.8/bin/linux-x86-64/activemq stop
PrivateTmp=true

[Install]
WantedBy=multi-user.target

重载及服务控制

1
2
systemctl daemon-reload
systemctl start/stop/reload activemq