工作模式简单介绍
图中一个消息生产者,C1和C2为消息的消费者,他们之间为竞争关系,默认C1和C2之间的工作模式是轮询模式,例如P生产消息AA和BB,那么C1如果消费了消息AA,BB就会被C2所消费
消息生产者代码示例
public class Producer {
public static final String QUEUE_NAME="hello_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
String message="hello word";
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
channel.basicPublish("",QUEUE_NAME, null,(message+scanner.next()).getBytes());
}
}
}
消费者One代码示例
channel.basicQos(2) 这一行代码主要是用来说明C1和C2是否进行轮询消费消息,如果设置的值是1就是不公平消费,谁处理的快就会给谁处理消息
public class ConsumerOne {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, message)->{
System.out.println("message = " + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
channel.basicQos(0);
channel.basicConsume(Producer.QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
消费者Two代码示例
public class ConsumerTwo {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.basicQos(0);
DeliverCallback deliverCallback=(consumerTag, message)->{
System.out.println("message = " + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
channel.basicConsume(Producer.QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
启动代码
在消息生产者中输入相关信息
消费者One
输出了两条BB,DD
消费者Two
输出流两条AA,CC
|