IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 消息中间件学习二之ActiveMQ -> 正文阅读

[Java知识库]消息中间件学习二之ActiveMQ

一、安装

ActiveMQ是Apache旗下的一款消息中间实现,可以支持多语言客户端,也是一个老牌消息中间件实现,所以各项功能也相对成熟。
首先我们可以去官网下载所需要的版本
在这里插入图片描述
我这里用的是centos去安装的,下载好对应的版本即可。

下载完成之后解压:
在这里插入图片描述
启动ActiveMQ(进入到安装目录的bin目录下执行):
在这里插入图片描述
启动完成之后,就可以采用ip:8161/admin/html查看管理后台界面,默认账号密码都是admin。
在这里插入图片描述
此时我们就可以在我们的应用中使用ActiveMQ了,本文将使用Springboot来使用ActiveMQ。

二、样例

首先新建一个Springboo项目,然后引入ActiveMQ的jar包。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

配置文件:

spring:
  activemq:
    broker-url: tcp://192.168.0.11:61616
    user: admin
    password: admin
queue: active.test
@Configuration
public class Configure {
    @Value("${queue}")
    private String queue;
    @Bean
    public Queue logQueue() {
        return new ActiveMQQueue(queue);
    }
}

生产者:将每三秒钟发送一个消息到对应的队列中


@Component
@EnableScheduling
public class Producer {
    @Autowired
    private Queue logQueue;
    @Autowired
    private JmsMessagingTemplate  jmsTemplate;
    @Scheduled(fixedDelay = 3000)
    public void seedQueue() {
        jmsTemplate.convertAndSend(logQueue,"test for queue"+System.currentTimeMillis());
    }
}

消费者:消费者将不断的监听消息,然后打印消息

@Component
public class Consumer {
    @JmsListener(destination = "${queue}")
    public void receive(String msg) {
        System.out.println("receive from activemq:" + msg);
    }
}

以上是Springboot的一个简单案例,接下来我们按照传统方式来写一个代码如下:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TraditionalWay {
    private String BROKER="tcp://192.168.0.11:61616";
    private String queue="traditional.way";
    public void  producer() throws JMSException {
        ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("admin","admin",BROKER);
        Connection connection=connectionFactory.createConnection();
        connection.start();
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination=session.createQueue(queue);
        MessageProducer producer=session.createProducer(destination);
         int i=0;
        while (true){

           TextMessage message=session.createTextMessage();
           message.setText("traditional message send:"+i++);
            producer.send(message);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {

            }
        }
    }
    public void  consumer() throws JMSException {
        ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("admin","admin",BROKER);
        Connection connection=connectionFactory.createConnection();
        connection.start();
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination=session.createQueue(queue);
        MessageConsumer consumer=session.createConsumer(destination);

        while (true){
            TextMessage message=(TextMessage)consumer.receive();
            System.out.println("traditional message receive: "+message.getText());
        }
    }
    public static void main(String[] args) throws JMSException {
        TraditionalWay way=new TraditionalWay();
        new Thread(()->{
            try {
                way.producer();
            } catch (JMSException e) {
            }
        }).start();
        way.consumer();
    }
}

传统方式,完全按照第一部分中的顺序来执行,这就是一个标准的实现。此时我们再来看管理后台,两个队列分别有消息的产生和消费
在这里插入图片描述

三、深入分析

第一、二部分只是简单安装和初步使用了API,接下来分别从消息类型、生产者、broker、消费者四个角度来分析ActiveMQ。

1、消息类型
ActiveMQ中主要有两种模式,一种是队列(queue),一种是主题(topic)

队列是点对点模式,也就是一个消息只能是一个消费者消费,但是对于同一个队列,可以有多个生产者同时写入消息,以及多个消费者同时去消费消息
主题模式,就类似以发布/订阅模型,生产者,往对应主题上发送消息,消费者去消费对应主题消息,此时的消费者可以有多个(消费者消费的消息只有是在消费者订阅主题之后,生产者发送的消息才可以消费)

那么消息还可以分为持久化和非持久化方式,持久化方式也就是说在服务器重启之后,消息还存在,非持久化方式,消息保存在内存中,重启之后就会丢失(setDeliveryMode配置)。在ActiveMQ中可以配置持久化方式,在其配置文件中的< persistenceAdapter >节点即可,可以保存在硬盘上,也可以保存在数据库中。

那么此时会有个问题,比较服务器的内存是有限的,持久化数据存储在文件或者数据库中,非持久化数据存储在内存中,那么当非持久化数据达到内存不够用时,此时会怎样?
此时非持久化数据,会保持在一个临时文件中(文件大小可以配置),但是服务重启之后,临时文件的数据是不可以恢复的,也就是说非持久化数据是不可以恢复的。
不管是持久化文件还是临时文件都有一个最大值设置,如果持久化文件达到最大值之后,此时生产者会阻塞,消费者可以继续消费消息,相反如果非持久化的临时文件达到最大值,生产者会阻塞,所以在选择的时候尽量使用持久化方式。

2、生产者

从生产者角度来讲,我们需要关心什么问题呢?首先消息发送是否支持同步和异步,消息发送失败之后,是否有重试机制、是否发送了重复消息,消息发送大小限制,以及消息时批量发还是单个发送(吞吐量问题)。

(一)消息发送方式
ActiveMQ中基本上都是异步模式发送消息的,如果当前消息没有开启事务并且消息时持久化模式,那么会以同步发送,也就是说会等到服务端确认之后,才会继续执行,否则会进行阻塞,此时我们可以通过在连接参数上加上jms.useAsyncSend=true来使用异步方式发送,当消息以异步方式发送的时候,还可以设置一个回调接口来查看当前消息是否发送成功(CompletionListener实现这个接口),我们在发送消息的时候还可以设定当前消息的一个存活时间。

这里提到一个事务,我们在创建Session的时候,会设置两个参数一个是是否开启事务,另一个是响应机制,事务简单理解就是当前消息发送时候,因为某些原因需要回滚,这在ActiveMQ中是支持的,只需要开启事务即可也就是true,第二个参数就是消息签收模式,一般针对消费者。

(二)消息失败
当消息发送失败,之后会抛出异常,客户端可以根据异常信息进行一个消息重试,此时需要注意到当前消息是否会出现一个重发,如果出现重发了消费端需要对重复数据进行相应的幂等处理。

(三)消息大小

当消息以异步方式发送时,此时可以采用一个窗口大小(producerWindowSize配置来控制消息发送)。生产者每发送一个消息,就会统计当前消息字节,当生产者发送的消息达到窗口大小时,此时新消息不会继续发送,会等待服务器的响应,因为每发送一个消息服务端的内存就会增加对应的大小,如果没有及时消费的话,服务器响应之后,会返回一个size大小,也就是此时服务端的内存使用减少了size,所以窗口大小相应减少,此时就可以继续发送。

3、broker

broker就相当于一个ActiveMQ服务实例,也是存储消息和转发消息的核心所在,上边分析了消息的存储方式以及相应需要注意的事项。
关于broker还有一些连接协议概念,比如connector,我们在连接broker的时候会使用tcp://ip:port方式来进行,这也是ActiveMQ内嵌的一种方式。

如果查看ActiveMQ的配置文件会发现有如下配置(transport connector):
在这里插入图片描述
其中就有我们的tcp方式,我也可以配置udp\nio\http模式,修改对应的端口,然后连接的时候使用对应的协议即可。ssl是一种安全协议,transport connector是针对客户端到服务端的连接,那么broker-to-broker呢?

broker-to-broker就是ActiveMQ中集群通信方式(network connector),可以通过这项配置来实现集群功能。

4、消费者

消费者可以同步的获取消息,也可以异步的通过监听器来获取服务端消息,对于消费者来说,需要处理重复数据,以及数据消费完成之后的应答(可以自动和手动)。消费者端还可以创建一个持久订阅,这种方式只针对主题模式,当消费者宕机之后,恢复之后可以继续读取主题数据。针对消费者,当消息没有被正确消费,服务端会有一个重发机制(默认6次),或者消息过期都会被放到死信队列中,所以在系统中可以有两个队列来保证业务的正常使用,一个正常队列,一个死信队列,后者主要是用于处理异常情况,但是也需要注意消息的重复消费问题

四、总结

本文大致介绍了一下ActiveMQ的相关知识,ActiveMQ 的单机吞吐量在万级,可以支持ms级时效性,可用性高(集群),功能齐全。客户可以根据自身的需求进行选择。

以上,有任何不对的地方请指正,谢谢!

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-15 22:18:55  更:2022-03-15 22:21:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 9:22:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码