Hadoop的RPC是基于socket的半双工通信机制,两边如果需要互相向对方传数据则需要建立两个通道;还要注意同一时间只能存在一个通道,否则双方的通信都会进入阻塞状态?
package com.linux.rpc2;
import org.apache.hadoop.io.Text;
public interface RPCProtocol {
long versionID = 999;
public void testProgram(int inputData);
public void receiveResult(Text res, Text name);
}
package com.linux.rpc2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Scanner;
public class StudentCheck implements RPCProtocol {
// private static ArrayList<RPCProtocol> students;
private static HashMap<String, RPCProtocol> students;
private static int testInputData;
private static String trueRes;
private static RPC.Server server;
private static Text teacherRes;
private static Text teacherName;
private static HashMap<String, Boolean> results;
// private static RPCProtocol student02;
public static void init() throws IOException {
teacherRes = new Text();
teacherName = new Text();
teacherName.set("Teacher");
students = new HashMap<>();
results = new HashMap<>();
// students = new ArrayList<>();
server = new RPC.Builder(new Configuration())
.setBindAddress("localhost")
.setPort(9870)
.setProtocol(RPCProtocol.class)
.setInstance(new StudentCheck())
.build();
}
public static void main(String[] args) throws IOException, InterruptedException {
init();
Scanner scanner = new Scanner(System.in);
server.start();
System.out.println("StudentCheck main thread is running");
// Thread.sleep(10000);
RPCProtocol studentCheck = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
RPCProtocol student01 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9871), new Configuration());
RPCProtocol student02 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9872), new Configuration());
RPCProtocol student03 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9873), new Configuration());
System.out.println("Please input your problem data");
testInputData = scanner.nextInt();
studentCheck.testProgram(testInputData);
// students.add(studentCheck); // 必须最前面
// students.add(student01);
// students.add(student02);
// students.add(student03);
students.put("Student01", student01);
students.put("Student02", student02);
students.put("Student03", student03);
// for (RPCProtocol student : students) {
// student.testProgram(testInputData);
// Thread.sleep(100);
// }
for (RPCProtocol student : students.values()) {
Thread.sleep(100);
student.testProgram(testInputData);
}
Thread.sleep(500);
for (String student : results.keySet()) {
if (results.get(student)) {
teacherRes.set("right");
} else {
teacherRes.set("wrong");
}
students.get(student).receiveResult(teacherRes, teacherName);
}
}
@Override
public void testProgram(int inputData) {
trueRes = Integer.toBinaryString(inputData);
}
@Override
public void receiveResult(Text res, Text name) {
String studentName = name.toString();
// System.out.println(studentName);
if (trueRes.equals(res.toString())) {
System.out.println(new Date(Time.now()) + "\t" + studentName + "'s result is right");;
results.put(studentName, true);
} else {
System.out.println(new Date(Time.now()) + "\t" + studentName + "'s result is wrong");
results.put(studentName, false);
}
// student02.receiveResult(res, teacherName);
// try {
// RPCProtocol student02 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9872), new Configuration());
// student02.receiveResult(res, name);
// } catch (IOException e) {
// e.printStackTrace();
// }
}
}
package com.linux.rpc2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
public class Student01 implements RPCProtocol {
private static RPCProtocol studentClient;
private static Text name;
private static Text resOutput;
public static void main(String[] args) throws IOException, InterruptedException {
name = new Text();
name.set("Student01");
resOutput = new Text();
RPC.Server server = new RPC.Builder(new Configuration())
.setBindAddress("localhost")
.setPort(9871)
.setProtocol(RPCProtocol.class)
.setInstance(new Student01())
.build();
server.start();
System.out.println("Student01 main thread is running");
}
@Override
public void testProgram(int inputData) {
try {
studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
} catch (IOException e) {
e.printStackTrace();
}
int res;
ArrayList<Integer> stack = new ArrayList<>();
while (inputData > 0) {
res = inputData % 2;
stack.add(res);
inputData /= 2;
}
String resData = "";
for (int i = stack.size() - 1; i >= 0 ; i--) {
resData += stack.get(i);
}
resOutput.set(resData);
studentClient.receiveResult(resOutput, name);
}
@Override
public void receiveResult(Text res, Text name) {
System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
}
}
package com.linux.rpc2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
public class Student02 implements RPCProtocol {
private static RPCProtocol studentClient;
private static Text name;
private static Text resOutput;
public static void main(String[] args) throws IOException, InterruptedException {
name = new Text();
name.set("Student02");
resOutput = new Text();
RPC.Server server = new RPC.Builder(new Configuration())
.setBindAddress("localhost")
.setPort(9872)
.setProtocol(RPCProtocol.class)
.setInstance(new Student02())
.build();
server.start();
System.out.println("Student02 main thread is running");
}
@Override
public void testProgram(int inputData) {
try {
studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
} catch (IOException e) {
e.printStackTrace();
}
int res;
ArrayList<Integer> stack = new ArrayList<>();
while (inputData > 0) {
res = inputData % 2;
stack.add(res);
inputData /= 2;
}
String resData = "";
for (int i = stack.size() - 1; i >= 0 ; i--) {
resData += stack.get(i);
}
resOutput.set(resData);
studentClient.receiveResult(resOutput, name);
}
@Override
public void receiveResult(Text res, Text name) {
System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
}
}
package com.linux.rpc2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
public class Student03 implements RPCProtocol {
private static RPCProtocol studentClient;
private static Text name;
private static Text resOutput;
public static void main(String[] args) throws IOException, InterruptedException {
name = new Text();
name.set("Student03");
resOutput = new Text();
RPC.Server server = new RPC.Builder(new Configuration())
.setBindAddress("localhost")
.setPort(9873)
.setProtocol(RPCProtocol.class)
.setInstance(new Student03())
.build();
server.start();
System.out.println("Student03 main thread is running");
}
@Override
public void testProgram(int inputData) {
try {
studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
} catch (IOException e) {
e.printStackTrace();
}
int res;
ArrayList<Integer> stack = new ArrayList<>();
while (inputData > 0) {
res = inputData % 2;
stack.add(res);
inputData /= 2;
}
String resData = "";
for (int i = 0; i < stack.size(); i++) {
resData += stack.get(i);
}
resOutput.set(resData);
studentClient.receiveResult(resOutput, name);
}
@Override
public void receiveResult(Text res, Text name) {
System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
}
}
?
?
?
?
|