IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ防止重复消费 -> 正文阅读

[大数据]RabbitMQ防止重复消费

RabbitConfig?

/**
 * @author fan
 * @date 2022年04月27日 11:17
 */
@EnableRabbit
@Configuration
public class RabbitConfig {
    
    /**创建扇形交换机开始*/
    //测试队列名称
    private String fanoutQueueName = "fanoutQueue";
    // 测试交换机名称
    private String fanoutExchange = "fanoutExchange";
    // RoutingKey 路由键无需配置,配置也不起作用
    //private String fanoutRoutingKey = "fanoutRoutingKey";

    /** 创建队列 */
    @Bean
    public Queue fanoutQueue() {
        return new Queue(fanoutQueueName);
    }
    /** 创建交换机 */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }
    /** 通过routingKey把队列与交换机绑定起来 */
    @Bean
    public Binding fanoutExchangeBinding() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }

    /**结束创建扇形交换机(Fanout exchange)
     *直连交换机(Direct exchange): 具有路由功能的交换机,绑定到此交换机的时候需要指定一个    routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列
*扇形交换机(Fanout exchange): 广播消息到所有队列,没有任何处理,速度最快
*主题交换机(Topic exchange): 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,`*`代表一个单词,`#`代表多个单词
*首部交换机(Headers exchange): 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则

    */
}

producer

/**
 * @author fan
 * @date 2022年04月27日 15:01
 */
@Component
public class MyProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;
   
    /** 
     * 发送JSON类型
     * @author fan 
     * @date 2022/4/27 15:39
     * @param queueName 队列名称
     * @return java.lang.String 
    */
    public String sendjsonObject(String queueName) {//构造前端回传参数
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "Icomxchishi@qq.com");
        jsonObject.put("phone", "03803880388");
        
        jsonObject.put("age", 15);
        jsonObject.put("sex","女");
        jsonObject.put("name", "messageData");

        jsonObject.put("data", Arrays.asList("张三","12345678",12,"男"));
        jsonObject.put("timestamp", System.currentTimeMillis());
        String jsonString = jsonObject.toJSONString();
        
        String messageId = String.valueOf(UUID.randomUUID());
        Message message = MessageBuilder.withBody(jsonString.getBytes())
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setDeliveryTag(System.currentTimeMillis())
                .setContentEncoding("utf-8")
                .setMessageId(messageId)
                .build();
        rabbitTemplate.convertAndSend(queueName, message);
        return "sendjsonObject";
    }

    /** 
     * 发送map类型数据
     * @author fan 
     * @date 2022/4/27 18:33
     * @param queueName 队列名称
     * @return java.lang.String 
    */
    public String sendMessageMap(String queueName) {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "my message!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        map.put("msg","随便");
        map.put("data", Arrays.asList("张三","12345678",12,"男"));
        rabbitTemplate.convertAndSend(queueName, map);
        return "sendMapMessage";
    }

  
    

    

    
}

consumer,以sendjsonObject();为例

@Slf4j
@Component
public class MyConsumer {

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private RedisClient redisClient;//本人封装的,可以使用redisTemplate
    @Autowired
    private UserService userService;
    @Autowired
    private LoggerService loggerService;


    /** 消费者消费成功后,再给MQ确认的时候出现了网络波动
     * MQ没有接收到确认,为了保证消息被消费,RabbitMQ就会继续给消费者重新投递之前的消息
     * 消费者就接收到了两条一样的消息
     * 重复消息可能是网络波动原因造成的或其它原因,导致不可避免的重复消息
     * 如何保证消息幂等性??
     * 解决:消费者先获取到消息后在根据messageId去查询redis是否存在该消息
     * 若不存在,则正常消费,消费完毕后写入redis
     * 若存在,则消息已经被消费过,就直接丢弃
     * @author fan
     * @date 2022/4/27 16:28
     * @param message
    */
    @RabbitListener(queues = "fanoutQueue")
    public void sendjsonObject(Message message, Channel channel) throws Exception{//消费者消息重复
        log.info("MyConsumer  消费者收到消息:{}" , JSONObject.toJSONString(message));
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        String messageIdRedis = //(String) redisTemplate.opsForValue().get("messageId");
        redisClient.getString("messageId");
        if (messageId != null) {
            if (messageId.equals(messageIdRedis)) {
                return;
            }
        }
        System.out.println("number==" + channel.getChannelNumber());
        JSONObject jsonObject = JSONObject.parseObject(msg);
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        try {
            //具体业务,如:插入或修改user,这里用数据库可以不用redis做判断
            Integer age = (Integer) jsonObject.get("age");
            String name = (String) jsonObject.get("name");
            String sex = (String) jsonObject.get("sex");
            LocalDateTime localDateTime = LocalDateTime.now();
            Date dataTime = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
            User user = new User();
            user.setSex(sex);
            user.setName(name);
            user.setDataTime(dataTime);
            user.setAge(age);
            int i = 0;
            if (user != null){
                i = userService.savetUser(user);//新增或修改
                redisClient.setKey(user.getId() + "", user,  5_000L);// 写入缓存,便于操作延时业务
                //redisTemplate.opsForList().rightPushAll(user.getId(), Arrays.asList(user));
                //loggerService.saveLog(jsonObject);//日志记录
            }
            log.info("收到消息obj:" + user + ",id=" + user.getId());
            log.info("保存了" + i + "记录");
            channel.basicAck(deliverTag,true);//手动设置ack,消费成功,确认消息
        }catch (Exception e){
            try {
                //异常返回false,就重新回到队列
                channel.basicNack(deliverTag, false, true);
            } catch (Exception ioException) {
                log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e);
            }
            log.error("TopicConsumer消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e);
            
            //channel.basicReject(deliverTag, false);
        }
        //redisTemplate.opsForValue().set(messageId, messageId, 30, TimeUnit.SECONDS);//时间具体根据业务定
        redisClient.setKey(messageId,messageId,3_000L);// 写入缓存
        log.info("消费消息jsonObject:" + jsonObject + ",messageId:" + messageId);
    }
    


    @RabbitListener(queues = "fanoutQueue")
    public void sendMessageMap(Map map){
        log.info("收到消息map:" + map.toString());
    }



   
}
ProducerController

@RestController
@RequestMapping(value = "producer")
public class ProducerController {

    @Autowired
    private MyProducer myProducer;

    @RequestMapping("/sendjsonObject")
    public String sendjsonObject() {//前端传参数回来在producer那里模拟了,这里就略
        return myProducer.sendjsonObject("fanoutQueue");
    }


    @GetMapping("/sendMessageMap")
    public String sendMessageMap() {
        return myProducer.sendMessageMap("fanoutQueue");
    }



   

}

application.yml

server:
  port: 8081

       

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: 
    dbcp2: 
      initial-size: 5                                        
      max-total: 5                                            
      max-wait-millis: 200                                    
      min-idle: 5 

  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password:
  

  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-returns: true
    publisher-confirms: true
    connection-timeout: 5000
 
    #另外一种打印语句的方式
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
 

logging:
  level:
    com:
      acong:
        dao: debug
  file: d:/logs/redis.log
  

效果图:

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-29 12:12:55  更:2022-04-29 12:14:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:43:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码