网页右边,向下滑有目录索引,可以根据标题跳转到你想看的内容 |
---|
如果右边没有就找找左边 |
一、AMQP
- 高级消息队列协议,是进程之间传递异步消息的网络协议
- 发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取
队列: 队列是数据结构中的概念。数据存储在一个队列中,数据是有序的,先进先出,后进后出。一侧负责进数据,一侧负责出数据。消息队列MQ: 很多功能都是基于此队列结构实现的
二、RabbitMQ简介
- 由Erlang语言编写的基于AMQP的消息中间件。而消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。
- 不使用MQ时(应用程序之间相互传递消息)
- 使用MQ解决耦合(通过消息中间件传递消息)
- 排队算法:使用消息队列特征
- 秒杀活动:使用消息队列特征(先进先出,肯定是先进的人,优先出去执行业务代码)
- 消息分发:使用消息异步特征
- 异步处理:使用消息异步特征
- 数据同步:使用消息异步特征
- 处理耗时任务:使用消息异步特征
- 流量削峰:再流量突然暴涨时,做限流等处理
概念和原理(和AMQP大致相同,只是多了点细节,只有中间红色框框中内容,才是RabbitMQ的内容,分为生产者,消息队列,消费者) |
---|
- Message
- 消息。是不具名的,有消息头和消息体组成。消息体是不透明的,消息头由一系列可选属性组成,包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。
- Publisher
- 消息生产者。是一个向交换机发布消息的客户端应用程序。
- Consumer
- 消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
- Exchange
- 交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。三种常用的交换器类型:direct(发布与订阅 完全匹配)、fanout(广播)、topic(主题,规则匹配)
- Binding
- 绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的
- Queue
- 消息队列。用来保存消息直到发送给消费者。是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走
- Routing-key
- 路由键。RabbitMQ决定消息该投递到哪个队列的规则(也可以理解为队列的名称,路由键是key,队列是value)。队列通过路由键绑定到交换机。消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞。
- Connection
- 链接。指rabbit服务器和服务建立的TCP链接。
- Channel
- 信道。信道是TCP里面的虚拟链接,电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。TCP一旦打开,就会创建AMQP信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
- Virtual Host
- 虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是/
- Borker
- 表示消息队列服务器实体
- 交换器和队列的关系
- 交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。也就是说,消息到队列的过程中,消息首先会经过交换机,接下来交换机在通过路由键匹配分发消息到具体的队列中。路由键可以理解为匹配的规则
- 为什么需要信道,为什么不是TCP直接通信
- TCP的创建和销毁开销特别大。需要3次握手,销毁需要4次分手。如果不用信道,应用程序就会以TCP链接Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也有限制,必定会造成性能瓶颈。信道原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限信道,即使每秒成千上万条请求,也不会成为性能的瓶颈
总结
- routing-key决定一个生产者生产的消息发送到哪个队列(可以一个,也可以同时发送给多个队列),此时消息带着信息,发送给交换器,交换器记录,分析routing-key得知这个消息要发送给哪个队列(假设是queue1),此时查看binding-key,发现queue1队列的绑定,那么通过binding-key,发送到指定队列
三、Erlang安装
- 一个编程语言,RabbitMQ是使用Erlang语言编写,所以需要先配置Erlang
1. 修改Linux主机名(如果你配置过的话,就不需要管它了),RabbitMQ是通过主机名进行访问的,必须指定能访问的主机名,记得配置为reboot重启一下 |
---|
vim /etc/sysconfig/network
vim /etc/hosts
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel
3. 上传otp_src_版本号.tar.gz并解压到linux中,解压时注意,此压缩包没有gzip属性,解压参数没有z,只有xf |
---|
- 首先查看对应版本 https://www.rabbitmq.com/which-erlang.html#compatibility-matrix
- 下载:http://erlang.org/download/
- 上传
- 解压(参数中不可以有z,xf即可)
- 创建/usr/local/erlang文件夹,作为安装文件夹
mkdir /usr/local/erlang
- 进入刚解压的文件目录,配置参数
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support -enable-kernel-poll --enable-hipe --without-javac
- 编译
make
- 安装
make install
- 修改/etc/profile环境变量文件,或者修改自己的环境变量文件
export PATH=$PATH:/usr/local/erlang/bin
- 重新加载环境变量
source /etc/profile
erl -version
四、安装RabbitMQ
1. 配置安装
1.下载安装包,解压,复制到/usr/local下重命名为rabbitmq |
---|
- 下载: https://www.rabbitmq.com/
- 上传,解压,复制到/usr/local下重命名为rabbitmq
tar xf rabbitmq-server-generic-unix-3.9.2.tar.xz
cp -r rabbitmq_server-3.9.2 /usr/local/rabbitmq
export PATH=$PATH:/usr/local/rabbitmq/sbin
2. 开启web管理插件
1. 进入rabbitmq/sbin目录,查看插件列表,然后生效管理插件 |
---|
cd /usr/local/rabbitmq/sbin
./rabbitmq-plugins list
./rabbitmq-plugins enable rabbitmq_management
3. 后台运行rabbitmq,访问web管理页面
- 启动rabbitmq
- 访问页面,15672端口,默认管理员账户为guest,密码也是guest,但是这个用户名只能本地使用,
想要从外界通过浏览器访问web页面,需要新建用户
./rabbitmq-server -detached
- 停止命令,如果无法停止,使用kill -9 进程号 ,进行关闭
./rabbitmqctl stop_app
4. RabbitMQ账户管理,解决外部浏览器无法通过guest用户访问web页面的问题
cd /usr/local/rabbitmq/sbin
./rabbitmqctl add_user 用户名 密码
2. 给用户授予管理员角色,命令中root为用户名 |
---|
./rabbitmqctl set_user_tags 用户名 administrator
- "/"表示虚拟机
- “." ".” “.*” 表示完整权限
./rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"
五、交换器
交换器负责接收客户端传递过来的消息,并转发到对应的队列中。我们介绍在RabbitMQ中支持的四种交换器 |
---|
- Direct Exchange:直连交换器(默认)
- Fanout Exchange:扇形交换器
- Topic Exchange:主题交换器
- Header Exchange:首部交换器
1. java测试环境搭建,这里建立了生产者
- org.springframework.amqp.core.Queue:队列
- AmqpTemplate:操作RabbitMQ的接口,复制发送或接收消息
- @PabbitListener(queues="")注解某个方法为接收消息方法
- 引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
- 编写配置文件
spring:
rabbitmq:
host: 192.168.10.105
username: root
password: root
- 启动类
- 编写配置类
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
@Bean
protected Queue queue(){
Queue queue = new Queue("MyQueue");
return queue;
}
}
- 测试类
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest(classes = PublisherApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class MyTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void test(){
amqpTemplate.convertAndSend("MyQueue","第一个参数是发送队列名称,这个参数代表发送的内容");
System.out.println("发送结束");
}
}
2. direct交换器
- direct交换器是RabbitMQ默认交换器,默认会进行公平调度。所有接收者依次从消息队列中获取值。Publisher给哪个队列发消息,就一定是给哪个队列发送消息。对交换器绑定的其它队列没有任何影响。
- 创建项目,添加依赖
- 配置文件,将生产者中的配置文件复制过来即可
- 启动类
- 接收消息类
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DempReceive {
@RabbitListener(queues = "MyQueue")
public void demo(String msg){
System.out.println("获取到的消息为:"+msg);
}
}
- 运行(下图可见,这个项目一直在运行监听着队列,可见是一个长链接)
- 再次发送信息给队列
- 重写接收类
- 从上面的运行结果可知,Direct交换器会进行公平调度。所有接收者依次从消息队列中获取值。Publisher给哪个队列发消息,就一定是给哪个队列发送消息
3. fanout交换器
- 扇形交换器,实际上做的事情就是广播,fanout会把消息发送给所有的绑定在当前交换机上的队列。并且每个队列消息中第一个Consumer能收到消息。
代码演示中,需要使用的注解和api,一个交换器需要绑定多个队列 |
---|
- FanoutExchange:fanout交换器
- Binding:绑定交换器和队列
- BindingBuilder:Binding的构建器
- amq.fanout:内置fanout交换器名称
- 生产者,配置类中创建队列,交换器,并绑定队列和交换器
@Bean
public Queue createQueue1(){
return new Queue("myfanout1");
}
@Bean
public Queue createQueue2(){
return new Queue("myfanout2");
}
@Bean
public FanoutExchange getFanoutExchange(){
return new FanoutExchange("amq.fanout");
}
@Bean
public Binding binding(Queue createQueue1,FanoutExchange getFanoutExchange){
return BindingBuilder.bind(createQueue1).to(getFanoutExchange);
}
@Bean
public Binding binding2(Queue createQueue2,FanoutExchange getFanoutExchange){
return BindingBuilder.bind(createQueue2).to(getFanoutExchange);
}
- 编写测试
@Test
public void test2(){
amqpTemplate.convertAndSend("amq.fanout","core","第一个参数指定交换器不写就是用默认交换器,第二个参数是Routing Key,第三参数是消息");
}
@Test
public void test3(){
amqpTemplate.convertAndSend("amq.fanout","core","给fanout的第二条消息");
}
- 接收者,接收方法
@RabbitListener(queues = "myfanout1")
public void demo3(String msg){
System.out.println("fanout1接收者获取到的消息为:"+msg);
}
@RabbitListener(queues = "myfanout2")
public void demo4(String msg){
System.out.println("fanout2接收者获取到的消息为:"+msg);
}
4. topic交换器
- 允许在路由键(Routing Key)中出现匹配规则。
- 路由键的写法和包写法相同,com.yzpnb.xxxx.xxxx格式
- 在绑定时可以带有下面特殊符号
- *:代表一个单词
- #:0个或多个字符
- 接收方依然是公平调度,同一个队列中内容轮换获取值
代码演示中,需要使用的注解和api,一个交换器需要绑定多个队列 |
---|
- TopicExchange:Topic交换器
- amq.topic:内置topic交换器名称
- BindingBuilder:Binding的构建器
- amq.fanout:内置fanout交换器名称
- 生产者,配置类中创建队列,交换器,并绑定队列和交换器
@Bean
public Queue topicQueue2(){
return new Queue("mytopic2");
}
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("amq.topic");
}
@Bean
public Binding TopicBinding(Queue topicQueue1,TopicExchange getTopicExchange){
return BindingBuilder.bind(topicQueue1).to(getTopicExchange).with("com.yzpnb.*");
}
@Bean
public Binding TopicBinding2(Queue topicQueue2,TopicExchange getTopicExchange){
return BindingBuilder.bind(topicQueue2).to(getTopicExchange).with("com.yzpnb.a");
}
- 编写测试
@Test
public void test4(){
amqpTemplate.convertAndSend("amq.topic","com.yzpnb.a","路由键为com.yzpnb.a");
}
@Test
public void test5(){
amqpTemplate.convertAndSend("amq.topic","com.yzpnb.b","路由键为com.yzpnb.b");
}
@Test
public void test6(){
amqpTemplate.convertAndSend("amq.topic","com.yzpnb.c","路由键为com.yzpnb.c");
}
- 接收者,接收方法
@RabbitListener(queues = "mytopic1")
public void demo5(String msg){
System.out.println("mytopic1接收者获取到的消息为:"+msg);
}
@RabbitListener(queues = "mytopic2")
public void demo6(String msg){
System.out.println("mytopic2接收者获取到的消息为:"+msg);
}
六、实现数据同步功能(Dubbo+Solr+RabbitMQ)
|