IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> springboot集成Rabbitmq 手动确认消息 -> 正文阅读

[大数据]springboot集成Rabbitmq 手动确认消息

springboot集成Rabbitmq 手动确认消息

pom文件:

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

yml文件:

#配置rabbitMq 服务器
server:
  port: 9096


#配置rabbitMq 服务器
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true
    template:
      # 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
      mandatory: true
    listener:
      simple:
        # NONE 自动确认 RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
        #      所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
        #      一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
        # MANUAL 手动确认 需要人为地获取到channel之后调用方法向server发送ack(或消费失败时的nack)信息。
        # AUTO 由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。
        acknowledge-mode: manual #手动ACK

  redis:
    host: 127.0.0.1 # Redis服务器地址
    database: 1 # Redis数据库索引(默认为0)
    port: 6379 # Redis服务器连接端口
    password: 123456 # Redis服务器连接密码(默认为空)
    timeout: 5000ms # 连接超时时间(毫秒)

config配置选其一


************************************第一种方式******************************************

package com.jy.springbootRabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Slf4j
@Configuration
public class RabbitMqConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        //消息发送成功的回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("ConfirmCallback:     "+"相关数据:"+correlationData);
            log.info("ConfirmCallback:     "+"确认情况:"+ack);
            log.info("ConfirmCallback:     "+"原因:"+cause);
        });

        //发生异常时的消息返回提醒
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("ReturnCallback:     "+"消息:"+message);
            log.info("ReturnCallback:     "+"回应码:"+replyCode);
            log.info("ReturnCallback:     "+"回应信息:"+replyText);
            log.info("ReturnCallback:     "+"交换机:"+exchange);
            log.info("ReturnCallback:     "+"路由键:"+routingKey);
        });
        return rabbitTemplate;
    }
}

************************************第二种方式******************************************

package com.jy.springbootRabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Slf4j
@Configuration
public class RabbitInitializingBean implements InitializingBean {

    @Resource
    private RabbitTemplate rabbitTemplate;


   //初始化
    @Override
    public void afterPropertiesSet() throws Exception {
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        //消息发送成功的回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("ConfirmCallback:     " + "相关数据:" + correlationData);
                log.info("ConfirmCallback:     " + "确认情况:" + ack);
                log.info("ConfirmCallback:     " + "原因:" + cause);
            }
        });

        //发生异常时的消息返回提醒
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("ReturnCallback:     " + "消息:" + message);
                log.info("ReturnCallback:     " + "回应码:" + replyCode);
                log.info("ReturnCallback:     " + "回应信息:" + replyText);
                log.info("ReturnCallback:     " + "交换机:" + exchange);
                log.info("ReturnCallback:     " + "路由键:" + routingKey);
            }
        });
    }
}

主题设置TopicRabbitConfig:

package com.jy.springbootRabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {
	

    public final static String STUDENT_QUEUE = "topic.student.queue";//队列
    
    public final static String DELAY_QUEUE = "delay.queue";//延迟
	

    public final static String STUDENT_EXCHANGE  = "topic.student";//交换机
   
    public final static String DELAY_EXCHANGE  = "delay.exchange";//延迟

    public final static String STUDENT_KEY = "topic.student.key";//路由
    
    public final static String DELAY_KEY = "delay.key";//延迟

    public final static Long EXPIRATION = 5000L;//过期时间
    
   
      @Bean
      TopicExchange delayEexchange() {
          return new TopicExchange(DELAY_EXCHANGE);
      }
     
      
      @Bean
      public Queue delayQueue() {
      	 // 设置超时队列
          return QueueBuilder.durable(DELAY_QUEUE)
                  // DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
                  .withArgument("x-dead-letter-exchange", TopicRabbitConfig.STUDENT_EXCHANGE)
                  // dead letter携带的routing key,配置处理队列的路由key
                  .withArgument("x-dead-letter-routing-key", STUDENT_KEY)
                  .build();

      }
      
      @Bean
      public Binding delayBinding() {
          return BindingBuilder
                  .bind(delayQueue())
                  .to(delayEexchange())
                  .with(DELAY_KEY);
      }


    @Bean
    public Queue studentQueue() {
        return new Queue(STUDENT_QUEUE);
    }

    @Bean
    TopicExchange studentExchange() {
        return new TopicExchange(STUDENT_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeStudent() {
        return BindingBuilder.bind(studentQueue()).to(studentExchange()).with(STUDENT_KEY);
    }

}

监听类ReceiveHandler:

package com.jy.springbootRabbitmq.Handler;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.jy.springbootRabbitmq.config.TopicRabbitConfig;
import com.jy.springbootRabbitmq.entity.student;
import com.jy.springbootRabbitmq.entity.RetrySendmsg;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.UUID;


@Slf4j
@Component
public class ReceiveHandler {

	@Resource
	private RabbitTemplate rabbitTemplate;
    @Resource
    private StringRedisTemplate stringRedisTemplate;


    /**
     * 监听消息
     * @param channel
     * @param message
     * @param dto
     * @throws IOException
     */
    @RabbitListener(queues = {TopicRabbitConfig.STUDENT_QUEUE})
    public void studentMessage(Channel channel, Message message, student dto) throws IOException {
        try {
            log.info("=============>>>监听队列数据:"+ dto);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            resendMessage(TopicRabbitConfig.DELAY_EXCHANGE, TopicRabbitConfig.DELAY_KEY, dto,message);
        }
    }
	
	/**
	 * 重试机制代码
	 * @param exchange
	 * @param routingKey
	 * @param obj
	 * @param message
	 */
	public void resendMessage(String exchange,String routingKey,Object obj,Message message) {
		String messageId = message.getMessageProperties().getMessageId();
		if(StrUtil.isNotBlank(messageId) && Convert.toInt(stringRedisTemplate.opsForValue().get(messageId)) >3) {
            log.info("===========>>>重试三次失败后依然失败存库人工处理"+obj.toString());
			RetrySendmsg retrySendmsg = new RetrySendmsg();
			retrySendmsg.setExchange(message.getMessageProperties().getReceivedExchange());
			retrySendmsg.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
			retrySendmsg.setMsgBody(JSONUtil.toJsonStr(obj));
			retrySendmsg.setClassName(ClassUtil.getClassName(obj, false));
			//TODO insert方法
			return;
		}
		
		rabbitTemplate.convertAndSend(exchange, routingKey, obj, msg -> {
    	    MessageProperties messageProperties = msg.getMessageProperties();
    	    //设置消息ID
    	    String new_messageId = Convert.toStr(UUID.randomUUID());
    	    if(StrUtil.isBlank(messageId)) {
    	    	messageProperties.setMessageId(new_messageId);
    	    	messageProperties.setExpiration(Convert.toStr(TopicRabbitConfig.EXPIRATION));
                stringRedisTemplate.opsForValue().set(new_messageId,"1");
    	    }else {
    	    	messageProperties.setMessageId(messageId);
    	    	messageProperties.setExpiration(Convert.toStr(TopicRabbitConfig.EXPIRATION));
                stringRedisTemplate.opsForValue().set(messageId, Convert.toInt(stringRedisTemplate.opsForValue().get(messageId)) + 1 + "");
    	    }
    	    return msg;
    	});
	}
}

实体类:

package com.jy.springbootRabbitmq.entity;

import lombok.Data;

import java.io.Serializable;

@Data
public class student implements Serializable{
	private static final long serialVersionUID = 1L;

    private String studentName;
    private String studentCode;

    public student(String studentName, String studentCode) {
        this.studentName = studentName;
        this.studentCode = studentCode;
    }

}


package com.jy.springbootRabbitmq.entity;

import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = false)
public class RetrySendmsg {

    private static final long serialVersionUID = 1L;

    private String exchange;
    private String routingKey;
    private String msgBody;
    private String newMsgBody;
    private String className;
    private Integer status = 0;
    private Integer isSend = 0;
}

参考:https://blog.csdn.net/qq330983778/article/details/99611193
https://blog.csdn.net/qq_35387940/article/details/100514134

结束END

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-07 12:09:17  更:2021-08-07 12:11:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 16:43:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码