IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMq 的理论及应用示例(一) -> 正文阅读

[大数据]RabbitMq 的理论及应用示例(一)

什么是RabbitMq

RabbitMQ是一个基于AMQP协议的开源的消息代理和队列服务器。
优点:

  • 采用Erlang语言进行开发作为底层语言实现:Erlang有着和原生Socket一样的延迟,所以性能非常高
  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式(confirm)、返回模式(return)
  • 与SpringAMQP完美整合,API丰富
  • 集群模式比较丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性,可用性

AMOP 专有名词:

  • Server:又称为 Broker
  • Connection:连接,应用程序和broker之间的网络连接
  • Channel:网络信道,一个网络会话的任务
  • Message:消息
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtual host里面可以有若干个Exchage和Queue,但同一个Virtual host里面不能有相同名称的Exchage和Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定队列
  • Binding:Exchage和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则
  • Queue:保存具体的消息的容器

RabbitMQ 消息的流转过程:
在这里插入图片描述

应用场景/作用

  • 异步缓冲
    有些业务可以进行异步的,只要做到最终一致性,不用强一致性,即可用MQ

  • 服务解耦

    • 强依赖:使用 dubbo 或 springCloud 进行服务的调用和连接都是强依赖。【比如注册、发现都需要依赖其他服务】

    • 弱依赖:MQ 中间件

      • 不代表弱依赖就可以失败
      • 如果不能失败就要保证上游的消息发布端数据投递的可靠性

      场景举例:用户下单后,订单需要更新库存

      强依赖下会出现的问题:
      1)假如库存系统无法访问,则订单减库存失败,从而导致订单生成失败
      2)订单模块和库存模块是强耦合的
      3)如果启用一个线程做离线操作,只是做了异步访问,访问只是提升速度,是否正 常调用成功是无法保证的

      通过弱依赖来解决以上问题:
      1)订单生产成功写入消息到消息队列(保证消息的可靠投递)
      2)库存系统通过订阅消息获取下单信息,库存系统根据下单信息进行库存操作
      3)如果库存系统出现异常,库存消费消息失败的情况下消息就重回队列了,等待下次发送

  • 削峰和填谷

    • 当我们下游服务处理不过来的时候,就可以将这些消息缓存在一个地方,逐步处理
    • 将短暂一段时间的业务积压在后面缓慢执行就是削峰和填谷的过程

思考

  1. 生产端的可靠性投递;
    • 如果消息和钱有关,这个消息一定不能丢失
    • 需要做到生产端100%投递,就需要和业务数据保证原子性
  2. 消费端的幂等;
    • 生产端如果要做到可靠性投递,可能会有重复投递
    • 消费端消费了两次或多次这个数据可能会不一致
    • 所以消费端一定要做到同一个请求消费多次得到的结果一样
  3. MQ 本身需要考虑
    • HA:高可用
    • 低延迟
    • 可靠性:确保数据是完整的
    • 堆积能力:这是MQ能扛下你的业务量级的保证
    • 扩展性:是否能够天然支持横向扩展无感知扩容

RabbitMq 集群架构原理解析

1)主备模式
master-slave结构,可以理解为热备份,master负责读写,master宕机后切换到slave

2)镜像模式
业界主流使用比较多的模式;
RabbitMQ集群非常经典的就是镜像模式,保证数据100%不丢失;
高可用、数据同步低延迟、奇数个节点。

缺点:
镜像队列集群的缺陷是无法进行很好的横向扩容,因为每个节点都是一个完整的互相复制的节点,并且镜像节点过多也会增加MQ的负担,一个数据写入就要复制到多个节点,吞吐量也会降低
在这里插入图片描述

RabbitMq 的安装和使用

RabbitMq-3.8.19安装详解 前面我已经介绍了一篇,再此省略。

修改用户登录和连接心跳

  • 将 loopback_users.guest = false,前面的注释去掉
  • 将 {heartbeat, 60} 修改为 {heartbeat, 10}

查看MQ端口是否启用:yum -y install lsof

  • lsof -i:5672

启动插件:

  • rabbitmq-plugins enable rabbitmq_management

查看管理后台是否启动

  • lsof -i:15672

常用命令

# 启动服务
systemctl start rabbitmq-server
# 或者
rabbitmq-server -detached
# 开启web管理界面插件
rabbitmq-plugins enable rabbitmq_management

# 关闭应用
rabbitmqctl stop_app

# 启动应用
rabbitmqctl start_app

# 节点状态
rabbitmqctl status

# 添加用户密码
rabbitmqctl add_user username password

# 修改用户密码
rabbitmqctl change_password username password

# 列出所有用户
rabbitmqctl list_users

# 删除用户
rabbitmqctl delete_user username

# 列出用户权限
rabbitmqctl list_user_permissions username

# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username

# 设置用户权限
# 三个*对应:configure write read
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"

# 列出所有虚拟主机
rabbitmqctl list_vhosts

# 创建虚拟主机
rabbitmqctl add_vhost vhostpath

# 列出虚拟主机的权限
rabbitmqctl list_permissions -p vhostpath

# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath

# 查看所有队列
rabbitmqctl list_queues

# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue queueName

# 清除所有数据
rabbitmqctl reset # 这个动作最好在MQ服务停掉后操作

springboot 整合 rabbitmq

发送

搭建一个 SpringBoot 工程及准备工作,我在另一篇博客写了,这里不再重复。SpringBoot全局异常处理、集成Swagger和参数必填校验 ,我们现在需要的准备工作,都在这篇准备了。

新增 application.yaml

server:
  port: 8088

spring:
  rabbitmq:
    host: 192.168.150.130
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000

消息实体

package cn.com.springboot.vo;

import lombok.Data;

import java.io.Serializable;

@Data
public class OrderInfo implements Serializable {

    private String id;

    private String orderName;

    private String messageId;
}

发送类

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String ORDER_EXCHANGE = "order_exchange";

    private static final String ORDER_ROUTING_KEY = "order_r_key";

    public void sendOrder(OrderInfo orderInfo){
        //correlationData:消息唯一id
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(orderInfo.getMessageId());

        //String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData
        rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, orderInfo, correlationData);
    }
}

rabbitmq 准备工作

创建 exchange
在这里插入图片描述
Exchange的相关属性说明

  • Name:exchange的名称

  • Type类型说明

    • direct:exchange在和queue进行binding时会设置routingkey,只有routingkey完全相同,exchange才会将消息转发到对应的queue上,相当于点对点

    • fanout:直接将消息路由到所有绑定的队列中,无需对消息的routingkey进行匹配操作,因为不绑定routingkey,所有也是消息转发最快的(广播方式)

    • topic:此类型的exchange和direct差不多,但direct类型要求routingkey完全相同,而topic可以使用通配符:‘*’,‘#’

      其中‘*’表示匹配一个单词,‘#’则表示匹配没有或者多个单词

    • header:路由规则是根据header来判断

    • 总结:一般direct和topic用来具体的路由信息,如果用广播就使用fanout,header类型用的比较少

  • Durability:Durable是持久化到磁盘的意思

  • Auto Delete:如果设置为yes,那么最后一个绑定到exchange上的队列被删除后,exchange也会自动删除

  • Internal:如果为yes则表明该exchange是rabbitmq内部使用,不提供给外部系统应用,一般是在自己编写erlang语言做定制化扩展时使用

  • Arguments这个是扩展AMQP协议时自定义使用的内容

创建Queue
在这里插入图片描述
Exchange和Queue通过Binding关联,由routingkey进行路由
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试发送

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import cn.com.springboot.vo.ResultVo;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@Api("发送消息")
@RestController
public class RabbitMqController {
    @Autowired
    private OrderSender orderSender;

    @PostMapping("/sendOrder")
    public ResultVo sender(@RequestBody OrderInfo orderInfo){
        orderSender.sendOrder(orderInfo);
        return ResultVo.success();
    }
}

用 swagger 测试
在这里插入图片描述
消息成功发送到队列
在这里插入图片描述
注意:

  • 一个exchange可以绑定多个queue,只要routingkey一样,一个消息就会发送到多个queue上
  • exchange绑定一个queue,无论binding多少个routingkey,只要符合这个routingkey规则的消息都会发送到这个队列中,接收的时候无论从哪个routingkey过来的消息,连接这个队列的消费端都会消费掉,相当于多个消息规则对应一个队列

消息接收

创建另一个工程:consumer-and-producer,搭建步骤和上面的基本一样。
application.yaml

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.150.130
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000
    listener:
      simple:
        concurrency: 5              # 初始化并发数
        max-concurrency: 10         # 最大并发数
        auto-startup: true          # 自动开启监听
        prefetch: 1                 # 每个连接同一时间最多处理几个消息,限流设置
        acknowledge-mode: manual    # 签收模式为手动签收

添加消费类

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Log4j2
@Component
public class OrderReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order_queue", durable = "true"),
            exchange = @Exchange(value = "order_exchange", type = "topic"),//durable默认是true
            key = "order_r_key"//我的routingKey是order_r_key
    ))
    @RabbitHandler
    public void receiveOrderInfo(@Payload OrderInfo orderInfo,
                                 @Headers Map<String, Object> headers,
                                 Channel channel) throws IOException {
        log.info("开始消费");
        log.info("orderName:{}, messageId:{}", orderInfo.getOrderName(), orderInfo.getMessageId());

        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);

        channel.basicAck(deliveryTag, false);
    }
}

启动测试
在这里插入图片描述

暂时分享到这,欢迎指正!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-27 16:17:12  更:2021-07-27 16:18:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/8 18:32:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码