IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 02_rabbitmq -> 正文阅读

[大数据]02_rabbitmq

一、简介

二、概述

三、使用

1、安装

拉取镜像:
docker hub : 
	docker pull rabbitmq:management


创建容器:
docker run -d --name rabbitmq  -p 15672:15672 -p 5672:5672 b6f50be8c669


查看web端
http://localhost:15672

在这里插入图片描述

2、入门案例

  • 依赖
		<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
  • 生产者
 public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2.设置参数
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/xingyu");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");

        // 3.创建连接
        Connection connection = connectionFactory.newConnection();

        // 4.创建channel
        Channel channel = connection.createChannel();

        // 5.创建队列

        /**
         * 队列名称
         * 是否持久化,mq重启后是否还在
         * 是否独占
         *      只能有一个消费者监听这个队列
         *     当connection关闭时是否删除这个队列
         *  是否自动删除这个队列,当没有consumer时,自动删除
         */
        channel.queueDeclare("queue1", true, false, false, null);


        // 6.发送消息
        // public void basicPublish(String exchange, String routingKey,
        // boolean mandatory, boolean immediate, BasicProperties props, byte[] body)

        String body = "hello rabbitmq!";
        channel.basicPublish("","queue1",null,body.getBytes(StandardCharsets.UTF_8));

        // 7.释放资源
        /*channel.close();
        connection.close();*/
    }
  • 消费者
 public static void main(String[] args) throws IOException, TimeoutException {


        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2.设置参数
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/xingyu");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");

        // 3、创建连接
        Connection connection = connectionFactory.newConnection();


        // 4.创建通道
        Channel channel = connection.createChannel();


        // 5.创建队列
        channel.queueDeclare("queue1",true,false,false,null);

        // 6.消费消息
        //  String queue, boolean autoAck, Consumer callback
        Consumer consumer = new DefaultConsumer(channel){

            /**
             * 回调方法,当收到消息后,会自动执行该方法
             * @param consumerTag 标识
             * @param envelope  获取信息,交换机,routingkey等
             * @param properties    配置信息
             * @param body      数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("queue1",true,consumer);


    }

3、工作模式

3.1 work queue 工作队列模式

在这里插入图片描述

3.2 pub/sub订阅模式

在这里插入图片描述

在订阅者模式中,多了一个exchange的角色,而且过程有所改变

- p:生产者,不在发送消息到队列,而是发送到交换机
- c:消费者,消息的接收者,会一直等待消息到来
- Queue:队列,接受消息,缓存消息
- Exchange: 交换机,一方面,接收生产者发送的消息,另一方面,知道如何处理消息,例如交给某个特别队列,递交给所有队列,或者将消息丢掉,到底如何操作,取决于exchange的类型,exchange常见有三种类型
	Fanout:广播,把消息交给所有绑定到交换机的队列
	Direct:定向,把消息交给符合指定routing key的队列
	Topic: 通配符,把消息交给符合routing pattern(路由模式) 的队列
- exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列和exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
  • 生产者
 public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2.设置参数
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/xingyu");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");


        Connection connection = connectionFactory.newConnection();


        Channel channel = connection.createChannel();


        String exchangeName = "test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);


        channel.queueBind(queue1Name, exchangeName, "");
        channel.queueBind(queue2Name, exchangeName, "");

        String body = "日志信息:张三调用了findall()方法...日志级别:info";
        channel.basicPublish(exchangeName, "", null, body.getBytes(StandardCharsets.UTF_8));

        /*channel.close();
        connection.close();
*/


    }
  • 消费者1
 public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2.设置参数
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/xingyu");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                System.out.println("将日志信息打印到控制台");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);
    }
  • 消费者2
 public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2.设置参数
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/xingyu");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        channel.queueDeclare(queue2Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                System.out.println("将日志信息存储到数据库");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);
    }
  • direct-producer
public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2.设置参数
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/xingyu");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");


        Connection connection = connectionFactory.newConnection();


        Channel channel = connection.createChannel();


        String exchangeName = "test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);


        channel.queueBind(queue1Name, exchangeName, "erro");
        channel.queueBind(queue2Name, exchangeName, "info");
        channel.queueBind(queue2Name, exchangeName, "warning");
        channel.queueBind(queue2Name, exchangeName, "debug");

        String body = "日志信息:张三调用了findall()方法...日志级别:info";
        channel.basicPublish(exchangeName, "debug", null, body.getBytes(StandardCharsets.UTF_8));

        channel.close();
        connection.close();


    }
  • topic-producer
 public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2.设置参数
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/xingyu");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");


        Connection connection = connectionFactory.newConnection();


        Channel channel = connection.createChannel();


        String exchangeName = "test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);

        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);


        channel.queueBind(queue1Name, exchangeName, "#.erro");
        channel.queueBind(queue2Name, exchangeName, "#.info");
        channel.queueBind(queue2Name, exchangeName, "*.warning");
        channel.queueBind(queue2Name, exchangeName, "*.debug.*");

        String body1 = "日志信息:张三调用了findall()方法...日志级别:system.erro";
        String body2 = "日志信息:张三调用了findall()方法...日志级别:system.info";
        String body3 = "日志信息:张三调用了findall()方法...日志级别:system.warning";
        String body4 = "日志信息:张三调用了findall()方法...日志级别:system.debug";
        String body5 = "日志信息:张三调用了findall()方法...日志级别:system.debug.debug";
        channel.basicPublish(exchangeName, "system.erro", null, body1.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "system.info", null, body2.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "system.warning", null, body3.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "system.debug", null, body4.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "system.debug.debug", null, body5.getBytes(StandardCharsets.UTF_8));

        channel.close();
        connection.close();


    }

3.3 springboot整合rabbitmq

  • producer
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    @Bean("bootExchange")
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    @Bean("bootQueue")
    public Queue queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }


    @Bean
    public Binding binding(@Qualifier("bootExchange") Exchange bootExchange,
                           @Qualifier("bootQueue") Queue queue){

        return BindingBuilder.bind(queue).to(bootExchange).with("boot.#").noargs();

    }
}




@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @org.junit.Test
    public void test() {

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello....");
    }
}
  • consumer
@Component
public class ConsumerListener {

    @RabbitListener(queues = "boot_queue")
    public void listenerQueue(Message message){
        byte[] body = message.getBody();
        System.out.println(message);
        System.out.println(new String(body));
    }
}

四、高级特性

1、消息的可靠投递

1.1 confirm

  • 配置信息
spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    virtual-host: /xingyu
    host: localhost
    port: 5672
    username: root
    password: root
    publisher-confirms: true



@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    @Bean("bootExchange")
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    @Bean("bootQueue")
    public Queue queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }


    @Bean
    public Binding binding(@Qualifier("bootExchange") Exchange bootExchange,
                           @Qualifier("bootQueue") Queue queue){

        return BindingBuilder.bind(queue).to(bootExchange).with("boot.#").noargs();

    }
}
  • 生产者
@org.junit.Test
    public void test() {
        
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *
             * @param correlationData  配置信息
             * @param ack   ack交换机是否收到消息
             * @param cause   失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm执行了");
                if(ack){
                    System.out.println("接收成功");
                }else {
                    System.out.println("接收失败"+cause);
                }
            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "boot.haha","boot mq hello....");
    }

1.2 return

  • 配置信息
spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    virtual-host: /xingyu
    host: localhost
    port: 5672
    username: root
    password: root
    publisher-returns: true



@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    @Bean("bootExchange")
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    @Bean("bootQueue")
    public Queue queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }


    @Bean
    public Binding binding(@Qualifier("bootExchange") Exchange bootExchange,
                           @Qualifier("bootQueue") Queue queue){

        return BindingBuilder.bind(queue).to(bootExchange).with("boot.#").noargs();

    }
}
  • 生产者
   @org.junit.Test
    public void test() {

        //设置交换机处理消息失败的模式
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 
             * @param message
             * @param replyCode
             * @param replyText
             * @param exchange
             * @param routingKey
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return执行了");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "boot1","boot mq hello....");
    }
  • 消费者
@Component
@Configuration
public class ConsumerListener {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @RabbitListener(queues = "boot_queue")
    public void listenerQueue1(Message message, Channel channel) throws Exception {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            System.out.println(message);

            System.out.println("执行业务逻辑");

            int i = 3/0;

            channel.basicAck(deliveryTag, true);

            System.out.println("签收成功");
        } catch (Exception e) {
            /**
             * 拒绝签收
             * b1重新回到队列,
             */
            System.out.println("签收失败");
            channel.basicNack(deliveryTag, true, true);

        }

    }
}

2、 消费端限流

ack确认必须设置为手工

@Component
@Configuration
public class ConsumerListener {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);
        return factory;
    }

    @RabbitListener(queues = "boot_queue")
    public void listenerQueue1(Message message, Channel channel) throws Exception {

        Thread.sleep(1000);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        System.out.println(new String(message.getBody()));

        channel.basicAck(deliveryTag,true);

    }
}

3、TTL

TTL全称Time To Live (存活时间/过期时间)
当消息到达存活时间后,还没有被消费,就会被清除
rabbitmq可对消息设置存活时间,也可以对整个队列设置存活时间

在这里插入图片描述

  • TTL队列
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME = "boot_queue";

    @Bean("bootExchange")
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    @Bean("bootQueue")
    public Queue queue(){
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(map).build();
    }


    @Bean
    public Binding binding(@Qualifier("bootExchange") Exchange bootExchange,
                           @Qualifier("bootQueue") Queue queue){

        return BindingBuilder.bind(queue).to(bootExchange).with("boot.#").noargs();

    }
}

  • 消息单独过期
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                    "boot.haha", "boot mq hello...." , new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            message.getMessageProperties().setExpiration("10000");
                            return message;
                        }
                    });

4、死信队列

DLX : dead letter exchange(死信交换机),当消息成为Dead Message后,可以被重新发送到另一个交换机,这个交换机就是DLX

  • 消息成为死信的三种情况

队列消息长度达到限制
消费者拒绝消费消息,basicNack/basicReject,并且不把消息重新放回原目标队列,requeue =false
原队列存在消息过期设置,消息达到超时时间未被消费

  • 队列绑定死信交换机

给队列设置参数:
x-dead-letter-exchange
x-dead-letter-routing-key
在这里插入图片描述

  • 代码实现

@Configuration
public class RabbitMQConfig {

    public static final String TTL_EXCHANGE_NAME = "ttl_exchange";
    public static final String TTL_QUEUE_NAME = "ttl_queue";

    public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    public static final String DEAD_QUEUE_NAME = "dead_queue";

    // TTL交换机和队列

    @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.topicExchange(TTL_EXCHANGE_NAME).build();
    }

    @Bean("ttlQueue")
    public Queue ttlQueue(){
        Map<String, Object> map = new HashMap<>();
        //设置队列的过期时间
        map.put("x-message-ttl",10000);
        //设置队列的长度
        map.put("x-max-length",10);

        // 给正常队列绑定死信交换机和
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
        map.put("x-dead-letter-routing-key","dead.hehe");
        return QueueBuilder.durable(TTL_QUEUE_NAME).withArguments(map).build();
    }


    @Bean
    public Binding ttlBinding(@Qualifier("ttlExchange") Exchange exchange,
                           @Qualifier("ttlQueue") Queue queue){

        return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();

    }



    // 死信交换机和队列
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }

    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    @Bean
    public Binding deadBinding(@Qualifier("deadExchange") Exchange exchange,
                               @Qualifier("deadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("dead.#").noargs();

    }
}





@org.junit.Test
    public void test() {

      /* rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
           @Override
           public void confirm(CorrelationData correlationData, boolean ack, String cause) {
               System.out.println("confirm执行了");
               if(ack){
                   System.out.println("接收成功");
               }else {
                   System.out.println("接收失败"+cause);
               }
           }
       });*/

         /*//  1.测试过期时间,死信消息
            rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,
                    "ttl.haha", "我是一条消息,我会死吗");*/


           // 2,测试长度限制后,消息死信
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME,
                    "ttl.haha", "我是一条消息,我会死吗");
        }

5、延迟队列

TTL + 死信队列
在这里插入图片描述

实现:由消费者监听死信队列即可

6、日志与监控

web控制台

7、消息追踪

一般用于开发和调试环境
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于rabbitmq而言,可能因为生产者或者消费者与rabbitmq断开了连接,而它们与rabbitmq又采用了不同的确认机制,也有可能是因为交换器与队列之间不同的转发策略,甚至于交换机并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施,另外rabbitmq本身的集群策略也可能导致消息的丢失,这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位

  • Firehose

给amq.rabbitmq.trace 交换机绑定的队列,routingkey 设置为 “#” ,
在rabbitmq命令台输入 rabbitmqctl trace_on 即可开启
在这里插入图片描述

  • rabbitmq-trancing(插件)

rabbitmq-plugins list
rabbitmq-plugins enable 插件名
在这里插入图片描述
在这里插入图片描述

8、应用问题

8.1 消息补偿机制

100%消息发送成功
2失败,则比q3和mdb的消息id,如果不一致,则通知producer重新发送
2,3失败,则比对db和MDB数据是否一致,不一致的数据由producer重新发送

在这里插入图片描述

8.2 幂等性

  • 乐观锁机制

利用版本号
在这里插入图片描述

  • 消费数据为了单纯的写入数据库,可以先根据主键查询数据是否已经存在,如果已经存在了就没必要插入了。或者直接插入也没问题,因为可以利用主键的唯一性来保证数据不会重复插入,重复插入只会报错,但不会出现脏数据。
  • 消费数据只是为了缓存到redis当中,这种情况就是直接往redis中set value了,天然的幂等性。
  • 针对复杂的业务情况,可以在生产消息的时候给每个消息加一个全局唯一ID,消费者消费消息时根据这个ID去redis当中查询之前是否消费过。如果没有消费过,就进行消费并将这个消息的ID写入到redis当中。如果已经消费过了,就无需再次消费了。

8.3 可靠性

在这里插入图片描述

8.4 顺序性

link:https://blog.csdn.net/zw791029369/article/details/109561457

https://blog.csdn.net/zw791029369/article/details/109561457
在这里插入图片描述

9、集群

rabbitmq天然支持集群,不需要注册中心
集群+haproxy


查看状态 rabbitmqctl status



  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 22:44:58  更:2021-07-30 22:45:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 16:31:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码