IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ发送消息的四种模式 -> 正文阅读

[Java知识库]RabbitMQ发送消息的四种模式

1. 前言

RabbitMQ中有4种类型的exchange,分别是directfanouttopicheaders。对应有5种发送消息的模式。direct类型的exchange有两种发送模式。

2. 发送消息的不同模式

2.1 AMQP default

The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
默认的exchange被隐式地绑定到每个队列,routingKey与队列的名称相同。不能从默认exchange上显式地绑定或解绑。也不能删除。这种模式就像消息直接发送到队列一样。
这种模式的exchange类型是direct
在这里插入图片描述
在这里插入图片描述
producer
这种模式需要设置队列名称

ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("public");
factory.setPassword("admin");
//获取连接
Connection connection=factory.newConnection();
//创建通道
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i=0;i<10000;i++){
     String message = "Hello World!";
     message+=i;
     Thread.sleep(1000);
     channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
     System.out.println(" [x] Sent '" + message + "'");
 }
channel.close();
connection.close();

channel.basicPublish(...)

The first parameter is the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routingKey, if it exists.

第一个参数是exchange的名称,这里的空字符串表示默认或没有名字的exchange。如果routingKey存在的话,消息会被路由到routingKey指定名称对应的队列。即队列与routingKey同名。

consumer

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(10);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = new DeliverCallback() {
    @Override
    public void handle(String consumerTag, Delivery message) throws IOException {
    	String messageStr = new String(message.getBody(), "UTF-8");
    	System.out.println(" [x] Received '" + messageStr + "'");
//  	channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
};
boolean autoAck=true;
//消费队列中的消息
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

channel.basicConsume(...)
第一个参数是queue名称,autoAck表示消息自动确认

Fair dispatch 消息公平的分发
如果前n个消息没有处理确认,就不给他发送新的消息

int prefetchCount = n;
channel.basicQos(prefetchCount);

2.2 fanout exchange

fanout类型的exchange会忽略routingKey,只要队列与exchange绑定,exchange的消息就会路由到这个队列。
producer

//创建exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));

consumer

//获取队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定队列
channel.queueBind(queueName, EXCHANGE_NAME, "");

2.3. direct exchange

这种类型,消息通过routingKey路由到对应队列
producer

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, "info", null, (message+" info").getBytes(StandardCharsets.UTF_8));

consumer

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "info");

2.4 topic exchange

这种模式通过主题发送,主题其实就是routingKey加了通配符。
在这里插入图片描述

* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
“lazy.pink.rabbit” will be delivered to the second queue only once, even though it matches two bindings

*代替一个单词
#代替零个或多个单词
一个消息的routingKey为"lazy.pink.rabbit",它会被分发到第二个队列Q2,且只有一次,即使Q2匹配两个绑定关系。
producer

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, (message+" quick.orange.rabbit").getBytes(StandardCharsets.UTF_8));

consumer

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

2.5 headers exchange

消费者中绑定队列到exchange时,需要传递一组key-value形式的参数,其中有一对参数是"x-match"-">any",如果这对参数不传的话,默认为"x-match"->“all”。
如果x-match的值为all,消息的header中的值要匹配队列绑定时传递的参数中的全部参数,此时消息会被路由到这个队列。如果值为any,只需要匹配参数中的一对参数即可。

Note that headers beginning with the string x- will not be used to evaluate matches.
注意,以 x- 开头的header不会被匹配

producer

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
//headers.put("type", "report");
AMQP.BasicProperties properties= new AMQP.BasicProperties.Builder()
        .headers(headers).build();
String message= "hello world";
channel.basicPublish(EXCHANGE_NAME, "", properties, (message).getBytes());

consumer

String queueName = channel.queueDeclare().getQueue();
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-match",  "any"); // x-match默认为all
arguments.put("format",  "pdf");
arguments.put("type",  "report");
channel.queueBind(queueName, EXCHANGE_NAME, "",arguments);
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-12 17:18:07  更:2022-03-12 17:18:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 10:43:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码