一、消息中间件
消息:传递的信息。系统与系统之间通讯传递的信息。
中间件:redis就是一个数据存储的中间件。 独立于系统之外的一个应用都可以叫做中间件。
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者),broker(中间件)。
生产者: 发送消息
消费者:获取接收消息、并处理消息。
二、概述
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
-
能够保证严格的消息顺序 -
提供丰富的消息拉取模式 -
高效的订阅者水平扩展能力 -
实时的消息订阅机制 -
亿级消息堆积能力
RocketMQ单机也可以支持亿级的消息堆积能力。单机写入TPS单实例约7万条/秒,单机部署3个-,可以跑到最高12万条/秒,消息大小10个字节。
==市场上主流的消息中间件: RabbitMQ、RocketMQ、Kafka、==
三、核心概念
Name Server
-
理解成zookeeper的效果,只是他没用zk,而是自己写了个nameserver来替代zk -
底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点 -
nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除 -
nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用, -
NameServer集群间互不通信,没有主备的概念 -
nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化,所以他是无状态节点
Broker
-
理解成RocketMQ本身 -
broker主要用于producer和consumer发送和接收消息 -
broker会定时向nameserver提交自己的信息 -
==是消息中间件的消息存储、转发服务器== -
每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。
Producer
-
消息的生产者 -
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server获取Topic路由信息,并向提供Topic服务的Broker Master建立长连接,且定时向Broker Master发送心跳。Producer完全无状态,可集群部署。 -
Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s + 120s能够感知,在此期间内发往Broker的所有消息都会失败。
Consumer
-
消息的消费者 -
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server获取Topic路由信息,并向提供Topic服务的Broker Master、Broker Slave建立长连接,且定时向Broker Master、Broker Slave发送心跳。Consumer既可以从Broker Master订阅消息,也可以从Broker Slave订阅消息,订阅规则由Broker配置决定。 -
Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s + 120s才能感知。 -
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。
Topic
Queue
-
==一个topic下,可以设置多个queue(消息队列),默认4个队列。== -
当我们发送消息时,需要要指定该消息的topic。RocketMQ会轮询该topic下的所有队列,将消息发送出去。 -
在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。
四、RocketMQ下载与安装
1、创建一个本地文件夹
mkdir ?/usr/local/rocketmq
2、安装 Namesrv ,拉取镜像
docker pull rocketmqinc/rocketmq:4.4.0
3、启动namesrv
docker run -d -p 9876:9876 ?-v /usr/local/rocketmq/data/namesrv/logs:/root/logs ?-v /usr/local/rocketmq/data/namesrv/store:/root/store --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
4、 安装 broker 服务器
????????(1) 在 usr/local/rocketmq/创建conf 目录,并在conf下创建 broker.conf 文件
#创建目录 mkdir /usr/local/rocketmq/conf
#创建文件broker.conf文件 cd /usr/local/rocketmq/conf vim broker.conf
#把文件内容拷贝到文件上 brokerClusterName = DefaultCluster brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1 = XXX.XXX.XX.XXX(自己的linux机器的ip地址) brokerName = broker-tanhua
????????(2)安装brokerServer容器, 启动broker
docker run -d -p 10911:10911 -p 10909:10909 -v ?/usr/local/rocketmq/data/broker/logs:/root/logs -v ?/usr/local/rocketmq/rocketmq/data/broker/store:/root/store -v ?/usr/local/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
此时可以用docker ps 查询一下namesrv和broker是否启动
5、安装 rocketmq 控制台 ,拉取镜像
docker pull styletang/rocketmq-console-ng
#这里ip地址要修改成你自己的linux的ip
docker run -di ?--name=rocketmq-console-ng -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=xxx.xxx.xx.xxx:9876 -Drocketmq.config.isVIPChannel=false -Duser.timezone='Asia/Shanghai'" -v /etc/localtime:/etc/localtime -p 8001:8080 -t styletang/rocketmq-console-ng
8001是映射的端口,可自行设置。启动后可以在浏览器打开:http://mq主机ip:映射端口
到此,rocketmq服务已启动,相应的控制台项目也启动了。
五、Springboot整合RocketMQ
1、生产者工程
????????创建项目rocket_producer,添加rocketmq-spring-boot-starter 等依赖包
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
????????创建启动引导类
package cn.itcast;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
????????编写application.yml配置文件
rocketmq: ? name-server: 192.168.75.128:9876 ? producer: ? ? #生产组的名字是可以随意的,主要保证消息的高可用 ? ? group: tanhua ? ?#设置发送消息的超时时间 ? ? send-message-timeout: 5000
????????编写测试类,通过RocketMQTemplate发送消息
package cn.itcast.test;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void test01(){
/**
* 参数一: 消息的主题
* 参数二:消息的内容
*/
rocketMQTemplate.convertAndSend("topic","rocketmq发送消息");
}
}
2、消费者工程
? ? ? ? 导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
</dependencies>
????????创建启动引导类
package cn.itcast;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketConsumerApplication.class, args);
}
}
????????编写配置文件
rocketmq: ? name-server: xxx.xxx.xx.xxx:9876
#ip地址这里要修改成自己linux的ip
????????编写消息监听器,实现RocketMQListener 接口 ????????
package cn.itcast.listener;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component //创建消息监听器的对象
//指定监听的主题并且指定消费组,消费的主题一定要与生产主题要一致,
// 消费者的名字可以随意的,我们一般一个主题对应一个消费组, 一个消息可以被不同的消费组消费多次的。同一个消费组只会消费一次同一个消息
@RocketMQMessageListener(topic = "topic",consumerGroup = "group2")
public class MyMessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("监听到的消息:"+message);
}
}
六、消息类型
消息的生产者:分别发送默认同步消息、同步消息、异步消息。
|-- RocketMQTemplate
|-- convertAndSend() 默认同步消息, 没有返回结果
|-- syncSend() 发送同步消息, 可以获取消息返回结果(回执)
|-- asyncSend() 发送异步消息
|-- sendOneWay() 直接发送消息,不会等待应答
1、在实际使用场景中,利用何种发送方式,可以总结如下:
-
当发送的消息不重要时,采用one-way 方式,比如:物流实时消息,以提高吞吐量; -
当发送的消息很重要,而且下一步必须使用上一步的操作结果. ,同步消息 结算---跳转对应结算清单页面 -
当发送的消息很重要,且对响应时间非常敏感的时候采用async 方式;,发送优惠券
方式 | 速度 | 返回结果 | 可靠性 | 同步 | 快 | 有 | 不丢失消息 | 异步 | 更快 | 有 | 不丢失消息 | 单向(直接发送) | 最快 | 无 | 可能丢失消息 |
七、顺序消息
在MQ的模型中,顺序需要由3个阶段去保障: ?? ?1.消息被发送时保持顺序 ?? ?2.消息被存储时保持和发送的顺序一致 ?? ?3.消息被消费时保持和存储的顺序一致
@Test
public void sendOrderMessage(){
//1. 得到消息
List<Order> orderList = Order.buildOrders();
//2. 消息的队列选择器 作用:指定消息进入到指定的队列中。
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
/**
* select方法调用时机: 每发送一个消息的时候都会触发这个方法,这个方法的返回值代表了该消息进入的队列
* @param list 当前主题下的所有队列
* @param message 消息的内容
* @param o 消息的标识符
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//1. 得到消息的标识
int orderId = Integer.parseInt((String) o);
//2. 根据订单号算出该订单存储的队列索引号
int index = orderId%list.size(); // 4%4 =0 5%4 = 1
return list.get(index);
}
});
//3. 遍历消息集合,发送消息
for (Order order : orderList) {
/**
* 参数一: 消息的主题
* 参数二: 消息的内容
* 参数三: 消息的唯一标识
*/
rocketMQTemplate.syncSendOrderly("order_topic",order.toString(),order.getId()+"");
}
}
|