1. The RPC processing flow
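- When the client starts, it creates an anonymous callback queue (the name is generated by RabbitMQ, e.g. amqp.gen-Xa2… in the figure below).
- The client sets two properties on each RPC request: replyTo, which holds the name of the callback queue, and correlationId, which identifies the request.
- The request is sent to the rpc_queue queue.
- The RPC server listens for requests on rpc_queue. When a request arrives, the server processes it and sends a message carrying the result back to the client, publishing it to the callback queue named in replyTo.
- The client listens on the callback queue. When a message arrives, it checks the correlationId property; if the value matches the one sent with the request, that message is the result.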
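The steps above can be illustrated without a broker. The following is a minimal in-memory sketch, not RabbitMQ code: plain BlockingQueue instances stand in for rpc_queue and the callback queue, and the names RpcFlowSketch, Message, startServer and call are hypothetical and exist only for this example. It also passes the callback queue by reference in replyTo, whereas a real client would pass the queue name.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the RPC flow; no RabbitMQ API is used here.
final class RpcFlowSketch {
    // A message carrying the two properties the client sets on a request.
    static final class Message {
        final String body;
        final String correlationId;
        final BlockingQueue<Message> replyTo; // assumption: a queue reference instead of a queue name

        Message(String body, String correlationId, BlockingQueue<Message> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Plays the role of rpc_queue.
    static final BlockingQueue<Message> RPC_QUEUE = new LinkedBlockingQueue<>();

    static int fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // Server side: take requests, compute, reply on replyTo with the same correlationId.
    static void startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = RPC_QUEUE.take();
                    String result = String.valueOf(fib(Integer.parseInt(request.body)));
                    request.replyTo.add(new Message(result, request.correlationId, null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    // Client side: publish the request, then wait for the reply whose correlationId matches.
    static String call(String body) {
        BlockingQueue<Message> callbackQueue = new LinkedBlockingQueue<>();
        String corrId = UUID.randomUUID().toString();
        RPC_QUEUE.add(new Message(body, corrId, callbackQueue));
        try {
            while (true) {
                Message reply = callbackQueue.poll(5, TimeUnit.SECONDS);
                if (reply == null) {
                    throw new IllegalStateException("no reply within 5 seconds");
                }
                if (corrId.equals(reply.correlationId)) {
                    return reply.body;
                }
                // Otherwise the reply belongs to some other request; ignore it.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        }
    }

    public static void main(String[] args) {
        startServer();
        System.out.println("fib(10) = " + call("10"));
    }
}
```

The correlationId check matters once a callback queue is shared by several requests: replies may arrive in any order, and the id is the only thing that ties a reply to the request that caused it.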
2. Server code
This is essentially the same as the example in 《RabbitMQ实战指南》.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class RpcServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws Exception {
        Connection connection = BaseRabbitmq.initConnection();
        Channel channel = connection.createChannel();
        // Non-durable, non-exclusive, non-auto-delete request queue.
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        // Deliver at most one unacknowledged request to this consumer at a time.
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // Echo the correlationId so the client can match the reply to its request.
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                String response = "";
                try {
                    String message = new String(body, "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (Exception e) {
                    // On bad input the reply body stays empty.
                    e.printStackTrace();
                } finally {
                    // Reply on the callback queue named by replyTo, then ack the request.
                    channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // Manual acknowledgement: autoAck is false.
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    }

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }
}
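The recursive fib in the server takes exponential time, and with a single server and basicQos(1) one slow request holds up every request queued behind it. As a possible alternative, here is a minimal iterative sketch. The class name FibSketch and the switch from int to long are assumptions of this example, not part of the original server.

```java
// Hypothetical replacement for the recursive fib used by the server.
final class FibSketch {
    // Linear-time Fibonacci; long avoids the int overflow that begins at fib(47).
    static long fib(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        long previous = 0; // fib(0)
        long current = 1;  // fib(1)
        for (int i = 0; i < n; i++) {
            long next = previous + current;
            previous = current;
            current = next;
        }
        return previous;
    }

    public static void main(String[] args) {
        System.out.println("fib(30) = " + fib(30));
    }
}
```

Rejecting negative input explicitly also avoids the unbounded recursion that the recursive version would enter for n below zero.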
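3. Client code
QueueingConsumer was deprecated in amqp-client 4.x and removed in 5.0, so the client is slightly modified here to use DefaultConsumer instead.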
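import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RpcClient {
    private final Connection connection;
    private final Channel channel;
    private final String replyQueueName;
    private final DefaultConsumer consumer;
    // Identifies the single request this client sends.
    private final String corrId = UUID.randomUUID().toString();
    // Hands the reply from the consumer thread over to call().
    private final BlockingQueue<String> responses = new ArrayBlockingQueue<>(1);

    public RpcClient() throws IOException, TimeoutException {
        connection = BaseRabbitmq.initConnection();
        channel = connection.createChannel();
        // Server-named, exclusive, auto-delete callback queue.
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // Ignore replies that do not belong to our request.
                if (corrId.equals(properties.getCorrelationId())) {
                    responses.offer(new String(body, "UTF-8"));
                }
            }
        };
        // Register the consumer once, instead of calling basicConsume in a loop.
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws IOException, InterruptedException {
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        String requestQueueName = "rpc_queue";
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        // Block until the consumer hands over the matching reply.
        return responses.take();
    }

    public void close() throws IOException {
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        RpcClient fibRpc = new RpcClient();
        System.out.println(" [x] Requesting fib(30)");
        String response = fibRpc.call("30");
        System.out.println(" [.] Got " + response);
        fibRpc.close();
    }
}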
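The client above sends one request per instance with a fixed correlationId. If several requests were to share one callback queue, each reply would have to be routed to its own caller. One way this could be done is sketched below with a map from correlationId to CompletableFuture. PendingReplies and its methods are hypothetical names for this example and are not part of amqp-client; in a real client, register would be called before basicPublish and complete from inside handleDelivery.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks outstanding requests by correlationId.
final class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before publishing a request; the caller waits on the returned future.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call when a reply arrives; returns false if no request is waiting for this id.
    boolean complete(String correlationId, String body) {
        CompletableFuture<String> future =
                correlationId == null ? null : pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        CompletableFuture<String> first = replies.register("id-1");
        CompletableFuture<String> second = replies.register("id-2");
        // Replies may arrive in any order; each one completes only its own future.
        replies.complete("id-2", "reply for second");
        replies.complete("id-1", "reply for first");
        System.out.println(first.join() + " / " + second.join());
    }
}
```

Removing the entry on completion keeps the map from growing, and an unknown or missing correlationId is reported to the caller instead of being silently treated as a result.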
4. RabbitMQ connection configuration
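import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class BaseRabbitmq {
    // Opens a connection to a local broker; host, port and credentials are example values.
    public static Connection initConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");
        return connectionFactory.newConnection();
    }
}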
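The connection settings above are hard-coded. A possible variation is to read them from the environment and fall back to the same defaults. The sketch below is only an illustration: the class RabbitSettings and the variable names RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME and RABBITMQ_PASSWORD are assumptions of this example, not conventions defined by RabbitMQ or by the original code.

```java
import java.util.Map;

// Hypothetical helper: resolves connection settings from a map such as System.getenv().
final class RabbitSettings {
    final String host;
    final int port;
    final String username;
    final String password;

    private RabbitSettings(String host, int port, String username, String password) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
    }

    // The variable names are assumptions made for this sketch; defaults match the values above.
    static RabbitSettings from(Map<String, String> env) {
        return new RabbitSettings(
                env.getOrDefault("RABBITMQ_HOST", "127.0.0.1"),
                Integer.parseInt(env.getOrDefault("RABBITMQ_PORT", "5672")),
                env.getOrDefault("RABBITMQ_USERNAME", "root"),
                env.getOrDefault("RABBITMQ_PASSWORD", "123456"));
    }

    public static void main(String[] args) {
        RabbitSettings settings = from(System.getenv());
        System.out.println(settings.host + ":" + settings.port + " as " + settings.username);
    }
}
```

The resolved values could then be passed to setHost, setPort, setUsername and setPassword in initConnection, which keeps credentials out of the source file.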
5. Result
(1) Final output on the client side