IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> springboot整合RabbitMQ及基础使用 -> 正文阅读

[大数据]springboot整合RabbitMQ及基础使用

依赖spring-boot-starter-amqp快速整合消息队列,实现高效异步处理任务和解耦合

本文参考3W学习方法来叙述内容。

一、What

1、RabbitMQ是什么?

消息队列中间件,同类产品有ActiveMQ、 Kafka、ZeroMQ、 RocketMQ 等。

2、消息队列是什么?

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

二、Why

1、RabbitMQ这类消息队列中间件能做什么?应用场景?

异步处理那些不需要及时处理的任务,随着业务升级、初期业务逻辑中的部分同步操作可以拆分出能异步执行的子任务,比如接发发短信、订单处理操作、红包方法等,通过消息队列实现高可用、解耦合。

三、How

1、springboot工程如何整合RabbitMQ?

1.1 引入maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.5.4</version>
</dependency>

1.2 application.yml配置ip、端口、账号等基本信息

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /
    listener:
      simple:
        ####开启手动ack
        acknowledge-mode: manual
        retry:
          ####开启消费者异常进行重试
          enabled: true
          ####最大重试次数
          max-attempts: 5
          ####重试间隔次数
          initial-interval: 3000

1.3 创建配置类,配置Exchange和Queue

package com.example.springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Breaker-93
 * @date 2021/8/25 10:37
 */
@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_TEST = "queue_test1";
    public static final String EXCHANGE_TEST = "exchange_test1";
    public static final String ROUTE_TEST = "test1.#";

    /**
     * 声明交换机
     * 
     * @return
     */
    @Bean(EXCHANGE_TEST)
    public Exchange EXCHANGE_TEST() {
        // durable(true) 持久化,mq重启之后交换机还在
        return ExchangeBuilder.topicExchange(EXCHANGE_TEST).durable(true).build();
    }

    /**
     * 声明队列
     * 
     * @return
     */
    @Bean(QUEUE_TEST)
    public Queue QUEUE_TEST() {
        return new Queue(QUEUE_TEST);
    }

    /**
     * 队列绑定交换机
     */
    @Bean
    public Binding ROUTE_TEST(@Qualifier(QUEUE_TEST) Queue queue, @Qualifier(EXCHANGE_TEST) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTE_TEST).noargs();
    }
}

1.4 生产者发送消息

package com.example.springbootrabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import com.example.springbootrabbitmq.config.RabbitmqConfig;

/**
 * @author Breaker-93
 * @date 2021/8/25 12:41
 */

@SpringBootTest
public class ProducerTest {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void Producer_topics_springbootTest() {

        // 使用rabbitTemplate发送消息
        String message = "Hello, rabbit mq";
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TEST, "test1.xyj", message);

    }
}

1.5 消费者监听队列,处理消息

package com.example.springbootrabbitmq;

import java.io.IOException;
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import com.example.springbootrabbitmq.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;

/**
 * @author Breaker-93
 * @date 2021/8/25 13:39
 */
@Component
public class ReceiveHandler {
    /**
     * 队列监听
     * 
     * @param msg
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_TEST})
    public void receive(@Headers Map<String, Object> headers, Message message, Channel channel) throws IOException {
        System.out.println("QUEUE_TEST msg: " + message);
        // 手动ack
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手动签收
        channel.basicAck(deliveryTag, false);
    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章           查看所有文章
加:2021-08-26 12:10:28  更:2021-08-26 12:13:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:08:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码