RabbitMQ
概述
Message Queue, 消息队列, 先入先出,通信机制。
四大MQ对比
- ActiveMQ
- RabbitMQ:
- RocketMQ:高并发
- Kafka:大数据
四大组件:
- 生产者:生产数据发送消息
- 消费者:等待接收消息,并消费处理消息
- 交换机:是RabbitMQ中非常重要的一个部件,一方面接收来自生产者的消息,另一方面将消息推送到队列中。交换机必须确切直到如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或将消息丢弃,这个是由交换机类型决定的。与队列时1对多关系
- 队列:RabbitMQ中的一种数据结构,存储消息数据
名词解释:
- Broker:接收和分发消息的应用, RabbitMQ Server就是Message Broker
- Connection:publisher/consummer和broker直接的TCP连接
- Channel:是Connection内部建立的逻辑连接,Channel作为轻量级Connection极大减少了操作系统建立TCP连接的开销
- Exchange:消息到达broker的第一站,根据规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型由:direct(point-to-point),topic(publisher-subscribe),fanout(multicast)
- binding:交换机与队列间的绑定
交换机四种类型
生产者将消息推给交换机,交换机再将消息发送到队列。如果不指定交换机,默认使用默认交换机。
- direct:处理路由键,需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
- fanout:Fanout 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列上。
- topic:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”只能匹配一个词。
- header:不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。
六大模式
- 简单模式
- 工作模式
- 发布订阅模式
- 路由模式
- 主题模式
- 发布确认模式
简单模式
-
简单模式如果不指定交换机,则使用默认交换机。 -
一般要求指定交换机 -
简单模式的交换机类型是direct
public class Producer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.80.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
String message = "hello rabbit!";
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.80.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
CancelCallback cancelCallback = (consumerTag) ->{
System.out.println("消息消费被中断了");
};
channel.basicConsume("simple", true, deliverCallback, cancelCallback);
}
}
工作模式
多个消费者轮流从消息队列中取出消息进行消费
- 工作模式如果不指定交换机,则使用默认交换机
- 一般要求指定交换机
- 工作模式的交换机类型是direct
轮询分发
public class RabbitMQUtils {
public static Channel getChannel () throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.80.130");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接收到消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费");
};
System.out.println("消费者启动等待消费...");
channel.basicConsume("work", true, deliverCallback, cancelCallback);
}
}
public class Producer {
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
channel.queueDeclare("work", false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", "work", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息完成:" + message);
}
}
}
}
不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);
预取值
发布订阅模式
交换机的类型为Fanout,即不处理路由键,只需要简单的将队列绑定到交换机上。
路由模式
交换机类型是direct,而且需要添加路由将队列绑定到交换机上
主题模式
支持模糊匹配的路由。交换机类型是topic,需要添加模糊路由进行队列和交换机的绑定
主题模式的路由不能随意写,必须是一个单词列表,以点分隔开
模糊匹配规则
发布确认模式
见后面
可靠消费
为了保证消息都被消费者消费了,没有丢失,引入了消息应答机制
消息应答
消费应答机制:消费者在接收到消息并且处理该消息后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了
- 自动应答
- 手动应答??
Channel.basicAck 肯定确认,已成功处理该消息,可以将其丢弃Channel.basicNack 否定确认Channel.basicReject 否定确认,不处理该消息了,可以将其丢弃
消息自动重新入队
如果消费者没有发送ACK确认,RabbitMQ将知道该消息未完全处理,并将其重新排队。如果此时其他消费者可以处理,该消息可以重新分发给另一个消费者。这样可以确保消息不会丢失。
代码实现
public void produce() throws Exception {
try (Channel channel = RabbitMQUtils.getChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息:");
while (sc.hasNext()){
String message = sc.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
public void consume() throws Exeception{
Channel channel = RabbitUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println("c1等待接收消息...");
DeleverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
SleepUtils.sleep(1);
System.out.println("接收到消息" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
}
public class SleepUtils { public static void sleep(int second){ try { Thread.sleep(second * 1000); } catch (InterruptedException e){ Thread.currentThread().interrupt(); } }}
持久化
为确保消息不会丢失,需要将队列和消息都持久化
发布确认
当消息被投递到匹配的队列后,broker就会发送一个确认给生产者,这就使得生产者直到消息已经正确到达目的地了。
单个确认发布
这是一种同步确认发布地方式,发布一个消息后只有它被确认发布,后续消息才能继续发布
public void publishMessageIndividually() throws Exception { try (Channel channel = RabbitMQUtils.getChannel()){ String queueName = "confim_individually"; channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect();
批量确认发布
先发布一批消息,然后一起确认可以极大提高吞吐量,缺点是,如果发生故障,不知道哪个消息出现了问题,也是同步的
public void publishMessageBatch() throws Exception { try (Channel channel = RabbitMQUtils.getChannel()){ String queueName = "batch_confim"; channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect();
异步确认发布
利用回调函数来达到消息可靠性传递
public void publishMessageAsync() throws Exception { try (Channel channel = RabbitMQUtils.getChannel()){ String name = "async_confirm"; channel.queueDeclare(queueName, false, false, false, null);
死信
一般来说,producer将消息投递到broker或者queue里,consumer从queue中取出消息进行消费,但是某些是偶由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就成为死信。
为了保证消息不丢失,需要用到死信队列来处理死信。
死信的来源
死信队列处理机制
演示消息TTL过期,处理死信队列
public void produce() throws Exception { try (Channel channe = RabbitUtils.getChannel()){ channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
public void consumer() throws Exception{ Channel channel = RabbitUtils.getChannel();
延迟队列
延迟队列就是存放需要在指定时间被处理的元素的队列
TTL是消息或队列的属性,表面该消息或队列中所有消息的最大存活时间。
如果消息在TTL设定的时间内没有被消费,则消息会变为死信。
创建两个队列QA和QB,分别设置TTL为10s和40s,然后创建一个交换机X和死信交换机Y,再创建一个死信队列QD
插件优化
RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列;如果第一个消息过期时间很长,而第二个消息过期时间很短,第二个消息并不会优先得到执行。
将rabbtimq_delayed_message_exchange 插件加成放到RabbtiMQ目录下
这样可以定义一个自定义的delay交换机,可以让第二个消息先得到执行
SpringBoot中确认发布
mandatory 参数获取无法投递消息的感知能力并及时处理(回退消息)
设置备份交换机
对于无法投递的消息,既不想丢失消息,又不想设置mandatory参数增加生产者的复杂性,该怎么做呢?
可以利用备份交换机,当交换机接收到一条不可路由的消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,备份交换机类型常常为Fanout
RabbitMQ其他知识点
幂等、重复消费问题
用户对于同一操作发起的一次请求或多次请求的结果是一致的,不会因为多次点击而产生副作用。
消费者再消费消息后,给MQ返回ack时网络中断,故MQ没有收到确认消息,该条消息会重写发送给消费者,而实际上该消息已经被消费,造成了重复消费问题。
解决办法:利用全局唯一ID,每次消费消息时,用该id判断,该消息是否已经被消费过了
优先级队列
RabbitMQ支持优先级队列,可以在往优先级队列中添加消息时设置优先级,让优先级高的消息先执行
惰性队列
惰性队列会尽可能将消息存入磁盘中,而在消费者消费到相应消息时,才会被加载到内存中,它的一个中要设计目标时能够支持更长的队列,即支持更多的消息存储。
集群
镜像队列
将队列镜像到集群中的其他Broker节点上,如果集群中的一个节点失效了,队列能自动切换到镜像中的另一个节点上以保证服务的可用性。
负载均衡
|