IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ系列--Direct Exchange -> 正文阅读

[大数据]RabbitMQ系列--Direct Exchange

RabbitMQ工作流程

左边是生产者,右边是消费者,中间红框内是RabbitMQ服务器,其中包括交换机以及消息队列

消费者发送消息给RabbitMQ服务器,交换机接收到消息,然后根据不同的交换机规则投递给消息队列,消费者订阅消息进行消费
在这里插入图片描述

RabbitMQ交换机类型

作用:接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout三种,其他的由于性能不好等原因几乎用不到,后续主要介绍这三个。

Direct Exchange ----- 直连型交换机,根据消息携带的路由键将消息投递给对应队列。

Topic Exchange ----- 主题交换机,根据一定规则将消息投递给对应队列。

Fanout Exchange ----- 扇型交换机,这个交换机没有路由键概念,这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

编写RabbitMQ示例
  • 项目准备

为了更加直观的进行演示,本示例会创建两个项目,均采用SpringBoot作为框架,一个生产者(rabbitmq-provider),一个消费者(rabbitmq-consumer),项目大家自行创建。

  • 项目分别添加RabbitMQ相关依赖
<!--web相关依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 项目分别配置application.yml
# 生产者配置
server:
  port: 8001
spring:
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
# 消费者配置
server:
  port: 8002
spring:
  application:
    name: rabbitmq-consumer
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  • 生产者项目创建直连交换机
package com.chentawen.rabbitmqprovider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: CTW
 * @Date: create in 2021/8/2 21:23
 */
@Configuration
public class DirectExchangeConfig {
    /**
     * 声明直连交换机
     *
     * @return
     */
    @Bean
    DirectExchange MyDirectExchange() {
        return new DirectExchange("MyDirectExchange", true, false);
    }

    /**
     * 声明队列
     *
     * @return
     */
    @Bean
    Queue MyDirectQueue() {
        return new Queue("MyDirectQueue", true);
    }

    /**
     * 将交换机和队列进行绑定
     *
     * @return
     */
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(MyDirectQueue()).to(MyDirectExchange()).with("DirectRoutingKey");
    }
}
  • 生产者项目创建消息发送接口
package com.chentawen.rabbitmqprovider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @Author: CTW
 * @Date: create in 2021/8/2 21:23
 */
@RestController
public class SendMessageController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息至直连交换机
     *
     * @return
     */
    @GetMapping("/sendMessageDirectExchange")
    public String sendMessageDirectExchange() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "Hello World!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss"));
        Map<String, Object> map = new HashMap<>(16);
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        /**
         * exchange 交换机名称
         * routingKey 路由key
         * map 发送的消息内容
         */
        rabbitTemplate.convertAndSend("MyDirectExchange", "DirectRoutingKey", map);
        return "消息发送成功!";
    }
}
  • 使用postman访问接口发送消息

image.png

  • 查看RabbitMQ监控
    概述页面:准备好的消息和消息总数都是1

image.png
交换机页面可以看到刚刚创建的直连交换机:
image.png
队列页面可以看到刚刚创建的队列及消息数量
image.png

  • 消费者项目创建消息接收监听类
package com.chentawen.rabbitmqconsumer.listener;//package com.chentawen.springbootall.config.rabbitlistener;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Author: CTW
 * @Date: create in 2021/8/2 22:23
 * MyDirectQueue 监听的队列名称
 */
@Component
@RabbitListener(queues = "MyDirectQueue")
public class DirectReceiver {

    @RabbitHandler
    public void process(Map MessageData) {
        System.out.println("rabbitmq-consumer接收到消息  : " + MessageData.toString());
    }

}
  • 启动项目,消费消息

image.png

后续发送的消息都是实时消费的

image.png

在RabbitMQ监控中也可以看到消息被消费掉了

image.png

  • 多个消费者进行消费
    在消费者项目中再创建一个监听

image.png

  • 重启项目,发送消息,查看控制台
    可以看到多个消费者去消费的话是以轮询的方式进行消费,不会重复消费

image.png

以上就是本期内容,后续内容持续更新

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-04 11:16:50  更:2021-08-04 11:17:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 18:23:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码