前言:上一篇我们的程序实现了广播消息给所有的消费者,本章中我们将要增加一个特性,我们将实现只订阅消息的某个子集,例如我们将ERROR错误等级的消息保存到磁盘上,而INFO.WARNING 等级的日志直接打印出来
RabitMQ安装
如何安装: https://blog.csdn.net/Beijing_L/article/details/119042261
图例
P(生产者): 生产者属于数据的发送方 ,发送的消息被放入队列里
C(消费者): 消费者属于数据的接收方,发现队列里的消息,将消息从队列中读取出来做业务操作
Queue(队列): RabbitMQ的作用是存储消息,队列的特性是先进先出
几个概念
绑定(Bindings)
在前一章中我们知道交换器和队列的关系我们称之为绑定( a binding)并创建了绑定,绑定也很容易理解,例如
channel.queueBind(queueName, EXCHANGE_NAME, "");
实现绑定有个关键参数称之为路由键(routingKey),为了避免困惑我们把它叫bindingkey,??这就让我们可以实现通过某个路由键创建绑定关系,bindingkey的意义取决于第二哥参数exchange,即交换器类型, 我们上一张使用的扇出交换器(fanout exchange)就忽略了这个参数
/**
* @param 队列名称
* @param 交换器名称
* @param 使用绑定的路由键
* @return a binding-confirm method if the binding was successfully created
* @throws java.io.IOException if an error is encountered
*/
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
直连交换器(Direct exchange)
?当我们使用扇出交换器的时候,我们会发现它不是很灵活, 扇出交换器近乎无脑的方式广播消息。我们想实现对消息过滤并处理时就并适用,例如系统有很多日志消息,广播消息后, 有些消费者处理ERROR严重级别的消息,例如将其写到磁盘上, 有些消费者处理非ERROR级别的消息,例如打印输出。这个时候我们就需要用到直连交换器,直连交换器的算法很简单:当一个消息进入队列,待发送消息的路由键(routingKey)和直连交换器和队列关系的绑定键(bindingKey)完全相同的时候, 消息将会被发送到关联的队列
如下图所示:有两个队列Q1和Q2和直连交换器X链接,第一个队列有一个绑定键(bindingKey)为orange, 第二哥队列有两个绑定键 (bindingKey)black和green,当一个消息以路由键orange发送到直连交换器的时候,消息将会被推送到Q1队列中,当消息以black或者green为路由键发送到直连交换器X中,消息将会被推送到Q2队列中

?
多重绑定
当然,用同一个绑定建(bindingKey)绑定多个队列是也合法的, 上面的例子中我们可以用black同时绑定Q1和Q2,当一个消息以black 为路由建发送到直连交换器X的时候,消息将会被推送到Q1和Q2中,其实用这种方式就代替了扇出交换器
实现图示
为了实现前面提到的按照日志错误等级来处理日志消息,我们将消息的等级当做路由键, 同时用消息等级当做bindingKey来绑定直连交换器和不同队列,C1只处理ERROR级别的日志,用于保存到磁盘文件中, C2接收所有日志实现打印输出.如下图所示

?
参考代码
生产者
根据路由建创建3条消息
public class Producer {
private static final String EXCHANGE_NAME = "exchange_direct_log";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//exchange-type= direct 直连交换器,routingKeys =bindingkey
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//routingKeys
List<String> routingKeys = Arrays.asList("INFO", "WARNING", "ERROR");
for (String rountingkey : routingKeys) {
String message =rountingkey+ " Send the message level:" ;
channel.basicPublish(EXCHANGE_NAME, rountingkey, null, message.getBytes("UTF-8"));
System.out.println(" Send" + rountingkey + "':'" + message);
}
channel.close();
connection.close();
}
}
其中
?direct表示直连交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
?根据路由键发送消息
channel.basicPublish(EXCHANGE_NAME, rountingkey, null, message.getBytes("UTF-8"));
消费者
//处理:"INFO", "WARNING","ERROR"
public class CustomerReviver1 {
private static final String EXCHANGE_NAME = "exchange_direct_log";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机并获取匿名队列
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
//设置bindingkey
List<String> bindingkeys = Arrays.asList("INFO", "WARNING","ERROR");
for (String bindingkey : bindingkeys) {
channel.queueBind(queueName,EXCHANGE_NAME,bindingkey);
System.out.println("CustomerReviver1 exchange:"+EXCHANGE_NAME+"," +
" queue:"+queueName+", BKey:" + bindingkey);
}
System.out.println("CustomerReviver1 Waiting for messages");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
System.out.println(" [x] do print");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
//另一个类消费者2,处理"ERROR"
public class CustomerReviver2 {
private static final String EXCHANGE_NAME = "exchange_direct_log";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
//channel 绑定bindingkeys
List<String> bindingkeys = Arrays.asList("ERROR");
for (String bindingkey : bindingkeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingkey);
System.out.println("CustomerReviver2 exchange:" + EXCHANGE_NAME + "," +
" queue:" + queueName + ", BKey:" + bindingkey);
}
System.out.println("CustomerReviver2 Waiting for messages");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//routingKey=bindingKey时才起作用
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
System.out.println(" [x] do saving");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
执行结果
生产者创建3条消息
?SendINFO':'INFO Send the message level:
?SendWARNING':'WARNING Send the message level:
?SendERROR':'ERROR Send the message level:
消费者
##消费者CustomerReviver1
CustomerReviver1 exchange:exchange_direct_log, queue:amq.gen-6tprkkdqdrbhk2Elkddyyw, BKey:INFO
CustomerReviver1 exchange:exchange_direct_log, queue:amq.gen-6tprkkdqdrbhk2Elkddyyw, BKey:WARNING
CustomerReviver1 exchange:exchange_direct_log, queue:amq.gen-6tprkkdqdrbhk2Elkddyyw, BKey:ERROR
CustomerReviver1 Waiting for messages
[x] Received 'INFO':'INFO Send the message level:'
[x] do print
[x] Received 'WARNING':'WARNING Send the message level:'
[x] do print
[x] Received 'ERROR':'ERROR Send the message level:'
[x] do print
##消费者CustomerReviver2 ,只处理ERROR等级消息
CustomerReviver2 exchange:exchange_direct_log, queue:amq.gen-CzPTr7XnTLLgURIS-gqRKA, BKey:ERROR
CustomerReviver2 Waiting for messages
[x] Received 'ERROR':'ERROR Send the message level:'
[x] do saving
|