IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ--简单模式-工作队列模式-订阅发布模式-路由模式-通配符模式 -> 正文阅读

[大数据]RabbitMQ--简单模式-工作队列模式-订阅发布模式-路由模式-通配符模式

原理图

在这里插入图片描述

原生生产者代码

/**
 * 简单模式:发送消息
 */
public class Producer {
    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        //1. 创建连接工厂(设置RabbitMQ的连接参数);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机;默认localhost
        connectionFactory.setHost("localhost");
        //连接端口;默认5672
        connectionFactory.setPort(5672);
        //虚拟主机;默认/
        connectionFactory.setVirtualHost("/");
        //用户名;默认guest
        connectionFactory.setUsername("guest");
        //密码;默认guest
        connectionFactory.setPassword("guest");

        //2. 创建连接;
        Connection connection = connectionFactory.newConnection();
        //3. 创建频道;
        Channel channel = connection.createChannel();
        //4. 声明队列;
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
         * 参数3:是否独占本连接
         * 参数4:是否在不使用的时候队列自动删除
         * 参数5:其它参数
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //5. 发送消息;
        String message = "你好。";

        /**
         * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
         * 参数2:路由key,简单模式中可以使用队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("已发送消息:" + message);
        //6. 关闭资源
        channel.close();
        connection.close();
    }
}

原生消费者代码

public class ComsumerHi {
    static final String QUEUE_NAME="hello_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("hello_queue",true,false,false,null);


        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //接收到的消息
                System.out.println("接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

2.工作队列模式(包工头)

在同一个队列中可以有多个消费者,消费者之间对于消息的接收是竞争关系。

注:在简单模式的基础上直接再启动多几个consumer就行了

3.订阅发布模式(联想微博):

一个消息可以被多个消费者接收;其实是使用了订阅模式,交换机类型为:fanout广播

生产者代码:

public class PubSub {
    static final String SMS_QUEUE="sms_queue";
    static final String EMAIL_QUEUE="email_queue";

    static final String FANOUT_EXCHANGE="fanout_exchange";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true,false,false,null);


        channel.queueDeclare(SMS_QUEUE,true,false,false,null);
        channel.queueDeclare(EMAIL_QUEUE,true,false,false,null);

        channel.queueBind(SMS_QUEUE,FANOUT_EXCHANGE,"");
        channel.queueBind(EMAIL_QUEUE,FANOUT_EXCHANGE,"");

        String msg ="gwiogj";

        channel.basicPublish(FANOUT_EXCHANGE,"",null,msg.getBytes());

        channel.close();
        connection.close();
    }
}

消费者:

只需要修改一处代码,也就是修改队列名即可:

SMS消费者

channel.basicConsume(SMS_QUEUE,true,defaultConsumer);

EMAIL消费者:

channel.basicConsume(EMAIL_QUEUE,true,defaultConsumer);

但是:万一运维率先启动的是消费者,那么开发人员最好把队列和交换机照着生产者代码声明一遍,防止找不到交换机和队列的绑定关系

★★★4.路由模式(分布式日志收集):

比发布/订阅模式更灵活。
Routing 路由模式要求队列绑定到交换机的时候指定路由key;消费发送时候需要携带路由key;只有消息的路由key队列路由key完全一致才能让该队列接收到消息。

生产者:

public class ProducerRouting {
    static final String ALERT_QUEUE="alert_queue";
    static final String LOG_QUEUE="log_queue";

    static final String DIRECT_EXCHANGE="direct_exchange";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();


        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true,false,false,null);


        channel.queueDeclare(ALERT_QUEUE,true,false,false,null);
        channel.queueDeclare(LOG_QUEUE,true,false,false,null);

//        第三个参数声明路由Key
        channel.queueBind(ALERT_QUEUE,DIRECT_EXCHANGE,"error");
        channel.queueBind(LOG_QUEUE,DIRECT_EXCHANGE,"info");
        channel.queueBind(LOG_QUEUE,DIRECT_EXCHANGE,"warning");
        channel.queueBind(LOG_QUEUE,DIRECT_EXCHANGE,"error");

        for (int i = 0; i < 5; i++) {
            String msg ="正常info";
            channel.basicPublish(DIRECT_EXCHANGE,"info",null,msg.getBytes());
        }
        for (int i = 0; i < 5; i++) {
            String msg ="异常error";
            channel.basicPublish(DIRECT_EXCHANGE,"error",null,msg.getBytes());
        }


        channel.close();
        connection.close();
    }
}

消费者:

在生产者的队列和交换机都在消费者代码声明好的前提下,只需改动消费者监听的队列名称即可

public class RoutingConsumer {

    static final String ALERT_QUEUE="alert_queue";
    static final String LOG_QUEUE="log_queue";

    static final String DIRECT_EXCHANGE="direct_exchange";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("hello_queue",true,false,false,null);
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true,false,false,null);


        channel.queueDeclare(ALERT_QUEUE,true,false,false,null);
        channel.queueDeclare(LOG_QUEUE,true,false,false,null);

//        第三个参数声明路由Key
        channel.queueBind(ALERT_QUEUE,DIRECT_EXCHANGE,"error");
        channel.queueBind(LOG_QUEUE,DIRECT_EXCHANGE,"info");
        channel.queueBind(LOG_QUEUE,DIRECT_EXCHANGE,"warning");
        channel.queueBind(LOG_QUEUE,DIRECT_EXCHANGE,"error");


        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //接收到的消息
                System.out.println("接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        channel.basicConsume(LOG_QUEUE,true,defaultConsumer);
    }
}

topics通配符模式(路由模式升级版):

可以根据路由key将消息传递到对应路由key的队列;队列绑定到交换机的路由key可以有多个;通配符模式中路由key可以使用 *# ;使用了通配符模式之后对于路由Key的配置更加灵活。

生产者:

public class ProducerTopics {
    static final String GUANGZHOU_XIAOZHANG_QUEUE="guangzhou_xiaozhang_queue";
    static final String ZONGBU_CAIWU_QUEUE="zongbu_caiwu_queue";

    static final String TOPIC_EXCHANGE="topic_exchange";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();


        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);


        channel.queueDeclare(GUANGZHOU_XIAOZHANG_QUEUE,true,false,false,null);
        channel.queueDeclare(ZONGBU_CAIWU_QUEUE,true,false,false,null);

//        第三个参数声明路由Key
        ★★★channel.queueBind(GUANGZHOU_XIAOZHANG_QUEUE,TOPIC_EXCHANGE,"guangzhou.*.*");
        ★★★channel.queueBind(ZONGBU_CAIWU_QUEUE,TOPIC_EXCHANGE,"*.caiwu.*");


        for (int i = 0; i < 5; i++) {
            String msg ="正常info";
            channel.basicPublish(TOPIC_EXCHANGE,"guangzhou.caiwu.info",null,msg.getBytes());
        }
        for (int i = 0; i < 5; i++) {
            String msg ="异常error";
            channel.basicPublish(TOPIC_EXCHANGE,"guangzhou.renshi.error",null,msg.getBytes());
        }
        for (int i = 0; i < 5; i++) {
            String msg ="success";
            channel.basicPublish(TOPIC_EXCHANGE,"Santeeyago.caiwu.info",null,msg.getBytes());
        }


        channel.close();
        connection.close();
    }
}

消费者:

public class TopicsConsumerXiaozhang {

    static final String GUANGZHOU_XIAOZHANG_QUEUE="guangzhou_xiaozhang_queue";
    static final String ZONGBU_CAIWU_QUEUE="zongbu_caiwu_queue";

    static final String TOPIC_EXCHANGE="topic_exchange";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);


        channel.queueDeclare(GUANGZHOU_XIAOZHANG_QUEUE,true,false,false,null);
        channel.queueDeclare(ZONGBU_CAIWU_QUEUE,true,false,false,null);

//        第三个参数声明路由Key
        channel.queueBind(GUANGZHOU_XIAOZHANG_QUEUE,TOPIC_EXCHANGE,"guangzhou.*.*");
        channel.queueBind(ZONGBU_CAIWU_QUEUE,TOPIC_EXCHANGE,"*.caiwu.*");


        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //接收到的消息
                System.out.println("接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        channel.basicConsume(GUANGZHOU_XIAOZHANG_QUEUE,true,defaultConsumer);
    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-20 15:11:21  更:2021-08-20 15:12:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:54:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码