IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ工作模式 -> 正文阅读

[大数据]RabbitMQ工作模式

RabbitMQ工作模式

1.Work queues 工作队列模式

1.1 模式说明

1

work queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

1.2 模式实现

生产者

public class Producer {

    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接 connection
        Connection connection = connectionFactory.newConnection("生产者");
        //通过连接获取通道 Channel
        Channel  channel = connection.createChannel();
        //通过通道创建交换机,声明队列,绑定关系,路由key,发送消息
        /**
         * @params1 队列的名称
         * @params2 是否要持久化 durable-false
         * @params3 排他性,是否是独占独立
         * @params4 是否自动删除,随着最后一个消费者消息完毕以后是否把队列自动删除
         * @params5 携带的附属参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //准备消息内容
        for (int i = 0; i < 15; i++) {
            //发送的消息
            String message = "work __ Hello,Consumer"+i;
            //发送消息给队列 queue
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送成功");
        }
        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者 1

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接 connection
        Connection connection = connectionFactory.newConnection("消费者1");
        //通过连接获取通道 Channel
        Channel channel = connection.createChannel();
        //通过通道创建交换机,声明队列,绑定关系,路由key,接收消息
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
        //一次只能接收并处理一个消息
        channel.basicQos(1);
        //创建消费者、消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("");
                    System.out.println("=======================消费者1开始===============================");
                    System.out.println("");
                    //路由key
                    System.out.println("路由key为:"+ envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:"+ envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:"+ envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("消费者1 - 接收到的消息:"+ new String(body,"UTF-8"));
                    System.out.println("");
                    System.out.println("=======================消费者1结束===============================");
                    System.out.println("");
                    Thread.sleep(1000);
                    //确认消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        //监听消息
        channel.basicConsume(Producer.QUEUE_NAME,false,consumer);
    }
}

消费者 2

public class Consumer2 {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接 connection
        Connection connection = connectionFactory.newConnection("消费者2");
        //通过连接获取通道 Channel
        Channel channel = connection.createChannel();
        //通过通道创建交换机,声明队列,绑定关系,路由key,接收消息
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
        //一次只能接收并处理一个消息
        channel.basicQos(1);
        //创建消费者、消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("");
                    System.out.println("=======================消费者2开始===============================");
                    System.out.println("");
                    //路由key
                    System.out.println("路由key为:"+ envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:"+ envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:"+ envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("消费者2 - 接收到的消息:"+ new String(body,"UTF-8"));
                    System.out.println("");
                    System.out.println("=======================消费者2结束===============================");
                    System.out.println("");
                    Thread.sleep(1000);
                    //确认消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        //监听消息
        channel.basicConsume(Producer.QUEUE_NAME,false,consumer);
    }
}
2 3

2.Publish/Subscribe 发布与订阅模式

2.1 模式说明

4

在订阅模型中,多了一个X (exchange) 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

? Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

发布订阅模式:

  1. 每个消费者监听自己的队列
  2. 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

2.2 模式实现

生产者

public class Producer {
    //交换机名称
    static final String FANOUT_EXCHANGE = "fanout_exchange";
    //队列名称
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接 connection
        Connection connection = connectionFactory.newConnection("生产者");
        //通过连接获取通道 Channel
        Channel  channel = connection.createChannel();
        /**
         * 声明交换机
         * @params1 交换机名称
         * @params2 交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
        //声明队列
        channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
        channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
        channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
        //准备消息内容
        for (int i = 0; i < 10; i++) {
            //发送消息
            String message = "发布订阅模式 __ Hello,Consumer"+i;
            channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
            System.out.println("消息发送成功");
        }
        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者1

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接 connection
        Connection connection = connectionFactory.newConnection("消费者1");
        //通过连接获取通道 Channel
        Channel channel = connection.createChannel();
        //通过通道创建交换机,声明队列,绑定关系,路由key,接收消息
        channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");
        //创建消费者、消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("");
                    System.out.println("=======================消费者1开始===============================");
                    System.out.println("");
                    //路由key
                    System.out.println("路由key为:"+ envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:"+ envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:"+ envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("消费者1 - 接收到的消息:"+ new String(body,"UTF-8"));
                    System.out.println("");
                    System.out.println("=======================消费者1结束===============================");
                    System.out.println("");
                    Thread.sleep(1000);
                    //确认消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        //监听消息
        channel.basicConsume(Producer.FANOUT_QUEUE_1,false,consumer);
    }
}

消费者2

public class Consumer2 {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接 connection
        Connection connection = connectionFactory.newConnection("消费者2");
        //通过连接获取通道 Channel
        Channel channel = connection.createChannel();
        //通过通道创建交换机,声明队列,绑定关系,路由key,接收消息
        channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");
        //创建消费者、消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("");
                    System.out.println("=======================消费者2开始===============================");
                    System.out.println("");
                    //路由key
                    System.out.println("路由key为:"+ envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:"+ envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:"+ envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("消费者2 - 接收到的消息:"+ new String(body,"UTF-8"));
                    System.out.println("");
                    System.out.println("=======================消费者2结束===============================");
                    System.out.println("");
                    Thread.sleep(1000);
                    //确认消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        //监听消息
        channel.basicConsume(Producer.FANOUT_QUEUE_2,false,consumer);
    }
}
5 6

2.3 小结

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者 都收到

发布订阅模式与工作队列模式的区别

  1. 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  2. 工作队列模式的生产方是面向队列发送消息(底层使用默认交换机),发布/订阅模式的生产方是面向交换机发送消息
  3. 工作队列模式不需要设置,会将队列绑定到默认的交换机,发布/订阅模式需要设置队列和交换机的绑定

3.Routing 路由模式

3.1 模式说明

7

图解:

  • P:生产者,它向 Exchange 发送消息,同时会指定一个routing key
  • X:交换机,接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing keyerror 的消息
  • C2:消费者,其所在队列指定了需要 routing keyinfo、error、warning 的消息

路由模式特点:

  • 队列与交换机的绑定,不是任意绑定,而是指定一个Routing Key
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 Routing Key
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

3.2 模式实现

生产者

public class Producer {
    //交换机名称
    static final String DIRECT_EXCHAGE = "direct_exchange";
    //队列名称
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接
        Connection connection = connectionFactory.newConnection("生产者");
        //创建频道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
        //声明(创建)队列
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
        //队列绑定交换机
        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
        //发送信息
        String message = "INSERT 路由模式;routing key 为 insert " ;
        /**
         * @params1 交换机名称,如果没有指定则使用默认Default Exchange
         * @params2 路由key,简单模式可以传递队列名称
         * @params3 消息其它属性
         * @params4 消息内容
         */
        channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
        System.out.println("已发送消息:" + message);
        // 发送信息
        message = "UPDATE 路由模式;routing key 为 update" ;
        channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
        System.out.println("已发送消息:" + message);
        // 关闭资源
        channel.close();
        connection.close();
    }
}

消费者1

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接
        Connection connection = connectionFactory.newConnection("消费者1");
        //创建频道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
        //声明(创建)队列
        channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
        //队列绑定交换机
        channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");
        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("");
                System.out.println("=======================消费者1开始===============================");
                System.out.println("");
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
                System.out.println("");
                System.out.println("=======================消费者1结束===============================");
                System.out.println("");
            }
        };
        /**
         * 监听消息
         * @params1 队列名称
         * @params2 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * @params3 消息接收到后回调
         */
        channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
    }
}

消费者2

public class Consumer2 {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接
        Connection connection = connectionFactory.newConnection("消费者2");
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
        //声明队列
        channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE,true,false,false,null);
        //绑定
        channel.queueBind(Producer.DIRECT_QUEUE_UPDATE,Producer.DIRECT_EXCHAGE,"update");
        //创建消费者、消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("");
                System.out.println("=======================消费者2开始===============================");
                System.out.println("");
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
                System.out.println("");
                System.out.println("=======================消费者2结束===============================");
                System.out.println("");
            }
        };
        //监听消息
        channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE,true,consumer);
    }
}
8 9 10

4.Topics 通配符模式

4.1 模式说明

11

Topic类型与Direct相比:

都是可以根据Routing Key把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routing key 一般都是有一个或多个单词组成,多个单词之间以 ”.” 分割,例如:item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

4.2 模式实现

生产者

public class Producer {
    //交换机名称
    static final String TOPIC_EXCHAGE = "topic_exchange";
    //队列名称
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    static final String TOPIC_QUEUE_2 = "topic_queue_2";
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接 connection
        Connection connection = connectionFactory.newConnection("生产者");
        //通过连接获取通道 Channel
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(TOPIC_QUEUE_1,true,false,false,null);
        channel.queueDeclare(TOPIC_QUEUE_2,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"item.#");
        channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHAGE,"*.delete");
        //发送信息
        String message = "新增  Topic模式;routing key 为 item.insert " ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
        System.out.println("已发送消息:" + message);
        //发送信息
        message = "修改  Topic模式;routing key 为 item.update" ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
        System.out.println("已发送消息:" + message);
        //发送信息
        message = "删除  Topic模式;routing key 为 item.delete" ;
        channel.basicPublish(TOPIC_EXCHAGE, "ddd.delete", null, message.getBytes());
        System.out.println("已发送消息:" + message);
        //发送信息
        message = "除了这些以外的消息。。。。" ;
        channel.basicPublish(TOPIC_EXCHAGE, "item.delete.aaa", null, message.getBytes());
        System.out.println("已发送消息:" + message);
        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者1

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接
        Connection connection = connectionFactory.newConnection("消费者1");
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
        // 声明(创建)队列
        channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHAGE,"item.#");
        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("");
                System.out.println("=======================消费者1开始===============================");
                System.out.println("");
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
                System.out.println("");
                System.out.println("=======================消费者1结束===============================");
                System.out.println("");
            }
        };
        //监听消息
        channel.basicConsume(Producer.TOPIC_QUEUE_1,true,consumer);
    }
}

消费者2

public class Consumer2 {
    public static void main(String[] args) throws Exception {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //创建连接
        Connection connection = connectionFactory.newConnection("消费者2");
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
        // 声明(创建)队列
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
        //队列绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "*.delete");
        //创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("");
                System.out.println("=======================消费者2开始===============================");
                System.out.println("");
                System.out.println("路由key为:" + envelope.getRoutingKey());
                System.out.println("交换机为:" + envelope.getExchange());
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
                System.out.println("");
                System.out.println("=======================消费者2结束===============================");
                System.out.println("");
            }
        };
        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
    }
}
12 13 14

5.模式总结

1、简单模式 HelloWorld : 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

2、工作队列模式 Work Queue : 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

3、发布订阅模式 Publish/subscribe : 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

4、路由模式 Routing : 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

5、通配符模式 Topic : 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

个人博客为:
MoYu’s HomePage

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-16 11:48:52  更:2021-08-16 11:50:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:16:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码