- 简单说就是一个生产者者,多个消费者共同消费消息,比如有10个消息,就会
平均分配 给每一个消费者,一人5个消息。
经典的生产-消费模式
生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = MQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "work", null, (i + " hello work queque").getBytes());
}
MQConnection.closeChannelAndConnection(channel, connection);
}
}
消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = MQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = MQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
- 以上方式中的消费者弊端显而易见,就是消费者1消费的很快,而消费者2消费的很慢,消费者1不就相当于浪费资源,在那里看着消费者2干活,多少有点不合理吧。所以如果想做到
能者多劳的效果 (channel.basicQos(1);),并且保证消息不丢失的情况下,希望每次手动进行确认消息 (channel.basicAck(envelope.getDeliveryTag(), false);)是否消费完毕,那么有人就会问了,如果在手动ack的时候,mq挂机 了,那消息不还是丢失了吗?那么我们可以给它加上事务操作,有问题直接回滚,这样就能保证消息不丢失问题。
升级改造消费者
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = MQConnection.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
|