RabbitMQ 使用场景
服务解耦
常规的微服务调用: RabbitMQ解耦的情况:
流量削峰
高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力
异步调用
请求放入RabbitMQ中后,不管后续。直接继续跑 RabbitMQ对接下游服务,慢慢消化。实现异步
基本概念
RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
Exchange-交换机
根据Binding规则将消息 路由 给服务器中的 队列 ExchangeType决定了 路由消息的行为 常用类型有: direct、Fanout、Topic
Queue-消息队列
我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中,等待消费者来取。 (如果路由找不到相应的queue则数据会丢失)
Binding Key-绑定键
它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。
Routing Key-路由键
生产者发送的,来指定这个消息的路由规则。 这个routing key需要与Exchange Type及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里。
六种工作模式
简单模式——(寄信)——消息具有唯一性
生产者就是发信人 ——> rabbitmq就是邮政 ——> 消费者就是收信人。
- 消息向默认交换机发送
- 默认交换机隐含与所有队列绑定
- routing key 即为队列名称
因此 exchange参数:为空串 routingKey参数:对于默认交换机,路由键就是目标队列名称
工作模式——(卖家发快递)——消息具有唯一性
生产者就是卖家 ——> rabbitmq就是快递 ——> 消费者就是买家。 工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。 rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者。 工作模式,就是简单模式在多个消费者情况下的处理方式。需要消息确认(消费者挂了给其他人),需要合理分发(不回执不发送),需要持久化(防止rabbitmq挂掉)
消息确认——手动回执ACK
消费者配置中: 在处理消息的回调对象中,
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
接受消息的
channel.basicConsume("helloworld", f==alse==, callback, cancel);
一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢? 我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者 所以我们需要手动回执。 消费者执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。
合理地分发——只接受1条消息
消费者配置中:
channel.basicQos(1);
我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者
消息持久化——队列持久化,消息持久化
当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据 生产者,消费者者配置中:
ch.queueDeclare("helloworld", ==true==, false, false, null);
额外属性设置为: MessageProperties.PERSISTENT_TEXT_PLAIN//消息持久化
发布订阅模式——(发广播)——消息克隆
生产者需要发送一个路由键,用来指定我们声明的fanout交换机。 fanout交换机,和消费者队列 形成绑定。执行群发
在订阅模式下,消费者不再绑定队列,而是每个消费者都有一个属于自己的队列。通过这个队列绑定交换机
生产者配置: 定义交换机
channel.exchangeDeclare("logs", "fanout");
发送消息,指定路由键即可。
ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
消费者配置: 这里
channel.queueBind(queueName, "logs", "");
路由模式——(可以调频接收的广播)——消息克隆
前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。
我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。
生产者配置:
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
ch.basicPublish("direct_logs", level, null, msg.getBytes());
消费者配置:
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
channel.queueBind(queueName, "direct_logs", level);
主题模式——(可以调频接收的广播)——消息克隆
虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。
Topic和路由模式基本一样,差别在与路由键可以匹配通配符 *. 和 .#
# :可以匹配路由键中 0-多个 单词 * :可以匹配路由键中 单 个单词
交换机声明
ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
RPC模式
如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为 远程过程调用 ,即RPC 因此:这里我们不再理解为生产者消费者。而是客户端服务端
使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址,这里使用默认队列。
并且: 要考虑一个问题,响应消息在一个回调队列中,我们如何分辨这个响应是哪个请求的? 这时候我们需要一个唯一标识来标记每个请求——关联ID (correlationId)
客户端代码:
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCClient {
Connection con;
Channel ch;
public RPCClient() throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
con = f.newConnection();
ch = con.createChannel();
}
public String call(String msg) throws Exception {
String replyQueueName = ch.queueDeclare().getQueue();
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
response.offer(new String(message.getBody(), "UTF-8"));
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
return response.take();
}
public static void main(String[] args) throws Exception {
RPCClient client = new RPCClient();
while (true) {
System.out.print("求第几个斐波那契数:");
int n = new Scanner(System.in).nextInt();
String r = client.call(""+n);
System.out.println(r);
}
}
}
服务端代码:
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
ch.queueDeclare("rpc_queue",false,false,false,null);
ch.queuePurge("rpc_queue");
ch.basicQos(1);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
int n = Integer.parseInt(msg);
int r = fbnq(n);
String response = String.valueOf(r);
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(message.getProperties().getCorrelationId())
.build();
ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
}
protected static int fbnq(int n) {
if(n == 1 || n == 2) return 1;
return fbnq(n-1)+fbnq(n-2);
}
}
|