IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ教程主题交换器Topics -> 正文阅读

[大数据]RabbitMQ教程主题交换器Topics

前言:上一篇中示例改进了我们的消息处理,将扇形交换器替换成直连交换器,实现了根据日志等级来接收处理消息。虽然直连交换器改进了程序,但是它仍然有局限性, 不能根据多种标准实现接收处理日志消息。例如我们想按照Error, Warning,Info等日志等级接收消息,同时我们还想根据日志所属系统来处理消息。如果可以实现多种标准进行路由, 程序将会更加的灵活。我们接下来了解主题交换器(Topic Exchange)

RabitMQ安装

如何安装:?https://blog.csdn.net/Beijing_L/article/details/119042261

图例

P(生产者): 生产者属于数据的发送方 ,发送的消息被放入队列里

C(消费者): 消费者属于数据的接收方,发现队列里的消息,将消息从队列中读取出来做业务操作

Queue(队列): RabbitMQ的作用是存储消息,队列的特性是先进先出

主题交换器(Topic Exchange)

之前的示例中使用路由键都是随便定义的, 但消息发送给主题交换器的时候路由键(RoutingKey)不能随便定义,路邮件必须是由点分割的单词列表。可以使用任何单词或者词组,我们通常是用消息的一些特征。就例如 “quick.orange.rabbit” 或者“lazy.orange.elephant”等 。但是RoutingKey最大不能超过255 bytes

交换器和队列的绑定键(binding Key)也必须是相同的形式, 主体交换器的底层实现原理和直连交换器很类似,使用特定路由键的消息发送到主体交换器中后,如果队列的绑定建和路邮键匹配,则消息将会发送到匹配的队列里。绑定键匹配处理的时候有2个特殊处理,如下图所示

  1. * 可以表示任何一个单词
  2. # 可以表示0个或者多个单词

生产者发送消息到主体交换器X, 主题交换器X根据路由键(routing key)和绑定键(binding key)匹配处理,满足匹配原则的就将消息推送到队列里

其中:

  • Q1队列: 接收所有带orange颜色的动物相关消息
  • Q2队列: 接收所有rabbit的消息以及所有Lazy的动物消息

例如:

  • 路由键 quick.orange.rabbit 满足*.orange.* 同时也满足 *.*.rabbit ,所以它会被发送到队列Q1和队列Q2
  • 路由键 lazy.orange.elephant 满足*.orange.* 同时也满足layz.# ,所以它会被发送到队列Q1和队列Q2 ,因为#可以表示多个单词
  • 路由键 lazy.brown.fox 只满足layz.# , 所以它会被发送到Q2中

想想看, 我们发送消息时以orange或者quick.orange.male.rabbit 当路由键时会发生什么? 两个消息将不会匹配到任何队列造成丢失.但是如果使用lazy.orange.male.rabbit当做路由键的时候,消息会被发送到Q2队列中。虽然路由键是4个单词,但是它可以匹配lazy.# 规则,因为#可以代表多个单词

主题交换器比其他交换器更强大

  • 当主题交换器和队列的绑定键为“#” 的时候,队列将会接收到所有的消息,就和不考虑秘钥的扇出交换器(fanout exchange)一样.
  • 当主题交换器和队列的绑定键不使用“*”或者“#” 的时候,主题交换器就和直连交换器(direct exchange)一样

参考代码

我们的示例程序中使用主体交换器,例如考虑路由键的配置方式为 <所述系统>.<严重等级> ,代码和先前的代码差不多

消费者

public class ReceiveOtherSysLogs {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");


        //定义一个路由关键字
        String[] bindingKeys = new String[]{"customer.*", "accept.*"};
        String queueName = channel.queueDeclare().getQueue();
        //绑定路由
        for (String bindingKey : bindingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            System.out.println("exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + bindingKey);
        }
        System.out.println("Waiting for messages");


        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }

}

?ReceiveOtherSysLogs? 用于接收customer和accept系统的消息 ,因为使用的是* 所有消息路由必须是两个单词,多个单词无法匹配

public class ReceiveProductSysLogs {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        //定义一个路由关键字
        String[] bindingKeys = new String[]{"product.#"};


        //绑定路由
        for (String bindingKey : bindingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            System.out.println("exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + bindingKey);
        }
        System.out.println(" Waiting for messages");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }

}

?ReceiveProductSysLogs? 用于接收产品系统的所有消息,因为使用#,所以可以匹配以product开通的KEY

生产者

public class TopicSender {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个匹配模式的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //待发送的消息
        String[] routingKeys = new String[]{
                "customer.error",
                "product.info",
                "product.error",
                "product.error.log",
                "accpet.info.log",
        };
        //发送消息
        for (String routingKey : routingKeys) {
            String message = "routingKey=" + routingKey + " log:" + " this is information";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println("TopicSend Sent '" + routingKey + "':'" + message + "'");
        }


        channel.close();
        connection.close();

    }
}

执行结果

"customer.error"满足customer.*
"product.info"满足product.#
?"product.error"满足product.#
"product.error.log"满足product.#? 其中#可以匹配多个单词
"accpet.info.log",不满足accept.*其中*智能匹配一个单词

生产者发送消息

TopicSend Sent 'customer.error':'routingKey=customer.error log: this is information'
TopicSend Sent 'product.info':'routingKey=product.info log: this is information'
TopicSend Sent 'product.error':'routingKey=product.error log: this is information'
TopicSend Sent 'product.error.log':'routingKey=product.error.log log: this is information'
TopicSend Sent 'accpet.info.log':'routingKey=accpet.info.log log: this is information'

消费者处理消息

##ReceiveProductSysLogs 处理了所有product相关的消息
exchange:topic_logs, queue:amq.gen-cd03G1A8vIBgzqN7eW-zbg, BindRoutingKey:product.#
 Waiting for messages
 [x] Received 'product.info':'routingKey=product.info log: this is information'
 [x] Received 'product.error':'routingKey=product.error log: this is information'
 [x] Received 'product.error.log':'routingKey=product.error.log log: this is information'
#ReceiveOtherSysLogs 只处理满足条件的消息customer.error
exchange:topic_logs, queue:amq.gen-UJEhQV-JDb1s8fU8lTPFVQ, BindRoutingKey:customer.*
exchange:topic_logs, queue:amq.gen-UJEhQV-JDb1s8fU8lTPFVQ, BindRoutingKey:accept.*
Waiting for messages
 [x] Received 'customer.error':'routingKey=customer.error log: this is information'

上一篇:RabbitMQ教程直连交换器和路由Routing

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-05 17:25:10  更:2021-08-05 17:27:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 19:24:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码