前言:上一篇我们编写程序实现通过队列发送和接收消息,在这个例子中我们创建一个工作队列,多个员工分配并处理工作任务,工作队列主要是避免立即做一项资源密集型任务。我们将工作任务封装成一个个消息,并将其发送到队列里,后台运行的线程将处理这些工作任务,当你运行许多消费者时,任务将在他们之间共享。
RabitMQ安装
如何安装:??https://blog.csdn.net/Beijing_L/article/details/119042261
图例
P(生产者): 生产者属于数据的发送方 ,发送的消息被放入队列里
C(消费者): 消费者属于数据的接收方,发现队列里的消息,将消息从队列中读取出来做业务操作
Queue(队列): RabbitMQ的作用是存储消息,队列的特性是先进先出
工作队列
本例实现生产者创建10个工作消息,并将消息发送到RabbitMQ消息队列中, 消费者从RabbitMq获取消息并处理,处理时候通过Thread.sleep()方法模拟消息处理过程

工作队列
消费者代码和上一篇代码类似,使用 DeliverCallback回调接口处理消息, 使用 Thread.sleep(2000); 模拟任务处理耗时,我们可以启动2个消费者
参考代码如下:
/**
* 消费者
*/
public class Recv1 {
public final static String QUEUE_NAME = "tutorial.queue";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("Customer PROCESS");
// 创建连接工厂,设置RabbitMQ地址
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
System.out.println("connection...");
//创建一个新的连接
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Customer Waiting Received messages");
// channel.basicQos(1);
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery?
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true; // 自动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
} finally {
//关闭通道和连接
//channel.close();
//connection.close();
}
}
//模拟执行任务
private static void doWork(String task) throws InterruptedException {
Thread.sleep(2000);
}
}
执行后控制台出现,因还没有任务所以等待中
connection...
Customer Waiting Received messages
1.?boolean autoAck = true?
这里是这自动确认,即发送一个消息后Rabbitmq 直接从队列里删除消息
生产者创建10个任务消息,并将消息发送到队列里
参考代码:
public class NewTask {
public final static String QUEUE_NAME = "tutorial.queue";
public static void main(String[] args) throws IOException, TimeoutException {
//使用工厂创建1个连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
try {
//创建1个通道,声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送10个任务
for (int i = 0; i < 10; i++) {
String message = "task message.NO." + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
} finally {
//关闭通道和连接
//channel.close();
//connection.close();
}
}
}
执行后控制台上看,创建了10个任务
[x] Sent 'task message.NO.0'
[x] Sent 'task message.NO.1'
[x] Sent 'task message.NO.2'
[x] Sent 'task message.NO.3'
[x] Sent 'task message.NO.4'
[x] Sent 'task message.NO.5'
[x] Sent 'task message.NO.6'
[x] Sent 'task message.NO.7'
[x] Sent 'task message.NO.8'
[x] Sent 'task message.NO.9'
查看消费者控制台, 消费者执行处理了队列里的消息
connection...
Customer Waiting Received messages
[x] Received 'task message.NO.0'
[x] Done
[x] Received 'task message.NO.2'
[x] Done
[x] Received 'task message.NO.4'
[x] Done
[x] Received 'task message.NO.6'
[x] Done
[x] Received 'task message.NO.8'
[x] Done
循环调度
使用任务队列的优势之一是能够轻松地并行工作。如果我们正在积累工作积压,我们可以增加更多的工人,这样就可以很容易地扩大规模。当我们启动2个消费者处理任务的时候会发现,消费者1处理单数编号任务, 消费者2处理双数任务。因为默认情况下,RabbitMQ会将每个消息一次发给下一个消费者,平均来说,每个消费者都会收到相同数量的邮件,这种分发消息的方式叫做循环调度,或者也可以称之为轮询
公平调度
分别处理单数和双数任务是因为Thread.sleep同时间引起的吗, 我们将处理任务的方法做如下修改,工作任务时长是个一个5秒内的随机数,这样每个任务的处理时长将不同
//模拟执行任务
private static void doWork(String task) throws InterruptedException {
Random random1 = new Random();
Integer time = random1.nextInt(5000);
System.out.println(" [x] deal time" + time);
Thread.sleep(time);
}
你可以能发现在有两个工作人员的情况下,不管消息耗时多少,即使一个工作人员会一直很忙,而另一个几乎不做任何工作,RabbitMQ对此一无所知,仍然会平均分配消息,发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不考虑消费者未确认的消息数量。它只是盲目地将每第n条消息发送给第n个消费者。
?
针对这种情况我们需要使用channel.basicQos(1)告诉RabbitMq一次不要给一个消费者发送超过1条信息。即处理并确认前1条消息之前,不再发送新的消息给消费者,相反他会将消息发送给下一个消费者
/**
* 服务发送消息的最大个数,本例中设置为1
*/void basicQos(int prefetchCount) throws IOException;
?继续测试按照下图修改,这样分发消息的时候1次发送1个,当前一个消息还没有处理完的时候消息会被分配个另一个消费者处理
//channel.basicQos(1);
//放开原来注释的代码,修改为
channel.basicQos(1);
消息应答
继续想一想, 处理一个任务可能需要几秒钟,如果消费者完成一部分就异常死去会发生什么,我们的代码channel#basicConsume 谁知了自动应答,即一旦RabbitMQ向消费者传递了一条消息,它会立即将其标记为删除,这种情况下,如果消费者异常那么任务将丢失, 如果队列想消费者发送了多个任务, 那么还将丢失所有其他的任务
实际上, 我们不想丢失任务,想把这些任务交给另一个消费这个工作,为了确保消息不丢失, RabbitMQ支持消费者发回一个确认消息,告诉Rabbtmq已经收到并处理了一个特定消息,Rabbtmq可以自由删除消息,如果一个消费者在没有发送确认的情况下死亡(它的信道被关闭,连接被关闭,或者连接丢失),RabbitMQ将会理解一个消息没有被完全处理,并将它重新排队。如果同时有其他消费者在线,它将很快将其重新交付给另一个消费者。这样你就可以确保没有信息丢失,即使工人偶尔死去
程序修改
- 消息处理设置channel# basicConsume 时设置autoAck=false表示不自动确认
- 消息处理后调用channel#basicAck 确认消息处理完成
参考代码如下:?
channel.basicQos(1);
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery?
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false; // acknowledgment is covered below
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。需要做两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为此,我们需要将其声明为持久耐用,生产者消费者声明代码都需要修改
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
我们需要将我们的消息标记为持久的
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
注意事项:
-
将消息标记为持久并不能完全保证消息不会丢失。虽然它告诉Rabbtmq将消息保存到磁盘,但是当Rabbtmq接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口,这段时间会丢失 -
RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保,那么您可以使用publisher confirms。
前一篇:RabbitMQ应用Hello World
|