IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> ActiveMQ -> 正文阅读

[大数据]ActiveMQ

MQ消息中间件

MQ = Message Queue,消息中间件

MQ的种类:Kafka(java/scala)、RabbitMQ(erlang)、RocketMQ(java)、ActiveMQ(java)

1、ActiveMQ入门

两大默认端口:61616,8161

1、api发送和接收

2、MQ的高可用性

3、MQ的集群和容错配置

4、MQ的持久化

5、延时发送/定时投递

6、签收机制

7、Spring整合

8、编程语言:Java

2、安装:

1、官网下载;

2、上传到/opt(第三方软件)下面;

3、解压缩:tar -zxvf apache-active....;

4、根目录下mkdir /myactivemq;

5、拷贝:cp -r 文件夹 /myactivemq/;

6、普通启动:./activemq start;

7、activemq的默认进程端口,默认61616:ps -ef|grep activemq|grep -v grep,netstat -anp|grep 61616,lsof -i:61616;

8、关闭:./activemq stop;

ActiveMQ控制台:

windows和Linux互ping。

3、JMS:java消息服务:编码实现

创建步骤:

1、创建连接工厂ActiveMQConnectionFactory;

2、通过连接工厂,获得连接connection并且启动访问;

3、创建会话session;

4、创建目的地,queue或者topic;

5、创建消息消费者或者消息生产者;

6、生产消息或者消费消息;

7、关闭资源。

点对点(一对一):queue;

发布订阅(一对多):topic,先启动生产,再启动订阅;

1、先生产,只启动1号消费者:1号消费者可以消费消息吗:YES;
2、先生产,先启动1号消费者再启动2号消费者:1号消费者和2号消费者可以消费消息吗:1号:YES,2号:NO;
3、先启动2个消费者,生产6条消息,消费情况?:一人一半:轮询分发:YES

JMS:java message service,

Java EE是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准。

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

4、四大落地产品比较

特性ActiveMQRabbitMQKafkaRocketMQ
点对点支持支持支持支持
发布订阅支持支持支持支持
REQUEST-REPLY支持支持-支持
API完备性低(静态配置)
多语言支持支持,Java优先语言无关支持,Java优先支持
单机吞吐量万级万级十万级单机万级
消息延迟-微秒级毫秒级-
可用性高(主从)高(主从)非常高(分布式)高(主从)
消息丢失-理论上不会丢失-
消息重复-可控制理论上会有重复-
文档的完备性
提供快速入门
首次部署维度-

JMS组成的四大元素

1、JMS provider

实现JMS接口和规范的消息中间件,也就是MQ服务器。

2、JMS producer

消息生产者,创建和发送JMS消息的客户端应用。

3、JMS consumer

消息消费者,接收和处理JMS消息的客户端应用。

4、JMS message = 消息头 + 消息体 + 消息属性

①、消息头:

JMSDestination:目的地;

JMSDeliveryMode:持久化和非持久化模式;

JMSExpiration:消息过期时间,默认(0)永不过期;

JMSPriority:优先级:0-4普通消息,5-9加急消息,默认4级;

JMSMessageID:唯一识别每条消息的标识由MQ产生。

②、消息体:

封装具体的消息数据;

5种消息体格式:

TextMessage:普通字符串消息,包含一个String;

MapMessage:一个Map类型的消息,key为String类型,值为java的基本类型;

BytesMessage:二进制数组消息,包含一个byte[];

StreamMessage:java数据流消息,用标准流操作来顺序的填充和读取;

ObjectMessage:对象消息,包含一个可序列化的java对象。

发送和接受的消息体类型必须一致对应。

③、消息属性:

需要除消息头字段以外的值,可以使用消息属性;

识别/去重/重点标注等操作非常有用的方法。

如何保证消息的可靠性:是否会被重复消费

1、持久(队列默认持久化)

非持久化:服务器宕机,消息不存在,messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

持久化:服务器宕机,消息依然存在,有利于消息可靠性,messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

发布订阅

  • 一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题;

  • 然后再运行生产者发送消息;

  • 此时,无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收到的消息都接收下来。

2、事务:偏生产者

事务

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第一个事务,第二个签收
  • false:只要执行send,就进入到队列中;关闭事务,那第2个签收参数的设置需要有效;

  • true:先执行send再执行commit,消息才被真正的提交到队列中;消息需要批量发送,需要缓冲区处理。

3、签收:偏消费者

非事务

  • 自动签收(默认):Session.AUTO_ACKNOWLEDGE;

  • 手动签收:Session.CLIENT_ACKNOWLEDGE,客户端调用acknowledge方法手动签收,textMessage.acknowledge();

  • 允许重复消息:Session.DUPS_OK_ACKNOWLEDGE;

事务

  • 当事务为true且commit时,手动签收时,可以不用写ack,正常消费;

  • 当事务为true没有commit时,手动签收时,写了ack,也会重复消费;

在事务性会话中,当一个事务被成功提交则消息被自动签收。

如果事务回滚,则消息会被再次传送。

非事务性会话中,消息何时被确认取决于创建会话时的应答模式(ack mode)。(自动就自动,手动就手动)

4、多节点集群(可持久化)

前三点是mq自带,第四个是借助其他来实现的。

5、ActiveMQ的Broker

Broker:

相当于一个ActiveMQ服务器实例,也就是用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动。节省资源保证可靠性。

和Redis一样,不同的配置文件模拟不同的实例。

6、Spring整合ActiveMQ

(1)Maven修改,需要添加Spring支持JMS的包

xml配置文件

  • 开启包的自动扫描;

  • 配置生产者;

  • 队列目的地,点对点的;

  • Spring提供的JMS工具类。

(2)Spring配置文件

  • 开启包的自动扫描;

  • 配置生产者,PooledConnectionFactory;

  • 队列目的地:ActiveMQQueue;

  • Spring提供的JMS工具类,可以进行消息发送,接收:JmsTeplate,引入前两者 + SimpleMessageConverter。

(3)队列

生产者和消费者,重要的是JmsTemplate。

(4)主题

修改队列目的地为主题目的地。

(5)在spring里面实现消费者不启动,直接通过配置监听完成

xml配置监听程序:DefaultMessageListenerContainer,引入工厂和目的地,再引入监听MessageListener实现类。

7、SpringBoot整合ActiveMQ

(注解)

(1)、队列

①队列生产者

pom.xml

application.yml

spring
    activemq:
 ?      broker-url: tcp://#自己的服务器地址和端口号
        user: admin
        password: admin
    jms:
        pub-sub-domain: false # false=Queue, true=Topic
?
# 自己定义队列名称
myqueue: boot-activemq-queue

@EnableJms注解:开启JMS。

@Component

ConfigBean.class:配置queue;

配置生产者类:JmsMessagingTemplate.convertAndSend(queue, ...);

主启动类;

测试类:@SpringBootTest,@RunWith,@WebAppConfiguration

间隔时间3秒钟定时投放

@Scheduled(fixedDelay = 3000)注解在方法上;

主启动类开启@EnableScheduled注解。

②队列消费者

监听注解在接收方法上:@JmsListener(destination=队列名)。

(2)、主题

①主题生产者

pom.xml

application.yml

spring
    activemq:
 ?      broker-url: tcp://#自己的服务器地址和端口号
        user: admin
        password: admin
    jms:
        pub-sub-domain: true # false=Queue, true=Topic
?
# 自己定义队列名称
mytopic: boot-activemq-topic

@EnableJms注解:开启JMS。

@Component

ConfigBean.class:配置topic;

配置生产者类:JmsMessagingTemplate.convertAndSend(topic, ...);

主启动类;

测试类:@SpringBootTest,@RunWith,@WebAppConfiguration

间隔时间3秒钟定时投放

@Scheduled(fixedDelay = 3000)注解在方法上;

主启动类开启@EnableScheduled注解。

②主题消费者

监听注解在接收方法上:@JmsListener(destination=主题名)。

先启动消费者,后启动生产者

8、Activemq的传输协议

默认tcp协议,nio能提供更好的性能,支持(BIO+tcp协议)openwire、amqp、stomp、mqtt、ws。

NIO协议:

以tcp为基础的NIO网络IO模型,这样的设置方式,使得这个端口支持Openwire协议

修改配置文件activemq.xml;先备份后修改,添加<transportConnector name="nio" uri="nio://0.0.0.0:61618?trance=true">,生产者和消费者两端协议修改。

加强NIO协议

配置auto+nio端口61608

9、ActiveMQ的消息存储和持久化

Redis的持久化机制:

  • .aof:写操作

  • .rdb:文件

ActiveMQ的可持久化机制(高可用的保障手段):JDBC、AMQ、KahaDB、LevelDB

  • 在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。

  • 消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。

(1)AMQ Message Store

一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小是32M,当一个存储文件中的消息已经全部被消费,那么这个文件被标识为可删除,在下一个清除阶段,这个文件被删除。

(2)*KahaDB(默认)

KahaDB可用于任何场景,提高了性能和恢复能力。

消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。

KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。

数据被追加到data logs中,当不再需要log文件中的数据的时候,log文件会被丢弃。

  • db-1.log:KahaDB存储消息到预定义大小的数据记录文件中,删除和归档。

  • db.data:持久化的BTree索引,索引了消息数据记录的消息,是消息的索引文件。

  • db.free:哪些页面是空闲的,文件具体内容是所有空闲页的ID。

  • db.redo:容灾备份机制,前两个的恢复机制。

  • lock:文件锁,表示当前获得KahaDB读写权限的broker。

(3)LevelDB

与KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。

但它不使用自定义B-Tree实现来索引预写日志,而是使用基于LevelDB的索引。

<persistenceAdapter>
    <levelDB directory = "activemq-data"/>
</persistenceAdapter>

(4)**JDBC

  • MQ+MySQL

  • 添加mysql数据库的驱动包到lib文件夹

  • jdbcPersistenceAdapter配置

原:
<persistenceAdapter>
    <kahaDB directory = "${activemq-data}/kahadb"/>
</persistenceAdapter>
现:
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource = "#mysql-ds"/> <!-- createTableOnStartUp="true" 重启重建表,第一次true,然后改为false -->
</persistenceAdapter>
  • 数据库连接池配置

<!-- mysql-ds的bean配置 -->
<beans>
    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destory-method="close">
 ?      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:3306/数据库名称?relaxAutoCommit=true"/>
 ?      <property name="username" value="root"/>
 ?      <property name="password" value="root"/>
 ?      <property name="maxTotal" value="200"/>
 ?      <property name="poolPreparedStatements" value="true"/>
    </bean>
</beans>
  • 建仓SQL和建表说明

  • 数据库情况

建activemq的数据库

Linux和Windows互ping成功,

三张表会自动生成:ACTIVEMQ_MEGS(消息表)、ACTIVEMQ_ACKS(存储订阅关系)、ACTIVEMQ_LOCK(集群环境中,主要用于记录master)。

  • 代码运行验证

开启持久化messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

queue:看megs表

在点对点类型中,

当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中;

当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

消息一旦被Consumer消费就从broker(数据库)中删除。

topic:看acks表

acks记录订阅者,megs记录发布者发布的消息。不被删除。

总结

  • 如果是queue,在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者已经消费过了,消费之后这些消息将会立即被删除;

  • 如果是topic,一般先启动消费订阅者然后再生产的情况下会将消息保存到activemq_acks。

坑坑坑

  • 数据库jar包

mysql-jdbc驱动的jar包和对应的数据库连接池jar包

  • createTablesOnStartup属性

在第一次启动ActiveMQ时,如果jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时,ActiveMQ服务节点会自动创建所需要的数据表,启动完成后可以去掉这个属性,或者更改createTableOnStartup属性为false。

  • 下划线

`"java.lang.IllegalStateException: BeanFactory not initialized or already closed":这是因为操作系统的机器名中有_,更改机器名称并重启即可。

(5)JDBC Messages Store with ActiveMQ Journal(高速缓存)

利用高速缓存,提高性能。

当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB的信息。

如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

配置:

<persistenceFactory>
    <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data"
</persistenceFactory>

10、高可用集群

Zookeeper+replicated-leveldb-store的主从复制集群

集群原理:

使用Zookeeper集群注册的所有的ActiveMQ Broker,但只有其中的一个Broker可以提法服务,它将被视为Master,其他的Broker处于待机状态被视为Slave。

集群部署配置:

  • 环境版本

  • 关闭防火墙并保证win可以ping通过ActiveMQ服务器

  • 要求具备zk集群并可以成功启动

  • 集群部署规划列表

  • 创建3台集群目录

  • 修改管理控制台端口

  • hostman名字映射

  • 修改各节点的消息端口

  • 按顺序启动3个ActiveMQ节点,,前提:zk集群已经成功启动运行

  • zk集群的节点状态说明

11、高级特性

当消费者是慢消费时,设置异步投递的三种方法:

cf = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.useAsyncSend=true")
ActiveMQConnctionFactory.setUseAsyncSend(true);
ActiveMQConnction.setUseAsyncSend(true);

(1)异步投递如何确认发送成功

正确的异步发送需要回调方法并由客户端再判断一次是否发送成功。

ActiveMQMessageProducer.send(message, new AsyncCallback(){
// 重写方法:成功或者失败
});

(2)延迟投递和定时投递

ScheduleMessage四大属性:DELAY延迟、PERIOD间隔、RETRY重复次数、CRON(corn表达式)。

步骤

  • 在activemq.xml中配置schedulerSupport属性为true;

  • java代码里面封装的辅助消息类型:ScheduleMessage

  • 代码(设置延迟、定时、重复次数)

(3)消费重试机制

引起原因:

  • Client用了transcations且在session中调用了rollback();

  • Client用了transcations且在调用commit()之前关闭或者没有commit;、

  • Client在CLIENT_ACKNOWLEDGE的模式下,在session中调用了recover()。

默认重发次数和间隔:每秒发6次,超过6次,poison ack有毒消息消费:异常。

可以修改次数,参考官网。

(4)死信队列

ActiveMQ_DLQ

一般生产环境中使用MQ设计的两个队列:①、核心业务(try);②、死信队列(catch)。

  • 删除过期的消息不需要送到死信队列中,默认true。

<policyEntry queue=">"
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="false"/>
    </deadLetterStrategy>
</policyEntry>
  • MQ不会把非持久的死消息发送到死信队列中

<policyEntry queue=">"
    <deadLetterStrategy>
        <sharedDeadLetterStrategy   processNonPersistent="true"/>
    </deadLetterStrategy>
</policyEntry>

(5)防止重复调用

由于网络延迟,卡顿,会造成重复消费。

插入操作:给消息做一个唯一主键,避免重复消费。(不推荐)

第三方服务方:假如redis,给消息分配一个全局id,只要消费过该消息,将<id, message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没有消费记录。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-06 13:08:12  更:2022-03-06 13:10:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:01:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码