IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ介绍及使用 -> 正文阅读

[大数据]RabbitMQ介绍及使用

RabbitMQ介绍及使用

对rabbitMQ的介绍及简单使用(整合SpringBoot),暂不包括高级特性

1 概述

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

1.1 优势与劣势

作用

  1. 解耦,不同服务之间的通信都只需要交给mq就可以了
  2. 削峰填谷,当用户峰值高时不会导致服务压力过大而宕机,mq可以限制消息流通的速度

缺点

  1. 系统可用性降低

    系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

  2. 系统复杂度提高

    MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

  3. 一致性问题

    A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

1.2 AMQP JMS

AMQP

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP。

JMS

JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API

JMS 是 JavaEE 规范中的一种,类比JDBC

很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,不过开源社区有

AMQP JMS 区别

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富

1.3 架构

RabbitMQ 基础架构如下图:

在这里插入图片描述

RabbitMQ 中的相关概念:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);

官网对应模式介绍: https://www.rabbitmq.com/getstarted.html

2 简单模式与工作模式

先实现一个简单模式案例:

在这里插入图片描述

2.1 安装mq

教程很多,暂不列举。安装后建议手动开启控制台,便于查看。

创建maven项目,导下依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

2.2 连接mq的工具类

public class ConnectionUtils {
    public static Connection getConnection() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址;默认为 localhost
        connectionFactory.setHost("localhost");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称;默认为 /
        connectionFactory.setVirtualHost("/");
        //连接用户名;默认为guest
        connectionFactory.setUsername("guest");
        //连接密码;默认为guest
        connectionFactory.setPassword("guest");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

2.3 生产者

public class Producer {
    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取mq连接
        Connection connection = ConnectionUtils.getConnection();

        //创建频道
        Channel channel = connection.createChannel();

        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接,只能有一个Consumer监听这个队列
         * 参数4:是否在不使用的时候自动删除队列,当没有Consumer时,自动删除
         * 参数5:队列其它参数
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //发送消息
        String message = "你好:qtds";
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("已发送消息:" + message);

        //释放资源
        channel.close();
        connection.close();

    }
}

2.4 消费者

public class Consumer1 {

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        //创建频道
        Channel channel = connection.createChannel();

        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接,只能有一个Consumer监听这个队列
         * 参数4:是否在不使用的时候自动删除队列,当没有Consumer时,自动删除
         * 参数5:队列其它参数
         */
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        // 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 通过匿名内部类编写回调函数
            
            /**
             * 接收到消息执行的回调
             * @param consumerTag 消息标签,在channel.basicConsume时可以指定
             * @param envelope  消息包的内容,可以从中获取消息id,消息routingKey,交换机,消息和重传标志
             * @param properties 属性信息
             * @param body  消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
            }
        };
        /**
         * 参数1: 队列名称
         * 参数2:是否自动确认接收到消息
         * 参数3: 消息接收到后回调
         */
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
    }
}

2.5 工作队列模式

只是多了一个消费端,另外加个消费端的类(Consumer2),和Consumer1一样就可以了

在这里插入图片描述

工作模式的消费者是公平接收的,也就是说有多个消费端的情况下,他们会交替消费,像下图这样:

在这里插入图片描述

3 订阅、路由、通配符模式

这些模式都是在之前模式的基础上添加了交换机exchange,并分配多个队列给消费端:

在这里插入图片描述

3.1 订阅模式

生产者需要声明交换机和队列,并将对应的队列绑定至交换机:

public class Producer {
    public static String FANOUT_EXCHANGE = "fanout_exchange";
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();

        //创建频道
        Channel channel = connection.createChannel();

        /**
         * 声明交换机
         * 参数1: 交换机名称
         * 参数2: 交换机类型 fanout,topic,direct
         */
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        /**
         * 声明(创建)队列
         */
        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);

        // 队列绑定交换机
        channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
        channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");

        for (int i = 0; i < 10; i++) {
            //发送消息
            String message = "你好:qtds ----" + i;
            /**
             * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:消息其它属性
             * 参数4:消息内容
             */
            channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }

        //释放资源
        channel.close();
        connection.close();

    }

}

消费者在声明队列后同样要绑定对应的交换机:

channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);

channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, "");

这样消息将会通过交换机发送消息到所有对应的队列,每个队列都会接收到一样的消息,也就是经典的消息发布/订阅模式!

3.2 路由模式

在有交换机的基础上,我们可以另外指定路由key,将不同的消息分开发送到队列中:

在这里插入图片描述

public class Producer {
    public static String DIRECT_EXCHANGE = "direct_exchange";
    static final String DIRECT_QUEUE_1 = "direct_queue_1";
    static final String DIRECT_QUEUE_2 = "direct_queue_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();

        //创建频道
        Channel channel = connection.createChannel();

        /**
         * 声明交换机(DIRECT类型,即路由模式的交换机类型)
         */
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);

        /**
         * 声明(创建)队列
         */
        channel.queueDeclare(DIRECT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(DIRECT_QUEUE_2, true, false, false, null);

        // 队列绑定交换机,并指定路由key(可连续指定多个)
        channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHANGE, "insert");
        channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHANGE, "update");
        // 队列2:
        channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE, "remove");

        //发送消息
        String message = "你好:qtds ----" + i;
        //发送消息时指定路由key,就会只发送消息到绑定了路由key的队列上了
        channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message.getBytes());

        //释放资源
        channel.close();
        connection.close();
    }
}

消费者绑定交换机同样指定好路由key:

channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");

3.3 通配符模式Topics

指定路由key时,可以使用通配符:

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.# :能够匹配 item.insert.abc 或者 item.insert

item.* :只能匹配 item.insert

4 SpringBoot整合

以topics模式为案例

4.1 生产者搭建

先搭个springBoot模块,导下依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.1.0.RELEASE</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rabbitmq-producer</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

配置文件:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /qtds #这里配置的是我自定义的域名,练习的话就直接用/就可以了
    username: guest
    password: guest

配置类:

package com.qtds.rabbitmq.config;


@Configuration
public class RabbitMQConfig {

    // 交换机名称
    public static final String ITEM_TOPIC_EXCHANGE = "springboot_item_topic_exchange";

    // 队列名称
    public static final String ITEM_QUEUE = "springboot_item_queue";

    //声明交换机
    @Bean("itemTopicExchange")
    public Exchange itemTopicExchange() {
        return ExchangeBuilder
                .topicExchange(ITEM_TOPIC_EXCHANGE)
                //是否持久化
                .durable(true)
                .build();
    }

    //声明队列
    @Bean("itemQueue")
    public Queue itemQueue() {
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }

    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }
}

启动类:

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

测试类:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class RabbitMQTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.insert", "商品新增,routing key 为item.insert");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update", "商品修改,routing key 为item.update");
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.delete", "商品删除,routing key 为item.delete");
    }
}

运行测试类后,就可以在mp的控制台中看到对应的路由和队列了,消息也发送在队列中了

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4.2 消费者

消费者也是同样方式创建一个springboot项目,导入相关依赖,编辑配置类。

然后配置一个消息监听器,绑定队列就能接收数据了;

@Component
public class MyListener {

    @RabbitListener(queues = "springboot_item_queue")
    public void myListener(String message) {
        System.out.println("接收到的消息为:" + message);
    }
}

启动类:

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

创建写个测试类测试一下:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ConsumerApplication.class)
public class ConsumerTest {

    @Test
    public void test() {
        while (true) {

        }
    }
}

可以看到已经接收到我们生产者刚消费的消息了;

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-21 15:32:29  更:2021-08-21 15:32:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:00:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码