IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ教程路由Routing -> 正文阅读

[大数据]RabbitMQ教程路由Routing

前言:上一篇我们的程序实现了广播消息给所有的消费者,本章中我们将要增加一个特性,我们将实现只订阅消息的某个子集,例如我们将ERROR错误等级的消息保存到磁盘上,而INFO.WARNING 等级的日志直接打印出来

RabitMQ安装

如何安装: https://blog.csdn.net/Beijing_L/article/details/119042261

图例


P(生产者): 生产者属于数据的发送方 ,发送的消息被放入队列里

C(消费者): 消费者属于数据的接收方,发现队列里的消息,将消息从队列中读取出来做业务操作

Queue(队列): RabbitMQ的作用是存储消息,队列的特性是先进先出

几个概念


绑定(Bindings)

在前一章中我们知道交换器和队列的关系我们称之为绑定( a binding)并创建了绑定,绑定也很容易理解,例如

channel.queueBind(queueName, EXCHANGE_NAME, "");

实现绑定有个关键参数称之为路由键(routingKey),为了避免困惑我们把它叫bindingkey,??这就让我们可以实现通过某个路由键创建绑定关系,bindingkey的意义取决于第二哥参数exchange,即交换器类型, 我们上一张使用的扇出交换器(fanout exchange)就忽略了这个参数

    /**
     * @param 队列名称
     * @param 交换器名称
     * @param 使用绑定的路由键
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
     */
    Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

直连交换器(Direct exchange)

?当我们使用扇出交换器的时候,我们会发现它不是很灵活, 扇出交换器近乎无脑的方式广播消息。我们想实现对消息过滤并处理时就并适用,例如系统有很多日志消息,广播消息后, 有些消费者处理ERROR严重级别的消息,例如将其写到磁盘上, 有些消费者处理非ERROR级别的消息,例如打印输出。这个时候我们就需要用到直连交换器,直连交换器的算法很简单:当一个消息进入队列,待发送消息的路由键(routingKey)和直连交换器和队列关系的绑定键(bindingKey)完全相同的时候, 消息将会被发送到关联的队列

如下图所示:有两个队列Q1和Q2和直连交换器X链接,第一个队列有一个绑定键(bindingKey)为orange, 第二哥队列有两个绑定键 (bindingKey)black和green,当一个消息以路由键orange发送到直连交换器的时候,消息将会被推送到Q1队列中,当消息以black或者green为路由键发送到直连交换器X中,消息将会被推送到Q2队列中

?

多重绑定

当然,用同一个绑定建(bindingKey)绑定多个队列是也合法的, 上面的例子中我们可以用black同时绑定Q1和Q2,当一个消息以black 为路由建发送到直连交换器X的时候,消息将会被推送到Q1和Q2中,其实用这种方式就代替了扇出交换器

实现图示

为了实现前面提到的按照日志错误等级来处理日志消息,我们将消息的等级当做路由键, 同时用消息等级当做bindingKey来绑定直连交换器和不同队列,C1只处理ERROR级别的日志,用于保存到磁盘文件中, C2接收所有日志实现打印输出.如下图所示

?

参考代码

生产者

根据路由建创建3条消息

public class Producer {

    private static final String EXCHANGE_NAME = "exchange_direct_log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //exchange-type= direct 直连交换器,routingKeys =bindingkey
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //routingKeys
        List<String> routingKeys = Arrays.asList("INFO", "WARNING", "ERROR");

        for (String rountingkey : routingKeys) {
            String message =rountingkey+ " Send the message level:" ;
            channel.basicPublish(EXCHANGE_NAME, rountingkey, null, message.getBytes("UTF-8"));
            System.out.println(" Send" + rountingkey + "':'" + message);
        }


        channel.close();
        connection.close();
    }
}

其中

?direct表示直连交换器

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

?根据路由键发送消息

 channel.basicPublish(EXCHANGE_NAME, rountingkey, null, message.getBytes("UTF-8"));

消费者

//处理:"INFO", "WARNING","ERROR"
public class CustomerReviver1 {
    private static final String EXCHANGE_NAME = "exchange_direct_log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机并获取匿名队列
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");


        //设置bindingkey
        List<String> bindingkeys = Arrays.asList("INFO", "WARNING","ERROR");
        for (String bindingkey : bindingkeys) {
            channel.queueBind(queueName,EXCHANGE_NAME,bindingkey);
            System.out.println("CustomerReviver1 exchange:"+EXCHANGE_NAME+"," +
                    " queue:"+queueName+", BKey:" + bindingkey);

        }

        System.out.println("CustomerReviver1  Waiting for messages");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            System.out.println(" [x] do print");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }
}


//另一个类消费者2,处理"ERROR"

public class CustomerReviver2 {

    private static final String EXCHANGE_NAME = "exchange_direct_log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");


        //channel 绑定bindingkeys
        List<String> bindingkeys = Arrays.asList("ERROR");
        for (String bindingkey : bindingkeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingkey);
            System.out.println("CustomerReviver2 exchange:" + EXCHANGE_NAME + "," +
                    " queue:" + queueName + ", BKey:" + bindingkey);

        }
        System.out.println("CustomerReviver2  Waiting for messages");


        DeliverCallback deliverCallback = (consumerTag, delivery) -> {

            //routingKey=bindingKey时才起作用
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            System.out.println(" [x] do saving");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }
}

执行结果

生产者创建3条消息

?SendINFO':'INFO Send the message level:
?SendWARNING':'WARNING Send the message level:
?SendERROR':'ERROR Send the message level:

消费者

##消费者CustomerReviver1
CustomerReviver1 exchange:exchange_direct_log, queue:amq.gen-6tprkkdqdrbhk2Elkddyyw, BKey:INFO
CustomerReviver1 exchange:exchange_direct_log, queue:amq.gen-6tprkkdqdrbhk2Elkddyyw, BKey:WARNING
CustomerReviver1 exchange:exchange_direct_log, queue:amq.gen-6tprkkdqdrbhk2Elkddyyw, BKey:ERROR
CustomerReviver1  Waiting for messages
 [x] Received 'INFO':'INFO Send the message level:'
 [x] do print
 [x] Received 'WARNING':'WARNING Send the message level:'
 [x] do print
 [x] Received 'ERROR':'ERROR Send the message level:'
 [x] do print



##消费者CustomerReviver2 ,只处理ERROR等级消息
CustomerReviver2 exchange:exchange_direct_log, queue:amq.gen-CzPTr7XnTLLgURIS-gqRKA, BKey:ERROR
CustomerReviver2  Waiting for messages
 [x] Received 'ERROR':'ERROR Send the message level:'
 [x] do saving

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-04 11:16:50  更:2021-08-04 11:18:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 17:02:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码