RabbitMQ
1.什么是MQ
生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者。

2.MQ有什么作用
- 解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
- 异步:业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验(比如发短信)
- 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪(比如促销活动)
3.MQ的实现
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
-
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式 -
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。 -
JMS规定了两种消息模型;而AMQP的消息模型更加丰富
4.常见MQ产品
- ActiveMQ:基于JMS
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
- Kafka:分布式消息系统,高吞吐量
5.AMQP协议的重要核心概念

- Server:接收客户端的连接,实现AMQP实体服务。
- Connection:连接,应用程序与Server的网络连接,TCP连接。
- Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
- Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue
- Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃.
- Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey。
- RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。
- Queue:消息队列,用来保存消息,供消费者消费。
6.RabbitMQ的工作原理

- Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue.
- Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
- Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
- Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
7.使用流程
- 消息接收者客户端连接消息队列服务器,打开一个Channel
- 客户端声明一个exchange,并设置一些相关属性
- 客户端声明一个Queue,并设置相关属性
- 客户端通过使用routingkey(路由键),把声明的exchange和queue虚拟连接起来
- 消息发布端,发送消息到exchange中里
- exchange收到消息后,会通过routingkey消息的key和Binding将消息路由到一个队列或者多个队列中。
8.MQ的五种队列模式
<!--引入mq的相关依赖-->
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
1.直连方式(点对点模式)

@Test
public void sendMessage() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.97.67.215");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
channel.close();
connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.97.67.215");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
}
});
}
注意:消费者队列和生产者队列一定要一一对应起来
2.第二种模型(任务模型)work queues
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消费的消费速度,长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

public class WorkProducerService {
@Test
public void sendMessages() throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("","work",null,(i+"work message").getBytes());
}
MqUtils.MqClose(channel,mqConnection);
}
}
public class WorkConsumerServiceTwo {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费的消息》》》》"+new String(body).toString());
}
});
}
}
public class WorkConsumerService {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费的消息》》》》"+new String(body).toString());
}
});
}
}
注意:默认情况下,这种work模型的消费方式是,平均消费的!
基于上面这种情况还可以演变成 能者多劳模式,和手动确认消息消费机制
public class WorkProducerService {
@Test
public void sendMessages() throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("","work",null,(i+"work message").getBytes());
}
MqUtils.MqClose(channel,mqConnection);
}
}
public class WorkConsumerServiceAutoOne {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
}catch (Exception ce){
ce.printStackTrace();
}
System.out.println("消费的消息》》》》"+new String(body).toString());
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
public class WorkConsumerServiceAutoTwo {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费的消息》》》》"+new String(body).toString());
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
3.第三种模型 (广播模式)
广播模式下,消息发送流程是这样的。
可以有多个消费者 每个消费者有自己的(queue 队列),每个队列都要绑定到Exchange(交换机), 生产者发送消息,只能发送到交换机,交换机决定发给哪个队列,生产者无法决定。 交换机把消息发送给绑定过的所有队列。 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

public class FanoutProducerService {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("logs", "fanout");
channel.basicPublish("logs","",null,"广播模式的".getBytes());
MqUtils.MqClose(channel,mqConnection);
}
}
public class FanoutConsumerService1 {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("logs","fanout");
String queueName = channel.queueDeclare().getQueue();
System.out.println("queueName = " + queueName);
channel.queueBind(queueName,"logs","");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1消费的消息是:》》》》"+new String(body));
}
});
}
}
public class FanoutConsumerService2 {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("logs","fanout");
String queueName = channel.queueDeclare().getQueue();
System.out.println("queueName = " + queueName);
channel.queueBind(queueName,"logs","");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2消费的消息是:》》》》"+new String(body));
}
});
}
}
4.第四种(路由模式)
这种模式消息首先到X(exchange)中,然后通过routingkey把消息发送到匹配的队列中
4.1Routing 之订阅模型—Direct(直连)

在Fanout模式中,一条消息,会被所有订阅的队列消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange.
在Direct模型下:
-
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由Key)
- 消息的发送方向在向Exchange发送消息时,也必须指定消息的Routingkey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息的流程
public class DirectProducerService {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("losg_diect","direct");
String routingKey ="info";
channel.basicPublish("losg_diect",routingKey,null,("这是direct{"+routingKey+"}").getBytes());
MqUtils.MqClose(channel,mqConnection);
}
}
public class DirectConsumerService {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
String exchange = "losg_diect";
channel.exchangeDeclare(exchange,"direct");
String queue = channel.queueDeclare().getQueue();
String routingKey ="info";
String routingKeys ="error";
channel.queueBind(queue,exchange,routingKey);
channel.queueBind(queue,exchange,routingKeys);
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到的信息是》》》》"+new String(body));
}
});
}
}
public class DirectConsumerServiceTwo {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
String exchange = "losg_diect";
channel.exchangeDeclare(exchange,"direct");
String queue = channel.queueDeclare().getQueue();
String routingKey ="error";
channel.queueBind(queue,exchange,routingKey);
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到的信息是》》》》"+new String(body));
}
});
}
}
4.2 Routing 之订阅模型—Topic

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange 可以让队列在绑定Routing key的时候使用通配符!这种模型RoutingKey 一般都是由一个或多个单词组成,多个单词之间以“.”分隔。例如:item.insert
public class TopicProduserService {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("topic","topic");
String routingKey ="lihuan.user.save";
channel.basicPublish("topic",routingKey,null,("topice{"+routingKey+"}"+"发送消息").getBytes());
MqUtils.MqClose(channel,mqConnection);
}
}
public class TopicConsumerService {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("topic","topic");
String queue = channel.queueDeclare().getQueue();
String routingKey ="user.*";
channel.queueBind(queue,"topic",routingKey);
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 消费的信息>>>>"+new String(body) +">>>>路由是》》》"+routingKey);
}
});
}
}
public class TopicConsumerServiceTwo {
public static void main(String[] args) throws IOException {
Connection mqConnection = MqUtils.getMqConnection();
Channel channel = mqConnection.createChannel();
channel.exchangeDeclare("topic","topic");
String queue = channel.queueDeclare().getQueue();
String routingKey ="*.user.#";
channel.queueBind(queue,"topic",routingKey);
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 消费的信息>>>>"+new String(body) +">>>>路由是》》》"+routingKey);
}
});
}
}
5.RPC
c对s说“我这有个任务需要你的帮助”,s处理完后,将结果返回给c

9.RabbitMQ持久化(消息的可靠传输)
需要将消息和队列标记为持久化
9.1队列持久化
channel.queueDeclare("hello",true,false,false,null);
9.2消息持久化
channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
9.3发布确认机制
1.发送方确认模式
将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。
一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。
如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。
2.消费方确认模式
消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;
如果这时处理消息失败,就会丢失该消息;
解决方案:处理消息成功后,手动回复确认消息。
|