IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMq详解讲解以及代码Demo -> 正文阅读

[大数据]RabbitMq详解讲解以及代码Demo

RabbitMQ

1.什么是MQ

生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者。

preview

2.MQ有什么作用

  • 解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
  • 异步:业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验(比如发短信)
  • 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪(比如促销活动)

3.MQ的实现

MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

两者间的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

4.常见MQ产品

  • ActiveMQ:基于JMS
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量

5.AMQP协议的重要核心概念

在这里插入图片描述

  • Server:接收客户端的连接,实现AMQP实体服务。
  • Connection:连接,应用程序与Server的网络连接,TCP连接。
  • Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue
  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃.
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey。
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。
  • Queue:消息队列,用来保存消息,供消费者消费。

6.RabbitMQ的工作原理

img

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue.
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

7.使用流程

  • 消息接收者客户端连接消息队列服务器,打开一个Channel
  • 客户端声明一个exchange,并设置一些相关属性
  • 客户端声明一个Queue,并设置相关属性
  • 客户端通过使用routingkey(路由键),把声明的exchange和queue虚拟连接起来
  • 消息发布端,发送消息到exchange中里
  • exchange收到消息后,会通过routingkey消息的key和Binding将消息路由到一个队列或者多个队列中。

8.MQ的五种队列模式

<!--引入mq的相关依赖-->
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
</dependency>

1.直连方式(点对点模式)

在这里插入图片描述

//生产消息的
@Test
public void sendMessage() throws IOException, TimeoutException {
    //创建连接mq的连接工厂对象
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置连接rabbitmq主机
    connectionFactory.setHost("39.97.67.215");
    //设置端口号
    connectionFactory.setPort(5672);
    //设置连接哪个虚拟机
    connectionFactory.setVirtualHost("/ems");
    //设置访问虚拟主机的用户名和密码
    connectionFactory.setUsername("ems");
    connectionFactory.setPassword("ems");
    //获取连接对象
    Connection connection = connectionFactory.newConnection();
    //获取连接中通道
    Channel channel = connection.createChannel();
    //通道绑定对应消息队列
    //参数1:队列名称,如果队列不存则自动创建
    //参数2: 用来定义队列特性是否要持久化
    //参数3:exclusive 是否独占队列 true 独占队列 false 不独占队列
    //参数4:是否在消费完成后自动删除队列 true自动删除 false不删除
    //参数5:额外附加参数
    channel.queueDeclare("hello",false,false,false,null);
    //发布消息
    //参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置  参数4:消息的具体内容
    channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
    channel.close();
    connection.close();
}

//消费消息
public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接mq的连接工厂对象
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置连接rabbitmq主机
    connectionFactory.setHost("39.97.67.215");
    //设置端口号
    connectionFactory.setPort(5672);
    //设置连接哪个虚拟机
    connectionFactory.setVirtualHost("/ems");
    //设置访问虚拟主机的用户名和密码
    connectionFactory.setUsername("ems");
    connectionFactory.setPassword("ems");
    //获取连接对象
    Connection connection = connectionFactory.newConnection();
    //获取连接中通道
    Channel channel = connection.createChannel();


    //通道绑定对应消息队列
    //参数1:队列名称,如果队列不存则自动创建
    //参数2: 用来定义队列特性是否要持久化
    //参数3:exclusive 是否独占队列 true 独占队列 false 不独占队列
    //参数4:是否在消费完成后自动删除队列 true自动删除 false不删除
    //参数5:额外附加参数
    channel.queueDeclare("hello",false,false,false,null);


    //消费消息
    //参数1:消费哪个消息队列
    //参数2:开始消息的自动确认机制
    //参数3:消费是的回调接口
    channel.basicConsume("hello",true,new DefaultConsumer(channel){
        //匿名内部类
        @Override  //最后一个参数是从消息队列中取出的消息
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("new String(body) = " + new String(body));
        }
    });
    //        channel.close();
    //        connection.close();
}

注意:消费者队列和生产者队列一定要一一对应起来

2.第二种模型(任务模型)work queues

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消费的消费速度,长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

在这里插入图片描述

//任务型模式的队列 生产者
public class WorkProducerService {
    @Test
    public void sendMessages() throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","work",null,(i+"work message").getBytes());
        }
        MqUtils.MqClose(channel,mqConnection);
    }
}
//消费者
//任务型模式的队列
public class WorkConsumerServiceTwo {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息》》》》"+new String(body).toString());
            }
        });
    }
}
//任务型模式的队列
public class WorkConsumerService {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息》》》》"+new String(body).toString());
            }
        });
    }
}

注意:默认情况下,这种work模型的消费方式是,平均消费的!

基于上面这种情况还可以演变成 能者多劳模式,和手动确认消息消费机制

//任务型模式的队列 生产者
public class WorkProducerService {
    @Test
    public void sendMessages() throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","work",null,(i+"work message").getBytes());
        }
        MqUtils.MqClose(channel,mqConnection);
    }
}
//消费者
public class WorkConsumerServiceAutoOne {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.basicQos(1);  //设置每次消费一个
        channel.queueDeclare("work",true,false,false,null);
        //修改参数二,设置成false为不自动确认消息机制
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                }catch (Exception ce){
                    ce.printStackTrace();
                }
                System.out.println("消费的消息》》》》"+new String(body).toString());
                channel.basicAck(envelope.getDeliveryTag(),false);  //手动确认消费机制
            }
        });
    }
}
public class WorkConsumerServiceAutoTwo {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.basicQos(1);  //设置每次消费一个
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费的消息》》》》"+new String(body).toString());
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

3.第三种模型 (广播模式)

广播模式下,消息发送流程是这样的。

可以有多个消费者
每个消费者有自己的(queue 队列),每个队列都要绑定到Exchange(交换机),
生产者发送消息,只能发送到交换机,交换机决定发给哪个队列,生产者无法决定。
交换机把消息发送给绑定过的所有队列。
队列的消费者都能拿到消息,实现一条消息被多个消费者消费

在这里插入图片描述

//广播模式的生成者
public class FanoutProducerService {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        // 参数一为交换机的名字 参数二为交换机的模式
        channel.exchangeDeclare("logs", "fanout");
        channel.basicPublish("logs","",null,"广播模式的".getBytes());
        MqUtils.MqClose(channel,mqConnection);
    }
}

//广播模式的消费者1
public class FanoutConsumerService1 {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //获取一个队列临时的
        String queueName = channel.queueDeclare().getQueue();
        System.out.println("queueName = " + queueName);
        //交换机和队列绑定
        channel.queueBind(queueName,"logs","");
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1消费的消息是:》》》》"+new String(body));
            }
        });
    }
}
//广播模式的消费者2
public class FanoutConsumerService2 {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs","fanout");
        //获取一个队列临时的
        String queueName = channel.queueDeclare().getQueue();
        System.out.println("queueName = " + queueName);
        //交换机和队列绑定
        channel.queueBind(queueName,"logs","");
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2消费的消息是:》》》》"+new String(body));
            }
        });
    }
}

4.第四种(路由模式)

这种模式消息首先到X(exchange)中,然后通过routingkey把消息发送到匹配的队列中

4.1Routing 之订阅模型—Direct(直连)

在这里插入图片描述

在Fanout模式中,一条消息,会被所有订阅的队列消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange.

在Direct模型下:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由Key)
    • 消息的发送方向在向Exchange发送消息时,也必须指定消息的Routingkey
    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息的流程
// 这是直连方式,通过交换机 生产者
public class DirectProducerService {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        //绑定交换机,第一个参数交换机的名字 第二个参数交换机的类型
        channel.exchangeDeclare("losg_diect","direct");
        String routingKey  ="info";
//        String routingKey  ="error";
        channel.basicPublish("losg_diect",routingKey,null,("这是direct{"+routingKey+"}").getBytes());
        MqUtils.MqClose(channel,mqConnection);
    }
}

//这是交换机,直连模式的消费者1
public class DirectConsumerService {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        String exchange = "losg_diect"; //交换机名字
        //绑定交换机
        channel.exchangeDeclare(exchange,"direct");
        String queue = channel.queueDeclare().getQueue();  // 获取临时的队列
        String routingKey  ="info";  //路由
        //队列和交换机 ,路由绑定
        String routingKeys  ="error";
        channel.queueBind(queue,exchange,routingKey);
        channel.queueBind(queue,exchange,routingKeys);
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到的信息是》》》》"+new String(body));
            }
        });
    }
}
//这是交换机,直连模式的消费者
public class DirectConsumerServiceTwo {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        String exchange = "losg_diect"; //交换机名字
        //绑定交换机
        channel.exchangeDeclare(exchange,"direct");
        String queue = channel.queueDeclare().getQueue();  // 获取临时的队列
        String routingKey  ="error";  //路由
        //队列和交换机 ,路由绑定
        channel.queueBind(queue,exchange,routingKey);
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到的信息是》》》》"+new String(body));
            }
        });
    }
}

4.2 Routing 之订阅模型—Topic

在这里插入图片描述

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange 可以让队列在绑定Routing key的时候使用通配符!这种模型RoutingKey 一般都是由一个或多个单词组成,多个单词之间以“.”分隔。例如:item.insert

// 订阅模式的 生产者
public class TopicProduserService {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("topic","topic");
//        String routingKey ="user.save";
        String routingKey ="lihuan.user.save";
        //发送信息
        channel.basicPublish("topic",routingKey,null,("topice{"+routingKey+"}"+"发送消息").getBytes());
        MqUtils.MqClose(channel,mqConnection);
    }
}
// 订阅模式的 消费者1
public class TopicConsumerService {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("topic","topic");
        //得到一个临时队列
        String queue = channel.queueDeclare().getQueue();
         String routingKey ="user.*";
        //绑定交换机
        channel.queueBind(queue,"topic",routingKey);
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 消费的信息>>>>"+new String(body) +">>>>路由是》》》"+routingKey);
            }
        });
    }
}

// 订阅模式的 消费者2
public class TopicConsumerServiceTwo {
    public static void main(String[] args) throws IOException {
        Connection mqConnection = MqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("topic","topic");
        //得到一个临时队列
        String queue = channel.queueDeclare().getQueue();
        String routingKey ="*.user.#";
        //绑定交换机
        channel.queueBind(queue,"topic",routingKey);
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2 消费的信息>>>>"+new String(body) +">>>>路由是》》》"+routingKey);
            }
        });
    }
}

5.RPC

c对s说“我这有个任务需要你的帮助”,s处理完后,将结果返回给c

在这里插入图片描述

9.RabbitMQ持久化(消息的可靠传输)

需要将消息和队列标记为持久化

9.1队列持久化

 channel.queueDeclare("hello",true,false,false,null);

9.2消息持久化

//参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置(持久化的概念)  参数4:消息的具体内容
channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());

9.3发布确认机制

1.发送方确认模式

将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。

一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。

2.消费方确认模式

消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;

如果这时处理消息失败,就会丢失该消息;

解决方案:处理消息成功后,手动回复确认消息。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-07 12:09:17  更:2021-08-07 12:11:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 16:43:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码