IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ学习 -> 正文阅读

[大数据]RabbitMQ学习

RabbitMQ学习

使用场景

  1. 消息队列解决什么问题?

    1. 异步处理
    2. 应用解耦
    3. 流量削锋
    4. 日志处理

安装与配置

用户及vhost配置

添加用户

在这里插入图片描述

virtual host管理

在这里插入图片描述

开发指南

Simple简单队列

模型

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ItrbYtxD-1632320692103)(images/python-one-overall.png)]

P:消息生产者

红色:阶列

C:消息消费者

不足

耦合性高,生产者—消费者一一对应。队列名变更都得变理

工作队列

模型

在这里插入图片描述

为什么会出现工作队列

Simple队列是一一对应的,而且我们实际开发,生产者发送消息是不费力的,而消费者一般是跟业务相结合的。,消息者接收到消息之后就需要处理。需要花费时间。队列就需要更多的消费者。

现象:

消费者1和2处理的消息是一样的

消费者1:奇数

消费者2:偶数

其实是一个轮询分发(roun-robin)

公平分发(Fail dispatch)

使用公平发分要关闭autoACK,改成手动。

公共的消费类

public class BHFailCustomConsumer extends DefaultConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(BHFailCustomConsumer.class);

    private String consumerName = "defaultConsumer";

    private long delayTime = 100L;

    public BHFailCustomConsumer(Channel channel,long delayTime, String ...name) {
        super(channel);
        try {
            //接收消息,在没有应答前只接收1条消息
            channel.basicQos(1);
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.delayTime = delayTime;
        if(name!=null) {
            this.consumerName = name[0];
        }else{
            this.consumerName = "defaultConsumer "+ UUID.randomUUID().toString();
        }
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body,"utf-8");
        LOGGER.debug(this.consumerName +"_"+msg);
        try {
            Thread.sleep(delayTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //手动应答
        getChannel().basicAck(envelope.getDeliveryTag(),false);
    }
}

C1,C2,使用不是的delayTime延迟

Connection connection = messageBrokerHelper.getConnection();
        Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME);
        Consumer consumer = new BHFailCustomConsumer(channel,delayTime,name);
        try {
            LOGGER.info("Consumer {} waiting consume,delay time {}",name,delayTime);
            //关掉自动应答,由消费者手动处理应答
            channel.basicConsume(QUEUE_NAME,false,consumer);
        } catch (IOException e) {
            LOGGER.debug("consumer listener failure",e);
        }

W 生产者

 public void sender(int senderCount){
        Connection connection = messageBrokerHelper.getConnection();
        Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME);
        try {
            int senderTime = senderCount < 1 ? 1 : senderCount;
            for (int i=0;i<senderTime;i++) {
                String body = "hello,rabbit mq" + i;
                channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            messageBrokerHelper.closeChannel(channel);
            messageBrokerHelper.closeConnection(connection);
        }

    }

消息应答与持久化

订阅模式

模型

在这里插入图片描述

  1. 一个生产者,多个消费者,每个消费者有自己报价列
  2. 生产者没有直接把消息发送到队列,而是发到了交换机,转发器exchange
  3. 每个队列都要绑定到交换机上
  4. 生产者发送的消息,经过交换机,到达队列。实现一个消息被多个消费者所消费。

场景:

注册->邮件->短信

生产者

消费者

exchange(交换机 转发器)

接收生产者消息,并接收到的消转发给队列

fanout:不处理路由键
在这里插入图片描述

Direct:处理路由键

在这里插入图片描述

路由模式

在这里插入图片描述
在这里插入图片描述

Topic模式

在这里插入图片描述

‘# 匹配一个或者多个

*匹配一个

RPC模式

在这里插入图片描述

消息确认机制(事务+confirm)

两种方式:

AMQP实现了事务机制

Confirm模式

事务机制

  • txselect

    用户将当前channel设置成transation模式

  • txCommit

用于提交事务

  • txRollback

回滚事务

Confirm模式

生产者的实现原理

开启confirm模式

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-23 11:32:08  更:2021-09-23 11:34:09 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 11:42:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码