01、ActiveMQ的传输协议
参考官方文档:http://activemq.apache.org/configuring-version-5-transports.html
概述
ActiveMQ支持的client-broker(客户端-MQ服务器)通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM,其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml 中的<transportConnectors> 标签之内,如下:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
在上文给出的配置信息中,URI描述信息的头部都是采用协议名称:例如
- 描述amqp协议的监听端口时,采用的URI描述格式为
amqp://······ ; - 描述Stomp协议的监听端口时,采用URI描述格式为
stomp://······ ; - 唯独
openwire协议 描述时,URI头却采用的tcp://······ ,这是因为ActiveMQ中默认的消息协议就是openwire
支持的传输协议
注意:
-
除了tcp 和nio 协议,其他的了解就行。 -
各种协议有各自擅长该协议的中间件,工作中一般不会使用activemq去实现这些协议。 -
mqtt是物联网专用协议 ,采用的中间件一般是mosquito。 -
ws是websocket的协议 ,是和前端对接常用的,一般在java代码中内嵌一个基站(中间件)。 -
stomp好像是邮箱使用的协议的,各大邮箱公司都有基站(中间件)。
协议不同,我们的代码都会不同。
ActiveMQ支持的网络协议:
协议 | 描述 |
---|
TCP | 默认的协议,性能相对不错 | NIO | 基于TCP协议之上的,进行了拓展和优化,具有更好的拓展性 | UDP | 性能比TCP好,但是不具有可靠性 | SSL | 安全连接 | HTTPS | 基于HTTP或者HTTPS | VM | VM本身不是协议,当客户端和代理在同一个java虚拟机(VM)中运行时,他们之间想要通信,不想占用网络通道,而是直接通信,就可以采用这种方式 |
TCP协议
(1) Transmission Control Protocol(TCP)是默认的,TCP的Client监听端口61616
(2) 在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol 的来序列化成字节流。
(3) TCP连接的URI形式如:tcp://HostName:port?key=value&key=value ,后面的参数是可选的。
(4) TCP传输的的优点:
- TCP协议传输可靠性高,稳定性强
- 高效率:字节流方式传递,效率很高
- 有效性、可用性:应用广泛,支持任何平台
(5) 关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/tcp-transport-reference
NIO协议
(1) New I/O API Protocol(NIO)
(2) NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
(3) 适合使用NIO协议的场景:
- 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
- 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
(4) NIO连接的URI形式如:nio://hostname:port?key=value&key=value
(5) 关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/nio-transport-reference
AMQP协议(了解)
官网地址:http://activemq.apache.org/amqp
STOMP协议(了解)
官网地址:http://activemq.apache.org/stomp
MQTT协议(了解)
官网地址:http://activemq.apache.org/mqtt
NIO协议案例
关于TCP协议的案例我们前面已经使用很多了,就不在赘述了,这里看看NIO协议是怎么使用的
- BIO:同步阻塞IO
- BOI:同步非阻塞IO,新型的IO
- AIO:异步非阻塞IO
ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型,只有当我们指定使用nio才使用NIO的IO模型。
修改配置文件activemq.xml
如果不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络的IO模型(OpenWire,STOMP,AMQP等),所以为了提高单个ActiveMQ服务器的吞吐性能,我们需要明确指定ActiveMQ的网络IO模型为NIO,NIO是以TCP协议为基础的NIO网络IO模型 。
生产者代码
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61618";
private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
session.close();
connection.close();
}
}
}
消费者代码
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61618";
private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("***消费者接收到的消息: " + textMessage.getText());
} catch (Exception e) {
System.out.println("出现异常,消费失败,放弃消费");
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
NIO协议案例增强
上面是Openwire协议传输底层使用NIO网络IO模型 。 如何让其他协议传输底层也使用NIO网络IO模型呢?
URI格式以nio 开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型,前面的设置方式只能使一个端口支持OpenWire协议 ,就是616168端口支持NIO 。
如何让一个端口支持NIO网络模型,又让它支持多个协议?
也就是无论使用哪种协议都可以访问这个端口,而且这个端口还是NIO形式的 注意:前面的都是一个端口只支持特定的一种协议,如果协议名称和端口号对应不上就无法访问 。
解决方案:
使用auto关键字
使用+符号来为端口设置多种特性
修改配置文件activemq.xml
修改配置文件activemq.xml在 <transportConnectors> 节点下添加如下内容: ><transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>
- auto : 针对所有的协议,它会识别我们是什么协议。
- nio :使用NIO网络IO模型。
- 修改配置文件后重启activemq
生产者代码
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException {
......
}
}
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException {
......
}
}
消费者代码
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException, IOException {
......
}
}
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException, IOException {
......
}
}
02、ActiveMQ的消息存储和持久化
官网文档地址:http://activemq.apache.org/persistence
概述
(1) 此处持久化和之前的持久化的区别
- MQ高可用:
事务 、可持久 、签收 ,是属于MQ自身特性,自带的。这里的持久化是外力,是外部插件。之前讲的持久化是MQ的外在表现,现在讲的的持久是是底层实现。
(2) 持久化是什么?
持久化是什么?
- 一句话就是:ActiveMQ宕机了,消息不会丢失的机制。
说明:
- 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。
- ActiveMQ的消息持久化机制有
JDBC ,AMQ ,KahaDB 和LevelDB ,无论使用哪种持久化方式,消息的存储逻辑都是一致的,就是在发送者将消息发送出去后,消息中心首先 将消息存储到本地数据文件、内存数据库或者远程数据库等,再 试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。 - 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
持久化的方式有哪些
(1) AMQ Message Store
- 基于文件的存储机制,是以前的默认机制,现在不再使用。
- AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点,消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。
- AMQ适用于ActiveMQ5.3之前的版本
(2) kahaDB
(3) JDBC消息存储
(4) LevelDB消息存储
(5) JDBC Message Store with ActiveMQ Journal
kahaDB消息存储
概述
基于日志文件,从ActiveMQ5.4(含)开始默认的持久化插件。
官网文档:http://activemq.apache.org/kahadb ,官网上还有一些其他配置参数。
在配置文件activemq.xml中可以进行配置,如下:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
日志文件的存储目录在:%activemq安装目录%/data/kahadb
说明
- KahaDB是默认的存储方式,可用于任何场景吗,提高了性能和恢复能力。
- 消息存储使用一个事务日志和一个索引文件来存储它所有的地址。
- KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化
- 数据被追加到data logs中,当不在需要log文件中数据的时候,log文件会被丢弃。
KahaDB的存储原理
KahaDB在消息保存的目录中只有4类文件 和一个lock ,根ActiveMQ其他几种文件存储引擎相比这就非常简洁了
db-<num>.log ,KahaDB存储消息到预定义大小的数据记录文件中,文件名字为db-<num>.log ,当数据文件已满时,一个新的文件才会被创建,num的数值也会随之递增,当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。db.data文件 ,包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree,使用B-Tree作为索引指向db-<num>.log 里面存储的消息。db.free文件 ,当前db.data文件里那些页面是空闲的,文件具体内容是所有空闲页的ID。db.redo文件 ,用来进行消息回复,如果KahaDB消息存储在强制退出后启动,用于恢复B-Tree索引。lock文件锁 ,表示当前获取读写权限的broker
JDBC消息存储
一句话:MQ+MySQL
MQ+MySQL
添加mysql数据库的驱动包到lib文件夹
把mysql数据库的驱动包添加到:%ActiveMQ安装位置%/lib
jdbcPersistenceAdapter配置
修改ActiveMQ的配置文件activemq.xml :
-
修改前: <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-
修改后 <persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
dataSource指定将要引用的持久化数据库的bean的名称
createTablesOnStartup是否在启动的时候创建数据库表,默认是true,
这样每次启动都会去创建一个数据库表,一般是第一次启动的时候设置为true之后改为false
数据库连接池配置
需要我们准备一个mysql数据库,并创建一个名为activemq的数据库,然后将其配置在ActiveMQ的配置文件中:activemq.xml 中配置
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://IP:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="mysql数据库用户名"/>
<property name="password" value="mysql数据库密码"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
注意:新建的数据库要采用latin1编码 或者ASCII编码 。
默认是的dbcp数据库连接池 ,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。
创建SQL数据库和表
创建数据库的语句如下,指定编码方式:
create database activemq default character set latin1;
重启activemq,会自动生成如下3张表 ,如果没有自动生成,需要我们手动执行SQL生成这三张表。
-
ACTIVEMQ_MSGS数据表 :queue和topic都存在里面 -
ACTIVEMQ_ACKS数据表 :存储持久订阅的消息和最后一个持久订阅接收的消息ID -
ACTIVEMQ_LOCK数据表 :集群环境中才用,只有一个broker可以获得消息,称为Master broker,其它的只能最为备份等待Master Broker不可用,才能称为下一个Master Broker,这个表用于记录那个Broker是当前的Master Broker
这三张表一般是能够自动生成的,如果没有生成可以查看ActiveMQ的启动日志解决问题,生成的三张表ER图如下:
queue类型验证和数据表变化
queue类型:点对点的类型
- 当DeliveryMode设置为NON_PERSISTENCE时,消息不会被持久化,只会被保存在内存中。
- 当DeliveryMode设置为PERSISTENCE时,消息保存在Broker响应的文件或者数据库中
注意:点对点类型的消息一旦被消费,就会从Broker中删除。
举例:
- 我们使用queue模式持久化(
DeliveryMode设置为PERSISTENCE )生产3条消息后,会发现ACTIVEMQ_MSGS数据表多了3条数据。 - 启动消费者,消费了所有的消息后,发现数据表的数据消失了。
总结:
- queue模式:非持久化不会将消息持久化到数据库。
- queue模式:持久化会将消息持久化数据库。
代码
public class JmsProduce_persistence {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String QUEUE_NAME= "queue_persistence";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue=session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("queue_name--" + i);
messageProducer.send(textMessage);
MapMessage mapMessage = session.createMapMessage();
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** QUEUE_NAME消息发送到MQ完成 ****");
}
}
public class JmsConsummer_persistence {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String QUEUE_NAME = "queue_persistence";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("AISMALL_QUEUE01");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue=session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
while(true){
TextMessage message = (TextMessage)messageConsumer.receive();
if (null != message){
System.out.println("****消费者的消息:"+message.getText());
}else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
}
topic类型验证和数据表变化
注意:
- 要先启动一下持久化topic的消费者,可以看到
ACTIVEMQ_ACKS 数据表会多了一条消息 ACTIVEMQ_ACKS数据表 ,多了一个消费者的身份信息,一条记录代表一个持久化topic的消费者
举例:
-
我们启动持久化生产者发布3个数据,ACTIVEMQ_MSGS数据表新增3条数据。 -
消费者消费所有的数据后,ACTIVEMQ_MSGS数据表的数据并没有消失 。 -
持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在 ,且只存储一条。 -
注意:持久化的topic大量存储后可能导致性能下降,这里就像公众号一样,消费者消费完后,消息还会保留
代码
public class JmsProduce_persistence {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String TOPIC_NAME = "topic_persistence";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
messageProducer.send(textMessage);
MapMessage mapMessage = session.createMapMessage();
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息发送到MQ完成 ****");
}
}
public class JmsConsummer_persistence {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String TOPIC_NAME = "topic_persistence";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("AISMALL_TOPIC01");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
connection.start();
Message message = topicSubscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic :"+textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
}
总结
如果是queue
在没有被消费者消费的前提下会将消息保存到activate_msgs表中,只要被任意一个消费者消费,这些消息就会被删除。
如果是topic
一般是先启动消费者订阅然后在生产的情况下会将消息保存到activemq_acks表中
开发中需要注意的点 ,在配置关系型数据库作为ActiveMQ的持久化方案时:
JDBC Message Store with ActiveMQ Journal
概述
这种方式克服了JDBC Store 的不足,JDBC每次消息过来,都需要去写库读库,ActiveMQ Journal使用高速缓存写入技术,大大提高了性能。
当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:
- 生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。
- 如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
为了高性能,这种方式使用日志文件存储+数据库存储 ,先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库,该方式要比纯JDBC性能要高。
配置
修改ActiveMQ的配置文件activemq.xml,基于上面JDBC配置,再做一点修改:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data"/>
</persistenceFactory>
总结
1、jdbc效率低,kahaDB效率高,jdbc+Journal效率较高。
2、持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。
3、持久化机制演变的过程:
- 从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。
- ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),
- 后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。
4、ActiveMQ消息持久化机制有
AQM | 基于日志文件 |
---|
KahaDB | 基于日志文件,从ActiveMQ5.4 | 开始默认使用 | | JDBC | 基于第三方数据库,例如MySQL | Replicated LevelDB Store | 从5.9开始提供了LevelDB和zookeeper的数据复制方法,用于Master-Slave方式的首选数据复制方案 |
|