IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Rabbitmq中常用的五种连接方式 -> 正文阅读

[大数据]Rabbitmq中常用的五种连接方式

目录

前提准备?

方式一:? Hello World

? ? ? ? 服务端(provider)代码

? ? ? ? 客户端(customer)代码

方式二: work(以下方式都是通过工具类来创建connection对象)

? ? ? ? 有两种方式? ?第一种为平均分配

? ? ? ? 第二种为能者多劳方式

方式三??Publish/Subscribe? (以下所有连接方式都需要用到交换机)

provider端代码

? customer端代码

方式四? Routeing?

方式五:Topics??

provider端代码

customer端代码


前提准备?

rabbitmq官方手册??RabbitMQ Tutorials — RabbitMQ? (可以对照手册进行对比)

在编写连接方式之前, 我们先做一些前提的操作,用于我们编写连接代码时更方便

a).? 创建一个连接rabbitmq的工具类

public class RabbitmqUtil {
    private static ConnectionFactory factory;

    static{
        factory = new ConnectionFactory();
        factory.setHost("虚拟机的ip地址");
        factory.setPort(5672);
        factory.setVirtualHost("用户的模块");
        factory.setUsername("用户名");
        factory.setPassword("密码");
    }

    public static Connection getConnection(){
        try {
            return factory.newConnection();
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }

    public static void closeConnectionAndChannel(Connection connection, Channel channel){
        try {
            if (connection != null) connection.close();
            if (channel != null)channel.close();
        }catch (Exception e){
//            e.printStackTrace();
        }
    }
}

?图中画红色框框的就是用户的模块

导入相应的rabbitmq依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

下面我们进入正题?

方式一:? Hello World

? ? ? ? 服务端(provider)代码

????????如果不是springboot项目,这里方法可以替换中主函数进行运行

? ? ? ? ? ? ? ? 如果没有配置rabbitmq? 就需要把注释掉的代码解开

public class Provider {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
//        ConnectionFactory factory = new ConnectionFactory();
//
//        //设置ip地址
//        factory.setHost("虚拟机ip地址");
//        //设置端口号
//        factory.setPort(5672);
//        //设置虚拟主机
//        factory.setVirtualHost("虚拟主机");
        设置连接用的用户名和密码
//        factory.setUsername("用户名");
//        factory.setPassword("密码");
//
//        //通过factory获取连接对象
//        Connection connection = factory.newConnection

        Connection connection = RabbitmqUtil.getConnection();
        Connection connection = RabbitmqUtil.getConnection();
        //通过connection获取通道
        Channel channel = connection.createChannel();

        /**
         * 绑定对应的消息列
         *      参数1  队列名称  如果不存在自动创建
         *      参数2  队列是否持久化 false不持久化
         *      参数3  是否独占队列  true 独占队列
         *      参数4  消息传完是否自动删除队列 true自动删除
         *      参数5  其他参数
         */
        channel.queueDeclare("hello",false,false,false,null);

        /**
         * 发送消息
         *      参数1  交换机名称
         *      参数2  通道名称
         *      参数3  传递消息额外设置   MessageProperties.PERSISTENT_TEXT_PLAIN  rabbitmq 重启后消息依然在,
         *                            null 重启后消息消失
         *      参数4  传递的信息
         */
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbit".getBytes());
    
        //关闭连接
        RabbitmqUtil.closeConnectionAndChannel(connection,channel);
    }
}

? ? ? ? 客户端(customer)代码

public class Customer {

    public static void main(String[] args) throws IOException, TimeoutException {
//        //创建工厂并设置参数
//        ConnectionFactory factory = new ConnectionFactory();
//        factory.setHost("192.168.47.221");
//        factory.setPort(5672);
//        factory.setVirtualHost("/msg");
//        factory.setUsername("long");
//        factory.setPassword("123");
//        Connection connection1 = factory.newConnection();
//        获取对象通过对象获取通道
        Connection connection = RabbitmqUtil.getConnection();
        Channel channel = connection.createChannel();
        //绑定消息列
        channel.queueDeclare("hello",false,false,false,null);
        //读取信息
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body) = " + new String(body));
            }
        });


    }
}

方式二: work(以下方式都是通过工具类来创建connection对象)

? ? ? ? 有两种方式? ?第一种为平均分配

? provider端代码

public class Provider {

    public static void main(String[] args) throws IOException {

        // 获取连接对象
        Connection connection = RabbitmqUtil.getConnection();
        // 通过连接对象获取通道
        Channel channel = connection.createChannel();

        //绑定对应的消息栈
        /**
         * 绑定对应的消息列
         *      参数1  队列名称  如果不存在自动创建
         *      参数2  队列是否持久化 false不持久化
         *      参数3  是否独占队列  true 独占队列
         *      参数4  消息传完是否自动删除队列 true自动删除
         *      参数5  其他参数
         */
        channel.queueDeclare("work",true,false,false,null);

        /**
         * 发送消息
         *      参数1  交换机名称
         *      参数2  通道名称
         *      参数3  传递消息额外设置   MessageProperties.PERSISTENT_TEXT_PLAIN  rabbitmq 重启后消息依然在,
         *                            null 重启后消息消失
         *      参数4  传递的信息
         */
        for(int i=1;i<=20;i++) {
            channel.basicPublish("", "work", null,  (i + "hello work").getBytes());
        }
        //关闭连接
        RabbitmqUtil.closeConnectionAndChannel(connection,channel);

    }
}

customer端代码(大家可以多写几个customer端代码进行测试,我这里就类举一个了)

public class Customer1 {

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitmqUtil.getConnection();
        //获取连接
        Channel channel = connection.createChannel();
        //绑定消息列
        channel.queueDeclare("work",true,false,false,null);
        //读取信息

        /**
         *  参数一:  消息队列的名称
         *  参数二:  消息自动确认
         *              true;消费整自动向 rabbitmq 确认信息消费
         *              false:不会自动确认消息
         *  参数三:  从rabbitmq中获取消息
         */
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("customer1 message :" + new String(body));

            }
        });
    }

}

? ? ? ? 第二种为能者多劳方式

? ? ? ? provider端代码一样,修改customer端代码即可

????????

public class Customer1 {

    public static void main(String[] args) throws IOException {
        //获取连接对象
        Connection connection = RabbitmqUtil.getConnection();
        //获取连接
        Channel channel = connection.createChannel();
        //绑定消息列
        channel.queueDeclare("work",true,false,false,null);
        //读取信息

        /**
         *  参数一:  消息队列的名称
         *  参数二:  消息自动确认
         *              true;消费整自动向 rabbitmq 确认信息消费
         *              false:不会自动确认消息
         *  参数三:  从rabbitmq中获取消息
         */
        channel.basicQos(1);// 每次只消费一个消息
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("customer1 message :" + new String(body));

                channel.basicAck(envelope.getDeliveryTag(),false);// 进行手动提交
            }
        });
    }

}

方式三??Publish/Subscribe? (以下所有连接方式都需要用到交换机)

? ? ? ? 服务端发送消息,所有的客户端都可以获取消息(类似于广播效果)

provider端代码

public class Provider {

    public static void main(String[] args) throws IOException {

        // 获取连接对象
        Connection connection = RabbitmqUtil.getConnection();
        //通过连接对象获取通道
        Channel channel = connection.createChannel();
        //绑定交换机
        /**
         *  参数一: 交换机的名称   如何rabbitmq中没有会自动创建
         *  参数二: 交换机的类型   固定为fanout
         */
        channel.exchangeDeclare("logs","fanout");

        /**
         *   参数一: 交换机的名称
         *   参数二: 路由的key
         *   参数三: 其他参数
         *   参数四: 要传送的数据
         */
        channel.basicPublish("logs","",null,"hello  fanout".getBytes());

        //关闭连接
        RabbitmqUtil.closeConnectionAndChannel(connection,channel);

    }
}

? customer端代码

public class Customer {

    public static void main(String[] args) throws IOException{
        // 获取当前连接对象
        Connection connection = RabbitmqUtil.getConnection();
        // 通过连接对象获取通道
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare("logs","fanout");
        //获取临时通道的名称
        String queue = channel.queueDeclare().getQueue();
        //创建临时对列绑定交换机
        /**
         *   参数一: 临时通道的名称
         *   参数二: 交换机的名称
         *   参数三: 路由的key
         */
        channel.queueBind(queue,"logs","");

        //获取信息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("fanout message :" + new String(body));
            }
        });

    }
}

方式四? Routeing?

? ? ? ?根据route对消息进行发布,provider只对对应的route发送消息? 代码如下

provider端代码

public class Provider {

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitmqUtil.getConnection();
        // 通过连接对象获取通道
        Channel channel = connection.createChannel();

        String exchangeName = "logs_direct";
        // 声明交换机
        /**
         *   参数一: 交换机的名称
         *   参数二: 交换机的类型
         */
        channel.exchangeDeclare(exchangeName,"direct");
        String key = "Info";
        //发布消息
        channel.basicPublish(exchangeName,key,null,("provider发布的消息类型为:["+ key+"],  hello direct").getBytes());

        //关闭
        RabbitmqUtil.closeConnectionAndChannel(connection,channel);
    }
}

customer端代码(customer端代码一样,大家可以多写几个进行试验)

public class Customer {

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitmqUtil.getConnection();
        //通过连接对象获取通道
        Channel channel = connection.createChannel();

        String exchangeName = "logs_direct";
        //声明交换机
        channel.exchangeDeclare(exchangeName,"direct");
        //声明虚拟队列
        String queueName = channel.queueDeclare().getQueue();// 获取虚拟队列的名称
        /**
         *  参数一: 队列名称
         *  参数二: 交换机名称
         *  参数三: 路由的key
         */
        channel.queueBind(queueName,exchangeName,"error");// 绑定交换机
        channel.queueBind(queueName,exchangeName,"Info");// 绑定交换机
        channel.queueBind(queueName,exchangeName,"Warn");// 绑定交换机

        //获取消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息为"+ new String(body));
            }
        });
    }
}

方式五:Topics??

? ? ? ? 动态路由, 在方式四的基础上添加了统配符? 通配符的种类有两种? 如下

? ? ? ? ? ? ? ? *: 代表任意一个单纯,如 user.* / *.user / *.user.*? ? 以第一种为例:代表只接受 有user开头,route的个数是两个单纯的消息(单词和单词之间用 . 分割? ? 这个的route是只接受两个单词的消息,如以user开头,有三个请求他是接受不到的,要注意!!!)

? ? ? ? ? ? ? ? #:代表一个多多个单词,如user.# /?#.user / #.user.#? 以第一种为例: 代表只要是以user开头的消息都可以被接受到?

provider端代码

?????????

public class Provider{

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitmqUtil.getConnection();
        // 通过连接对象获取通道
        Channel channel = connection.createChannel();
        // 绑定交换机
        String exchangeName = "topics";
        channel.exchangeDeclare(exchangeName,"topic");
        // 声明routekey  发布消息
        String routeKey = "user.save";
        channel.basicPublish(exchangeName,routeKey,null,("topic要发送参数了啊,routeKey["+routeKey+"]").getBytes());

        //关闭
        RabbitmqUtil.closeConnectionAndChannel(connection,channel);

    }
}

customer端代码

public class Customer {

    public static void main(String[] args) throws IOException{
        Connection connection = RabbitmqUtil.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "topics";

        channel.exchangeDeclare(exchangeName,"topic");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,exchangeName,"user.*");

        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("customer1接受到消息:"+ new String(body));
            }
        });
    }
}

以上就是常用的五种rabbitmq的连接方式,希望可以对大家有所帮助 !!!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-26 12:16:31  更:2021-10-26 12:16:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 5:09:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码