IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ php -> 正文阅读

[大数据]RabbitMQ php

概念

消息队列,一般我们会简称它为MQ(Message Queue),先不管消息(Message)这个词,来看看队列(Queue)。队列是一种先进先出的数据结构.消息队列可以简单理解为:把要传输的数据放在队列中。

应用场景

为什么用消息队列?

  1. 解耦
  2. 异步
  3. 削峰/限流

解耦: 耦合是指两个或两个以上的体系或两种运动形式间通过相互作用而彼此影响以至联合起来的现象。 解耦就是用数学方法将两种运动分离开来处理问题,常用解耦方法就是忽略或简化对所研究问题影响较小的一种运动,只分析主要的运动

削峰/限流: 应用在一些高并发的场景下,比如秒杀,抢票等场景

RabbitMq简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

注意: RabbitMQ通过插件实现AMQP 1.0。但是,AMQP 1.0是与AMQP 0-9-1完全不同的协议,因此不适用于后者。因此RabbitMQ将继续无限期地支持AMQP 0-9-1

RabbitMq和Redis队列比较

找到了一个说得得非常详细的文章 就不在这里写了 直接看看吧.

https://www.cnblogs.com/chinaboard/p/3819533.html

Rabbitmq使用介绍

通常我们谈队列服务,会有三个概念:发消息者、队列、收消息者,Rabbitmq在这个基本概念上多做了一层抽象,在发消息者和队列之间,加入了消息交换机。这样发消息和队列就没有直接联系,转而变成发消息者把消息给消息交换机,交换器根据调度策略再把消息给消息交换机,消息交换机根据调度策略再把消息再给队列

RabbitMq队列服务的五个概念:

1)虚拟主机:一个虚拟主机持有一组消息交换机、队列和绑定。在Rabbitmq中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的消息交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每个Rabbitmq服务器都有一个默认的虚拟主机”/”。

2)消息:消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储), 最后,Rabbitmq的延迟队列也是通过消息头参数实现(x-delay)

3)绑定:也就是交换机需要和队列向绑定,用于消息队列和交换器之间的互联,可能一对一,一对多,多对一。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表

4)信道:多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的tcp连接内地虚拟连接。AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过通道完成。因为对于操作系统来说建立和销毁tcp都死非常昂贵的开销,所以引入了信道的概念,以复用一条tcp连接。

5)交换机:交换机用于转发消息,但它不会做存储,如果没有Quene bind到Exchange的话,它会直接丢弃掉Producer发送过来的消息。消息到交换机的时候,交换机会根据路由键转发到对应的队列中。

换机有四种类型:Direct, topic, Headers and Fanout

Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.

Topic:按规则转发消息(支持匹配最灵活)(* 可以代替一个单词。#可以替代零个或多个单词)

Headers:设置header attribute参数类型的交换机

Fanout:转发消息到所有绑定队列

三类队列

  1. 普通队列
  2. 延时队列
  3. 死信队列

死信队列:没有被及时消费的消息存放的队列

消息没有被及时消费的原因:

a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

b.TTL(time-to-live) 消息超时未消费

c.达到最大队列长度

代码展示

消息发布者

$exchangeName = 'test2'; //交换机名
    $config = [
        'host' => env('MQ_HOST', ''),
        'port' => env('MQ_PORT', ''),
        'user' => env('MQ_USER', ''),
        'password' => env('MQ_PASSWORD', ''),
    ];
    $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
    $channel = $connection->channel();
    $channel->exchange_declare($exchangeName, 'fanout', false, true, false); //声明初始化交换机
    $data = json_encode([
        "agreenmentId" => "5150963681932088538",
        "oppId" => "4a71c5d3-0b7f-1971-704f-5fd9b41f9007",
        "projects" => [
            [
                "id" => "5151333319467605791",
                "loanAmount" => 50000,
            ],
        ]
    ]);
    $msg = new AMQPMessage($data, ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); //delivery_mode = 2 代表消息持久化
    $delayTime = 10;
    $msg->set('application_headers', new AMQPTable([
        'x-delay' => $delayTime * 1000 // 延迟时间,单位毫秒
    ]));
    $channel->basic_publish($msg, $exchangeName); //推送消息到某个交换机
    $channel->close();
    $connection->close();

消息接收者

$exchangeName = 'servicefee_message'; //交换机名
    $config = [
        'host' => env('MQ_HOST', ''),
        'port' => env('MQ_PORT', ''),
        'user' => env('MQ_USER', ''),
        'password' => env('MQ_PASSWORD', ''),
    ];
    $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
    $channel = $connection->channel();
    $channel->exchange_declare($exchangeName, 'fanout', false, true, false); //声明初始化交换机
    $arguments = new AMQPTable([
        'x-message-ttl'=>15000, //队列消息过期时间
        'x-dead-letter-exchange'=>'PHP-Dead-Exchange', //死信交换机
        'x-dead-letter-routing-key'=>'PHP-Dead-Key' //死信routingKey
    ]);
    list($queue_name, ,) = $channel->queue_declare("xmjr_callBack_project", false, true, false, false, false, $arguments);
    $channel->queue_bind($queue_name, $exchangeName, ""); 
    $channel->basic_qos(null, 5, null); //prefetch_count = 5 代表这个消费者的缓冲池的容量是五条队列消息

    //死信交换机
    $channel->exchange_declare('PHP-Dead-Exchange','direct');
    //死信队列
    $channel->queue_declare('PHP-Dead-Queue');
    //死信绑定
    $channel->queue_bind('PHP-Dead-Queue','PHP-Dead-Exchange','PHP-Dead-Key');

    $callback = function($msg){
        echo " [x] Received ", $msg->body, "\n";
        $this->callBack($msg->body);
        echo " [x] Done", "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };

    $channel->basic_consume('xmjr_callBack_project', '', false, false, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

注事事项

  1. 选择手动确认机制,避免数据丢失
  2. 根据实际业务情况以及服务性能设置合适的缓冲池容量.
  3. 根据实际情况确定交换机、队列、消息是否做持久化.

问题

  1. 降低了系统的稳定性
  2. 增加了系统的复杂性

加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大

补充

php版本在链接队列服务器的时候,需要显式设置心跳间隔,如果不设置则默认显式设置,则默认设置心跳间隔为0,如果服务端也是设置的0,则表示禁用心跳,参见文档
https://www.rabbitmq.com/heartbeats.html

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-02 11:26:45  更:2021-09-02 11:28:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:52:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码