前言: 学习B站UP主狂神说视频笔记整理视频链接
什么是中间件
中间件( Middleware ) 是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。
为什么要使用消息中间件
中间件( Middleware ) 是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。
消息中间件的应用场景
1:跨系统数据传递
2:高并发的流量削峰
3:数据的分发和异步处理
4:大数据分析与传递
5:分布式事务
比如你有一个数据要进行迁移或者请求并发过多的时候,比如你有10W的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行。
常见的消息中间件
ActiveMQ.RabbitMQ、Kafka、RocketMQ等。
消息队列的本质及设计
它是一种接受数据,接受请求、存储数据、发送数据等功能的技术服务。
MQ消息队列:负责数据的传接受,存储和传递,所以性能要过于普通服务和技术。其背后肯定要遵循某种协议

消息队列的核心组成部分
1:消息的协议 2:消息的持久化机制 3:消息的分发策略 4:消息的高可用,高可靠 5:消息的容错机制
消息队列协议
 我们知道消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,你是采用底层的TCP/IP,UDP协议还是其他的自己取构建等,而这些约定成俗的规范就称之为:协议
所谓协议是指: 1:计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。 2∶和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高。 3:协议对数据格式和计算机之间交换数据都必须严格遵守规范。
而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。
面试题:为什么消息中间件不直接使用http协议呢? 1:因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速。 2大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。
AMQP协议
AMQP:(全称: Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。 特性: 1:分布式事务支持。2:消息的持久化支持。 3:高性能和高可靠的消息处理优势。

OpenMessage协议
是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。 特点: 1∶结构简单2∶解析速度快 3:支持事务和持久化设计。 
Kafka协议
Kafka协议是基于TCP/IP的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。特点是: 1:结构简单2︰解析速度快3:无事务支持4:有持久化设计

消息队列分发策略
MQ消息队列有如下几个角色 1:生产者 2:存储消息 3:消费者 那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。 
消息队列高可用及高可靠
什么是高可用
所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。 当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。
集群模式1-Master-Slave主从数据共享
 生产者讲消费发送到Master节点,所有的都连接这个消息队列共享这块数据区域,Master节点负责写入,一旦Master挂掉,slave节点继续服务。从而形成高可用,
集群模式2-Master-Slave主从数据同步
 这种模式写入消息同样在Master主节点上,但是主节点会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点就行消费,以为消息的拷贝和同步会占用很大的带宽和网络资源。在后续的rabbtmq中会有使用。
集群模式3-多集群同步部署
 和上面区别不大,但是它的写入是可以任意节点写
集群模式4-多集群转发部署
 如果你插入的数据是broker-1中,元数据信息会存储数据的相关描述和记录存放的位置(队列)。 它会对描述信息也就是元数据信息就行同步,如果消费者在broker-2中进行消费,发现自己几点没有对应的消息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他会去联系其他的黄牛询问,如果有就返回。
集群模式5-Master-slave与Breoker-cluster组合的方案
 反正终归三句话:1:要么消息共享,2︰要么消息同步 3:要么元数据共享
什么是高可靠
所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠.
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。如何保证中间件消息的可靠性呢?可以从两个方面考虑: 1:消息的传输:通过协议来保证系统间数据解析的正确性。 2:消息的存储可靠:通过持久化来保证消息的可靠性。
RabbitMQ入门
什么是RabbitMQ
简单概述: RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。 
官网地址
RabbitMQ安装
安装包下载
RabbitMQ下载地址:https://www.rabbitmq.com/download.html 环境准备:CentOS7.X+/Erlang
注意:下载RabbitMQ和Erlang要注意版本号对应  Erlang下载地址:https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
安装Erlang
上传rpm包到CentOS服务器上 
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
sudo yum install erlang
erl
whereis erlang
socat安装
yum install -y socat
RabbitMQ安装
rpm -Uvh rabbitmq-server-3.8.19-1.el7.noarch.rpm
yum install rabbitmq-server -y
systemctl start rabbitmq-server
systemctl status rabbitmq-server
查看RabbitMQ状态 
systemctl enable rabbitmq-server
systemctl stop rabbitmq-server
RabbitMQWeb管理界面
默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效
rabbitmq-plugins enable rabbitmq_management
说明: rabbitmq有一个默认账号和密码是:guest|默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户。

systemctl restart rabbitmq-server
阿里云等服务器放行15672端口
浏览器访问 http://IP地址:15672

授权账号和密码
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
用户级别:
- 1、administrator可以登录控制台、查看所有信息、可以对rabbitmq进行管理·
- 2、monitoring 监控者登录控制台,查看所有信息
- 3、policymaker 策略制定者登录控制台,指定策略
- 4、managment普通管理员登录控制台
- 5、none 不能访问
授权角色权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
之后就可以通过新账户密码登录图形化界面了!  其他命令: 
RabbitMQ入门案例-Simple简单模式
创建项目
创建Maven简单项目
由于是入门项目,所以只需要导入RabbitMQ原生依赖即可
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
编写生产者
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.108.204.96");
factory.setUsername("root");
factory.setPassword("XXXXX");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,false,null);
String message = "Hello,RabbitMQ!";
channel.basicPublish("",queueName,null,message.getBytes());
System.out.println("消息发生完毕");
} catch (Exception e) {
e.printStackTrace();
}finally {
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
编写消费者
public class Consumer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.108.204.96");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "queue1";
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接收消息失败");
}
});
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
RabbitMQ核心组成部分
 核心概念:
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手
Channel :网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若千个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange :交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)Bindings : Exchange和Queue之问的虚拟连接,binding中可以保护多个routing key.
Routing key :是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

RabbitMQ消息模式
官网教程
Simple简单模式

代码参考本文入门案例
注意点: 1.从官网图片上来看,虽然没有显示交换机,实则如果没有指定交换机,会有一个默认交换机,来将我们的消息推送到队列中

fanout发布订阅模式
 生产者 生产消息 每一个消费者都可以收到相同的消息
1.图型化界面创建fanout交换机  2.交换机绑定队列  3.如果没有队列,提取创建好队列  4.编写生产者
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.108.204.96");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,false,null);
String message = "Hello,RabbitMQ!";
String exchangeName = "faout-exchange";
String routeName = "";
String type = "fanout";
channel.basicPublish(exchangeName,routeName,null,message.getBytes());
System.out.println("消息发生完毕");
} catch (Exception e) {
e.printStackTrace();
}finally {
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
5.编写消费者
public class Consumer implements Runnable{
public static void main(String[] args) {
new Thread(new Consumer(),"queue1").start();
new Thread(new Consumer(),"queue2").start();
}
public void run() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.108.204.96");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
channel = connection.createChannel();
String queueName = Thread.currentThread().getName();
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接收消息失败");
}
});
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
可以看到,在代码中我们并没有创建交换机,指定队列,那是因为我们在图形化界面中已经提前创建并绑定好了
Routing路由模式
 消息携带路由Key,根据消费者Key匹配消息推送到哪个消费者
1.创建direct交换机,绑定路由Key  2.编写生产者
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.108.204.96");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,false,null);
String message = "Hello,RabbitMQ!";
String exchangeName = "direct-exchange";
String routeName = "email";
String type = "direct";
channel.basicPublish(exchangeName,routeName,null,message.getBytes());
System.out.println("消息发生完毕");
} catch (Exception e) {
e.printStackTrace();
}finally {
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3.消费者不用变动
Topics主题模式
 主题模式在路由模式之上又进行了升级,可以根据路由Key模糊匹配推送到指定队列
#: 代表零级或多级,用图上的列子来说它可以是这样的lazy.XXXX.XXX.XXX ,lazy *: 代表一级,它有且只能是这样的XXX.orange.XXX
1.创建topic交换机  2.绑定队列  3.编写生产者
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.108.204.96");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,false,null);
String message = "Hello,RabbitMQ!";
String exchangeName = "topic-exchange";
String routeName = "xxxx.order";
String type = "topic";
channel.basicPublish(exchangeName,routeName,null,message.getBytes());
System.out.println("消息发生完毕");
} catch (Exception e) {
e.printStackTrace();
}finally {
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Headers参数模式
根据参数匹配到对应的队列
1.创建headers交换机,根据参数绑定队列  2.未来就可以通过携带附加参数,来匹配到对应的队列 
完整的声明创建方式
在以上的案例中,都是通过图形化界面与代码相互配合,通过图形化界面先创建好交换机,绑定上队列,然后在代码中直接使用
那么能不能通过代码的方式创建交换机绑定队列呢?
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.108.204.96");
factory.setUsername("admin");
factory.setPassword("admin");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,false,null);
String message = "Hello,RabbitMQ!";
String exchangeName = "direct-message-exchange";
String routeName = "email";
String type = "direct";
channel.exchangeDeclare(exchangeName,type,true);
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);
channel.queueBind("queue5",exchangeName,"email");
channel.queueBind("queue6",exchangeName,"sms");
channel.queueBind("queue7",exchangeName,"order");
channel.basicPublish(exchangeName,routeName,null,message.getBytes());
System.out.println("消息发生完毕");
} catch (Exception e) {
e.printStackTrace();
}finally {
if (channel!=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Word工作队列模式

轮询模式
轮询模式的分发:一个消费者—条,按均分配;
公平分发
公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;
|