RabbitMQ入门案例
Rabbit 模式
https://www.rabbitmq.com/getstarted.html
实现步骤
- 构建一个 maven工程
- 导入 rabbitmq的依赖
- 启动 rabbitmq-server服务
- 定义生产者
- 定义消费者
- 观察消息的在 rabbitmq-server服务中的进程
初步实现
前期准备
1.构建项目
2.导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
简单模型
在上图的模型中,有以下概念:
- 生产者,也就是要发送消息的程序
- 消费者:消息的接受者,会一直等待消息到来。
- 消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
所有的中间件技术都是基于tcp/ip 协议基础之上构建新型的协议规范,只不过rabbitmq 遵循的是amqp
实现步骤:
- 创建连接工程
- 创建连接 connection
- 通过连接获取通道 Channel
- 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
- 准备消息内容
- 发送消息给队列 queue
- 关闭连接
- 关闭通道
生产者
public class Producer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
String quequeName = "queuel";
channel.queueDeclare(quequeName,false,false,false,null);
String message = "Hello,Consumer";
channel.basicPublish("",quequeName,null,message.getBytes());
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
}finally {
if (channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消费者
public class Consumer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者");
channel = connection.createChannel();
String quequeName = "queue1";
channel.queueDeclare(quequeName,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:"+ envelope.getRoutingKey());
System.out.println("交换机为:"+ envelope.getExchange());
System.out.println("消息id为:"+ envelope.getDeliveryTag());
System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
System.out.println("");
System.out.println("======================================================");
System.out.println("");
}
};
channel.basicConsume("queue1", true, consumer);
} catch (Exception e) {
e.printStackTrace();
}finally {
}
}
}
AMQP
概念介绍
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP是一个二进制协议,拥有一些现代化特点:多信道 、协商式 ,异步 ,安全 ,扩平台 ,中立 ,高效 。
RabbitMQ 是 AMQP协议 的 Erlang的实现。
概念 | 说明 |
---|
连接 Connection | 一个网络连接,例如:TCP/IP套接字连接。 | 会话 Session | 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。 | 信道 Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 | 客户端 Client | AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 | 服务节点Broker | 消息中间件的服务节点。一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。 | 端点 | AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。 | 消费者 Consumer | 一个从消息队列里请求消息的客户端程序。 | 生产者 Producer | 一个向交换机发布消息的客户端应用程序。 |
RabbitMQ运转流程
以 入门案例 为例
生产者发送消息
- 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
- 声明队列、设置属性;如是否排它,是否持久化,是否自动删除;
- 将路由键(空字符串)与队列绑定起来;
- 发送消息至RabbitMQ Broker;
- 关闭信道;
- 关闭连接;
消费者接收消息
- 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
- 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
- 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;
- 确认(ack,自动确认)接收到的消息;
- RabbitMQ从队列中删除相应已经被确认的消息;
- 关闭信道;
- 关闭连接;
生产者流转过程解析
- 客户端与代理服务器Broker建立连接。调用
newConnection() 方法 , 会进一步封装 Protocol Header 0-9-1 的报文头发送给 Broker ,以此通知Broker 本次交互采用的是 AMQP 0-9-1 协议,紧接着 Broker 返回 Connection.Start 来建立连接,在连接的过程中涉及 Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。 - 客户端调用
connection.createChannel 方法。此方法开启信道,其包装的 channel.open 命令发送给 Broker , 等待 channel.basicPublish 方法,对应的AMQP命令为 Basic.Publish , 这个命令包含了content Header 和content Body() 。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。 - 客户端发送完消息需要关闭资源时,涉及到
Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok 的命令交互。
消费者流转过程解析
- 消费者客户端与代理服务器Broker建立连接。会调用
newConnection() 方法,这个方法会进一步封装 Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是 AMQP 0-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。 - 消费者客户端调用
connection.createChannel 方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok 命令。 - 在真正消费之前,消费者客户端需要向Broker 发送
Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执 Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。 - Broker 向消费者客户端推送(Push) 消息,即
Basic.Deliver 命令,这个命令和 Basic.Publish 命令一样会携带 Content Header 和Content Body。 - 消费者接收到消息并正确消费之后,向Broker 发送确认,即
Basic.Ack 命令。 - 客户端发送完消息需要关闭资源时,涉及到
Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok 的命令交互。
个人博客为: MoYu’s HomePage
|