简介
ActiveMQ 特点
ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。 它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了 JMS 1.1 并提供了很多附加的特性,比如 JMX 管理、主从管理、消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等等。其主要特性有:
- 支持包括 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多种语言的客户端和协议。协议包含 OpenWire、Stomp、AMQP、MQTT 。
- 提供了像消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化之类的高级特性
- 完全支持 JMS 1.1 和 J2EE 1.4规范(包括持久化、分布式事务消息、事务)
- 对 Spring 框架的支持,ActiveMQ 可以通过 Spring 的配置文件方式很容易嵌入到 Spring 应用中
- 通过了常见的 J2EE 服务器测试,比如 TomEE、Geronimo、JBoss、GlassFish、WebLogic
- 连接方式的多样化,ActiveMQ 提供了多种连接模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA
- 支持通过使用 JDBC 和 journal 实现消息的快速持久化
- 为高性能集群、客户端-服务器、点对点通信等场景而设计
- 提供了技术和语言中立的 REST API 接口
- 支持 Ajax 方式调用 ActiveMQ
- ActiveMQ 可以轻松地与 CXF、Axis 等 Web Service 技术整合,以提供可靠的消息传递
- 可用作为内存中的 JMS 提供者,非常适合 JMS 单元测试
基本概念
因为 ActiveMQ 是完整支持 JMS 1.1 的,所以从 Java 使用者的角度其基本概念与 JMS 1.1 规范是一致的。
消息传送模型
-
点对点模型(Point to Point) 使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。
-
发布订阅模型(Pub/Sub) 使用主题作为消息通信载体,类似于广播模式,发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
基本组件
ActiveMQ 使用时包含的基本组件各与 JMS 是相同的:
- Broker,消息代理,表示消息队列服务器实体,接受客户端连接,提供消息通信的核心服务。
- Producer,消息生产者,业务的发起方,负责生产消息并传输给 Broker 。
- Consumer,消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理。
- Topic,主题,发布订阅模式下的消息统一汇集地,不同生产者向 Topic 发送消息,由 Broker 分发到不同的订阅者,实现消息的广播。
- Queue,队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理。
- Message,消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。
由于这些概念在 JMS 中已介绍过,这里不再详细介绍。
连接器
ActiveMQ Broker 的主要作用是为客户端应用提供一种通信机制,为此 ActiveMQ 提供了一种连接机制,并用连接器(connector)来描述这种连接机制。ActiveMQ 中连接器有两种,一种是用于客户端与消息代理服务器(client-to-broker)之间通信的传输连接器(transport connector),一种是用于消息代理服务器之间(broker-to-broker)通信的网络连接器(network connector)。connector 使用 URI(统一资源定位符)来表示,URI 格式为:
<schema name>:<hierarchical part>[?<query>][#<fragment>]
schema name 表示协议,
例如:foo://username:[email protected]:8042/over/there/index.dtb?type=animal&name=narwhal#nose
其中 schema name 部分是 foo,hierarchical part 是 username:[email protected]:8042/over/there/index.dtb,query 是 type=animal&name=narwhal,fragment 是 nose。
- 传输连接器 为了交换消息,消息生产者和消息消费者(统称为客户端)都需要连接到消息代理服务器,这种客户端和消息代理服务器之间的通信就是通过传输连接器(Transport connectors)完成的。很多情况下用户连接消息代理时的需求侧重点不同,有的更关注性能,有的更注重安全性,因此 ActiveMQ 提供了一系列l连接协议供选择,来覆盖这些使用场景。从消息代理的角度看,传输连接器就是用来处理和监听客户端连接的,查看 ActiveMQ demo 的配置文件(/examples/conf/activemq-demo.xml),传输连接的相关配置如下:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>
传输连接器定义在
<transportConnectors>
元素中,一个
<transportConnector>
元素定义一个特定的连接器,一个连接器必须有自己唯一的名字和 URI 属性,但
discoveryUri
属性是可选的。目前在 ActiveMQ 最新的5.15版本中常用的传输连接器连接协议有:vm、tcp、udp、multicast、nio、ssl、http、https、websocket、amqp、mqtt、stomp 等等
- vm,允许客户端和消息服务器直接在 VM 内部通信,采用的连接不是 Socket 连接,而是直接的虚拟机本地方法调用,从而避免网络传输的开销。应用场景仅限于服务器和客户端在同一 JVM 中。
- tcp,客户端通过 TCP 连接到远程的消息服务器。
- udp,客户端通过 UDP 连接到远程的消息服务器。
- multicast,允许使用组播传输的方式连接到消息服务器。
- nio,nio 和 tcp 的作用是一样的,只不过 nio 使用了 java 的 NIO包,这可能在某些场景下可提供更好的性能。
- ssl,ssl 允许用户在 TCP 的基础上使用 SSL 。
- http 和 https,允许客户端使用 REST 或 Ajax 的方式进行连接,这意味着可以直接使用 Javascript 向 ActiveMQ 发送消息。
- websocket,允许客户端通过 HTML5 中的 WebSocket 方式连接到消息服务器。
- amqp,5.8版本开始支持。
- mqtt、stomp,5.6版本开始支持。
每个协议的具体配置见官网(http://activemq.apache.org/uri-protocols.html )。除了以上这些基本协议之外 ActiveMQ 还支持一些高级协议也可以通过 URI 的方式进行配置,比如 Failover 和 Fanout 。
- Failover 是一种重新连接的机制,工作于上面介绍的连接协议的上层,用于建立可靠的传输。其配置语法允许制定任意多个复合的 URI ,它会自动选择其中的一个 URI 来尝试建立连接,如果该连接没有成功,则会继续选择其它的 URI 来尝试。配置语法例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
- Fanout 是一种重新连接和复制的机制,它也工作于其它连接的上层,采用复制的方式把消息复制到多个消息服务器。配置语法例如:fanout:(tcp://localhost:61629,tcp://localhost:61639,tcp://localhost:61649)
我们在第2章使用 ActiveMQ 的 JMS 例子中使用的默认连接地址
ActiveMQConnection.DEFAULT_BROKER_URL
,实际上它的值就是
failover://tcp://localhost:61616
,所以客户端默认使用的是基于 TCP 的 failover 连接协议。
- 网络连接器 很多情况下,我们要处理的数据可能是海量的,这种场景单台服务器很难支撑,这就要用到集群功能,为此 ActiveMQ 提供了网络连接的模式,简单说就是通过把多个消息服务器实例连接在一起作为一个整体对外提供服务,从而提高整体对外的消息服务能力。通过这种方式连接在一起的服务器实例之间可共享队列和消费者列表,从而达到分布式队列的目的,网络连接器就是用来配置服务器之间的通信。
如图所示,服务器 S1 和 S2 通过 NewworkConnector 相连,生产者 P1 发送的消息,消费者 C3 和 C4 都可以接收到,而生产者 P3 发送的消息,消费者 C1 和 C2 也可以接收到。要使用网络连接器的功能需要在服务器 S1 的 activemq.xml 中的 broker 节点下添加如下配置(假设192.168.11.23:61617 为 S2 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>
如果只是这样,S1 可以将消息发送到 S2,但这只是单方向的通信,发送到 S2 上的的消息还不能发送到 S1 上。如果想 S1 也收到从 S2 发来的消息需要在 S2 的 activemq.xml 中的 broker 节点下也添加如下配置(假设192.168.11.45:61617为 S1 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>
这样,S1和S2就可以双向通信了。目前在 ActiveMQ 最新的5.15版本中常用的网络连接器协议有 static 和 multicast 两种。
-
static,静态协议,用于为一个网络中多个代理创建静态配置,这种配置协议支持复合的 URI (即包含其他 URI 的 URI)。例如
static://(tcp://ip:61616,tcp://ip2:61616)
- multicast,多点传送协议,消息服务器会广播自己的服务,也会定位其他代理。这种方式用于服务器之间实现动态识别,而不是配置静态的 IP 组。
对这块感兴趣的话可以看官方文档:http://activemq.apache.org/networks-of-brokers.html
消息存储
JMS 规范中消息的分发方式有两种:非持久化和持久化。对于非持久化消息 JMS 实现者须保证尽最大努力分发消息,但消息不会持久化存储;而持久化方式分发的消息则必须进行持久化存储。非持久化消息常用于发送通知或实时数据,当你比较看重系统性能并且即使丢失一些消息并不影响业务正常运作时可选择非持久化消息。持久化消息被发送到消息服务器后如果当前消息的消费者并没有运行则该消息继续存在,只有等到消息被处理并被消息消费者确认之后,消息才会从消息服务器中删除。
对以上这两种方式 ActiveMQ 都支持,并且还支持通过缓存在内存中的中间状态消息的方式来恢复消息。概括起来看 ActiveMQ 的消息存储有三种:存储到内存、存储到文件、存储到数据库。具体使用上 ActiveMQ 提供了一个插件式的消息存储机制,类似于消息的多点传播,主要实现了如下几种:
- AMQ,是 ActiveMQ 5.0及以前版本默认的消息存储方式,它是一个基于文件的、支持事务的消息存储解决方案。 在此方案下消息本身以日志的形式实现持久化,存放在 Data Log 里。并且还对日志里的消息做了引用索引,方便快速取回消息。
- KahaDB,也是一种基于文件并具有支持事务的消息存储方式,从5.3开始推荐使用 KahaDB 存储消息,它提供了比 AMQ 消息存储更好的可扩展性和可恢复性。
- JDBC,基于 JDBC 方式将消息存储在数据库中,将消息存到数据库相对来说比较慢,所以 ActiveMQ 建议结合 journal 来存储,它使用了快速的缓存写入技术,大大提高了性能。
- 内存存储,是指将所有要持久化的消息放到内存中,因为这里没有动态的缓存,所以需要注意设置消息服务器的 JVM 和内存大小。
- LevelDB,5.6版本之后推出了 LevelDB 的持久化引擎,它使用了自定义的索引代替常用的 BTree 索引,其持久化性能高于 KahaDB,虽然默认的持久化方式还是 KahaDB,但是 LevelDB 将是趋势。在5.9版本还提供了基于 LevelDB 和 Zookeeper 的数据复制方式,作为 Master-Slave 方式的首选数据复制方案。
工程实例
Java 访问 ActiveMQ 实例
在第2章介绍 JMS 时就以 ActiveMQ 为例介绍过 Java 访问代码,不过那个例子中是基于队列(Queue)模式传递消息的,我们知道 JMS 规范中传递消息的方式有两种,一种是点对点模型的队列(Queue)方式,另一种是发布订阅模型的主题(Topic)方式。下面看下用 ActiveMQ 以主题方式传递消息的 Java 示例。
引入依赖
Java 工程中需要引入 ActiveMQ 包的依赖,jar 包版本同你安装 ActiveMQ 版本一致即可:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
消息生产者
package org.study.mq.activeMQ;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicPublisher {
/**
* 默认用户名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话,不需要事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建 Topic,用作消费者订阅消息
Topic myTestTopic = session.createTopic("activemq-topic-test1");
//消息生产者
MessageProducer producer = session.createProducer(myTestTopic);
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("发送消息 " + i);
producer.send(myTestTopic, message);
}
//关闭资源
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
在 Topic 模式中消息生产者是用于发布消息的,绝大部分代码与 Queue 模式中相似,不同的是本例中基于 Session 创建的是主题(Topic),该主题作为消费者消费消息的目的地。
消息消费者
package org.study.mq.activeMQ;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicSubscriber {
/**
* 默认用户名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话,不需要事务
Session session = connection.createSession(false