1.1Work queues工作队列模式
1.1.1 入门程序
完成简单模式的消息传递
生产者代码:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.155.10.100");
factory.setPort(5672);
factory.setVirtualHost("/study");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello_world", true, false, false, null);
channel.basicPublish("", "hello_world", null, message.getBytes());
channel.close();
connection.close();
消费者代码:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.155.10.100");
factory.setPort(5672);
factory.setVirtualHost("/study");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("需要打印的东东");
}
};
channel.basicConsume("hello_world", true, consumer);
1.1.2 工作模式
- Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费
- 应用场景: 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
代码即拷贝一份消费者,两个消费者同时消费队列中的消息,相互之间并不影响,即同一份数据可以同时被C1和C2消费
1.1.3 PUB/Sub订阅模式
订阅模型中,多了一个Exchange角色,而且过程略有变化
- P:生产者,不再发送到队列中,而是发送给交换机
- C:消费者,消息的接收者,会一直等待消息到来
- Exchange:只负责转发消息,不具备存储消息的功能,因此如果没有任何队列与Exchange绑定,或者没有路由规则的队列,那么消息会丢失
生产者代码:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.155.10.100");
factory.setPort(5672);
factory.setVirtualHost("/study");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, true, false, null);
channel.queueDeclare("testQueue1", true, false, false, null);
channel.queueBind("testQueue1", exchangeName, "");
channel.basicPublish("", "hello_world", null, message.getBytes());
channel.close();
connection.close();
消费者代码基本一致
1.1.4 Routing路由模式
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey
- 消息的发送方再向Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据RoutingKey进行判断,只有队列RoutingKey鱼消息的RoutingKey完全一直,才会接受到消息
生产者代码:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.155.10.100");
factory.setPort(5672);
factory.setVirtualHost("/study");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, true, false, null);
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
channel.queueBind(queue1Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "warn");
channel.queueBind(queue2Name, exchangeName, "error");
channel.basicPublish(exchangeName, "hello_world", null, message.getBytes());
channel.close();
connection.close();
1.1.4 Topics通配符模式
*代表一个单词,#代表0个或多个单词 生产者代码“
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.155.10.100");
factory.setPort(5672);
factory.setVirtualHost("/study");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, true, false, null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue2Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");
channel.basicPublish(exchangeName, "hello_world", null, message.getBytes());
channel.close();
connection.close();
|