| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> ActiveMQ -> 正文阅读 |
|
[大数据]ActiveMQ |
MQ消息中间件MQ = Message Queue,消息中间件 MQ的种类:Kafka(java/scala)、RabbitMQ(erlang)、RocketMQ(java)、ActiveMQ(java) 1、ActiveMQ入门两大默认端口:61616,8161 1、api发送和接收 2、MQ的高可用性 3、MQ的集群和容错配置 4、MQ的持久化 5、延时发送/定时投递 6、签收机制 7、Spring整合 8、编程语言:Java 2、安装:1、官网下载; 2、上传到/opt(第三方软件)下面; 3、解压缩:tar -zxvf apache-active....; 4、根目录下mkdir /myactivemq; 5、拷贝:cp -r 文件夹 /myactivemq/; 6、普通启动:./activemq start; 7、activemq的默认进程端口,默认61616:ps -ef|grep activemq|grep -v grep,netstat -anp|grep 61616,lsof -i:61616; 8、关闭:./activemq stop; ActiveMQ控制台: windows和Linux互ping。 3、JMS:java消息服务:编码实现创建步骤: 1、创建连接工厂ActiveMQConnectionFactory; 2、通过连接工厂,获得连接connection并且启动访问; 3、创建会话session; 4、创建目的地,queue或者topic; 5、创建消息消费者或者消息生产者; 6、生产消息或者消费消息; 7、关闭资源。 点对点(一对一):queue; 发布订阅(一对多):topic,先启动生产,再启动订阅; 1、先生产,只启动1号消费者:1号消费者可以消费消息吗:YES; 2、先生产,先启动1号消费者再启动2号消费者:1号消费者和2号消费者可以消费消息吗:1号:YES,2号:NO; 3、先启动2个消费者,生产6条消息,消费情况?:一人一半:轮询分发:YES JMS:java message service, Java EE是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准。 Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。 4、四大落地产品比较
JMS组成的四大元素 1、JMS provider 实现JMS接口和规范的消息中间件,也就是MQ服务器。 2、JMS producer 消息生产者,创建和发送JMS消息的客户端应用。 3、JMS consumer 消息消费者,接收和处理JMS消息的客户端应用。 4、JMS message = 消息头 + 消息体 + 消息属性 ①、消息头: JMSDestination:目的地; JMSDeliveryMode:持久化和非持久化模式; JMSExpiration:消息过期时间,默认(0)永不过期; JMSPriority:优先级:0-4普通消息,5-9加急消息,默认4级; JMSMessageID:唯一识别每条消息的标识由MQ产生。 ②、消息体: 封装具体的消息数据; 5种消息体格式: TextMessage:普通字符串消息,包含一个String; MapMessage:一个Map类型的消息,key为String类型,值为java的基本类型; BytesMessage:二进制数组消息,包含一个byte[]; StreamMessage:java数据流消息,用标准流操作来顺序的填充和读取; ObjectMessage:对象消息,包含一个可序列化的java对象。 发送和接受的消息体类型必须一致对应。 ③、消息属性: 需要除消息头字段以外的值,可以使用消息属性; 识别/去重/重点标注等操作非常有用的方法。 如何保证消息的可靠性:是否会被重复消费 1、持久(队列默认持久化) 非持久化:服务器宕机,消息不存在,messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 持久化:服务器宕机,消息依然存在,有利于消息可靠性,messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 发布订阅:
2、事务:偏生产者 事务 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第一个事务,第二个签收
3、签收:偏消费者 非事务
事务
在事务性会话中,当一个事务被成功提交则消息被自动签收。 如果事务回滚,则消息会被再次传送。 非事务性会话中,消息何时被确认取决于创建会话时的应答模式(ack mode)。(自动就自动,手动就手动) 4、多节点集群(可持久化) 前三点是mq自带,第四个是借助其他来实现的。 5、ActiveMQ的BrokerBroker: 相当于一个ActiveMQ服务器实例,也就是用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动。节省资源保证可靠性。 和Redis一样,不同的配置文件模拟不同的实例。 6、Spring整合ActiveMQ(1)Maven修改,需要添加Spring支持JMS的包 xml配置文件
(2)Spring配置文件
(3)队列 生产者和消费者,重要的是JmsTemplate。 (4)主题 修改队列目的地为主题目的地。 (5)在spring里面实现消费者不启动,直接通过配置监听完成 xml配置监听程序:DefaultMessageListenerContainer,引入工厂和目的地,再引入监听MessageListener实现类。 7、SpringBoot整合ActiveMQ(注解) (1)、队列 ①队列生产者 pom.xml application.yml spring activemq: ? broker-url: tcp://#自己的服务器地址和端口号 user: admin password: admin jms: pub-sub-domain: false # false=Queue, true=Topic ? # 自己定义队列名称 myqueue: boot-activemq-queue @EnableJms注解:开启JMS。 @Component ConfigBean.class:配置queue; 配置生产者类:JmsMessagingTemplate.convertAndSend(queue, ...); 主启动类; 测试类:@SpringBootTest,@RunWith,@WebAppConfiguration 间隔时间3秒钟定时投放 @Scheduled(fixedDelay = 3000)注解在方法上; 主启动类开启@EnableScheduled注解。 ②队列消费者 监听注解在接收方法上:@JmsListener(destination=队列名)。 (2)、主题 ①主题生产者 pom.xml application.yml spring activemq: ? broker-url: tcp://#自己的服务器地址和端口号 user: admin password: admin jms: pub-sub-domain: true # false=Queue, true=Topic ? # 自己定义队列名称 mytopic: boot-activemq-topic @EnableJms注解:开启JMS。 @Component ConfigBean.class:配置topic; 配置生产者类:JmsMessagingTemplate.convertAndSend(topic, ...); 主启动类; 测试类:@SpringBootTest,@RunWith,@WebAppConfiguration 间隔时间3秒钟定时投放 @Scheduled(fixedDelay = 3000)注解在方法上; 主启动类开启@EnableScheduled注解。 ②主题消费者 监听注解在接收方法上:@JmsListener(destination=主题名)。 先启动消费者,后启动生产者 8、Activemq的传输协议默认tcp协议,nio能提供更好的性能,支持(BIO+tcp协议)openwire、amqp、stomp、mqtt、ws。 NIO协议: 以tcp为基础的NIO网络IO模型,这样的设置方式,使得这个端口支持Openwire协议 修改配置文件activemq.xml;先备份后修改,添加 加强NIO协议 配置 9、ActiveMQ的消息存储和持久化Redis的持久化机制:
ActiveMQ的可持久化机制(高可用的保障手段):JDBC、AMQ、KahaDB、LevelDB
(1)AMQ Message Store 一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小是32M,当一个存储文件中的消息已经全部被消费,那么这个文件被标识为可删除,在下一个清除阶段,这个文件被删除。 (2)*KahaDB(默认) KahaDB可用于任何场景,提高了性能和恢复能力。 消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。 KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。 数据被追加到data logs中,当不再需要log文件中的数据的时候,log文件会被丢弃。
(3)LevelDB 与KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。 但它不使用自定义B-Tree实现来索引预写日志,而是使用基于LevelDB的索引。 <persistenceAdapter> <levelDB directory = "activemq-data"/> </persistenceAdapter> (4)**JDBC
原: <persistenceAdapter> <kahaDB directory = "${activemq-data}/kahadb"/> </persistenceAdapter> 现: <persistenceAdapter> <jdbcPersistenceAdapter dataSource = "#mysql-ds"/> <!-- createTableOnStartUp="true" 重启重建表,第一次true,然后改为false --> </persistenceAdapter>
<!-- mysql-ds的bean配置 --> <beans> <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destory-method="close"> ? <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/数据库名称?relaxAutoCommit=true"/> ? <property name="username" value="root"/> ? <property name="password" value="root"/> ? <property name="maxTotal" value="200"/> ? <property name="poolPreparedStatements" value="true"/> </bean> </beans>
建activemq的数据库 Linux和Windows互ping成功, 三张表会自动生成:ACTIVEMQ_MEGS(消息表)、ACTIVEMQ_ACKS(存储订阅关系)、ACTIVEMQ_LOCK(集群环境中,主要用于记录master)。
开启持久化: queue:看megs表 在点对点类型中, 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中; 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。 消息一旦被Consumer消费就从broker(数据库)中删除。 topic:看acks表 acks记录订阅者,megs记录发布者发布的消息。不被删除。 总结
坑坑坑
mysql-jdbc驱动的jar包和对应的数据库连接池jar包
在第一次启动ActiveMQ时,如果jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时,ActiveMQ服务节点会自动创建所需要的数据表,启动完成后可以去掉这个属性,或者更改createTableOnStartup属性为false。
(5)JDBC Messages Store with ActiveMQ Journal(高速缓存) 利用高速缓存,提高性能。 当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB的信息。 如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB。 配置: <persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data" </persistenceFactory> 10、高可用集群Zookeeper+replicated-leveldb-store的主从复制集群 集群原理: 使用Zookeeper集群注册的所有的ActiveMQ Broker,但只有其中的一个Broker可以提法服务,它将被视为Master,其他的Broker处于待机状态被视为Slave。 集群部署配置:
11、高级特性当消费者是慢消费时,设置异步投递的三种方法: cf = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.useAsyncSend=true") ActiveMQConnctionFactory.setUseAsyncSend(true); ActiveMQConnction.setUseAsyncSend(true); (1)异步投递如何确认发送成功 正确的异步发送需要回调方法并由客户端再判断一次是否发送成功。 ActiveMQMessageProducer.send(message, new AsyncCallback(){ // 重写方法:成功或者失败 }); (2)延迟投递和定时投递 ScheduleMessage四大属性:DELAY延迟、PERIOD间隔、RETRY重复次数、CRON(corn表达式)。 步骤:
(3)消费重试机制 引起原因:
默认重发次数和间隔:每秒发6次,超过6次,poison ack有毒消息消费:异常。 可以修改次数,参考官网。 (4)死信队列 ActiveMQ_DLQ 一般生产环境中使用MQ设计的两个队列:①、核心业务(try);②、死信队列(catch)。
<policyEntry queue=">" <deadLetterStrategy> <sharedDeadLetterStrategy processExpired="false"/> </deadLetterStrategy> </policyEntry>
<policyEntry queue=">" <deadLetterStrategy> <sharedDeadLetterStrategy processNonPersistent="true"/> </deadLetterStrategy> </policyEntry> (5)防止重复调用 由于网络延迟,卡顿,会造成重复消费。 插入操作:给消息做一个唯一主键,避免重复消费。(不推荐) 第三方服务方:假如redis,给消息分配一个全局id,只要消费过该消息,将<id, message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没有消费记录。 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/16 20:01:21- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |