记录一下自己写的java操作hdfs文件系统代码,留给学校的后人。 其中user/skl是hdfs的用户目录。
import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
public class MergeFile {
private static InputStream input;
private static OutputStream output;
public static void upload() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
input = new FileInputStream("/usr/local/hadoop/file6.txt");
output = fileSystem.create(new Path("/user/skl/file6.txt"));
byte[] buffer = new byte[1024];
int len = 0;
while((len=input.read(buffer))!=-1){
output.write(buffer,0,len);
}
output.flush();
input.close();
output.close();
}
public static void download() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
fileSystem.copyToLocalFile(false,new Path("/user/skl/file6.txt"),new Path("/usr/local/hadoop/"),true);
fileSystem.close();
}
public static void textcat(String path) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
FSDataInputStream in=fileSystem.open(new Path(path));
BufferedReader br=new BufferedReader(new InputStreamReader(in));
String line="";
while((line=br.readLine())!=null){
System.out.println(line);
}
fileSystem.close();
}
public static void getAllDetail(String path) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
FileStatus[] fileList = fileSystem.listStatus(new Path(path));
for(FileStatus file:fileList){
if(file.isFile()){
getdetail(file);
}else{
getAllDetail(file.getPath().toString());
}
}
fileSystem.close();
}
public static void delete(String path) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
boolean del = fileSystem.delete(new Path(path));
if(del){
System.out.println("删除成功(>ω・*)ノ");
}else{
System.out.println("删除失败(〃′-ω・)");
}
}
public static void append(String s1) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
FSDataOutputStream append = fileSystem.append(new Path("/user/skl/file5.txt"));
append.write("追加的内容\n".getBytes(StandardCharsets.UTF_8));
append.write(s1.getBytes(StandardCharsets.UTF_8));
append.close();
textcat("/user/skl/file5.txt");
}
public static void textappend() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
Scanner scanner = new Scanner(System.in);
String s = scanner.next();
if("添加到末尾".equals(s)){
FSDataOutputStream append = fileSystem.append(new Path("/user/skl/file5.txt"));
append.write("追加的内容".getBytes(StandardCharsets.UTF_8));
append.close();
textcat("/user/skl/file5.txt");
fileSystem.close();
}else if("添加到开头".equals(s)){
FSDataInputStream in=fileSystem.open(new Path("/user/skl/file5.txt"));
BufferedReader br=new BufferedReader(new InputStreamReader(in));
String line="";
String s1 = "";
while((line=br.readLine())!=null){
s1 += line;
s1 += "\n";
}
delete("/user/skl/file5.txt");
fileSystem.create(new Path("/user/skl/file5.txt"));
fileSystem.close();
append(s1);
}else{
System.out.println("请重新输入");
textappend();
}
fileSystem.close();
}
public static void CreateandDelete() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
System.out.println("请输入您要进行的操作:");
Scanner scanner = new Scanner(System.in);
String require = scanner.next();
if("删除".equals(require)){
System.out.println("请输入您要删除的文件:");
String path = scanner.next();
delete(path);
}else if("创建".equals(require)){
System.out.println("请输入您要创建的文件:");
String path = scanner.next();
try {
fileSystem.create(new Path(path));
}catch (Exception e){
System.out.println("抱歉,创建失败了");
}
}
fileSystem.close();
}
public static void textmove() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem fileSystem = FileSystem.get(conf);
System.out.println("请输入您要移动的文件:");
Scanner scanner = new Scanner(System.in);
String root = scanner.next();
System.out.println("请输入您要移动到的位置:");
String dst = scanner.next();
try {
fileSystem.moveFromLocalFile(new Path(root),new Path(dst));
System.out.println("移动成功!");
}catch (Exception e){
System.out.println("抱歉,移动失败,请重试");
textmove();
}
fileSystem.close();
}
public static void main(String[] args) throws IOException {
upload();
download();
textcat();
getdetail();
getAllDetail("/user/skl/");
delete();
textappend();
CreateandDelete();
textmove();
}
public static void getdetail(FileStatus fileStatus) throws IOException {
System.out.println("文件读写权限:"+fileStatus.getPermission());
System.out.println("文件大小:"+fileStatus.getLen()+"字节");
long timeStamp = fileStatus.getModificationTime();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = format.format(timeStamp);
System.out.println("文件创建时间:"+date);
System.out.println("文件路径:"+fileStatus.getPath().toString());
}
}
|