前言:在前面的教程里我们学习了工作队列,实现了将工作任务发给不同的工人,如果任务是需要在另一台计算机上运行,我们如何实现运行远程计算机上的一个函数任务并等待其返回的结果呢,这种模式通常被称为远程过程调用 (Remote Procedure Call?or?RPC)。本章我们将要使用RabbitMQ构建一个RPC系统,一个客户端和一个可扩展的PRC服务,RPC服务返回菲波那切数列的方式模拟服务调用过程
RabitMQ安装
如何安装:?https://blog.csdn.net/Beijing_L/article/details/119042261
图例
S(服务器): 接受请求并处理,返回相应response结果
C(客户端): 客户端,请求request的发送方,等待相应response
Queue(队列): RabbitMQ的作用是存储消息,队列的特性是先进先出
基本概念
远程调用RPC
RPC是计算机一个很常见的模式。它经常被批评。当程序员不知道函数调用的是本地还是速度比较慢的RPC的时候,问题就出现了。这种混乱让系统变得不可预测,同时又为调试带来了不必要的复杂性。滥用RPC不仅不能简化程序开发,反而会让程序更加难以维护
牢记这一点, 考虑以下建议
- 确保程序中代码能很清晰的表示那些是本地调用,那些是远程调用
- 系统文档能够很清晰的表示组件之前的关系
- 记录错误案例。当RPC服务器长时间停机时候,客户端该如何处理
- 当有疑问的时候,避免使用RPC,如果可以的话使用异步信道(asynchronous pipeline)来替换RPC
客户端接口 Client Interface
言归正传,为了说明RPC服务如何工作, 我们先创建一个简单的客户端程序,call 方法将会发送一个RPC请求然后阻塞等待接收相应消息
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
回调队列 Callback queue
使用RabbitMQ实现RPC很容易, 客户端发送请求消息。服务器回复响应消息。为了接收相应,我们需要发送一个带有请求的“回调“队列地址。回调队列我们可以使用默认队列
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
AMQP 0-9-1协议预先定义了一组14个与消息相关的属性,常用几个属性
- deliveryMode: 2表示将消息持久化,其他值表示消息为临时
- contentType:描述消息的编码类型, 例如JSON编码 application/json.
- replyTo: 回调队列名称
- correlationId: 关系ID ,用于关联请求request和相应respon,以此建立关系
关系ID Correlation Id
在前面介绍的方法中, 我们建议为每一个RPC请求创建一个回调队列, 这是非常抵消的,更好的做法是, 我们为每一个客户端创建一个回调队列。
这旧产生了一个新问题, 对立中收到相应后,不清楚响应是属于哪一个请求。这个时候CorrelationId属性就体现出了其的重要性,我们为每一个请求设置一个唯一的值,稍后,我们的回调队列中收到一条消息,通过查看CorrelationId属性,并居于此将响应与请求匹配。
当看到未知的CorrelationId,我们可以丢弃。你可能会问,为什么要忽略callback queue中的位置消息,而不是抛出异常,这是由于服务器端可能出现竞争情况。虽然不太可能,但RPC服务器可能会在向我们发送答案后,单在发送请求确认消息之前死亡,如果发生这种情况,重启RPC服务器后它将再次处理这个请求。第2次响应属于重复响应。所以客户端我们必须优雅的处理重复的响应,并且RPC应该是幂等的
RPC的工作方式
- 对于每一个RPC请求,客户端发送的消息有2个重要属性:replyTo 表示为请求创建的匿名队列,correlationId 表示为每一个请求消息创建的唯一值
- 请求消息被发送到一个RPC队列(rpc_queue ) 队列
- RPC Worker 又名RPC服务器 将会等待RPC队列里的消息,当队里里出现请求消息的时候,它开始工作返回相应结果,相应结果将会发送到relayTo属性标明的队列中
- 客户端等待relayTo属性标明的队列中的消息,当消息出现后,检查correlationId属性,如果请求的属性和相应消息中的属性值相同。将相应返回给应用程序
服务器端实现
- 像往常一样, 我们先建立连接, 通道并声明队列
- 我们可能系统运行多个服务器进程,为了在多台服务器上平均分配负载,我们设置channel.basicQos的值为1
- DeliverCallback 回调函数实现处理请求消息,通过fib函数模拟消息处理,然后将处理后的结果发送到ReplyTo队列里
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {
}));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
客户端实现
- 首先RPCClient构造器里创建连接和管道
- 运行main后 调用call方法将32条请求发送个RPC服务器。try语句(try-with-resource)是JDK1.7新增内容 ,try块退出的时候会自动调用close方法关闭资源 ,RPCClient 资源实现了AutoCloseable接口的类这个接口只有一个close接口行为
- call方法中先通过UUID生成唯一编码,并声明一个匿名队列将其封装到AMQP.BasicProperties的replyto和correlationId属性中,
- 客户端接收到响应消息后通过correlationId来判断,如果correlationId和请求值相等就表示是本次请求的相应结果
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) {
//try语句(try-with-resource) ,try块退出的时候会自动调用close方法关闭资源 ,JDK1.7增加功能
//资源实现了AutoCloseable接口的类 接口是JDK1.7增加
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
//声明随机队列名
String replyQueueName = channel.queueDeclare().getQueue();
// replyTo用于命名回调队列
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
//默认交换器将消息发送到队列, 属性声明响应队列到 replyQueueName
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//创建容量为1的阻塞队列
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
//BlockingQueue阻塞队列off表示向队列尾部插入一格元素,队列满则等待
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
//ArrayBlockingQueue 获取到相应后取消消费者队列的订阅关系
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
}
总结
这篇文章简单介绍了RabbitMq实现RCP的方式和原理,但是本章介绍的设计不是RPCservice的唯一可能实现,但是它有一些重要的优点:
- 如果RPC服务器太慢,您可以通过运行另一个来扩大规模
- 在客户端,RPC只需要发送和接收一条消息,对于单个RPC请求,RPC客户端只需要一次网络往返。
我们的代码仍然非常简单,没有尝试解决更复杂(但很重要)的问题,例如
- 如果没有服务器运行,客户端应该如何反应?
- 客户端应该有某种类型的RPC超时吗?
- 如果服务器出现故障并引发异常,是否应该将其转发给客户端?
- 在处理之前防止无效的传入消息(如检查边界、类型)。
上一篇:RabbitMQ教程主题交换器Topics
|