IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ消息队列 -> 正文阅读

[大数据]RabbitMQ消息队列

网页右边,向下滑有目录索引,可以根据标题跳转到你想看的内容
如果右边没有就找找左边

一、AMQP

AMQP
  1. 高级消息队列协议,是进程之间传递异步消息的网络协议
AMQP工作过程
  1. 发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取
    在这里插入图片描述
消息队列
  1. 队列:队列是数据结构中的概念。数据存储在一个队列中,数据是有序的,先进先出,后进后出。一侧负责进数据,一侧负责出数据。
  2. 消息队列MQ:很多功能都是基于此队列结构实现的
    在这里插入图片描述

二、RabbitMQ简介

RabbitMQ
  1. 由Erlang语言编写的基于AMQP的消息中间件。而消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。
解决应用耦合
  1. 不使用MQ时(应用程序之间相互传递消息)
    在这里插入图片描述
  2. 使用MQ解决耦合(通过消息中间件传递消息)
    在这里插入图片描述
适用场景
  1. 排队算法:使用消息队列特征
  2. 秒杀活动:使用消息队列特征(先进先出,肯定是先进的人,优先出去执行业务代码)
  3. 消息分发:使用消息异步特征
  4. 异步处理:使用消息异步特征
  5. 数据同步:使用消息异步特征
  6. 处理耗时任务:使用消息异步特征
  7. 流量削峰:再流量突然暴涨时,做限流等处理
概念和原理(和AMQP大致相同,只是多了点细节,只有中间红色框框中内容,才是RabbitMQ的内容,分为生产者,消息队列,消费者)

在这里插入图片描述

  1. Message
  1. 消息。是不具名的,有消息头和消息体组成。消息体是不透明的,消息头由一系列可选属性组成,包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。
  1. Publisher
  1. 消息生产者。是一个向交换机发布消息的客户端应用程序。
  1. Consumer
  1. 消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
  1. Exchange
  1. 交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。三种常用的交换器类型:direct(发布与订阅 完全匹配)、fanout(广播)、topic(主题,规则匹配)
  1. Binding
  1. 绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的
  1. Queue
  1. 消息队列。用来保存消息直到发送给消费者。是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走
  1. Routing-key
  1. 路由键。RabbitMQ决定消息该投递到哪个队列的规则(也可以理解为队列的名称,路由键是key,队列是value)。队列通过路由键绑定到交换机。消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞。
  1. Connection
  1. 链接。指rabbit服务器和服务建立的TCP链接。
  1. Channel
  1. 信道。信道是TCP里面的虚拟链接,电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。TCP一旦打开,就会创建AMQP信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
  1. Virtual Host
  1. 虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是/
  1. Borker
  1. 表示消息队列服务器实体
  1. 交换器和队列的关系
  1. 交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。也就是说,消息到队列的过程中,消息首先会经过交换机,接下来交换机在通过路由键匹配分发消息到具体的队列中。路由键可以理解为匹配的规则
  1. 为什么需要信道,为什么不是TCP直接通信
  1. TCP的创建和销毁开销特别大。需要3次握手,销毁需要4次分手。如果不用信道,应用程序就会以TCP链接Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也有限制,必定会造成性能瓶颈。信道原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限信道,即使每秒成千上万条请求,也不会成为性能的瓶颈
  1. 总结
  1. routing-key决定一个生产者生产的消息发送到哪个队列(可以一个,也可以同时发送给多个队列),此时消息带着信息,发送给交换器,交换器记录,分析routing-key得知这个消息要发送给哪个队列(假设是queue1),此时查看binding-key,发现queue1队列的绑定,那么通过binding-key,发送到指定队列

三、Erlang安装

Erlang
  1. 一个编程语言,RabbitMQ是使用Erlang语言编写,所以需要先配置Erlang
1. 修改Linux主机名(如果你配置过的话,就不需要管它了),RabbitMQ是通过主机名进行访问的,必须指定能访问的主机名,记得配置为reboot重启一下
vim /etc/sysconfig/network

在这里插入图片描述

vim /etc/hosts

在这里插入图片描述

2. 安装依赖

在这里插入图片描述

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel
3. 上传otp_src_版本号.tar.gz并解压到linux中,解压时注意,此压缩包没有gzip属性,解压参数没有z,只有xf
  1. 首先查看对应版本 https://www.rabbitmq.com/which-erlang.html#compatibility-matrix
    在这里插入图片描述
  2. 下载:http://erlang.org/download/
    在这里插入图片描述
  3. 上传
    在这里插入图片描述
  4. 解压(参数中不可以有z,xf即可)
    在这里插入图片描述
4. 安装
  1. 创建/usr/local/erlang文件夹,作为安装文件夹
mkdir /usr/local/erlang
  1. 进入刚解压的文件目录,配置参数
    在这里插入图片描述
 ./configure --prefix=/usr/local/erlang  --with-ssl  --enable-threads --enable-smp-support -enable-kernel-poll  --enable-hipe  --without-javac
  1. 编译
    在这里插入图片描述
make
  1. 安装
    在这里插入图片描述
make install
5. 修改环境变量
  1. 修改/etc/profile环境变量文件,或者修改自己的环境变量文件
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
#erlang
export PATH=$PATH:/usr/local/erlang/bin
  1. 重新加载环境变量
    在这里插入图片描述
source /etc/profile

erl -version

四、安装RabbitMQ

1. 配置安装

1.下载安装包,解压,复制到/usr/local下重命名为rabbitmq
  1. 下载: https://www.rabbitmq.com/
    在这里插入图片描述在这里插入图片描述
    在这里插入图片描述在这里插入图片描述
  1. 上传,解压,复制到/usr/local下重命名为rabbitmq
    在这里插入图片描述
    在这里插入图片描述
tar xf rabbitmq-server-generic-unix-3.9.2.tar.xz
cp -r rabbitmq_server-3.9.2 /usr/local/rabbitmq
2. 配置环境变量

在这里插入图片描述
在这里插入图片描述

#RabbitMQ
export PATH=$PATH:/usr/local/rabbitmq/sbin

2. 开启web管理插件

1. 进入rabbitmq/sbin目录,查看插件列表,然后生效管理插件

在这里插入图片描述
在这里插入图片描述

# 进入sbin目录
cd /usr/local/rabbitmq/sbin
# 查看插件列表
./rabbitmq-plugins list
# 生效管理插件
./rabbitmq-plugins enable rabbitmq_management

3. 后台运行rabbitmq,访问web管理页面

  1. 启动rabbitmq
    在这里插入图片描述
  2. 访问页面,15672端口,默认管理员账户为guest,密码也是guest,但是这个用户名只能本地使用,想要从外界通过浏览器访问web页面,需要新建用户
    在这里插入图片描述
    在这里插入图片描述
./rabbitmq-server -detached
  1. 停止命令,如果无法停止,使用kill -9 进程号 ,进行关闭
./rabbitmqctl stop_app

4. RabbitMQ账户管理,解决外部浏览器无法通过guest用户访问web页面的问题

在这里插入图片描述
在这里插入图片描述

1. 创建账户
cd /usr/local/rabbitmq/sbin
./rabbitmqctl add_user 用户名 密码
2. 给用户授予管理员角色,命令中root为用户名
./rabbitmqctl set_user_tags 用户名 administrator
3. 给用户授权
  1. "/"表示虚拟机
  2. “." ".” “.*” 表示完整权限
./rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"

五、交换器

在这里插入图片描述

交换器负责接收客户端传递过来的消息,并转发到对应的队列中。我们介绍在RabbitMQ中支持的四种交换器
  1. Direct Exchange:直连交换器(默认)
  2. Fanout Exchange:扇形交换器
  3. Topic Exchange:主题交换器
  4. Header Exchange:首部交换器

1. java测试环境搭建,这里建立了生产者

代码演示中,需要使用的注解和api
  1. org.springframework.amqp.core.Queue:队列
  2. AmqpTemplate:操作RabbitMQ的接口,复制发送或接收消息
  3. @PabbitListener(queues="")注解某个方法为接收消息方法
搭建测试交换器的demo环境
  1. 引入依赖
    在这里插入图片描述
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
  1. 编写配置文件
    在这里插入图片描述
# 配置rabbitmq 服务器地址,用户名,密码
spring:
  rabbitmq:
    host: 192.168.10.105 # 默认localhost
    username: root # 默认guest
    password: root # 默认guest
  1. 启动类
    在这里插入图片描述
  2. 编写配置类
    在这里插入图片描述
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration//标准此类为spring 配置类
public class RabbitmqConfig {

    @Bean//将这个对象添加到Spring的IOC容器
    protected Queue queue(){//注意Queue的包是org.springframework.amqp.core.Queue;
        Queue queue = new Queue("MyQueue");
        return queue;
    }
}
  1. 测试类
    在这里插入图片描述
    在这里插入图片描述
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@SpringBootTest(classes = PublisherApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class MyTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void test(){
        amqpTemplate.convertAndSend("MyQueue","第一个参数是发送队列名称,这个参数代表发送的内容");
        System.out.println("发送结束");
    }
}

2. direct交换器

  1. direct交换器是RabbitMQ默认交换器,默认会进行公平调度。所有接收者依次从消息队列中获取值。Publisher给哪个队列发消息,就一定是给哪个队列发送消息。对交换器绑定的其它队列没有任何影响。
新建项目,代表消费者
  1. 创建项目,添加依赖
    在这里插入图片描述
  2. 配置文件,将生产者中的配置文件复制过来即可
    在这里插入图片描述
  3. 启动类
    在这里插入图片描述
创建接收消息的类,编写测试代码
  1. 接收消息类
    在这里插入图片描述
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component//作为组件,加入到Spring容器中
public class DempReceive {

    @RabbitListener(queues = "MyQueue")//标注此方法为接收MyQueue队列消息的方法
    public void demo(String msg){
        System.out.println("获取到的消息为:"+msg);
    }
}
  1. 运行(下图可见,这个项目一直在运行监听着队列,可见是一个长链接)
    在这里插入图片描述
  2. 再次发送信息给队列
    在这里插入图片描述
  3. 重写接收类
    8
  1. 从上面的运行结果可知,Direct交换器会进行公平调度。所有接收者依次从消息队列中获取值。Publisher给哪个队列发消息,就一定是给哪个队列发送消息

3. fanout交换器

  1. 扇形交换器,实际上做的事情就是广播,fanout会把消息发送给所有的绑定在当前交换机上的队列。并且每个队列消息中第一个Consumer能收到消息。
代码演示中,需要使用的注解和api,一个交换器需要绑定多个队列
  1. FanoutExchange:fanout交换器
  2. Binding:绑定交换器和队列
  3. BindingBuilder:Binding的构建器
  4. amq.fanout:内置fanout交换器名称
  1. 生产者,配置类中创建队列,交换器,并绑定队列和交换器
    在这里插入图片描述
//测试fanout交换器需要多个队列
@Bean
public Queue createQueue1(){
    return new Queue("myfanout1");
}
@Bean
public Queue createQueue2(){
    return new Queue("myfanout2");
}
//获取fanout交换器
@Bean
public FanoutExchange getFanoutExchange(){
    return new FanoutExchange("amq.fanout");
}

//将队列绑定给fanout交换器
@Bean
public Binding binding(Queue createQueue1,FanoutExchange getFanoutExchange){//注意参数名,需要和你上面注入IOC容器的方法名一致
    return BindingBuilder.bind(createQueue1).to(getFanoutExchange);//将createQueue1方法注入的队列对象,绑定给Fanout交换器
}
@Bean
public Binding binding2(Queue createQueue2,FanoutExchange getFanoutExchange){
    return BindingBuilder.bind(createQueue2).to(getFanoutExchange);//将createQueue2方法注入的队列对象,绑定给Fanout交换器
}
  1. 编写测试
    在这里插入图片描述
@Test
public void test2(){
    amqpTemplate.convertAndSend("amq.fanout","core","第一个参数指定交换器不写就是用默认交换器,第二个参数是Routing Key,第三参数是消息");
}
@Test
public void test3(){
    amqpTemplate.convertAndSend("amq.fanout","core","给fanout的第二条消息");
}
  1. 接收者,接收方法
    在这里插入图片描述
    @RabbitListener(queues = "myfanout1")//标注此方法为接收myfanout1队列消息的方法
    public void demo3(String msg){
        System.out.println("fanout1接收者获取到的消息为:"+msg);
    }
    @RabbitListener(queues = "myfanout2")//标注此方法为接收myfanout2队列消息的方法
    public void demo4(String msg){
        System.out.println("fanout2接收者获取到的消息为:"+msg);
    }

4. topic交换器

  1. 允许在路由键(Routing Key)中出现匹配规则。
  2. 路由键的写法和包写法相同,com.yzpnb.xxxx.xxxx格式
  3. 在绑定时可以带有下面特殊符号
  1. *:代表一个单词
  2. #:0个或多个字符
  1. 接收方依然是公平调度,同一个队列中内容轮换获取值
代码演示中,需要使用的注解和api,一个交换器需要绑定多个队列
  1. TopicExchange:Topic交换器
  2. amq.topic:内置topic交换器名称
  3. BindingBuilder:Binding的构建器
  4. amq.fanout:内置fanout交换器名称
  1. 生产者,配置类中创建队列,交换器,并绑定队列和交换器
    @Bean
    public Queue topicQueue2(){
        return new Queue("mytopic2");
    }
    //获取topic交换器
    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("amq.topic");
    }
    //将队列绑定给fanout交换器
    @Bean
    public Binding TopicBinding(Queue topicQueue1,TopicExchange getTopicExchange){
        return BindingBuilder.bind(topicQueue1).to(getTopicExchange).with("com.yzpnb.*");//with表示Routing Key,当Routing Key 为com.yzpnb.任意单词   时,匹配这个绑定
    }
    @Bean
    public Binding TopicBinding2(Queue topicQueue2,TopicExchange getTopicExchange){
        return BindingBuilder.bind(topicQueue2).to(getTopicExchange).with("com.yzpnb.a");//Routing Key为com.yzpnb.a是,匹配这个绑定
    }
  1. 编写测试
    @Test
    public void test4(){
        //第一个参数是交换器,第二个是路由键,第三个是消息
        amqpTemplate.convertAndSend("amq.topic","com.yzpnb.a","路由键为com.yzpnb.a");
    }
    @Test
    public void test5(){
        amqpTemplate.convertAndSend("amq.topic","com.yzpnb.b","路由键为com.yzpnb.b");
    }
    @Test
    public void test6(){
        amqpTemplate.convertAndSend("amq.topic","com.yzpnb.c","路由键为com.yzpnb.c");
    }
  1. 接收者,接收方法
    在这里插入图片描述
    @RabbitListener(queues = "mytopic1")//标注此方法为接收myfanout2队列消息的方法
    public void demo5(String msg){
        System.out.println("mytopic1接收者获取到的消息为:"+msg);
    }
    @RabbitListener(queues = "mytopic2")//标注此方法为接收myfanout2队列消息的方法
    public void demo6(String msg){
        System.out.println("mytopic2接收者获取到的消息为:"+msg);
    }

六、实现数据同步功能(Dubbo+Solr+RabbitMQ)

我将其放在了另一篇博客:https://blog.csdn.net/grd_java/article/details/119716566
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-16 11:48:52  更:2021-08-16 11:51:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:24:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码