IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> ActiveMQ-05-ActiveMQ的传输协议,消息存储和持久化 -> 正文阅读

[网络协议]ActiveMQ-05-ActiveMQ的传输协议,消息存储和持久化

01、ActiveMQ的传输协议

参考官方文档:http://activemq.apache.org/configuring-version-5-transports.html

概述

ActiveMQ支持的client-broker(客户端-MQ服务器)通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM,其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的<transportConnectors>标签之内,如下:

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

在上文给出的配置信息中,URI描述信息的头部都是采用协议名称:例如

  • 描述amqp协议的监听端口时,采用的URI描述格式为amqp://······
  • 描述Stomp协议的监听端口时,采用URI描述格式为stomp://······
  • 唯独openwire协议描述时,URI头却采用的tcp://······,这是因为ActiveMQ中默认的消息协议就是openwire

支持的传输协议

注意:

  • 除了tcpnio协议,其他的了解就行。

  • 各种协议有各自擅长该协议的中间件,工作中一般不会使用activemq去实现这些协议。

  • mqtt是物联网专用协议,采用的中间件一般是mosquito。

  • ws是websocket的协议,是和前端对接常用的,一般在java代码中内嵌一个基站(中间件)。

  • stomp好像是邮箱使用的协议的,各大邮箱公司都有基站(中间件)。

协议不同,我们的代码都会不同。

ActiveMQ支持的网络协议:

协议描述
TCP默认的协议,性能相对不错
NIO基于TCP协议之上的,进行了拓展和优化,具有更好的拓展性
UDP性能比TCP好,但是不具有可靠性
SSL安全连接
HTTPS基于HTTP或者HTTPS
VMVM本身不是协议,当客户端和代理在同一个java虚拟机(VM)中运行时,他们之间想要通信,不想占用网络通道,而是直接通信,就可以采用这种方式

TCP协议

(1) Transmission Control Protocol(TCP)是默认的,TCP的Client监听端口61616

(2) 在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。

(3) TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。

(4) TCP传输的的优点:

  • TCP协议传输可靠性高,稳定性强
  • 高效率:字节流方式传递,效率很高
  • 有效性、可用性:应用广泛,支持任何平台

(5) 关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/tcp-transport-reference

NIO协议

(1) New I/O API Protocol(NIO)

(2) NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。

(3) 适合使用NIO协议的场景:

  • 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
  • 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。

(4) NIO连接的URI形式如:nio://hostname:port?key=value&key=value

(5) 关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/nio-transport-reference

AMQP协议(了解)

官网地址:http://activemq.apache.org/amqp

STOMP协议(了解)

官网地址:http://activemq.apache.org/stomp

MQTT协议(了解)

官网地址:http://activemq.apache.org/mqtt

NIO协议案例

关于TCP协议的案例我们前面已经使用很多了,就不在赘述了,这里看看NIO协议是怎么使用的

  • BIO:同步阻塞IO
  • BOI:同步非阻塞IO,新型的IO
  • AIO:异步非阻塞IO

ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型,只有当我们指定使用nio才使用NIO的IO模型。

修改配置文件activemq.xml

如果不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络的IO模型(OpenWire,STOMP,AMQP等),所以为了提高单个ActiveMQ服务器的吞吐性能,我们需要明确指定ActiveMQ的网络IO模型为NIO,NIO是以TCP协议为基础的NIO网络IO模型

  • ① 修改配置文件activemq.xml在 <transportConnectors>节点下添加如下内容:

    <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />

  • ② 修改完成后重启activemq:

    .\activemq restart

  • ③ 查看管理后台,可以看到页面多了一个nio
    在这里插入图片描述

生产者代码

public class Jms_TX_Producer {
    private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61618";
    private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //队列
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                producer.send(textMessage);
            }
            System.out.println("消息发送完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //8.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }
}

消费者代码

public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61618";
    private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {

            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                    } catch (Exception e) {
                        System.out.println("出现异常,消费失败,放弃消费");
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

NIO协议案例增强

上面是Openwire协议传输底层使用NIO网络IO模型。 如何让其他协议传输底层也使用NIO网络IO模型呢?

URI格式以nio开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型,前面的设置方式只能使一个端口支持OpenWire协议,就是616168端口支持NIO

如何让一个端口支持NIO网络模型,又让它支持多个协议?

也就是无论使用哪种协议都可以访问这个端口,而且这个端口还是NIO形式的
注意:前面的都是一个端口只支持特定的一种协议,如果协议名称和端口号对应不上就无法访问

解决方案:

使用auto关键字

使用+符号来为端口设置多种特性

修改配置文件activemq.xml

修改配置文件activemq.xml在 <transportConnectors>节点下添加如下内容:
><transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>

  • auto : 针对所有的协议,它会识别我们是什么协议。
  • nio :使用NIO网络IO模型。
  • 修改配置文件后重启activemq

在这里插入图片描述

生产者代码

//使用nio模型的tcp协议生产者,其他代码和之前一样
public class Jms_TX_Producer {
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61608";
    private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";

    public static void main(String[] args) throws JMSException {
         ......
    }
}
//使用nio模型的nio协议生产者,其他代码和之前一样
public class Jms_TX_Producer {
    private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61608";
    private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";

    public static void main(String[] args) throws JMSException {
       ......
    }
}

消费者代码


public class Jms_TX_Consumer {
	//使用nio模型的tcp协议消费者,其他代码和之前一样
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61608";
    private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";

    public static void main(String[] args) throws JMSException, IOException {
       ......
    }
}
//使用nio模型的nio协议消费者,其他代码和之前一样
public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61608";
    private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";

    public static void main(String[] args) throws JMSException, IOException {
        ......
    }
}

02、ActiveMQ的消息存储和持久化

官网文档地址:http://activemq.apache.org/persistence

概述

(1) 此处持久化和之前的持久化的区别

  • MQ高可用:事务可持久签收,是属于MQ自身特性,自带的。这里的持久化是外力,是外部插件。之前讲的持久化是MQ的外在表现,现在讲的的持久是是底层实现。
    在这里插入图片描述

(2) 持久化是什么?

持久化是什么?

  • 一句话就是:ActiveMQ宕机了,消息不会丢失的机制。

说明:

  • 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。
  • ActiveMQ的消息持久化机制有JDBCAMQKahaDBLevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。
  • 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

持久化的方式有哪些

(1) AMQ Message Store

  • 基于文件的存储机制,是以前的默认机制,现在不再使用。
  • AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点,消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。
  • AMQ适用于ActiveMQ5.3之前的版本

(2) kahaDB

  • 现在默认的。下面我们再详细介绍。

(3) JDBC消息存储

  • 下面我们再详细介绍。

(4) LevelDB消息存储

  • 过于新兴的技术,现在有些不确定。

(5) JDBC Message Store with ActiveMQ Journal

  • 下面我们再详细介绍。

kahaDB消息存储

概述

基于日志文件,从ActiveMQ5.4(含)开始默认的持久化插件。

官网文档:http://activemq.apache.org/kahadb,官网上还有一些其他配置参数。

在配置文件activemq.xml中可以进行配置,如下:

<persistenceAdapter>
   <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

日志文件的存储目录在:%activemq安装目录%/data/kahadb

说明

  • KahaDB是默认的存储方式,可用于任何场景吗,提高了性能和恢复能力。
  • 消息存储使用一个事务日志和一个索引文件来存储它所有的地址。
  • KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化
  • 数据被追加到data logs中,当不在需要log文件中数据的时候,log文件会被丢弃。

KahaDB的存储原理

KahaDB在消息保存的目录中只有4类文件一个lock,根ActiveMQ其他几种文件存储引擎相比这就非常简洁了

  • db-<num>.log,KahaDB存储消息到预定义大小的数据记录文件中,文件名字为db-<num>.log,当数据文件已满时,一个新的文件才会被创建,num的数值也会随之递增,当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。
  • db.data文件,包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree,使用B-Tree作为索引指向db-<num>.log里面存储的消息。
  • db.free文件,当前db.data文件里那些页面是空闲的,文件具体内容是所有空闲页的ID。
  • db.redo文件,用来进行消息回复,如果KahaDB消息存储在强制退出后启动,用于恢复B-Tree索引。
  • lock文件锁,表示当前获取读写权限的broker

JDBC消息存储

一句话:MQ+MySQL

  • 持久化方式选择MySQL

MQ+MySQL

在这里插入图片描述

添加mysql数据库的驱动包到lib文件夹

把mysql数据库的驱动包添加到:%ActiveMQ安装位置%/lib
在这里插入图片描述

jdbcPersistenceAdapter配置

修改ActiveMQ的配置文件activemq.xml

  • 修改前:

    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>
    
  • 修改后

    <persistenceAdapter>
    	<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
    </persistenceAdapter>
    
    dataSource指定将要引用的持久化数据库的bean的名称
    
    createTablesOnStartup是否在启动的时候创建数据库表,默认是true,
    这样每次启动都会去创建一个数据库表,一般是第一次启动的时候设置为true之后改为false
    

数据库连接池配置

需要我们准备一个mysql数据库,并创建一个名为activemq的数据库,然后将其配置在ActiveMQ的配置文件中:activemq.xml中配置

<!-- 数据库配置 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://IP:3306/activemq?relaxAutoCommit=true"/>
    <property name="username" value="mysql数据库用户名"/>
    <property name="password" value="mysql数据库密码"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

注意:新建的数据库要采用latin1编码 或者ASCII编码

默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。

创建SQL数据库和表

创建数据库的语句如下,指定编码方式:

create database activemq default character set latin1;

重启activemq,会自动生成如下3张表,如果没有自动生成,需要我们手动执行SQL生成这三张表。
在这里插入图片描述

  • ACTIVEMQ_MSGS数据表:queue和topic都存在里面
    在这里插入图片描述

  • ACTIVEMQ_ACKS数据表:存储持久订阅的消息和最后一个持久订阅接收的消息ID
    在这里插入图片描述

  • ACTIVEMQ_LOCK数据表:集群环境中才用,只有一个broker可以获得消息,称为Master broker,其它的只能最为备份等待Master Broker不可用,才能称为下一个Master Broker,这个表用于记录那个Broker是当前的Master Broker
    在这里插入图片描述

这三张表一般是能够自动生成的,如果没有生成可以查看ActiveMQ的启动日志解决问题,生成的三张表ER图如下
在这里插入图片描述

queue类型验证和数据表变化

queue类型:点对点的类型

  • 当DeliveryMode设置为NON_PERSISTENCE时,消息不会被持久化,只会被保存在内存中。
  • 当DeliveryMode设置为PERSISTENCE时,消息保存在Broker响应的文件或者数据库中

注意:点对点类型的消息一旦被消费,就会从Broker中删除。

举例:

  • 我们使用queue模式持久化(DeliveryMode设置为PERSISTENCE)生产3条消息后,会发现ACTIVEMQ_MSGS数据表多了3条数据。
    在这里插入图片描述
  • 启动消费者,消费了所有的消息后,发现数据表的数据消失了。

总结:

  • queue模式:非持久化不会将消息持久化到数据库。
  • queue模式:持久化会将消息持久化数据库。

代码

  • 生产者代码:
//持久化queue的消息生产者
public class JmsProduce_persistence {
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME= "queue_persistence";
    public static void main(String[] args) throws  Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue=session.createQueue(QUEUE_NAME);
        MessageProducer messageProducer = session.createProducer(queue);
        // 设置持久化queue
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 设置持久化queue之后再,启动连接
        connection.start();
        for (int i = 1; i < 4 ; i++) {
            TextMessage textMessage = session.createTextMessage("queue_name--" + i);
            messageProducer.send(textMessage);
            MapMessage mapMessage = session.createMapMessage();
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** QUEUE_NAME消息发送到MQ完成 ****");
    }
}
  • 消费者代码:
// 持久化queue的消息消费者
public class JmsConsummer_persistence {
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "queue_persistence";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // 设置客户端ID。向MQ服务器注册自己的名称
        connection.setClientID("AISMALL_QUEUE01");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue=session.createQueue(QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        connection.start();
        while(true){
            TextMessage message = (TextMessage)messageConsumer.receive();
            if (null != message){
                System.out.println("****消费者的消息:"+message.getText());
            }else {
                break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

topic类型验证和数据表变化

注意:

  • 要先启动一下持久化topic的消费者,可以看到ACTIVEMQ_ACKS数据表会多了一条消息
    在这里插入图片描述
  • ACTIVEMQ_ACKS数据表,多了一个消费者的身份信息,一条记录代表一个持久化topic的消费者

举例:

  • 我们启动持久化生产者发布3个数据,ACTIVEMQ_MSGS数据表新增3条数据。

  • 消费者消费所有的数据后,ACTIVEMQ_MSGS数据表的数据并没有消失

  • 持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。

  • 注意:持久化的topic大量存储后可能导致性能下降,这里就像公众号一样,消费者消费完后,消息还会保留

代码

  • 生产者代码:
public class JmsProduce_persistence {
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    public static final String TOPIC_NAME = "topic_persistence";
    public static void main(String[] args) throws  Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
        // 设置持久化topic
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 设置持久化topic之后再,启动连接
        connection.start();
        for (int i = 1; i < 4 ; i++) {
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            messageProducer.send(textMessage);
            MapMessage mapMessage = session.createMapMessage();
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}
  • 消费者代码:
// 持久化topic 的消息消费者
public class JmsConsummer_persistence {
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    public static final String TOPIC_NAME = "topic_persistence";
    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // 设置客户端ID。向MQ服务器注册自己的名称
        connection.setClientID("AISMALL_TOPIC01");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 创建一个topic订阅者对象。一参是topic,二参是订阅者名称
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
        connection.start();
        Message message = topicSubscriber.receive();
        while (null != message){
            TextMessage textMessage = (TextMessage)message;
            System.out.println(" 收到的持久化 topic :"+textMessage.getText());
            message = topicSubscriber.receive();
        }
        session.close();
        connection.close();
    }
}

总结

如果是queue

在没有被消费者消费的前提下会将消息保存到activate_msgs表中,只要被任意一个消费者消费,这些消息就会被删除。

如果是topic

一般是先启动消费者订阅然后在生产的情况下会将消息保存到activemq_acks表中

开发中需要注意的点,在配置关系型数据库作为ActiveMQ的持久化方案时:

  • 数据库的jar包,记得把数据库的jar包放到ActiveMQ安装目录的lib目录中,如果使用数据库连接池,也要把连接池的jar包放到里面。

  • createTablesOnStartup属性,这个属性在首次启动的时候设为true,默认也是true,之后就设为false即可。

JDBC Message Store with ActiveMQ Journal

概述

这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库,ActiveMQ Journal使用高速缓存写入技术,大大提高了性能。

当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。

举个例子:

  • 生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。
  • 如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

为了高性能,这种方式使用日志文件存储+数据库存储,先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库,该方式要比纯JDBC性能要高。

配置

修改ActiveMQ的配置文件activemq.xml,基于上面JDBC配置,再做一点修改:

<persistenceAdapter>
	<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
  • 修改如下,Adapter变为Factory
<persistenceFactory>
	<journalPersistenceAdapterFactory journalLogFiles="4" 
									  journalLogFileSize="32768" 
									  useJournal="true" 
									  useQuickJournal="true" 
									  dataSource="#mysql-ds"
									  dataDirectory="activemq-data"/>
</persistenceFactory>

总结

1、jdbc效率低,kahaDB效率高,jdbc+Journal效率较高。

2、持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。

3、持久化机制演变的过程:

  • 从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。
  • ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),
  • 后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。

4、ActiveMQ消息持久化机制有

AQM基于日志文件
KahaDB基于日志文件,从ActiveMQ5.4
开始默认使用
JDBC基于第三方数据库,例如MySQL
Replicated LevelDB Store从5.9开始提供了LevelDB和zookeeper的数据复制方法,用于Master-Slave方式的首选数据复制方案
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-08-15 15:56:06  更:2021-08-15 15:56:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 5:07:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计