1. 前言
RabbitMQ中有4种类型的exchange ,分别是direct ,fanout ,topic 和headers 。对应有5种发送消息的模式。direct类型的exchange有两种发送模式。
2. 发送消息的不同模式
2.1 AMQP default
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted. 默认的exchange被隐式地绑定到每个队列,routingKey与队列的名称相同。不能从默认exchange上显式地绑定或解绑。也不能删除。这种模式就像消息直接发送到队列一样。 这种模式的exchange类型是direct producer 这种模式需要设置队列名称
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("public");
factory.setPassword("admin");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i=0;i<10000;i++){
String message = "Hello World!";
message+=i;
Thread.sleep(1000);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
channel.basicPublish(...)
The first parameter is the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routingKey, if it exists.
第一个参数是exchange的名称,这里的空字符串表示默认或没有名字的exchange。如果routingKey存在的话,消息会被路由到routingKey指定名称对应的队列。即队列与routingKey同名。
consumer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(10);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String messageStr = new String(message.getBody(), "UTF-8");
System.out.println(" [x] Received '" + messageStr + "'");
}
};
boolean autoAck=true;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
channel.basicConsume(...) 第一个参数是queue名称,autoAck表示消息自动确认
Fair dispatch 消息公平的分发 如果前n个消息没有处理确认,就不给他发送新的消息
int prefetchCount = n;
channel.basicQos(prefetchCount);
2.2 fanout exchange
fanout类型的exchange会忽略routingKey,只要队列与exchange绑定,exchange的消息就会路由到这个队列。 producer
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
consumer
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
2.3. direct exchange
这种类型,消息通过routingKey路由到对应队列 producer
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, "info", null, (message+" info").getBytes(StandardCharsets.UTF_8));
consumer
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "info");
2.4 topic exchange
这种模式通过主题发送,主题其实就是routingKey加了通配符。
* (star) can substitute for exactly one word. # (hash) can substitute for zero or more words. “lazy.pink.rabbit” will be delivered to the second queue only once, even though it matches two bindings
* 代替一个单词 # 代替零个或多个单词 一个消息的routingKey为"lazy.pink.rabbit",它会被分发到第二个队列Q2,且只有一次,即使Q2匹配两个绑定关系。 producer
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, (message+" quick.orange.rabbit").getBytes(StandardCharsets.UTF_8));
consumer
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
2.5 headers exchange
消费者中绑定队列到exchange时,需要传递一组key-value形式的参数,其中有一对参数是"x-match"-">any",如果这对参数不传的话,默认为"x-match"->“all”。 如果x-match的值为all,消息的header中的值要匹配队列绑定时传递的参数中的全部参数,此时消息会被路由到这个队列。如果值为any,只需要匹配参数中的一对参数即可。
Note that headers beginning with the string x- will not be used to evaluate matches. 注意,以 x- 开头的header不会被匹配
producer
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
AMQP.BasicProperties properties= new AMQP.BasicProperties.Builder()
.headers(headers).build();
String message= "hello world";
channel.basicPublish(EXCHANGE_NAME, "", properties, (message).getBytes());
consumer
String queueName = channel.queueDeclare().getQueue();
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-match", "any");
arguments.put("format", "pdf");
arguments.put("type", "report");
channel.queueBind(queueName, EXCHANGE_NAME, "",arguments);
|