IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 03 rocketmq术语解释 -> 正文阅读

[大数据]03 rocketmq术语解释

  1. mq的结构图,如下:
    在这里插入图片描述
    在这里插入图片描述
    同理,consumer也是先向nameserver获取从哪一个broker获取消息,再得到nameserver的响应之后,才开始获取消息,consumer会一直监听消息的发送.
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    异步与同步的区别:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    4 发送同步,异步消息:
    同步: 指的是消息的发送者在发送消息后,处于阻塞状态,等待消息的消费方发送确认后返回.
    /**
  • 发送同步消息
    /
    public class SyncProducer {
    public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并指定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer(“group1”);
    //2.指定Nameserver地址
    producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.启动producer
    producer.start();
    for (int i = 0; i < 10; i++) {
    //4.创建消息对象,指定主题Topic、Tag和消息体
    /
    *
    * 参数一:消息主题Topic(消息的类别)
    * 参数二:消息Tag()
    * 参数三:消息内容
    */
    Message msg = new Message(“springboot-mq”, “Tag1”, (“Hello World” + i).getBytes());
    //5.发送消息
    SendResult result = producer.send(msg);
    //发送状态
    SendStatus status = result.getSendStatus();
    //消息id(每条消息都有一个id)
    String msgId = result.getMsgId();
    //消息队列id
    int queueId = result.getMessageQueue().getQueueId();
    System.out.println(“发送结果:” + result);
    //线程睡1秒
    TimeUnit.SECONDS.sleep(1);
    }
    //6.关闭生产者producer
    producer.shutdown();
    }
    }

异步: 消息发送到之后,不会去等待mq回传发送结果,异步消 息的可靠性没有同步消息高,异步消息的发送可以通过回调函数的方式接受消息的处理.
应用场景为对消息发送性能比较高的场景.不能让消息延迟
/**

  • 发送异步消息
    */
    public class AsyncProducer {

    public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并制定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer(“group1”);
    //2.指定Nameserver地址
    producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.启动producer
    producer.start();

     for (int i = 0; i < 10; i++) {
         //4.创建消息对象,指定主题Topic、Tag和消息体
         /**
          * 参数一:消息主题Topic
          * 参数二:消息Tag
          * 参数三:消息内容
          */
         Message msg = new Message("base", "Tag2", ("Hello World" + i).getBytes());
         //5.发送异步消息
         producer.send(msg, new SendCallback() {
             /**
              * 发送成功回调函数
              */
             public void onSuccess(SendResult sendResult) {
                 System.out.println("发送结果:" + sendResult);
             }
             /**
              * 发送失败回调函数
              */
             public void onException(Throwable e) {
                 System.out.println("发送异常:" + e);
             }
         });
         //线程睡1秒
         TimeUnit.SECONDS.sleep(1);
     }
     //6.关闭生产者producer
     producer.shutdown();
    

    }
    }

5 单向消息发送: 只管发,不管结果(例如日志发送)
/**

  • 发送单向消息
    /
    public class OneWayProducer {
    public static void main(String[] args) throws Exception, MQBrokerException {
    //1.创建消息生产者producer,并制定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer(“group1”);
    //2.指定Nameserver地址
    producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.启动producer
    producer.start();
    for (int i = 0; i < 3; i++) {
    //4.创建消息对象,指定主题Topic、Tag和消息体
    /
    *
    * 参数一:消息主题Topic
    * 参数二:消息Tag
    * 参数三:消息内容
    */
    Message msg = new Message(“base”, “Tag3”, (“Hello World,单向消息” + i).getBytes());
    //5.发送单向消息
    producer.sendOneway(msg);
    //线程睡1秒
    TimeUnit.SECONDS.sleep(5);
    }
    //6.关闭生产者producer
    producer.shutdown();
    }
    }

6 消费消息的流程:
在这里插入图片描述
/**

  • 消息的接受者
    /
    public class Consumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者Consumer,制定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
    //2.指定Nameserver地址
    consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.订阅主题Topic和Tag
    consumer.subscribe(“base”, "
    ");
    //设定消费模式:负载均衡|广播模式
    consumer.setMessageModel(MessageModel.BROADCASTING);
    //4.设置回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {

         //接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
             for (MessageExt msg : msgs) {
                 System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
             }
             //消费成功返回结果
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
     });
     //5.启动消费者consumer
     consumer.start();
    

    }
    }
    在回调函数中监听消息,有消息发送到达时候,负责处理.

7 消息消费模式:
(1) 广播模式: 消息队列中的消息被所有的消费者都消费一次.如下消息队列中的abc三条消息被消费者1,2都进行了消费.消费者之间都消费了相同的消息.
在这里插入图片描述
(2) 负载均衡模式:消费者1,消费者2合作起来一起消费abc,例如消费者1消费了ab,消费者2消费了c.
(3) 消费者在消费消息的时候指定消费模式:

在这里插入图片描述
注意: 默认的消费模式是负载均衡

8 顺序消息:
指的是按照生产者发送消息的顺序去进行消费(FIFO).RocketMQ可以严格保证消息有序(mq本身的数据结构就是先进先出).
当消息生产者的业务操作产生的消息具有顺序性的时候,那么消息在broker中是如何存储?
Broker中包含了不止一个队列,broker采用轮询的方式,将生产者的顺序消息分别存储在不同的队列中,如图:
在这里插入图片描述
消费者采用多线程的方式同时去消费消息.消费者如何保证顺序性消费呢?多线程很难保证顺序性.

9 消息顺序:
(1) 全局消息顺序(broker中所有队列的消息都是有序的,全局顺序是没有必要的)
(2) 局部消息顺序(将有顺序的消息放入一个队列中)
在这里插入图片描述
此时在一个队列中的消息,就得用单线程而不是多线程的方式去消费消息了.

9.1 首先保证同一个业务的顺序消息发送至同一个队列中.例如:创建一个订单的业务: 创建,付款,推送,完成;
订单号相同的消息需要推送至同一个队列,消费时,同一个orderId获取到的肯定是同一个队列.
如下: 确定业务消息存在一个队列中.
public class Producer {

public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并制定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    //2.指定Nameserver地址
    producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
    //3.启动producer
    producer.start();
    //构建消息集合(订单集合)
    List<OrderStep> orderSteps = OrderStep.buildOrders();
    //发送消息
    for (int i = 0; i < orderSteps.size(); i++) {
        String body = orderSteps.get(i) + "";
        Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
        /**
         * 参数一:消息对象
         * 参数二:消息队列的选择器(确保业务相关消息发送至同一个队列)
         * 参数三:选择队列的业务标识(订单ID)
         */
        SendResult sendResult = producer.send(message, new MessageQueueSelector() {
            /**
             *
             * @param mqs:队列集合
             * @param msg:消息对象
             * @param arg:业务标识的参数
             * @return
             */
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                long orderId = (long) arg;
                long index = orderId % mqs.size();//根据订单id选择队列(保证同一个订单在同一个队列中)
                return mqs.get((int) index);//根据取模的结果确定发送的队列
            }
        }, orderSteps.get(i).getOrderId());

        System.out.println("发送结果:" + sendResult);
    }
    producer.shutdown();
}

}

9.2 消息的顺序消费(采用单线程确保消费的顺序性)
在这里插入图片描述
10 延迟消息: 在发送消息的时候,设置消息被延迟消费的时间
延迟可选如下:
在这里插入图片描述
11 批量消息发送:
批量发送消息能显著提高消息传递的性能,这一批消息的总大小不应超过4M.超过范围则需要进行分割.需要手写一个分割算法,计算消息的大小.
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(“group1”);
//2.指定Nameserver地址
producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.启动producer
producer.start();
List msgs = new ArrayList();
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg1 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 1).getBytes());
Message msg2 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 2).getBytes());
Message msg3 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 3).getBytes());
msgs.add(msg1);
msgs.add(msg2);
msgs.add(msg3);
//5.发送消息
SendResult result = producer.send(msgs);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println(“发送结果:” + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
//6.关闭生产者producer
producer.shutdown();
}
}

public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
//2.指定Nameserver地址
consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.订阅主题Topic和Tag
consumer.subscribe(“BatchTopic”, “*”);
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {

        //接受消息内容
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //5.启动消费者consumer
    consumer.start();
    System.out.println("消费者启动");
}

}

12 过滤消息的方式: 当消息生产者将消息发送至broker后,consumer可以根据一定的规则对消息进行过滤,选择需要消费的消息
(1) tag过滤,consumer将接受包含TAG的消息.但是限制是一个消息只能有一个标签,不适用与一些复杂的场景.
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(“group1”);
//2.指定Nameserver地址
producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.启动producer
producer.start();
for (int i = 0; i < 3; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message(“FilterTagTopic”, “Tag2”, (“Hello World” + i).getBytes());
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println(“发送结果:” + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者producer
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
//2.指定Nameserver地址
consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.订阅主题Topic和Tag
consumer.subscribe(“FilterTagTopic”, "Tag1 || Tag2 ");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {

        //接受消息内容
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //5.启动消费者consumer
    consumer.start();
    System.out.println("消费者启动");
}

}

(2) Sql:复杂场景推荐使sql过滤.
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(“group1”);
//2.指定Nameserver地址
producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message(“FilterSQLTopic”, “Tag1”, (“Hello World” + i).getBytes());
msg.putUserProperty(“i”, String.valueOf(i));//添加自定义的属性用于过滤
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println(“发送结果:” + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(2);
}
//6.关闭生产者producer
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
//2.指定Nameserver地址
consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.订阅主题Topic和Tag
consumer.subscribe(“FilterSQLTopic”, MessageSelector.bySql(“i>5”));//过滤条件
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {

        //接受消息内容
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //5.启动消费者consumer
    consumer.start();
    System.out.println("消费者启动");
}

}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-13 12:53:31  更:2021-12-13 12:53:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 5:48:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码