IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ 六个工作模式深入理解 -> 正文阅读

[Java知识库]RabbitMQ 六个工作模式深入理解

RabbitMQ 使用场景

服务解耦

常规的微服务调用:
请添加图片描述
RabbitMQ解耦的情况:
请添加图片描述

流量削峰

高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力
请添加图片描述

异步调用

请求放入RabbitMQ中后,不管后续。直接继续跑
RabbitMQ对接下游服务,慢慢消化。实现异步

基本概念

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
请添加图片描述

Exchange-交换机

根据Binding规则将消息 路由 给服务器中的 队列
ExchangeType决定了 路由消息的行为
常用类型有:
direct、Fanout、Topic

Queue-消息队列

我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中,等待消费者来取。
(如果路由找不到相应的queue则数据会丢失)

Binding Key-绑定键

它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

Routing Key-路由键

生产者发送的,来指定这个消息的路由规则。
这个routing key需要与Exchange Type及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里。

六种工作模式

简单模式——(寄信)——消息具有唯一性

请添加图片描述

请添加图片描述
生产者就是发信人 ——> rabbitmq就是邮政 ——> 消费者就是收信人。

  • 消息向默认交换机发送
  • 默认交换机隐含与所有队列绑定
  • routing key 即为队列名称

因此
exchange参数:为空串
routingKey参数:对于默认交换机,路由键就是目标队列名称

工作模式——(卖家发快递)——消息具有唯一性

请添加图片描述

生产者就是卖家 ——> rabbitmq就是快递 ——> 消费者就是买家。
请添加图片描述
请添加图片描述
工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。
rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者。
工作模式,就是简单模式在多个消费者情况下的处理方式。需要消息确认(消费者挂了给其他人),需要合理分发(不回执不发送),需要持久化(防止rabbitmq挂掉)

消息确认——手动回执ACK

消费者配置中:
在处理消息的回调对象中,

channel.basicAck(message.getEnvelope().getDeliveryTag(), false);//发送回执。

接受消息的

channel.basicConsume("helloworld", f==alse==, callback, cancel);

一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?
我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者
所以我们需要手动回执。
消费者执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

合理地分发——只接受1条消息

消费者配置中:

channel.basicQos(1);//一次只接收一条消息

我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

消息持久化——队列持久化,消息持久化

当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据
生产者,消费者者配置中:

//第二个参数是持久化参数durable
ch.queueDeclare("helloworld", ==true==, false, false, null);//队列持久化

额外属性设置为:
MessageProperties.PERSISTENT_TEXT_PLAIN//消息持久化

发布订阅模式——(发广播)——消息克隆

请添加图片描述

请添加图片描述
生产者需要发送一个路由键,用来指定我们声明的fanout交换机。
fanout交换机,和消费者队列形成绑定。执行群发

在订阅模式下,消费者不再绑定队列,而是每个消费者都有一个属于自己的队列。通过这个队列绑定交换机

生产者配置:
定义交换机

channel.exchangeDeclare("logs", "fanout");//定义名字为logs的交换机,交换机类型为fanout

发送消息,指定路由键即可。

//第一个参数,向指定的交换机发送消息
//第二个参数,不指定队列,由消费者向交换机绑定队列
//如果还没有队列绑定到交换器,消息就会丢失,
//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));

消费者配置:
这里

channel.queueBind(queueName, "logs", "");//这里logs是交换机名,路由键不路由键都无所谓。

路由模式——(可以调频接收的广播)——消息克隆

请添加图片描述

请添加图片描述
前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。
请添加图片描述

我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。

生产者配置:

channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//声明交换机名
ch.basicPublish("direct_logs", level, null, msg.getBytes());//level为路由键。只能由绑定这个路由的,路由键匹配的接收

消费者配置:

channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//消费者也声明这个交换机
channel.queueBind(queueName, "direct_logs", level);
//fanout绑定交换机就够了,direct模式则还需要声明本消费者对应的序列的路由键

主题模式——(可以调频接收的广播)——消息克隆

请添加图片描述
虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

Topic和路由模式基本一样,差别在与路由键可以匹配通配符 *. 和 .#

#:可以匹配路由键中 0-多个 单词
*:可以匹配路由键中 个单词

交换机声明

ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);//交换机需要改变

RPC模式

请添加图片描述

如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为 远程过程调用 ,即RPC
因此:这里我们不再理解为生产者消费者。而是客户端服务端

使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址,这里使用默认队列。

并且:
要考虑一个问题,响应消息在一个回调队列中,我们如何分辨这个响应是哪个请求的?
这时候我们需要一个唯一标识来标记每个请求——关联ID (correlationId)
请添加图片描述

客户端代码:

package rabbitmq.rpc;

import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCClient {
	Connection con;
	Channel ch;
	
	public RPCClient() throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		con = f.newConnection();
		ch = con.createChannel();
	}
	
	public String call(String msg) throws Exception {
		//自动生成对列名,非持久,独占,自动删除
		String replyQueueName = ch.queueDeclare().getQueue();
		//生成关联id
		String corrId = UUID.randomUUID().toString();
		
		//设置两个参数:
		//1. 请求和响应的关联id
		//2. 传递响应数据的queue
		BasicProperties props = new BasicProperties.Builder()
				.correlationId(corrId)
				.replyTo(replyQueueName)
				.build();
		//向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数
		ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
		
		//用来保存结果的阻塞集合,取数据时,没有数据会暂停等待
		BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
		
		//接收响应数据的回调对象
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据
				if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
					//把收到的响应数据,放入阻塞集合
					response.offer(new String(message.getBody(), "UTF-8"));
				}
			}
		};

		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//开始从队列接收响应数据
		ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
		//返回保存在集合中的响应数据
		return response.take();
	}
	
	public static void main(String[] args) throws Exception {
		RPCClient client = new RPCClient();
		while (true) {
			System.out.print("求第几个斐波那契数:");
			int n = new Scanner(System.in).nextInt();
			String r = client.call(""+n);
			System.out.println(r);
		}
	}
}

服务端代码:

package rabbitmq.rpc;

import java.io.IOException;
import java.util.Random;
import java.util.Scanner;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		/*
		 * 定义队列 rpc_queue, 将从它接收请求信息
		 * 
		 * 参数:
		 * 1. queue, 对列名
		 * 2. durable, 持久化
		 * 3. exclusive, 排他
		 * 4. autoDelete, 自动删除
		 * 5. arguments, 其他参数属性
		 */
		ch.queueDeclare("rpc_queue",false,false,false,null);
		ch.queuePurge("rpc_queue");//清除队列中的内容
		
		ch.basicQos(1);//一次只接收一条消息
		
		
		//收到请求消息后的回调对象
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//处理收到的数据(要求第几个斐波那契数)
				String msg = new String(message.getBody(), "UTF-8");
				int n = Integer.parseInt(msg);
				//求出第n个斐波那契数
				int r = fbnq(n);
				String response = String.valueOf(r);
				
				//设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应
				BasicProperties replyProps = new BasicProperties.Builder()
						.correlationId(message.getProperties().getCorrelationId())
						.build();
				/*
				 * 发送响应消息
				 * 1. 默认交换机
				 * 2. 由客户端指定的,用来传递响应消息的队列名
				 * 3. 参数(关联id)
				 * 4. 发回的响应消息
				 */
				ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
				//发送确认消息
				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
			}
		};
		
		//
		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认
		ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
	}

	protected static int fbnq(int n) {
		if(n == 1 || n == 2) return 1;
		
		return fbnq(n-1)+fbnq(n-2);
	}
}
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-12-26 22:00:25  更:2021-12-26 22:02:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 9:12:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码