IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Hadoop RPC进程通信应用 -> 正文阅读

[网络协议]Hadoop RPC进程通信应用

Hadoop的RPC是基于socket的半双工通信机制,两边如果需要互相向对方传数据,则需要建立两个通道;还要注意同一时间只能存在一个通道,否则双方的通信都会进入阻塞状态。

package com.linux.rpc2;


import org.apache.hadoop.io.Text;

/**
 * RPC contract shared by the checker ({@code StudentCheck}) and every student process.
 *
 * <p>Each participant both serves this interface and calls it on its peers.
 */
public interface RPCProtocol {

    /** Protocol version passed to {@code RPC.getProxy} by every client in this package. */
    long versionID = 999L;

    /**
     * Hands a problem input to the receiver.
     *
     * @param inputData the integer to be processed by the receiver
     */
    void testProgram(int inputData);

    /**
     * Delivers a result together with the name of the party that produced it.
     *
     * @param res  the result text
     * @param name the sender's name
     */
    void receiveResult(Text res, Text name);

}
package com.linux.rpc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;

public class StudentCheck implements RPCProtocol {

//    private static ArrayList<RPCProtocol> students;
    private static HashMap<String, RPCProtocol> students;
    private static int testInputData;
    private static String trueRes;
    private static RPC.Server server;
    private static Text teacherRes;
    private static Text teacherName;
    private static HashMap<String, Boolean> results;
//    private static RPCProtocol student02;

    public static void init() throws IOException {
        teacherRes = new Text();
        teacherName = new Text();
        teacherName.set("Teacher");
        students = new HashMap<>();
        results = new HashMap<>();
//        students = new ArrayList<>();

        server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(9870)
                .setProtocol(RPCProtocol.class)
                .setInstance(new StudentCheck())
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        init();
        Scanner scanner = new Scanner(System.in);

        server.start();
        System.out.println("StudentCheck main thread is running");
//        Thread.sleep(10000);
        RPCProtocol studentCheck = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
        RPCProtocol student01 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9871), new Configuration());
        RPCProtocol student02 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9872), new Configuration());
        RPCProtocol student03 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9873), new Configuration());

        System.out.println("Please input your problem data");
        testInputData = scanner.nextInt();
        studentCheck.testProgram(testInputData);
//        students.add(studentCheck);  // 必须最前面
//        students.add(student01);
//        students.add(student02);
//        students.add(student03);

        students.put("Student01", student01);
        students.put("Student02", student02);
        students.put("Student03", student03);

//        for (RPCProtocol student : students) {
//            student.testProgram(testInputData);
//            Thread.sleep(100);
//        }
        for (RPCProtocol student : students.values()) {
            Thread.sleep(100);
            student.testProgram(testInputData);
        }

        Thread.sleep(500);

        for (String student : results.keySet()) {
            if (results.get(student)) {
                teacherRes.set("right");
            } else {
                teacherRes.set("wrong");
            }
            students.get(student).receiveResult(teacherRes, teacherName);
        }
    }

    @Override
    public void testProgram(int inputData) {
        trueRes = Integer.toBinaryString(inputData);
    }

    @Override
    public void receiveResult(Text res, Text name) {
        String studentName = name.toString();
//        System.out.println(studentName);
        if (trueRes.equals(res.toString())) {
            System.out.println(new Date(Time.now()) + "\t" + studentName + "'s result is right");;
            results.put(studentName, true);
        } else {
            System.out.println(new Date(Time.now()) + "\t" + studentName + "'s result is wrong");
            results.put(studentName, false);
        }
//        student02.receiveResult(res, teacherName);
//        try {
//            RPCProtocol student02 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9872), new Configuration());
//            student02.receiveResult(res, name);
//        } catch (IOException e) {
//            e.printStackTrace();
//        }

    }
}
package com.linux.rpc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;

/**
 * Student process "Student01": serves {@link RPCProtocol} on port 9871, converts each
 * received input to binary and reports the answer to the checker on port 9870.
 */
public class Student01 implements RPCProtocol {

    /** Proxy to the checker; created on first use and then reused. */
    private static RPCProtocol studentClient;
    private static Text name;

    /**
     * Builds and starts this student's RPC server.
     *
     * @param args unused
     * @throws IOException if the server cannot be built
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        name = new Text();
        name.set("Student01");

        RPC.Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(9871)
                .setProtocol(RPCProtocol.class)
                .setInstance(new Student01())
                .build();

        server.start();
        System.out.println("Student01 main thread is running");

    }

    /**
     * Computes the binary form of {@code inputData} and sends it to the checker.
     *
     * <p>If no proxy to the checker can be obtained, the failure is printed and no
     * answer is sent.
     *
     * @param inputData the problem input
     */
    @Override
    public void testProgram(int inputData) {
        RPCProtocol checker;
        try {
            checker = checkerProxy();
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        // A fresh Text per call keeps concurrent invocations from sharing a buffer.
        Text answer = new Text();
        answer.set(toBinary(inputData));
        checker.receiveResult(answer, name);
    }

    /** Returns the cached proxy to the checker, creating it on first use. */
    private static synchronized RPCProtocol checkerProxy() throws IOException {
        if (studentClient == null) {
            studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID,
                    new InetSocketAddress("localhost", 9870), new Configuration());
        }
        return studentClient;
    }

    /**
     * Renders {@code value} as binary digits, most significant first.
     *
     * <p>Zero yields {@code "0"} and negative values yield their two's-complement
     * bits, so the result equals {@link Integer#toBinaryString(int)} for every input.
     */
    private static String toBinary(int value) {
        if (value == 0) {
            return "0";
        }
        StringBuilder digits = new StringBuilder();
        // Unsigned shift so that negative inputs terminate and expose all 32 bits.
        for (int rest = value; rest != 0; rest >>>= 1) {
            digits.append(rest & 1);
        }
        return digits.reverse().toString();
    }

    /**
     * Prints a result announced by another participant.
     *
     * @param res  the announced result
     * @param name the announcer's name
     */
    @Override
    public void receiveResult(Text res, Text name) {
        System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
    }
}

package com.linux.rpc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;

/**
 * Student process "Student02": serves {@link RPCProtocol} on port 9872, converts each
 * received input to binary and reports the answer to the checker on port 9870.
 */
public class Student02 implements RPCProtocol {

    /** Proxy to the checker; created on first use and then reused. */
    private static RPCProtocol studentClient;
    private static Text name;

    /**
     * Builds and starts this student's RPC server.
     *
     * @param args unused
     * @throws IOException if the server cannot be built
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        name = new Text();
        name.set("Student02");

        RPC.Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(9872)
                .setProtocol(RPCProtocol.class)
                .setInstance(new Student02())
                .build();

        server.start();
        System.out.println("Student02 main thread is running");

    }

    /**
     * Computes the binary form of {@code inputData} and sends it to the checker.
     *
     * <p>If no proxy to the checker can be obtained, the failure is printed and no
     * answer is sent.
     *
     * @param inputData the problem input
     */
    @Override
    public void testProgram(int inputData) {
        RPCProtocol checker;
        try {
            checker = checkerProxy();
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        // A fresh Text per call keeps concurrent invocations from sharing a buffer.
        Text answer = new Text();
        answer.set(toBinary(inputData));
        checker.receiveResult(answer, name);
    }

    /** Returns the cached proxy to the checker, creating it on first use. */
    private static synchronized RPCProtocol checkerProxy() throws IOException {
        if (studentClient == null) {
            studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID,
                    new InetSocketAddress("localhost", 9870), new Configuration());
        }
        return studentClient;
    }

    /**
     * Renders {@code value} as binary digits, most significant first.
     *
     * <p>Zero yields {@code "0"} and negative values yield their two's-complement
     * bits, so the result equals {@link Integer#toBinaryString(int)} for every input.
     */
    private static String toBinary(int value) {
        if (value == 0) {
            return "0";
        }
        StringBuilder digits = new StringBuilder();
        // Unsigned shift so that negative inputs terminate and expose all 32 bits.
        for (int rest = value; rest != 0; rest >>>= 1) {
            digits.append(rest & 1);
        }
        return digits.reverse().toString();
    }

    /**
     * Prints a result announced by another participant.
     *
     * @param res  the announced result
     * @param name the announcer's name
     */
    @Override
    public void receiveResult(Text res, Text name) {
        System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
    }
}

package com.linux.rpc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;

/**
 * Student process "Student03": serves {@link RPCProtocol} on port 9873 and reports an
 * answer for each received input to the checker on port 9870.
 *
 * <p>NOTE(review): unlike Student01/Student02 this class emits the binary digits
 * least-significant first. That looks like a deliberately wrong answer used to
 * exercise the checker's "wrong" path, so the produced string is left exactly as it
 * was - confirm with the author before changing it.
 */
public class Student03 implements RPCProtocol {

    /** Proxy to the checker; created on first use and then reused. */
    private static RPCProtocol studentClient;
    private static Text name;

    /**
     * Builds and starts this student's RPC server.
     *
     * @param args unused
     * @throws IOException if the server cannot be built
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        name = new Text();
        name.set("Student03");

        RPC.Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(9873)
                .setProtocol(RPCProtocol.class)
                .setInstance(new Student03())
                .build();

        server.start();
        System.out.println("Student03 main thread is running");

    }

    /**
     * Computes this student's answer for {@code inputData} and sends it to the checker.
     *
     * <p>If no proxy to the checker can be obtained, the failure is printed and no
     * answer is sent.
     *
     * @param inputData the problem input
     */
    @Override
    public void testProgram(int inputData) {
        RPCProtocol checker;
        try {
            checker = checkerProxy();
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        // Digits are appended in the order they are produced (lowest bit first);
        // inputs that are zero or negative produce an empty string.
        StringBuilder digits = new StringBuilder();
        for (int rest = inputData; rest > 0; rest /= 2) {
            digits.append(rest % 2);
        }

        // A fresh Text per call keeps concurrent invocations from sharing a buffer.
        Text answer = new Text();
        answer.set(digits.toString());
        checker.receiveResult(answer, name);
    }

    /** Returns the cached proxy to the checker, creating it on first use. */
    private static synchronized RPCProtocol checkerProxy() throws IOException {
        if (studentClient == null) {
            studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID,
                    new InetSocketAddress("localhost", 9870), new Configuration());
        }
        return studentClient;
    }

    /**
     * Prints a result announced by another participant.
     *
     * @param res  the announced result
     * @param name the announcer's name
     */
    @Override
    public void receiveResult(Text res, Text name) {
        System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
    }
}

?

?

?

?

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-05-07 11:28:02  更:2022-05-07 11:28:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/9 17:16:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码