RabbitMQ工作流程
生产者发送消息的流程
- 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
- 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
- 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过bindingKey (绑定Key)将交换器和队列绑定( binding )起来
- 生产者发送消息至RabbitMQ Broker,其中包含routingKey (路由键)、交换器等信息
- 相应的交换器根据接收到的routingKey 查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接。
消费者接收消息的过程
- 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
- 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
- 消费者确认( ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。
简单案例
??Hello World一对一的简单模式。生产者直接发送消息给RabbitMQ,另一端消费。未定义和指定Exchange的情况下,使用的是AMQP default这个内置的Exchange。 引入maven依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
HelloProducer
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloProducer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.10");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123456");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicPublish("", "hello", null, "hello world".getBytes());
channel.close();
connection.close();
}
}
HelloConsumer push模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
public class HelloConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", (consumerTag, message)->{
System.out.println((new String(message.getBody())));
}, (consumerTag)->{});
}
}
HelloGetConsumer get模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
public class HelloGetConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
GetResponse getResponse = channel.basicGet("hello", true);
System.out.println(new String(getResponse.getBody()));
channel.close();;
connection.close();
}
}
先启动消费者,再启动生产者
Connection 和Channel关系
??生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。
为什么不直接使用TCP连接,而是使用信道?
RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。
当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。
当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个Connection ,分摊信道。具体的调优看业务需要。
信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。
channel.exchangeDeclare
channel.queueDeclare
channel.basicPublish
channel.basicConsume
// ...
RabbitMQ 相关的API与AMQP紧密相连,比如channel.basicPublish 对应AMQP 的Basic.Publish命令。
RabbitMQ工作模式详解
官网地址:https://www.rabbitmq.com/getstarted.htm
Work Queue
生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。 Producer
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
channel.queueDeclare("queue.wq", true, false, false, null);
channel.queueBind("queue.wq", "ex.wq", "key.wq");
for (int i = 0; i < 15; i++) {
channel.basicPublish("ex.wq", "key.wq", null, ("工作队列:" + i).getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
Consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.wq", true, false, false, null);
channel.basicConsume("queue.wq", (consumerTag, message)->{
System.out.println(new String(message.getBody()));
}, (consumerTag)->{});
}
}
先启动consumer再启动producer
发布订阅模式
??使用fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。
??消息广播给所有订阅该消息的消费者。
??在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
??生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。
??交换器的类型有: direct 、topic 、headers 和fanout 四种类型。发布订阅使用fanout 。创建交换器,名字叫logs :
channel.exchangeDeclare("1 logs", "fanout");
??fanout 交换器很简单,从名字就可以看出来(用风扇吹出去),将所有收到的消息发送给它知道的所有的队列。
rabbitmqctl list_exchanges
列出RabbitMQ的交换器,包括了amq.* 的和默认的(未命名)的交换器。 ??未命名交换器
??在前面的那里中我们没有指定交换器,但是依然可以向队列发送消息。这是因为我们使用了默认的交换器。
channel.basicPublish("", "hello", null, message.1 getBytes());
??第一个参数就是交换器名称,为空字符串。直接使用routingKey向队列发送消息,如果该routingKey指定的队列存在的话。
??现在,向指定的交换器发布消息:
channel.basicPublish("logs", "", null, message.getBytes());
临时队列
??前面我们使用队列的名称,生产者和消费者都是用该名称来发送和接收该队列中的消息。
??首先,我们无论何时连接RabbitMQ的时候,都需要一个新的,空的队列。我们可以使用随机的名字创建队列,也可以让服务器帮我们生成随机的消息队列名字。
??其次,一旦我们断开到消费者的连接,该队列应该自动删除。
String queueName = channel.queueDeclare().getQueue();
producer
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
for (int i = 0; i < 15; i++) {
channel.basicPublish("ex.myfan", "", null, ("hello world fan: " + i).getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
分别创建三个consumer
consumer1
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "ex.myfan", "");
channel.basicConsume(queue, (consumerTag, message)->{
System.out.println( "One " + new String(message.getBody()));
}, (consumerTag)->{});
}
}
consumer2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TwoConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "ex.myfan", "");
channel.basicConsume(queue, (consumerTag, message)->{
System.out.println( "Two " + new String(message.getBody()));
}, (consumerTag)->{});
}
}
consumer3
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ThreeConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "ex.myfan", "");
channel.basicConsume(queue, (consumerTag, message)->{
System.out.println( "Three " + new String(message.getBody()));
}, (consumerTag)->{});
}
}
当消费者启动起来之后,命令rabbitmqctl list_bindings 列出绑定关系:
路由模式
??使用direct 类型的Exchange,发N条消费并使用不同的routingKey ,消费者定义队列并将队列、routingKey 、Exchange绑定。此时使用direct 模式Exchagne必须要routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。
??上一个模式中,可以将消息广播到很多接收者。
??现在我们想让接收者只接收部分消息,如,我们通过直接模式的交换器将关键的错误信息记录到log文件,同时在控制台正常打印所有的日志信息。
??绑定 上一模式中,交换器的使用方式:
channel.queueBind(queueName, 1 EXCHANGE_NAME, "");
绑定语句中还有第三个参数: routingKey :
channel.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey 的作用与具体使用的交换器类型有关。对于fanout 类型的交换器,此参数设置无效,系统直接忽略。
direct交换器
??分布式系统中有很多应用,这些应用需要运维平台的监控,其中一个重要的信息就是服务器的日志记录。
??我们需要将不同日志级别的日志记录交给不同的应用处理。
??如何解决?
??使用direct交换器
??如果要对不同的消息做不同的处理,此时不能使用fanout 类型的交换器,因为它只会盲目的广播消息。 ??我们需要使用direct 类型的交换器。direct 交换器的路由算法很简单:只要消息的routingKey 和队列的bindingKey 对应,消息就可以推送给该队列。
??上图中的交换器X 是direct 类型的交换器,绑定的两个队列中,一个队列的bindingKey 是orange ,另一个队列的bindingKey 是black 和green 。
??如此,则routingKey 是orange 的消息发送给队列Q1, routingKey 是black 和green 的消息发送给Q2队列,其他消息丢弃。
??上图中,我们使用direct 类型的交换器X ,建立了两个绑定:队列Q1根据bindingKey 的值black 绑定到交换器X ,队列Q2根据bindingKey 的值black 绑定到交换器X ;交换器X 会将消息发送给队列Q1和队列Q2。交换器的行为跟fanout 的行为类似,也是广播。
??在案例中,我们将日志级别作为routingKey 。 EmitLogsDirect.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class EmitLogsDirect {
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String servrity = null;
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
for (int i = 0; i < 100; i++) {
switch (i % 3) {
case 0:
servrity = "info";
break;
case 1:
servrity = "warn";
break;
case 2:
servrity = "error";
break;
default:
System.err.println("log错误,程序退出");
System.exit(-1);
}
String logStr = "这是 【" + servrity + "】 的消息";
channel.basicPublish("direct_logs", servrity, null,
logStr.getBytes("UTF-8"));
}
}
}
ReceiveErrorLogsDirect.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveErrorLogsDirect {
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f"); factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "error");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" +
message + "'");
};
channel.basicConsume(queueName, deliverCallback, consumerTag ->
{});
}
}
ReceiveWarnInfoLogsDirect.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveWarnInfoLogsDirect {
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "warn");
channel.queueBind(queueName, "direct_logs", "info");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" +
message + "'");
};
channel.basicConsume(queueName, deliverCallback, consumerTag ->
{});
}
}
主题模式
??使用topic 类型的交换器,队列绑定到交换器、bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息routingKey 模糊匹配,比较灵活。
??上个模式中,我们通过direct 类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。
??这里有一个限制,加入现在我不仅想根据日志级别划分日志消息,还想根据日志来源划分日志,怎么做?
??比如,我想监听cron服务发送的error 消息,又想监听从kern服务发送的所有消息。
??此时可以使用RabbitMQ的主题模式( Topic )。
??要想topic 类型的交换器, routingKey 就不能随便写了,它必须得是点分单词。单词可以随便写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。该点分单词字符串最长255字节。
??bindingKey 也必须是这种形式。topic 类型的交换器背后原理跟direct 类型的类似:只要队列的bindingKey 的值与消息的routingKey 匹配,队列就可以收到该消息。有两个不同:
- * (star)匹配一个单词
- # 匹配0到多个单词
??上图中,我们发送描述动物的消息。消息发送的时候指定的routingKey 包含了三个词,两个点。第一个单词表示动物的速度,第二个是颜色,第三个是物种:<speed>.<color>.<species>。
??创建三个绑定:Q1绑定到" *.orange.* “Q2绑定到” *.*.rabbit “和” lazy.# "。
- Q1关注orange颜色动物的消息
- Q2关注兔子的消息,以及所有懒的动物消息
??如果不能匹配,就丢弃消息。 ??如果发送的消息routingKey 是" lazy.orange.male.rabbit ",则会匹配最后一个绑定。 ??如果在topic 类型的交换器中bindingKey 使用# ,则就是fanout 类型交换器的行为。 ??如果在topic 类型的交换器中bindingKey 中不使用* 和# ,则就是direct 类型交换器的行为。
EmitLogTopic.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] SPEED = {
"lazy",
"quick",
"normal"
};
private static final String[] COLOR = {
"black",
"orange",
"red",
"yellow",
"blue",
"white",
"pink"
};
private static final String[] SPECIES = {
"dog",
"rabbit",
"chicken",
"horse",
"bear",
"cat"
};
private static final Random RANDOM = new Random();
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.10");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = null;
String routingKey = null;
String speed = null;
String color = null;
String species = null;
for (int i = 0; i < 10; i++) {
speed = SPEED[RANDOM.nextInt(SPEED.length)];
color = COLOR[RANDOM.nextInt(COLOR.length)];
species = SPECIES[RANDOM.nextInt(SPECIES.length)];
message = speed + "-" + color + "-" + species;
routingKey = speed + "." + color + "." + species;
channel.basicPublish(EXCHANGE_NAME, routingKey, null,
message.getBytes());
}
System.out.println(" [x] Sent '" + routingKey + "':'" + message +
"'");
}
}
EmitLogTopic1.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
public class EmitLogTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] SPECIES = {
"dog",
"rabbit",
"chicken",
"horse",
"bear",
"cat"
};
private static final Random RANDOM = new Random();
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = null;
String routingKey = null;
String speed = null;
String species = null;
for (int i = 0; i < 10; i++) {
speed = "lazy";
species = SPECIES[RANDOM.nextInt(SPECIES.length)];
message = speed + "-" + species;
routingKey = speed + "." + species;
channel.basicPublish(EXCHANGE_NAME, routingKey, null,
message.getBytes());
}
System.out.println(" [x] Sent '" + routingKey + "':'" + message +
"'");
}
}
ReceiveLogTopic.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("*.*.rabbit 匹配到的消息:" + new
String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {});
}
}
ReceiveLogTopic1.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("*.orange.* 匹配到的消息:" + new
String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {});
}
}
ReceiveLopTopic2.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.*.*");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("lazy.*.* 匹配到的消息:" + new
String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {});
}
}
ReceiveLogTopic3.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic3 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("lazy.*.* 匹配到的消息:" + new
String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {});
}
}
|