IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ详解 -> 正文阅读

[大数据]RabbitMQ详解

RabbitMQ详解

RabbitMq发送接收消息方式

RabbitMq不使用交换机

发送方

    public static void main(String[] args) {
        String messageData = "test message, hello!";
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            /**
             * 参数1:队列名
             * 参数2:持久化
             * 参数3:是否排外,排外只允许一个消费者监听
             * 参数4:是否自动删除,队列中没有消息并且没有消费者连接时会自动删除
             * 参数4:为队列设置属性,通常为null
             */
            channel.queueDeclare("myqueue",true,false,false,null);

            /**
             * 参数1:交换机名
             * 参数2:队列名或者routingkey,指定了交换机这里就是routingkey
             * 参数3:设置消息属性,通常为null
             * 参数4:消息数据
             */
            channel.basicPublish("","myqueue",null,messageData.getBytes());
            System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

接收方

    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();                     
            channel.queueDeclare("myqueue",true,false,false,null);
            /**
             * 参数1 监听队列名
             * 参数2 自动确认
             * 参数3 接受者标签
             * 参数4 消息接收回调方法
             */
            channel.basicConsume("myqueue",true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String message=new String(body,"utf-8");
                    System.out.println(message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
        }
    }

RabbitMq-direct

发送方

 public static void main(String[] args) {
        String messageData = "direct message, hello!";
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            /**
             * 队列
             * 参数1:队列名
             * 参数2:持久化
             * 参数3:是否排外,排外只允许一个消费者监听
             * 参数4:是否自动删除,队列中没有消息并且没有消费者连接时会自动删除
             * 参数4:为队列设置属性,通常为null
             */
            channel.queueDeclare("myqueue001",true,false,false,null);
            /**
             * 声明一个交换机
             * 参数1交换机名称
             * 参数2交换机类型
             * 参数3是否持久化
             */
            channel.exchangeDeclare("exchange001","direct",true);
            /**
             * 将队列绑定到交换机
             * 参数1队列名称
             * 参数2交换机名称
             * 参数3routingkey(Bindingkey)
             */
            channel.queueBind("myqueue001","exchange001","routingkey001");
            /**
             * 参数1:交换机名
             * 参数2:routingkey
             * 参数3:设置消息属性,通常为null
             * 参数4:消息数据
             */
            channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
            System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

接收方

 public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("myqueue001",true,false,false,null);
            channel.exchangeDeclare("exchange001","direct",true);
            channel.queueBind("myqueue001","exchange001","routingkey001");

            /**
             * 参数1 监听队列名
             * 参数2 自动确认
             * 参数3 接受者标签
             * 参数4 消息接收回调方法
             */
            channel.basicConsume("myqueue001",true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String message=new String(body,"utf-8");
                    System.out.println(message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
        }
    }

RabbitMq-fanout

发送方

 public static void main(String[] args) {
        String messageData = "fanout message, hello!";
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("exchange002","fanout",true);
            /**
             * 参数1:交换机名
             * 参数2:routingkey
             * 参数3:设置消息属性,通常为null
             * 参数4:消息数据
             */
            channel.basicPublish("exchange002","",null,messageData.getBytes());
            System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

接收方01

    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            //创建随机队列
            String queue = channel.queueDeclare().getQueue();
            channel.exchangeDeclare("exchange002","fanout",true);
            channel.queueBind(queue,"exchange002","");

            /**
             * 参数1 监听队列名
             * 参数2 自动确认
             * 参数3 接受者标签
             * 参数4 消息接收回调方法
             */
            channel.basicConsume(queue,true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String message=new String(body,"utf-8");
                    System.out.println(“消费者01+message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
        }
    }

接收方02

    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            //创建随机队列
            String queue = channel.queueDeclare().getQueue();
            channel.exchangeDeclare("exchange002","fanout",true);
            channel.queueBind(queue,"exchange002","");

            /**
             * 参数1 监听队列名
             * 参数2 自动确认
             * 参数3 接受者标签
             * 参数4 消息接收回调方法
             */
            channel.basicConsume(queue,true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String message=new String(body,"utf-8");
                    System.out.println(“消费者02+message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
        }
    }

RabbitMq-topic

消费者001

  public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("topicmyqueue001",true,false,false,null);
            channel.exchangeDeclare("topicexchange001","topic",true);
            channel.queueBind("topicmyqueue001","topicexchange001","aa");
            /**
             * 参数1 监听队列名
             * 参数2 自动确认
             * 参数3 接受者标签
             * 参数4 消息接收回调方法
             */
            channel.basicConsume("topicmyqueue001",true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String message=new String(body,"utf-8");
                    System.out.println("消费者001"+message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
        }
    }

消费者002

  public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("topicmyqueue002",true,false,false,null);
            channel.exchangeDeclare("topicexchange001","topic",true);
            channel.queueBind("topicmyqueue002","topicexchange001","aa.*");
            /**
             * 参数1 监听队列名
             * 参数2 自动确认
             * 参数3 接受者标签
             * 参数4 消息接收回调方法
             */
            channel.basicConsume("topicmyqueue002",true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String message=new String(body,"utf-8");
                    System.out.println("消费者002"+message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
        }
    }

消费者003

  public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("topicmyqueue003",true,false,false,null);
            channel.exchangeDeclare("topicexchange001","topic",true);
            channel.queueBind("topicmyqueue003","topicexchange001","aa.#");
            /**
             * 参数1 监听队列名
             * 参数2 自动确认
             * 参数3 接受者标签
             * 参数4 消息接收回调方法
             */
            channel.basicConsume("topicmyqueue003",true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String message=new String(body,"utf-8");
                    System.out.println("消费者003"+message);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
        }
    }

发送者

public static void main(String[] args) {
        String messageData = "topic message, hello!";
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("topicexchange001","topic",true);
            /**
             * 参数1:交换机名
             * 参数2:routingkey
             * 参数3:设置消息属性,通常为null
             * 参数4:消息数据
             */
          //消费者001,消费者003可以接收到消息
          //  channel.basicPublish("topicexchange001","aa",null,messageData.getBytes());
          //消费者002,消费者003可以接收到消息
          //  channel.basicPublish("topicexchange001","aa.bb",null,messageData.getBytes());
          //消费者003可以接收到消息 
              channel.basicPublish("topicexchange001","aa.bb.cc",null,messageData.getBytes());
            System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

事务

1.RabbitMq的事务只对发送方生效,接收者不论是否声明事务都会将队列中的消息移除。
2.事务声明后消息不会保存到队列,只有事务提交以后消息才会进入队列中。
txSelect()声明事务
txCommit();提交事务
txRollback();回滚事务

    public static void main(String[] args) {
        String messageData = "direct message, hello!";
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("myqueue001",true,false,false,null);
            channel.exchangeDeclare("exchange001","direct",true);
            channel.queueBind("myqueue001","exchange001","routingkey001");
            channel.txSelect();
            channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
            channel.txCommit();
            System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel!=null){
                try {
                    channel.txRollback();
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

发送者确认模式

普通确认

   public static void main(String[] args) {
        String messageData = "direct message, hello!";
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("myqueue001",true,false,false,null);
            channel.exchangeDeclare("exchange001","direct",true);
            channel.queueBind("myqueue001","exchange001","routingkey001");
            //开启确认
            channel.confirmSelect();
            channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
            //阻塞线程等待服务器返回响应,确认发送成功返回true,返回false消息可能失败也可能成功
            //如果返回false或者报错需要对消息进行验证,失败的消息必须补发
            boolean b = channel.waitForConfirms();
            System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

批量确认

    public static void main(String[] args) {
        String messageData = "direct message, hello!";
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("myqueue001",true,false,false,null);
            channel.exchangeDeclare("exchange001","direct",true);
            channel.queueBind("myqueue001","exchange001","routingkey001");
            //开启确认
            channel.confirmSelect();
            channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
            //批量确认,确认之前发送的消息是否全部写入队列
            //该方法没有返回值,如果出现没有成功写入队列的情况,无法确定具体是哪条消息失败,无法补发
            channel.waitForConfirmsOrDie();
            System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

异步确认

    public static void main(String[] args) {
        String messageData = "direct message, hello!";
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("myqueue001",true,false,false,null);
            channel.exchangeDeclare("exchange001","direct",true);
            channel.queueBind("myqueue001","exchange001","routingkey001");
            //开启确认
            channel.confirmSelect();

            //异步监听
            channel.addConfirmListener(new ConfirmListener() {
                /**
                 * 消息确认以后回调方法
                 * @param l 被确认的消息编号,从1开始,用于标记当前消息是第几个
                 * @param b 当前消息是否确认了多个
                 * @throws IOException
                 */
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    //b为true表示确认了多个消息,false表示只确认了当前一个消息
                    System.out.println("消息被确认-----当前消息编号: "+l+"    是否确认多个:"+b);
                }
                //消息没有确认回调方法,需要补发
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("消息没有被确认-----当前消息编号: "+l+"    是否没有确认多个:"+b);
                }
            });

            for (int i=0; i<10000;i++){
                channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
            }
          System.out.println("消息发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if(channel!=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

在这里插入图片描述

接收方消息确认

1.如果消费者声明了事务,必须提交事务消息才会被移除队列
2.isRedeliver()消息是否被接收过

 public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("47.110.157.82");
        factory.setPort(5672);
        factory.setUsername("liu");
        factory.setPassword("123456");
        Connection connection=null;
        Channel channel=null;

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare("myqueue001",true,false,false,null);
            channel.exchangeDeclare("exchange001","direct",true);
            channel.queueBind("myqueue001","exchange001","routingkey001");

            /**
             * 参数1 监听队列名
             * 参数2 自动确认
             * 参数3 接受者标签
             * 参数4 消息接收回调方法
             */
            channel.basicConsume("myqueue001",false,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //消息是否被接收过,如果被接收过,可能处理完成,需要做验证
                    boolean redeliver = envelope.isRedeliver();
                    if(!redeliver){
                        //获取消息编号
                        long deliveryTag = envelope.getDeliveryTag();
                        Channel channel1 = this.getChannel();
                        String message=new String(body,"utf-8");
                        //手动确认
                        /**
                         * 参数2是否确认小于等于当前编号的所有消息,true确认多个,false确认当前消息
                         */
                        channel1.basicAck(deliveryTag,true);
                        System.out.println(message);
                    }else{
                        //验证消息是否处理成功
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

springboot整合RabbitMq

direct方式

发送端
1.添加pom依赖

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.application.yml配置

server:
  port: 8091
spring:
  application:
    name: rabbitmq-provider
  rabbitmq:
    host: 47.110.157.82
    port: 5672
    username: liu
    password: 123456

3.RabbitMQ配置类DirectRabbitConfig

@Configuration
public class DirectRabbitConfig {

    //队列
    @Bean
    public Queue directQueue() {
        return new Queue("directQueue",true);
    }

    //Direct交换机
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    //绑定  将队列和交换机绑定
    @Bean
    Binding bindingDirect(Queue directQueue,DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}

4.发送消息

@Controller
public class SendController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("sendMessage")
    @ResponseBody
    public String sendMessage(String message){
        String messageId = String.valueOf(UUID.randomUUID());
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",message);
        map.put("createTime",createTime);
        //将消息携带绑定键值:routingkey001发送到交换机directExchange
        rabbitTemplate.convertAndSend("directExchange", "routingkey001", map);
        return "ok";
    }
}

接收端
1.添加pom.xml依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--rabbitmq-->
        <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.application.yml配置

server:
  port: 8092
spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    host: 47.110.157.82
    port: 5672
    username: liu
    password: 123456

3.为了防止队列不存在报错,这里我们也添加RabbitMQ配置类DirectRabbitConfig

@Configuration
public class DirectRabbitConfig {

    //队列
    @Bean
    public Queue directQueue() {
        return new Queue("directQueue",true);
    }

    //Direct交换机
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    //绑定  将队列和交换机绑定
    @Bean
    Binding bindingDirect(Queue directQueue,DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}

4.接收消息

@Component
@RabbitListener(queues = "directQueue")//监听的队列名称 directQueue
public class ReceiveMessage {
    /**
     * 如果这个方法正常结束,spring会自动确认消息
     * 如果这个方法报错,消息不会确认,并且进入死循环
     * 因此在处理消息之前需要防重复处理
     * @param testMessage
     */
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("准备接收消息了");
        //1.防重复处理代码

        //2.消息处理代码
        System.out.println("directReceiver消费者收到消息  : " + testMessage.toString());
    }
}

fanout方式

发送端
1.添加pom依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.application.yml配置

server:
  port: 8091
spring:
  application:
    name: rabbitmq-provider
  rabbitmq:
    host: 47.110.157.82
    port: 5672
    username: liu
    password: 123456

3.RabbitMQ配置类FanoutRabbitConfig

@Configuration
public class FanoutRabbitConfig {

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

}

4.发送消息

@Controller
public class SendController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("sendMessage")
    @ResponseBody
    public String sendMessage(String message){
        String messageId = String.valueOf(UUID.randomUUID());
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",message);
        map.put("createTime",createTime);
        rabbitTemplate.convertAndSend("fanoutExchange", "", map);
        return "ok";
    }
}

接收端
1.添加pom依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--rabbitmq-->
        <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.application.yml配置

server:
  port: 8092
spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    host: 47.110.157.82
    port: 5672
    username: liu
    password: 123456

3.RabbitMQ配置类FanoutRabbitConfig

@Configuration
public class FanoutRabbitConfig {

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

}

4.接收消息

@Component
public class FanoutReceive {

    @RabbitListener(bindings = {
            @QueueBinding(value =@Queue(),//创建一个随机队列
                    exchange = @Exchange(name = "fanoutExchange",type = "fanout"))
    })
    public void fanoutReceive(Map messgae){
        System.out.println("fanoutReceiver01消费者收到消息"+messgae.toString());
    }


    @RabbitListener(bindings = {
            @QueueBinding(value =@Queue(),//创建一个随机队列
                    exchange = @Exchange(name = "fanoutExchange",type = "fanout"))
    })
    public void fanoutReceive02(Map messgae){
        System.out.println("fanoutReceiver02消费者收到消息"+messgae.toString());
    }
}

topic方式

发送端
1.添加pom依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.application.yml配置

server:
  port: 8091
spring:
  application:
    name: rabbitmq-provider
  rabbitmq:
    host: 47.110.157.82
    port: 5672
    username: liu
    password: 123456

3.RabbitMQ配置类FanoutRabbitConfig

@Configuration
public class TopicRabbitConfig {

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }
}

4.发送消息

    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("sendMessage")
    @ResponseBody
    public String sendMessage(String message){
        String messageId = String.valueOf(UUID.randomUUID());
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",message);
        map.put("createTime",createTime);
        rabbitTemplate.convertAndSend("topicExchange", "aa.bb", map);
        return "ok";
    }
}

接收端
1.添加pom依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--rabbitmq-->
        <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.application.yml配置

server:
  port: 8092
spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    host: 47.110.157.82
    port: 5672
    username: liu
    password: 123456

3.接收消息

@Component
public class TopicReceive {

    @RabbitListener(bindings = {
            @QueueBinding(value =@Queue("topicQueue001"),//创建一个随机队列
                    key = {"aa"},
                    exchange = @Exchange(name = "topicExchange",type = "topic"))
    })
    public void topicReceive01(Map messgae){
        System.out.println("topicReceive01消费者收到消息"+messgae.toString());
    }

    @RabbitListener(bindings = {
            @QueueBinding(value =@Queue("topicQueue002"),//创建一个随机队列
                    key = {"aa.*"},
                    exchange = @Exchange(name = "topicExchange",type = "topic"))
    })
    public void topicReceive02(Map messgae){
        System.out.println("topicReceive02消费者收到消息"+messgae.toString());
    }

    @RabbitListener(bindings = {
            @QueueBinding(value =@Queue("topicQueue003"),//创建一个随机队列
                    key = {"aa.#"},
                    exchange = @Exchange(name = "topicExchange",type = "topic"))
    })
    public void topicReceive03(Map messgae){
        System.out.println("topicReceive03消费者收到消息"+messgae.toString());
    }
}

在这里插入图片描述

RabbitMQ集群搭建

我的服务器一:192.168.164.134服务器二:192.168.164.137
1.修改host文件
修改192.168.164.134 hosts文件

vim /etc/hosts

在这里插入图片描述
修改192.168.164.137 hosts文件
在这里插入图片描述
重启服务器,确保两个服务器之间互通
在这里插入图片描述
在这里插入图片描述

2.准备两个服务器分别安装RabbitMQ
RabbitMQ下载安装
保证两个RabbitMQ成功安装
在这里插入图片描述
在这里插入图片描述
3.将两个服务器.erlang.cookie保持一直
查看192.168.164.134的.erlang.cookie,将内容复制

 cat /var/lib/rabbitmq/.erlang.cookie 

在这里插入图片描述
将192.168.164.134的.erlang.cookie复制到192.168.164.137

scp /var/lib/rabbitmq/.erlang.cookie 192.168.164.137:/var/lib/rabbitmq/

重启两个服务器的RabbitMQ

/sbin/service rabbitmq-server restart

4.集群配置
192.168.164.134

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq2
rabbitmqctl start_app

192.168.164.137

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app

在这里插入图片描述
在这里插入图片描述
5.镜像集群配置
添加规则
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-03 11:16:33  更:2021-08-03 11:16:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/22 6:09:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码