IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 消息队列:RocketMQ 概念及下载安装 -> 正文阅读

[大数据]消息队列:RocketMQ 概念及下载安装

一、消息中间件

消息:传递的信息。系统与系统之间通讯传递的信息。

中间件:redis就是一个数据存储的中间件。 独立于系统之外的一个应用都可以叫做中间件。

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者),broker(中间件)。

生产者: 发送消息

消费者:获取接收消息、并处理消息。

二、概述

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

  • 能够保证严格的消息顺序

  • 提供丰富的消息拉取模式

  • 高效的订阅者水平扩展能力

  • 实时的消息订阅机制

  • 亿级消息堆积能力

RocketMQ单机也可以支持亿级的消息堆积能力。单机写入TPS单实例约7万条/秒,单机部署3个-,可以跑到最高12万条/秒,消息大小10个字节。

==市场上主流的消息中间件: RabbitMQ、RocketMQ、Kafka、==

三、核心概念

Name Server

  • NameServer的主要功能是为整个MQ集群提供服务协调与治理,具体就是记录维护Topic、Broker的信息,及监控Broker的运行状态。

  • 理解成zookeeper的效果,只是他没用zk,而是自己写了个nameserver来替代zk

  • 底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点

  • nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除

  • nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用,

  • NameServer集群间互不通信,没有主备的概念

  • nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化,所以他是无状态节点

Broker

  • 理解成RocketMQ本身

  • broker主要用于producer和consumer发送和接收消息

  • broker会定时向nameserver提交自己的信息

  • ==是消息中间件的消息存储、转发服务器==

  • 每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

Producer

  • 消息的生产者

  • Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server获取Topic路由信息,并向提供Topic服务的Broker Master建立长连接,且定时向Broker Master发送心跳。Producer完全无状态,可集群部署。

  • Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s + 120s能够感知,在此期间内发往Broker的所有消息都会失败。

  • Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

Consumer

  • 消息的消费者

  • Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server获取Topic路由信息,并向提供Topic服务的Broker Master、Broker Slave建立长连接,且定时向Broker Master、Broker Slave发送心跳。Consumer既可以从Broker Master订阅消息,也可以从Broker Slave订阅消息,订阅规则由Broker配置决定。

  • Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s + 120s才能感知。

  • Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

Topic

  • Topic翻译为主题。我们可以理解为消息类型

  • 比如一个电商系统的消息可以分为:交易消息、物流消息等,一条消息必须有一个Topic。

Queue

  • ==一个topic下,可以设置多个queue(消息队列),默认4个队列。==

  • 当我们发送消息时,需要要指定该消息的topic。RocketMQ会轮询该topic下的所有队列,将消息发送出去。

  • 在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。

四、RocketMQ下载与安装

1、创建一个本地文件夹

mkdir ?/usr/local/rocketmq

2、安装 Namesrv ,拉取镜像

docker pull rocketmqinc/rocketmq:4.4.0

3、启动namesrv

docker run -d -p 9876:9876 ?-v /usr/local/rocketmq/data/namesrv/logs:/root/logs ?-v /usr/local/rocketmq/data/namesrv/store:/root/store --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq:4.4.0 sh mqnamesrv

4、 安装 broker 服务器

????????(1) 在 usr/local/rocketmq/创建conf 目录,并在conf下创建 broker.conf 文件

#创建目录
mkdir /usr/local/rocketmq/conf


#创建文件broker.conf文件
cd /usr/local/rocketmq/conf
vim broker.conf


#把文件内容拷贝到文件上
brokerClusterName = DefaultCluster
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = XXX.XXX.XX.XXX(自己的linux机器的ip地址)
brokerName = broker-tanhua

????????(2)安装brokerServer容器, 启动broker

docker run -d -p 10911:10911 -p 10909:10909 -v ?/usr/local/rocketmq/data/broker/logs:/root/logs -v ?/usr/local/rocketmq/rocketmq/data/broker/store:/root/store -v ?/usr/local/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

此时可以用docker ps 查询一下namesrv和broker是否启动

5、安装 rocketmq 控制台 ,拉取镜像

docker pull styletang/rocketmq-console-ng

#这里ip地址要修改成你自己的linux的ip

docker run -di ?--name=rocketmq-console-ng -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=xxx.xxx.xx.xxx:9876 -Drocketmq.config.isVIPChannel=false -Duser.timezone='Asia/Shanghai'" -v /etc/localtime:/etc/localtime -p 8001:8080 -t styletang/rocketmq-console-ng

8001是映射的端口,可自行设置。启动后可以在浏览器打开:http://mq主机ip:映射端口

到此,rocketmq服务已启动,相应的控制台项目也启动了。

五、Springboot整合RocketMQ

1、生产者工程

????????创建项目rocket_producer,添加rocketmq-spring-boot-starter等依赖包


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

????????创建启动引导类

package cn.itcast;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

????????编写application.yml配置文件

rocketmq:
? name-server: 192.168.75.128:9876
? producer:
? ? #生产组的名字是可以随意的,主要保证消息的高可用
? ? group: tanhua
? ?#设置发送消息的超时时间
? ? send-message-timeout: 5000

????????编写测试类,通过RocketMQTemplate发送消息

package cn.itcast.test;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @Test
    public void test01(){
        /**
         * 参数一: 消息的主题
         * 参数二:消息的内容
         */
        rocketMQTemplate.convertAndSend("topic","rocketmq发送消息");
    }

}

2、消费者工程

? ? ? ? 导入依赖

     <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>
    </dependencies>

????????创建启动引导类

package cn.itcast;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketConsumerApplication.class, args);
    }
}

????????编写配置文件

rocketmq:
? name-server: xxx.xxx.xx.xxx:9876

#ip地址这里要修改成自己linux的ip

????????编写消息监听器,实现RocketMQListener接口 ????????

package cn.itcast.listener;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component //创建消息监听器的对象
//指定监听的主题并且指定消费组,消费的主题一定要与生产主题要一致,
// 消费者的名字可以随意的,我们一般一个主题对应一个消费组, 一个消息可以被不同的消费组消费多次的。同一个消费组只会消费一次同一个消息
@RocketMQMessageListener(topic = "topic",consumerGroup = "group2")
public class MyMessageListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("监听到的消息:"+message);
    }
}

六、消息类型

消息的生产者:分别发送默认同步消息、同步消息、异步消息。

|-- RocketMQTemplate

|-- convertAndSend() 默认同步消息, 没有返回结果

|-- syncSend() 发送同步消息, 可以获取消息返回结果(回执)

|-- asyncSend() 发送异步消息

|-- sendOneWay() 直接发送消息,不会等待应答

1、在实际使用场景中,利用何种发送方式,可以总结如下:

  • 当发送的消息不重要时,采用one-way方式,比如:物流实时消息,以提高吞吐量;

  • 当发送的消息很重要,而且下一步必须使用上一步的操作结果. ,同步消息 结算---跳转对应结算清单页面

  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;,发送优惠券

方式

速度

返回结果

可靠性

同步不丢失消息
异步更快不丢失消息
单向(直接发送)最快可能丢失消息

七、顺序消息

在MQ的模型中,顺序需要由3个阶段去保障:
?? ?1.消息被发送时保持顺序
?? ?2.消息被存储时保持和发送的顺序一致
?? ?3.消息被消费时保持和存储的顺序一致

 @Test
    public void sendOrderMessage(){
        //1. 得到消息
        List<Order> orderList = Order.buildOrders();

        //2. 消息的队列选择器 作用:指定消息进入到指定的队列中。
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            /**
             * select方法调用时机: 每发送一个消息的时候都会触发这个方法,这个方法的返回值代表了该消息进入的队列
             * @param list   当前主题下的所有队列
             * @param message  消息的内容
             * @param o    消息的标识符
             * @return
             */
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                //1. 得到消息的标识
                int orderId = Integer.parseInt((String) o);
                //2. 根据订单号算出该订单存储的队列索引号
                int index = orderId%list.size(); // 4%4 =0  5%4 = 1
                return list.get(index);
            }
        });

        //3. 遍历消息集合,发送消息
        for (Order order : orderList) {
            /**
             * 参数一: 消息的主题
             * 参数二: 消息的内容
             * 参数三: 消息的唯一标识
             */
            rocketMQTemplate.syncSendOrderly("order_topic",order.toString(),order.getId()+"");
        }
    }

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-27 12:54:03  更:2021-10-27 12:55:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 2:00:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码