IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ学习篇3_快速入门案例、原生操作举栗、4种模式、解决报错com.rabbitmq.client.ShutdownSignalException: connection error -> 正文阅读

[大数据]RabbitMQ学习篇3_快速入门案例、原生操作举栗、4种模式、解决报错com.rabbitmq.client.ShutdownSignalException: connection error

目录


  • 快速入门案例

    1. 简单模式
    2. 工作模式
    3. 发布与订阅模式
      • fanout
      • direct
      • topic
  • 举栗

    1. 生产者Producer代码
    2. 一些说明
    3. 消费者Consumer代码
    4. 一些说明
    5. windows下创建连接对象失败报错解决

一、快速入门案例

查看官网的 教程RabbitMQ Tutorials :https://www.rabbitmq.com/getstarted.html,共有4种mq的模式场景(fanout、direact、topic、headers)

1. 隐式交换机简单模式:单个消费者

在这里插入图片描述

2. 隐式交换机工作模式:多个消费者,分发模式有轮询、公平分发
  • 轮询分发:默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的信息。这种分发消息的方式称为循环。
  • 公平分发:按照权重分发
    在这里插入图片描述
  • 消息确认:如果消费者在接受完任务之后,突然出故障,这时需要有 消息确认 机制 接着mq将未处理的信息重新放入队列分发处理在这里插入图片描述
3. 显示交换机模式
  • 需要一个 exchange交换机binding,交换类型有: direct, topic, headers and fanout
  • direct, topic, headers and fanout 是交换机 和 队列之间的关系
  • binding 是队列和消费者之间的关系,消费者绑定队列 才能收到信息
1.fanout发布与订阅模式
  • fanout只是将它接收的所有消息广播到它知道的所有队列
  • 消费者binding 指定队列才能接收到信息

在这里插入图片描述

2.direct路由交换模式
  • 上面的基础上,向它添加一个功能 ,使得只能订阅邮件的子集成为可能。订阅发布是 队列和消费者 之间的关系。

  • 不在使用fanout交换 将使用direct交换。直接交换背后的路由算法很简单 - 消息转到其binding key与消息的routing key完全匹配的队列。

    在这里插入图片描述

direct交换方式下设置相同的binding key 效果和 fanout类似

在这里插入图片描述

3.topic模式
  • 相当于增加了模糊匹配的direct交换方式

在这里插入图片描述

4.举栗
生产者Producer代码
package henu.soft.xiaosi.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) {

        // 所有的中间件技术都是基于TCP\IP协议之上构建新型的协议规范,主不过rabbitmq遵循amqp协议 ip、port


        /**
         * 原生方式使用rabbitmq
         */
        // 1. 创建连接工厂

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
//        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            // 2. 创建连接Connection

            connection = factory.newConnection("生产者");


            // 3. 通过连接获取管道channel

            channel = connection.createChannel();


            // 4. 创建交换机,声明队列、绑定关系、路由key、发送消息、接受信息

            String queueName = "xiaosi";

            /**
             * 参数:
             * 队列名称
             * 是否需要持久化
             * 排他性
             * 是否自动删除(最后一个消费者消费完是否删除队列
             * 携带附加参数
             */

            channel.queueDeclare(queueName,false,false,false,null);

            // 5. 准备消息内容
            String msg = "RabbitMQ~~";


            // 6. 发送消息到queue

            channel.basicPublish("",queueName,null,msg.getBytes());

            System.out.println("消息发送成功!");


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                // 7. 关闭通道
                if(channel != null && channel.isOpen()){
                    channel.close();
                }
                // 8. 关闭连接
                if(connection != null && channel.isOpen()){
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }


        }


    }
}

一些说明

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

请添加图片描述

消费者Cosumer代码
package henu.soft.xiaosi.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) {

        // 所有的中间件技术都是基于TCP\IP协议之上构建新型的协议规范,主不过rabbitmq遵循amqp协议 ip、port


        /**
         * 原生方式使用rabbitmq
         */
        // 1. 创建连接工厂

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
//        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            // 2. 创建连接Connection

            connection = factory.newConnection("消费者");


            // 3. 通过连接获取管道channel

            channel = connection.createChannel();


            // 4. 从哪个队列取


            String queueName = "xiaosi";


            channel.basicConsume(
                    queueName,
                    true,
                    new DeliverCallback() {
                        public void handle(String consumerTag, Delivery message) throws IOException {
                            System.out.println("消费者受到消息:" + new String(message.getBody(), "UTF-8"));
                        }

                    },
                    new CancelCallback() {
                        public void handle(String consumerTag) throws IOException {
                            System.out.println("消费者接受消息失败!");

                        }
                    });


            System.out.println("开始接受新消息!");
            System.in.read();


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                // 7. 关闭通道
                if (channel != null && channel.isOpen()) {
                    channel.close();
                }
                // 8. 关闭连接
                if (connection != null && channel.isOpen()) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }


        }


    }
}



一些说明

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

1.获取连接对象 factory.newConnection("生产者");报错
  • windows下使用java原生连接rabbitmq的时候,获取连接失败,输出:com.rabbitmq.client.ShutdownSignalException: connection error; protocol meth

  • 解决需要执行:rabbitmqctl set_permissions -p "/" 登录的username ".*" ".*" ".*"

请添加图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-17 11:59:21  更:2021-07-17 12:01:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 17:38:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码