RabbitMQ工作模式
1.Work queues 工作队列模式
1.1 模式说明
work queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
1.2 模式实现
生产者
public class Producer {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("生产者");
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 0; i < 15; i++) {
String message = "work __ Hello,Consumer"+i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送成功");
}
channel.close();
connection.close();
}
}
消费者 1
public class Consumer1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("消费者1");
Channel channel = connection.createChannel();
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("");
System.out.println("=======================消费者1开始===============================");
System.out.println("");
System.out.println("路由key为:"+ envelope.getRoutingKey());
System.out.println("交换机为:"+ envelope.getExchange());
System.out.println("消息id为:"+ envelope.getDeliveryTag());
System.out.println("消费者1 - 接收到的消息:"+ new String(body,"UTF-8"));
System.out.println("");
System.out.println("=======================消费者1结束===============================");
System.out.println("");
Thread.sleep(1000);
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
channel.basicConsume(Producer.QUEUE_NAME,false,consumer);
}
}
消费者 2
public class Consumer2 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("消费者2");
Channel channel = connection.createChannel();
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("");
System.out.println("=======================消费者2开始===============================");
System.out.println("");
System.out.println("路由key为:"+ envelope.getRoutingKey());
System.out.println("交换机为:"+ envelope.getExchange());
System.out.println("消息id为:"+ envelope.getDeliveryTag());
System.out.println("消费者2 - 接收到的消息:"+ new String(body,"UTF-8"));
System.out.println("");
System.out.println("=======================消费者2结束===============================");
System.out.println("");
Thread.sleep(1000);
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
channel.basicConsume(Producer.QUEUE_NAME,false,consumer);
}
}
2.Publish/Subscribe 发布与订阅模式
2.1 模式说明
在订阅模型中,多了一个X (exchange) 角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
? Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
发布订阅模式:
- 每个消费者监听自己的队列
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
2.2 模式实现
生产者
public class Producer {
static final String FANOUT_EXCHANGE = "fanout_exchange";
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("生产者");
Channel channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
for (int i = 0; i < 10; i++) {
String message = "发布订阅模式 __ Hello,Consumer"+i;
channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
System.out.println("消息发送成功");
}
channel.close();
connection.close();
}
}
消费者1
public class Consumer1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("消费者1");
Channel channel = connection.createChannel();
channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("");
System.out.println("=======================消费者1开始===============================");
System.out.println("");
System.out.println("路由key为:"+ envelope.getRoutingKey());
System.out.println("交换机为:"+ envelope.getExchange());
System.out.println("消息id为:"+ envelope.getDeliveryTag());
System.out.println("消费者1 - 接收到的消息:"+ new String(body,"UTF-8"));
System.out.println("");
System.out.println("=======================消费者1结束===============================");
System.out.println("");
Thread.sleep(1000);
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
channel.basicConsume(Producer.FANOUT_QUEUE_1,false,consumer);
}
}
消费者2
public class Consumer2 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("消费者2");
Channel channel = connection.createChannel();
channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null);
channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("");
System.out.println("=======================消费者2开始===============================");
System.out.println("");
System.out.println("路由key为:"+ envelope.getRoutingKey());
System.out.println("交换机为:"+ envelope.getExchange());
System.out.println("消息id为:"+ envelope.getDeliveryTag());
System.out.println("消费者2 - 接收到的消息:"+ new String(body,"UTF-8"));
System.out.println("");
System.out.println("=======================消费者2结束===============================");
System.out.println("");
Thread.sleep(1000);
channel.basicAck(envelope.getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
}
}
};
channel.basicConsume(Producer.FANOUT_QUEUE_2,false,consumer);
}
}
2.3 小结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者 都收到
发布订阅模式与工作队列模式的区别
工作队列模式 不用定义交换机,而发布/订阅模式 需要定义交换机工作队列模式 的生产方是面向队列发送消息(底层使用默认交换机),发布/订阅模式 的生产方是面向交换机发送消息工作队列模式 不需要设置,会将队列绑定到默认的交换机,发布/订阅模式 需要设置队列和交换机的绑定
3.Routing 路由模式
3.1 模式说明
图解:
- P:生产者,它向
Exchange 发送消息,同时会指定一个routing key - X:交换机,接收生产者的消息,然后把消息递交给 与
routing key 完全匹配的队列 - C1:消费者,其所在队列指定了需要
routing key 为 error 的消息 - C2:消费者,其所在队列指定了需要
routing key 为 info、error、warning 的消息
路由模式特点:
- 队列与交换机的绑定,不是任意绑定,而是指定一个
Routing Key - 消息的发送方在向
Exchange 发送消息时,也必须指定消息的 Routing Key Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
3.2 模式实现
生产者
public class Producer {
static final String DIRECT_EXCHAGE = "direct_exchange";
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("生产者");
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
String message = "INSERT 路由模式;routing key 为 insert " ;
channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "UPDATE 路由模式;routing key 为 update" ;
channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
System.out.println("已发送消息:" + message);
channel.close();
connection.close();
}
}
消费者1
public class Consumer1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("消费者1");
Channel channel = connection.createChannel();
channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("");
System.out.println("=======================消费者1开始===============================");
System.out.println("");
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
System.out.println("");
System.out.println("=======================消费者1结束===============================");
System.out.println("");
}
};
channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
}
}
消费者2
public class Consumer2 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("消费者2");
Channel channel = connection.createChannel();
channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE,true,false,false,null);
channel.queueBind(Producer.DIRECT_QUEUE_UPDATE,Producer.DIRECT_EXCHAGE,"update");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("");
System.out.println("=======================消费者2开始===============================");
System.out.println("");
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
System.out.println("");
System.out.println("=======================消费者2结束===============================");
System.out.println("");
}
};
channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE,true,consumer);
}
}
4.Topics 通配符模式
4.1 模式说明
Topic 类型与Direct 相比:
都是可以根据Routing Key 把消息路由到不同的队列。只不过Topic 类型Exchange 可以让队列在绑定Routing key 的时候使用通配符
Routing key 一般都是有一个或多个单词组成,多个单词之间以 ”.” 分割,例如:item.insert
通配符规则:
# :匹配一个或多个词
* :匹配不多不少恰好1个词
举例:
item.# :能够匹配item.insert.abc 或者 item.insert
item.* :只能匹配item.insert
4.2 模式实现
生产者
public class Producer {
static final String TOPIC_EXCHAGE = "topic_exchange";
static final String TOPIC_QUEUE_1 = "topic_queue_1";
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("生产者");
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE_1,true,false,false,null);
channel.queueDeclare(TOPIC_QUEUE_2,true,false,false,null);
channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"item.#");
channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHAGE,"*.delete");
String message = "新增 Topic模式;routing key 为 item.insert " ;
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "修改 Topic模式;routing key 为 item.update" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "删除 Topic模式;routing key 为 item.delete" ;
channel.basicPublish(TOPIC_EXCHAGE, "ddd.delete", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "除了这些以外的消息。。。。" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.delete.aaa", null, message.getBytes());
System.out.println("已发送消息:" + message);
channel.close();
connection.close();
}
}
消费者1
public class Consumer1 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("消费者1");
Channel channel = connection.createChannel();
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null);
channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHAGE,"item.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("");
System.out.println("=======================消费者1开始===============================");
System.out.println("");
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
System.out.println("");
System.out.println("=======================消费者1结束===============================");
System.out.println("");
}
};
channel.basicConsume(Producer.TOPIC_QUEUE_1,true,consumer);
}
}
消费者2
public class Consumer2 {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("消费者2");
Channel channel = connection.createChannel();
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "*.delete");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("");
System.out.println("=======================消费者2开始===============================");
System.out.println("");
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
System.out.println("");
System.out.println("=======================消费者2结束===============================");
System.out.println("");
}
};
channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
}
}
5.模式总结
1、简单模式 HelloWorld : 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue : 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe : 需要设置类型为fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing : 需要设置类型为direct 的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic : 需要设置类型为topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
个人博客为: MoYu’s HomePage
|