IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Springboot 整合 阿里云消息队列RabbitMQ版服务 -> 正文阅读

[Java知识库]Springboot 整合 阿里云消息队列RabbitMQ版服务

因为公司的需要服务都是用的阿里云相关的产品,最近自己工作中也涉及到了消息队列这一块的业务,索性自己也来从零开始对接阿里云的消息队列服务。

准备

本着学习的前提,寻找是否免费的或者做活动的服务,能白嫖的就白嫖,果然被我找到了。

  1. 进入阿里云官方首页,找到精选活动->阿里云使用中心 点击进入

????????2.进入页面搜索消息队列

????????3.? 具体队列的相关配置步骤可参考官方文档:快速入门概述 - 消息队列RabbitMQ版 - 阿里云

????????4. 本来Rocket版、Kafka版都想学习的,但只有rabbit版的免费,但也够意思了毕竟不要钱(虽然免费但后面还留了一个很大的坑等着踩呢

开始

????????1. 创建一个springboot项目 命名为:rabbitmq-aliyun

????????2.在application.yml中配置相关mq常量(常量可从前面自己创建的阿里云mq控制台获取)

server:
  port: 8080


aliyun:
  rabbitmq:
    accessKey: 密匙key
    accessKeySecret: 密匙密码
    username: 静态用户名
    password:  静态密码
    vHost: 虚拟机名称
    exchange: 交换机名称
    exType: 交换机类型
    queue: 队列名称
    BindingKey:  路由key
    host: 介入点(公网接入点)

? ? ? ? :本地测试必须使用公网接入点? ,但是我们使用的免费rabbitMq服务并没有公网接入点,只有VPC接入点

?所以自己按照官方教程一步一步实现,总是连接超时,代码检查了无数遍也没有找到问题所在。(官方教程也没有表明用哪一个接入点地址,进了这个大坑)

最后只能需求官方客户帮助:

本着,不花钱的原则,但是使用VPC接入点 还得购买 阿里云ecs服务,岂不是还得花更多的钱。

最后只能升级服务,并且选择支持公网

?

所以说,白嫖也并没有完全白嫖,还得花钱,要么买ecs服务,要么升配队列服务

3.创建配置数据映射对象?RabbitMqConfigDTO.class

@Configuration
@ConfigurationProperties("aliyun.rabbitmq")
@Data
public class RabbitMqConfigDTO {

    /**
     * 账户密匙key
     */
    private String accessKey;

    /**
     * 账户密匙
     */
    private String accessKeySecret;

    /**
     *  静态用户名
     */
    private String username;

    /**
     * 静态用户名密码
     */
    private String password;

    /**
     * 虚拟机名称
     */
    private String vHost;

    /**
     * 交换机名
     */
    private String exchange;

    /**
     * 交换机类型
     */
    private String exType;

    /**
     * 队列名
     */
    private String queue;

    /**
     * 绑定规则key
     */
    private String BindingKey;

    /**
     * 接入点地址
     */
    private String host;

}

????????4. 创建spring工具类?SpringContextHolder.class 用于获取bean对象

public class SpringContextHolder implements ApplicationContextAware {

    @Autowired
    private static ApplicationContext applicationContext;

    public SpringContextHolder() {
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextHolder.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        assertApplicationContext();
        return applicationContext;
    }

    public static <T> T getBean(String beanName) {
        assertApplicationContext();
        return (T) applicationContext.getBean(beanName);
    }

    public static <T> T getBean(Class<T> requiredType) {
        assertApplicationContext();
        return applicationContext.getBean(requiredType);
    }

    private static void assertApplicationContext() {
        if (applicationContext == null) {
            throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
        }
    }

????????5. 创建rabbitMq工具类??RabbitMqUtil.class

@Slf4j
@Component
public class RabbitMqUtil {
    
    @Autowired
    private RabbitMqConfigDTO rabbitMqConfigDTO;

    //第三步 建一个静态的本类
    private static RabbitMqUtil rabbitMqUtil;

    //第四步 初始化
    @PostConstruct
    public void init() {
        rabbitMqUtil = this;
    }
    
    /**
     * 创建队列连接
     * @return
     */
    public static Connection getRabbitConnection(){

        ConnectionFactory factory = new ConnectionFactory();

        //公网接入点
        factory.setHost(rabbitMqUtil.rabbitMqConfigDTO.getHost());
        //静态用户名
        factory.setUsername(rabbitMqUtil.rabbitMqConfigDTO.getUsername());
        //静态密码
        factory.setPassword(rabbitMqUtil.rabbitMqConfigDTO.getPassword());

        //自动恢复
        factory.setAutomaticRecoveryEnabled(true);
        //网络恢复时间
        factory.setNetworkRecoveryInterval(5000);
        //虚拟机名称
        factory.setVirtualHost(rabbitMqUtil.rabbitMqConfigDTO.getVHost());
        //端口
        factory.setPort(5672);
        //连接超时时间
        factory.setConnectionTimeout(30*100);
        //设置握手超时时间
        factory.setHandshakeTimeout(300000000);
        factory.setShutdownTimeout(0);

        //创建连接
        Connection connection = null;
        try {
            connection =factory.newConnection();

        }catch (Exception e){
            log.error("rabbitMq连接异常", e);
        }

        return connection;
    }

    /**
     * 创建队列通道
     * @param connection
     * @return
     */
    public static Channel getRabbitChannel(Connection connection){

        Channel channel = null;
        try {
            channel = connection.createChannel();
            String exchange = rabbitMqUtil.rabbitMqConfigDTO.getExchange();
            channel.exchangeDeclare(exchange,rabbitMqUtil.rabbitMqConfigDTO.getExType(), true, false,false, null);
            channel.queueDeclare(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
            channel.queueBind(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), rabbitMqUtil.rabbitMqConfigDTO.getExchange(), rabbitMqUtil.rabbitMqConfigDTO.getBindingKey());


        }catch (Exception e){
            log.error("创建rabbitMq通道异常", e);
        }

        return channel;
    }

}

????????6.创建server接口类

public interface RabbitMqService {

    /**
     * 发送mq消息
     * @return
     */
    String sendMessage() throws IOException, TimeoutException;

    /**
     * 消费消息
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    String consumeMessage() throws IOException, TimeoutException;
}

????????7.创建实现类

@Service
public class RabbitMqServiceImpl implements RabbitMqService {

    @Autowired
    private RabbitMqConfigDTO rabbitMqConfigDTO;

    @Override
    public String sendMessage() throws IOException {

        Connection connection = RabbitMqUtil.getRabbitConnection();
        Channel channel = RabbitMqUtil.getRabbitChannel(connection);
        //开始发送消息
        for(int i=0; i< 10 ; i++){
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(rabbitMqConfigDTO.getExchange(), "BindingKey", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));

        }
        connection.close();
        return "消息发送成功";

    }

    @Override
    public String consumeMessage() throws IOException, TimeoutException {

        Connection connection = RabbitMqUtil.getRabbitConnection();
        Channel channel = RabbitMqUtil.getRabbitChannel(connection);

        String exchange = rabbitMqConfigDTO.getExchange();
        channel.exchangeDeclare(exchange,rabbitMqConfigDTO.getExType(), true, false,false, null);
        channel.queueDeclare(rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
        channel.queueBind(rabbitMqConfigDTO.getQueue(), rabbitMqConfigDTO.getExchange(), rabbitMqConfigDTO.getBindingKey());

        // 开始消费消息。
        channel.basicConsume(rabbitMqConfigDTO.getQueue(), false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        connection.close();

        return "消费成功";
    }
    
}

?????????8.创建控制层

@RestController
public class RabbitMqController {

    @Autowired
    private RabbitMqService rabbitMqService;

    @GetMapping("/sendMessage")
    public String sendMessage() throws IOException, TimeoutException {

        return rabbitMqService.sendMessage();
    }

    @GetMapping("/consumeMessage")
    public String consumeMessage() throws IOException, TimeoutException {

        return rabbitMqService.consumeMessage();
    }
}

????????9.项目整体结构

????????

?????10.完成启动项目

? ? ?11.点击获取源码

测试?

  1. 发送消息

? ? ?2. 进入控制台查看?

?

?????????此时可以看到堆积10条消息,说明消息发送成功了

? ? ? ? 3. 消费消息

? ? ? ? ?4.再次进入控制台查看

????????????????堆积的消息已变为0说明消息已经被全部消费了

后序

自己遇到不懂的,比如前面本地测试需要公网接入点的问题,百度了不少博客,也没有找到sprigboot 整合 阿里云 消息队列RabbitMQ版的博客已经解决方案,所以博主按自己的实际操作记录下来,希望对同样遇到问题的小伙伴有所帮助。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-26 11:28:38  更:2022-04-26 11:32:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 3:17:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码