rabbitmq
1、AMQP
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
2、RabbitMQ的使用
1、核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、 priority(相对于其他消息的优先权)、 delivery-mode(指出该消息可能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型: direct(默认), fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连 接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。 Connection:网络连接,比如一个TCP连接。 Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接, AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。 vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
2、几种消息模型
概述:一个生产者一个消费者
应用场景:一发一接的结构
例如: 手机短信,邮件单发
private static String QUEUE_NAME = "test-simple";
public void sendMessage() throws Exception{
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "hello tom!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息:"+message);
channel.close();
connection.close();
}
private static String QUEUE_NAME = "test-simple";
public void getMessage() throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("收到消息:" + new String(body));
}
});
channel.close();
connection.close();
}
概述:一个生产者多个消费者
应用场景:一发多收结构
例如:抢红包、资源调度
private static String QUEUE_NAME = "test-work";
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 20; i++) {
channel.basicPublish("", QUEUE_NAME, null, ("hello tom!"+i).getBytes(StandardCharsets.UTF_8));
System.out.println("发送+"+i);
}
System.out.println("发送消息: success");
channel.close();
connection.close();
}
private static String QUEUE_NAME = "test-work";
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1-收到消息:" + new String(body));
channel.basicAck(1,false);
}
});
}
概述:发送消息到交换机 交换机根据绑定队列发送给生产者
应用场景:服务推送
例如:订阅公众号接收文档
private static String EXCHANGE_NAME = "fanout-exchange";
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
channel.basicPublish(EXCHANGE_NAME,"",null,"fanout type message".getBytes(StandardCharsets.UTF_8));
channel.close();
connection.close();
}
private static String EXCHANGE_NAME = "fanout-exchange";
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,"");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
概述:根据routKey不同,交换机发送到不同队列中。
private static String DIRECT_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_NAME,"direct");
String routingKey = "warning";
channel.basicPublish(DIRECT_NAME,routingKey,null,("这是direct模式发布routKey:["+routingKey+"],发送新增消息").getBytes(StandardCharsets.UTF_8));
channel.close();
connection.close();
}
private static String DIRECT_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_NAME,"direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,DIRECT_NAME,"info");
channel.queueBind(queue,DIRECT_NAME,"error");
channel.queueBind(queue,DIRECT_NAME,"warning");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
private static String TOPIC_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_NAME,"topic");
String routingKey = "user.service";
channel.basicPublish(TOPIC_NAME,routingKey,null,("这是topic动态路由模型,routKey["+routingKey).getBytes(StandardCharsets.UTF_8));
channel.close();
connection.close();
}
private static String TOPIC_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_NAME,"topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,TOPIC_NAME,"user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
springboot与rabbitmq整合
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: test
password: 123456
virtual-host: /test
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testHello() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","hello world:"+i);
}
}
消费者
@Component
public class HelloConsumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive(String message){
System.out.println("message1:"+message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message){
System.out.println("message2:"+message);
}
}
发送端
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("logs","","Fanout的模型发送消息");
}
接收端
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs",type = "fanout")
)
})
public void receive(String message){
System.out.println("message1 = " +message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs",type = "fanout")
)
})
public void receive2(String message){
System.out.println("message2 = " +message);
}
}
发送端
@Test
public void testDirect(){
rabbitTemplate.convertAndSend("directs","error","Direct的模式发送信息");
}
接收端
@Component
public class DirectConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs", type = "direct"),
key = {"info", "error", "warn"}
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs", type = "direct"),
key = {"error"}
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
发送端
@Test
void testTopic(){
rabbitTemplate.convertAndSend("topics","user.ll","这是topic模式发送信息");
}
接收端
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topics",type = "topic"),
key = {"user.save","user.*"}
)
})
public void receive1(String message){
System.out.println("message1 = "+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topics",type = "topic"),
key = {"user.get"}
)
})
public void receive2(String message){
System.out.println("message2 = "+message);
}
}
)
})
public void receive1(String message){
System.out.println("message1 = "+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//临时队列
exchange = @Exchange(name = "topics",type = "topic"),
key = {"user.get"}
)
})
public void receive2(String message){
System.out.println("message2 = "+message);
}
}
---
|