IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ的使用(消息模型,死信队列) -> 正文阅读

[大数据]RabbitMQ的使用(消息模型,死信队列)

RabbitMQ

一. 简介

? RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。RabbitMQ跟Erlang和AMQP有关。下面简单介绍一下Erlang和AMQP。

? Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用)。

? AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而RabbitMQ则是AMQP的一种基于erlang的实现。AMQP将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。

二. rabbitmq基本原理

? RabbitMQ是消息队列的一种实现,那么一个消息队列到底需要什么?答案是队列,即Queue,那么接下来所有名词都是围绕这个Queue来拓展的。

? 就RabbimtMQ而言,Queue是其中的一个逻辑上的实现,我们需要连接到RabbitMQ来操作队列进而实现业务功能,所以就会有Connection,我们发一条消息连接一次,这样很显然是浪费资源的,建立连接的过程也很耗时,所以我们就会做一个东西让他来管理连接,当我用的时候,直接从里边拿出来已经建立好的连接发信息,那么ConnectionFactory应运而生。

? 接下来,当程序开发时,可能不止用到一个队列,可能有订单的队列、消息的队列、任务的队列等等,那么就需要给不同的queue发信息,那么和每一个队列连接的这个概念,就叫Channel

? 再往下来,当我们开发的时候还有时候会用到这样一种功能,就是当我发送一条消息,需要让几个queue都收到,那么怎么解决这个问题呢,难道我要给每一个queue发送一次消息?那岂不是浪费带宽又浪费资源,我们能想到什么办法呢,当然是我们发送给RabbitMQ服务器一次,然后让RabbitMQ服务器自己解析需要给哪个Queue发,那么Exchange就是干这件事的
但是我们给Exchange发消息,他怎么知道给哪个Queue发呢?这里就用到了RoutingKey和BindingKey
BindingKey是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue的过程

至此,我们就把所有的名词贯通咯,接下来做个概要描述:

  • ConnectionFactory:与RabbitMQ服务器连接的管理器
  • Connection:与RabbitMQ服务器的TCP连接
  • Channel:与Exchange的连接,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,为了多路复用。RabbitMQ建议客户端线程之间不要共用Channel,但是建议尽量共用Connection。
  • Exchange:接受消息生产者的消息,并根据消息的RoutingKey和 Exchange绑定的BindingKey,以及Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
  • Message Queue:消息队列,用于存储还未被消费者消费的消息。
  • Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
  • RoutingKey:由Producer发送Message时指定,指定当前消息被谁接受
  • BindingKey:由Consumer在Binding Exchange与Message Queue时指定,指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中
  • Binding:联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
  • Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。
  • Virtual Host:其实是一个虚拟概念,类似于权限控制组,可以通过命令分配给用户Virtual Host的权限,默认的guest用户是管理员权限,初始空间有/,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
    如下图:

### [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bYOgutN1-1627904230745)(C:\Users\Administrator\Desktop\123.png)]
在这里插入图片描述

三. 安装

3.1安装erlang环境

rabbitmq是使用erlang语言开发的,所以必须要先安装erlang环境。erlang的软件环境下载地址为:https://www.erlang.org/downloads,安装完毕之后需要配置环境变量:

ERLANG_HOME=G:\erlang\erl-23.0

请添加图片描述

3.2 安装rabbitmq

本教程采用解压的方式进行安装。rabbitmq的下载地址为:https://www.rabbitmq.com/install-windows.html

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8bZh89uS-1627904230781)(images/rabbitmq-download-path.png)]请添加图片描述

A. 配置环境变量请添加图片描述

1.RABBITMQ_SERVER=G:\rabbitmq\rabbitmq_server-3.8.3
2.在path中加入:%RABBITMQ_SERVER%\sbin

B.安装可视化工具和服务

下载完毕之后进行解压,进入到家目录下的sbin目录下,执行如下命令,安装rabbitmq的web可视化工具:

rabbitmq-plugins.bat enable rabbitmq_management

安装rabbitmq的服务:rabbitmq-service.bat install

启动rabbitmq的服务:rabbitmq-server.bat start

在浏览器输入:http://localhost:15672/,访问rabbitm请添加图片描述
q的控制台,用户名和密码均为 guest

请添加图片描述

3.3 使用docker安装

docker run -itd --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

四. RabbitMQ程序的编写

4.1 rabbitMQ支持的消息模型

请添加图片描述请添加图片描述

4.2 引入依赖

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
</dependency>

4.3 简单模型(直连)

请添加图片描述

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
4.3.1 生成者
public class Producer {
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 5672;
        String username = "guest";
        String password = "guest";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();
  /**
         * channel.queueDeclare()声明了一个队列,但是我们没有将其绑定到某个交换机,那么就绑定到RabbitMQ中的默认交换机上(AMQP default)
         * 那么bindingKey(Routing key)就是队列的名字
         *
         * 第一个参数是:队列的名字
         * 第二个参数是:队列是否持久化,就是当服务器重启,队列是否存在,实际工作中为true
         * 第三个参数是:是否为排他队列(临时队列),表示当前队列和连接绑定,连接关闭,队列关闭(持久化也没用),一般设置false
         * 第四个参数是:是否自动删除(不再使用且没有消息的队列)
         */
        channel.queueDeclare("test", true, false, false, null);
         /**
         * 发布消息
         * basicPublish() 第一个参数是交换机的名字,空字符表示默认交换机
         * 第二个参数是routingkey
         * 第三个参数是消息的属性(消息有很多属性)
         * 第四个参数是消息的内容
         * MessageProperties.PERSISTENT_TEXT_PLAIN表示是一个持久化队列
         *  MessageProperties.TEXT_PLAIN表示基于内存持久化
         */
        channel.basicPublish("", "test", null, (i + "hello").getBytes());

        RabbitMQUtils.close(conn, channel);
    }
}
4.3.2 消费者
public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();
 		/**
         * 第一个参数就是队列的名称
         * 第二个参数(auto ack)表示是否确认
         */
        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}

4.4 工作模型(work quene)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5QM1ZnpQ-1627904230787)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1625456692838.png)]

4.4.1 消息生产者
for (int i = 0; i < 20; i++) {
	channel.basicPublish("", "test", null, (i + "hello").getBytes());
}
4.4.2 多个消费者

消费者一

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}

消费者二

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECOND.sleep(3);
                }catch(Exception ex){}
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}
4.4.3 消息自动确认

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

channel.basicQos(1); // 每次只消费一条消息
channel.queueDeclare("firstQueue", false, false, false, null);

// 消息改为手动确认
channel.basicConsume("firstQueue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(new String(body, Charset.defaultCharset()));
    /**
       * 第一个参数表示当前消息的id
       * 第二个参数是multiple,是否同时确认前面的所有消息
    */
        channel.basicAck(envelope.getDeliveryTag(), false);
	}
});

4.5 发布订阅模型(fanout)

请添加图片描述

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
4.5.1 消息生成者
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();

// 声明一个交换机, 交换机的类型为 fanout
channel.exchangeDeclare("multiple", BuiltinExchangeType.FANOUT);

channel.basicPublish("multiple", "", null, "fanout-message".getBytes());

RabbitMQUtils.close(channel, connection);
4.5.2 消息消费者

使用如下代码创建多个消息的消费者

Connection connection = RabbitMQUtils.getConnection();

Channel channel = connection.createChannel();

//String qName = channel.queueDeclare("consumer-2", false, false, false, null).getQueue();
String qName = channel.queueDeclare().getQueue(); //创建临时队列

channel.queueBind(qName, "multiple", "");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, 
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

4.6 直连模型(direct)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

请添加图片描述

4.6.1 消息生产者
// 声明一个交换机, 交换机的类型为 direct
channel.exchangeDeclare("direct-module", BuiltinExchangeType.DIRECT);

channel.basicPublish("direct-module", "success", null, "成功信息".getBytes());
channel.basicPublish("direct-module", "error", null, "错误信息".getBytes());
4.6.2 消息消费者

消费者一

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "direct-module", "success");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者二

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "direct-module", "error");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

4.7 主题模式(topic)

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

请添加图片描述

# 统配符
		* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
		# (hash) can substitute for zero or more words.  匹配一个或多个词
# 如:
		audit.#    匹配audit.irs.corporate或者audit.irs 等
        audit.*   只能匹配 audit.irs

4.7.1 消息生产者

// 声明一个交换机, 交换机的类型为 fanout
channel.exchangeDeclare("topic-module", BuiltinExchangeType.TOPIC);

channel.basicPublish("topic-module", "company.java", null, "通知信息".getBytes());
//channel.basicPublish("topic-module", "error", null, "错误信息".getBytes());

RabbitMQUtils.close(channel, connection);

4.7.2 消息消费者

消费者一

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.#");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者二

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.java.#");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者三

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.html.*");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

结论

company: 可以被消费者一接收到
company.java: 可以被消费者一、消费者二接收到
company.java.manager: 可以被消费者一、消费者二接收到
company.html.teacher: 可以被消费者一、消费者三接收到

五. RabbitMQ与springboot整合

5.1 依赖

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.4</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
		<exclusions>
			<exclusion>
				<groupId>org.junit.vintage</groupId>
				<artifactId>junit-vintage-engine</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
	</dependency>
</dependencies>

5.2 使用json的序列化

// 消息的消费方json数据的反序列化
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
    			ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}

// 定义使用json的方式转换数据
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate amqpTemplate = new RabbitTemplate();
    amqpTemplate.setConnectionFactory(connectionFactory);
    amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return amqpTemplate;
}

5.3 简单模型

消息消费方

@RabbitListener(queuesToDeclare = {@Queue("simpleQueue")})
public void simpleModel(User user) {
	log.info("message: {}", user);
}

消息发送

@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleMessageSend() {
        rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
    }
}

5.4 工作模型

工作模式只需要在简单模式的基础上,添加一个消息的消费方。

5.5 发布订阅模型

消息消费方

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
	System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
	System.out.println(String.format("消费者 【two】: %s", user));
}

消息发送方

// fanout模型
@Test
public void fanoutMessageSend() {
	for (int i = 0; i < 5; i++) {
		rabbitTemplate.convertAndSend("fanout-ex", "", new User(i, "张三"));
	}
	try {
		TimeUnit.SECONDS.sleep(5);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

5.6 直连模式(direct)

消息的消费方

@RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"error", "success"},
                    exchange = @Exchange(value = "direct-ex", type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
        System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
      @QueueBinding(value = @Queue,
            key = {"error"},
            exchange = @Exchange(value = "direct-ex", type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
    System.out.println(String.format("消费者 【two】: %s", user));
}

消息生产者

@Test
public void directMessageSend() {
	//rabbitTemplate.convertAndSend("direct-ex", "success", new User(2, "张三"));
	rabbitTemplate.convertAndSend("direct-ex", "error", new User(2, "张三"));
	try {
		TimeUnit.SECONDS.sleep(10);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

5.7 topic模型

消息的消费方

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.#"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
	System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.java.#"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
	System.out.println(String.format("消费者 【two】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.html.*"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format("消费者 【three】: %s", user));
}

消息生产方

@Test
public void topicMessageSend() {
	//rabbitTemplate.convertAndSend("topic-ex", "company", new User(2, "张三"));
	rabbitTemplate.convertAndSend("topic-ex", "company.java", new User(2, "张三"));
	try {
		TimeUnit.SECONDS.sleep(2);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

5.8 消息的手动确认

  • AcknowledgeMode.NONE:不确认
  • AcknowledgeMode.AUTO:自动确认
  • AcknowledgeMode.MANUAL:手动确认

配置

spring:
  rabbitmq:
    listener:
      simple:
        # 提交方式为手动
        acknowledge-mode: MANUAL

提交代码

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

成功确认
deliveryTag:该消息的index

multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。

消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。

失败确认

失败确认一:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

deliveryTag:该消息的index。

multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。

requeue:被拒绝的是否重新入队列。
失败确认二:

void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index。

requeue:被拒绝的是否重新入队列。

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

说明:当配置了json反序列化(见5.2节),代码中实例化了SimpleRabbitListenerContainerFactory,会默认覆盖application.yml文件中的配置,需要在代码层面手动的设置提交的方式:

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

六. 事务与confirm机制

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dfyMgtPO-1627904230791)(G:\锋迷商城-分布式-笔记\05-RabbitMQ\imgs\1623141531367.png)]

6.1RabbitMQ事务

RabbitMQ的事务是对AMQP协议的实现,通过设置Channel 的模式来完成,语句为:

channel.txSelect();  //开启事务
// ....本地事务操作
channel.txCommit();  //提交事务
channel.txRollback(); //回滚事务

特别说明:RabbitMQ的事务机制是同步操作,会极大的降低RabbitMQ的性能。

6.2 Confirm机制

由于RabbitMQ的事务性能的问题,于是就又推出了发送方确认模式。

channel.confirmSelect(); //开启发送方确认模式
6.2.1 单条消息确认
channel.waitForConfirms(); //对于单条消息的确认,返回值为true或者false
6.2.2 批量消息确认
try {
	channel.waitForConfirmsOrDie();  //批量消息确认,如果有一条消息没有发送成功,会抛出异常
}catch (Exception ex) {
	ex.printStackTrace();
}
6.2.3 回调方式确认
channel.confirmSelect();
// mandatory 需要开启
channel.basicPublish("", "tx-queue1", true, "text".getBytes());

channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
      System.out.println("成功达到交换机");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    	System.out.println("没有到达交换机");
    }
});

// 没有到达队列的时候触发
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, 
                            String routingKey, AMQP.BasicProperties properties, 
                             byte[] body) throws IOException {
    		System.out.println("没有到达队列");
    }
});

6.3 springboot对confirm实现

springboot对confirm第三种机制的实现。

6.3.1 配置
spring:
  rabbitmq:
    # 开启信息是否 回调 到交换机的确认方法,即 setConfirmCallback 方法
    publisher-confirm-type: CORRELATED
6.3.2 编码实现
@Bean("availableTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate();
		rabbitTemplate.setConnectionFactory(connectionFactory);

		// 开启 setReturnCallback 
		rabbitTemplate.setMandatory(true);

		// exchange告诉程序是否以及到达交换机,该方法无论成功都会回调。如果到达ack为true; 否则为false;
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            //correlationData是消息的第四个参数,之前在里面存了消息id,这里可以取
      	System.out.println(correlationData.getId());
				if(ack) {
             System.out.println("成功到达交换机");
        }else {
             System.out.println("没有到达交换机");
        }
		});

		rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, rk) -> {
				System.out.println("消息没有到达队列");
		});

		return rabbitTemplate;
}
6.3.3 消息的发送
@Test
public void sendMsg2() {
		// 消息属性
		MessageProperties messageProperties = MessagePropertiesBuilder
						.newInstance().setMessageId(UUID.randomUUID().toString())
						.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
						.build();
		// 消息
		Message message = new Message(JSONObject.toJSONBytes(new User(23, "张三")),
                                  messageProperties);

		CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
		rabbitTemplate.convertAndSend("","tx-queue", message, data);

		try {
				TimeUnit.SECONDS.sleep(120);
		} catch (InterruptedException e) {
				e.printStackTrace();
		}
}

七. 死信队列

我们先假设一个场景,当消息被消费方消费过很多次后,依然无法消费,那么就没有尝试的必要了,我们需要将这类信息放到一个特定的队列中,等待人工的接入。死信队列并不是一个特殊的队列,只是一个普通的队列,只是我们把他们取名叫做死信队列。

死信队列的设计是在某个队列的头信息中设定x-dead-letter-exchange (死信交换机)和x-dead-letter-routing-key(死信路由键)即可。关联到一个绑定到某个死信交换机的队列上。然后给该队列指定过期时间或者指定的消息的过期时间,那么该消息到期后会自动到达死信队列中。

  • 说明:
  • 死信交换机(Dead-Letter-Exchange): 当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上
    *
  • 使用方法:申明队列的时候设置 x-dead-letter-exchange 参数
    *
  • 判断一个消息是否是死信消息(Dead Message)的依据:
  • a. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false;
  • b. 消息过期; 消息过期时间设置主要有两种方式:
    *1.设置队列的过期时间,这样该队列中所有的消息都存在相同的过期时间(在队列申明的时候使用 x- message-ttl 参数,单位为 毫秒)
    *2.单独设置某个消息的过期时间,每条消息的过期时间都不一样;(设置消息属性的 expiration 参数的 值,单位为 毫秒)
    *3.如果同时使用了两种方式设置过期时间,以两者之间较小的那个数值为准;
  • c. 队列已满(队列满了,无法再添加数据到mq中);

7.1 RabbitAdmin对象

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
     RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
}

7.2 创建死信队列与交换机

@Slf4j
@Component
public class DeadQueue {

    // 死信队列名
    private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
    // 死信交换机
    private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    // 死信路由键
    private static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";

    private RabbitAdmin rabbitAdmin;

    public DeadQueue(RabbitAdmin rabbitAdmin) {
        this.rabbitAdmin = rabbitAdmin;
    }

    @PostConstruct
    public void initDeadQueue() {
        Queue deadQueue = QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
        rabbitAdmin.declareQueue(deadQueue);  //创建死信队列

        Exchange deadExchange = ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
                .durable(true).build();
        rabbitAdmin.declareExchange(deadExchange);  //创建死信交换机

        Binding binding = BindingBuilder.bind(deadQueue).to(deadExchange)
                .with(DEAD_LETTER_ROUTING_KEY).noargs();
        rabbitAdmin.declareBinding(binding);  // 将队列绑定到交换机上
        log.info("死信队列:{}, 死信交换机: {}, 已经成功绑定.", DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE);
    }
}

7.3 死信队列的绑定

/**
 * x-dead-letter-exchange: 死信队列交换机
 * x-dead-letter-routing-key: 死信队列路由键
 * x-message-ttl: 消息在队列中最大的存活时间, 如果没有被消费,就会进入到死信队列
 */
@RabbitListener(bindings = @QueueBinding(
  value = @org.springframework.amqp.rabbit.annotation.Queue(
    value = "msg-queue",
    durable = "true",
    arguments = {@Argument(name = "x-dead-letter-exchange", value = DEAD_LETTER_EXCHANGE),
                 @Argument(name = "x-dead-letter-routing-key", value = DEAD_LETTER_ROUTING_KEY),
                 @Argument(name = "x-message-ttl", value = "20000", type = "java.lang.Long")
                }
  ),
  exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "msg-exchange"),
  key = {"msg"}
))
public void receiveMsg(Message message, Channel channel) throws Exception{
  log.info("消息体:{}", new String(message.getBody()));
  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

7.4 消息发送

@Test
public void sendMsg2() {
  // 消息属性
  MessageProperties messageProperties = MessagePropertiesBuilder
    .newInstance().setMessageId(UUID.randomUUID().toString())
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
    .setExpiration("5000")  //消息的存活时间,与队列的TTL,取最小的时间,进入死信队列
    .build();
  // 消息
  Message message = new Message(JSONObject.toJSONBytes(new User(23, "message")), messageProperties);

  CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
  rabbitTemplate.convertAndSend("msg-exchange","msg", message, data);
  try {
    TimeUnit.SECONDS.sleep(120);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

7.5 应用场景

场景一:未支付订单在规定的时间取消。实现的方式为,将订单消息放入到一个队列中,并指定其过期时间。当过期时间到了之后,就进入到了死信队列,那么可以直接在死信队列的消费端取出对应的消息即可。

场景二:某条消息在消费端曾多次尝试消费,但是均未消费成功,那么就进入死信队列,让人工干预。

八. 幂等性

所有的消息中间件都会存在这样一个问题,那就是消息的重复消费问题,例如说记录用户的积分信息,消息每次消费都会生成一条记录,这会队我们的业务带来致命的问题,所以我们必须做幂等性设计,所谓幂等设计就是,一条消息无论消费多少次所产生的结果都是相同的。对应的数学公式为:

f(n) = f(f(n))

8.1 方案一

为每条消息生成全局唯一ID,每次消费消息之后都将ID在表中插入一条数据,每次消费之前先查询ID是否存在,如果不存在就执行对应的逻辑;如果存在则直接确认。

8.2 方案二

利用redis+数据库的方案来实现幂等性的设计,实现的思路与redis的缓存击穿方案类似;当插入数据的时候,将唯一ID同时插入数据库,然后放入到redis中。

九. 消息的重试机制

消息的重试是发生在消息的消费端。

十.消息的可靠性投递

请添加图片描述

请添加图片描述

CSDN(sessinsong): https://blog.csdn.net/sessionsong/article/details/86317991

简书 (jiangmo):https://www.jianshu.com/p/64357bf35808

面试:a. 如何保证消息不丢失?

? b. 如何保证消息的不重复消费?

? c. 如何使用mq来是实现分布式事务?

? d. 在工作中mq用在哪里?支付回调。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-03 11:16:33  更:2021-08-03 11:16:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 19:32:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码