IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SpringBoot整合RabbitMQ -> 正文阅读

[大数据]SpringBoot整合RabbitMQ

添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

添加application配置

spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.addresses=localhost
#如果不设置默认为"/"
spring.rabbitmq.virtual-host=spring-cloud-alibaba-sample

添加配置文件

@Slf4j
@Configuration
public class RabbitMQConfig {

    /**
     * 交换机名称
     */
    public static final String ITEM_TOPIC_EXCHANGE = "item.topic.exchange";

    /**
     * 队列名称
     */
    public static final String ITEM_QUEUE = "item.queue";

    /**
     * 备份交换器名称
     */
    public static final String SPARE_EXCHANGE = "item.spare.exchange";

    /**
     * 备份交换器队列
     */
    public static final String SPARE_QUEUE = "item.spare.queue";

    /**
     * 死信队列交换器
     */
    public static final String DLX_EXCHANGE = "item.dlx.exchange";

    /**
     * 死信队列
     */
    public static final String DLX_QUEUE = "item.dlx.queue";

    /**
     * 声明交换机
     *
     * @return
     */
    @Bean
    public Exchange itemExchange() {
        return ExchangeBuilder
                .topicExchange(ITEM_TOPIC_EXCHANGE)
                .durable(true)
                .alternate("spare.exchange")
                .build();
    }

    /**
     * 声明队列
     *
     * @return
     */
    @Bean
    public Queue itemQueue() {
        return QueueBuilder.durable(ITEM_QUEUE).deadLetterExchange(DLX_EXCHANGE).deadLetterRoutingKey("dlx.routingKey").build();
    }

    /**
     * 绑定消息队列和交换机
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("itemExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }

    /**
     * 备份交换机
     *
     * @return
     */
    @Bean
    public Exchange spareExchange() {
        return ExchangeBuilder.fanoutExchange(SPARE_EXCHANGE).build();
    }

    /**
     * 备份队列
     *
     * @return
     */
    @Bean
    public Queue spareQueue() {
        return QueueBuilder.durable(SPARE_QUEUE).build();
    }

    /**
     * 备份交换机绑定备份队列
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding spareQueueExchange(@Qualifier("spareQueue") Queue queue, @Qualifier("spareExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }

    /**
     * 死信队列交换器
     *
     * @return
     */
    @Bean
    public Exchange dlxExchange() {
        return ExchangeBuilder.directExchange(DLX_EXCHANGE).durable(true).build();
    }

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable(DLX_QUEUE).build();
    }

    /**
     * 绑定死信交换器和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding dlxQueueExchange(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("dlx.routingKey").noargs();
    }
}

当前声明了一个交换器和队列,并且给他设置了备份交换器和死信队列。

发送消息

@Api(tags = "发送消息")
@RestController
@RequestMapping("/message")
@Slf4j
public class MessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @ApiOperation(value = "发送消息")
    @GetMapping("/sendMessage")
    public AjaxResult sendMessage(String routingKey, String msg) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("2000");
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, routingKey, message);
        return AjaxResult.success("消息发送成功");
    }
}

消费消息

    @RabbitListener(queues = RabbitMQConfig.ITEM_QUEUE)
    public void receive(Message message, Channel channel) throws Exception {
        ThreadUtils.sleep(1000);
        String msg = new String(message.getBody());
        log.info("消费者消费了消息:{}", msg);
    }

消息生产者回调

交换器不存在

?添加application配置

spring.rabbitmq.publisher-confirm-type=correlated

给RabbitTemplate添加confirmCallBack

@Slf4j
@Configuration
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback, CommandLineRunner {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(String... args) throws Exception {
        rabbitTemplate.setConfirmCallback(this);

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送成功");
        } else {
            log.info("消息发送失败:{}", cause);
        }
    }
}

路由失败

添加备份交换器

? ? ? ? 消息路由失败会将消息发送到备份交换器

给RabbitTemplate添加returnCallback方法

? ? ? ? 给application添加配置

spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true

? ? ? ? 添加returnCallback方法

@Slf4j
@Configuration
public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback, CommandLineRunner {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(String... args) throws Exception {
        rabbitTemplate.setReturnCallback(this);
    }


    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 路由失败业务处理
    }
}

消费者手动确认

全局设置手动确认

spring.rabbitmq.listener.simple.acknowledge-mode=manual

单个监听上设置ackMode

 @RabbitListener(queues = RabbitMQConfig.ITEM_QUEUE,ackMode = "MANUAL")

单个的优先级要高于全局

    @RabbitListener(queues = RabbitMQConfig.ITEM_QUEUE)
    public void receive(Message message, Channel channel) throws Exception {
        ThreadUtils.sleep(1000);
        String msg = new String(message.getBody());
        log.info("消费者消1费了消息:{}", msg);
//        确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//        拒绝
//        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
//        log.info(JSONObject.toJSONString(message.getMessageProperties()));
    }

消息确认与拒绝消息说明

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章           查看所有文章
加:2021-08-05 17:25:10  更:2021-08-05 17:27:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 17:15:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码