IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Java API实现HDFS有关目录与文件的相关功能 -> 正文阅读

[大数据]Java API实现HDFS有关目录与文件的相关功能

实验要求

Java?API实现HDFS以下功能。
1.????在HDFS中创建一个新目录;
2.????从本地拷贝文件到新创建的目录中;
3.????将HDFS中指定文件的内容输出到终端中;
4.????显示一个指定目录下所有文件;
5.????完成指定目录下指定类型文件的合并
6.????在HDFS中,将文件从源路径移动到目的路径。
7.????删除HDFS中指定的文件;

实验环境

操作系统:Linux

Hadoop版本:2.7.0或以上版本

JDK版本:1.8或以上版本

Java IDE:IDEA

项目文件结构

?代码

1.????在HDFS中创建一个新目录

新建Java class命名为 CreateDir

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.Scanner;
import java.net.URI;

public class CreateDir {
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String dirPath = '/'+sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            Path hdfsPath = new Path(dirPath);
            if(fs.mkdirs(hdfsPath)){
                System.out.println("Directory "+ dirPath +" has been created successfully!");
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

运行截图:

??终端上进行Hadoop检测

?2.????从本地拷贝文件到新创建的目录中

新建Java class命名为 CopyFile

import java.net.URI;

import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyFile {
    public static void main(String args[]) throws Exception {
        System.out.println("Input the filepath:");
        Scanner sc = new Scanner(System.in);
        Path src=new Path(sc.next());
        FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
        Path dst =new Path("/mydir");
        fs.copyFromLocalFile(src, dst);
    }
}

运行结果

终端上进行Hadoop检测

3.????将HDFS中指定文件的内容输出到终端中

新建Java class命名为 ReadFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.Scanner;

public class ReadFile {
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String filePath = '/'+sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            
            Path srcPath = new Path(filePath);

            FSDataInputStream is = fs.open(srcPath);
            while(true) {
                String line = is.readLine();
                if(line == null) {
                    break;
                }
                System.out.println(line);
            }
            is.close();
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果

终端上进行Hadoop检测

这里为了第4,5问达到更好看的效果,Hadoop下面的mydir目录中,又上传了yourfile.txt文件,内容为“goodbye,big data!"。和herfile文件,内容是“big data is good!”如图:

?4.????显示一个指定目录下所有文件

新建Java class命名为 ListFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.Scanner;

public class ListFiles {
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String filePath = sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            Path srcPath = new Path(filePath);
            FileStatus[] stats = fs.listStatus(srcPath);
            Path[] paths = FileUtil.stat2Paths(stats);
            for(Path p : paths)
                System.out.println(p.getName());
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果

终端上进行Hadoop检测

5.????完成指定目录下指定类型文件的合并

新建Java class命名为 MergeFiles,MyFileFilter

(本人对此处的“合并”,理解为“内容合并”,即将指定目录下的同类型文件内容写进一个文件。比如mydir目录下有三个文件myfile.txt、yourfile.txt、herfile,需过滤掉herfile文件,将两个文本文件myfile.txt、yourfile.txt内容写入一个文件mergeNew.txt,进行“合并”。)

注:通过自定义过滤函数实现指定类型功能。

MyFileFilter代码

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
class MyFileFilter implements PathFilter {
    String reg = null;
    MyFileFilter(String reg) {
        this.reg = reg;
    }
    public boolean accept(Path path) {
        if( (path.toString().matches(reg)) ) {
            return true;
        }
        return false;
    }
}

?MergeFiles代码

import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
public class MergeFiles{
    Path inputPath = null;//待合并文件所在目录的路径
    Path outputPath = null;//输出文件的路径
    public MergeFiles(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }
    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        //org.apache.hadoop.conf.Configuration 即访问配置项。
        //所有的配置项的值,如果在core-site.xml中有对应的配置,则以core-site.xml为准。
        conf.set("fs.defaultFS", "hdfs://172.18.0.2:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
        //下面过找到输入目录中后缀为.txt的文件,需自写过滤函数MyPathFilter        
       FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new MyPathFilter(“.*\\.txt”));
        //org.apache.hadoop.fs.FileStatus是一个接口,用于向客户端展示系统中文件和目录的元数据,
//具体包括文件大小、块大小、副本信息、所有者、修改时间等。可通过FileSystem.listStatus()方法获得具体的实例对象。
//   .*表示0个或多个任意字符  \\.构成一个转义字符,表示.
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        PrintStream ps = new PrintStream(System.out);//文件输出流,用于写Hadoop文件
        for (FileStatus sta : sourceStatus) {//分别读取过滤之后的每个文件的内容,并输出到同一个文件中
            System.out.println("path:" + sta.getPath() + "  file size:" + sta.getLen() +
                    "  auth:" + sta.getPermission() + "  content:");//打印后缀不为.txt的文件的路径、大小、权限、内容
            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            //文件输入流,用于读取Hadoop文件
            byte[] data = new byte[1024];
            int read = -1;
            while ((read = fsdis.read(data)) > 0) {
                ps.write(data, 0, read);
                fsdos.write(data, 0, read);
            }
            fsdis.close();
        }//end of for
        ps.close();
        fsdos.close();
    }//end of doMerge()
    public static void main(String[] args) throws IOException{
        MergeFiles MergeFiles = new MergeFiles(
                "hdfs://172.18.0.2:9000/mydir",
                "hdfs://172.18.0.2:9000/mergeNew.txt"
        );
        MergeFiles.doMerge();
    }
}

?运行结果

终端上进行Hadoop检测

6.????在HDFS中,将文件从源路径移动到目的路径

新建Java class命名为 MvFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.Scanner;

public class MvFile{
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String srcStrPath = '/'+sc.next();
            String dstStrPath = '/'+sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            Path srcPath = new Path(srcStrPath);
            Path dstPath = new Path(dstStrPath);
            if(fs.rename(srcPath,dstPath)) {
                System.out.println("movefile from " + srcStrPath + " to " + dstStrPath + "successfully!");
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

?运行结果?

?终端上进行Hadoop检测?

?7.????删除HDFS中指定的文件

新建Java class命名为 DeleteFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.Scanner;

public class DeleteFile {
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String filePath = '/'+sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            
            Path hdfsPath = new Path(filePath);
            if(fs.delete(hdfsPath,false)){
                System.out.println("File "+ filePath +" has been deleted successfully!");
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

?运行结果

??终端上进行Hadoop检测

?备注

问题1

用java操作hdfs的时候会报这个错误,大概是这个原因:因为在配置Hadoop时,fs.defaultFS值使用的是节点名称,所以ip要与master节点对应。"hdfs://100.65.0.2:9000"

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-26 12:16:31  更:2021-10-26 12:16:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:44:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码